Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ internal static class ConsumeContextExtensions
/// Creates reply options from the incoming message metadata when a response channel is available.
/// </summary>
/// <remarks>
/// Correlation id and headers are copied so replies/faults remain linked to the original request
/// and downstream workflows (for example saga headers) keep working.
/// Headers are copied so replies and faults remain linked to the original request and downstream
/// workflows (for example saga headers) keep working. The correlation id is echoed when present,
/// so callers that correlate by a different mechanism (such as a saga header) are still supported.
/// </remarks>
public static bool TryCreateResponseOptions(this IConsumeContext context, out ReplyOptions options)
{
Expand All @@ -23,15 +24,10 @@ public static bool TryCreateResponseOptions(this IConsumeContext context, out Re
return false;
}

if (context.CorrelationId is not { } correlationId)
{
return false;
}

options = new ReplyOptions
{
Headers = [],
CorrelationId = correlationId,
CorrelationId = context.CorrelationId,
ConversationId = context.ConversationId,
ReplyAddress = replyTo
};
Expand Down
4 changes: 3 additions & 1 deletion src/Mocha/src/Mocha/Descriptions/InboundRouteDescription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ namespace Mocha;
/// <summary>
/// Describes an inbound route binding a message type to a consumer and endpoint.
/// </summary>
/// <param name="Kind">The kind of inbound route (subscribe, send, or request).</param>
/// <param name="Kind">The kind of inbound route (subscribe, send, request, or reply).</param>
/// <param name="MessageTypeIdentity">The identity string of the message type, or <c>null</c> if unknown.</param>
/// <param name="ConsumerName">The name of the consumer handling messages on this route, or <c>null</c> if unbound.</param>
/// <param name="Condition">The condition that decides whether this route selects its consumer for a received message.</param>
/// <param name="Endpoint">The endpoint reference, or <c>null</c> if not yet assigned.</param>
public sealed record InboundRouteDescription(
InboundRouteKind Kind,
string? MessageTypeIdentity,
string? ConsumerName,
RouteConditionDescription Condition,
EndpointReferenceDescription? Endpoint);
15 changes: 15 additions & 0 deletions src/Mocha/src/Mocha/Descriptions/RouteConditionDescription.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace Mocha;

/// <summary>
/// Describes the match condition that determines whether an inbound route selects its consumer for a
/// received message, for diagnostic and visualization purposes.
/// </summary>
/// <param name="Kind">The kind of condition, such as the message type rule, a header rule, or a composite.</param>
/// <param name="Detail">
/// A condition specific detail, such as a message type identity or a header key, or <c>null</c> if not applicable.
/// </param>
/// <param name="Children">The nested conditions of a composite condition, or an empty list for a leaf condition.</param>
public sealed record RouteConditionDescription(
string Kind,
string? Detail,
IReadOnlyList<RouteConditionDescription> Children);
43 changes: 43 additions & 0 deletions src/Mocha/src/Mocha/MessageTypes/Conditions/AndCondition.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using System.Collections.Immutable;
using Mocha.Middlewares;

namespace Mocha;

/// <summary>
/// Matches only when all of its child conditions match.
/// </summary>
/// <remarks>
/// Initializes a new instance of the <see cref="AndCondition"/> class.
/// </remarks>
/// <param name="conditions">The child conditions that must all match.</param>
internal sealed class AndCondition(ImmutableArray<RouteCondition> conditions) : RouteCondition
{
/// <inheritdoc />
public override void Initialize(IMessagingConfigurationContext context)
{
foreach (var condition in conditions)
{
condition.Initialize(context);
}
}

/// <inheritdoc />
public override bool Matches(IReceiveContext context)
{
foreach (var condition in conditions)
{
if (!condition.Matches(context))
{
return false;
}
}

return true;
}

/// <inheritdoc />
public override RouteConditionDescription Describe()
=> new("And", null, [.. conditions.Select(static c => c.Describe())]);

public static AndCondition Create(params ReadOnlySpan<RouteCondition> conditions) => new([.. conditions]);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using Mocha.Middlewares;

namespace Mocha;

/// <summary>
/// Matches when the received message carries a header for a given key, regardless of its value.
/// </summary>
/// <param name="key">The typed key of the header that must be present.</param>
/// <typeparam name="T">The type of the header value.</typeparam>
internal sealed class HeaderPresentCondition<T>(ContextDataKey<T> key) : RouteCondition
{
/// <inheritdoc />
public override bool Matches(IReceiveContext context)
=> context.Headers.TryGet(key, out _);

/// <inheritdoc />
public override RouteConditionDescription Describe()
=> new("HeaderPresent", key.Key, []);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
using Mocha.Middlewares;

namespace Mocha;

/// <summary>
/// Matches when the received message type is the route's message type or encloses it through its type
/// hierarchy, so handlers registered for base contracts can receive derived messages. When the route
/// is optional and the received message has no resolved message type, the condition still matches so
/// the route can select on other terms.
/// </summary>
internal sealed class MessageTypeCondition : RouteCondition
{
private readonly Type _eventType;
private readonly bool _optional;
private MessageType? _messageType;

/// <summary>
/// Initializes a new instance of the <see cref="MessageTypeCondition"/> class for the given CLR
/// message type. The message type is resolved against the registry in <see cref="Initialize"/>.
/// </summary>
/// <param name="eventType">The CLR type of the message the route handles.</param>
/// <param name="optional">
/// When <c>true</c>, the condition matches a received message that has no resolved message type.
/// </param>
public MessageTypeCondition(Type eventType, bool optional = false)
{
_eventType = eventType;
_optional = optional;
}

/// <summary>
/// Initializes a new instance of the <see cref="MessageTypeCondition"/> class from an already
/// resolved message type.
/// </summary>
/// <param name="messageType">The resolved message type the route handles.</param>
/// <param name="optional">
/// When <c>true</c>, the condition matches a received message that has no resolved message type.
/// </param>
public MessageTypeCondition(MessageType messageType, bool optional = false)
{
_eventType = messageType.RuntimeType;
_messageType = messageType;
_optional = optional;
}

public MessageType? MessageType => _messageType;

/// <inheritdoc />
public override void Initialize(IMessagingConfigurationContext context)
=> _messageType ??= context.Messages.GetOrAdd(context, _eventType);

/// <inheritdoc />
public override bool Matches(IReceiveContext context)
{
if (context.MessageType is not { } mt)
{
return _optional;
}

return _messageType is { } messageType
&& (mt == messageType || mt.EnclosedMessageTypes.Contains(messageType));
}

/// <inheritdoc />
public override RouteConditionDescription Describe()
=> new("MessageType", _messageType?.Identity, []);
}
26 changes: 26 additions & 0 deletions src/Mocha/src/Mocha/MessageTypes/Conditions/NoMatchCondition.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using Mocha.Middlewares;

namespace Mocha;

/// <summary>
/// Never matches. Assigned to the reply route that has no message type so the generic receive router
/// has a non-null condition to evaluate while keeping the route inert.
/// </summary>
internal sealed class NoMatchCondition : RouteCondition
{
private NoMatchCondition()
{
}

/// <inheritdoc />
public override bool Matches(IReceiveContext context) => false;

/// <inheritdoc />
public override RouteConditionDescription Describe()
=> new("NoMatch", null, []);

/// <summary>
/// Gets the shared instance of the <see cref="NoMatchCondition"/>.
/// </summary>
public static NoMatchCondition Instance { get; } = new();
}
33 changes: 33 additions & 0 deletions src/Mocha/src/Mocha/MessageTypes/Conditions/RouteCondition.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using Mocha.Middlewares;

namespace Mocha;

/// <summary>
/// Represents the rule that decides whether an inbound route selects its consumer for a received
/// message. The receive router evaluates the condition against the message envelope metadata, the
/// message type and the headers, both of which are available before the message body is deserialized.
/// </summary>
public abstract class RouteCondition
{
/// <summary>
/// Prepares the condition against the messaging configuration so that any message types it
/// references are resolved and registered before the route starts evaluating received messages.
/// </summary>
/// <param name="context">The messaging configuration context.</param>
public virtual void Initialize(IMessagingConfigurationContext context)
{
}

/// <summary>
/// Determines whether the route should select its consumer for the given received message.
/// </summary>
/// <param name="context">The receive context exposing the resolved message type and the headers.</param>
/// <returns><c>true</c> if the route matches the message; otherwise, <c>false</c>.</returns>
public abstract bool Matches(IReceiveContext context);

/// <summary>
/// Creates a structured description of this condition for visualization and diagnostic purposes.
/// </summary>
/// <returns>A <see cref="RouteConditionDescription"/> representing this condition.</returns>
public abstract RouteConditionDescription Describe();
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,10 @@ public class InboundRouteConfiguration : MessagingConfiguration
/// Gets or sets the kind of inbound route.
/// </summary>
public InboundRouteKind Kind { get; set; }

/// <summary>
/// Gets or sets the condition that decides whether this route selects its consumer for a received
/// message, or <c>null</c> to derive the default condition from the message type.
/// </summary>
public RouteCondition? Condition { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,12 @@ public interface IInboundRouteDescriptor : IMessagingDescriptor<InboundRouteConf
/// <param name="kind">The inbound route kind.</param>
/// <returns>This descriptor for method chaining.</returns>
IInboundRouteDescriptor Kind(InboundRouteKind kind);

/// <summary>
/// Sets the condition that decides whether this route selects its consumer for a received message,
/// overriding the default condition derived from the message type.
/// </summary>
/// <param name="condition">The route condition.</param>
/// <returns>This descriptor for method chaining.</returns>
IInboundRouteDescriptor Condition(RouteCondition condition);
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ public IInboundRouteDescriptor Kind(InboundRouteKind kind)
return this;
}

/// <inheritdoc />
public IInboundRouteDescriptor Condition(RouteCondition condition)
{
Configuration.Condition = condition;
return this;
}

/// <summary>
/// Creates the final configuration from the descriptor state.
/// </summary>
Expand Down
13 changes: 13 additions & 0 deletions src/Mocha/src/Mocha/MessageTypes/InboundRoute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public sealed class InboundRoute
/// </summary>
public InboundRouteKind Kind { get; private set; }

/// <summary>
/// Gets the condition that decides whether this route selects its consumer for a received message.
/// </summary>
public RouteCondition Condition { get; private set; } = null!;

/// <summary>
/// Gets the receive endpoint that this route is connected to, or <c>null</c> if not yet connected.
/// </summary>
Expand Down Expand Up @@ -71,6 +76,13 @@ public void Initialize(IMessagingConfigurationContext context, InboundRouteConfi
context.Messages.GetOrAdd(context, configuration.ResponseRuntimeType);
}

Condition = configuration.Condition
?? (MessageType is not null
? new MessageTypeCondition(MessageType)
: NoMatchCondition.Instance);

Condition.Initialize(context);

MarkInitialized();
}

Expand Down Expand Up @@ -146,6 +158,7 @@ public InboundRouteDescription Describe()
Kind,
MessageType?.Identity,
Consumer?.Name,
Condition.Describe(),
Endpoint is not null
? new EndpointReferenceDescription(Endpoint.Name, Endpoint.Address?.ToString(), Endpoint.Transport.Name)
: null);
Expand Down
4 changes: 4 additions & 0 deletions src/Mocha/src/Mocha/Middlewares/DefaultMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,10 @@ public async ValueTask<bool> CancelScheduledMessageAsync(string token, Cancellat

private void PropagateCorrelationIds(DispatchContext context)
{
// ConversationId and CausationId are lineage ids that flow through the whole graph.
// CorrelationId is a per-hop routing key (request/reply pairing, saga correlation) and must
// be set explicitly, never inherited, or an outbound message could complete the wrong
// request promise or attach to the wrong saga.
if (consumeContextAccessor.Context is { } ambient)
{
context.ConversationId ??= ambient.ConversationId;
Expand Down
25 changes: 9 additions & 16 deletions src/Mocha/src/Mocha/Middlewares/Receive/RoutingMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
namespace Mocha.Middlewares;

/// <summary>
/// Selects matching consumers for the resolved message type and current endpoint.
/// Selects matching consumers for the current endpoint by evaluating each route's condition against
/// the received message.
/// </summary>
/// <remarks>
/// Matches include enclosed message types so handlers registered for base contracts can receive
/// derived messages.
/// The default condition matches by message type, including enclosed message types so handlers
/// registered for base contracts can receive derived messages. Other conditions, such as header based
/// reply routing, select on envelope metadata alone.
/// Without this middleware, no consumer list is built for execution and messages can traverse the
/// pipeline without ever reaching application handlers.
/// </remarks>
Expand All @@ -18,21 +20,12 @@ public async ValueTask InvokeAsync(IReceiveContext context, ReceiveDelegate next
{
var feature = context.Features.GetOrSet<ReceiveConsumerFeature>();

if (context.MessageType is { } messageType)
foreach (var route in router.GetInboundByEndpoint(context.Endpoint))
{
var routes = router.GetInboundByEndpoint(context.Endpoint);

foreach (var route in routes)
if (route.Consumer is not null && route.Condition.Matches(context))
{
if (route.MessageType is not null
&& route.Consumer is not null
&& (
route.MessageType == messageType
|| messageType.EnclosedMessageTypes.Contains(route.MessageType)))
{
// Consumers are collected on the feature for later execution middleware.
feature.Consumers.Add(route.Consumer);
}
// Consumers are collected on the feature for later execution middleware.
feature.Consumers.Add(route.Consumer);
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/Mocha/src/Mocha/Sagas/Saga.cs
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,9 @@ private async Task SendEventsAsync(
var requestType = context.Runtime.GetMessageType(message.GetType());
var endpoint = context.Runtime.GetSendEndpoint(requestType);

// Route the reply to the shared reply endpoint, where the saga's OnReply/OnAnyReply route
// is bound. The reply is delivered to the saga consumer there and correlated by the saga
// header, so no correlation id is required.
options = options with { ReplyEndpoint = endpoint.Transport.ReplyReceiveEndpoint?.Source.Address };

options.Headers.Set(SagaContextData.SagaId, state.Id.ToString("D"));
Expand Down
Loading
Loading