Skip to content

Commit d193508

Browse files
Make the sink input aware of its plan (#7610)
Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
1 parent 2c83b02 commit d193508

2 files changed

Lines changed: 26 additions & 2 deletions

File tree

datafusion/core/src/datasource/listing/table.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ use futures::{future, stream, StreamExt, TryStreamExt};
3434
use crate::datasource::file_format::file_compression_type::{
3535
FileCompressionType, FileTypeExt,
3636
};
37-
use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
37+
use crate::datasource::physical_plan::{
38+
is_plan_streaming, FileScanConfig, FileSinkConfig,
39+
};
3840
use crate::datasource::{
3941
file_format::{
4042
arrow::ArrowFormat, avro::AvroFormat, csv::CsvFormat, json::JsonFormat,
@@ -894,7 +896,13 @@ impl TableProvider for ListingTable {
894896
output_schema: self.schema(),
895897
table_partition_cols: self.options.table_partition_cols.clone(),
896898
writer_mode,
897-
unbounded_input: self.options().infinite_source,
899+
// A plan can produce a finite number of rows even if it has unbounded sources, like LIMIT
900+
// queries. Thus, we can check if the plan is streaming to ensure file sink input is
901+
// unbounded. When `unbounded_input` flag is `true` for sink, we occasionally call `yield_now`
902+
// to consume data at the input. When `unbounded_input` flag is `false` (e.g. non-streaming data),
903+
// all of the data at the input is written to the sink after execution finishes. See discussion for rationale:
904+
// https://github.com/apache/arrow-datafusion/pull/7610#issuecomment-1728979918
905+
unbounded_input: is_plan_streaming(&input)?,
898906
single_file_output: self.options.single_file,
899907
overwrite,
900908
file_type_writer_options,

datafusion/core/src/datasource/physical_plan/mod.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ use datafusion_common::{file_options::FileTypeWriterOptions, plan_err};
6363
use datafusion_physical_expr::expressions::Column;
6464

6565
use arrow::compute::cast;
66+
use datafusion_physical_plan::ExecutionPlan;
6667
use log::debug;
6768
use object_store::path::Path;
6869
use object_store::ObjectMeta;
@@ -500,6 +501,21 @@ fn get_projected_output_ordering(
500501
all_orderings
501502
}
502503

504+
// Get output (un)boundedness information for the given `plan`.
505+
pub(crate) fn is_plan_streaming(plan: &Arc<dyn ExecutionPlan>) -> Result<bool> {
506+
let result = if plan.children().is_empty() {
507+
plan.unbounded_output(&[])
508+
} else {
509+
let children_unbounded_output = plan
510+
.children()
511+
.iter()
512+
.map(is_plan_streaming)
513+
.collect::<Result<Vec<_>>>();
514+
plan.unbounded_output(&children_unbounded_output?)
515+
};
516+
result
517+
}
518+
503519
#[cfg(test)]
504520
mod tests {
505521
use arrow_array::cast::AsArray;

0 commit comments

Comments
 (0)