@@ -24,7 +24,6 @@ use std::fmt;
2424use std:: fmt:: { Debug , Formatter } ;
2525use std:: sync:: Arc ;
2626
27- use crate :: common:: spawn_buffered;
2827use crate :: execution_plan:: { Boundedness , CardinalityEffect , EmissionType } ;
2928use crate :: expressions:: PhysicalSortExpr ;
3029use crate :: limit:: LimitStream ;
@@ -687,23 +686,35 @@ impl ExternalSorter {
687686 let mut current_batches = Vec :: new ( ) ;
688687 let mut current_size = 0 ;
689688
690- for batch in std:: mem:: take ( & mut self . in_mem_batches ) {
689+ // Drain in_mem_batches using pop() to release memory earlier.
690+ // This avoids holding onto the entire vector during iteration.
691+ // Note:
692+ // Now we use `sort_in_place_threshold_bytes` to determine, in future we can make it more dynamic.
693+ while let Some ( batch) = self . in_mem_batches . pop ( ) {
691694 let batch_size = get_reserved_byte_for_record_batch ( & batch) ;
695+
696+ // If adding this batch would exceed the memory threshold, merge current_batches.
692697 if current_size + batch_size > self . sort_in_place_threshold_bytes
693698 && !current_batches. is_empty ( )
694699 {
700+ // Merge accumulated batches into one.
695701 let merged = concat_batches ( & self . schema , & current_batches) ?;
696702 current_batches. clear ( ) ;
703+
704+ // Update memory reservation.
697705 self . reservation . try_shrink ( current_size) ?;
698706 let merged_size = get_reserved_byte_for_record_batch ( & merged) ;
699707 self . reservation . try_grow ( merged_size) ?;
708+
700709 merged_batches. push ( merged) ;
701710 current_size = 0 ;
702711 }
712+
703713 current_batches. push ( batch) ;
704714 current_size += batch_size;
705715 }
706716
717+ // Merge any remaining batches after the loop.
707718 if !current_batches. is_empty ( ) {
708719 let merged = concat_batches ( & self . schema , & current_batches) ?;
709720 self . reservation . try_shrink ( current_size) ?;
@@ -712,15 +723,19 @@ impl ExternalSorter {
712723 merged_batches. push ( merged) ;
713724 }
714725
726+ // Create sorted streams directly without using spawn_buffered.
727+ // This allows for sorting to happen inline and enables earlier batch drop.
715728 let streams = merged_batches
716729 . into_iter ( )
717730 . map ( |batch| {
718731 let metrics = self . metrics . baseline . intermediate ( ) ;
719732 let reservation = self
720733 . reservation
721734 . split ( get_reserved_byte_for_record_batch ( & batch) ) ;
735+
736+ // Sort the batch inline.
722737 let input = self . sort_batch_stream ( batch, metrics, reservation) ?;
723- Ok ( spawn_buffered ( input, 1 ) )
738+ Ok ( input)
724739 } )
725740 . collect :: < Result < _ > > ( ) ?;
726741
0 commit comments