diff --git a/docs/guide/messaging/partitioning.md b/docs/guide/messaging/partitioning.md index 9b128095f..ecec94b92 100644 --- a/docs/guide/messaging/partitioning.md +++ b/docs/guide/messaging/partitioning.md @@ -432,6 +432,39 @@ opts.MessagePartitioning.PublishToShardedAmazonSqsQueues("letters", 4, topology snippet source | anchor +## Propagating GroupId to PartitionKey + +When using Kafka (or any transport that uses `PartitionKey`), you may want cascaded messages from a handler to +automatically inherit the originating message's `GroupId` as their `PartitionKey`. This ensures that cascaded messages +land on the same Kafka partition as the originating message without manually specifying `DeliveryOptions` on every +outgoing message. + +This is especially useful when you have a chain of message handlers where the first message arrives at a Kafka topic +with a consumer group id, and you want all downstream cascaded messages to be published to the same partition. + + + +```cs +var builder = Host.CreateApplicationBuilder(); +builder.UseWolverine(opts => +{ + // Automatically propagate the originating message's GroupId + // to the PartitionKey of all cascaded outgoing messages. + // This is particularly useful with Kafka where you want + // cascaded messages to land on the same partition as the + // originating message without manually specifying + // DeliveryOptions on every outgoing message. + opts.Policies.PropagateGroupIdToPartitionKey(); +}); +``` +snippet source | anchor + + +::: tip +The rule will not override an explicitly set `PartitionKey` on an outgoing envelope. If you set `PartitionKey` via +`DeliveryOptions`, that value takes precedence. +::: + ## Partitioning Messages Received from External Systems ::: warning diff --git a/docs/guide/messaging/transports/kafka.md b/docs/guide/messaging/transports/kafka.md index ff8ec31ef..7be928e60 100644 --- a/docs/guide/messaging/transports/kafka.md +++ b/docs/guide/messaging/transports/kafka.md @@ -227,6 +227,20 @@ public static ValueTask publish_by_partition_key(IMessageBus bus) snippet source | anchor +## Propagating GroupId to PartitionKey + +When consuming from a Kafka topic, the incoming envelope's `GroupId` is automatically set from the Kafka consumer's +configured `GroupId`. If your handler produces cascaded messages that should land on the same partition, you can +enable automatic propagation of the originating `GroupId` to the outgoing `PartitionKey`: + +```csharp +opts.Policies.PropagateGroupIdToPartitionKey(); +``` + +This eliminates the need to manually set `DeliveryOptions.PartitionKey` on every outgoing message from your handlers. +The rule will never override an explicitly set `PartitionKey`. See the [Partitioned Sequential Messaging](/guide/messaging/partitioning#propagating-groupid-to-partitionkey) +documentation for more details and a code sample. + ## Interoperability ::: tip diff --git a/src/Samples/DocumentationSamples/PartitioningSamples.cs b/src/Samples/DocumentationSamples/PartitioningSamples.cs index 05fcb54ad..c266fbd08 100644 --- a/src/Samples/DocumentationSamples/PartitioningSamples.cs +++ b/src/Samples/DocumentationSamples/PartitioningSamples.cs @@ -134,11 +134,30 @@ public static async Task configuring_by_property_name() public static async Task SendMessageToGroup(IMessageBus bus) { await bus.PublishAsync( - new ApproveInvoice("AAA"), + new ApproveInvoice("AAA"), new() { GroupId = "agroup" }); } #endregion + + public static async Task propagate_group_id_to_partition_key() + { + #region sample_propagate_group_id_to_partition_key + + var builder = Host.CreateApplicationBuilder(); + builder.UseWolverine(opts => + { + // Automatically propagate the originating message's GroupId + // to the PartitionKey of all cascaded outgoing messages. + // This is particularly useful with Kafka where you want + // cascaded messages to land on the same partition as the + // originating message without manually specifying + // DeliveryOptions on every outgoing message. + opts.Policies.PropagateGroupIdToPartitionKey(); + }); + + #endregion + } } public record PayInvoice(string Id); diff --git a/src/Testing/CoreTests/GroupIdToPartitionKeyRuleTests.cs b/src/Testing/CoreTests/GroupIdToPartitionKeyRuleTests.cs new file mode 100644 index 000000000..4c2213b28 --- /dev/null +++ b/src/Testing/CoreTests/GroupIdToPartitionKeyRuleTests.cs @@ -0,0 +1,75 @@ +using NSubstitute; +using Wolverine; +using Wolverine.ComplianceTests; +using Xunit; + +namespace CoreTests; + +public class GroupIdToPartitionKeyRuleTests +{ + [Fact] + public void propagates_group_id_to_partition_key() + { + var rule = new GroupIdToPartitionKeyRule(); + var originator = Substitute.For(); + var originatingEnvelope = ObjectMother.Envelope(); + originatingEnvelope.GroupId = "stream-123"; + originator.Envelope.Returns(originatingEnvelope); + + var outgoing = ObjectMother.Envelope(); + outgoing.PartitionKey = null; + + rule.ApplyCorrelation(originator, outgoing); + + outgoing.PartitionKey.ShouldBe("stream-123"); + } + + [Fact] + public void does_not_override_existing_partition_key() + { + var rule = new GroupIdToPartitionKeyRule(); + var originator = Substitute.For(); + var originatingEnvelope = ObjectMother.Envelope(); + originatingEnvelope.GroupId = "stream-123"; + originator.Envelope.Returns(originatingEnvelope); + + var outgoing = ObjectMother.Envelope(); + outgoing.PartitionKey = "explicit-key"; + + rule.ApplyCorrelation(originator, outgoing); + + outgoing.PartitionKey.ShouldBe("explicit-key"); + } + + [Fact] + public void does_nothing_when_originator_has_no_group_id() + { + var rule = new GroupIdToPartitionKeyRule(); + var originator = Substitute.For(); + var originatingEnvelope = ObjectMother.Envelope(); + originatingEnvelope.GroupId = null; + originator.Envelope.Returns(originatingEnvelope); + + var outgoing = ObjectMother.Envelope(); + outgoing.PartitionKey = null; + + rule.ApplyCorrelation(originator, outgoing); + + outgoing.PartitionKey.ShouldBeNull(); + } + + [Fact] + public void does_nothing_when_originator_has_no_envelope() + { + var rule = new GroupIdToPartitionKeyRule(); + var originator = Substitute.For(); + originator.Envelope.Returns((Envelope?)null); + + var outgoing = ObjectMother.Envelope(); + outgoing.PartitionKey = null; + + rule.ApplyCorrelation(originator, outgoing); + + outgoing.PartitionKey.ShouldBeNull(); + } +} diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/propagate_group_id_to_partition_key.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/propagate_group_id_to_partition_key.cs new file mode 100644 index 000000000..e7b9ee407 --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/propagate_group_id_to_partition_key.cs @@ -0,0 +1,86 @@ +using JasperFx.Core; +using JasperFx.Resources; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine.Tracking; + +namespace Wolverine.Kafka.Tests; + +public class propagate_group_id_to_partition_key : IAsyncLifetime +{ + private IHost _host; + + public async Task InitializeAsync() + { + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.ServiceName = "PropagateGroupIdTests"; + + opts.UseKafka("localhost:9092").AutoProvision(); + + // Enable the feature under test + opts.Policies.PropagateGroupIdToPartitionKey(); + + opts.Policies.DisableConventionalLocalRouting(); + + // Listen to source topic with an explicit GroupId + opts.ListenToKafkaTopic("groupid-source") + .ProcessInline() + .ConfigureConsumer(config => + { + config.GroupId = "source-group-123"; + }); + + // Listen to target topic where cascaded messages arrive + opts.ListenToKafkaTopic("groupid-target") + .ProcessInline(); + + // Route cascaded TargetFromGroupId messages to the target topic + opts.PublishMessage() + .ToKafkaTopic("groupid-target") + .SendInline(); + + opts.Discovery.IncludeAssembly(GetType().Assembly); + opts.Services.AddResourceSetupOnStartup(); + }).StartAsync(); + } + + [Fact] + public async Task cascaded_message_receives_partition_key_from_originating_group_id() + { + var session = await _host.TrackActivity() + .IncludeExternalTransports() + .Timeout(30.Seconds()) + .WaitForMessageToBeReceivedAt(_host) + .PublishMessageAndWaitAsync(new TriggerFromGroupId("hello")); + + var envelope = session.Received.SingleEnvelope(); + envelope.PartitionKey.ShouldBe("source-group-123"); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + } +} + +public record TriggerFromGroupId(string Name); + +public record TargetFromGroupId(string Name); + +public static class TriggerFromGroupIdHandler +{ + public static TargetFromGroupId Handle(TriggerFromGroupId message) + { + return new TargetFromGroupId(message.Name); + } +} + +public static class TargetFromGroupIdHandler +{ + public static void Handle(TargetFromGroupId message) + { + // no-op, just receive + } +} diff --git a/src/Wolverine/IEnvelopeRule.cs b/src/Wolverine/IEnvelopeRule.cs index eeab596f3..6324f75ca 100644 --- a/src/Wolverine/IEnvelopeRule.cs +++ b/src/Wolverine/IEnvelopeRule.cs @@ -1,3 +1,4 @@ +using JasperFx.Core; using Wolverine.Util; namespace Wolverine; @@ -108,6 +109,30 @@ public override int GetHashCode() } } +/// +/// Propagates the originating message's GroupId to the outgoing envelope's PartitionKey. +/// This is useful for automatically carrying forward a stream/group id as a Kafka partition key +/// on cascaded or published messages without manually setting DeliveryOptions on every message. +/// +internal class GroupIdToPartitionKeyRule : IEnvelopeRule +{ + public void Modify(Envelope envelope) + { + // No-op when used without an originator context + } + + public void ApplyCorrelation(IMessageContext originator, Envelope outgoing) + { + if (outgoing.PartitionKey.IsNotEmpty()) return; + + var groupId = originator.Envelope?.GroupId; + if (groupId.IsNotEmpty()) + { + outgoing.PartitionKey = groupId; + } + } +} + internal class LambdaEnvelopeRule : IEnvelopeRule { private readonly Action _configure; diff --git a/src/Wolverine/IPolicies.cs b/src/Wolverine/IPolicies.cs index cc5d646cb..b0e135b3b 100644 --- a/src/Wolverine/IPolicies.cs +++ b/src/Wolverine/IPolicies.cs @@ -170,4 +170,12 @@ public interface IPolicies : IEnumerable, IWithFailurePolicies /// history, and endpoint events to the message bus? The default is false; /// bool PublishAgentEvents { get; set; } + + /// + /// Automatically propagate the originating message's GroupId to the PartitionKey of outgoing + /// envelopes. This is useful when using Kafka and you want cascaded messages to inherit + /// the partition key from the originating message's group/stream id without manually + /// specifying DeliveryOptions on every outgoing message. + /// + void PropagateGroupIdToPartitionKey(); } \ No newline at end of file diff --git a/src/Wolverine/Runtime/MessageBus.cs b/src/Wolverine/Runtime/MessageBus.cs index 7de169798..c0f4cbef9 100644 --- a/src/Wolverine/Runtime/MessageBus.cs +++ b/src/Wolverine/Runtime/MessageBus.cs @@ -253,7 +253,12 @@ public ValueTask BroadcastToTopicAsync(string topicName, object message, Deliver internal async ValueTask PersistOrSendAsync(Envelope envelope) { if (envelope is null) return; // Not sure how this would happen - + + foreach (var rule in Runtime.Options.MetadataRules) + { + rule.ApplyCorrelation(this, envelope); + } + if (envelope.Sender is null) { throw new InvalidOperationException("Envelope has not been routed"); @@ -306,6 +311,18 @@ internal virtual void TrackEnvelopeCorrelation(Envelope outbound, Activity? acti internal async ValueTask PersistOrSendAsync(params Envelope[] outgoing) { + var metadataRules = Runtime.Options.MetadataRules; + if (metadataRules.Count > 0) + { + foreach (var envelope in outgoing) + { + foreach (var rule in metadataRules) + { + rule.ApplyCorrelation(this, envelope); + } + } + } + if (Transaction != null) { // This filtering is done to only persist envelopes where diff --git a/src/Wolverine/WolverineOptions.Policies.cs b/src/Wolverine/WolverineOptions.Policies.cs index 8de2e70c5..d7bebc738 100644 --- a/src/Wolverine/WolverineOptions.Policies.cs +++ b/src/Wolverine/WolverineOptions.Policies.cs @@ -277,6 +277,11 @@ void IPolicies.LogMessageStarting(LogLevel logLevel) RegisteredPolicies.Insert(0, new LogStartingActivityPolicy(logLevel)); } + void IPolicies.PropagateGroupIdToPartitionKey() + { + MetadataRules.Add(new GroupIdToPartitionKeyRule()); + } + internal MiddlewarePolicy FindOrCreateMiddlewarePolicy() { var policy = RegisteredPolicies.OfType().FirstOrDefault(); diff --git a/src/Wolverine/WolverineOptions.cs b/src/Wolverine/WolverineOptions.cs index 7cae2d432..cf9885013 100644 --- a/src/Wolverine/WolverineOptions.cs +++ b/src/Wolverine/WolverineOptions.cs @@ -199,7 +199,13 @@ public MultipleHandlerBehavior MultipleHandlerBehavior /// This will be automatically applied to all outgoing messages, but will never override /// any explicitly defined Envelope.GroupId /// - public MessagePartitioningRules MessagePartitioning { get; } + public MessagePartitioningRules MessagePartitioning { get; } + + /// + /// Internal list of IEnvelopeRule instances that are applied via ApplyCorrelation + /// to outgoing envelopes in PersistOrSendAsync + /// + internal List MetadataRules { get; } = new(); /// For advanced usages, this gives you the ability to register pre-canned message handling