Skip to content

Commit 88730b0

Browse files
committed
Implement apply_expressions in missing plans
1 parent 51dd8d0 commit 88730b0

6 files changed

Lines changed: 114 additions & 1 deletion

File tree

datafusion-examples/examples/custom_data_source/custom_datasource.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use async_trait::async_trait;
2727
use datafusion::arrow::array::{UInt8Builder, UInt64Builder};
2828
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
2929
use datafusion::arrow::record_batch::RecordBatch;
30+
use datafusion::common::tree_node::TreeNodeRecursion;
3031
use datafusion::datasource::{TableProvider, TableType, provider_as_source};
3132
use datafusion::error::Result;
3233
use datafusion::execution::context::TaskContext;
@@ -283,4 +284,19 @@ impl ExecutionPlan for CustomExec {
283284
None,
284285
)?))
285286
}
287+
288+
fn apply_expressions(
289+
&self,
290+
f: &mut dyn FnMut(
291+
&dyn datafusion::physical_plan::PhysicalExpr,
292+
) -> Result<TreeNodeRecursion>,
293+
) -> Result<TreeNodeRecursion> {
294+
// Visit expressions in the output ordering from equivalence properties
295+
if let Some(ordering) = self.cache.output_ordering() {
296+
for sort_expr in ordering {
297+
f(sort_expr.expr.as_ref())?;
298+
}
299+
}
300+
Ok(TreeNodeRecursion::Continue)
301+
}
286302
}

datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
use arrow::record_batch::RecordBatch;
3030
use arrow_schema::SchemaRef;
3131
use datafusion::common::record_batch;
32+
use datafusion::common::tree_node::TreeNodeRecursion;
3233
use datafusion::common::{exec_datafusion_err, internal_err};
3334
use datafusion::datasource::{DefaultTableSource, memory::MemTable};
3435
use datafusion::error::Result;
@@ -296,4 +297,19 @@ impl ExecutionPlan for BufferingExecutionPlan {
296297
}),
297298
)))
298299
}
300+
301+
fn apply_expressions(
302+
&self,
303+
f: &mut dyn FnMut(
304+
&dyn datafusion::physical_plan::PhysicalExpr,
305+
) -> Result<TreeNodeRecursion>,
306+
) -> Result<TreeNodeRecursion> {
307+
// Visit expressions in the output ordering from equivalence properties
308+
if let Some(ordering) = self.properties.output_ordering() {
309+
for sort_expr in ordering {
310+
f(sort_expr.expr.as_ref())?;
311+
}
312+
}
313+
Ok(TreeNodeRecursion::Continue)
314+
}
299315
}

datafusion-examples/examples/proto/composed_extension_codec.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use std::sync::Arc;
3838

3939
use datafusion::common::Result;
4040
use datafusion::common::internal_err;
41+
use datafusion::common::tree_node::TreeNodeRecursion;
4142
use datafusion::execution::TaskContext;
4243
use datafusion::physical_plan::{DisplayAs, ExecutionPlan};
4344
use datafusion::prelude::SessionContext;
@@ -128,6 +129,15 @@ impl ExecutionPlan for ParentExec {
128129
) -> Result<datafusion::physical_plan::SendableRecordBatchStream> {
129130
unreachable!()
130131
}
132+
133+
fn apply_expressions(
134+
&self,
135+
_f: &mut dyn FnMut(
136+
&dyn datafusion::physical_plan::PhysicalExpr,
137+
) -> Result<TreeNodeRecursion>,
138+
) -> Result<TreeNodeRecursion> {
139+
Ok(TreeNodeRecursion::Continue)
140+
}
131141
}
132142

133143
/// A PhysicalExtensionCodec that can serialize and deserialize ParentExec
@@ -204,6 +214,15 @@ impl ExecutionPlan for ChildExec {
204214
) -> Result<datafusion::physical_plan::SendableRecordBatchStream> {
205215
unreachable!()
206216
}
217+
218+
fn apply_expressions(
219+
&self,
220+
_f: &mut dyn FnMut(
221+
&dyn datafusion::physical_plan::PhysicalExpr,
222+
) -> Result<TreeNodeRecursion>,
223+
) -> Result<TreeNodeRecursion> {
224+
Ok(TreeNodeRecursion::Continue)
225+
}
207226
}
208227

209228
/// A PhysicalExtensionCodec that can serialize and deserialize ChildExec

datafusion-examples/examples/relation_planner/table_sample.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ use datafusion::{
117117
};
118118
use datafusion_common::{
119119
DFSchemaRef, DataFusionError, Result, Statistics, internal_err, not_impl_err,
120-
plan_datafusion_err, plan_err,
120+
plan_datafusion_err, plan_err, tree_node::TreeNodeRecursion,
121121
};
122122
use datafusion_expr::{
123123
UserDefinedLogicalNode, UserDefinedLogicalNodeCore,
@@ -743,6 +743,21 @@ impl ExecutionPlan for SampleExec {
743743

744744
Ok(stats)
745745
}
746+
747+
fn apply_expressions(
748+
&self,
749+
f: &mut dyn FnMut(
750+
&dyn datafusion::physical_plan::PhysicalExpr,
751+
) -> Result<TreeNodeRecursion>,
752+
) -> Result<TreeNodeRecursion> {
753+
// Visit expressions in the output ordering from equivalence properties
754+
if let Some(ordering) = self.cache.output_ordering() {
755+
for sort_expr in ordering {
756+
f(sort_expr.expr.as_ref())?;
757+
}
758+
}
759+
Ok(TreeNodeRecursion::Continue)
760+
}
746761
}
747762

748763
/// Bernoulli sampler: includes each row with probability `(upper - lower)`.

datafusion/ffi/src/execution_plan.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::sync::Arc;
2121

2222
use abi_stable::StableAbi;
2323
use abi_stable::std_types::{RString, RVec};
24+
use datafusion_common::tree_node::TreeNodeRecursion;
2425
use datafusion_common::{DataFusionError, Result};
2526
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
2627
use datafusion_physical_plan::{
@@ -293,6 +294,21 @@ impl ExecutionPlan for ForeignExecutionPlan {
293294
.map(|stream| Pin::new(Box::new(stream)) as SendableRecordBatchStream)
294295
}
295296
}
297+
298+
fn apply_expressions(
299+
&self,
300+
f: &mut dyn FnMut(
301+
&dyn datafusion_physical_plan::PhysicalExpr,
302+
) -> Result<TreeNodeRecursion>,
303+
) -> Result<TreeNodeRecursion> {
304+
// Visit expressions in the output ordering from equivalence properties
305+
if let Some(ordering) = self.properties.output_ordering() {
306+
for sort_expr in ordering {
307+
f(sort_expr.expr.as_ref())?;
308+
}
309+
}
310+
Ok(TreeNodeRecursion::Continue)
311+
}
296312
}
297313

298314
#[cfg(test)]
@@ -367,6 +383,21 @@ pub(crate) mod tests {
367383
) -> Result<SendableRecordBatchStream> {
368384
unimplemented!()
369385
}
386+
387+
fn apply_expressions(
388+
&self,
389+
f: &mut dyn FnMut(
390+
&dyn datafusion_physical_plan::PhysicalExpr,
391+
) -> Result<TreeNodeRecursion>,
392+
) -> Result<TreeNodeRecursion> {
393+
// Visit expressions in the output ordering from equivalence properties
394+
if let Some(ordering) = self.props.output_ordering() {
395+
for sort_expr in ordering {
396+
f(sort_expr.expr.as_ref())?;
397+
}
398+
}
399+
Ok(TreeNodeRecursion::Continue)
400+
}
370401
}
371402

372403
#[test]

datafusion/ffi/src/tests/async_provider.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use arrow::array::RecordBatch;
3333
use arrow::datatypes::Schema;
3434
use async_trait::async_trait;
3535
use datafusion_catalog::TableProvider;
36+
use datafusion_common::tree_node::TreeNodeRecursion;
3637
use datafusion_common::{Result, exec_err};
3738
use datafusion_execution::RecordBatchStream;
3839
use datafusion_expr::Expr;
@@ -219,6 +220,21 @@ impl ExecutionPlan for AsyncTestExecutionPlan {
219220
batch_receiver: self.batch_receiver.resubscribe(),
220221
}))
221222
}
223+
224+
fn apply_expressions(
225+
&self,
226+
f: &mut dyn FnMut(
227+
&dyn datafusion_physical_plan::PhysicalExpr,
228+
) -> Result<TreeNodeRecursion>,
229+
) -> Result<TreeNodeRecursion> {
230+
// Visit expressions in the output ordering from equivalence properties
231+
for ordering in self.properties.output_ordering() {
232+
for sort_expr in ordering {
233+
f(sort_expr.expr.as_ref())?;
234+
}
235+
}
236+
Ok(TreeNodeRecursion::Continue)
237+
}
222238
}
223239

224240
impl datafusion_physical_plan::DisplayAs for AsyncTestExecutionPlan {

0 commit comments

Comments
 (0)