[Fix] RMQ event handlers not removed (#4044)#4045
Conversation
|
Issues with the Fix The handlers are removed inside OnPublishSucceeded / OnPublishFailed — before the pending confirmation is processed for the current delivery tag. But more importantly, unsubscribing in the event handler creates a window in which a subsequent SendWithDelayAsync call has already re-registered handlers, and the first event's handler then removes those new registrations. With concurrent sends on the same producer, this can cause missed confirmations. The root cause is that handlers are registered per-send when they should be registered once per channel. The handlers already use _pendingConfirmations (a ConcurrentDictionary) to correlate delivery tags, so they work correctly for multiple in-flight messages. The fix should be:
This eliminates the add/remove churn entirely and is safe for concurrent sends. // In EnsureBrokerAsync (or right after it returns), subscribe once: // Remove the per-send subscription entirely from SendWithDelayAsync This is simpler, correct for concurrent use, and doesn't require the RegisterChannelEvents method at all. The PR has no tests verifying the fix. A test that sends N messages and asserts the handler is invoked exactly N times (once per message, not N×(N+1)/2 times) would prevent regression. |
|
@ravriel Just nudging in case you missed the feedback |
This reverts commit 1208c5b.
| private void RegisterChannelEvents(bool add) | ||
| { | ||
| if (add) | ||
| { | ||
| Channel?.BasicAcksAsync += OnPublishSucceeded; | ||
| Channel?.BasicNacksAsync += OnPublishFailed; | ||
| } | ||
| else | ||
| { | ||
| Channel?.BasicAcksAsync -= OnPublishSucceeded; | ||
| Channel?.BasicNacksAsync -= OnPublishFailed; | ||
| } | ||
| } |
There was a problem hiding this comment.
Split it into two methods, register and unregister
There was a problem hiding this comment.
We need to do it in a different place, as we don't want to register/unregister per publish
iancooper
left a comment
There was a problem hiding this comment.
Thanks for adding the test, I think we should link at cleaning up in the Dispose as well, then we are done
| { | ||
| Log.ErrorTalkingToSocketAsync(s_logger, io, Connection.AmpqUri.GetSanitizedUri()); | ||
| await ResetConnectionToBrokerAsync(cancellationToken); | ||
| Channel?.BasicAcksAsync -= OnPublishSucceeded; |
There was a problem hiding this comment.
We should unsubscribe in Dispose, if not already disposed, since if we exit without an error, we want to remove any subscriptions.
There was a problem hiding this comment.
Gates Passed
4 Quality Gates Passed
See analysis details in CodeScene
Quality Gate Profile: Clean Code Collective
Install CodeScene MCP: safeguard and uplift AI-generated code. Catch issues early with our IDE extension and CLI tool.
|
Thanks for the contribution @ravriel I am going to get a release out soon |
Fixes #4044 by removing channel event handlers in each event handler.