diff --git a/src/Testing/CoreTests/Runtime/MockWolverineRuntime.cs b/src/Testing/CoreTests/Runtime/MockWolverineRuntime.cs index e688f50f4..0d77fa81e 100644 --- a/src/Testing/CoreTests/Runtime/MockWolverineRuntime.cs +++ b/src/Testing/CoreTests/Runtime/MockWolverineRuntime.cs @@ -95,9 +95,11 @@ void IObserver.OnNext(IWolverineEvent value) public IHandlerPipeline Pipeline { get; } = Substitute.For(); public WolverineTracker Tracker { get; } = new(NullLogger.Instance); + public Dictionary Routers { get; } = new(); + public IMessageRouter RoutingFor(Type messageType) { - return Substitute.For(); + return Routers.TryGetValue(messageType, out var router) ? router : Substitute.For(); } public T? TryFindExtension() where T : class diff --git a/src/Testing/CoreTests/Runtime/Partitioning/global_partitioning_tests.cs b/src/Testing/CoreTests/Runtime/Partitioning/global_partitioning_tests.cs index 42a7a66d2..cab82e4b6 100644 --- a/src/Testing/CoreTests/Runtime/Partitioning/global_partitioning_tests.cs +++ b/src/Testing/CoreTests/Runtime/Partitioning/global_partitioning_tests.cs @@ -1,12 +1,17 @@ +using System.Diagnostics; +using CoreTests.Runtime; using Microsoft.Extensions.Logging; using NSubstitute; using NSubstitute.ExceptionExtensions; +using Wolverine; using Wolverine.Configuration; using Wolverine.ComplianceTests; using Wolverine.Runtime; using Wolverine.Runtime.Partitioning; +using Wolverine.Runtime.Routing; using Wolverine.Runtime.WorkerQueues; using Wolverine.Transports; +using Wolverine.Transports.Sending; using Xunit; namespace CoreTests.Runtime.Partitioning; @@ -361,31 +366,57 @@ public void pipeline_delegates_to_local_queue() public class GlobalPartitionedInterceptorTests { private readonly IReceiver _inner; - private readonly IMessageBus _messageBus; private readonly IListener _listener; - private readonly ILogger _logger; - private readonly WolverineOptions _options; + private readonly MockWolverineRuntime _runtime; + private readonly List _routedEnvelopes = new(); public GlobalPartitionedInterceptorTests() { _inner = Substitute.For(); - _messageBus = Substitute.For(); _listener = Substitute.For(); - _logger = Substitute.For(); - _options = new WolverineOptions(); + _runtime = new MockWolverineRuntime(); } private GlobalPartitionedInterceptor CreateInterceptor(params Type[] matchingTypes) { - var topologies = new List(); foreach (var type in matchingTypes) { - var topology = new GlobalPartitionedMessageTopology(_options); + var topology = new GlobalPartitionedMessageTopology(_runtime.Options); topology.Message(type); - topologies.Add(topology); + _runtime.Options.MessagePartitioning.GlobalPartitionedTopologies.Add(topology); } - return new GlobalPartitionedInterceptor(_inner, _messageBus, topologies, _logger); + return new GlobalPartitionedInterceptor(_inner, _runtime); + } + + private void ArrangeRouter(Func? routeFactory = null) + { + var router = Substitute.For(); + router.RouteForPublish(Arg.Any(), Arg.Any()) + .Returns(callInfo => + { + var message = callInfo.Arg(); + var options = callInfo.Arg(); + Envelope[] outgoing; + if (routeFactory is not null) + { + outgoing = routeFactory(options); + } + else + { + var envelope = new Envelope(message) + { + Sender = NoopSendingAgent.Instance, + Destination = NoopSendingAgent.Instance.Destination, + Status = EnvelopeStatus.Outgoing, + }; + options?.Override(envelope); + outgoing = [envelope]; + } + _routedEnvelopes.AddRange(outgoing); + return outgoing; + }); + _runtime.Routers[typeof(T)] = router; } [Fact] @@ -398,7 +429,7 @@ public async Task passes_non_matching_messages_through_to_inner_receiver() await interceptor.ReceivedAsync(_listener, envelope); await _inner.Received(1).ReceivedAsync(_listener, envelope); - await _messageBus.DidNotReceive().PublishAsync(Arg.Any(), Arg.Any()); + _routedEnvelopes.ShouldBeEmpty(); } [Fact] @@ -417,15 +448,16 @@ public async Task passes_non_matching_messages_through_via_batch() public async Task re_publishes_matching_messages_via_message_bus() { var interceptor = CreateInterceptor(typeof(GlobalTestMessage)); - var message = new GlobalTestMessage("123"); + ArrangeRouter(); + var envelope = ObjectMother.Envelope(); - envelope.Message = message; + envelope.Message = new GlobalTestMessage("123"); envelope.GroupId = "group1"; await interceptor.ReceivedAsync(_listener, envelope); - await _messageBus.Received(1).PublishAsync(message, Arg.Is(o => - o.GroupId == "group1")); + _routedEnvelopes.Count.ShouldBe(1); + _routedEnvelopes[0].GroupId.ShouldBe("group1"); await _inner.DidNotReceive().ReceivedAsync(_listener, envelope); } @@ -433,9 +465,10 @@ await _messageBus.Received(1).PublishAsync(message, Arg.Is(o => public async Task completes_intercepted_messages_on_the_listener() { var interceptor = CreateInterceptor(typeof(GlobalTestMessage)); - var message = new GlobalTestMessage("123"); + ArrangeRouter(); + var envelope = ObjectMother.Envelope(); - envelope.Message = message; + envelope.Message = new GlobalTestMessage("123"); await interceptor.ReceivedAsync(_listener, envelope); @@ -446,29 +479,13 @@ public async Task completes_intercepted_messages_on_the_listener() public async Task defers_messages_when_re_publish_fails() { var interceptor = CreateInterceptor(typeof(GlobalTestMessage)); - var message = new GlobalTestMessage("123"); - var envelope = ObjectMother.Envelope(); - envelope.Message = message; - - _messageBus.PublishAsync(Arg.Any(), Arg.Any()) - .Returns(ValueTask.FromException(new Exception("Transport failure"))); - - await interceptor.ReceivedAsync(_listener, envelope); - - await _listener.Received(1).DeferAsync(envelope); - await _listener.DidNotReceive().CompleteAsync(envelope); - } + var router = Substitute.For(); + router.RouteForPublish(Arg.Any(), Arg.Any()) + .Throws(new Exception("Transport failure")); + _runtime.Routers[typeof(GlobalTestMessage)] = router; - [Fact] - public async Task defers_messages_when_re_publish_fails_single_envelope() - { - var interceptor = CreateInterceptor(typeof(GlobalTestMessage)); - var message = new GlobalTestMessage("123"); var envelope = ObjectMother.Envelope(); - envelope.Message = message; - - _messageBus.PublishAsync(Arg.Any(), Arg.Any()) - .Returns(ValueTask.FromException(new Exception("Transport failure"))); + envelope.Message = new GlobalTestMessage("123"); await interceptor.ReceivedAsync(_listener, envelope); @@ -480,6 +497,7 @@ public async Task defers_messages_when_re_publish_fails_single_envelope() public async Task batch_splits_matching_and_non_matching() { var interceptor = CreateInterceptor(typeof(GlobalTestMessage)); + ArrangeRouter(); var matching = ObjectMother.Envelope(); matching.Message = new GlobalTestMessage("1"); @@ -489,11 +507,8 @@ public async Task batch_splits_matching_and_non_matching() await interceptor.ReceivedAsync(_listener, new[] { matching, nonMatching }); - // Matching message should be re-published - await _messageBus.Received(1).PublishAsync(Arg.Any(), Arg.Any()); + _routedEnvelopes.Count.ShouldBe(1); await _listener.Received(1).CompleteAsync(matching); - - // Non-matching should pass through to inner await _inner.Received(1).ReceivedAsync(_listener, Arg.Is(arr => arr.Length == 1)); } @@ -501,6 +516,7 @@ public async Task batch_splits_matching_and_non_matching() public async Task does_not_call_inner_when_all_messages_are_intercepted() { var interceptor = CreateInterceptor(typeof(GlobalTestMessage)); + ArrangeRouter(); var e1 = ObjectMother.Envelope(); e1.Message = new GlobalTestMessage("1"); @@ -547,8 +563,130 @@ public async Task does_not_intercept_when_message_is_null() await interceptor.ReceivedAsync(_listener, envelope); await _inner.Received(1).ReceivedAsync(_listener, envelope); - await _messageBus.DidNotReceive().PublishAsync(Arg.Any(), Arg.Any()); + _routedEnvelopes.ShouldBeEmpty(); + } + + [Fact] + public async Task does_not_copy_custom_inbound_headers_when_propagation_rule_not_configured() + { + var interceptor = CreateInterceptor(typeof(GlobalTestMessage)); + ArrangeRouter(); + + var envelope = ObjectMother.Envelope(); + envelope.Message = new GlobalTestMessage("123"); + envelope.Headers["Opta-X-Metrics-Id"] = "metrics-abc"; + envelope.Headers["Opta-X-Causation-Id"] = "causation-xyz"; + + await interceptor.ReceivedAsync(_listener, envelope); + + var outgoing = _routedEnvelopes.ShouldHaveSingleItem(); + // Cast disambiguates between Shouldly's IDictionary / IReadOnlyDictionary + // ShouldNotContainKey overloads (Envelope.Headers is a concrete Dictionary). + ((IDictionary)outgoing.Headers).ShouldNotContainKey("Opta-X-Metrics-Id"); + ((IDictionary)outgoing.Headers).ShouldNotContainKey("Opta-X-Causation-Id"); + } + + [Fact] + public async Task copies_only_allowlisted_custom_headers_when_propagation_rule_configured() + { + _runtime.Options.Policies.PropagateIncomingHeadersToOutgoing("Opta-X-Metrics-Id"); + + var interceptor = CreateInterceptor(typeof(GlobalTestMessage)); + ArrangeRouter(_ => [new Envelope(new GlobalTestMessage("123")) + { + Sender = NoopSendingAgent.Instance, + Destination = NoopSendingAgent.Instance.Destination, + Status = EnvelopeStatus.Outgoing, + }]); + + var envelope = ObjectMother.Envelope(); + envelope.Message = new GlobalTestMessage("123"); + envelope.Headers["Opta-X-Metrics-Id"] = "metrics-abc"; + envelope.Headers["Opta-X-Causation-Id"] = "causation-xyz"; + + await interceptor.ReceivedAsync(_listener, envelope); + + var outgoing = _routedEnvelopes.ShouldHaveSingleItem(); + outgoing.Headers["Opta-X-Metrics-Id"].ShouldBe("metrics-abc"); + ((IDictionary)outgoing.Headers).ShouldNotContainKey("Opta-X-Causation-Id"); } + + [Fact] + public async Task carries_inbound_correlation_id_onto_re_routed_envelope() + { + var interceptor = CreateInterceptor(typeof(GlobalTestMessage)); + ArrangeRouter(); + + var envelope = ObjectMother.Envelope(); + envelope.Message = new GlobalTestMessage("123"); + envelope.CorrelationId = "inbound-correlation"; + + await interceptor.ReceivedAsync(_listener, envelope); + + var outgoing = _routedEnvelopes.ShouldHaveSingleItem(); + outgoing.CorrelationId.ShouldBe("inbound-correlation"); + } + + [Fact] + public async Task carries_inbound_tenant_id_onto_re_routed_envelope() + { + var interceptor = CreateInterceptor(typeof(GlobalTestMessage)); + ArrangeRouter(); + + var envelope = ObjectMother.Envelope(); + envelope.Message = new GlobalTestMessage("123"); + envelope.TenantId = "tenant-42"; + + await interceptor.ReceivedAsync(_listener, envelope); + + var outgoing = _routedEnvelopes.ShouldHaveSingleItem(); + outgoing.TenantId.ShouldBe("tenant-42"); + } + + [Fact] + public async Task continues_inbound_distributed_trace_on_re_routed_envelope() + { + using var listener = new ActivityListener + { + ShouldListenTo = source => source.Name == "Wolverine", + Sample = (ref ActivityCreationOptions _) => ActivitySamplingResult.AllData, + ActivityStarted = _ => { }, + ActivityStopped = _ => { }, + }; + ActivitySource.AddActivityListener(listener); + + var traceId = ActivityTraceId.CreateRandom(); + var spanId = ActivitySpanId.CreateRandom(); + var inboundParentId = $"00-{traceId}-{spanId}-01"; + + var interceptor = CreateInterceptor(typeof(GlobalTestMessage)); + ArrangeRouter(); + + var envelope = ObjectMother.Envelope(); + envelope.Message = new GlobalTestMessage("123"); + envelope.ParentId = inboundParentId; + + await interceptor.ReceivedAsync(_listener, envelope); + + var outgoing = _routedEnvelopes.ShouldHaveSingleItem(); + outgoing.ParentId.ShouldNotBeNull(); + outgoing.ParentId.ShouldContain(traceId.ToString()); + } +} + +internal sealed class NoopSendingAgent : ISendingAgent +{ + public static readonly NoopSendingAgent Instance = new(); + + public Uri Destination { get; } = new("noop://test"); + public Uri? ReplyUri { get; set; } + public bool Latched => false; + public bool IsDurable => false; + public bool SupportsNativeScheduledSend => false; + public Endpoint Endpoint { get; } = null!; + public DateTimeOffset LastMessageSentAt => DateTimeOffset.UtcNow; + public ValueTask EnqueueOutgoingAsync(Envelope envelope) => ValueTask.CompletedTask; + public ValueTask StoreAndForwardAsync(Envelope envelope) => ValueTask.CompletedTask; } #endregion diff --git a/src/Wolverine/Runtime/Partitioning/GlobalPartitionedInterceptor.cs b/src/Wolverine/Runtime/Partitioning/GlobalPartitionedInterceptor.cs index a541cdabb..d2cdbf177 100644 --- a/src/Wolverine/Runtime/Partitioning/GlobalPartitionedInterceptor.cs +++ b/src/Wolverine/Runtime/Partitioning/GlobalPartitionedInterceptor.cs @@ -1,3 +1,5 @@ +using System.Diagnostics; +using JasperFx.Core; using Microsoft.Extensions.Logging; using Wolverine.Runtime.WorkerQueues; using Wolverine.Transports; @@ -12,17 +14,16 @@ namespace Wolverine.Runtime.Partitioning; internal class GlobalPartitionedInterceptor : IReceiver { private readonly IReceiver _inner; - private readonly IMessageBus _messageBus; + private readonly IWolverineRuntime _runtime; private readonly List _topologies; private readonly ILogger _logger; - public GlobalPartitionedInterceptor(IReceiver inner, IMessageBus messageBus, - List topologies, ILogger logger) + public GlobalPartitionedInterceptor(IReceiver inner, IWolverineRuntime runtime) { _inner = inner; - _messageBus = messageBus; - _topologies = topologies; - _logger = logger; + _runtime = runtime; + _topologies = runtime.Options.MessagePartitioning.GlobalPartitionedTopologies; + _logger = runtime.LoggerFactory.CreateLogger(); } public IHandlerPipeline Pipeline => _inner.Pipeline; @@ -81,12 +82,24 @@ private async Task TryReRouteAsync(IListener listener, Envelope envelope) } } - // Re-route through Wolverine's routing which will hit GlobalPartitionedRoute - await _messageBus.PublishAsync(envelope.Message!, new DeliveryOptions + // The bus is seeded with the inbound envelope so PropagateHeadersRule and + // every other IMetadataRule.ApplyCorrelation impl can read the originator + // when they enrich each outbound envelope — same shape as the publish path + // inside a regular handler. Context-correlation field copying (CorrelationId, + // ConversationId, TenantId, UserName, ParentId, SagaId) happens in the bus's + // overridden TrackEnvelopeCorrelation via Envelope.CopyContextCorrelationFrom, + // so we don't need to re-state any of those fields on DeliveryOptions here — + // GroupId is the only piece the routing layer itself needs to read. + var options = new DeliveryOptions { GroupId = envelope.GroupId, - TenantId = envelope.TenantId - }); + }; + + var bus = new RouteBus(_runtime, envelope); + + using var activity = StartReRouteActivity(envelope); + + await bus.PublishAsync(envelope.Message!, options); await listener.CompleteAsync(envelope); return true; } @@ -99,6 +112,19 @@ private async Task TryReRouteAsync(IListener listener, Envelope envelope) } } + private static Activity? StartReRouteActivity(Envelope envelope) + { + if (envelope.ParentId.IsEmpty()) + { + return null; + } + + return WolverineTracing.ActivitySource.StartActivity( + "wolverine global-partitioning re-route", + ActivityKind.Internal, + envelope.ParentId); + } + public ValueTask DrainAsync() => _inner.DrainAsync(); public void Dispose() => _inner.Dispose(); @@ -119,4 +145,54 @@ private bool ShouldIntercept(Envelope envelope) return false; } + + /// + /// MessageBus subclass used only by the global-partitioning re-route path. + /// Two responsibilities: + /// + /// Seed with the inbound envelope so + /// implementations such as + /// PropagateHeadersRule see originator.Envelope when they enrich + /// the outbound envelopes — without this seed those rules short-circuit and the + /// PropagateIncomingHeadersToOutgoing allowlist is silently ignored for + /// globally-partitioned messages. + /// Override so each + /// re-routed outbound envelope inherits the inbound's full context-correlation + /// set (CorrelationId, ConversationId, TenantId, + /// UserName, ParentId, SagaId) via + /// — the same forwarding + /// semantics already used by ScheduledSendEnvelopeHandler for unwrapped + /// scheduled sends and by TrackedSession.ReplayAll for replayed envelopes. + /// The base TrackEnvelopeCorrelation only pulls CorrelationId / + /// TenantId / UserName from the bus's own properties and writes a + /// fresh ParentId from Activity.Current, which is the wrong shape + /// for a forwarded envelope: ConversationId would restart, SagaId + /// would drop, and the trace would re-root at the interceptor hop instead of + /// continuing the inbound's parent. + /// + /// + private sealed class RouteBus : MessageBus + { + private readonly Envelope _inbound; + + public RouteBus(IWolverineRuntime runtime, Envelope inbound) : base(runtime) + { + _inbound = inbound; + Envelope = inbound; + } + + internal override void TrackEnvelopeCorrelation(Envelope outbound, Activity? activity) + { + // Preserve any per-message Source override (e.g. CloudEvents producer + // setting a spec-valid `source` URI) before the inherited copy stamps + // the application service name as a default. + if (outbound.Source.IsEmpty()) + { + outbound.Source = Runtime.Options.ServiceName; + } + + outbound.CopyContextCorrelationFrom(_inbound); + outbound.Store = Storage; + } + } } diff --git a/src/Wolverine/Transports/ListeningAgent.cs b/src/Wolverine/Transports/ListeningAgent.cs index aa47a2fc1..eb2537c5b 100644 --- a/src/Wolverine/Transports/ListeningAgent.cs +++ b/src/Wolverine/Transports/ListeningAgent.cs @@ -316,11 +316,7 @@ public async ValueTask StartAsync() && !Endpoint.UsedInShardedTopology && Endpoint.Uri.Scheme != "local") { - _receiver = new GlobalPartitionedInterceptor( - _receiver, - new Runtime.MessageBus(_runtime), - _runtime.Options.MessagePartitioning.GlobalPartitionedTopologies, - _logger); + _receiver = new GlobalPartitionedInterceptor(_receiver, _runtime); } if (Endpoint.ListenerCount > 1)