@@ -4494,4 +4494,103 @@ mod tests {
44944494 fn columns ( schema : & Schema ) -> Vec < String > {
44954495 schema. fields ( ) . iter ( ) . map ( |f| f. name ( ) . clone ( ) ) . collect ( )
44964496 }
4497+
4498+ /// This test verifies that the dynamic filter is marked as complete after HashJoinExec finishes building the hash table.
4499+ #[ tokio:: test]
4500+ async fn test_hash_join_marks_filter_complete ( ) -> Result < ( ) > {
4501+ let task_ctx = Arc :: new ( TaskContext :: default ( ) ) ;
4502+ let left = build_table (
4503+ ( "a1" , & vec ! [ 1 , 2 , 3 ] ) ,
4504+ ( "b1" , & vec ! [ 4 , 5 , 6 ] ) ,
4505+ ( "c1" , & vec ! [ 7 , 8 , 9 ] ) ,
4506+ ) ;
4507+ let right = build_table (
4508+ ( "a2" , & vec ! [ 10 , 20 , 30 ] ) ,
4509+ ( "b1" , & vec ! [ 4 , 5 , 6 ] ) ,
4510+ ( "c2" , & vec ! [ 70 , 80 , 90 ] ) ,
4511+ ) ;
4512+
4513+ let on = vec ! [ (
4514+ Arc :: new( Column :: new_with_schema( "b1" , & left. schema( ) ) ?) as _,
4515+ Arc :: new( Column :: new_with_schema( "b1" , & right. schema( ) ) ?) as _,
4516+ ) ] ;
4517+
4518+ // Create a dynamic filter manually
4519+ let dynamic_filter = HashJoinExec :: create_dynamic_filter ( & on) ;
4520+ let dynamic_filter_clone = Arc :: clone ( & dynamic_filter) ;
4521+
4522+ // Create HashJoinExec with the dynamic filter
4523+ let mut join = HashJoinExec :: try_new (
4524+ left,
4525+ right,
4526+ on,
4527+ None ,
4528+ & JoinType :: Inner ,
4529+ None ,
4530+ PartitionMode :: CollectLeft ,
4531+ NullEquality :: NullEqualsNothing ,
4532+ ) ?;
4533+ join. dynamic_filter = Some ( HashJoinExecDynamicFilter {
4534+ filter : dynamic_filter,
4535+ bounds_accumulator : OnceLock :: new ( ) ,
4536+ } ) ;
4537+
4538+ // Execute the join
4539+ let stream = join. execute ( 0 , task_ctx) ?;
4540+ let _batches = common:: collect ( stream) . await ?;
4541+
4542+ // After the join completes, the dynamic filter should be marked as complete
4543+ // wait_complete() should return immediately
4544+ dynamic_filter_clone. wait_complete ( ) . await ;
4545+
4546+ Ok ( ( ) )
4547+ }
4548+
4549+ /// This test verifies that the dynamic filter is marked as complete even when the build side is empty.
4550+ #[ tokio:: test]
4551+ async fn test_hash_join_marks_filter_complete_empty_build_side ( ) -> Result < ( ) > {
4552+ let task_ctx = Arc :: new ( TaskContext :: default ( ) ) ;
4553+ // Empty left side (build side)
4554+ let left = build_table ( ( "a1" , & vec ! [ ] ) , ( "b1" , & vec ! [ ] ) , ( "c1" , & vec ! [ ] ) ) ;
4555+ let right = build_table (
4556+ ( "a2" , & vec ! [ 10 , 20 , 30 ] ) ,
4557+ ( "b1" , & vec ! [ 4 , 5 , 6 ] ) ,
4558+ ( "c2" , & vec ! [ 70 , 80 , 90 ] ) ,
4559+ ) ;
4560+
4561+ let on = vec ! [ (
4562+ Arc :: new( Column :: new_with_schema( "b1" , & left. schema( ) ) ?) as _,
4563+ Arc :: new( Column :: new_with_schema( "b1" , & right. schema( ) ) ?) as _,
4564+ ) ] ;
4565+
4566+ // Create a dynamic filter manually
4567+ let dynamic_filter = HashJoinExec :: create_dynamic_filter ( & on) ;
4568+ let dynamic_filter_clone = Arc :: clone ( & dynamic_filter) ;
4569+
4570+ // Create HashJoinExec with the dynamic filter
4571+ let mut join = HashJoinExec :: try_new (
4572+ left,
4573+ right,
4574+ on,
4575+ None ,
4576+ & JoinType :: Inner ,
4577+ None ,
4578+ PartitionMode :: CollectLeft ,
4579+ NullEquality :: NullEqualsNothing ,
4580+ ) ?;
4581+ join. dynamic_filter = Some ( HashJoinExecDynamicFilter {
4582+ filter : dynamic_filter,
4583+ bounds_accumulator : OnceLock :: new ( ) ,
4584+ } ) ;
4585+
4586+ // Execute the join
4587+ let stream = join. execute ( 0 , task_ctx) ?;
4588+ let _batches = common:: collect ( stream) . await ?;
4589+
4590+ // Even with empty build side, the dynamic filter should be marked as complete
4591+ // wait_complete() should return immediately
4592+ dynamic_filter_clone. wait_complete ( ) . await ;
4593+
4594+ Ok ( ( ) )
4595+ }
44974596}
0 commit comments