From 351c13d2a4f79f8794b6b6873a2a7623c3c9f9b8 Mon Sep 17 00:00:00 2001 From: Ian Cooper Date: Tue, 2 Jul 2024 17:31:49 +0100 Subject: [PATCH 1/7] feature: use parallel publishing to events as no need to process them in sequence. --- src/Paramore.Brighter/CommandProcessor.cs | 44 ++++++++++++----------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/src/Paramore.Brighter/CommandProcessor.cs b/src/Paramore.Brighter/CommandProcessor.cs index 275a8517e8..32f72f589c 100644 --- a/src/Paramore.Brighter/CommandProcessor.cs +++ b/src/Paramore.Brighter/CommandProcessor.cs @@ -332,21 +332,21 @@ public void Publish(T @event, RequestContext requestContext = null) where T : s_logger.LogInformation("Found {HandlerCount} pipelines for event: {EventType} {Id}", handlerCount, @event.GetType(), @event.Id); - var exceptions = new List(); - foreach (var handleRequests in handlerChain) + var exceptions = new ConcurrentBag(); + Parallel.ForEach(handlerChain, (handleRequests) => { try { - handlerSpans[handleRequests.Name.ToString()] = _tracer?.CreateSpan(CommandProcessorSpanOperation.Publish, @event, span, options: _instrumentationOptions); - context.Span =handlerSpans[handleRequests.Name.ToString()]; - handleRequests.Handle(@event); - context.Span = span; + handlerSpans[handleRequests.Name.ToString()] = _tracer?.CreateSpan(CommandProcessorSpanOperation.Publish, @event, span, options: _instrumentationOptions); + context.Span = handlerSpans[handleRequests.Name.ToString()]; + handleRequests.Handle(@event); + context.Span = span; } catch (Exception e) { exceptions.Add(e); } - } + }); _tracer?.LinkSpans(handlerSpans); @@ -405,22 +405,26 @@ public async Task PublishAsync( @event.GetType(), @event.Id ); - var exceptions = new List(); + var tasks = new List(); + var exceptions = new ConcurrentBag(); + foreach (var handleRequests in handlerChain) { - try - { - handlerSpans[handleRequests.Name.ToString()] = _tracer?.CreateSpan(CommandProcessorSpanOperation.Publish, @event, span, options: _instrumentationOptions); - context.Span =handlerSpans[handleRequests.Name.ToString()]; - await handleRequests.HandleAsync(@event, cancellationToken).ConfigureAwait(continueOnCapturedContext); - context.Span = span; - } - catch (Exception e) - { - exceptions.Add(e); - } + handlerSpans[handleRequests.Name.ToString()] = _tracer?.CreateSpan(CommandProcessorSpanOperation.Publish, @event, span, options: _instrumentationOptions); + context.Span =handlerSpans[handleRequests.Name.ToString()]; + tasks.Add(handleRequests.HandleAsync(@event, cancellationToken)); + context.Span = span; } - + + try + { + await Task.WhenAll(tasks).ConfigureAwait(continueOnCapturedContext); + } + catch (Exception e) + { + exceptions.Add(e); + } + _tracer?.LinkSpans(handlerSpans); if (exceptions.Any()) From 382f8f3bc180fc16b4e41e7ea74f56efa1d94e42 Mon Sep 17 00:00:00 2001 From: Ian Cooper Date: Wed, 3 Jul 2024 08:13:26 +0100 Subject: [PATCH 2/7] fix: ensure that we catch all exceptions from the run --- src/Paramore.Brighter/CommandProcessor.cs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/Paramore.Brighter/CommandProcessor.cs b/src/Paramore.Brighter/CommandProcessor.cs index 32f72f589c..ca1799d3d6 100644 --- a/src/Paramore.Brighter/CommandProcessor.cs +++ b/src/Paramore.Brighter/CommandProcessor.cs @@ -405,19 +405,19 @@ public async Task PublishAsync( @event.GetType(), @event.Id ); - var tasks = new List(); var exceptions = new ConcurrentBag(); - foreach (var handleRequests in handlerChain) - { - handlerSpans[handleRequests.Name.ToString()] = _tracer?.CreateSpan(CommandProcessorSpanOperation.Publish, @event, span, options: _instrumentationOptions); - context.Span =handlerSpans[handleRequests.Name.ToString()]; - tasks.Add(handleRequests.HandleAsync(@event, cancellationToken)); - context.Span = span; - } - try { + var tasks = new List(); + foreach (var handleRequests in handlerChain) + { + handlerSpans[handleRequests.Name.ToString()] = _tracer?.CreateSpan(CommandProcessorSpanOperation.Publish, @event, span, options: _instrumentationOptions); + context.Span =handlerSpans[handleRequests.Name.ToString()]; + tasks.Add(handleRequests.HandleAsync(@event, cancellationToken)); + context.Span = span; + } + await Task.WhenAll(tasks).ConfigureAwait(continueOnCapturedContext); } catch (Exception e) From e09704d575aa6c88854842e72c47320eb98bd23d Mon Sep 17 00:00:00 2001 From: Ian Cooper Date: Thu, 4 Jul 2024 20:23:05 +0100 Subject: [PATCH 3/7] fix: clean up using statements --- tests/Paramore.Brighter.Extensions.Tests/TestDifferentSetups.cs | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/Paramore.Brighter.Extensions.Tests/TestDifferentSetups.cs b/tests/Paramore.Brighter.Extensions.Tests/TestDifferentSetups.cs index 5546d07074..ac23f137bc 100644 --- a/tests/Paramore.Brighter.Extensions.Tests/TestDifferentSetups.cs +++ b/tests/Paramore.Brighter.Extensions.Tests/TestDifferentSetups.cs @@ -1,8 +1,6 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Threading.Tasks; -using System.Transactions; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Time.Testing; From 21f13505ce33886321f33eeec8dff44abb49dd69 Mon Sep 17 00:00:00 2001 From: Ian Cooper Date: Fri, 5 Jul 2024 18:34:02 +0100 Subject: [PATCH 4/7] fix: make RequestContext.cs thread safe --- .../MessagePump.cs | 4 +- src/Paramore.Brighter/CommandProcessor.cs | 5 +- .../Providers/FluentConfigRegistry.cs | 13 +--- src/Paramore.Brighter/IRequestContext.cs | 3 +- .../Handlers/FallbackPolicyHandler.cs | 4 +- ...allbackPolicyHandlerRequestHandlerAsync.cs | 4 +- src/Paramore.Brighter/RequestContext.cs | 58 +++++++++-------- .../TestDoubles/MyOtherEventHandler.cs | 11 +--- .../When_A_Request_Context_Is_Provided.cs | 16 ++--- .../When_Accessing_A_Request_Context.cs | 63 +++++++++++++++++++ .../When_Creating_A_Request_Context.cs | 0 ...hen_Repeatedly_Fails_Breaks_The_Circuit.cs | 2 +- ...peatedly_Fails_Breaks_The_Circuit_Async.cs | 2 +- ...Is_Called_It_Can_Only_Be_Obtained_Once.cs} | 2 +- ...n_A_Stop_Message_Is_Added_To_A_Channel.cs} | 0 ...hen_Acknowledge_Is_Called_On_A_Channel.cs} | 0 ...hen_Listening_To_Messages_On_A_Channel.cs} | 0 ..._No_Acknowledge_Is_Called_On_A_Channel.cs} | 0 ...When_Requeuing_A_Message_With_No_Delay.cs} | 0 ..._Empty_Read_From_That_Before_Receiving.cs} | 0 ...Publishing_A_Request_A_Span_Is_Exported.cs | 18 +++--- 21 files changed, 131 insertions(+), 74 deletions(-) create mode 100644 tests/Paramore.Brighter.Core.Tests/Context/When_Accessing_A_Request_Context.cs rename tests/Paramore.Brighter.Core.Tests/{RequestContextFactory => Context}/When_Creating_A_Request_Context.cs (100%) rename tests/Paramore.Brighter.Core.Tests/Locking/{InMemoryLockingProviderTests.cs => When_Lock_Is_Called_It_Can_Only_Be_Obtained_Once.cs} (92%) rename tests/Paramore.Brighter.Core.Tests/MessagingGateway/{When_a_stop_message_is_added_to_a_channel.cs => When_A_Stop_Message_Is_Added_To_A_Channel.cs} (100%) rename tests/Paramore.Brighter.Core.Tests/MessagingGateway/{When_acknowledge_is_called_on_a_channel.cs => When_Acknowledge_Is_Called_On_A_Channel.cs} (100%) rename tests/Paramore.Brighter.Core.Tests/MessagingGateway/{When_listening_to_messages_on_a_channel.cs => When_Listening_To_Messages_On_A_Channel.cs} (100%) rename tests/Paramore.Brighter.Core.Tests/MessagingGateway/{When_no_acknowledge_is_called_on_a_channel.cs => When_No_Acknowledge_Is_Called_On_A_Channel.cs} (100%) rename tests/Paramore.Brighter.Core.Tests/MessagingGateway/{When_requeuing_a_message_with_no_delay.cs => When_Requeuing_A_Message_With_No_Delay.cs} (100%) rename tests/Paramore.Brighter.Core.Tests/MessagingGateway/{When_the_buffer_is_not_empty_read_from_that_before_receiving.cs => When_The_Buffer_Is_Not_Empty_Read_From_That_Before_Receiving.cs} (100%) diff --git a/src/Paramore.Brighter.ServiceActivator/MessagePump.cs b/src/Paramore.Brighter.ServiceActivator/MessagePump.cs index 185f9f0010..81a000e0d5 100644 --- a/src/Paramore.Brighter.ServiceActivator/MessagePump.cs +++ b/src/Paramore.Brighter.ServiceActivator/MessagePump.cs @@ -374,8 +374,8 @@ private RequestContext InitRequestContext(Activity span, Message message) var context = _requestContextFactory.Create(); context.Span = span; context.OriginatingMessage = message; - context.Bag.Add("ChannelName", Channel.Name); - context.Bag.Add("RequestStart", DateTime.UtcNow); + context.Bag.AddOrUpdate("ChannelName", Channel.Name, (s, o) => Channel.Name); + context.Bag.AddOrUpdate("RequestStart", DateTime.UtcNow, (s, o) => DateTime.UtcNow); return context; } diff --git a/src/Paramore.Brighter/CommandProcessor.cs b/src/Paramore.Brighter/CommandProcessor.cs index ca1799d3d6..0f78abd683 100644 --- a/src/Paramore.Brighter/CommandProcessor.cs +++ b/src/Paramore.Brighter/CommandProcessor.cs @@ -337,8 +337,9 @@ public void Publish(T @event, RequestContext requestContext = null) where T : { try { - handlerSpans[handleRequests.Name.ToString()] = _tracer?.CreateSpan(CommandProcessorSpanOperation.Publish, @event, span, options: _instrumentationOptions); - context.Span = handlerSpans[handleRequests.Name.ToString()]; + var handlerName = handleRequests.Name.ToString(); + handlerSpans[handlerName] = _tracer?.CreateSpan(CommandProcessorSpanOperation.Publish, @event, span, options: _instrumentationOptions); + context.Span = handlerSpans[handlerName]; handleRequests.Handle(@event); context.Span = span; } diff --git a/src/Paramore.Brighter/FeatureSwitch/Providers/FluentConfigRegistry.cs b/src/Paramore.Brighter/FeatureSwitch/Providers/FluentConfigRegistry.cs index a604cfc226..0b914d14ec 100644 --- a/src/Paramore.Brighter/FeatureSwitch/Providers/FluentConfigRegistry.cs +++ b/src/Paramore.Brighter/FeatureSwitch/Providers/FluentConfigRegistry.cs @@ -27,20 +27,13 @@ THE SOFTWARE. */ namespace Paramore.Brighter.FeatureSwitch.Providers { - public class FluentConfigRegistry : IAmAFeatureSwitchRegistry + public class FluentConfigRegistry(IDictionary switches) : IAmAFeatureSwitchRegistry { public MissingConfigStrategy MissingConfigStrategy { get; set; } = MissingConfigStrategy.Exception; - private readonly IDictionary _switches; - - public FluentConfigRegistry(IDictionary switches) - { - _switches = switches; - } - public FeatureSwitchStatus StatusOf(Type handler) { - var configExists = _switches.ContainsKey(handler); + var configExists = switches.ContainsKey(handler); if (!configExists) { @@ -60,7 +53,7 @@ public FeatureSwitchStatus StatusOf(Type handler) } } - return _switches[handler]; + return switches[handler]; } } diff --git a/src/Paramore.Brighter/IRequestContext.cs b/src/Paramore.Brighter/IRequestContext.cs index 3606c76dac..bfe08279fc 100644 --- a/src/Paramore.Brighter/IRequestContext.cs +++ b/src/Paramore.Brighter/IRequestContext.cs @@ -22,6 +22,7 @@ THE SOFTWARE. */ #endregion +using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using Paramore.Brighter.FeatureSwitch; @@ -45,7 +46,7 @@ public interface IRequestContext /// Gets the bag. /// /// The bag. - Dictionary Bag { get; } + ConcurrentDictionary Bag { get; } /// /// Gets the policies. diff --git a/src/Paramore.Brighter/Policies/Handlers/FallbackPolicyHandler.cs b/src/Paramore.Brighter/Policies/Handlers/FallbackPolicyHandler.cs index 7841ed701b..b67cfdcc2d 100644 --- a/src/Paramore.Brighter/Policies/Handlers/FallbackPolicyHandler.cs +++ b/src/Paramore.Brighter/Policies/Handlers/FallbackPolicyHandler.cs @@ -84,7 +84,7 @@ private TRequest CatchAll(TRequest command) } catch (Exception exception) { - Context.Bag.Add(CAUSE_OF_FALLBACK_EXCEPTION, exception); + Context.Bag.AddOrUpdate(CAUSE_OF_FALLBACK_EXCEPTION, exception, (s, o) => exception); return base.Fallback(command); } } @@ -97,7 +97,7 @@ private TRequest CatchBrokenCircuit(TRequest command) } catch (BrokenCircuitException brokenCircuitExceptionexception) { - Context.Bag.Add(CAUSE_OF_FALLBACK_EXCEPTION, brokenCircuitExceptionexception); + Context.Bag.AddOrUpdate(CAUSE_OF_FALLBACK_EXCEPTION, brokenCircuitExceptionexception, (s, o) => brokenCircuitExceptionexception); return base.Fallback(command); } } diff --git a/src/Paramore.Brighter/Policies/Handlers/FallbackPolicyHandlerRequestHandlerAsync.cs b/src/Paramore.Brighter/Policies/Handlers/FallbackPolicyHandlerRequestHandlerAsync.cs index b8cf853989..9fa9f9c5e8 100644 --- a/src/Paramore.Brighter/Policies/Handlers/FallbackPolicyHandlerRequestHandlerAsync.cs +++ b/src/Paramore.Brighter/Policies/Handlers/FallbackPolicyHandlerRequestHandlerAsync.cs @@ -82,7 +82,7 @@ private async Task CatchAll(TRequest command, CancellationToken cancel } catch (Exception exception) { - Context.Bag.Add(CAUSE_OF_FALLBACK_EXCEPTION, exception); + Context.Bag.AddOrUpdate(CAUSE_OF_FALLBACK_EXCEPTION, exception, (s, o) => exception); } return await FallbackAsync(command, cancellationToken).ConfigureAwait(ContinueOnCapturedContext); } @@ -95,7 +95,7 @@ private async Task CatchBrokenCircuit(TRequest command, CancellationTo } catch (BrokenCircuitException brokenCircuitExceptionexception) { - Context.Bag.Add(CAUSE_OF_FALLBACK_EXCEPTION, brokenCircuitExceptionexception); + Context.Bag.AddOrUpdate(CAUSE_OF_FALLBACK_EXCEPTION, brokenCircuitExceptionexception, (s, o) => brokenCircuitExceptionexception); } return await FallbackAsync(command, cancellationToken).ConfigureAwait(ContinueOnCapturedContext); } diff --git a/src/Paramore.Brighter/RequestContext.cs b/src/Paramore.Brighter/RequestContext.cs index 95c889d640..20db51bae9 100644 --- a/src/Paramore.Brighter/RequestContext.cs +++ b/src/Paramore.Brighter/RequestContext.cs @@ -23,6 +23,7 @@ THE SOFTWARE. */ #endregion using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using Paramore.Brighter.FeatureSwitch; @@ -38,45 +39,39 @@ namespace Paramore.Brighter /// public class RequestContext : IRequestContext { - private Message _originatingMessage; - - /// - /// Initializes a new instance of the class. - /// - public RequestContext() - { - Bag = new Dictionary(); - } - - /// - /// Gets the Span [Activity] associated with the request - /// - public Activity Span { get; set; } + private readonly ConcurrentDictionary _messages = new(); + private readonly ConcurrentDictionary _spans = new(); /// /// Gets the bag. /// /// The bag. - public Dictionary Bag { get; private set; } + public ConcurrentDictionary Bag { get; } = new(); + + /// + /// Gets the Feature Switches + /// + public IAmAFeatureSwitchRegistry FeatureSwitches { get; set; } /// /// When we pass a requestContext through a receiver pipeline, we may want to pass the original message that started the pipeline. /// This is primarily useful for debugging - how did we get to this request?. But it is also useful for some request metadata that we /// do not want to transfer to the Request. + /// This is thread-safe, so that you can access the context from multiple threads + /// This is mainly required for Publish, which uses the same context across multiple Publish handlers /// /// The originating message public Message OriginatingMessage { - get { return _originatingMessage; } + get + { + _messages.TryGetValue(System.Threading.Thread.CurrentThread.ManagedThreadId, out var message); + return message; + } set { - if (_originatingMessage == null) - _originatingMessage = value; - else - throw new InvalidOperationException("You may only set the originating message once on the same context"); + _messages.AddOrUpdate(System.Threading.Thread.CurrentThread.ManagedThreadId, value, (key, oldValue) => value); } - - } /// @@ -84,10 +79,23 @@ public Message OriginatingMessage /// /// The policies. public IPolicyRegistry Policies { get; set; } - + /// - /// Gets the Feature Switches + /// Gets the Span [Activity] associated with the request + /// This is thread-safe, so that you can access the context from multiple threads + /// This is mainly required for Publish, which uses the same context across multiple Publish handlers /// - public IAmAFeatureSwitchRegistry FeatureSwitches { get; set; } + public Activity Span + { + get + { + _spans.TryGetValue(System.Threading.Thread.CurrentThread.ManagedThreadId, out var span); + return span; + } + set + { + _spans.AddOrUpdate(System.Threading.Thread.CurrentThread.ManagedThreadId, value, (key, oldValue) => value); + } + } } } diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/TestDoubles/MyOtherEventHandler.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/TestDoubles/MyOtherEventHandler.cs index 1ec778480a..97a9522d48 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/TestDoubles/MyOtherEventHandler.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/TestDoubles/MyOtherEventHandler.cs @@ -27,18 +27,11 @@ THE SOFTWARE. */ namespace Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles { - internal class MyOtherEventHandler : RequestHandler + internal class MyOtherEventHandler(IDictionary receivedMessages) : RequestHandler { - private readonly IDictionary _receivedMessages; - - public MyOtherEventHandler(IDictionary receivedMessages) - { - _receivedMessages = receivedMessages; - } - public override MyEvent Handle(MyEvent @event) { - _receivedMessages.Add(nameof(MyOtherEventHandler), @event.Id); + receivedMessages.Add(nameof(MyOtherEventHandler), @event.Id); return base.Handle(@event); } } diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_A_Request_Context_Is_Provided.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_A_Request_Context_Is_Provided.cs index 4d9d63ad12..e20ce6bbc5 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_A_Request_Context_Is_Provided.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_A_Request_Context_Is_Provided.cs @@ -49,7 +49,7 @@ public void When_A_Request_Context_Is_Provided_On_A_Send() //act var context = new RequestContext(); var testBagValue = Guid.NewGuid().ToString(); - context.Bag.Add("TestString", testBagValue) ; + context.Bag.AddOrUpdate("TestString", testBagValue, (key, oldValue) => testBagValue) ; commandProcessor.Send(new MyCommand(), context); //assert @@ -79,7 +79,7 @@ public async Task When_A_Request_Context_Is_Provided_On_A_Send_Async() //act var context = new RequestContext(); var testBagValue = Guid.NewGuid().ToString(); - context.Bag.Add("TestString", testBagValue) ; + context.Bag.AddOrUpdate("TestString", testBagValue, (key, oldValue) => testBagValue); await commandProcessor.SendAsync(new MyCommand(), context); //assert @@ -107,7 +107,7 @@ public void When_A_Request_Context_Is_Provided_On_A_Publish() //act var context = new RequestContext(); var testBagValue = Guid.NewGuid().ToString(); - context.Bag.Add("TestString", testBagValue) ; + context.Bag.AddOrUpdate("TestString", testBagValue, (key, oldValue) => testBagValue); commandProcessor.Publish(new MyEvent(), context); //assert @@ -135,7 +135,7 @@ public async Task When_A_Request_Context_Is_Provided_On_A_Publish_Async() //act var context = new RequestContext(); var testBagValue = Guid.NewGuid().ToString(); - context.Bag.Add("TestString", testBagValue) ; + context.Bag.AddOrUpdate("TestString", testBagValue, (s, o) => testBagValue); await commandProcessor.PublishAsync(new MyEvent(), context); //assert @@ -187,7 +187,7 @@ public void When_A_Request_Context_Is_Provided_On_A_Deposit() //act var context = new RequestContext(); var testBagValue = Guid.NewGuid().ToString(); - context.Bag.Add("TestString", testBagValue) ; + context.Bag.AddOrUpdate("TestString", testBagValue, (key, oldValue) => testBagValue) ; commandProcessor.DepositPost(new MyCommand(), context); //assert @@ -235,7 +235,7 @@ public async Task When_A_Request_Context_Is_Provided_On_A_Deposit_Async() //act var context = new RequestContext(); var testBagValue = Guid.NewGuid().ToString(); - context.Bag.Add("TestString", testBagValue) ; + context.Bag.AddOrUpdate("TestString", testBagValue, (key, oldValue) => testBagValue); await commandProcessor.DepositPostAsync(new MyCommand(), context); //assert @@ -287,7 +287,7 @@ public void When_A_Request_Context_Is_Provided_On_A_Clear() //act var context = new RequestContext(); var testBagValue = Guid.NewGuid().ToString(); - context.Bag.Add("TestString", testBagValue) ; + context.Bag.AddOrUpdate("TestString", testBagValue, (key, oldValue) => testBagValue) ; commandProcessor.ClearOutbox(new []{myCommand.Id}, context); //assert @@ -339,7 +339,7 @@ public async Task When_A_Request_Context_Is_Provided_On_A_Clear_Async() //act var context = new RequestContext(); var testBagValue = Guid.NewGuid().ToString(); - context.Bag.Add("TestString", testBagValue) ; + context.Bag.AddOrUpdate("TestString", testBagValue, (key, oldValue) => testBagValue) ; await commandProcessor.ClearOutboxAsync(new []{myCommand.Id}, context); //assert diff --git a/tests/Paramore.Brighter.Core.Tests/Context/When_Accessing_A_Request_Context.cs b/tests/Paramore.Brighter.Core.Tests/Context/When_Accessing_A_Request_Context.cs new file mode 100644 index 0000000000..de607ef97d --- /dev/null +++ b/tests/Paramore.Brighter.Core.Tests/Context/When_Accessing_A_Request_Context.cs @@ -0,0 +1,63 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using OpenTelemetry; +using OpenTelemetry.Resources; +using OpenTelemetry.Trace; +using Paramore.Brighter.Core.Tests.FeatureSwitch.TestDoubles; +using Paramore.Brighter.FeatureSwitch; +using Paramore.Brighter.FeatureSwitch.Providers; +using Polly; +using Polly.Registry; +using Xunit; + +namespace Paramore.Brighter.Core.Tests.Context; + +public class RequestContextTests +{ + [Fact] + public void When_Accessing_A_Request_Context() + { + //arrange + var builder = Sdk.CreateTracerProviderBuilder(); + + var exportedActivities = new List(); + var traceProvider = builder + .AddSource("Paramore.Brighter.Tests", "Paramore.Brighter") + .ConfigureResource(r => r.AddService("in-memory-tracer")) + .AddInMemoryExporter(exportedActivities) + .Build(); + + var activitySource = new ActivitySource("Paramore.Brighter.Tests"); + var span = activitySource.StartActivity(); + + var message = new Message( + new MessageHeader(Guid.NewGuid().ToString(), "test", MessageType.MT_COMMAND), + new MessageBody("test content")); + + //act + + var context = new RequestContext + { + FeatureSwitches = FluentConfigRegistryBuilder + .With() + .StatusOf() + .Is(FeatureSwitchStatus.On) + .Build(), + Policies = new PolicyRegistry{ + { "key", Policy.NoOp() } + } + }; + context.Bag.AddOrUpdate("key", "value", (key, oldValue) => "value"); + context.Span = span; + context.OriginatingMessage = message; + + //assert + Assert.Equal(context.Bag["key"], "value"); + Assert.NotNull(context.Policies["key"]); + Assert.Equal(span?.Id, context.Span?.Id); + Assert.NotNull(context.OriginatingMessage); + Assert.Equal(context.OriginatingMessage.Header.Id, message.Header.Id); + + } +} diff --git a/tests/Paramore.Brighter.Core.Tests/RequestContextFactory/When_Creating_A_Request_Context.cs b/tests/Paramore.Brighter.Core.Tests/Context/When_Creating_A_Request_Context.cs similarity index 100% rename from tests/Paramore.Brighter.Core.Tests/RequestContextFactory/When_Creating_A_Request_Context.cs rename to tests/Paramore.Brighter.Core.Tests/Context/When_Creating_A_Request_Context.cs diff --git a/tests/Paramore.Brighter.Core.Tests/ExceptionPolicy/When_Sending_A_Command_That_Retries_Then_Repeatedly_Fails_Breaks_The_Circuit.cs b/tests/Paramore.Brighter.Core.Tests/ExceptionPolicy/When_Sending_A_Command_That_Retries_Then_Repeatedly_Fails_Breaks_The_Circuit.cs index c70a3129ba..5bbf2f3599 100644 --- a/tests/Paramore.Brighter.Core.Tests/ExceptionPolicy/When_Sending_A_Command_That_Retries_Then_Repeatedly_Fails_Breaks_The_Circuit.cs +++ b/tests/Paramore.Brighter.Core.Tests/ExceptionPolicy/When_Sending_A_Command_That_Retries_Then_Repeatedly_Fails_Breaks_The_Circuit.cs @@ -22,7 +22,7 @@ public class CommandProcessorWithBothRetryAndCircuitBreaker : IDisposable private Exception _firstException; private Exception _secondException; private int _retryCount; - private Context _context; + private Polly.Context _context; public CommandProcessorWithBothRetryAndCircuitBreaker() { diff --git a/tests/Paramore.Brighter.Core.Tests/ExceptionPolicy/When_Sending_A_Command_That_Retries_Then_Repeatedly_Fails_Breaks_The_Circuit_Async.cs b/tests/Paramore.Brighter.Core.Tests/ExceptionPolicy/When_Sending_A_Command_That_Retries_Then_Repeatedly_Fails_Breaks_The_Circuit_Async.cs index e70f635c0e..bdbafb0179 100644 --- a/tests/Paramore.Brighter.Core.Tests/ExceptionPolicy/When_Sending_A_Command_That_Retries_Then_Repeatedly_Fails_Breaks_The_Circuit_Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/ExceptionPolicy/When_Sending_A_Command_That_Retries_Then_Repeatedly_Fails_Breaks_The_Circuit_Async.cs @@ -23,7 +23,7 @@ public class CommandProcessorWithBothRetryAndCircuitBreakerAsync : IDisposable private Exception _firstException; private Exception _secondException; private int _retryCount; - private Context _context; + private Polly.Context _context; public CommandProcessorWithBothRetryAndCircuitBreakerAsync() { diff --git a/tests/Paramore.Brighter.Core.Tests/Locking/InMemoryLockingProviderTests.cs b/tests/Paramore.Brighter.Core.Tests/Locking/When_Lock_Is_Called_It_Can_Only_Be_Obtained_Once.cs similarity index 92% rename from tests/Paramore.Brighter.Core.Tests/Locking/InMemoryLockingProviderTests.cs rename to tests/Paramore.Brighter.Core.Tests/Locking/When_Lock_Is_Called_It_Can_Only_Be_Obtained_Once.cs index 76458a6f9f..a6a9fdbcc6 100644 --- a/tests/Paramore.Brighter.Core.Tests/Locking/InMemoryLockingProviderTests.cs +++ b/tests/Paramore.Brighter.Core.Tests/Locking/When_Lock_Is_Called_It_Can_Only_Be_Obtained_Once.cs @@ -11,7 +11,7 @@ public class InMemoryLockingProviderTests [Fact] - public async Task GivenAnInMemoryLockingProvider_WhenLockIsCalled_ItCanOnlyBeObtainedOnce() + public async Task WhenLockIsCalled_ItCanOnlyBeObtainedOnce() { var resourceName = $"TestLock-{Guid.NewGuid()}"; diff --git a/tests/Paramore.Brighter.Core.Tests/MessagingGateway/When_a_stop_message_is_added_to_a_channel.cs b/tests/Paramore.Brighter.Core.Tests/MessagingGateway/When_A_Stop_Message_Is_Added_To_A_Channel.cs similarity index 100% rename from tests/Paramore.Brighter.Core.Tests/MessagingGateway/When_a_stop_message_is_added_to_a_channel.cs rename to tests/Paramore.Brighter.Core.Tests/MessagingGateway/When_A_Stop_Message_Is_Added_To_A_Channel.cs diff --git a/tests/Paramore.Brighter.Core.Tests/MessagingGateway/When_acknowledge_is_called_on_a_channel.cs b/tests/Paramore.Brighter.Core.Tests/MessagingGateway/When_Acknowledge_Is_Called_On_A_Channel.cs similarity index 100% rename from tests/Paramore.Brighter.Core.Tests/MessagingGateway/When_acknowledge_is_called_on_a_channel.cs rename to tests/Paramore.Brighter.Core.Tests/MessagingGateway/When_Acknowledge_Is_Called_On_A_Channel.cs diff --git a/tests/Paramore.Brighter.Core.Tests/MessagingGateway/When_listening_to_messages_on_a_channel.cs b/tests/Paramore.Brighter.Core.Tests/MessagingGateway/When_Listening_To_Messages_On_A_Channel.cs similarity index 100% rename from tests/Paramore.Brighter.Core.Tests/MessagingGateway/When_listening_to_messages_on_a_channel.cs rename to tests/Paramore.Brighter.Core.Tests/MessagingGateway/When_Listening_To_Messages_On_A_Channel.cs diff --git a/tests/Paramore.Brighter.Core.Tests/MessagingGateway/When_no_acknowledge_is_called_on_a_channel.cs b/tests/Paramore.Brighter.Core.Tests/MessagingGateway/When_No_Acknowledge_Is_Called_On_A_Channel.cs similarity index 100% rename from tests/Paramore.Brighter.Core.Tests/MessagingGateway/When_no_acknowledge_is_called_on_a_channel.cs rename to tests/Paramore.Brighter.Core.Tests/MessagingGateway/When_No_Acknowledge_Is_Called_On_A_Channel.cs diff --git a/tests/Paramore.Brighter.Core.Tests/MessagingGateway/When_requeuing_a_message_with_no_delay.cs b/tests/Paramore.Brighter.Core.Tests/MessagingGateway/When_Requeuing_A_Message_With_No_Delay.cs similarity index 100% rename from tests/Paramore.Brighter.Core.Tests/MessagingGateway/When_requeuing_a_message_with_no_delay.cs rename to tests/Paramore.Brighter.Core.Tests/MessagingGateway/When_Requeuing_A_Message_With_No_Delay.cs diff --git a/tests/Paramore.Brighter.Core.Tests/MessagingGateway/When_the_buffer_is_not_empty_read_from_that_before_receiving.cs b/tests/Paramore.Brighter.Core.Tests/MessagingGateway/When_The_Buffer_Is_Not_Empty_Read_From_That_Before_Receiving.cs similarity index 100% rename from tests/Paramore.Brighter.Core.Tests/MessagingGateway/When_the_buffer_is_not_empty_read_from_that_before_receiving.cs rename to tests/Paramore.Brighter.Core.Tests/MessagingGateway/When_The_Buffer_Is_Not_Empty_Read_From_That_Before_Receiving.cs diff --git a/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Publish/When_Publishing_A_Request_A_Span_Is_Exported.cs b/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Publish/When_Publishing_A_Request_A_Span_Is_Exported.cs index 267bd66249..13f11ce6fd 100644 --- a/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Publish/When_Publishing_A_Request_A_Span_Is_Exported.cs +++ b/tests/Paramore.Brighter.Core.Tests/Observability/CommandProcessor/Publish/When_Publishing_A_Request_A_Span_Is_Exported.cs @@ -107,11 +107,10 @@ public void When_Publishing_A_Request_With_Span_In_Context_Child_Spans_Are_Expor first.Tags.Any(t => t.Key == BrighterSemanticConventions.RequestBody && t.Value == JsonSerializer.Serialize(@event)).Should().BeTrue(); first.Tags.Any(t => t is { Key: BrighterSemanticConventions.Operation, Value: "publish" }).Should().BeTrue(); - first.Events.Count().Should().Be(1); - first.Events.First().Name.Should().Be(nameof(MyEventHandler)); - first.Events.First().Tags.Any(t => t.Key == BrighterSemanticConventions.HandlerName && (string)t.Value == nameof(MyEventHandler)).Should().BeTrue(); - first.Events.First().Tags.Any(t => t.Key == BrighterSemanticConventions.HandlerType && (string)t.Value == "sync").Should().BeTrue(); - first.Events.First().Tags.Any(t => t.Key == BrighterSemanticConventions.IsSink && (bool)t.Value).Should().BeTrue(); + var activityEvent = first.Events.Single(e => e.Name == nameof(MyEventHandler) || e.Name == nameof(MyOtherEventHandler)); + activityEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.HandlerName && (string)t.Value == activityEvent.Name).Should().BeTrue(); + activityEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.HandlerType && (string)t.Value == "sync").Should().BeTrue(); + activityEvent.Tags.Any(t => t.Value != null && t.Key == BrighterSemanticConventions.IsSink && (bool)t.Value).Should().BeTrue(); //--second publish var second = publishActivities.Last(); @@ -121,11 +120,10 @@ public void When_Publishing_A_Request_With_Span_In_Context_Child_Spans_Are_Expor second.Tags.Any(t => t.Key == BrighterSemanticConventions.RequestBody && t.Value == JsonSerializer.Serialize(@event)).Should().BeTrue(); second.Tags.Any(t => t is { Key: BrighterSemanticConventions.Operation, Value: "publish" }).Should().BeTrue(); - second.Events.Count().Should().Be(1); - second.Events.First().Name.Should().Be(nameof(MyOtherEventHandler)); - second.Events.First().Tags.Any(t => t.Key == BrighterSemanticConventions.HandlerName && (string)t.Value == nameof(MyOtherEventHandler)).Should().BeTrue(); - second.Events.First().Tags.Any(t => t.Key == BrighterSemanticConventions.HandlerType && (string)t.Value == "sync").Should().BeTrue(); - second.Events.First().Tags.Any(t => t.Key == BrighterSemanticConventions.IsSink && (bool)t.Value).Should().BeTrue(); + activityEvent = second.Events.Single(e => e.Name == nameof(MyEventHandler) || e.Name == nameof(MyOtherEventHandler)); + activityEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.HandlerName && (string)t.Value == activityEvent.Name).Should().BeTrue(); + activityEvent.Tags.Any(t => t.Key == BrighterSemanticConventions.HandlerType && (string)t.Value == "sync").Should().BeTrue(); + activityEvent.Tags.Any(t => t.Value != null && t.Key == BrighterSemanticConventions.IsSink && (bool)t.Value).Should().BeTrue(); //TODO: Needs adding when https://github.com/dotnet/runtime/pull/101381 is released /* From b75090abe0dc86cf96bb22e777616a64678e462e Mon Sep 17 00:00:00 2001 From: Ian Cooper Date: Sat, 6 Jul 2024 16:27:10 +0100 Subject: [PATCH 5/7] docs: add an adr for the publish change --- .../adr/0013-publish-should-be-in-parallel.md | 25 +++++++++++++++++++ src/Paramore.Brighter/CommandProcessor.cs | 2 ++ 2 files changed, 27 insertions(+) create mode 100644 docs/adr/0013-publish-should-be-in-parallel.md diff --git a/docs/adr/0013-publish-should-be-in-parallel.md b/docs/adr/0013-publish-should-be-in-parallel.md new file mode 100644 index 0000000000..ba06bbe575 --- /dev/null +++ b/docs/adr/0013-publish-should-be-in-parallel.md @@ -0,0 +1,25 @@ +# 1. Record architecture decisions + +Date: 2024-07-06 + +## Status + +Accepted + +## Context + +The current implementation of the `Publish` method on `CommandProcessor` is sequential. This means that when you call `Publish` with an `Event` we call the `IHandleRequests` that subscribe to that `Event` one-by-one. This is not efficient and can be slow when there are many `IHandleRequests` subscribed to the `Event`. + +It is unlikely that the `IHandleRequests` will have any dependencies on each other, so there is no reason why they can't be called in parallel. + +It is unlikely that we expect the `IHandleRequests` to be called in a specific order, so there is no reason why they can't be called in parallel. In fact, the call to `PublishAsync` (which calls `IHandleRequestsAsync`) is asynchronous and continuations may not run in parallel either (unless run in the context of our `ServiceActivator` which provides a `SynchronizationContext` that means we run the continuations sequentially. + +## Decision + +We will change the implementation of the `Publish` method on `CommandProcessor` to call the `IHandleRequests` that subscribe to the `Event` in parallel. For a `Publish` method call we will use `Parallel.ForEach` to call each `IHandleRequests` in parallel. For `PublishAsync` we will use `Task.WhenAll` to call each `IHandleRequestsAsync` in parallel. + +## Consequences + +This will make the `Publish` method faster and more efficient. It will also mean that the `IHandleRequests` for an `Event` will be called in parallel and so we will need to ensure that they are thread-safe. This is likely to be the case as the `IHandleRequests` are likely to be stateless and so should be thread-safe. If they are not thread-safe then we will need to make them thread-safe. + +Our `RequestContext` class was not thread-safe and we need to make it thread-safe. We could provide a copy, but as a caller may choose to inspect the `RequestContext` after the `IHandleRequests` have been called we will need to make the `RequestContext` thread-safe. This is a small change that is mostly hidden behind the property accessors of the `RequestContext` class. We use a `ConcurrentDictionary` to store the `RequestContext` data against the current thread, thus meaning each thread has its own data. This does mean that the caller will not have access to the thread-specific data, but this is not a problem as the caller should not be inspecting the thread-specific `RequestContext` data, such as a `Span` after the `IHandleRequests` have been called. \ No newline at end of file diff --git a/src/Paramore.Brighter/CommandProcessor.cs b/src/Paramore.Brighter/CommandProcessor.cs index ee40b45470..f02d7bf351 100644 --- a/src/Paramore.Brighter/CommandProcessor.cs +++ b/src/Paramore.Brighter/CommandProcessor.cs @@ -529,6 +529,8 @@ public string DepositPost(TRequest request, RequestContext requestCont /// The transaction provider to use with an outbox /// The context of the request; if null we will start one via a /// For transports or outboxes that require additional parameters such as topic, provide an optional arg + /// The id of any batch of deposits we are called within; this will be set by the call to DepositPost with + /// a collection of requests and there is no need to set this yourself /// The type of the request /// The type of Db transaction used by the Outbox /// The Id of the Message that has been deposited. From ec6033ef51f0adf08dfa29ba739e396eaac0ae81 Mon Sep 17 00:00:00 2001 From: Ian Cooper Date: Sat, 6 Jul 2024 16:43:01 +0100 Subject: [PATCH 6/7] fix: don't make the message property multi-threaded as likely only set from one thread --- .../adr/0013-publish-should-be-in-parallel.md | 8 ++++++-- src/Paramore.Brighter/RequestContext.cs | 20 ++++--------------- 2 files changed, 10 insertions(+), 18 deletions(-) diff --git a/docs/adr/0013-publish-should-be-in-parallel.md b/docs/adr/0013-publish-should-be-in-parallel.md index ba06bbe575..8822c6ba9e 100644 --- a/docs/adr/0013-publish-should-be-in-parallel.md +++ b/docs/adr/0013-publish-should-be-in-parallel.md @@ -12,7 +12,7 @@ The current implementation of the `Publish` method on `CommandProcessor` is sequ It is unlikely that the `IHandleRequests` will have any dependencies on each other, so there is no reason why they can't be called in parallel. -It is unlikely that we expect the `IHandleRequests` to be called in a specific order, so there is no reason why they can't be called in parallel. In fact, the call to `PublishAsync` (which calls `IHandleRequestsAsync`) is asynchronous and continuations may not run in parallel either (unless run in the context of our `ServiceActivator` which provides a `SynchronizationContext` that means we run the continuations sequentially. +It is unlikely that we expect the `IHandleRequests` to be called in a specific order, as this is not guaranteed by an `IEnumerable` across their registrations so there is no reason why they can't be called in parallel. In fact, the call to `PublishAsync` (which calls `IHandleRequestsAsync`) is asynchronous and continuations may not run in parallel either (unless run in the context of our `ServiceActivator` which provides a `SynchronizationContext` that means we run the continuations sequentially. ## Decision @@ -22,4 +22,8 @@ We will change the implementation of the `Publish` method on `CommandProcessor` This will make the `Publish` method faster and more efficient. It will also mean that the `IHandleRequests` for an `Event` will be called in parallel and so we will need to ensure that they are thread-safe. This is likely to be the case as the `IHandleRequests` are likely to be stateless and so should be thread-safe. If they are not thread-safe then we will need to make them thread-safe. -Our `RequestContext` class was not thread-safe and we need to make it thread-safe. We could provide a copy, but as a caller may choose to inspect the `RequestContext` after the `IHandleRequests` have been called we will need to make the `RequestContext` thread-safe. This is a small change that is mostly hidden behind the property accessors of the `RequestContext` class. We use a `ConcurrentDictionary` to store the `RequestContext` data against the current thread, thus meaning each thread has its own data. This does mean that the caller will not have access to the thread-specific data, but this is not a problem as the caller should not be inspecting the thread-specific `RequestContext` data, such as a `Span` after the `IHandleRequests` have been called. \ No newline at end of file +Our `RequestContext` class was not thread-safe and we need to make it thread-safe. We could provide a copy, but as a caller may choose to inspect the `RequestContext` after the `IHandleRequests` have been called we will need to make the `RequestContext` thread-safe. This is a small change that is mostly hidden behind the property accessors of the `RequestContext` class. + +We use a `ConcurrentDictionary` to store the `RequestContext Span` data against the current thread, thus meaning each thread has its own span. This does mean that the caller will not have access to that thread-specific data, but this is not a problem as the caller should not be inspecting the thread-specific `RequestContext` `Span` after the `IHandleRequests` have been called. + +We use a `ConcurrentDictionary` for the `RequestContext` `Bag` property, which is a dictionary of key-value pairs. This is thread-safe and so we do not need to make any changes to this property. \ No newline at end of file diff --git a/src/Paramore.Brighter/RequestContext.cs b/src/Paramore.Brighter/RequestContext.cs index 20db51bae9..e703f771ea 100644 --- a/src/Paramore.Brighter/RequestContext.cs +++ b/src/Paramore.Brighter/RequestContext.cs @@ -39,7 +39,6 @@ namespace Paramore.Brighter /// public class RequestContext : IRequestContext { - private readonly ConcurrentDictionary _messages = new(); private readonly ConcurrentDictionary _spans = new(); /// @@ -57,23 +56,12 @@ public class RequestContext : IRequestContext /// When we pass a requestContext through a receiver pipeline, we may want to pass the original message that started the pipeline. /// This is primarily useful for debugging - how did we get to this request?. But it is also useful for some request metadata that we /// do not want to transfer to the Request. - /// This is thread-safe, so that you can access the context from multiple threads - /// This is mainly required for Publish, which uses the same context across multiple Publish handlers + /// This is not thread-safe; the assumption is that you set this from a single thread and access the message from multiple threads. It is + /// not intended to be set from multiple threads. /// /// The originating message - public Message OriginatingMessage - { - get - { - _messages.TryGetValue(System.Threading.Thread.CurrentThread.ManagedThreadId, out var message); - return message; - } - set - { - _messages.AddOrUpdate(System.Threading.Thread.CurrentThread.ManagedThreadId, value, (key, oldValue) => value); - } - } - + public Message OriginatingMessage { get; set; } + /// /// Gets the policies. /// From 92fdef34fb3db3be3d84515b4552b4274567ee28 Mon Sep 17 00:00:00 2001 From: Ian Cooper Date: Sat, 6 Jul 2024 16:44:32 +0100 Subject: [PATCH 7/7] docs: make it clear that you can access the span if you set it --- docs/adr/0013-publish-should-be-in-parallel.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/adr/0013-publish-should-be-in-parallel.md b/docs/adr/0013-publish-should-be-in-parallel.md index 8822c6ba9e..60761c6821 100644 --- a/docs/adr/0013-publish-should-be-in-parallel.md +++ b/docs/adr/0013-publish-should-be-in-parallel.md @@ -24,6 +24,6 @@ This will make the `Publish` method faster and more efficient. It will also mean Our `RequestContext` class was not thread-safe and we need to make it thread-safe. We could provide a copy, but as a caller may choose to inspect the `RequestContext` after the `IHandleRequests` have been called we will need to make the `RequestContext` thread-safe. This is a small change that is mostly hidden behind the property accessors of the `RequestContext` class. -We use a `ConcurrentDictionary` to store the `RequestContext Span` data against the current thread, thus meaning each thread has its own span. This does mean that the caller will not have access to that thread-specific data, but this is not a problem as the caller should not be inspecting the thread-specific `RequestContext` `Span` after the `IHandleRequests` have been called. +We use a `ConcurrentDictionary` to store the `RequestContext Span` data against the current thread, thus meaning each thread has its own span. This does mean that the caller will not have access to that thread-specific data, but this is not a problem as the caller should not be inspecting the thread-specific `RequestContext` `Span` through the property after the `IHandleRequests` have been called (most likely it provided the `Span`). We use a `ConcurrentDictionary` for the `RequestContext` `Bag` property, which is a dictionary of key-value pairs. This is thread-safe and so we do not need to make any changes to this property. \ No newline at end of file