From f6946fcf69e2a15a33e6220fdc71476621285444 Mon Sep 17 00:00:00 2001 From: Pascal Senn Date: Wed, 10 Jun 2026 00:18:05 +0200 Subject: [PATCH 1/2] Fix late binding of route destinations --- .../Transport/MessagingTransport.Lifecyle.cs | 5 ++- .../InMemoryHandlerClaimTests.cs | 36 ++++++++++++++++++ .../RabbitMQMessageTypeExtensionTests.cs | 37 +++++++++++++++++++ 3 files changed, 77 insertions(+), 1 deletion(-) diff --git a/src/Mocha/src/Mocha/Transport/MessagingTransport.Lifecyle.cs b/src/Mocha/src/Mocha/Transport/MessagingTransport.Lifecyle.cs index f05163def42..1742cb41d6c 100644 --- a/src/Mocha/src/Mocha/Transport/MessagingTransport.Lifecyle.cs +++ b/src/Mocha/src/Mocha/Transport/MessagingTransport.Lifecyle.cs @@ -248,8 +248,11 @@ private void CreateMatchingOutboundRoute(IMessagingSetupContext context, Inbound outboundRoute.Initialize(context, outboundRouteConfiguration); } + // a route with an explicit destination is connected later through the + // destination loop, so we must not claim it here. // ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract - if (outboundRoute.Endpoint is null) + if (outboundRoute.Endpoint is null + && outboundRoute.Destination is null) { var outboundEndpoint = ConnectRoute(context, outboundRoute); diff --git a/src/Mocha/test/Mocha.Transport.InMemory.Tests/InMemoryHandlerClaimTests.cs b/src/Mocha/test/Mocha.Transport.InMemory.Tests/InMemoryHandlerClaimTests.cs index c12721335a4..0f9b0bc655b 100644 --- a/src/Mocha/test/Mocha.Transport.InMemory.Tests/InMemoryHandlerClaimTests.cs +++ b/src/Mocha/test/Mocha.Transport.InMemory.Tests/InMemoryHandlerClaimTests.cs @@ -190,8 +190,44 @@ public void Handler_Should_BindToNonDefaultTransport_When_ClaimedByInMemory() e => e.Name == "order-created"); } + [Fact] + public void Send_Should_RouteToExplicitDestination_When_HandlerForSameMessageBoundExplicitly() + { + // arrange & act + // the same bus handles ProcessPayment via an explicitly bound handler endpoint and + // also sends ProcessPayment to an explicit destination queue. + var runtime = new ServiceCollection() + .AddSingleton(new MessageRecorder()) + .AddMessageBus() + .AddRequestHandler() + .AddMessage(d => d.Send(r => r.ToInMemoryQueue("my-queue"))) + .AddInMemory(t => + { + t.BindHandlersExplicitly(); + t.DeclareQueue("payment-q"); + t.DeclareQueue("my-queue"); + t.Endpoint("payment-endpoint").Queue("payment-q").Handler(); + }) + .BuildRuntime(); + + // assert - the send route resolves to the explicit destination instead of the convention endpoint + var route = runtime.Router.OutboundRoutes.Single(r => + r.Kind == OutboundRouteKind.Send && r.MessageType.RuntimeType == typeof(ProcessPayment)); + + Assert.Contains("q/my-queue", route.Endpoint.Address.ToString()); + } + public sealed class TestOrderConsumer : IConsumer { public ValueTask ConsumeAsync(IConsumeContext context) => default; } + + public sealed class ProcessPaymentHandler(MessageRecorder recorder) : IEventRequestHandler + { + public ValueTask HandleAsync(ProcessPayment request, CancellationToken cancellationToken) + { + recorder.Record(request); + return default; + } + } } diff --git a/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/Topology/RabbitMQMessageTypeExtensionTests.cs b/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/Topology/RabbitMQMessageTypeExtensionTests.cs index 2b422d01f3e..5f46f264619 100644 --- a/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/Topology/RabbitMQMessageTypeExtensionTests.cs +++ b/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/Topology/RabbitMQMessageTypeExtensionTests.cs @@ -69,6 +69,43 @@ public void ToRabbitMQExchange_Should_SetDestinationUri_When_CustomSchema() Assert.StartsWith("custom:", endpoint.Address.ToString()); } + [Fact] + public void ToRabbitMQQueue_Should_RouteToDestination_When_HandlerForSameMessageBoundExplicitly() + { + // arrange & act + // the same bus handles ProcessPayment via an explicitly bound handler endpoint and + // also sends ProcessPayment to an explicit destination queue. + var services = new ServiceCollection(); + services.AddSingleton(new MessageRecorder()); + var runtime = services + .AddMessageBus() + .AddRequestHandler() + .AddMessage(d => d.Send(r => r.ToRabbitMQQueue("my-queue"))) + .AddRabbitMQ(t => + { + t.ConnectionProvider(_ => new StubConnectionProvider()); + t.BindHandlersExplicitly(); + t.AutoProvision(false); + t.Endpoint("payment-endpoint").Queue("payment-q").Handler(); + }) + .BuildRuntime(); + + // assert - the send route resolves to q/my-queue instead of the convention exchange e/process-payment + var route = runtime.Router.OutboundRoutes.Single(r => + r.Kind == OutboundRouteKind.Send && r.MessageType.RuntimeType == typeof(ProcessPayment)); + + Assert.Contains("q/my-queue", route.Endpoint.Address.ToString()); + } + + public sealed class ProcessPaymentHandler(MessageRecorder recorder) : IEventRequestHandler + { + public ValueTask HandleAsync(ProcessPayment request, CancellationToken cancellationToken) + { + recorder.Record(request); + return default; + } + } + private static MessagingRuntime CreateRuntime(Action configure, string? schema = null) { var services = new ServiceCollection(); From f0178eff5f194e05861f28d61d43a663d8574b66 Mon Sep 17 00:00:00 2001 From: Pascal Senn Date: Wed, 10 Jun 2026 00:38:06 +0200 Subject: [PATCH 2/2] cleanup --- .../src/Mocha/Transport/MessagingTransport.Lifecyle.cs | 7 +++---- .../InMemoryHandlerClaimTests.cs | 4 +--- .../Topology/RabbitMQMessageTypeExtensionTests.cs | 4 +--- 3 files changed, 5 insertions(+), 10 deletions(-) diff --git a/src/Mocha/src/Mocha/Transport/MessagingTransport.Lifecyle.cs b/src/Mocha/src/Mocha/Transport/MessagingTransport.Lifecyle.cs index 1742cb41d6c..1d76dab9487 100644 --- a/src/Mocha/src/Mocha/Transport/MessagingTransport.Lifecyle.cs +++ b/src/Mocha/src/Mocha/Transport/MessagingTransport.Lifecyle.cs @@ -204,7 +204,8 @@ internal void DiscoverEndpoints(IMessagingSetupContext context) foreach (var route in router.OutboundRoutes) { // ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract - if (route.Endpoint is null) + if (route.Endpoint is null + && route.Destination is null) { ConnectRoute(context, route); } @@ -254,9 +255,7 @@ private void CreateMatchingOutboundRoute(IMessagingSetupContext context, Inbound if (outboundRoute.Endpoint is null && outboundRoute.Destination is null) { - var outboundEndpoint = ConnectRoute(context, outboundRoute); - - outboundRoute.ConnectEndpoint(context, outboundEndpoint); + ConnectRoute(context, outboundRoute); } } } diff --git a/src/Mocha/test/Mocha.Transport.InMemory.Tests/InMemoryHandlerClaimTests.cs b/src/Mocha/test/Mocha.Transport.InMemory.Tests/InMemoryHandlerClaimTests.cs index 0f9b0bc655b..54d2bdfcd93 100644 --- a/src/Mocha/test/Mocha.Transport.InMemory.Tests/InMemoryHandlerClaimTests.cs +++ b/src/Mocha/test/Mocha.Transport.InMemory.Tests/InMemoryHandlerClaimTests.cs @@ -197,7 +197,6 @@ public void Send_Should_RouteToExplicitDestination_When_HandlerForSameMessageBou // the same bus handles ProcessPayment via an explicitly bound handler endpoint and // also sends ProcessPayment to an explicit destination queue. var runtime = new ServiceCollection() - .AddSingleton(new MessageRecorder()) .AddMessageBus() .AddRequestHandler() .AddMessage(d => d.Send(r => r.ToInMemoryQueue("my-queue"))) @@ -222,11 +221,10 @@ public sealed class TestOrderConsumer : IConsumer public ValueTask ConsumeAsync(IConsumeContext context) => default; } - public sealed class ProcessPaymentHandler(MessageRecorder recorder) : IEventRequestHandler + public sealed class ProcessPaymentHandler : IEventRequestHandler { public ValueTask HandleAsync(ProcessPayment request, CancellationToken cancellationToken) { - recorder.Record(request); return default; } } diff --git a/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/Topology/RabbitMQMessageTypeExtensionTests.cs b/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/Topology/RabbitMQMessageTypeExtensionTests.cs index 5f46f264619..2cc7b31867d 100644 --- a/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/Topology/RabbitMQMessageTypeExtensionTests.cs +++ b/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/Topology/RabbitMQMessageTypeExtensionTests.cs @@ -76,7 +76,6 @@ public void ToRabbitMQQueue_Should_RouteToDestination_When_HandlerForSameMessage // the same bus handles ProcessPayment via an explicitly bound handler endpoint and // also sends ProcessPayment to an explicit destination queue. var services = new ServiceCollection(); - services.AddSingleton(new MessageRecorder()); var runtime = services .AddMessageBus() .AddRequestHandler() @@ -97,11 +96,10 @@ public void ToRabbitMQQueue_Should_RouteToDestination_When_HandlerForSameMessage Assert.Contains("q/my-queue", route.Endpoint.Address.ToString()); } - public sealed class ProcessPaymentHandler(MessageRecorder recorder) : IEventRequestHandler + public sealed class ProcessPaymentHandler : IEventRequestHandler { public ValueTask HandleAsync(ProcessPayment request, CancellationToken cancellationToken) { - recorder.Record(request); return default; } }