
[branch-53] ensure dynamic filters are correctly pushed down through aggregations…#123

Closed
jayshrivastava wants to merge 1 commit into branch-53 from js/cherry-pick-dynamic-filter-pushdown-through-aggregations


jayshrivastava commented May 19, 2026

Cherry-pick of apache#21059

…apache#21059)

- Closes apache#21065.

In plans such as the following, dynamic filters are not pushed down
through the aggregation:
```
CREATE TABLE data (a VARCHAR, ts TIMESTAMP, value DOUBLE)
    AS VALUES
      ('h1', '2024-01-01T00:05:00', 1.0),
      ('h1', '2024-01-01T00:15:00', 2.0),
      ('h2', '2024-01-01T00:25:00', 3.0),
      ('h3', '2024-01-01T00:35:00', 4.0);

SELECT * FROM contexts c
  INNER JOIN (
    SELECT a, date_bin(interval '1 hour', ts) AS bucket, min(value) AS min_val
    FROM (SELECT value, a, ts FROM data)
    GROUP BY a, date_bin(interval '1 hour', ts)
  ) agg ON c.a = agg.a;
```

```
    HashJoinExec: mode=Auto, join_type=Inner, on=[(a@0, a@0)]
      DataSourceExec: partitions=1
      ProjectionExec: [a@0, date_bin(1h, ts)@1 as bucket, min(value)@2 as min_val]
        AggregateExec: mode=FinalPartitioned, gby=[a@0, date_bin(1h, ts)@1], aggr=[min(value)]
          AggregateExec: mode=Partial, gby=[a@1, date_bin(1h, ts@2)], aggr=[min(value)]
            ProjectionExec: [value@2, a@0, ts@1]        ← reorders columns
              DataSourceExec: partitions=1
```
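To show why pushing the join's filter down matters in this plan, here is a simplified model of a HashJoin dynamic filter. It assumes the filter is a `[min, max]` range over the build-side join keys; DataFusion's actual representation may differ, and `DynamicFilter`, `populate`, and `passes` are hypothetical names. Pushing such a filter on `a` below the aggregate is safe because `a` is a grouping column: any group whose `a` has no match on the build side would be discarded by the inner join anyway.

```rust
/// Hypothetical model of a HashJoin dynamic filter: it starts out permissive
/// and, once the build side has been read, is tightened to the [min, max]
/// range of the build-side join keys. (Assumption: bounds-based filter.)
#[derive(Debug)]
struct DynamicFilter {
    // None = not yet populated (or empty build side): every row passes.
    bounds: Option<(String, String)>,
}

impl DynamicFilter {
    /// Called after the build side finishes; records the key range.
    fn populate(&mut self, build_keys: &[&str]) {
        let min = build_keys.iter().min().map(|s| s.to_string());
        let max = build_keys.iter().max().map(|s| s.to_string());
        if let (Some(lo), Some(hi)) = (min, max) {
            self.bounds = Some((lo, hi));
        }
    }

    /// Evaluated on the probe side, e.g. inside the scan below the aggregate.
    fn passes(&self, key: &str) -> bool {
        match &self.bounds {
            None => true,
            Some((lo, hi)) => lo.as_str() <= key && key <= hi.as_str(),
        }
    }
}

fn main() {
    let mut dyn_filter = DynamicFilter { bounds: None };
    // Hypothetical build side (`contexts`) that only contains 'h1'.
    dyn_filter.populate(&["h1"]);
    // Probe-side keys from `data`, pruned before they reach the aggregation.
    let probe = ["h1", "h1", "h2", "h3"];
    let kept: Vec<&str> = probe
        .iter()
        .copied()
        .filter(|k| dyn_filter.passes(k))
        .collect();
    assert_eq!(kept, vec!["h1", "h1"]);
}
```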

`AggregateExec::gather_filters_for_pushdown` compared parent filter
columns (output schema indices) against grouping expression columns
(input schema indices). When a `ProjectionExec` below the aggregate
reorders columns, the index mismatch causes filters (such as HashJoin
dynamic filters) to be incorrectly blocked.

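This change fixes the column index mapping in
`AggregateExec::gather_filters_for_pushdown` so that parent filter
columns and grouping expression columns are compared in the same index
space.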
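A minimal sketch of the mismatch and the fix, assuming a simplified model in which the aggregate's output schema lists the grouping expressions first, in order, followed by the aggregate results (consistent with the plan above). `GroupExpr`, `can_push_buggy`, and `push_target` are hypothetical and do not mirror DataFusion's actual types; the sketch also conservatively blocks filters on computed grouping expressions such as `date_bin(...)`.

```rust
/// Hypothetical, simplified grouping expression.
#[derive(Debug, PartialEq)]
enum GroupExpr {
    Column(usize), // plain column reference, indexed into the *input* schema
    Other(String), // computed expression, e.g. date_bin(1h, ts@2)
}

/// Buggy check: treats the filter's output-schema index as if it were an
/// input-schema index and looks for a grouping column with that index.
fn can_push_buggy(filter_col: usize, group_by: &[GroupExpr]) -> bool {
    group_by.iter().any(|g| *g == GroupExpr::Column(filter_col))
}

/// Fixed check: output column i is produced by group_by[i], so a filter on
/// output column i is pushable only if group_by[i] is a plain column. Returns
/// the input-schema index the filter must be rewritten to reference.
fn push_target(filter_col: usize, group_by: &[GroupExpr]) -> Option<usize> {
    match group_by.get(filter_col) {
        Some(GroupExpr::Column(input_idx)) => Some(*input_idx),
        // Aggregate result or computed grouping expression: block.
        _ => None,
    }
}

fn main() {
    // Partial aggregate from the plan: the input schema is (value, a, ts)
    // because the ProjectionExec reordered the columns.
    let group_by = vec![
        GroupExpr::Column(1), // a@1
        GroupExpr::Other("date_bin(1h, ts@2)".to_string()),
    ];
    // The join's dynamic filter references `a`, which is output column 0.
    assert!(!can_push_buggy(0, &group_by)); // incorrectly blocked
    assert_eq!(push_target(0, &group_by), Some(1)); // pushed, remapped to a@1
    // A filter on min(value) (output column 2) is never pushed.
    assert_eq!(push_target(2, &group_by), None);
}
```

In this model, the buggy comparison can also fail in the other direction: with input `(value, a, ts)` grouped by `a@1, ts@2`, a filter on output column 2 (the aggregate result) would match `ts@2` and be pushed incorrectly.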

Tests added:

- `test_pushdown_through_aggregate_with_reordered_input_columns`: a
  filter on a grouping column with reordered input is pushed down.
- `test_pushdown_through_aggregate_with_reordered_input_no_pushdown_on_agg_result`:
  a filter on an aggregate result column is not pushed down.
- `test_pushdown_through_aggregate_grouping_sets_with_reordered_input`:
  with GROUPING SETS, a filter on a common column is pushed down and a
  filter on a column missing from a grouping set is blocked.
- `test_hashjoin_dynamic_filter_pushdown_through_aggregate_with_reordered_input`:
  the HashJoin dynamic filter is pushed through the aggregate with
  reordered input and is populated with values after execution.

All tests were verified to fail without the fix.
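The GROUPING SETS case follows from how grouping sets are evaluated: a grouping column absent from a set is emitted as NULL for that set's rows, so a filter evaluated below the aggregate would see the real value instead and change the result. A minimal sketch of that rule, assuming columns are identified by name; `can_push_through_grouping_sets` is a hypothetical helper, not DataFusion's implementation.

```rust
/// Hypothetical rule: a filter may be pushed below a GROUPING SETS aggregate
/// only if every column it references appears in every grouping set.
fn can_push_through_grouping_sets(filter_cols: &[&str], sets: &[Vec<&str>]) -> bool {
    filter_cols
        .iter()
        .all(|c| sets.iter().all(|set| set.contains(c)))
}

fn main() {
    // GROUPING SETS ((a, b), (a)): `a` is common, `b` is missing from (a).
    let sets = vec![vec!["a", "b"], vec!["a"]];
    assert!(can_push_through_grouping_sets(&["a"], &sets)); // pushed
    assert!(!can_push_through_grouping_sets(&["b"], &sets)); // blocked
}
```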

No user-facing changes.
@datadog-official

Pipelines


⚠️ Warnings

🚦 1 Pipeline job failed

Rust | build and run with wasm-pack

🛟 This job is unlikely to succeed on retry. Please review your pipeline configuration. Failed to download wasm-pack: 404 Not Found error from source URL.

Commit SHA: 114139c

jayshrivastava changed the title from "ensure dynamic filters are correctly pushed down through aggregations…" to "[branch-53] ensure dynamic filters are correctly pushed down through aggregations…" on May 19, 2026
LiaCastaneda added a commit that referenced this pull request May 22, 2026
apache#22453) (#126)

## Which issue does this PR close?


- Closes #.

## Rationale for this change

When the Substrait consumer encounters an `Aggregate` with two identical
measures (e.g. `sum(a)` present twice), planning fails with `Schema
contains duplicate unqualified field name`. Substrait carries column
names at the plan root rather than on the measures themselves, so the
measures reach `Aggregate` schema construction without aliases, and two
identical expressions produce two identical field names. PR apache#20539
fixed the `NameTracker` to dedupe duplicate names in the consumer, but
that fix was only applied to grouping expressions, not to measures.

The planner sees:

```
field 1: (qualifier: None, name: "sum(data.a)")
field 2: (qualifier: None, name: "sum(data.a)")
```

which is rejected when constructing the Aggregate's output schema.

## What changes are included in this PR?

Run aggregate measures through the same `NameTracker` as the grouping
expressions in `from_aggregate_rel`.
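A minimal sketch of the deduplication idea, assuming a tracker that hands out a unique name per expression and appends a numeric suffix to repeats. `UniqueNames` is a hypothetical stand-in for the consumer's `NameTracker`; the `__N` suffix format is an assumption, not DataFusion's actual naming scheme. The point is that grouping expressions and measures share one tracker, so two `sum(data.a)` measures no longer collide.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for `NameTracker`: returns a name not handed out
/// before, appending "__N" (assumed format) to repeated names.
struct UniqueNames {
    seen: HashSet<String>,
}

impl UniqueNames {
    fn new() -> Self {
        Self { seen: HashSet::new() }
    }

    fn unique(&mut self, name: &str) -> String {
        let mut candidate = name.to_string();
        let mut n = 1;
        // HashSet::insert returns false if the name was already taken.
        while !self.seen.insert(candidate.clone()) {
            candidate = format!("{name}__{n}");
            n += 1;
        }
        candidate
    }
}

fn main() {
    let mut names = UniqueNames::new();
    // Hypothetical aggregate: one grouping expression, two identical measures.
    let grouping = ["data.b"];
    let measures = ["sum(data.a)", "sum(data.a)"];
    // Both lists go through the same tracker, as in the fix.
    let fields: Vec<String> = grouping
        .iter()
        .chain(measures.iter())
        .map(|n| names.unique(n))
        .collect();
    assert_eq!(fields, vec!["data.b", "sum(data.a)", "sum(data.a)__1"]);
}
```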

## Are these changes tested?

Yes: added a roundtrip test, `aggregate_identical_measures`. Without
the fix, it fails with `Error: SchemaError(DuplicateUnqualifiedField {
name: "sum(data.a)" }, Some(""))`.

## Are there any user-facing changes?

No.

(cherry picked from commit 097efae)