Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# 28. Support Agreement Dispatcher
# 31. Support Agreement Dispatcher

Date: 2025-07-07

Expand Down
58 changes: 58 additions & 0 deletions docs/adr/0032-remove-explicit-clear-lock.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# 32. Remove Semaphore from Explicit Clear

Date: 2019-08-01

## Status

Accepted

## Context

We want to avoid publishing a message twice from an outbox. This may happen because we fail to update the Outbox when a message is sent, and we cannot avoid that. However, we create the risk that we publish twice if we run two overlapping publish operations, at the same time.

For this reason `OutboxProducerMediator` has `SemaphoreSlim` 's_backgroundClearSemaphoreToken'. We attempt to signal the semaphore, but use a TimeSpan.Zero wait, so that if another clear is running, we give up. For a background process, this is fine, as we assume another run of the background process to clear items from the Outbox will run soon, and pick up anything that the last run missed. Latency may increase a little for some messages, but we don't dual publish.

We can also have an explicit clear for a range of messages. There is a risk that if this runs during a background publish, it could clear messages that the background publish is already clearing. However, as the background clear only clears message that have been waiting for a configured period of time, it is likely that the explicit clear list will not be old enough to be in any background process that is running at the same time. So we consider the risk of a dual publish to be an acceptable one here.

Prior to the introduction of `IDistributedLock` we had no facility for running multiple sweepers for resilience, but only having one active. For this reason we added another `SemaphoreSlim` to `OutboxProducerMediator`. Instead of testing for the lock with a `WaitAsync(TimeSpan.Zero)` this lock blocked waiting for the lock to become free (even if async so a thread was not blocked).

```csharp
internal async Task ClearOutboxAsync(
IEnumerable<Guid> posts,
bool continueOnCapturedContext = false,
CancellationToken cancellationToken = default)
{

if (!HasAsyncOutbox())
throw new InvalidOperationException("No async outbox defined.");

await _clearSemaphoreToken.WaitAsync(cancellationToken);
try
{
foreach (var messageId in posts)
{
var message = await AsyncOutbox.GetAsync(messageId, OutboxTimeout, cancellationToken);
if (message == null || message.Header.MessageType == MessageType.MT_NONE)
throw new NullReferenceException($"Message with Id {messageId} not found in the Outbox");

await DispatchAsync(new[] {message}, continueOnCapturedContext, cancellationToken);
}
}
finally
{
_clearSemaphoreToken.Release();
}

CheckOutstandingMessages();
}
```

At scale, this proves problematic as you now have sequential `Clear` operations on the outbox, even though the range of messages to clear is not sequential. In pratice, this means that you will back up HTTP API requests that write to the Outbox, behind this semaphore. Once enough requests queue you up, you will end up with a Bad Gateway error.

## Decision

Drop the usage of `_clearSemaphoreToken` as `IDistributedLock` now protects us against dual publish by Sweepers.

## Consequences

There is a low risk that we get a dual publish where a background clear runs over the same rage as an explicit clear, if the age of a row to clear was set too low.
17 changes: 1 addition & 16 deletions src/Paramore.Brighter/OutboxProducerMediator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ public partial class OutboxProducerMediator<TMessage, TTransaction> : IAmAnOutbo
private readonly IAmAnOutboxCircuitBreaker? _outboxCircuitBreaker;
private readonly Dictionary<string, List<TMessage>> _outboxBatches = new();

private static readonly SemaphoreSlim s_clearSemaphoreToken = new(1, 1);

private static readonly SemaphoreSlim s_backgroundClearSemaphoreToken = new(1, 1);

//Used to checking the limit on outstanding messages for an Outbox. We throw at that point. Writes to the static
Expand Down Expand Up @@ -306,7 +304,6 @@ public void ClearOutbox(
throw new InvalidOperationException("No outbox defined.");

// Only allow a single Clear to happen at a time
s_clearSemaphoreToken.Wait();
var parentSpan = requestContext.Span;

var childSpans = new ConcurrentDictionary<string, Activity>();
Expand Down Expand Up @@ -337,7 +334,6 @@ public void ClearOutbox(
{
_tracer?.EndSpans(childSpans);
requestContext.Span = parentSpan;
s_clearSemaphoreToken.Release();
}

CheckOutstandingMessages(requestContext);
Expand All @@ -364,7 +360,6 @@ public async Task ClearOutboxAsync(
if (!HasAsyncOutbox())
throw new InvalidOperationException("No async outbox defined.");

await s_clearSemaphoreToken.WaitAsync(cancellationToken);
var parentSpan = requestContext.Span;

var childSpans = new ConcurrentDictionary<string, Activity>();
Expand Down Expand Up @@ -394,7 +389,6 @@ await DispatchAsync([message], requestContext, continueOnCapturedContext,
{
_tracer?.EndSpans(childSpans);
requestContext.Span = parentSpan;
s_clearSemaphoreToken.Release();
}

CheckOutstandingMessages(requestContext);
Expand Down Expand Up @@ -597,18 +591,10 @@ private async Task BackgroundDispatchUsingAsync(
CancellationToken cancellationToken
)
{
WaitHandle[] clearTokens = new WaitHandle[2];
clearTokens[0] = s_backgroundClearSemaphoreToken.AvailableWaitHandle;
clearTokens[1] = s_clearSemaphoreToken.AvailableWaitHandle;
_outboxCircuitBreaker?.CoolDown();

if (WaitHandle.WaitAll(clearTokens, TimeSpan.Zero))
if ( await s_backgroundClearSemaphoreToken.WaitAsync(TimeSpan.Zero, cancellationToken))
{
//NOTE: The wait handle only signals availability, still need to increment the counter:
// see https://learn.microsoft.com/en-us/dotnet/api/System.Threading.SemaphoreSlim.AvailableWaitHandle
await s_backgroundClearSemaphoreToken.WaitAsync(cancellationToken);
await s_clearSemaphoreToken.WaitAsync(cancellationToken);

var parentSpan = requestContext.Span;
var span = _tracer?.CreateClearSpan(CommandProcessorSpanOperation.Clear, requestContext.Span, null,
_instrumentationOptions);
Expand Down Expand Up @@ -648,7 +634,6 @@ CancellationToken cancellationToken
finally
{
_tracer?.EndSpan(span);
s_clearSemaphoreToken.Release();
s_backgroundClearSemaphoreToken.Release();
}

Expand Down
Loading