Skip to content

Commit ad0459e

Browse files
Append generated column to the schema instead of prepending for WindowAggExec (#4746)
* Append window result instead of prepending * Fix expected projections in tests Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
1 parent 41c72cf commit ad0459e

5 files changed

Lines changed: 49 additions & 122 deletions

File tree

datafusion/core/src/physical_optimizer/optimize_sorts.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -677,7 +677,7 @@ mod tests {
677677
vec![
678678
"WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]",
679679
" FilterExec: NOT non_nullable_col@1",
680-
" SortExec: [non_nullable_col@2 ASC NULLS LAST]",
680+
" SortExec: [non_nullable_col@1 ASC NULLS LAST]",
681681
" WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]",
682682
" SortExec: [non_nullable_col@1 DESC]",
683683
" MemoryExec: partitions=0, partition_sizes=[]",

datafusion/core/src/physical_plan/windows/mod.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -265,17 +265,18 @@ mod tests {
265265
schema.as_ref(),
266266
)?],
267267
input,
268-
schema,
268+
schema.clone(),
269269
vec![],
270270
None,
271271
)?);
272272

273273
let result: Vec<RecordBatch> = collect(window_exec, task_ctx).await?;
274274
assert_eq!(result.len(), 1);
275275

276+
let n_schema_fields = schema.fields().len();
276277
let columns = result[0].columns();
277278

278-
let count: &Int64Array = as_primitive_array(&columns[0])?;
279+
let count: &Int64Array = as_primitive_array(&columns[n_schema_fields])?;
279280
assert_eq!(count.value(0), 100);
280281
assert_eq!(count.value(99), 100);
281282
Ok(())
@@ -326,19 +327,20 @@ mod tests {
326327
let result: Vec<RecordBatch> = collect(window_exec, task_ctx).await?;
327328
assert_eq!(result.len(), 1);
328329

330+
let n_schema_fields = schema.fields().len();
329331
let columns = result[0].columns();
330332

331333
// c3 is small int
332334

333-
let count: &Int64Array = as_primitive_array(&columns[0])?;
335+
let count: &Int64Array = as_primitive_array(&columns[n_schema_fields])?;
334336
assert_eq!(count.value(0), 100);
335337
assert_eq!(count.value(99), 100);
336338

337-
let max: &Int8Array = as_primitive_array(&columns[1])?;
339+
let max: &Int8Array = as_primitive_array(&columns[n_schema_fields + 1])?;
338340
assert_eq!(max.value(0), 125);
339341
assert_eq!(max.value(99), 125);
340342

341-
let min: &Int8Array = as_primitive_array(&columns[2])?;
343+
let min: &Int8Array = as_primitive_array(&columns[n_schema_fields + 2])?;
342344
assert_eq!(min.value(0), -117);
343345
assert_eq!(min.value(99), -117);
344346

datafusion/core/src/physical_plan/windows/window_agg_exec.rs

Lines changed: 17 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use crate::physical_plan::metrics::{
2525
BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
2626
};
2727
use crate::physical_plan::{
28-
Column, ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties,
28+
ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties,
2929
ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
3030
SendableRecordBatchStream, Statistics, WindowExpr,
3131
};
@@ -39,8 +39,6 @@ use arrow::{
3939
record_batch::RecordBatch,
4040
};
4141
use datafusion_common::DataFusionError;
42-
use datafusion_physical_expr::rewrite::TreeNodeRewritable;
43-
use datafusion_physical_expr::EquivalentClass;
4442
use futures::stream::Stream;
4543
use futures::{ready, StreamExt};
4644
use log::debug;
@@ -65,8 +63,6 @@ pub struct WindowAggExec {
6563
pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
6664
/// Sort Keys
6765
pub sort_keys: Option<Vec<PhysicalSortExpr>>,
68-
/// The output ordering
69-
output_ordering: Option<Vec<PhysicalSortExpr>>,
7066
/// Execution metrics
7167
metrics: ExecutionPlanMetricsSet,
7268
}
@@ -82,33 +78,6 @@ impl WindowAggExec {
8278
) -> Result<Self> {
8379
let schema = create_schema(&input_schema, &window_expr)?;
8480
let schema = Arc::new(schema);
85-
let window_expr_len = window_expr.len();
86-
// Although WindowAggExec does not change the output ordering from the input, but can not return the output ordering
87-
// from the input directly, need to adjust the column index to align with the new schema.
88-
let output_ordering = input
89-
.output_ordering()
90-
.map(|sort_exprs| {
91-
let new_sort_exprs: Result<Vec<PhysicalSortExpr>> = sort_exprs
92-
.iter()
93-
.map(|e| {
94-
let new_expr = e.expr.clone().transform_down(&|e| {
95-
Ok(e.as_any().downcast_ref::<Column>().map(|col| {
96-
Arc::new(Column::new(
97-
col.name(),
98-
window_expr_len + col.index(),
99-
))
100-
as Arc<dyn PhysicalExpr>
101-
}))
102-
})?;
103-
Ok(PhysicalSortExpr {
104-
expr: new_expr,
105-
options: e.options,
106-
})
107-
})
108-
.collect();
109-
new_sort_exprs
110-
})
111-
.map_or(Ok(None), |v| v.map(Some))?;
11281

11382
Ok(Self {
11483
input,
@@ -117,7 +86,6 @@ impl WindowAggExec {
11786
input_schema,
11887
partition_keys,
11988
sort_keys,
120-
output_ordering,
12189
metrics: ExecutionPlanMetricsSet::new(),
12290
})
12391
}
@@ -176,34 +144,10 @@ impl ExecutionPlan for WindowAggExec {
176144

177145
/// Get the output partitioning of this plan
178146
fn output_partitioning(&self) -> Partitioning {
179-
// Although WindowAggExec does not change the output partitioning from the input, but can not return the output partitioning
180-
// from the input directly, need to adjust the column index to align with the new schema.
181-
let window_expr_len = self.window_expr.len();
182-
let input_partitioning = self.input.output_partitioning();
183-
match input_partitioning {
184-
Partitioning::RoundRobinBatch(size) => Partitioning::RoundRobinBatch(size),
185-
Partitioning::UnknownPartitioning(size) => {
186-
Partitioning::UnknownPartitioning(size)
187-
}
188-
Partitioning::Hash(exprs, size) => {
189-
let new_exprs = exprs
190-
.into_iter()
191-
.map(|expr| {
192-
expr.transform_down(&|e| {
193-
Ok(e.as_any().downcast_ref::<Column>().map(|col| {
194-
Arc::new(Column::new(
195-
col.name(),
196-
window_expr_len + col.index(),
197-
))
198-
as Arc<dyn PhysicalExpr>
199-
}))
200-
})
201-
.unwrap()
202-
})
203-
.collect::<Vec<_>>();
204-
Partitioning::Hash(new_exprs, size)
205-
}
206-
}
147+
// because we can have repartitioning using the partition keys
148+
// this would be either 1 or more than 1 depending on the presense of
149+
// repartitioning
150+
self.input.output_partitioning()
207151
}
208152

209153
/// Specifies whether this plan generates an infinite stream of records.
@@ -221,7 +165,7 @@ impl ExecutionPlan for WindowAggExec {
221165
}
222166

223167
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
224-
self.output_ordering.as_deref()
168+
self.input().output_ordering()
225169
}
226170

227171
fn maintains_input_order(&self) -> bool {
@@ -244,30 +188,7 @@ impl ExecutionPlan for WindowAggExec {
244188
}
245189

246190
fn equivalence_properties(&self) -> EquivalenceProperties {
247-
// Although WindowAggExec does not change the equivalence properties from the input, but can not return the equivalence properties
248-
// from the input directly, need to adjust the column index to align with the new schema.
249-
let window_expr_len = self.window_expr.len();
250-
let mut new_properties = EquivalenceProperties::new(self.schema());
251-
let new_eq_classes = self
252-
.input
253-
.equivalence_properties()
254-
.classes()
255-
.iter()
256-
.map(|prop| {
257-
let new_head = Column::new(
258-
prop.head().name(),
259-
window_expr_len + prop.head().index(),
260-
);
261-
let new_others = prop
262-
.others()
263-
.iter()
264-
.map(|col| Column::new(col.name(), window_expr_len + col.index()))
265-
.collect::<Vec<_>>();
266-
EquivalentClass::new(new_head, new_others)
267-
})
268-
.collect::<Vec<_>>();
269-
new_properties.extend(new_eq_classes);
270-
new_properties
191+
self.input().equivalence_properties()
271192
}
272193

273194
fn with_new_children(
@@ -334,12 +255,13 @@ impl ExecutionPlan for WindowAggExec {
334255
let win_cols = self.window_expr.len();
335256
let input_cols = self.input_schema.fields().len();
336257
// TODO stats: some windowing function will maintain invariants such as min, max...
337-
let mut column_statistics = vec![ColumnStatistics::default(); win_cols];
258+
let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
338259
if let Some(input_col_stats) = input_stat.column_statistics {
339260
column_statistics.extend(input_col_stats);
340261
} else {
341262
column_statistics.extend(vec![ColumnStatistics::default(); input_cols]);
342263
}
264+
column_statistics.extend(vec![ColumnStatistics::default(); win_cols]);
343265
Statistics {
344266
is_exact: input_stat.is_exact,
345267
num_rows: input_stat.num_rows,
@@ -354,10 +276,11 @@ fn create_schema(
354276
window_expr: &[Arc<dyn WindowExpr>],
355277
) -> Result<Schema> {
356278
let mut fields = Vec::with_capacity(input_schema.fields().len() + window_expr.len());
279+
fields.extend_from_slice(input_schema.fields());
280+
// append results to the schema
357281
for expr in window_expr {
358282
fields.push(expr.field()?);
359283
}
360-
fields.extend_from_slice(input_schema.fields());
361284
Ok(Schema::new(fields))
362285
}
363286

@@ -433,7 +356,7 @@ impl WindowAggStream {
433356
.map_err(|e| ArrowError::ExternalError(Box::new(e)))?,
434357
)
435358
}
436-
let mut columns = transpose(partition_results)
359+
let columns = transpose(partition_results)
437360
.iter()
438361
.map(|elems| concat(&elems.iter().map(|x| x.as_ref()).collect::<Vec<_>>()))
439362
.collect::<Vec<_>>()
@@ -442,9 +365,11 @@ impl WindowAggStream {
442365

443366
// combine with the original cols
444367
// note the setup of window aggregates is that they newly calculated window
445-
// expressions are always prepended to the columns
446-
columns.extend_from_slice(batch.columns());
447-
RecordBatch::try_new(self.schema.clone(), columns)
368+
// expression results are always appended to the columns
369+
let mut batch_columns = batch.columns().to_vec();
370+
// calculate window cols
371+
batch_columns.extend_from_slice(&columns);
372+
RecordBatch::try_new(self.schema.clone(), batch_columns)
448373
}
449374

450375
/// Evaluates the partition points given the sort columns. If the sort columns are

0 commit comments

Comments
 (0)