Skip to content

Commit 3df57d1

Browse files
onursaticiAdamGS
authored andcommitted
datafusion 48 (vortex-data#3560)
Signed-off-by: Onur Satici <onur@spiraldb.com> Signed-off-by: Adam Gutglick <adam@spiraldb.com> Co-authored-by: Adam Gutglick <adam@spiraldb.com> Signed-off-by: mwlon <m.w.loncaric@gmail.com>
1 parent d02913c commit 3df57d1

11 files changed

Lines changed: 198 additions & 159 deletions

File tree

Cargo.lock

Lines changed: 76 additions & 62 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -58,16 +58,16 @@ anyhow = "1.0.95"
5858
arbitrary = "1.3.2"
5959
arcref = "0.2.0"
6060
arrayref = "0.3.7"
61-
arrow = { version = "55", default-features = false }
62-
arrow-arith = "55"
63-
arrow-array = "55"
64-
arrow-buffer = "55"
65-
arrow-cast = "55"
66-
arrow-data = "55"
67-
arrow-ord = "55"
68-
arrow-schema = "55"
69-
arrow-select = "55"
70-
arrow-string = "55"
61+
arrow = { version = "55.1", default-features = false }
62+
arrow-arith = "55.1"
63+
arrow-array = "55.1"
64+
arrow-buffer = "55.1"
65+
arrow-cast = "55.1"
66+
arrow-data = "55.1"
67+
arrow-ord = "55.1"
68+
arrow-schema = "55.1"
69+
arrow-select = "55.1"
70+
arrow-string = "55.1"
7171
async-stream = "0.3.6"
7272
async-trait = "0.1.88"
7373
bindgen = "0.71.1"
@@ -78,15 +78,15 @@ bzip2 = "0.5.0"
7878
cbindgen = "0.29.0"
7979
cc = "1.2"
8080
cfg-if = "1"
81-
chrono = "0.4.40"
81+
chrono = "0.4.41"
8282
clap = "4.5"
8383
compio = { version = "0.14", features = ["io-uring"], default-features = false }
8484
crossbeam-queue = "0.3"
8585
crossterm = "0.28"
8686
dashmap = "6.1.0"
87-
datafusion = { version = "47", default-features = false }
88-
datafusion-common = { version = "47" }
89-
datafusion-physical-plan = { version = "47" }
87+
datafusion = { version = "48", default-features = false }
88+
datafusion-common = { version = "48" }
89+
datafusion-physical-plan = { version = "48" }
9090
divan = { package = "codspeed-divan-compat", version = "2.8.0" }
9191
duckdb = { path = "duckdb-vortex/duckdb-rs/crates/duckdb", features = [
9292
"vtab-full",
@@ -102,7 +102,7 @@ futures = { version = "0.3.31", default-features = false }
102102
futures-util = "0.3.31"
103103
glob = "0.3.2"
104104
goldenfile = "1"
105-
half = { version = "2.5", features = ["std", "num-traits"] }
105+
half = { version = "2.6", features = ["std", "num-traits"] }
106106
hashbrown = "0.15.1"
107107
homedir = "0.3.3"
108108
humansize = "2.1.3"
@@ -124,7 +124,7 @@ once_cell = "1.21"
124124
opentelemetry = "0.29.0"
125125
opentelemetry-otlp = "0.29.0"
126126
opentelemetry_sdk = "0.29.0"
127-
parquet = "55"
127+
parquet = "55.1"
128128
paste = "1.0.15"
129129
pco = "0.4.4"
130130
pin-project = "1.1.5"
@@ -163,14 +163,14 @@ taffy = "0.8.0"
163163
tar = "0.4"
164164
tempfile = "3"
165165
thiserror = "2.0.3"
166-
tokio = "1.44.2"
166+
tokio = "1.45.1"
167167
tokio-stream = "0.1.17"
168168
tracing = { version = "0.1.41" }
169169
tracing-chrome = "0.7.2"
170170
tracing-futures = "0.2.5"
171171
tracing-subscriber = "0.3.19"
172172
url = "2.5.4"
173-
uuid = { version = "1.16", features = ["js"] }
173+
uuid = { version = "1.17", features = ["js"] }
174174
walkdir = "2.5.0"
175175
wasm-bindgen-futures = "0.4.39"
176176
witchcraft-metrics = "1.0.1"

bench-vortex/src/clickbench.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -216,8 +216,9 @@ pub async fn register_vortex_files(
216216

217217
let table_url = ListingTableUrl::parse(vortex_path)?;
218218

219-
let config =
220-
ListingTableConfig::new(table_url).with_listing_options(ListingOptions::new(format));
219+
let config = ListingTableConfig::new(table_url).with_listing_options(
220+
ListingOptions::new(format).with_session_config_options(session.state().config()),
221+
);
221222

222223
let config = if let Some(schema) = schema {
223224
config.with_schema(schema.into())
@@ -248,7 +249,9 @@ pub fn register_parquet_files(
248249
let table_url = ListingTableUrl::parse(table_path)?;
249250

250251
let config = ListingTableConfig::new(table_url)
251-
.with_listing_options(ListingOptions::new(format))
252+
.with_listing_options(
253+
ListingOptions::new(format).with_session_config_options(session.state().config()),
254+
)
252255
.with_schema(schema.clone().into());
253256

254257
let listing_table = Arc::new(ListingTable::try_new(config)?);

bench-vortex/src/datasets/file.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,9 @@ pub async fn register_parquet_files(
6666
info!("Registering table from {}", &parquet_path);
6767
let table_url = ListingTableUrl::parse(parquet_path)?;
6868

69-
let config = ListingTableConfig::new(table_url)
70-
.with_listing_options(ListingOptions::new(format));
69+
let config = ListingTableConfig::new(table_url).with_listing_options(
70+
ListingOptions::new(format).with_session_config_options(session.state().config()),
71+
);
7172

7273
let config = if let Some(schema) = schema {
7374
config.with_schema(schema.into())
@@ -97,8 +98,9 @@ pub async fn register_vortex_files(
9798
// Register the Vortex file
9899
let format = Arc::new(VortexFormat::default());
99100
let table_url = ListingTableUrl::parse(file_url.as_str())?;
100-
let config = ListingTableConfig::new(table_url)
101-
.with_listing_options(ListingOptions::new(format));
101+
let config = ListingTableConfig::new(table_url).with_listing_options(
102+
ListingOptions::new(format).with_session_config_options(session.state().config()),
103+
);
102104

103105
let config = if let Some(schema) = schema {
104106
config.with_schema(schema.into())

bench-vortex/src/public_bi.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,10 @@ impl PBIData {
393393
let path = self.get_file_path(&table.name, file_type);
394394
let table_url = ListingTableUrl::parse(path.to_str().expect("unicode"))?;
395395
let config = ListingTableConfig::new(table_url)
396-
.with_listing_options(ListingOptions::new(df_format))
396+
.with_listing_options(
397+
ListingOptions::new(df_format)
398+
.with_session_config_options(session.state().config()),
399+
)
397400
.with_schema(schema.into());
398401

399402
let listing_table = Arc::new(ListingTable::try_new(config)?);

vortex-datafusion/examples/vortex_table.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@ async fn main() -> anyhow::Result<()> {
5757
.ok_or_else(|| vortex_err!("Path is not valid UTF-8"))?,
5858
)?;
5959
let config = ListingTableConfig::new(table_url)
60-
.with_listing_options(ListingOptions::new(format))
60+
.with_listing_options(
61+
ListingOptions::new(format).with_session_config_options(ctx.state().config()),
62+
)
6163
.infer_schema(&ctx.state())
6264
.await?;
6365

vortex-datafusion/src/lib.rs

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ use std::fmt::Debug;
44

55
use datafusion::arrow::datatypes::{DataType, Schema};
66
use datafusion::common::stats::Precision as DFPrecision;
7-
use datafusion::logical_expr::{Expr, Operator};
7+
use datafusion::logical_expr::Operator;
8+
use datafusion::physical_expr::PhysicalExprRef;
9+
use datafusion::physical_plan::expressions::{BinaryExpr, Column, LikeExpr, Literal};
810
use vortex::stats::Precision;
911

1012
mod convert;
@@ -47,26 +49,24 @@ fn supported_data_types(dt: DataType) -> bool {
4749
is_supported
4850
}
4951

50-
fn can_be_pushed_down(expr: &Expr, schema: &Schema) -> bool {
51-
match expr {
52-
Expr::BinaryExpr(expr)
53-
if expr.op.is_logic_operator() || SUPPORTED_BINARY_OPS.contains(&expr.op) =>
54-
{
55-
can_be_pushed_down(expr.left.as_ref(), schema)
56-
& can_be_pushed_down(expr.right.as_ref(), schema)
57-
}
58-
Expr::Column(col) => match schema.column_with_name(col.name()) {
59-
Some((_, field)) => supported_data_types(field.data_type().clone()),
60-
_ => false,
61-
},
62-
Expr::Like(like) => {
63-
can_be_pushed_down(&like.expr, schema) && can_be_pushed_down(&like.pattern, schema)
64-
}
65-
Expr::Literal(lit) => supported_data_types(lit.data_type()),
66-
_ => {
67-
log::debug!("DataFusion expression can't be pushed down: {expr:?}");
68-
false
69-
}
52+
fn can_be_pushed_down(expr: &PhysicalExprRef, schema: &Schema) -> bool {
53+
let expr = expr.as_any();
54+
if let Some(binary) = expr.downcast_ref::<BinaryExpr>() {
55+
(binary.op().is_logic_operator() || SUPPORTED_BINARY_OPS.contains(binary.op()))
56+
&& can_be_pushed_down(binary.left(), schema)
57+
&& can_be_pushed_down(binary.right(), schema)
58+
} else if let Some(col) = expr.downcast_ref::<Column>() {
59+
schema
60+
.column_with_name(col.name())
61+
.map(|(_, field)| supported_data_types(field.data_type().clone()))
62+
.unwrap_or(false)
63+
} else if let Some(like) = expr.downcast_ref::<LikeExpr>() {
64+
can_be_pushed_down(like.expr(), schema) && can_be_pushed_down(like.pattern(), schema)
65+
} else if let Some(lit) = expr.downcast_ref::<Literal>() {
66+
supported_data_types(lit.value().data_type())
67+
} else {
68+
log::debug!("DataFusion expression can't be pushed down: {expr:?}");
69+
false
7070
}
7171
}
7272

vortex-datafusion/src/persistent/format.rs

Lines changed: 5 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -12,23 +12,21 @@ use datafusion::common::{
1212
config_datafusion_err, not_impl_err,
1313
};
1414
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
15-
use datafusion::datasource::file_format::{FileFormat, FileFormatFactory, FilePushdownSupport};
15+
use datafusion::datasource::file_format::{FileFormat, FileFormatFactory};
1616
use datafusion::datasource::physical_plan::{
1717
FileScanConfig, FileScanConfigBuilder, FileSinkConfig, FileSource,
1818
};
1919
use datafusion::datasource::sink::DataSinkExec;
2020
use datafusion::datasource::source::DataSourceExec;
21-
use datafusion::logical_expr::Expr;
2221
use datafusion::logical_expr::dml::InsertOp;
23-
use datafusion::physical_expr::{LexRequirement, PhysicalExpr};
22+
use datafusion::physical_expr::LexRequirement;
2423
use datafusion::physical_plan::ExecutionPlan;
2524
use futures::{FutureExt, StreamExt as _, TryStreamExt as _, stream};
2625
use itertools::Itertools;
2726
use object_store::{ObjectMeta, ObjectStore};
2827
use vortex::dtype::DType;
2928
use vortex::dtype::arrow::FromArrowType;
3029
use vortex::error::{VortexExpect, VortexResult, vortex_err};
31-
use vortex::expr::{ExprRef, VortexExpr, and};
3230
use vortex::file::VORTEX_FILE_EXTENSION;
3331
use vortex::metrics::VortexMetrics;
3432
use vortex::session::VortexSession;
@@ -38,8 +36,8 @@ use vortex::stats::{Stat, StatsProviderExt, StatsSet};
3836
use super::cache::VortexFileCache;
3937
use super::sink::VortexSink;
4038
use super::source::VortexSource;
41-
use crate::convert::{TryFromDataFusion, TryToDataFusion};
42-
use crate::{PrecisionExt as _, can_be_pushed_down};
39+
use crate::PrecisionExt as _;
40+
use crate::convert::TryToDataFusion;
4341

4442
/// Vortex implementation of a DataFusion [`FileFormat`].
4543
pub struct VortexFormat {
@@ -298,7 +296,6 @@ impl FileFormat for VortexFormat {
298296
&self,
299297
_state: &dyn Session,
300298
file_scan_config: FileScanConfig,
301-
filters: Option<&Arc<dyn PhysicalExpr>>,
302299
) -> DFResult<Arc<dyn ExecutionPlan>> {
303300
if file_scan_config
304301
.file_groups
@@ -317,11 +314,7 @@ impl FileFormat for VortexFormat {
317314
return not_impl_err!("Vortex doesn't support output ordering");
318315
}
319316

320-
let mut source = VortexSource::new(self.file_cache.clone(), self.session.metrics().clone());
321-
if let Some(predicate) = make_vortex_predicate(filters) {
322-
source = source.with_predicate(predicate);
323-
}
324-
317+
let source = VortexSource::new(self.file_cache.clone(), self.session.metrics().clone());
325318
Ok(DataSourceExec::from_data_source(
326319
FileScanConfigBuilder::from(file_scan_config)
327320
.with_source(Arc::new(source))
@@ -350,23 +343,6 @@ impl FileFormat for VortexFormat {
350343
Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
351344
}
352345

353-
fn supports_filters_pushdown(
354-
&self,
355-
_file_schema: &Schema,
356-
table_schema: &Schema,
357-
filters: &[&Expr],
358-
) -> DFResult<FilePushdownSupport> {
359-
let is_pushdown = filters
360-
.iter()
361-
.all(|expr| can_be_pushed_down(expr, table_schema));
362-
363-
if is_pushdown {
364-
Ok(FilePushdownSupport::Supported)
365-
} else {
366-
Ok(FilePushdownSupport::NotSupportedForFilter)
367-
}
368-
}
369-
370346
fn file_source(&self) -> Arc<dyn FileSource> {
371347
Arc::new(VortexSource::new(
372348
self.file_cache.clone(),
@@ -375,22 +351,6 @@ impl FileFormat for VortexFormat {
375351
}
376352
}
377353

378-
pub(crate) fn make_vortex_predicate(
379-
predicate: Option<&Arc<dyn PhysicalExpr>>,
380-
) -> Option<Arc<dyn VortexExpr>> {
381-
predicate
382-
// If we cannot convert an expr to a vortex expr, we run no filter, since datafusion
383-
// will rerun the filter expression anyway.
384-
.and_then(|expr| {
385-
// This splits expressions into conjunctions and converts them to vortex expressions.
386-
// Any inconvertible expressions are dropped since true /\ a == a.
387-
datafusion::physical_expr::split_conjunction(expr)
388-
.into_iter()
389-
.filter_map(|e| ExprRef::try_from_df(e.as_ref()).ok())
390-
.reduce(and)
391-
})
392-
}
393-
394354
#[cfg(test)]
395355
mod tests {
396356
use datafusion::execution::SessionStateBuilder;

vortex-datafusion/src/persistent/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,9 @@ mod tests {
9696
assert!(table_url.is_collection());
9797

9898
let config = ListingTableConfig::new(table_url)
99-
.with_listing_options(ListingOptions::new(format))
99+
.with_listing_options(
100+
ListingOptions::new(format).with_session_config_options(ctx.state().config()),
101+
)
100102
.infer_schema(&ctx.state())
101103
.await?;
102104

vortex-datafusion/src/persistent/sink.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ mod tests {
127127
use datafusion::execution::SessionStateBuilder;
128128
use datafusion::logical_expr::{Expr, LogicalPlan, LogicalPlanBuilder, Values};
129129
use datafusion::prelude::SessionContext;
130+
use datafusion::scalar::ScalarValue;
130131
use tempfile::TempDir;
131132

132133
use crate::persistent::{VortexFormatFactory, register_vortex_format_factory};
@@ -157,8 +158,8 @@ mod tests {
157158
let values = Values {
158159
schema: Arc::new(my_tbl.schema().clone()),
159160
values: vec![vec![
160-
Expr::Literal("hello".into()),
161-
Expr::Literal(42_i32.into()),
161+
Expr::Literal(ScalarValue::new_utf8view("hello"), None),
162+
Expr::Literal(42_i32.into(), None),
162163
]],
163164
};
164165

0 commit comments

Comments
 (0)