Skip to content

Commit 6334f47

Browse files
committed
bug: remove busy-wait while sort is ongoing (#16321)
1 parent 1daa5ed commit 6334f47

1 file changed

Lines changed: 34 additions & 23 deletions

File tree

  • datafusion/physical-plan/src/sorts

datafusion/physical-plan/src/sorts/merge.rs

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
//! Merge that deals with an arbitrary size of streaming inputs.
1919
//! This is an order-preserving merge.
2020
21-
use std::collections::VecDeque;
2221
use std::pin::Pin;
2322
use std::sync::Arc;
2423
use std::task::{ready, Context, Poll};
@@ -143,11 +142,8 @@ pub(crate) struct SortPreservingMergeStream<C: CursorValues> {
143142
/// number of rows produced
144143
produced: usize,
145144

146-
/// This queue contains partition indices in order. When a partition is polled and returns `Poll::Ready`,
147-
/// it is removed from the vector. If a partition returns `Poll::Pending`, it is moved to the end of the
148-
/// vector to ensure the next iteration starts with a different partition, preventing the same partition
149-
/// from being continuously polled.
150-
uninitiated_partitions: VecDeque<usize>,
145+
/// This vector contains the indices of the partitions that have not started emitting yet.
146+
uninitiated_partitions: Vec<usize>,
151147
}
152148

153149
impl<C: CursorValues> SortPreservingMergeStream<C> {
@@ -216,36 +212,51 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
216212
// Once all partitions have set their corresponding cursors for the loser tree,
217213
// we skip the following block. Until then, this function may be called multiple
218214
// times and can return Poll::Pending if any partition returns Poll::Pending.
215+
219216
if self.loser_tree.is_empty() {
220-
while let Some(&partition_idx) = self.uninitiated_partitions.front() {
217+
// Manual indexing since we're iterating over the vector and shrinking it in the loop
218+
let mut idx = 0;
219+
while idx < self.uninitiated_partitions.len() {
220+
// unwrap is safe since we just checked the index
221+
let &partition_idx = self.uninitiated_partitions.get(idx).unwrap();
221222
match self.maybe_poll_stream(cx, partition_idx) {
222223
Poll::Ready(Err(e)) => {
223224
self.aborted = true;
224225
return Poll::Ready(Some(Err(e)));
225226
}
226227
Poll::Pending => {
227-
// If a partition returns Poll::Pending, to avoid continuously polling it
228-
// and potentially increasing upstream buffer sizes, we move it to the
229-
// back of the polling queue.
230-
self.uninitiated_partitions.rotate_left(1);
231-
232-
// This function could remain in a pending state, so we manually wake it here.
233-
// However, this approach can be investigated further to find a more natural way
234-
// to avoid disrupting the runtime scheduler.
235-
cx.waker().wake_by_ref();
236-
return Poll::Pending;
228+
// The polled stream is pending which means we're already set up to
229+
// be woken when necessary
230+
// Try the next stream
231+
idx += 1;
237232
}
238233
_ => {
239-
// If the polling result is Poll::Ready(Some(batch)) or Poll::Ready(None),
240-
// we remove this partition from the queue so it is not polled again.
241-
self.uninitiated_partitions.pop_front();
234+
// The polled stream is ready
235+
// Remove it from uninitiated_partitions
236+
// Don't bump idx here, since a new element will have taken its
237+
// place which we'll try in the next loop iteration
238+
// `swap_remove` changes element order, but the order of the elements
239+
// doesn't really matter since we visit each one anyway.
240+
self.uninitiated_partitions.swap_remove(idx);
242241
}
243242
}
244243
}
245244

246-
// Claim the memory for the uninitiated partitions
247-
self.uninitiated_partitions.shrink_to_fit();
248-
self.init_loser_tree();
245+
if self.uninitiated_partitions.is_empty() {
246+
// If there are no more uninitiated partitions, set up the loser tree and continue
247+
// to the next phase.
248+
249+
// Claim the memory for the uninitiated partitions
250+
self.uninitiated_partitions.shrink_to_fit();
251+
self.init_loser_tree();
252+
} else {
253+
// There are still uninitiated partitions so return pending.
254+
// We only get here if we've polled all uninitiated streams and at least one of them
255+
// returned pending itself. That means we will be woken as soon as one of the
256+
// streams would like to be polled again.
257+
// There is no need to reschedule ourselves eagerly.
258+
return Poll::Pending;
259+
}
249260
}
250261

251262
// NB timer records time taken on drop, so there are no

0 commit comments

Comments
 (0)