Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions src/Mocha/src/Mocha/Transport/MessagingTransport.Lifecyle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,8 @@ internal void DiscoverEndpoints(IMessagingSetupContext context)
foreach (var route in router.OutboundRoutes)
{
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
if (route.Endpoint is null)
if (route.Endpoint is null
&& route.Destination is null)
{
ConnectRoute(context, route);
}
Expand Down Expand Up @@ -282,12 +283,13 @@ private void CreateMatchingOutboundRoute(IMessagingSetupContext context, Inbound
outboundRoute.Initialize(context, outboundRouteConfiguration);
}

// a route with an explicit destination is connected later through the
// destination loop, so we must not claim it here.
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
if (outboundRoute.Endpoint is null)
if (outboundRoute.Endpoint is null
&& outboundRoute.Destination is null)
Comment thread
PascalSenn marked this conversation as resolved.
{
var outboundEndpoint = ConnectRoute(context, outboundRoute);

outboundRoute.ConnectEndpoint(context, outboundEndpoint);
ConnectRoute(context, outboundRoute);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,42 @@ public void Handler_Should_BindToNonDefaultTransport_When_ClaimedByInMemory()
e => e.Name == "order-created");
}

[Fact]
public void Send_Should_RouteToExplicitDestination_When_HandlerForSameMessageBoundExplicitly()
{
// arrange & act
// the same bus handles ProcessPayment via an explicitly bound handler endpoint and
// also sends ProcessPayment to an explicit destination queue.
var runtime = new ServiceCollection()
.AddMessageBus()
.AddRequestHandler<ProcessPaymentHandler>()
.AddMessage<ProcessPayment>(d => d.Send(r => r.ToInMemoryQueue("my-queue")))
.AddInMemory(t =>
{
t.BindHandlersExplicitly();
t.DeclareQueue("payment-q");
t.DeclareQueue("my-queue");
t.Endpoint("payment-endpoint").Queue("payment-q").Handler<ProcessPaymentHandler>();
})
.BuildRuntime();

// assert - the send route resolves to the explicit destination instead of the convention endpoint
var route = runtime.Router.OutboundRoutes.Single(r =>
r.Kind == OutboundRouteKind.Send && r.MessageType.RuntimeType == typeof(ProcessPayment));

Assert.Contains("q/my-queue", route.Endpoint.Address.ToString());
}

public sealed class TestOrderConsumer : IConsumer<OrderCreated>
{
public ValueTask ConsumeAsync(IConsumeContext<OrderCreated> context) => default;
}

public sealed class ProcessPaymentHandler : IEventRequestHandler<ProcessPayment>
{
public ValueTask HandleAsync(ProcessPayment request, CancellationToken cancellationToken)
{
return default;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,41 @@ public void ToRabbitMQExchange_Should_SetDestinationUri_When_CustomSchema()
Assert.StartsWith("custom:", endpoint.Address.ToString());
}

[Fact]
public void ToRabbitMQQueue_Should_RouteToDestination_When_HandlerForSameMessageBoundExplicitly()
{
// arrange & act
// the same bus handles ProcessPayment via an explicitly bound handler endpoint and
// also sends ProcessPayment to an explicit destination queue.
var services = new ServiceCollection();
var runtime = services
.AddMessageBus()
.AddRequestHandler<ProcessPaymentHandler>()
.AddMessage<ProcessPayment>(d => d.Send(r => r.ToRabbitMQQueue("my-queue")))
.AddRabbitMQ(t =>
{
t.ConnectionProvider(_ => new StubConnectionProvider());
t.BindHandlersExplicitly();
t.AutoProvision(false);
t.Endpoint("payment-endpoint").Queue("payment-q").Handler<ProcessPaymentHandler>();
})
.BuildRuntime();

// assert - the send route resolves to q/my-queue instead of the convention exchange e/process-payment
var route = runtime.Router.OutboundRoutes.Single(r =>
r.Kind == OutboundRouteKind.Send && r.MessageType.RuntimeType == typeof(ProcessPayment));

Assert.Contains("q/my-queue", route.Endpoint.Address.ToString());
}

public sealed class ProcessPaymentHandler : IEventRequestHandler<ProcessPayment>
{
public ValueTask HandleAsync(ProcessPayment request, CancellationToken cancellationToken)
{
return default;
}
}

private static MessagingRuntime CreateRuntime(Action<IMessageBusHostBuilder> configure, string? schema = null)
{
var services = new ServiceCollection();
Expand Down
Loading