diff --git a/src/Mocha/src/Mocha/Consumers/Extensions/ConsumeContextExtensions.cs b/src/Mocha/src/Mocha/Consumers/Extensions/ConsumeContextExtensions.cs
index dbd65a2fa3c..19919134be5 100644
--- a/src/Mocha/src/Mocha/Consumers/Extensions/ConsumeContextExtensions.cs
+++ b/src/Mocha/src/Mocha/Consumers/Extensions/ConsumeContextExtensions.cs
@@ -11,8 +11,9 @@ internal static class ConsumeContextExtensions
/// Creates reply options from the incoming message metadata when a response channel is available.
///
///
- /// Correlation id and headers are copied so replies/faults remain linked to the original request
- /// and downstream workflows (for example saga headers) keep working.
+ /// Headers are copied so replies and faults remain linked to the original request and downstream
+ /// workflows (for example saga headers) keep working. The correlation id is echoed when present,
+ /// so callers that correlate by a different mechanism (such as a saga header) are still supported.
///
public static bool TryCreateResponseOptions(this IConsumeContext context, out ReplyOptions options)
{
@@ -23,15 +24,10 @@ public static bool TryCreateResponseOptions(this IConsumeContext context, out Re
return false;
}
- if (context.CorrelationId is not { } correlationId)
- {
- return false;
- }
-
options = new ReplyOptions
{
Headers = [],
- CorrelationId = correlationId,
+ CorrelationId = context.CorrelationId,
ConversationId = context.ConversationId,
ReplyAddress = replyTo
};
diff --git a/src/Mocha/src/Mocha/Descriptions/InboundRouteDescription.cs b/src/Mocha/src/Mocha/Descriptions/InboundRouteDescription.cs
index c88dc61514e..06726918a4b 100644
--- a/src/Mocha/src/Mocha/Descriptions/InboundRouteDescription.cs
+++ b/src/Mocha/src/Mocha/Descriptions/InboundRouteDescription.cs
@@ -3,12 +3,14 @@ namespace Mocha;
///
/// Describes an inbound route binding a message type to a consumer and endpoint.
///
-/// The kind of inbound route (subscribe, send, or request).
+/// The kind of inbound route (subscribe, send, request, or reply).
/// The identity string of the message type, or null if unknown.
/// The name of the consumer handling messages on this route, or null if unbound.
+/// The condition that decides whether this route selects its consumer for a received message.
/// The endpoint reference, or null if not yet assigned.
public sealed record InboundRouteDescription(
InboundRouteKind Kind,
string? MessageTypeIdentity,
string? ConsumerName,
+ RouteConditionDescription Condition,
EndpointReferenceDescription? Endpoint);
diff --git a/src/Mocha/src/Mocha/Descriptions/RouteConditionDescription.cs b/src/Mocha/src/Mocha/Descriptions/RouteConditionDescription.cs
new file mode 100644
index 00000000000..848b7c1b253
--- /dev/null
+++ b/src/Mocha/src/Mocha/Descriptions/RouteConditionDescription.cs
@@ -0,0 +1,15 @@
+namespace Mocha;
+
+///
+/// Describes the match condition that determines whether an inbound route selects its consumer for a
+/// received message, for diagnostic and visualization purposes.
+///
+/// The kind of condition, such as the message type rule, a header rule, or a composite.
+///
+/// A condition specific detail, such as a message type identity or a header key, or null if not applicable.
+///
+/// The nested conditions of a composite condition, or an empty list for a leaf condition.
+public sealed record RouteConditionDescription(
+ string Kind,
+ string? Detail,
+ IReadOnlyList Children);
diff --git a/src/Mocha/src/Mocha/MessageTypes/Conditions/AndCondition.cs b/src/Mocha/src/Mocha/MessageTypes/Conditions/AndCondition.cs
new file mode 100644
index 00000000000..bf304422d75
--- /dev/null
+++ b/src/Mocha/src/Mocha/MessageTypes/Conditions/AndCondition.cs
@@ -0,0 +1,43 @@
+using System.Collections.Immutable;
+using Mocha.Middlewares;
+
+namespace Mocha;
+
+///
+/// Matches only when all of its child conditions match.
+///
+///
+/// Initializes a new instance of the class.
+///
+/// The child conditions that must all match.
+internal sealed class AndCondition(ImmutableArray conditions) : RouteCondition
+{
+ ///
+ public override void Initialize(IMessagingConfigurationContext context)
+ {
+ foreach (var condition in conditions)
+ {
+ condition.Initialize(context);
+ }
+ }
+
+ ///
+ public override bool Matches(IReceiveContext context)
+ {
+ foreach (var condition in conditions)
+ {
+ if (!condition.Matches(context))
+ {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ ///
+ public override RouteConditionDescription Describe()
+ => new("And", null, [.. conditions.Select(static c => c.Describe())]);
+
+ public static AndCondition Create(params ReadOnlySpan conditions) => new([.. conditions]);
+}
diff --git a/src/Mocha/src/Mocha/MessageTypes/Conditions/HeaderPresentCondition.cs b/src/Mocha/src/Mocha/MessageTypes/Conditions/HeaderPresentCondition.cs
new file mode 100644
index 00000000000..1eed87b861d
--- /dev/null
+++ b/src/Mocha/src/Mocha/MessageTypes/Conditions/HeaderPresentCondition.cs
@@ -0,0 +1,19 @@
+using Mocha.Middlewares;
+
+namespace Mocha;
+
+///
+/// Matches when the received message carries a header for a given key, regardless of its value.
+///
+/// The typed key of the header that must be present.
+/// The type of the header value.
+internal sealed class HeaderPresentCondition(ContextDataKey key) : RouteCondition
+{
+ ///
+ public override bool Matches(IReceiveContext context)
+ => context.Headers.TryGet(key, out _);
+
+ ///
+ public override RouteConditionDescription Describe()
+ => new("HeaderPresent", key.Key, []);
+}
diff --git a/src/Mocha/src/Mocha/MessageTypes/Conditions/MessageTypeCondition.cs b/src/Mocha/src/Mocha/MessageTypes/Conditions/MessageTypeCondition.cs
new file mode 100644
index 00000000000..6c1213acb61
--- /dev/null
+++ b/src/Mocha/src/Mocha/MessageTypes/Conditions/MessageTypeCondition.cs
@@ -0,0 +1,67 @@
+using Mocha.Middlewares;
+
+namespace Mocha;
+
+///
+/// Matches when the received message type is the route's message type or encloses it through its type
+/// hierarchy, so handlers registered for base contracts can receive derived messages. When the route
+/// is optional and the received message has no resolved message type, the condition still matches so
+/// the route can select on other terms.
+///
+internal sealed class MessageTypeCondition : RouteCondition
+{
+ private readonly Type _eventType;
+ private readonly bool _optional;
+ private MessageType? _messageType;
+
+ ///
+ /// Initializes a new instance of the class for the given CLR
+ /// message type. The message type is resolved against the registry in .
+ ///
+ /// The CLR type of the message the route handles.
+ ///
+ /// When true, the condition matches a received message that has no resolved message type.
+ ///
+ public MessageTypeCondition(Type eventType, bool optional = false)
+ {
+ _eventType = eventType;
+ _optional = optional;
+ }
+
+ ///
+ /// Initializes a new instance of the class from an already
+ /// resolved message type.
+ ///
+ /// The resolved message type the route handles.
+ ///
+ /// When true, the condition matches a received message that has no resolved message type.
+ ///
+ public MessageTypeCondition(MessageType messageType, bool optional = false)
+ {
+ _eventType = messageType.RuntimeType;
+ _messageType = messageType;
+ _optional = optional;
+ }
+
+ public MessageType? MessageType => _messageType;
+
+ ///
+ public override void Initialize(IMessagingConfigurationContext context)
+ => _messageType ??= context.Messages.GetOrAdd(context, _eventType);
+
+ ///
+ public override bool Matches(IReceiveContext context)
+ {
+ if (context.MessageType is not { } mt)
+ {
+ return _optional;
+ }
+
+ return _messageType is { } messageType
+ && (mt == messageType || mt.EnclosedMessageTypes.Contains(messageType));
+ }
+
+ ///
+ public override RouteConditionDescription Describe()
+ => new("MessageType", _messageType?.Identity, []);
+}
diff --git a/src/Mocha/src/Mocha/MessageTypes/Conditions/NoMatchCondition.cs b/src/Mocha/src/Mocha/MessageTypes/Conditions/NoMatchCondition.cs
new file mode 100644
index 00000000000..ff91332aeb6
--- /dev/null
+++ b/src/Mocha/src/Mocha/MessageTypes/Conditions/NoMatchCondition.cs
@@ -0,0 +1,26 @@
+using Mocha.Middlewares;
+
+namespace Mocha;
+
+///
+/// Never matches. Assigned to the reply route that has no message type so the generic receive router
+/// has a non-null condition to evaluate while keeping the route inert.
+///
+internal sealed class NoMatchCondition : RouteCondition
+{
+ private NoMatchCondition()
+ {
+ }
+
+ ///
+ public override bool Matches(IReceiveContext context) => false;
+
+ ///
+ public override RouteConditionDescription Describe()
+ => new("NoMatch", null, []);
+
+ ///
+ /// Gets the shared instance of the .
+ ///
+ public static NoMatchCondition Instance { get; } = new();
+}
diff --git a/src/Mocha/src/Mocha/MessageTypes/Conditions/RouteCondition.cs b/src/Mocha/src/Mocha/MessageTypes/Conditions/RouteCondition.cs
new file mode 100644
index 00000000000..248b377a490
--- /dev/null
+++ b/src/Mocha/src/Mocha/MessageTypes/Conditions/RouteCondition.cs
@@ -0,0 +1,33 @@
+using Mocha.Middlewares;
+
+namespace Mocha;
+
+///
+/// Represents the rule that decides whether an inbound route selects its consumer for a received
+/// message. The receive router evaluates the condition against the message envelope metadata, the
+/// message type and the headers, both of which are available before the message body is deserialized.
+///
+public abstract class RouteCondition
+{
+ ///
+ /// Prepares the condition against the messaging configuration so that any message types it
+ /// references are resolved and registered before the route starts evaluating received messages.
+ ///
+ /// The messaging configuration context.
+ public virtual void Initialize(IMessagingConfigurationContext context)
+ {
+ }
+
+ ///
+ /// Determines whether the route should select its consumer for the given received message.
+ ///
+ /// The receive context exposing the resolved message type and the headers.
+ /// true if the route matches the message; otherwise, false.
+ public abstract bool Matches(IReceiveContext context);
+
+ ///
+ /// Creates a structured description of this condition for visualization and diagnostic purposes.
+ ///
+ /// A representing this condition.
+ public abstract RouteConditionDescription Describe();
+}
diff --git a/src/Mocha/src/Mocha/MessageTypes/Configurations/InboundRouteConfiguration.cs b/src/Mocha/src/Mocha/MessageTypes/Configurations/InboundRouteConfiguration.cs
index 9329d1e0256..a10f41548ec 100644
--- a/src/Mocha/src/Mocha/MessageTypes/Configurations/InboundRouteConfiguration.cs
+++ b/src/Mocha/src/Mocha/MessageTypes/Configurations/InboundRouteConfiguration.cs
@@ -29,4 +29,10 @@ public class InboundRouteConfiguration : MessagingConfiguration
/// Gets or sets the kind of inbound route.
///
public InboundRouteKind Kind { get; set; }
+
+ ///
+ /// Gets or sets the condition that decides whether this route selects its consumer for a received
+ /// message, or null to derive the default condition from the message type.
+ ///
+ public RouteCondition? Condition { get; set; }
}
diff --git a/src/Mocha/src/Mocha/MessageTypes/Descriptors/IInboundRouteDescriptor.cs b/src/Mocha/src/Mocha/MessageTypes/Descriptors/IInboundRouteDescriptor.cs
index adaf2e0b56a..05c7ed2fddd 100644
--- a/src/Mocha/src/Mocha/MessageTypes/Descriptors/IInboundRouteDescriptor.cs
+++ b/src/Mocha/src/Mocha/MessageTypes/Descriptors/IInboundRouteDescriptor.cs
@@ -25,4 +25,12 @@ public interface IInboundRouteDescriptor : IMessagingDescriptorThe inbound route kind.
/// This descriptor for method chaining.
IInboundRouteDescriptor Kind(InboundRouteKind kind);
+
+ ///
+ /// Sets the condition that decides whether this route selects its consumer for a received message,
+ /// overriding the default condition derived from the message type.
+ ///
+ /// The route condition.
+ /// This descriptor for method chaining.
+ IInboundRouteDescriptor Condition(RouteCondition condition);
}
diff --git a/src/Mocha/src/Mocha/MessageTypes/Descriptors/InboundRouteDescriptor.cs b/src/Mocha/src/Mocha/MessageTypes/Descriptors/InboundRouteDescriptor.cs
index 110e1c0a367..de5aabcb6b9 100644
--- a/src/Mocha/src/Mocha/MessageTypes/Descriptors/InboundRouteDescriptor.cs
+++ b/src/Mocha/src/Mocha/MessageTypes/Descriptors/InboundRouteDescriptor.cs
@@ -38,6 +38,13 @@ public IInboundRouteDescriptor Kind(InboundRouteKind kind)
return this;
}
+ ///
+ public IInboundRouteDescriptor Condition(RouteCondition condition)
+ {
+ Configuration.Condition = condition;
+ return this;
+ }
+
///
/// Creates the final configuration from the descriptor state.
///
diff --git a/src/Mocha/src/Mocha/MessageTypes/InboundRoute.cs b/src/Mocha/src/Mocha/MessageTypes/InboundRoute.cs
index 1c6de1b4dba..6d5111ed727 100644
--- a/src/Mocha/src/Mocha/MessageTypes/InboundRoute.cs
+++ b/src/Mocha/src/Mocha/MessageTypes/InboundRoute.cs
@@ -34,6 +34,11 @@ public sealed class InboundRoute
///
public InboundRouteKind Kind { get; private set; }
+ ///
+ /// Gets the condition that decides whether this route selects its consumer for a received message.
+ ///
+ public RouteCondition Condition { get; private set; } = null!;
+
///
/// Gets the receive endpoint that this route is connected to, or null if not yet connected.
///
@@ -71,6 +76,13 @@ public void Initialize(IMessagingConfigurationContext context, InboundRouteConfi
context.Messages.GetOrAdd(context, configuration.ResponseRuntimeType);
}
+ Condition = configuration.Condition
+ ?? (MessageType is not null
+ ? new MessageTypeCondition(MessageType)
+ : NoMatchCondition.Instance);
+
+ Condition.Initialize(context);
+
MarkInitialized();
}
@@ -146,6 +158,7 @@ public InboundRouteDescription Describe()
Kind,
MessageType?.Identity,
Consumer?.Name,
+ Condition.Describe(),
Endpoint is not null
? new EndpointReferenceDescription(Endpoint.Name, Endpoint.Address?.ToString(), Endpoint.Transport.Name)
: null);
diff --git a/src/Mocha/src/Mocha/Middlewares/DefaultMessageBus.cs b/src/Mocha/src/Mocha/Middlewares/DefaultMessageBus.cs
index c3996080adb..2bb6363b01b 100644
--- a/src/Mocha/src/Mocha/Middlewares/DefaultMessageBus.cs
+++ b/src/Mocha/src/Mocha/Middlewares/DefaultMessageBus.cs
@@ -429,6 +429,10 @@ public async ValueTask CancelScheduledMessageAsync(string token, Cancellat
private void PropagateCorrelationIds(DispatchContext context)
{
+ // ConversationId and CausationId are lineage ids that flow through the whole graph.
+ // CorrelationId is a per-hop routing key (request/reply pairing, saga correlation) and must
+ // be set explicitly, never inherited, or an outbound message could complete the wrong
+ // request promise or attach to the wrong saga.
if (consumeContextAccessor.Context is { } ambient)
{
context.ConversationId ??= ambient.ConversationId;
diff --git a/src/Mocha/src/Mocha/Middlewares/Receive/RoutingMiddleware.cs b/src/Mocha/src/Mocha/Middlewares/Receive/RoutingMiddleware.cs
index fd01acc2600..2786a42a145 100644
--- a/src/Mocha/src/Mocha/Middlewares/Receive/RoutingMiddleware.cs
+++ b/src/Mocha/src/Mocha/Middlewares/Receive/RoutingMiddleware.cs
@@ -4,11 +4,13 @@
namespace Mocha.Middlewares;
///
-/// Selects matching consumers for the resolved message type and current endpoint.
+/// Selects matching consumers for the current endpoint by evaluating each route's condition against
+/// the received message.
///
///
-/// Matches include enclosed message types so handlers registered for base contracts can receive
-/// derived messages.
+/// The default condition matches by message type, including enclosed message types so handlers
+/// registered for base contracts can receive derived messages. Other conditions, such as header based
+/// reply routing, select on envelope metadata alone.
/// Without this middleware, no consumer list is built for execution and messages can traverse the
/// pipeline without ever reaching application handlers.
///
@@ -18,21 +20,12 @@ public async ValueTask InvokeAsync(IReceiveContext context, ReceiveDelegate next
{
var feature = context.Features.GetOrSet();
- if (context.MessageType is { } messageType)
+ foreach (var route in router.GetInboundByEndpoint(context.Endpoint))
{
- var routes = router.GetInboundByEndpoint(context.Endpoint);
-
- foreach (var route in routes)
+ if (route.Consumer is not null && route.Condition.Matches(context))
{
- if (route.MessageType is not null
- && route.Consumer is not null
- && (
- route.MessageType == messageType
- || messageType.EnclosedMessageTypes.Contains(route.MessageType)))
- {
- // Consumers are collected on the feature for later execution middleware.
- feature.Consumers.Add(route.Consumer);
- }
+ // Consumers are collected on the feature for later execution middleware.
+ feature.Consumers.Add(route.Consumer);
}
}
diff --git a/src/Mocha/src/Mocha/Sagas/Saga.cs b/src/Mocha/src/Mocha/Sagas/Saga.cs
index 69ad93600d6..bd6bc9c6f7a 100644
--- a/src/Mocha/src/Mocha/Sagas/Saga.cs
+++ b/src/Mocha/src/Mocha/Sagas/Saga.cs
@@ -566,6 +566,9 @@ private async Task SendEventsAsync(
var requestType = context.Runtime.GetMessageType(message.GetType());
var endpoint = context.Runtime.GetSendEndpoint(requestType);
+ // Route the reply to the shared reply endpoint, where the saga's OnReply/OnAnyReply route
+ // is bound. The reply is delivered to the saga consumer there and correlated by the saga
+ // header, so no correlation id is required.
options = options with { ReplyEndpoint = endpoint.Transport.ReplyReceiveEndpoint?.Source.Address };
options.Headers.Set(SagaContextData.SagaId, state.Id.ToString("D"));
diff --git a/src/Mocha/src/Mocha/Sagas/SagaEventListener.cs b/src/Mocha/src/Mocha/Sagas/SagaEventListener.cs
index 9aaacdf6d95..7c38237ae7a 100644
--- a/src/Mocha/src/Mocha/Sagas/SagaEventListener.cs
+++ b/src/Mocha/src/Mocha/Sagas/SagaEventListener.cs
@@ -18,23 +18,53 @@ protected override void Configure(IConsumerDescriptor descriptor)
{
foreach (var transition in state.Value.Transitions)
{
+ var eventType = transition.Key;
+ var transitionKind = transition.Value.TransitionKind;
+
descriptor.AddRoute(r =>
- r.MessageType(transition.Key)
+ {
+ r.MessageType(eventType)
.Kind(
- transition.Value.TransitionKind switch
+ transitionKind switch
{
SagaTransitionKind.Event => InboundRouteKind.Subscribe,
SagaTransitionKind.Send => InboundRouteKind.Send,
SagaTransitionKind.Request => InboundRouteKind.Request,
SagaTransitionKind.Reply => InboundRouteKind.Reply,
- _ => throw new InvalidOperationException(
- $"Invalid transition kind: {transition.Value.TransitionKind}")
- })
- );
+ _ => throw new InvalidOperationException($"Invalid transition kind: {transitionKind}")
+ });
+
+ if (transitionKind == SagaTransitionKind.Reply)
+ {
+ // A reply lands on the shared reply endpoint alongside non saga (RPC) replies.
+ // The saga-id header marks replies to the saga's own requests, so the route
+ // selects only those. A typed reply additionally narrows by its message type.
+ r.Condition(CreateReplyCondition(eventType));
+ }
+ });
}
}
}
+ private static RouteCondition CreateReplyCondition(Type eventType)
+ {
+ // The saga-id header is the discriminator: only replies to the saga's own requests carry it,
+ // so the route never selects a non saga (RPC) reply on the shared reply endpoint.
+ var sagaId = new HeaderPresentCondition(SagaContextData.SagaId);
+
+ // OnAnyReply (OnReply