Skip to content

Commit 1737d49

Browse files
my-vegetable-has-exploded, haohuaijin, waynexia, alamb
authored
feat: support inlist in LiteralGurantee for pruning (#8654)
* support inlist in LiteralGuarantee for pruning. * add more tests * rm useless notes * Apply suggestions from code review Co-authored-by: Huaijin <haohuaijin@gmail.com> * add tests in row_groups * Apply suggestions from code review Co-authored-by: Ruihang Xia <waynestxia@gmail.com> Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> * update comment & add more tests --------- Co-authored-by: Huaijin <haohuaijin@gmail.com> Co-authored-by: Ruihang Xia <waynestxia@gmail.com> Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 6403222 commit 1737d49

2 files changed

Lines changed: 216 additions & 162 deletions

File tree

datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs

Lines changed: 14 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -293,15 +293,10 @@ mod tests {
293293
use arrow::datatypes::DataType::Decimal128;
294294
use arrow::datatypes::Schema;
295295
use arrow::datatypes::{DataType, Field};
296-
use datafusion_common::{config::ConfigOptions, TableReference, ToDFSchema};
297-
use datafusion_common::{DataFusionError, Result};
298-
use datafusion_expr::{
299-
builder::LogicalTableSource, cast, col, lit, AggregateUDF, Expr, ScalarUDF,
300-
TableSource, WindowUDF,
301-
};
296+
use datafusion_common::{Result, ToDFSchema};
297+
use datafusion_expr::{cast, col, lit, Expr};
302298
use datafusion_physical_expr::execution_props::ExecutionProps;
303299
use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
304-
use datafusion_sql::planner::ContextProvider;
305300
use parquet::arrow::arrow_to_parquet_schema;
306301
use parquet::arrow::async_reader::ParquetObjectReader;
307302
use parquet::basic::LogicalType;
@@ -1105,13 +1100,18 @@ mod tests {
11051100
let data = bytes::Bytes::from(std::fs::read(path).unwrap());
11061101

11071102
// generate pruning predicate
1108-
let schema = Schema::new(vec![
1109-
Field::new("String", DataType::Utf8, false),
1110-
Field::new("String3", DataType::Utf8, false),
1111-
]);
1112-
let sql =
1113-
"SELECT * FROM tbl WHERE \"String\" IN ('Hello_Not_Exists', 'Hello_Not_Exists2')";
1114-
let expr = sql_to_physical_plan(sql).unwrap();
1103+
let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
1104+
1105+
let expr = col(r#""String""#).in_list(
1106+
vec![
1107+
lit("Hello_Not_Exists"),
1108+
lit("Hello_Not_Exists2"),
1109+
lit("Hello_Not_Exists3"),
1110+
lit("Hello_Not_Exist4"),
1111+
],
1112+
false,
1113+
);
1114+
let expr = logical2physical(&expr, &schema);
11151115
let pruning_predicate =
11161116
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
11171117

@@ -1312,97 +1312,4 @@ mod tests {
13121312

13131313
Ok(pruned_row_group)
13141314
}
1315-
1316-
fn sql_to_physical_plan(sql: &str) -> Result<Arc<dyn PhysicalExpr>> {
1317-
use datafusion_optimizer::{
1318-
analyzer::Analyzer, optimizer::Optimizer, OptimizerConfig, OptimizerContext,
1319-
};
1320-
use datafusion_sql::{
1321-
planner::SqlToRel,
1322-
sqlparser::{ast::Statement, parser::Parser},
1323-
};
1324-
use sqlparser::dialect::GenericDialect;
1325-
1326-
// parse the SQL
1327-
let dialect = GenericDialect {}; // or AnsiDialect, or your own dialect ...
1328-
let ast: Vec<Statement> = Parser::parse_sql(&dialect, sql).unwrap();
1329-
let statement = &ast[0];
1330-
1331-
// create a logical query plan
1332-
let schema_provider = TestSchemaProvider::new();
1333-
let sql_to_rel = SqlToRel::new(&schema_provider);
1334-
let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap();
1335-
1336-
// hard code the return value of now()
1337-
let config = OptimizerContext::new().with_skip_failing_rules(false);
1338-
let analyzer = Analyzer::new();
1339-
let optimizer = Optimizer::new();
1340-
// analyze and optimize the logical plan
1341-
let plan = analyzer.execute_and_check(&plan, config.options(), |_, _| {})?;
1342-
let plan = optimizer.optimize(&plan, &config, |_, _| {})?;
1343-
// convert the logical plan into a physical plan
1344-
let exprs = plan.expressions();
1345-
let expr = &exprs[0];
1346-
let df_schema = plan.schema().as_ref().to_owned();
1347-
let tb_schema: Schema = df_schema.clone().into();
1348-
let execution_props = ExecutionProps::new();
1349-
create_physical_expr(expr, &df_schema, &tb_schema, &execution_props)
1350-
}
1351-
1352-
struct TestSchemaProvider {
1353-
options: ConfigOptions,
1354-
tables: HashMap<String, Arc<dyn TableSource>>,
1355-
}
1356-
1357-
impl TestSchemaProvider {
1358-
pub fn new() -> Self {
1359-
let mut tables = HashMap::new();
1360-
tables.insert(
1361-
"tbl".to_string(),
1362-
create_table_source(vec![Field::new(
1363-
"String".to_string(),
1364-
DataType::Utf8,
1365-
false,
1366-
)]),
1367-
);
1368-
1369-
Self {
1370-
options: Default::default(),
1371-
tables,
1372-
}
1373-
}
1374-
}
1375-
1376-
impl ContextProvider for TestSchemaProvider {
1377-
fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
1378-
match self.tables.get(name.table()) {
1379-
Some(table) => Ok(table.clone()),
1380-
_ => datafusion_common::plan_err!("Table not found: {}", name.table()),
1381-
}
1382-
}
1383-
1384-
fn get_function_meta(&self, _name: &str) -> Option<Arc<ScalarUDF>> {
1385-
None
1386-
}
1387-
1388-
fn get_aggregate_meta(&self, _name: &str) -> Option<Arc<AggregateUDF>> {
1389-
None
1390-
}
1391-
1392-
fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
1393-
None
1394-
}
1395-
1396-
fn options(&self) -> &ConfigOptions {
1397-
&self.options
1398-
}
1399-
1400-
fn get_window_meta(&self, _name: &str) -> Option<Arc<WindowUDF>> {
1401-
None
1402-
}
1403-
}
1404-
1405-
fn create_table_source(fields: Vec<Field>) -> Arc<dyn TableSource> {
1406-
Arc::new(LogicalTableSource::new(Arc::new(Schema::new(fields))))
1407-
}
14081315
}

0 commit comments

Comments (0)