Skip to content

Commit 3913a35

Browse files
committed
Minor: refactor bloom filter tests to reduce duplication
1 parent 415c2ce commit 3913a35

1 file changed

Lines changed: 131 additions & 169 deletions

File tree

  • datafusion/core/src/datasource/physical_plan/parquet

datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs

Lines changed: 131 additions & 169 deletions
Original file line numberDiff line numberDiff line change
@@ -1017,60 +1017,26 @@ mod tests {
10171017
// +-----------+
10181018
#[tokio::test]
10191019
async fn test_row_group_bloom_filter_pruning_predicate_simple_expr() {
1020-
// load parquet file
1021-
let testdata = datafusion_common::test_util::parquet_test_data();
1022-
let file_name = "data_index_bloom_encoding_stats.parquet";
1023-
let path = format!("{testdata}/{file_name}");
1024-
let data = bytes::Bytes::from(std::fs::read(path).unwrap());
1025-
1026-
// generate pruning predicate `(String = "Hello_Not_exists")`
1027-
let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
1028-
let expr = col(r#""String""#).eq(lit("Hello_Not_Exists"));
1029-
let expr = logical2physical(&expr, &schema);
1030-
let pruning_predicate =
1031-
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
1032-
1033-
let row_groups = vec![0];
1034-
let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
1035-
file_name,
1036-
data,
1037-
&pruning_predicate,
1038-
&row_groups,
1039-
)
1040-
.await
1041-
.unwrap();
1042-
assert!(pruned_row_groups.is_empty());
1020+
BloomFilterTest::new_data_index_bloom_encoding_stats()
1021+
.with_expect_all_pruned()
1022+
// generate pruning predicate `(String = "Hello_Not_exists")`
1023+
.run(col(r#""String""#).eq(lit("Hello_Not_Exists")))
1024+
.await
10431025
}
10441026

10451027
#[tokio::test]
10461028
async fn test_row_group_bloom_filter_pruning_predicate_mutiple_expr() {
1047-
// load parquet file
1048-
let testdata = datafusion_common::test_util::parquet_test_data();
1049-
let file_name = "data_index_bloom_encoding_stats.parquet";
1050-
let path = format!("{testdata}/{file_name}");
1051-
let data = bytes::Bytes::from(std::fs::read(path).unwrap());
1052-
1053-
// generate pruning predicate `(String = "Hello_Not_exists" OR String = "Hello_Not_exists2")`
1054-
let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
1055-
let expr = lit("1").eq(lit("1")).and(
1056-
col(r#""String""#)
1057-
.eq(lit("Hello_Not_Exists"))
1058-
.or(col(r#""String""#).eq(lit("Hello_Not_Exists2"))),
1059-
);
1060-
let expr = logical2physical(&expr, &schema);
1061-
let pruning_predicate =
1062-
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
1063-
1064-
let row_groups = vec![0];
1065-
let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
1066-
file_name,
1067-
data,
1068-
&pruning_predicate,
1069-
&row_groups,
1070-
)
1071-
.await
1072-
.unwrap();
1073-
assert!(pruned_row_groups.is_empty());
1029+
BloomFilterTest::new_data_index_bloom_encoding_stats()
1030+
.with_expect_all_pruned()
1031+
// generate pruning predicate `(String = "Hello_Not_exists" OR String = "Hello_Not_exists2")`
1032+
.run(
1033+
lit("1").eq(lit("1")).and(
1034+
col(r#""String""#)
1035+
.eq(lit("Hello_Not_Exists"))
1036+
.or(col(r#""String""#).eq(lit("Hello_Not_Exists2"))),
1037+
),
1038+
)
1039+
.await
10741040
}
10751041

10761042
#[tokio::test]
@@ -1106,144 +1072,140 @@ mod tests {
11061072

11071073
#[tokio::test]
11081074
async fn test_row_group_bloom_filter_pruning_predicate_with_exists_value() {
1109-
// load parquet file
1110-
let testdata = datafusion_common::test_util::parquet_test_data();
1111-
let file_name = "data_index_bloom_encoding_stats.parquet";
1112-
let path = format!("{testdata}/{file_name}");
1113-
let data = bytes::Bytes::from(std::fs::read(path).unwrap());
1114-
1115-
// generate pruning predicate `(String = "Hello")`
1116-
let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
1117-
let expr = col(r#""String""#).eq(lit("Hello"));
1118-
let expr = logical2physical(&expr, &schema);
1119-
let pruning_predicate =
1120-
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
1121-
1122-
let row_groups = vec![0];
1123-
let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
1124-
file_name,
1125-
data,
1126-
&pruning_predicate,
1127-
&row_groups,
1128-
)
1129-
.await
1130-
.unwrap();
1131-
assert_eq!(pruned_row_groups, row_groups);
1075+
BloomFilterTest::new_data_index_bloom_encoding_stats()
1076+
.with_expect_none_pruned()
1077+
// generate pruning predicate `(String = "Hello")`
1078+
.run(col(r#""String""#).eq(lit("Hello")))
1079+
.await
11321080
}
11331081

11341082
#[tokio::test]
11351083
async fn test_row_group_bloom_filter_pruning_predicate_with_exists_2_values() {
1136-
// load parquet file
1137-
let testdata = datafusion_common::test_util::parquet_test_data();
1138-
let file_name = "data_index_bloom_encoding_stats.parquet";
1139-
let path = format!("{testdata}/{file_name}");
1140-
let data = bytes::Bytes::from(std::fs::read(path).unwrap());
1141-
1142-
// generate pruning predicate `(String = "Hello") OR (String = "the quick")`
1143-
let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
1144-
let expr = col(r#""String""#)
1145-
.eq(lit("Hello"))
1146-
.or(col(r#""String""#).eq(lit("the quick")));
1147-
let expr = logical2physical(&expr, &schema);
1148-
let pruning_predicate =
1149-
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
1150-
1151-
let row_groups = vec![0];
1152-
let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
1153-
file_name,
1154-
data,
1155-
&pruning_predicate,
1156-
&row_groups,
1157-
)
1158-
.await
1159-
.unwrap();
1160-
assert_eq!(pruned_row_groups, row_groups);
1084+
BloomFilterTest::new_data_index_bloom_encoding_stats()
1085+
.with_expect_none_pruned()
1086+
// generate pruning predicate `(String = "Hello") OR (String = "the quick")`
1087+
.run(
1088+
col(r#""String""#)
1089+
.eq(lit("Hello"))
1090+
.or(col(r#""String""#).eq(lit("the quick"))),
1091+
)
1092+
.await
11611093
}
11621094

11631095
#[tokio::test]
11641096
async fn test_row_group_bloom_filter_pruning_predicate_with_exists_3_values() {
1165-
// load parquet file
1166-
let testdata = datafusion_common::test_util::parquet_test_data();
1167-
let file_name = "data_index_bloom_encoding_stats.parquet";
1168-
let path = format!("{testdata}/{file_name}");
1169-
let data = bytes::Bytes::from(std::fs::read(path).unwrap());
1170-
1171-
// generate pruning predicate `(String = "Hello") OR (String = "the quick") OR (String = "are you")`
1172-
let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
1173-
let expr = col(r#""String""#)
1174-
.eq(lit("Hello"))
1175-
.or(col(r#""String""#).eq(lit("the quick")))
1176-
.or(col(r#""String""#).eq(lit("are you")));
1177-
let expr = logical2physical(&expr, &schema);
1178-
let pruning_predicate =
1179-
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
1180-
1181-
let row_groups = vec![0];
1182-
let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
1183-
file_name,
1184-
data,
1185-
&pruning_predicate,
1186-
&row_groups,
1187-
)
1097+
BloomFilterTest::new_data_index_bloom_encoding_stats()
1098+
.with_expect_none_pruned()
1099+
// generate pruning predicate `(String = "Hello") OR (String = "the quick") OR (String = "are you")`
1100+
.run(
1101+
col(r#""String""#)
1102+
.eq(lit("Hello"))
1103+
.or(col(r#""String""#).eq(lit("the quick")))
1104+
.or(col(r#""String""#).eq(lit("are you"))),
1105+
)
11881106
.await
1189-
.unwrap();
1190-
assert_eq!(pruned_row_groups, row_groups);
11911107
}
11921108

11931109
#[tokio::test]
11941110
async fn test_row_group_bloom_filter_pruning_predicate_with_or_not_eq() {
1195-
// load parquet file
1196-
let testdata = datafusion_common::test_util::parquet_test_data();
1197-
let file_name = "data_index_bloom_encoding_stats.parquet";
1198-
let path = format!("{testdata}/{file_name}");
1199-
let data = bytes::Bytes::from(std::fs::read(path).unwrap());
1200-
1201-
// generate pruning predicate `(String = "foo") OR (String != "bar")`
1202-
let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
1203-
let expr = col(r#""String""#)
1204-
.not_eq(lit("foo"))
1205-
.or(col(r#""String""#).not_eq(lit("bar")));
1206-
let expr = logical2physical(&expr, &schema);
1207-
let pruning_predicate =
1208-
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
1209-
1210-
let row_groups = vec![0];
1211-
let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
1212-
file_name,
1213-
data,
1214-
&pruning_predicate,
1215-
&row_groups,
1216-
)
1217-
.await
1218-
.unwrap();
1219-
assert_eq!(pruned_row_groups, row_groups);
1111+
BloomFilterTest::new_data_index_bloom_encoding_stats()
1112+
.with_expect_none_pruned()
1113+
// generate pruning predicate `(String = "foo") OR (String != "bar")`
1114+
.run(
1115+
col(r#""String""#)
1116+
.not_eq(lit("foo"))
1117+
.or(col(r#""String""#).not_eq(lit("bar"))),
1118+
)
1119+
.await
12201120
}
12211121

12221122
#[tokio::test]
12231123
async fn test_row_group_bloom_filter_pruning_predicate_without_bloom_filter() {
1224-
// load parquet file
1225-
let testdata = datafusion_common::test_util::parquet_test_data();
1226-
let file_name = "alltypes_plain.parquet";
1227-
let path = format!("{testdata}/{file_name}");
1228-
let data = bytes::Bytes::from(std::fs::read(path).unwrap());
1229-
12301124
// generate pruning predicate on a column without a bloom filter
1231-
let schema = Schema::new(vec![Field::new("string_col", DataType::Utf8, false)]);
1232-
let expr = col(r#""string_col""#).eq(lit("0"));
1233-
let expr = logical2physical(&expr, &schema);
1234-
let pruning_predicate =
1235-
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
1125+
BloomFilterTest::new_all_types()
1126+
.with_expect_none_pruned()
1127+
.run(col(r#""string_col""#).eq(lit("0")))
1128+
.await
1129+
}
12361130

1237-
let row_groups = vec![0];
1238-
let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
1239-
file_name,
1240-
data,
1241-
&pruning_predicate,
1242-
&row_groups,
1243-
)
1244-
.await
1245-
.unwrap();
1246-
assert_eq!(pruned_row_groups, row_groups);
1131+
struct BloomFilterTest {
1132+
file_name: String,
1133+
schema: Schema,
1134+
// which row groups should be attempted to prune
1135+
row_groups: Vec<usize>,
1136+
// which row groups are expected to be left after pruning. Must be set
1137+
// otherwise will panic on run()
1138+
post_pruning_row_groups: Option<Vec<usize>>,
1139+
}
1140+
1141+
impl BloomFilterTest {
1142+
/// Return a test for data_index_bloom_encoding_stats.parquet
1143+
fn new_data_index_bloom_encoding_stats() -> Self {
1144+
Self {
1145+
file_name: String::from("data_index_bloom_encoding_stats.parquet"),
1146+
schema: Schema::new(vec![Field::new("String", DataType::Utf8, false)]),
1147+
row_groups: vec![0],
1148+
post_pruning_row_groups: None,
1149+
}
1150+
}
1151+
1152+
// Return a test for alltypes_plain.parquet
1153+
fn new_all_types() -> Self {
1154+
Self {
1155+
file_name: String::from("alltypes_plain.parquet"),
1156+
schema: Schema::new(vec![Field::new(
1157+
"string_col",
1158+
DataType::Utf8,
1159+
false,
1160+
)]),
1161+
row_groups: vec![0],
1162+
post_pruning_row_groups: None,
1163+
}
1164+
}
1165+
1166+
/// Expect all row groups to be pruned
1167+
pub fn with_expect_all_pruned(mut self) -> Self {
1168+
self.post_pruning_row_groups = Some(vec![]);
1169+
self
1170+
}
1171+
1172+
/// Expect all row groups not to be pruned
1173+
pub fn with_expect_none_pruned(mut self) -> Self {
1174+
self.post_pruning_row_groups = Some(self.row_groups.clone());
1175+
self
1176+
}
1177+
1178+
/// Prune this file using the specified expression and check that the expected row groups are left
1179+
async fn run(self, expr: Expr) {
1180+
let Self {
1181+
file_name,
1182+
schema,
1183+
row_groups,
1184+
post_pruning_row_groups,
1185+
} = self;
1186+
1187+
let post_pruning_row_groups =
1188+
post_pruning_row_groups.expect("post_pruning_row_groups must be set");
1189+
1190+
let testdata = datafusion_common::test_util::parquet_test_data();
1191+
let path = format!("{testdata}/{file_name}");
1192+
let data = bytes::Bytes::from(std::fs::read(path).unwrap());
1193+
1194+
// generate pruning predicate on a column without a bloom filter
1195+
let expr = logical2physical(&expr, &schema);
1196+
let pruning_predicate =
1197+
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
1198+
1199+
let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
1200+
&file_name,
1201+
data,
1202+
&pruning_predicate,
1203+
&row_groups,
1204+
)
1205+
.await
1206+
.unwrap();
1207+
assert_eq!(pruned_row_groups, post_pruning_row_groups);
1208+
}
12471209
}
12481210

12491211
async fn test_row_group_bloom_filter_pruning_predicate(

0 commit comments

Comments (0)