Skip to content
Merged
29 changes: 29 additions & 0 deletions docs/adr/0013-publish-should-be-in-parallel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# 1. Record architecture decisions

Date: 2024-07-06

## Status

Accepted

## Context

The current implementation of the `Publish` method on `CommandProcessor` is sequential. This means that when you call `Publish` with an `Event` we call the `IHandleRequests` that subscribe to that `Event` one-by-one. This is not efficient and can be slow when there are many `IHandleRequests` subscribed to the `Event`.

It is unlikely that the `IHandleRequests` will have any dependencies on each other, so there is no reason why they can't be called in parallel.

It is unlikely that we expect the `IHandleRequests` to be called in a specific order, as this is not guaranteed by an `IEnumerable` across their registrations so there is no reason why they can't be called in parallel. In fact, the call to `PublishAsync` (which calls `IHandleRequestsAsync`) is asynchronous and continuations may not run in parallel either (unless run in the context of our `ServiceActivator` which provides a `SynchronizationContext` that means we run the continuations sequentially.

## Decision

We will change the implementation of the `Publish` method on `CommandProcessor` to call the `IHandleRequests` that subscribe to the `Event` in parallel. For a `Publish` method call we will use `Parallel.ForEach` to call each `IHandleRequests` in parallel. For `PublishAsync` we will use `Task.WhenAll` to call each `IHandleRequestsAsync` in parallel.

## Consequences

This will make the `Publish` method faster and more efficient. It will also mean that the `IHandleRequests` for an `Event` will be called in parallel and so we will need to ensure that they are thread-safe. This is likely to be the case as the `IHandleRequests` are likely to be stateless and so should be thread-safe. If they are not thread-safe then we will need to make them thread-safe.

Our `RequestContext` class was not thread-safe and we need to make it thread-safe. We could provide a copy, but as a caller may choose to inspect the `RequestContext` after the `IHandleRequests` have been called we will need to make the `RequestContext` thread-safe. This is a small change that is mostly hidden behind the property accessors of the `RequestContext` class.

We use a `ConcurrentDictionary` to store the `RequestContext Span` data against the current thread, thus meaning each thread has its own span. This does mean that the caller will not have access to that thread-specific data, but this is not a problem as the caller should not be inspecting the thread-specific `RequestContext` `Span` through the property after the `IHandleRequests` have been called (most likely it provided the `Span`).

We use a `ConcurrentDictionary` for the `RequestContext` `Bag` property, which is a dictionary of key-value pairs. This is thread-safe and so we do not need to make any changes to this property.
4 changes: 2 additions & 2 deletions src/Paramore.Brighter.ServiceActivator/MessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,8 @@ private RequestContext InitRequestContext(Activity span, Message message)
var context = _requestContextFactory.Create();
context.Span = span;
context.OriginatingMessage = message;
context.Bag.Add("ChannelName", Channel.Name);
context.Bag.Add("RequestStart", DateTime.UtcNow);
context.Bag.AddOrUpdate("ChannelName", Channel.Name, (s, o) => Channel.Name);
context.Bag.AddOrUpdate("RequestStart", DateTime.UtcNow, (s, o) => DateTime.UtcNow);
return context;
}

Expand Down
45 changes: 26 additions & 19 deletions src/Paramore.Brighter/CommandProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -332,21 +332,22 @@ public void Publish<T>(T @event, RequestContext requestContext = null) where T :
s_logger.LogInformation("Found {HandlerCount} pipelines for event: {EventType} {Id}", handlerCount,
@event.GetType(), @event.Id);

var exceptions = new List<Exception>();
foreach (var handleRequests in handlerChain)
var exceptions = new ConcurrentBag<Exception>();
Parallel.ForEach(handlerChain, (handleRequests) =>
{
try
{
handlerSpans[handleRequests.Name.ToString()] = _tracer?.CreateSpan(CommandProcessorSpanOperation.Publish, @event, span, options: _instrumentationOptions);
context.Span =handlerSpans[handleRequests.Name.ToString()];
handleRequests.Handle(@event);
context.Span = span;
var handlerName = handleRequests.Name.ToString();
handlerSpans[handlerName] = _tracer?.CreateSpan(CommandProcessorSpanOperation.Publish, @event, span, options: _instrumentationOptions);
context.Span = handlerSpans[handlerName];
handleRequests.Handle(@event);
context.Span = span;
}
catch (Exception e)
{
exceptions.Add(e);
}
}
});

_tracer?.LinkSpans(handlerSpans);

Expand Down Expand Up @@ -405,22 +406,26 @@ public async Task PublishAsync<T>(
@event.GetType(), @event.Id
);

var exceptions = new List<Exception>();
foreach (var handleRequests in handlerChain)
var exceptions = new ConcurrentBag<Exception>();

try
{
try
{
handlerSpans[handleRequests.Name.ToString()] = _tracer?.CreateSpan(CommandProcessorSpanOperation.Publish, @event, span, options: _instrumentationOptions);
context.Span =handlerSpans[handleRequests.Name.ToString()];
await handleRequests.HandleAsync(@event, cancellationToken).ConfigureAwait(continueOnCapturedContext);
context.Span = span;
}
catch (Exception e)
var tasks = new List<Task>();
foreach (var handleRequests in handlerChain)
{
exceptions.Add(e);
handlerSpans[handleRequests.Name.ToString()] = _tracer?.CreateSpan(CommandProcessorSpanOperation.Publish, @event, span, options: _instrumentationOptions);
context.Span =handlerSpans[handleRequests.Name.ToString()];
tasks.Add(handleRequests.HandleAsync(@event, cancellationToken));
context.Span = span;
}

await Task.WhenAll(tasks).ConfigureAwait(continueOnCapturedContext);
}

catch (Exception e)
{
exceptions.Add(e);
}

_tracer?.LinkSpans(handlerSpans);

if (exceptions.Any())
Expand Down Expand Up @@ -524,6 +529,8 @@ public string DepositPost<TRequest>(TRequest request, RequestContext requestCont
/// <param name="transactionProvider">The transaction provider to use with an outbox</param>
/// <param name="requestContext">The context of the request; if null we will start one via a <see cref="IAmARequestContextFactory"/> </param>
/// <param name="args">For transports or outboxes that require additional parameters such as topic, provide an optional arg</param>
/// <param name="batchId">The id of any batch of deposits we are called within; this will be set by the call to DepositPost with
/// a collection of requests and there is no need to set this yourself</param>
/// <typeparam name="TRequest">The type of the request</typeparam>
/// <typeparam name="TTransaction">The type of Db transaction used by the Outbox</typeparam>
/// <returns>The Id of the Message that has been deposited.</returns>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,13 @@ THE SOFTWARE. */

namespace Paramore.Brighter.FeatureSwitch.Providers
{
public class FluentConfigRegistry : IAmAFeatureSwitchRegistry
public class FluentConfigRegistry(IDictionary<Type, FeatureSwitchStatus> switches) : IAmAFeatureSwitchRegistry
{
public MissingConfigStrategy MissingConfigStrategy { get; set; } = MissingConfigStrategy.Exception;

private readonly IDictionary<Type, FeatureSwitchStatus> _switches;

public FluentConfigRegistry(IDictionary<Type, FeatureSwitchStatus> switches)
{
_switches = switches;
}

public FeatureSwitchStatus StatusOf(Type handler)
{
var configExists = _switches.ContainsKey(handler);
var configExists = switches.ContainsKey(handler);

if (!configExists)
{
Expand All @@ -60,7 +53,7 @@ public FeatureSwitchStatus StatusOf(Type handler)
}
}

return _switches[handler];
return switches[handler];
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/Paramore.Brighter/IRequestContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ THE SOFTWARE. */

#endregion

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using Paramore.Brighter.FeatureSwitch;
Expand All @@ -45,7 +46,7 @@ public interface IRequestContext
/// Gets the bag.
/// </summary>
/// <value>The bag.</value>
Dictionary<string, object> Bag { get; }
ConcurrentDictionary<string, object> Bag { get; }

/// <summary>
/// Gets the policies.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private TRequest CatchAll(TRequest command)
}
catch (Exception exception)
{
Context.Bag.Add(CAUSE_OF_FALLBACK_EXCEPTION, exception);
Context.Bag.AddOrUpdate(CAUSE_OF_FALLBACK_EXCEPTION, exception, (s, o) => exception);
return base.Fallback(command);
}
}
Expand All @@ -97,7 +97,7 @@ private TRequest CatchBrokenCircuit(TRequest command)
}
catch (BrokenCircuitException brokenCircuitExceptionexception)
{
Context.Bag.Add(CAUSE_OF_FALLBACK_EXCEPTION, brokenCircuitExceptionexception);
Context.Bag.AddOrUpdate(CAUSE_OF_FALLBACK_EXCEPTION, brokenCircuitExceptionexception, (s, o) => brokenCircuitExceptionexception);
return base.Fallback(command);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private async Task<TRequest> CatchAll(TRequest command, CancellationToken cancel
}
catch (Exception exception)
{
Context.Bag.Add(CAUSE_OF_FALLBACK_EXCEPTION, exception);
Context.Bag.AddOrUpdate(CAUSE_OF_FALLBACK_EXCEPTION, exception, (s, o) => exception);
}
return await FallbackAsync(command, cancellationToken).ConfigureAwait(ContinueOnCapturedContext);
}
Expand All @@ -95,7 +95,7 @@ private async Task<TRequest> CatchBrokenCircuit(TRequest command, CancellationTo
}
catch (BrokenCircuitException brokenCircuitExceptionexception)
{
Context.Bag.Add(CAUSE_OF_FALLBACK_EXCEPTION, brokenCircuitExceptionexception);
Context.Bag.AddOrUpdate(CAUSE_OF_FALLBACK_EXCEPTION, brokenCircuitExceptionexception, (s, o) => brokenCircuitExceptionexception);
}
return await FallbackAsync(command, cancellationToken).ConfigureAwait(ContinueOnCapturedContext);
}
Expand Down
60 changes: 28 additions & 32 deletions src/Paramore.Brighter/RequestContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ THE SOFTWARE. */
#endregion

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using Paramore.Brighter.FeatureSwitch;
Expand All @@ -38,56 +39,51 @@ namespace Paramore.Brighter
/// </summary>
public class RequestContext : IRequestContext
{
private Message _originatingMessage;

/// <summary>
/// Initializes a new instance of the <see cref="RequestContext"/> class.
/// </summary>
public RequestContext()
{
Bag = new Dictionary<string, object>();
}

/// <summary>
/// Gets the Span [Activity] associated with the request
/// </summary>
public Activity Span { get; set; }
private readonly ConcurrentDictionary<int, Activity> _spans = new();

/// <summary>
/// Gets the bag.
/// </summary>
/// <value>The bag.</value>
public Dictionary<string, object> Bag { get; private set; }
public ConcurrentDictionary<string, object> Bag { get; } = new();

/// <summary>
/// Gets the Feature Switches
/// </summary>
public IAmAFeatureSwitchRegistry FeatureSwitches { get; set; }

/// <summary>
/// When we pass a requestContext through a receiver pipeline, we may want to pass the original message that started the pipeline.
/// This is primarily useful for debugging - how did we get to this request?. But it is also useful for some request metadata that we
/// do not want to transfer to the Request.
/// This is not thread-safe; the assumption is that you set this from a single thread and access the message from multiple threads. It is
/// not intended to be set from multiple threads.
///</summary>
/// <value>The originating message</value>
public Message OriginatingMessage
{
get { return _originatingMessage; }
set
{
if (_originatingMessage == null)
_originatingMessage = value;
else
throw new InvalidOperationException("You may only set the originating message once on the same context");
}


}

public Message OriginatingMessage { get; set; }

/// <summary>
/// Gets the policies.
/// </summary>
/// <value>The policies.</value>
public IPolicyRegistry<string> Policies { get; set; }

/// <summary>
/// Gets the Feature Switches
/// Gets the Span [Activity] associated with the request
/// This is thread-safe, so that you can access the context from multiple threads
/// This is mainly required for Publish, which uses the same context across multiple Publish handlers
/// </summary>
public IAmAFeatureSwitchRegistry FeatureSwitches { get; set; }
public Activity Span
{
get
{
_spans.TryGetValue(System.Threading.Thread.CurrentThread.ManagedThreadId, out var span);
return span;
}
set
{
_spans.AddOrUpdate(System.Threading.Thread.CurrentThread.ManagedThreadId, value, (key, oldValue) => value);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,11 @@ THE SOFTWARE. */

namespace Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles
{
internal class MyOtherEventHandler : RequestHandler<MyEvent>
internal class MyOtherEventHandler(IDictionary<string, string> receivedMessages) : RequestHandler<MyEvent>
{
private readonly IDictionary<string, string> _receivedMessages;

public MyOtherEventHandler(IDictionary<string, string> receivedMessages)
{
_receivedMessages = receivedMessages;
}

public override MyEvent Handle(MyEvent @event)
{
_receivedMessages.Add(nameof(MyOtherEventHandler), @event.Id);
receivedMessages.Add(nameof(MyOtherEventHandler), @event.Id);
return base.Handle(@event);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void When_A_Request_Context_Is_Provided_On_A_Send()
//act
var context = new RequestContext();
var testBagValue = Guid.NewGuid().ToString();
context.Bag.Add("TestString", testBagValue) ;
context.Bag.AddOrUpdate("TestString", testBagValue, (key, oldValue) => testBagValue) ;
commandProcessor.Send(new MyCommand(), context);

//assert
Expand Down Expand Up @@ -79,7 +79,7 @@ public async Task When_A_Request_Context_Is_Provided_On_A_Send_Async()
//act
var context = new RequestContext();
var testBagValue = Guid.NewGuid().ToString();
context.Bag.Add("TestString", testBagValue) ;
context.Bag.AddOrUpdate("TestString", testBagValue, (key, oldValue) => testBagValue);
await commandProcessor.SendAsync(new MyCommand(), context);

//assert
Expand Down Expand Up @@ -107,7 +107,7 @@ public void When_A_Request_Context_Is_Provided_On_A_Publish()
//act
var context = new RequestContext();
var testBagValue = Guid.NewGuid().ToString();
context.Bag.Add("TestString", testBagValue) ;
context.Bag.AddOrUpdate("TestString", testBagValue, (key, oldValue) => testBagValue);
commandProcessor.Publish(new MyEvent(), context);

//assert
Expand Down Expand Up @@ -135,7 +135,7 @@ public async Task When_A_Request_Context_Is_Provided_On_A_Publish_Async()
//act
var context = new RequestContext();
var testBagValue = Guid.NewGuid().ToString();
context.Bag.Add("TestString", testBagValue) ;
context.Bag.AddOrUpdate("TestString", testBagValue, (s, o) => testBagValue);
await commandProcessor.PublishAsync(new MyEvent(), context);

//assert
Expand Down Expand Up @@ -187,7 +187,7 @@ public void When_A_Request_Context_Is_Provided_On_A_Deposit()
//act
var context = new RequestContext();
var testBagValue = Guid.NewGuid().ToString();
context.Bag.Add("TestString", testBagValue) ;
context.Bag.AddOrUpdate("TestString", testBagValue, (key, oldValue) => testBagValue) ;
commandProcessor.DepositPost(new MyCommand(), context);

//assert
Expand Down Expand Up @@ -235,7 +235,7 @@ public async Task When_A_Request_Context_Is_Provided_On_A_Deposit_Async()
//act
var context = new RequestContext();
var testBagValue = Guid.NewGuid().ToString();
context.Bag.Add("TestString", testBagValue) ;
context.Bag.AddOrUpdate("TestString", testBagValue, (key, oldValue) => testBagValue);
await commandProcessor.DepositPostAsync(new MyCommand(), context);

//assert
Expand Down Expand Up @@ -287,7 +287,7 @@ public void When_A_Request_Context_Is_Provided_On_A_Clear()
//act
var context = new RequestContext();
var testBagValue = Guid.NewGuid().ToString();
context.Bag.Add("TestString", testBagValue) ;
context.Bag.AddOrUpdate("TestString", testBagValue, (key, oldValue) => testBagValue) ;
commandProcessor.ClearOutbox(new []{myCommand.Id}, context);

//assert
Expand Down Expand Up @@ -339,7 +339,7 @@ public async Task When_A_Request_Context_Is_Provided_On_A_Clear_Async()
//act
var context = new RequestContext();
var testBagValue = Guid.NewGuid().ToString();
context.Bag.Add("TestString", testBagValue) ;
context.Bag.AddOrUpdate("TestString", testBagValue, (key, oldValue) => testBagValue) ;
await commandProcessor.ClearOutboxAsync(new []{myCommand.Id}, context);

//assert
Expand Down
Loading