Add ExecutionPlan::apply_expressions() #20337
Conversation
adriangb
left a comment
This makes sense to me. It mirrors the APIs for Logical expressions, is clean and a relatively small change.
But since this is an API change let's leave this open for a couple of days and get at least 1 more approval from a committer before moving forward with it.
// Check expressions from this node
let exprs = plan.expressions();
for expr in exprs.iter() {
    if let Some(_df) = expr.as_any().downcast_ref::<DynamicFilterPhysicalExpr>() {
Should this expr.apply() for nested expressions? Should it deduplicate Arc'ed copies?
Should this expr.apply() for nested expressions?
iiuc the LogicalPlan counterpart returns just the top level expressions.
Should it deduplicate Arc'ed copies?
yeah deduping is a good idea
I was referring to this helper function, not the general API. The general API should only expose top level expressions and do no deduplication.
About deduping: the objective of this test is to show how many times the Dynamic Filter appears in the plan and whether each node is able to count how many dynamic filters it contains. If we dedup, we would count it only once.
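To make the trade-off in this thread concrete, here is a minimal sketch of counting dynamic filters with and without deduplication. The `PhysicalExpr` trait, `DynamicFilter`, and `Column` below are simplified stand-ins invented for illustration, not DataFusion's real types, and the helper names are hypothetical.

```rust
use std::any::Any;
use std::sync::Arc;

// Simplified stand-in for DataFusion's `PhysicalExpr` (hypothetical, illustration only).
trait PhysicalExpr {
    fn as_any(&self) -> &dyn Any;
}

// Stand-ins for a dynamic filter expression and an ordinary column expression.
struct DynamicFilter;
struct Column;

impl PhysicalExpr for DynamicFilter {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl PhysicalExpr for Column {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Counts every top-level occurrence of a dynamic filter, without deduplication.
fn count_occurrences(exprs: &[Arc<dyn PhysicalExpr>]) -> usize {
    exprs
        .iter()
        .filter(|e| e.as_any().downcast_ref::<DynamicFilter>().is_some())
        .count()
}

/// Counts distinct dynamic filters, treating clones of the same `Arc` as one.
fn count_distinct(exprs: &[Arc<dyn PhysicalExpr>]) -> usize {
    let mut seen: Vec<&Arc<dyn PhysicalExpr>> = Vec::new();
    for e in exprs {
        let is_filter = e.as_any().downcast_ref::<DynamicFilter>().is_some();
        if is_filter && !seen.iter().any(|s| Arc::ptr_eq(*s, e)) {
            seen.push(e);
        }
    }
    seen.len()
}

/// Builds an expression list in which one dynamic filter is shared twice.
fn sample_exprs() -> Vec<Arc<dyn PhysicalExpr>> {
    let shared: Arc<dyn PhysicalExpr> = Arc::new(DynamicFilter);
    let column: Arc<dyn PhysicalExpr> = Arc::new(Column);
    vec![Arc::clone(&shared), column, shared]
}
```

With the sample list, the non-deduplicating count sees the filter twice while the pointer-based count sees it once, which is why the test in this PR avoids deduplication.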
/// joins).
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>;

/// Returns all expressions (non-recursively) evaluated by the current
This API forces an allocation and also cloning all the PhysicalExprs -- what would you think about adding apply_expressions and map_expressions methods to parallel the ones on LogicalPlan instead?
- https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.LogicalPlan.html#method.apply_expressions
- https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.LogicalPlan.html#method.map_expressions
Maybe you can start with just the apply_expressions one in this PR
I think we should probably also not provide a default implementation to force all implementations to properly visit the expressions
If we provide this default implementation, then downstream implementors will likely not implement the API and if something in the datafusion core depends on the API in the future it will be hard to debug what is going on
I think we should probably also not provide a default implementation to force all implementations to properly visit the expressions
If we provide this default implementation, then downstream implementors will likely not implement the API and if something in the datafusion core depends on the API in the future it will be hard to debug what is going on
Makes sense. I included a default implementation because I didn't want to introduce a breaking change, but it is better to be safe and force the implementation 👍
what would you think about adding apply_expressions and map_expressions methods to parallel the ones on LogicalPlan instead?
nice catch, I missed the allocation fact, I will give it a try
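The shape being proposed here can be sketched roughly as follows. This is a toy model under stated assumptions: the `PhysicalExpr` and `ExecutionPlan` traits, `Column`, and `Projection` are simplified stand-ins written for this example, and the callback omits the `Result<TreeNodeRecursion>` return type used in the real PR.

```rust
use std::sync::Arc;

// Toy stand-ins, not DataFusion's real traits (hypothetical, illustration only).
trait PhysicalExpr {
    fn name(&self) -> String;
}

struct Column(&'static str);

impl PhysicalExpr for Column {
    fn name(&self) -> String {
        self.0.to_string()
    }
}

trait ExecutionPlan {
    // Required method with no default body: every implementor has to decide
    // explicitly which expressions it exposes.
    fn apply_expressions(&self, f: &mut dyn FnMut(&dyn PhysicalExpr));
}

struct Projection {
    exprs: Vec<Arc<dyn PhysicalExpr>>,
}

impl ExecutionPlan for Projection {
    fn apply_expressions(&self, f: &mut dyn FnMut(&dyn PhysicalExpr)) {
        // Each expression is borrowed in place: no Vec is built and no Arc is cloned.
        for e in &self.exprs {
            f(e.as_ref());
        }
    }
}

/// Collects the names of a plan's top-level expressions through the callback API.
fn collect_names(plan: &dyn ExecutionPlan) -> Vec<String> {
    let mut names = Vec::new();
    plan.apply_expressions(&mut |e: &dyn PhysicalExpr| names.push(e.name()));
    names
}

/// Builds a two-column projection for the usage example.
fn sample_plan() -> Projection {
    let a: Arc<dyn PhysicalExpr> = Arc::new(Column("a"));
    let b: Arc<dyn PhysicalExpr> = Arc::new(Column("b"));
    Projection { exprs: vec![a, b] }
}
```

Because the trait method has no default, a plan node that forgets to implement it fails to compile instead of silently reporting no expressions.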
alamb
left a comment
Thanks @LiaCastaneda and @adriangb
I am a little worried about the default implementation here --
I also think a slightly different API might be worth considering
Thanks for reviewing Andrew - that's very good feedback that I missed in my review. I agree that

Thanks both for the reviews! I will work on your suggestion @alamb
let mut tnr = TreeNodeRecursion::Continue;
if let Some(ordering) = self.cache.output_ordering() {
    for sort_expr in ordering {
        tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?;
It took me a while to understand visit_sibling. IIUC it basically short-circuits the loop for us: once f returns Stop, every subsequent tnr.visit_sibling(...) call skips f and passes Stop through, so we don't need a manual match plus early return after each call. I added a small test in ExecutionPlan to check that it works when f returns Stop.
Worth putting in as a code comment? I always struggle to follow the logic of these tree traversals. It makes sense once you get it but yeah they're hard to grok - having a comment that tries to explain in words what is going on may be helpful.
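For readers who find the traversal hard to follow, the short-circuit behaviour described above can be sketched with a toy re-implementation. This is not DataFusion's actual `TreeNodeRecursion` (which also has a `Jump` variant and uses DataFusion's own `Result` type); the enum, the `String` error type, and the `visit_until` helper are assumptions made for this example.

```rust
// Toy re-implementation for illustration only; not DataFusion's actual type.
#[derive(Debug, Clone, Copy, PartialEq)]
enum TreeNodeRecursion {
    Continue,
    Stop,
}

impl TreeNodeRecursion {
    /// Runs `f` only while the traversal has not been stopped.
    /// Once `self` is `Stop`, `f` is skipped and `Stop` is passed through.
    fn visit_sibling<F>(self, f: F) -> Result<TreeNodeRecursion, String>
    where
        F: FnOnce() -> Result<TreeNodeRecursion, String>,
    {
        match self {
            TreeNodeRecursion::Continue => f(),
            TreeNodeRecursion::Stop => Ok(self),
        }
    }
}

/// Visits items in order and requests a stop when `stop_at` is seen.
/// Returns the final recursion state and the items that were actually visited.
fn visit_until(
    items: &[i32],
    stop_at: i32,
) -> Result<(TreeNodeRecursion, Vec<i32>), String> {
    let mut visited = Vec::new();
    let mut tnr = TreeNodeRecursion::Continue;
    for &item in items {
        // No manual `match` or early return: after a `Stop`, the remaining
        // iterations still run but the closure is never called again.
        tnr = tnr.visit_sibling(|| {
            visited.push(item);
            Ok(if item == stop_at {
                TreeNodeRecursion::Stop
            } else {
                TreeNodeRecursion::Continue
            })
        })?;
    }
    Ok((tnr, visited))
}
```

Calling `visit_until(&[1, 2, 3, 4], 2)` visits only `1` and `2`; the loop still iterates over `3` and `4`, but the closure is skipped for them.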
fn apply_expressions(
    &self,
    f: &mut dyn FnMut(
        &dyn datafusion::physical_plan::PhysicalExpr,
should this / could this be &Arc<dyn PhysicalExpr>?
I think it would bring similar issues to #19937. It would be hard to do operations on the expression, such as downcasting.
Those that fail to learn from history are doomed to repeat it... seems like I flunked the class 😆
tbf I had a rough time upgrading DF because of this, so that’s mainly why I remember it so well haha
[#19692]: https://github.com/apache/datafusion/issues/19692

### `ExecutionPlan::apply_expressions` is now a required method
@LiaCastaneda the 53.0.0 branch has been cut, so I don't think this will make it in since it's decidedly a new feature. Is that okay with you? If so let's move this section to 54.0.0. I think it's always good to merge a new API right after a release, it gives us time to make non breaking changes if we find issues 2 weeks in.
ohh, I was not aware of it, no problem from my side, I will add it to 54.0.0
adriangb
left a comment
Some minor comments but I think we can merge this whenever you think it's ready Lía
There are some conflicts again, will fix them...

Thank you and sorry for the delays causing conflicts and the bump to v54
no worries, they were not too complex to solve, I added if so, I think the PR is good to go
Hi! There is a patch #20009 that adds a more expressive API by splitting responsibilities into:
This approach not only helps to check for specific types of expressions in the plan but also enables replacing them, which extends the number of contexts where the API can be used. It looks a bit confusing to have all these methods together (
pub fn visit_expressions(
    plan: &dyn ExecutionPlan,
    f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
) -> Result<TreeNodeRecursion> {
    let mut tnr = TreeNodeRecursion::Continue;
    for expr in plan.physical_expressions() {
        tnr = tnr.visit_sibling(|| f(expr.as_ref()))?;
    }
    Ok(tnr)
}
👋 Hey, I was not aware there was already an initiative to build a similar API. This PR implements

Yes, it would be nice to have a writing API. The important property we need is that

I think we can reuse the properties of the rest of the plan (avoiding I created this issue #20899. I haven't started working on it yet and probably won't have much time this week, so I'll likely give it a try next week, but feel free to take it if you'd like

Actually, now that I think about it, there are some cases where we would need to recompute properties, right? For example, if a user changes an expression from a > something to a < something. How do we specify in this API whether we want to recompute properties or not? should
Yes, it may be useful to explicitly ask for properties re-computation. And it seems to me that by default the safest option is to force properties to be re-computed. Another way to satisfy it is to introduce an "args struct" like:
struct MapExpressionsArgs<'a> {
    f: &'a dyn FnMut(&Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>>,
    preserve_properties: bool,
}
Like is done here: datafusion/datafusion/catalog/src/table.rs Lines 366 to 372 in 8d9b080, to avoid adding a bool argument each time the method semantics are extended. But maybe this is overkill here and a bool parameter will be enough.
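As a rough illustration of the args-struct idea, the sketch below shows how a `preserve_properties` flag could travel with the mapping closure. Everything here is hypothetical: `Expr`, `Filter`, the `properties_version` counter standing in for cached plan properties, and the `map_expressions` method are toy constructs, not the API tracked in #20899. One assumption worth flagging: the closure is held as `&'a mut dyn FnMut`, because an `FnMut` cannot be called through a shared reference.

```rust
use std::sync::Arc;

// Toy expression type standing in for `dyn PhysicalExpr` (hypothetical).
#[derive(Debug, PartialEq)]
struct Expr(String);

// Args struct: new options can be added later without changing the method signature.
struct MapExpressionsArgs<'a> {
    // `&mut` is required to call an `FnMut` through a reference.
    f: &'a mut dyn FnMut(&Arc<Expr>) -> Result<Arc<Expr>, String>,
    preserve_properties: bool,
}

struct Filter {
    predicate: Arc<Expr>,
    // Stand-in for cached plan properties; bumped whenever they are recomputed.
    properties_version: u32,
}

impl Filter {
    /// Rewrites the predicate and either keeps or recomputes the cached properties.
    fn map_expressions(&self, args: MapExpressionsArgs<'_>) -> Result<Filter, String> {
        let MapExpressionsArgs { f, preserve_properties } = args;
        let predicate = f(&self.predicate)?;
        let properties_version = if preserve_properties {
            self.properties_version
        } else {
            self.properties_version + 1
        };
        Ok(Filter { predicate, properties_version })
    }
}

/// Flips `>` to `<` in the predicate, as in the example from the discussion.
fn flip_comparison(filter: &Filter, preserve_properties: bool) -> Result<Filter, String> {
    let mut f = |e: &Arc<Expr>| -> Result<Arc<Expr>, String> {
        Ok(Arc::new(Expr(e.0.replace('>', "<"))))
    };
    filter.map_expressions(MapExpressionsArgs { f: &mut f, preserve_properties })
}

/// Builds a filter with the predicate `a > 5` for the usage example.
fn sample_filter() -> Filter {
    Filter { predicate: Arc::new(Expr("a > 5".to_string())), properties_version: 1 }
}
```

Whether the default should preserve or recompute properties is exactly the open question in this thread; the sketch only shows how the choice could be made explicit at the call site.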
|
Let's continue this discussion in the issue.
Which issue does this PR close?
- DynamicFilterPhysicalExpr expressions from outside the plan #18296

Needed for datafusion-contrib/datafusion-distributed#180

Rationale for this change
Right now, there is no easy way to know if a given node in the plan holds Dynamic Filters or to traverse all physical expressions in an ExecutionPlan. This PR implements apply_expressions() that visits all PhysicalExprs inside an ExecutionPlan using a callback pattern, including DynamicFilterPhysicalExpr. This is similar to the existing apply_expressions() API for LogicalPlan.

What changes are included in this PR?
- apply_expressions() method to the ExecutionPlan trait with no default implementation, forcing all implementors to explicitly handle their expressions
- FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion> to avoid allocations
- apply_expressions() for all ExecutionPlan implementations
- apply_expressions() to FileSource and DataSource traits (required, no default)

Are these changes tested?
Yes, added a test that traverses the plan and discovers dynamic filters using apply_expressions().

Are there any user-facing changes?
Yes, the new API ExecutionPlan::apply_expressions(), FileSource::apply_expressions(), and DataSource::apply_expressions().