Skip to content

feat: support repartitioning of FFI execution plans#20449

Merged
timsaucer merged 11 commits intoapache:mainfrom
timsaucer:feat/ffi-support-repartitioned-plans
Mar 19, 2026
Merged

feat: support repartitioning of FFI execution plans#20449
timsaucer merged 11 commits intoapache:mainfrom
timsaucer:feat/ffi-support-repartitioned-plans

Conversation

@timsaucer
Copy link
Member

@timsaucer timsaucer commented Feb 20, 2026

Which issue does this PR close?

This is a blocker for #20450

Rationale for this change

This PR introduces an important concept in the FFI work to avoids creating wrappers upon wrappers of plans. It was discovered as part of the work to create FFI physical optimizer rules. Suppose we have a foreign plan. Then we attempt to turn this into an FFI plan. What we will end up with currently is a FFI plan where the underlying private data is a foreign plan that additionally contains a FFI plan. Instead any time we are creating an FFI object we should check to see if it is locally downcastable to a Foreign plan and if so to just access the already existing FFI object.

This pattern is adapted across all FFI objects in this PR.

With this work in place we can also properly support repartioning via FFI as well as new_with_children via FFI.

What changes are included in this PR?

  • Adds access pattern for creating new FFI objects. When they are already a locally downcastable to a Foreign wrapper then we simply get the underlying existing FFI object instead of creating a wrapper around a wrapper.
  • Implement repartitioning and new_with_children via FFI on execution plans.

Are these changes tested?

Integration tests are added.

Are there any user-facing changes?

The one use facing change is that for some of the aggregates and accumulators that take in closures we require these closures to be static so that we can downcast the boxed traits.

@github-actions github-actions bot added documentation Improvements or additions to documentation development-process Related to development process of DataFusion logical-expr Logical plan and expressions core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) catalog Related to the catalog crate common Related to common crate execution Related to the execution crate proto Related to proto crate functions Changes to functions implementation ffi Changes to the ffi crate spark labels Feb 20, 2026
@timsaucer timsaucer force-pushed the feat/ffi-support-repartitioned-plans branch from 4982027 to ca8c5de Compare February 24, 2026 15:16
@timsaucer timsaucer removed documentation Improvements or additions to documentation development-process Related to development process of DataFusion core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) common Related to common crate execution Related to the execution crate labels Feb 24, 2026
Copy link
Member Author

@timsaucer timsaucer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to mark the places where I made a change other than making the traits require Any.

pub struct BooleanGroupsAccumulator<F>
where
F: Fn(bool, bool) -> bool + Send + Sync,
F: Fn(bool, bool) -> bool + Send + Sync + 'static,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change, along with the similar updates in this and other files, is the only thing that I think will be controversial in this PR.

where
T: ArrowPrimitiveType + Send,
F: Fn(&mut T::Native, T::Native) + Send + Sync,
F: Fn(&mut T::Native, T::Native) + Send + Sync + 'static,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change, along with the similar updates in this and other files, is the only thing that I think will be controversial in this PR.

where
T: ArrowNumericType + Send,
F: Fn(T::Native, u64) -> Result<T::Native> + Send,
F: Fn(T::Native, u64) -> Result<T::Native> + Send + 'static,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change, along with the similar updates in this and other files, is the only thing that I think will be controversial in this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 requiring 'static seems like a fair ask. If this anyways required Send it's likely that pretty much everything was 'static already.

I imagine this requirement comes from the fact that now the GroupsAccumulator implementation requires Any, and Any is not automatically satisfied it the type has non 'static references. Is that the case?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep! I also have this plan to move all of the aggregates and windows to use trait upcasting like in this PR: #20812 It reduces a lot of boilerplate code

where
T: ArrowNumericType + Send,
F: Fn(T::Native, i64) -> Result<T::Native> + Send,
F: Fn(T::Native, i64) -> Result<T::Native> + Send + 'static,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change, along with the similar updates in this and other files, is the only thing that I think will be controversial in this PR.

@timsaucer timsaucer marked this pull request as ready for review February 24, 2026 15:25
@timsaucer timsaucer requested a review from Copilot February 24, 2026 15:25
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR enables repartitioning and with_new_children operations for FFI execution plans by introducing a pattern that prevents creating wrappers around wrappers. When converting a plan to FFI format, the code now checks if it's already a Foreign plan and extracts the underlying FFI object instead of wrapping it again.

Changes:

  • Added 'static lifetime bounds to closure types in aggregate accumulators and groups accumulators to enable downcasting
  • Implemented FFI support for with_new_children and repartitioned operations on execution plans
  • Added runtime propagation logic to ensure all local execution plans in a chain share the same runtime

Reviewed changes

Copilot reviewed 28 out of 28 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
datafusion/spark/src/function/aggregate/avg.rs Added 'static bound to closure type parameter F in AvgGroupsAccumulator
datafusion/proto/src/physical_plan/mod.rs Added Any trait bound to PhysicalExtensionCodec
datafusion/proto/src/logical_plan/mod.rs Added std::any::Any trait bound to LogicalExtensionCodec
datafusion/functions-aggregate/src/average.rs Added 'static bound to closure type parameter F in AvgGroupsAccumulator
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs Added 'static bound to closure type parameter F in PrimitiveGroupsAccumulator
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs Added 'static bound to closure type parameter F in BooleanGroupsAccumulator
datafusion/ffi/tests/ffi_execution_plan.rs Added integration test for runtime propagation through execution plan hierarchy
datafusion/ffi/src/udwf/partition_evaluator.rs Added check to extract underlying FFI object when converting ForeignPartitionEvaluator
datafusion/ffi/src/udwf/mod.rs Added check to extract underlying FFI object when converting ForeignWindowUDF
datafusion/ffi/src/udtf.rs Added check to extract underlying FFI object when converting ForeignTableFunction
datafusion/ffi/src/udf/mod.rs Added check to extract underlying FFI object when converting ForeignScalarUDF
datafusion/ffi/src/udaf/mod.rs Added check to extract underlying FFI object when converting ForeignAggregateUDF
datafusion/ffi/src/udaf/groups_accumulator.rs Added check to extract underlying FFI object when converting ForeignGroupsAccumulator
datafusion/ffi/src/udaf/accumulator.rs Added check to extract underlying FFI object when converting ForeignAccumulator
datafusion/ffi/src/tests/mod.rs Added create_empty_exec function and export to ForeignLibraryModule
datafusion/ffi/src/table_provider.rs Added check to extract underlying FFI object when converting ForeignTableProvider
datafusion/ffi/src/session/mod.rs Added check to extract underlying FFI object when converting ForeignSession
datafusion/ffi/src/schema_provider.rs Added check to extract underlying FFI object when converting ForeignSchemaProvider
datafusion/ffi/src/proto/physical_extension_codec.rs Renamed variables from provider to codec, added runtime propagation, and unwrapping check
datafusion/ffi/src/proto/logical_extension_codec.rs Added check to extract underlying FFI object when converting ForeignLogicalExtensionCodec
datafusion/ffi/src/physical_expr/mod.rs Added check to extract underlying FFI object when converting ForeignPhysicalExpr
datafusion/ffi/src/execution_plan.rs Implemented with_new_children and repartitioned FFI operations, added runtime propagation logic
datafusion/ffi/src/catalog_provider_list.rs Added check to extract underlying FFI object when converting ForeignCatalogProviderList
datafusion/ffi/src/catalog_provider.rs Added check to extract underlying FFI object when converting ForeignCatalogProvider
datafusion/expr/src/partition_evaluator.rs Added std::any::Any trait bound to PartitionEvaluator
datafusion/expr-common/src/groups_accumulator.rs Added std::any::Any trait bound to GroupsAccumulator
datafusion/expr-common/src/accumulator.rs Added std::any::Any trait bound to Accumulator
datafusion/catalog/src/table.rs Added Any trait bound to TableFunctionImpl

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

timsaucer and others added 3 commits March 8, 2026 17:34
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@timsaucer timsaucer force-pushed the feat/ffi-support-repartitioned-plans branch from 89a9215 to 390e97e Compare March 8, 2026 16:37
Copy link
Contributor

@gabotechs gabotechs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good! from what I can see mostly mechanical 👍

Comment on lines +393 to +395
let new_plan =
df_result!((self.plan.with_new_children)(&self.plan, children))?;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably the unsafe block can be scoped to this:

        let new_plan =
            unsafe { df_result!((self.plan.with_new_children)(&self.plan, children))? };

where
T: ArrowNumericType + Send,
F: Fn(T::Native, u64) -> Result<T::Native> + Send,
F: Fn(T::Native, u64) -> Result<T::Native> + Send + 'static,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 requiring 'static seems like a fair ask. If this anyways required Send it's likely that pretty much everything was 'static already.

I imagine this requirement comes from the fact that now the GroupsAccumulator implementation requires Any, and Any is not automatically satisfied it the type has non 'static references. Is that the case?

/// [`merge_batch`]: Self::merge_batch
/// [window function]: https://en.wikipedia.org/wiki/Window_function_(SQL)
pub trait Accumulator: Send + Sync + Debug {
pub trait Accumulator: Send + Sync + Debug + std::any::Any {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other traits of the project, rather than requiring Any in the trait, they have instead an additional fn as_any(&self) -> &dyn Any method for upcasting concrete implementations to Any.

Do you think as_any() could have been used here instead? I don't see a strong reason in favor of it, but maybe you actually have a strong reason against it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left a comment on the other thread but basically I want to remove these as we go: #20812

@timsaucer timsaucer added this pull request to the merge queue Mar 19, 2026
Merged via the queue into apache:main with commit 897b5c1 Mar 19, 2026
34 checks passed
@timsaucer timsaucer deleted the feat/ffi-support-repartitioned-plans branch March 19, 2026 15:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

catalog Related to the catalog crate ffi Changes to the ffi crate functions Changes to functions implementation logical-expr Logical plan and expressions proto Related to proto crate spark

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants