Skip to content

Commit 8c58b48

Browse files
committed
Prevent repartitioning of certain operator's direct children (#1731)
1 parent e4a056f commit 8c58b48

5 files changed

Lines changed: 150 additions & 67 deletions

File tree

datafusion/src/physical_optimizer/repartition.rs

Lines changed: 120 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919
use std::sync::Arc;
2020

2121
use super::optimizer::PhysicalOptimizerRule;
22+
use crate::physical_plan::Partitioning::*;
2223
use crate::physical_plan::{
2324
empty::EmptyExec, repartition::RepartitionExec, ExecutionPlan,
2425
};
25-
use crate::physical_plan::{Distribution, Partitioning::*};
2626
use crate::{error::Result, execution::context::ExecutionConfig};
2727

2828
/// Optimizer that introduces repartitioning to increase parallelism in the plan
@@ -38,26 +38,24 @@ impl Repartition {
3838

3939
fn optimize_partitions(
4040
target_partitions: usize,
41-
requires_single_partition: bool,
4241
plan: Arc<dyn ExecutionPlan>,
42+
should_repartition: bool,
4343
) -> Result<Arc<dyn ExecutionPlan>> {
4444
// Recurse into children bottom-up (added nodes should be as deep as possible)
4545

4646
let new_plan = if plan.children().is_empty() {
4747
// leaf node - don't replace children
4848
plan.clone()
4949
} else {
50+
let should_repartition_children = plan.should_repartition_children();
5051
let children = plan
5152
.children()
5253
.iter()
5354
.map(|child| {
5455
optimize_partitions(
5556
target_partitions,
56-
matches!(
57-
plan.required_child_distribution(),
58-
Distribution::SinglePartition
59-
),
6057
child.clone(),
58+
should_repartition_children,
6159
)
6260
})
6361
.collect::<Result<_>>()?;
@@ -77,7 +75,7 @@ fn optimize_partitions(
7775
// But also not very useful to include
7876
let is_empty_exec = plan.as_any().downcast_ref::<EmptyExec>().is_some();
7977

80-
if perform_repartition && !requires_single_partition && !is_empty_exec {
78+
if perform_repartition && should_repartition && !is_empty_exec {
8179
Ok(Arc::new(RepartitionExec::try_new(
8280
new_plan,
8381
RoundRobinBatch(target_partitions),
@@ -97,7 +95,7 @@ impl PhysicalOptimizerRule for Repartition {
9795
if config.target_partitions == 1 {
9896
Ok(plan)
9997
} else {
100-
optimize_partitions(config.target_partitions, true, plan)
98+
optimize_partitions(config.target_partitions, plan, false)
10199
}
102100
}
103101

@@ -107,93 +105,148 @@ impl PhysicalOptimizerRule for Repartition {
107105
}
108106
#[cfg(test)]
109107
mod tests {
110-
use arrow::datatypes::Schema;
108+
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
111109

112110
use super::*;
113111
use crate::datasource::PartitionedFile;
112+
use crate::physical_plan::expressions::col;
114113
use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
115-
use crate::physical_plan::projection::ProjectionExec;
116-
use crate::physical_plan::Statistics;
114+
use crate::physical_plan::filter::FilterExec;
115+
use crate::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
116+
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
117+
use crate::physical_plan::{displayable, Statistics};
117118
use crate::test::object_store::TestObjectStore;
118119

120+
fn schema() -> SchemaRef {
121+
Arc::new(Schema::new(vec![Field::new("c1", DataType::Boolean, true)]))
122+
}
123+
124+
fn parquet_exec() -> Arc<ParquetExec> {
125+
Arc::new(ParquetExec::new(
126+
FileScanConfig {
127+
object_store: TestObjectStore::new_arc(&[("x", 100)]),
128+
file_schema: schema(),
129+
file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
130+
statistics: Statistics::default(),
131+
projection: None,
132+
limit: None,
133+
table_partition_cols: vec![],
134+
},
135+
None,
136+
))
137+
}
138+
139+
fn filter_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
140+
Arc::new(FilterExec::try_new(col("c1", &schema()).unwrap(), input).unwrap())
141+
}
142+
143+
fn hash_aggregate(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
144+
let schema = schema();
145+
Arc::new(
146+
HashAggregateExec::try_new(
147+
AggregateMode::Final,
148+
vec![],
149+
vec![],
150+
Arc::new(
151+
HashAggregateExec::try_new(
152+
AggregateMode::Partial,
153+
vec![],
154+
vec![],
155+
input,
156+
schema.clone(),
157+
)
158+
.unwrap(),
159+
),
160+
schema,
161+
)
162+
.unwrap(),
163+
)
164+
}
165+
166+
fn limit_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
167+
Arc::new(GlobalLimitExec::new(
168+
Arc::new(LocalLimitExec::new(input, 100)),
169+
100,
170+
))
171+
}
172+
173+
fn trim_plan_display(plan: &str) -> Vec<&str> {
174+
plan.split('\n')
175+
.map(|s| s.trim())
176+
.filter(|s| !s.is_empty())
177+
.collect()
178+
}
179+
119180
#[test]
120181
fn added_repartition_to_single_partition() -> Result<()> {
121-
let file_schema = Arc::new(Schema::empty());
122-
let parquet_project = ProjectionExec::try_new(
123-
vec![],
124-
Arc::new(ParquetExec::new(
125-
FileScanConfig {
126-
object_store: TestObjectStore::new_arc(&[("x", 100)]),
127-
file_schema,
128-
file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
129-
statistics: Statistics::default(),
130-
projection: None,
131-
limit: None,
132-
table_partition_cols: vec![],
133-
},
134-
None,
135-
)),
136-
)?;
137-
138182
let optimizer = Repartition {};
139183

140184
let optimized = optimizer.optimize(
141-
Arc::new(parquet_project),
185+
hash_aggregate(parquet_exec()),
142186
&ExecutionConfig::new().with_target_partitions(10),
143187
)?;
144188

145-
assert_eq!(
146-
optimized.children()[0]
147-
.output_partitioning()
148-
.partition_count(),
149-
10
150-
);
189+
let plan = displayable(optimized.as_ref()).indent().to_string();
190+
191+
let expected = &[
192+
"HashAggregateExec: mode=Final, gby=[], aggr=[]",
193+
"HashAggregateExec: mode=Partial, gby=[], aggr=[]",
194+
"RepartitionExec: partitioning=RoundRobinBatch(10)",
195+
"ParquetExec: limit=None, partitions=[x]",
196+
];
151197

198+
assert_eq!(&trim_plan_display(&plan), &expected);
152199
Ok(())
153200
}
154201

155202
#[test]
156203
fn repartition_deepest_node() -> Result<()> {
157-
let file_schema = Arc::new(Schema::empty());
158-
let parquet_project = ProjectionExec::try_new(
159-
vec![],
160-
Arc::new(ProjectionExec::try_new(
161-
vec![],
162-
Arc::new(ParquetExec::new(
163-
FileScanConfig {
164-
object_store: TestObjectStore::new_arc(&[("x", 100)]),
165-
file_schema,
166-
file_groups: vec![vec![PartitionedFile::new(
167-
"x".to_string(),
168-
100,
169-
)]],
170-
statistics: Statistics::default(),
171-
projection: None,
172-
limit: None,
173-
table_partition_cols: vec![],
174-
},
175-
None,
176-
)),
177-
)?),
204+
let optimizer = Repartition {};
205+
206+
let optimized = optimizer.optimize(
207+
hash_aggregate(filter_exec(parquet_exec())),
208+
&ExecutionConfig::new().with_target_partitions(10),
178209
)?;
179210

211+
let plan = displayable(optimized.as_ref()).indent().to_string();
212+
213+
let expected = &[
214+
"HashAggregateExec: mode=Final, gby=[], aggr=[]",
215+
"HashAggregateExec: mode=Partial, gby=[], aggr=[]",
216+
"FilterExec: c1@0",
217+
"RepartitionExec: partitioning=RoundRobinBatch(10)",
218+
"ParquetExec: limit=None, partitions=[x]",
219+
];
220+
221+
assert_eq!(&trim_plan_display(&plan), &expected);
222+
Ok(())
223+
}
224+
225+
#[test]
226+
fn repartition_ignores() -> Result<()> {
180227
let optimizer = Repartition {};
181228

182229
let optimized = optimizer.optimize(
183-
Arc::new(parquet_project),
230+
hash_aggregate(limit_exec(filter_exec(limit_exec(parquet_exec())))),
184231
&ExecutionConfig::new().with_target_partitions(10),
185232
)?;
186233

187-
// RepartitionExec is added to deepest node
188-
assert!(optimized.children()[0]
189-
.as_any()
190-
.downcast_ref::<RepartitionExec>()
191-
.is_none());
192-
assert!(optimized.children()[0].children()[0]
193-
.as_any()
194-
.downcast_ref::<RepartitionExec>()
195-
.is_some());
196-
234+
let plan = displayable(optimized.as_ref()).indent().to_string();
235+
236+
let expected = &[
237+
"HashAggregateExec: mode=Final, gby=[], aggr=[]",
238+
"HashAggregateExec: mode=Partial, gby=[], aggr=[]",
239+
"RepartitionExec: partitioning=RoundRobinBatch(10)",
240+
"GlobalLimitExec: limit=100",
241+
"LocalLimitExec: limit=100",
242+
"FilterExec: c1@0",
243+
"RepartitionExec: partitioning=RoundRobinBatch(10)",
244+
"GlobalLimitExec: limit=100",
245+
"LocalLimitExec: limit=100", // Should not repartition for LocalLimitExec
246+
"ParquetExec: limit=None, partitions=[x]",
247+
];
248+
249+
assert_eq!(&trim_plan_display(&plan), &expected);
197250
Ok(())
198251
}
199252
}

datafusion/src/physical_plan/limit.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,10 @@ impl ExecutionPlan for LocalLimitExec {
300300
_ => Statistics::default(),
301301
}
302302
}
303+
304+
fn should_repartition_children(&self) -> bool {
305+
false
306+
}
303307
}
304308

305309
/// Truncate a RecordBatch to maximum of n rows

datafusion/src/physical_plan/mod.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,14 +135,32 @@ pub trait ExecutionPlan: Debug + Send + Sync {
135135
/// Returns the execution plan as [`Any`](std::any::Any) so that it can be
136136
/// downcast to a specific implementation.
137137
fn as_any(&self) -> &dyn Any;
138+
138139
/// Get the schema for this execution plan
139140
fn schema(&self) -> SchemaRef;
141+
140142
/// Specifies the output partitioning scheme of this plan
141143
fn output_partitioning(&self) -> Partitioning;
144+
142145
/// Specifies the data distribution requirements of all the children for this operator
143146
fn required_child_distribution(&self) -> Distribution {
144147
Distribution::UnspecifiedDistribution
145148
}
149+
150+
/// Returns `true` if the direct children of this `ExecutionPlan` should be repartitioned
151+
/// to introduce greater concurrency to the plan
152+
///
153+
/// The default implementation returns `true` unless `Self::required_child_distribution`
154+
/// returns `Distribution::SinglePartition`
155+
///
156+
/// Operators that do not benefit from additional partitioning may want to return `false`
157+
fn should_repartition_children(&self) -> bool {
158+
!matches!(
159+
self.required_child_distribution(),
160+
Distribution::SinglePartition
161+
)
162+
}
163+
146164
/// Get a list of child execution plans that provide the input for this plan. The returned list
147165
/// will be empty for leaf nodes, will contain a single value for unary nodes, or two
148166
/// values for binary nodes (such as joins).

datafusion/src/physical_plan/projection.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,10 @@ impl ExecutionPlan for ProjectionExec {
185185
self.expr.iter().map(|(e, _)| Arc::clone(e)),
186186
)
187187
}
188+
189+
fn should_repartition_children(&self) -> bool {
190+
false
191+
}
188192
}
189193

190194
/// If e is a direct column reference, returns the field level

datafusion/src/physical_plan/union.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,10 @@ impl ExecutionPlan for UnionExec {
143143
.reduce(stats_union)
144144
.unwrap_or_default()
145145
}
146+
147+
fn should_repartition_children(&self) -> bool {
148+
false
149+
}
146150
}
147151

148152
/// Stream wrapper that records `BaselineMetrics` for a particular

0 commit comments

Comments
 (0)