Skip to content

Commit b16ad9b

Browse files
authored
fix: SortMergeJoin don't wait for all input before emitting (#20482)
## Which issue does this PR close? N/A ## Rationale for this change I noticed while playing around with local tests and debugging memory issue, that `SortMergeJoinStream` wait for all input before start emitting, which shouldn't be the case as we can emit early when we have enough data. also, this cause huge memory pressure ## What changes are included in this PR? Trying to fix the issue, not sure yet ## Are these changes tested? Yes ## Are there any user-facing changes? ----- ## TODO: - [x] update docs - [x] finish fix
1 parent db5197b commit b16ad9b

3 files changed

Lines changed: 562 additions & 34 deletions

File tree

datafusion/physical-plan/src/joins/sort_merge_join/stream.rs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ pub(super) enum SortMergeJoinState {
7070
Polling,
7171
/// Joining polled data and making output
7272
JoinOutput,
73+
/// Emit ready data if have any and then go back to [`Self::Init`] state
74+
EmitReadyThenInit,
7375
/// No more output
7476
Exhausted,
7577
}
@@ -598,13 +600,45 @@ impl Stream for SortMergeJoinStream {
598600
self.current_ordering = self.compare_streamed_buffered()?;
599601
self.state = SortMergeJoinState::JoinOutput;
600602
}
603+
SortMergeJoinState::EmitReadyThenInit => {
604+
// If have data to emit, emit it and if no more, change to next
605+
606+
// Verify metadata alignment before checking if we have batches to output
607+
self.joined_record_batches
608+
.filter_metadata
609+
.debug_assert_metadata_aligned();
610+
611+
// For filtered joins, skip output and let Init state handle it
612+
if needs_deferred_filtering(&self.filter, self.join_type) {
613+
self.state = SortMergeJoinState::Init;
614+
continue;
615+
}
616+
617+
// For non-filtered joins, only output if we have a completed batch
618+
// (opportunistic output when target batch size is reached)
619+
if self
620+
.joined_record_batches
621+
.joined_batches
622+
.has_completed_batch()
623+
{
624+
let record_batch = self
625+
.joined_record_batches
626+
.joined_batches
627+
.next_completed_batch()
628+
.expect("has_completed_batch was true");
629+
(&record_batch)
630+
.record_output(&self.join_metrics.baseline_metrics());
631+
return Poll::Ready(Some(Ok(record_batch)));
632+
}
633+
self.state = SortMergeJoinState::Init;
634+
}
601635
SortMergeJoinState::JoinOutput => {
602636
self.join_partial()?;
603637

604638
if self.num_unfrozen_pairs() < self.batch_size {
605639
if self.buffered_data.scanning_finished() {
606640
self.buffered_data.scanning_reset();
607-
self.state = SortMergeJoinState::Init;
641+
self.state = SortMergeJoinState::EmitReadyThenInit;
608642
}
609643
} else {
610644
self.freeze_all()?;

0 commit comments

Comments
 (0)