From 7ed799240d8263a7766237ab286a9bc5e8174ce9 Mon Sep 17 00:00:00 2001 From: ackingliu Date: Thu, 1 May 2025 20:47:30 +0800 Subject: [PATCH 1/4] add benchmark for SortPreservingMergeExec --- datafusion/physical-plan/Cargo.toml | 6 + .../physical-plan/benches/sort_preserving.rs | 134 ++++++++++++++++++ datafusion/physical-plan/src/lib.rs | 2 +- 3 files changed, 141 insertions(+), 1 deletion(-) create mode 100644 datafusion/physical-plan/benches/sort_preserving.rs diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 5210ee26755c9..35c4afdfebd77 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -36,6 +36,7 @@ workspace = true [features] force_hash_collisions = [] +bench = [] [lib] name = "datafusion_physical_plan" @@ -86,3 +87,8 @@ name = "partial_ordering" [[bench]] harness = false name = "spill_io" + +[[bench]] +harness = false +name = "sort_preserving" +required-features = ["bench"] diff --git a/datafusion/physical-plan/benches/sort_preserving.rs b/datafusion/physical-plan/benches/sort_preserving.rs new file mode 100644 index 0000000000000..4f9cb421b0e73 --- /dev/null +++ b/datafusion/physical-plan/benches/sort_preserving.rs @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::{ + array::{ArrayRef, StringArray}, + record_batch::RecordBatch, +}; +use arrow_schema::SortOptions; +use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; +use datafusion_execution::TaskContext; +use datafusion_physical_expr::{expressions::col, LexOrdering, PhysicalSortExpr}; +use datafusion_physical_plan::test::TestMemoryExec; +use datafusion_physical_plan::{ + collect, sorts::sort_preserving_merge::SortPreservingMergeExec, +}; + +use std::sync::Arc; + +const BENCH_ROWS: usize = 1_000_000; // 1 million rows + +fn get_random_large_string(idx: usize) -> String { + let base_content = [ + concat!( + "# Advanced Topics in Computer Science\n\n", + "## Summary\nThis article explores complex system design patterns and...\n\n", + "```rust\nfn process_data(data: &mut [i32]) {\n // Parallel processing example\n data.par_iter_mut().for_each(|x| *x *= 2);\n}\n```\n\n", + "## Performance Considerations\nWhen implementing concurrent systems...\n" + ), + concat!( + "## API Documentation\n\n", + "```json\n{\n \"endpoint\": \"/api/v2/users\",\n \"methods\": [\"GET\", \"POST\"],\n \"parameters\": {\n \"page\": \"number\"\n }\n}\n```\n\n", + "# Authentication Guide\nSecure your API access using OAuth 2.0...\n" + ), + concat!( + "# Data Processing Pipeline\n\n", + "```python\nfrom multiprocessing import Pool\n\ndef main():\n with Pool(8) as p:\n results = p.map(process_item, data)\n```\n\n", + "## Summary of Optimizations\n1. Batch processing\n2. Memory pooling\n3. Concurrent I/O operations\n" + ), + concat!( + "# System Architecture Overview\n\n", + "## Components\n- Load Balancer\n- Database Cluster\n- Cache Service\n\n", + "```go\nfunc main() {\n router := gin.Default()\n router.GET(\"/api/health\", healthCheck)\n router.Run(\":8080\")\n}\n```\n" + ), + concat!( + "## Configuration Reference\n\n", + "```yaml\nserver:\n port: 8080\n max_threads: 32\n\ndatabase:\n url: postgres://user@prod-db:5432/main\n```\n\n", + "# Deployment Strategies\nBlue-green deployment patterns with...\n" + ), + ]; + base_content[idx % base_content.len()].to_string() +} + +fn generate_sorted_string_column(rows: usize) -> ArrayRef { + let mut values = Vec::with_capacity(rows); + for i in 0..rows { + values.push(get_random_large_string(i)); + } + values.sort(); + Arc::new(StringArray::from(values)) +} + +fn create_partitions( + num_partitions: usize, + column_names: &[&str], +) -> Vec> { + (0..num_partitions) + .map(|_| { + let rows = column_names + .iter() + .map(|&name| (name.to_owned(), generate_sorted_string_column(BENCH_ROWS))) + .collect::>(); + + let batch = RecordBatch::try_from_iter(rows).unwrap(); + vec![batch] + }) + .collect() +} + +fn bench_merge_sorted_preserving(c: &mut Criterion) { + let num_partitions = 3; + + let column_names = vec!["col1", "col2", "col3"]; + + // Create sorted partitions + let partitions = create_partitions(num_partitions, &column_names); + let schema = partitions[0][0].schema(); + + // Define sort order (col1 ASC, col2 ASC, col3 ASC) + let sort_order = LexOrdering::new( + column_names + .iter() + .map(|&name| { + PhysicalSortExpr::new(col(name, &schema).unwrap(), SortOptions::default()) + }) + .collect(), + ); + + let task_ctx = Arc::new(TaskContext::default()); + + c.bench_function("sort_preserving_merge_1m_rows_and_3_columns", |b| { + b.iter_batched( + || { + let exec = + TestMemoryExec::try_new_exec(&partitions, schema.clone(), None) + .unwrap(); + Arc::new(SortPreservingMergeExec::new(sort_order.clone(), exec)) + }, + |merge_exec| { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + collect(merge_exec, task_ctx.clone()).await.unwrap(); + }); + }, + BatchSize::LargeInput, + ) + }); +} + +criterion_group!(benches, bench_merge_sorted_preserving); +criterion_main!(benches); diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index a1862554b303e..ba423f958c78e 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -92,5 +92,5 @@ pub mod udaf { } pub mod coalesce; -#[cfg(test)] +#[cfg(any(test, feature = "bench"))] pub mod test; From 4555ea7ab5aa9d8c6fe1f7ec504c3984db01f004 Mon Sep 17 00:00:00 2001 From: ackingliu Date: Thu, 1 May 2025 20:48:37 +0800 Subject: [PATCH 2/4] add comments --- datafusion/physical-plan/benches/sort_preserving.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/datafusion/physical-plan/benches/sort_preserving.rs b/datafusion/physical-plan/benches/sort_preserving.rs index 4f9cb421b0e73..c79b56c2dcb29 100644 --- a/datafusion/physical-plan/benches/sort_preserving.rs +++ b/datafusion/physical-plan/benches/sort_preserving.rs @@ -90,6 +90,10 @@ fn create_partitions( .collect() } +/// Run this benchmark with: +/// ```sh +/// cargo bench --features="bench" --bench sort_preserving -- --sample-size=10 +/// ``` fn bench_merge_sorted_preserving(c: &mut Criterion) { let num_partitions = 3; From 07a752aabf0d1971598b909a6cb7079fa15c4497 Mon Sep 17 00:00:00 2001 From: ackingliu Date: Thu, 1 May 2025 22:30:16 +0800 Subject: [PATCH 3/4] add comments --- datafusion/physical-plan/benches/sort_preserving.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/benches/sort_preserving.rs b/datafusion/physical-plan/benches/sort_preserving.rs index c79b56c2dcb29..158eced8edb1b 100644 --- a/datafusion/physical-plan/benches/sort_preserving.rs +++ b/datafusion/physical-plan/benches/sort_preserving.rs @@ -89,7 +89,7 @@ fn create_partitions( }) .collect() } - +/// Add a benchmark to test the optimization effect of reusing Rows. /// Run this benchmark with: /// ```sh /// cargo bench --features="bench" --bench sort_preserving -- --sample-size=10 From 30761e48169fd47105c127530bf7cb5c671f37c2 Mon Sep 17 00:00:00 2001 From: ackingliu Date: Mon, 5 May 2025 00:42:16 +0800 Subject: [PATCH 4/4] Cover more test scenarios --- datafusion/physical-plan/Cargo.toml | 2 +- ...preserving.rs => sort_preserving_merge.rs} | 152 +++++++++++++----- 2 files changed, 109 insertions(+), 45 deletions(-) rename datafusion/physical-plan/benches/{sort_preserving.rs => sort_preserving_merge.rs} (51%) diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 35c4afdfebd77..4f58b575f3a0b 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -90,5 +90,5 @@ name = "spill_io" [[bench]] harness = false -name = "sort_preserving" +name = "sort_preserving_merge" required-features = ["bench"] diff --git a/datafusion/physical-plan/benches/sort_preserving.rs b/datafusion/physical-plan/benches/sort_preserving_merge.rs similarity index 51% rename from datafusion/physical-plan/benches/sort_preserving.rs rename to datafusion/physical-plan/benches/sort_preserving_merge.rs index 158eced8edb1b..9586dbf94727b 100644 --- a/datafusion/physical-plan/benches/sort_preserving.rs +++ b/datafusion/physical-plan/benches/sort_preserving_merge.rs @@ -16,10 +16,10 @@ // under the License. use arrow::{ - array::{ArrayRef, StringArray}, + array::{ArrayRef, StringArray, UInt64Array}, record_batch::RecordBatch, }; -use arrow_schema::SortOptions; +use arrow_schema::{SchemaRef, SortOptions}; use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; use datafusion_execution::TaskContext; use datafusion_physical_expr::{expressions::col, LexOrdering, PhysicalSortExpr}; @@ -32,7 +32,7 @@ use std::sync::Arc; const BENCH_ROWS: usize = 1_000_000; // 1 million rows -fn get_random_large_string(idx: usize) -> String { +fn get_large_string(idx: usize) -> String { let base_content = [ concat!( "# Advanced Topics in Computer Science\n\n", @@ -67,21 +67,34 @@ fn get_random_large_string(idx: usize) -> String { fn generate_sorted_string_column(rows: usize) -> ArrayRef { let mut values = Vec::with_capacity(rows); for i in 0..rows { - values.push(get_random_large_string(i)); + values.push(get_large_string(i)); } values.sort(); Arc::new(StringArray::from(values)) } -fn create_partitions( +fn generate_sorted_u64_column(rows: usize) -> ArrayRef { + Arc::new(UInt64Array::from((0_u64..rows as u64).collect::>())) +} + +fn create_partitions( num_partitions: usize, - column_names: &[&str], + num_columns: usize, + num_rows: usize, ) -> Vec> { (0..num_partitions) .map(|_| { - let rows = column_names - .iter() - .map(|&name| (name.to_owned(), generate_sorted_string_column(BENCH_ROWS))) + let rows = (0..num_columns) + .map(|i| { + ( + format!("col-{i}"), + if IS_LARGE_COLUMN_TYPE { + generate_sorted_string_column(num_rows) + } else { + generate_sorted_u64_column(num_rows) + }, + ) + }) .collect::>(); let batch = RecordBatch::try_from_iter(rows).unwrap(); @@ -89,49 +102,100 @@ fn create_partitions( }) .collect() } + +struct BenchData { + bench_name: String, + partitions: Vec>, + schema: SchemaRef, + sort_order: LexOrdering, +} + +fn get_bench_data() -> Vec { + let mut ret = Vec::new(); + let mut push_bench_data = |bench_name: &str, partitions: Vec>| { + let schema = partitions[0][0].schema(); + // Define sort order (col1 ASC, col2 ASC, col3 ASC) + let sort_order = LexOrdering::new( + schema + .fields() + .iter() + .map(|field| { + PhysicalSortExpr::new( + col(field.name(), &schema).unwrap(), + SortOptions::default(), + ) + }) + .collect(), + ); + ret.push(BenchData { + bench_name: bench_name.to_string(), + partitions, + schema, + sort_order, + }); + }; + // 1. single large string column + { + let partitions = create_partitions::(3, 1, BENCH_ROWS); + push_bench_data("single_large_string_column_with_1m_rows", partitions); + } + // 2. single u64 column + { + let partitions = create_partitions::(3, 1, BENCH_ROWS); + push_bench_data("single_u64_column_with_1m_rows", partitions); + } + // 3. multiple large string columns + { + let partitions = create_partitions::(3, 3, BENCH_ROWS); + push_bench_data("multiple_large_string_columns_with_1m_rows", partitions); + } + // 4. multiple u64 columns + { + let partitions = create_partitions::(3, 3, BENCH_ROWS); + push_bench_data("multiple_u64_columns_with_1m_rows", partitions); + } + ret +} + /// Add a benchmark to test the optimization effect of reusing Rows. /// Run this benchmark with: /// ```sh -/// cargo bench --features="bench" --bench sort_preserving -- --sample-size=10 +/// cargo bench --features="bench" --bench sort_preserving_merge -- --sample-size=10 /// ``` fn bench_merge_sorted_preserving(c: &mut Criterion) { - let num_partitions = 3; - - let column_names = vec!["col1", "col2", "col3"]; - - // Create sorted partitions - let partitions = create_partitions(num_partitions, &column_names); - let schema = partitions[0][0].schema(); - - // Define sort order (col1 ASC, col2 ASC, col3 ASC) - let sort_order = LexOrdering::new( - column_names - .iter() - .map(|&name| { - PhysicalSortExpr::new(col(name, &schema).unwrap(), SortOptions::default()) - }) - .collect(), - ); - let task_ctx = Arc::new(TaskContext::default()); - - c.bench_function("sort_preserving_merge_1m_rows_and_3_columns", |b| { - b.iter_batched( - || { - let exec = - TestMemoryExec::try_new_exec(&partitions, schema.clone(), None) + let bench_data = get_bench_data(); + for data in bench_data.into_iter() { + let BenchData { + bench_name, + partitions, + schema, + sort_order, + } = data; + c.bench_function( + &format!("bench_merge_sorted_preserving/{}", bench_name), + |b| { + b.iter_batched( + || { + let exec = TestMemoryExec::try_new_exec( + &partitions, + schema.clone(), + None, + ) .unwrap(); - Arc::new(SortPreservingMergeExec::new(sort_order.clone(), exec)) + Arc::new(SortPreservingMergeExec::new(sort_order.clone(), exec)) + }, + |merge_exec| { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + collect(merge_exec, task_ctx.clone()).await.unwrap(); + }); + }, + BatchSize::LargeInput, + ) }, - |merge_exec| { - let rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async { - collect(merge_exec, task_ctx.clone()).await.unwrap(); - }); - }, - BatchSize::LargeInput, - ) - }); + ); + } } criterion_group!(benches, bench_merge_sorted_preserving);