Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/Testing/CoreTests/Runtime/MockWolverineRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,11 @@ void IObserver<IWolverineEvent>.OnNext(IWolverineEvent value)
public IHandlerPipeline Pipeline { get; } = Substitute.For<IHandlerPipeline>();
public WolverineTracker Tracker { get; } = new(NullLogger.Instance);

public Dictionary<Type, IMessageRouter> Routers { get; } = new();

public IMessageRouter RoutingFor(Type messageType)
{
return Substitute.For<IMessageRouter>();
return Routers.TryGetValue(messageType, out var router) ? router : Substitute.For<IMessageRouter>();
}

public T? TryFindExtension<T>() where T : class
Expand Down
224 changes: 181 additions & 43 deletions src/Testing/CoreTests/Runtime/Partitioning/global_partitioning_tests.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
using System.Diagnostics;
using CoreTests.Runtime;
using Microsoft.Extensions.Logging;
using NSubstitute;
using NSubstitute.ExceptionExtensions;
using Wolverine;
using Wolverine.Configuration;
using Wolverine.ComplianceTests;
using Wolverine.Runtime;
using Wolverine.Runtime.Partitioning;
using Wolverine.Runtime.Routing;
using Wolverine.Runtime.WorkerQueues;
using Wolverine.Transports;
using Wolverine.Transports.Sending;
using Xunit;

namespace CoreTests.Runtime.Partitioning;
Expand Down Expand Up @@ -361,31 +366,57 @@ public void pipeline_delegates_to_local_queue()
public class GlobalPartitionedInterceptorTests
{
private readonly IReceiver _inner;
private readonly IMessageBus _messageBus;
private readonly IListener _listener;
private readonly ILogger _logger;
private readonly WolverineOptions _options;
private readonly MockWolverineRuntime _runtime;
private readonly List<Envelope> _routedEnvelopes = new();

public GlobalPartitionedInterceptorTests()
{
_inner = Substitute.For<IReceiver>();
_messageBus = Substitute.For<IMessageBus>();
_listener = Substitute.For<IListener>();
_logger = Substitute.For<ILogger>();
_options = new WolverineOptions();
_runtime = new MockWolverineRuntime();
}

private GlobalPartitionedInterceptor CreateInterceptor(params Type[] matchingTypes)
{
var topologies = new List<GlobalPartitionedMessageTopology>();
foreach (var type in matchingTypes)
{
var topology = new GlobalPartitionedMessageTopology(_options);
var topology = new GlobalPartitionedMessageTopology(_runtime.Options);
topology.Message(type);
topologies.Add(topology);
_runtime.Options.MessagePartitioning.GlobalPartitionedTopologies.Add(topology);
}

return new GlobalPartitionedInterceptor(_inner, _messageBus, topologies, _logger);
return new GlobalPartitionedInterceptor(_inner, _runtime);
}

private void ArrangeRouter<T>(Func<DeliveryOptions?, Envelope[]>? routeFactory = null)
{
var router = Substitute.For<IMessageRouter>();
router.RouteForPublish(Arg.Any<object>(), Arg.Any<DeliveryOptions?>())
.Returns(callInfo =>
{
var message = callInfo.Arg<object>();
var options = callInfo.Arg<DeliveryOptions?>();
Envelope[] outgoing;
if (routeFactory is not null)
{
outgoing = routeFactory(options);
}
else
{
var envelope = new Envelope(message)
{
Sender = NoopSendingAgent.Instance,
Destination = NoopSendingAgent.Instance.Destination,
Status = EnvelopeStatus.Outgoing,
};
options?.Override(envelope);
outgoing = [envelope];
}
_routedEnvelopes.AddRange(outgoing);
return outgoing;
});
_runtime.Routers[typeof(T)] = router;
}

[Fact]
Expand All @@ -398,7 +429,7 @@ public async Task passes_non_matching_messages_through_to_inner_receiver()
await interceptor.ReceivedAsync(_listener, envelope);

await _inner.Received(1).ReceivedAsync(_listener, envelope);
await _messageBus.DidNotReceive().PublishAsync(Arg.Any<object>(), Arg.Any<DeliveryOptions>());
_routedEnvelopes.ShouldBeEmpty();
}

[Fact]
Expand All @@ -417,25 +448,27 @@ public async Task passes_non_matching_messages_through_via_batch()
public async Task re_publishes_matching_messages_via_message_bus()
{
var interceptor = CreateInterceptor(typeof(GlobalTestMessage));
var message = new GlobalTestMessage("123");
ArrangeRouter<GlobalTestMessage>();

var envelope = ObjectMother.Envelope();
envelope.Message = message;
envelope.Message = new GlobalTestMessage("123");
envelope.GroupId = "group1";

await interceptor.ReceivedAsync(_listener, envelope);

await _messageBus.Received(1).PublishAsync(message, Arg.Is<DeliveryOptions>(o =>
o.GroupId == "group1"));
_routedEnvelopes.Count.ShouldBe(1);
_routedEnvelopes[0].GroupId.ShouldBe("group1");
await _inner.DidNotReceive().ReceivedAsync(_listener, envelope);
}

[Fact]
public async Task completes_intercepted_messages_on_the_listener()
{
var interceptor = CreateInterceptor(typeof(GlobalTestMessage));
var message = new GlobalTestMessage("123");
ArrangeRouter<GlobalTestMessage>();

var envelope = ObjectMother.Envelope();
envelope.Message = message;
envelope.Message = new GlobalTestMessage("123");

await interceptor.ReceivedAsync(_listener, envelope);

Expand All @@ -446,29 +479,13 @@ public async Task completes_intercepted_messages_on_the_listener()
public async Task defers_messages_when_re_publish_fails()
{
var interceptor = CreateInterceptor(typeof(GlobalTestMessage));
var message = new GlobalTestMessage("123");
var envelope = ObjectMother.Envelope();
envelope.Message = message;

_messageBus.PublishAsync(Arg.Any<GlobalTestMessage>(), Arg.Any<DeliveryOptions>())
.Returns(ValueTask.FromException(new Exception("Transport failure")));

await interceptor.ReceivedAsync(_listener, envelope);

await _listener.Received(1).DeferAsync(envelope);
await _listener.DidNotReceive().CompleteAsync(envelope);
}
var router = Substitute.For<IMessageRouter>();
router.RouteForPublish(Arg.Any<object>(), Arg.Any<DeliveryOptions?>())
.Throws(new Exception("Transport failure"));
_runtime.Routers[typeof(GlobalTestMessage)] = router;

[Fact]
public async Task defers_messages_when_re_publish_fails_single_envelope()
{
var interceptor = CreateInterceptor(typeof(GlobalTestMessage));
var message = new GlobalTestMessage("123");
var envelope = ObjectMother.Envelope();
envelope.Message = message;

_messageBus.PublishAsync(Arg.Any<GlobalTestMessage>(), Arg.Any<DeliveryOptions>())
.Returns(ValueTask.FromException(new Exception("Transport failure")));
envelope.Message = new GlobalTestMessage("123");

await interceptor.ReceivedAsync(_listener, envelope);

Expand All @@ -480,6 +497,7 @@ public async Task defers_messages_when_re_publish_fails_single_envelope()
public async Task batch_splits_matching_and_non_matching()
{
var interceptor = CreateInterceptor(typeof(GlobalTestMessage));
ArrangeRouter<GlobalTestMessage>();

var matching = ObjectMother.Envelope();
matching.Message = new GlobalTestMessage("1");
Expand All @@ -489,18 +507,16 @@ public async Task batch_splits_matching_and_non_matching()

await interceptor.ReceivedAsync(_listener, new[] { matching, nonMatching });

// Matching message should be re-published
await _messageBus.Received(1).PublishAsync(Arg.Any<GlobalTestMessage>(), Arg.Any<DeliveryOptions>());
_routedEnvelopes.Count.ShouldBe(1);
await _listener.Received(1).CompleteAsync(matching);

// Non-matching should pass through to inner
await _inner.Received(1).ReceivedAsync(_listener, Arg.Is<Envelope[]>(arr => arr.Length == 1));
}

[Fact]
public async Task does_not_call_inner_when_all_messages_are_intercepted()
{
var interceptor = CreateInterceptor(typeof(GlobalTestMessage));
ArrangeRouter<GlobalTestMessage>();

var e1 = ObjectMother.Envelope();
e1.Message = new GlobalTestMessage("1");
Expand Down Expand Up @@ -547,8 +563,130 @@ public async Task does_not_intercept_when_message_is_null()
await interceptor.ReceivedAsync(_listener, envelope);

await _inner.Received(1).ReceivedAsync(_listener, envelope);
await _messageBus.DidNotReceive().PublishAsync(Arg.Any<object>(), Arg.Any<DeliveryOptions>());
_routedEnvelopes.ShouldBeEmpty();
}

[Fact]
public async Task does_not_copy_custom_inbound_headers_when_propagation_rule_not_configured()
{
var interceptor = CreateInterceptor(typeof(GlobalTestMessage));
ArrangeRouter<GlobalTestMessage>();

var envelope = ObjectMother.Envelope();
envelope.Message = new GlobalTestMessage("123");
envelope.Headers["Opta-X-Metrics-Id"] = "metrics-abc";
envelope.Headers["Opta-X-Causation-Id"] = "causation-xyz";

await interceptor.ReceivedAsync(_listener, envelope);

var outgoing = _routedEnvelopes.ShouldHaveSingleItem();
// Cast disambiguates between Shouldly's IDictionary / IReadOnlyDictionary
// ShouldNotContainKey overloads (Envelope.Headers is a concrete Dictionary).
((IDictionary<string, string?>)outgoing.Headers).ShouldNotContainKey("Opta-X-Metrics-Id");
((IDictionary<string, string?>)outgoing.Headers).ShouldNotContainKey("Opta-X-Causation-Id");
}

[Fact]
public async Task copies_only_allowlisted_custom_headers_when_propagation_rule_configured()
{
_runtime.Options.Policies.PropagateIncomingHeadersToOutgoing("Opta-X-Metrics-Id");

var interceptor = CreateInterceptor(typeof(GlobalTestMessage));
ArrangeRouter<GlobalTestMessage>(_ => [new Envelope(new GlobalTestMessage("123"))
{
Sender = NoopSendingAgent.Instance,
Destination = NoopSendingAgent.Instance.Destination,
Status = EnvelopeStatus.Outgoing,
}]);

var envelope = ObjectMother.Envelope();
envelope.Message = new GlobalTestMessage("123");
envelope.Headers["Opta-X-Metrics-Id"] = "metrics-abc";
envelope.Headers["Opta-X-Causation-Id"] = "causation-xyz";

await interceptor.ReceivedAsync(_listener, envelope);

var outgoing = _routedEnvelopes.ShouldHaveSingleItem();
outgoing.Headers["Opta-X-Metrics-Id"].ShouldBe("metrics-abc");
((IDictionary<string, string?>)outgoing.Headers).ShouldNotContainKey("Opta-X-Causation-Id");
}

[Fact]
public async Task carries_inbound_correlation_id_onto_re_routed_envelope()
{
var interceptor = CreateInterceptor(typeof(GlobalTestMessage));
ArrangeRouter<GlobalTestMessage>();

var envelope = ObjectMother.Envelope();
envelope.Message = new GlobalTestMessage("123");
envelope.CorrelationId = "inbound-correlation";

await interceptor.ReceivedAsync(_listener, envelope);

var outgoing = _routedEnvelopes.ShouldHaveSingleItem();
outgoing.CorrelationId.ShouldBe("inbound-correlation");
}

[Fact]
public async Task carries_inbound_tenant_id_onto_re_routed_envelope()
{
var interceptor = CreateInterceptor(typeof(GlobalTestMessage));
ArrangeRouter<GlobalTestMessage>();

var envelope = ObjectMother.Envelope();
envelope.Message = new GlobalTestMessage("123");
envelope.TenantId = "tenant-42";

await interceptor.ReceivedAsync(_listener, envelope);

var outgoing = _routedEnvelopes.ShouldHaveSingleItem();
outgoing.TenantId.ShouldBe("tenant-42");
}

[Fact]
public async Task continues_inbound_distributed_trace_on_re_routed_envelope()
{
using var listener = new ActivityListener
{
ShouldListenTo = source => source.Name == "Wolverine",
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData,
ActivityStarted = _ => { },
ActivityStopped = _ => { },
};
ActivitySource.AddActivityListener(listener);

var traceId = ActivityTraceId.CreateRandom();
var spanId = ActivitySpanId.CreateRandom();
var inboundParentId = $"00-{traceId}-{spanId}-01";

var interceptor = CreateInterceptor(typeof(GlobalTestMessage));
ArrangeRouter<GlobalTestMessage>();

var envelope = ObjectMother.Envelope();
envelope.Message = new GlobalTestMessage("123");
envelope.ParentId = inboundParentId;

await interceptor.ReceivedAsync(_listener, envelope);

var outgoing = _routedEnvelopes.ShouldHaveSingleItem();
outgoing.ParentId.ShouldNotBeNull();
outgoing.ParentId.ShouldContain(traceId.ToString());
}
}

internal sealed class NoopSendingAgent : ISendingAgent
{
public static readonly NoopSendingAgent Instance = new();

public Uri Destination { get; } = new("noop://test");
public Uri? ReplyUri { get; set; }
public bool Latched => false;
public bool IsDurable => false;
public bool SupportsNativeScheduledSend => false;
public Endpoint Endpoint { get; } = null!;
public DateTimeOffset LastMessageSentAt => DateTimeOffset.UtcNow;
public ValueTask EnqueueOutgoingAsync(Envelope envelope) => ValueTask.CompletedTask;
public ValueTask StoreAndForwardAsync(Envelope envelope) => ValueTask.CompletedTask;
}

#endregion
Expand Down
Loading
Loading