diff --git a/docs/guide/messaging/transports/kafka.md b/docs/guide/messaging/transports/kafka.md index 7be928e60..0f1572c9b 100644 --- a/docs/guide/messaging/transports/kafka.md +++ b/docs/guide/messaging/transports/kafka.md @@ -146,17 +146,19 @@ processes messages. Understanding these settings is important for getting the de ### How Endpoint Mode Affects Consumer Configuration When an endpoint uses `EndpointMode.Durable` (i.e., you've called `.UseDurableInbox()` or applied durable inbox -globally), Wolverine overrides two key consumer settings before building the listener: +globally), Wolverine overrides the following consumer setting before building the listener: | Consumer Setting | Durable (`UseDurableInbox`) | Non-Durable (`BufferedInMemory` / `Inline`) | |---|---|---| | `EnableAutoCommit` | `false` | `true` (Kafka default) | -| `EnableAutoOffsetStore` | `false` | `true` (Kafka default) | - -In **durable mode**, Wolverine disables Kafka's automatic offset management so that offsets are only committed -after a message has been successfully processed and persisted to the transactional inbox. This prevents message loss -if the application shuts down unexpectedly -- unprocessed messages will be re-delivered when the consumer rejoins -the group. +| `EnableAutoOffsetStore` | `true` (Kafka default) | `true` (Kafka default) | + +In **durable mode**, Wolverine disables Kafka's automatic offset *commit* so that offsets are only committed +when Wolverine explicitly calls `Commit()` after a message has been successfully persisted to the transactional +inbox. The Kafka client still auto-stores the offset on each `Consume()` call (the default behavior), which +tracks the consumer's position. However, the stored offset is not pushed to the broker until `Commit()` is +called. This gives correct at-least-once semantics -- if the application shuts down unexpectedly before +committing, unprocessed messages will be re-delivered when the consumer rejoins the group. In **non-durable mode** (`BufferedInMemory` or `ProcessInline`), Kafka's default auto-commit behavior is left in place. The Kafka client library periodically commits offsets automatically, which provides higher throughput diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/configure_consumers_and_publishers.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/configure_consumers_and_publishers.cs index d78bd38e3..0b4a43097 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/configure_consumers_and_publishers.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/configure_consumers_and_publishers.cs @@ -1,7 +1,9 @@ +using IntegrationTests; using JasperFx.Resources; using Microsoft.Extensions.Hosting; using Shouldly; using Wolverine.Kafka.Internals; +using Wolverine.Postgresql; using Wolverine.Tracking; using Wolverine.Transports; using Wolverine.Transports.Sending; @@ -81,6 +83,11 @@ public async Task InitializeAsync() opts.ListenToKafkaTopic("green") .BufferedInMemory(); + opts.ListenToKafkaTopic("blue") + .UseDurableInbox(); + + opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, "kafka_config"); + // Include test assembly for handler discovery opts.Discovery.IncludeAssembly(GetType().Assembly); @@ -141,4 +148,15 @@ public void override_producer_configuration() colors.Config.BatchSize.ShouldBe(222); } + [Fact] + public void durable_mode_disables_auto_commit_but_not_auto_offset_store() + { + var blue = _host.GetRuntime().Endpoints.ActiveListeners() + .Single(x => x.Uri.ToString().Contains("blue")).ShouldBeOfType() + .Listener.ShouldBeOfType(); + + blue.Config.EnableAutoCommit.ShouldBe(false); + blue.Config.EnableAutoOffsetStore.ShouldBeNull(); + } + } \ No newline at end of file diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/propagate_group_id_to_partition_key.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/propagate_group_id_to_partition_key.cs index e7b9ee407..d653cd9d1 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/propagate_group_id_to_partition_key.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/propagate_group_id_to_partition_key.cs @@ -1,7 +1,9 @@ +using Confluent.Kafka; using JasperFx.Core; using JasperFx.Resources; using Microsoft.Extensions.Hosting; using Shouldly; +using Wolverine.Attributes; using Wolverine.Tracking; namespace Wolverine.Kafka.Tests; @@ -30,12 +32,18 @@ public async Task InitializeAsync() .ConfigureConsumer(config => { config.GroupId = "source-group-123"; + config.AutoOffsetReset = AutoOffsetReset.Earliest; }); // Listen to target topic where cascaded messages arrive opts.ListenToKafkaTopic("groupid-target") .ProcessInline(); + // Route TriggerFromGroupId to the source topic + opts.PublishMessage() + .ToKafkaTopic("groupid-source") + .SendInline(); + // Route cascaded TargetFromGroupId messages to the target topic opts.PublishMessage() .ToKafkaTopic("groupid-target") @@ -65,8 +73,10 @@ public async Task DisposeAsync() } } +[Topic("groupid-source")] public record TriggerFromGroupId(string Name); +[Topic("groupid-target")] public record TargetFromGroupId(string Name); public static class TriggerFromGroupIdHandler diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs index 23469c58d..297a2a73e 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs @@ -98,7 +98,6 @@ public override ValueTask BuildListenerAsync(IWolverineRuntime runtim if (Mode == EndpointMode.Durable) { config.EnableAutoCommit = false; - config.EnableAutoOffsetStore = false; } var listener = new KafkaListener(this, config,