fix: Flush pending inserts in streamingbatchwriter on delete records #2408
adamcrowufl wants to merge 1 commit into cloudquery:main
Conversation
Pull request overview
Fixes an ordering issue in StreamingBatchWriter where DeleteRecords could begin while insert batches (including nested table inserts) were still buffered, leading to inconsistent results.
Changes:
- Added `flushInsertWorkers(ctx)` and invoked it before starting the delete-record worker.
- Added a unit test to verify that sending a delete-record message triggers flushing of pending inserts.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| writers/streamingbatchwriter/streamingbatchwriter.go | Flushes insert workers before initializing/using the delete-record worker. |
| writers/streamingbatchwriter/streamingbatchwriter_test.go | Adds a test asserting delete-record processing flushes pending insert batches. |
```go
	t.Fatal(err)
}
waitForLength(t, testClient.MessageLen, messageTypeInsert, 1)
_ = wr.Close(ctx)
```
The test ignores the error from wr.Close(ctx). If Close fails (or hangs) this can mask real issues and leak goroutines into subsequent tests. Consider checking the returned error (or using t.Cleanup to close and assert).
Suggested change:

```diff
-_ = wr.Close(ctx)
+if err := wr.Close(ctx); err != nil {
+	t.Fatal(err)
+}
```
```go
done := make(chan bool)
select {
case <-ctx.Done():
	return ctx.Err()
case worker.flush <- done:
}
select {
case <-ctx.Done():
	return ctx.Err()
case <-done:
}
```
flushInsertWorkers can return early on ctx.Done() after the flush request has been received by the worker but before this function reads from done. In that case the worker goroutine will block forever on done <- true, effectively deadlocking that worker (and potentially the writer). Consider making done a buffered channel (size 1) and/or ensuring the ack is always drainable even when the context is canceled.
```go
bldr.Field(0).(*array.Int64Builder).Append(1)
record := bldr.NewRecord()

md := arrow.NewMetadata(
	[]string{schema.MetadataTableName},
	[]string{insertTable.Name},
)
newSchema := arrow.NewSchema(
	record.Schema().Fields(),
	&md,
)

record = array.NewRecord(newSchema, record.Columns(), record.NumRows())
```
The test builds a record via NewRecord() and then wraps it with array.NewRecord(...) without releasing the builder/records. With Arrow's ref-counted memory, this can leak allocations across the test run. Prefer using NewRecordBatch() (as other tests do) and ensure the RecordBuilder/record(s) are released when no longer needed (and avoid the extra schema/metadata re-wrap if the table schema already includes the table-name metadata).
Suggested change:

```diff
-bldr.Field(0).(*array.Int64Builder).Append(1)
-record := bldr.NewRecord()
-md := arrow.NewMetadata(
-	[]string{schema.MetadataTableName},
-	[]string{insertTable.Name},
-)
-newSchema := arrow.NewSchema(
-	record.Schema().Fields(),
-	&md,
-)
-record = array.NewRecord(newSchema, record.Columns(), record.NumRows())
+defer bldr.Release()
+bldr.Field(0).(*array.Int64Builder).Append(1)
+record := bldr.NewRecord()
+defer record.Release()
```
Problem: DeleteRecords could run while insert batches (including nested tables) were still buffered, which caused inconsistent results (bug: Support flushing of rows in nested tables for delete record in streamingbatchwriter #15478)
Fix: Added a flush of the insert workers before starting the delete-record process
Test: `go test ./writers/streamingbatchwriter -v` (test added at the bottom of writers/streamingbatchwriter/streamingbatchwriter_test.go)