Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions docs/guide/messaging/transports/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,17 +146,19 @@ processes messages. Understanding these settings is important for getting the de
### How Endpoint Mode Affects Consumer Configuration

When an endpoint uses `EndpointMode.Durable` (i.e., you've called `.UseDurableInbox()` or applied durable inbox
globally), Wolverine overrides two key consumer settings before building the listener:
globally), Wolverine overrides the following consumer setting before building the listener:

| Consumer Setting | Durable (`UseDurableInbox`) | Non-Durable (`BufferedInMemory` / `Inline`) |
|---|---|---|
| `EnableAutoCommit` | `false` | `true` (Kafka default) |
| `EnableAutoOffsetStore` | `false` | `true` (Kafka default) |

In **durable mode**, Wolverine disables Kafka's automatic offset management so that offsets are only committed
after a message has been successfully processed and persisted to the transactional inbox. This prevents message loss
if the application shuts down unexpectedly -- unprocessed messages will be re-delivered when the consumer rejoins
the group.
| `EnableAutoOffsetStore` | `true` (Kafka default) | `true` (Kafka default) |

In **durable mode**, Wolverine disables Kafka's automatic offset *commit* so that offsets are only committed
when Wolverine explicitly calls `Commit()` after a message has been successfully persisted to the transactional
inbox. The Kafka client still auto-stores the offset on each `Consume()` call (the default behavior), which
tracks the consumer's position. However, the stored offset is not pushed to the broker until `Commit()` is
called. This gives correct at-least-once semantics -- if the application shuts down unexpectedly before
committing, unprocessed messages will be re-delivered when the consumer rejoins the group.

In **non-durable mode** (`BufferedInMemory` or `ProcessInline`), Kafka's default auto-commit behavior is left
in place. The Kafka client library periodically commits offsets automatically, which provides higher throughput
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
using IntegrationTests;
using JasperFx.Resources;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine.Kafka.Internals;
using Wolverine.Postgresql;
using Wolverine.Tracking;
using Wolverine.Transports;
using Wolverine.Transports.Sending;
Expand Down Expand Up @@ -81,6 +83,11 @@ public async Task InitializeAsync()
opts.ListenToKafkaTopic("green")
.BufferedInMemory();

opts.ListenToKafkaTopic("blue")
.UseDurableInbox();

opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, "kafka_config");

// Include test assembly for handler discovery
opts.Discovery.IncludeAssembly(GetType().Assembly);

Expand Down Expand Up @@ -141,4 +148,15 @@ public void override_producer_configuration()
colors.Config.BatchSize.ShouldBe(222);
}

[Fact]
public void durable_mode_disables_auto_commit_but_not_auto_offset_store()
{
var blue = _host.GetRuntime().Endpoints.ActiveListeners()
.Single(x => x.Uri.ToString().Contains("blue")).ShouldBeOfType<ListeningAgent>()
.Listener.ShouldBeOfType<KafkaListener>();

blue.Config.EnableAutoCommit.ShouldBe(false);
blue.Config.EnableAutoOffsetStore.ShouldBeNull();
}

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
using Confluent.Kafka;
using JasperFx.Core;
using JasperFx.Resources;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine.Attributes;
using Wolverine.Tracking;

namespace Wolverine.Kafka.Tests;
Expand Down Expand Up @@ -30,12 +32,18 @@ public async Task InitializeAsync()
.ConfigureConsumer(config =>
{
config.GroupId = "source-group-123";
config.AutoOffsetReset = AutoOffsetReset.Earliest;
});

// Listen to target topic where cascaded messages arrive
opts.ListenToKafkaTopic("groupid-target")
.ProcessInline();

// Route TriggerFromGroupId to the source topic
opts.PublishMessage<TriggerFromGroupId>()
.ToKafkaTopic("groupid-source")
.SendInline();

// Route cascaded TargetFromGroupId messages to the target topic
opts.PublishMessage<TargetFromGroupId>()
.ToKafkaTopic("groupid-target")
Expand Down Expand Up @@ -65,8 +73,10 @@ public async Task DisposeAsync()
}
}

[Topic("groupid-source")]
public record TriggerFromGroupId(string Name);

[Topic("groupid-target")]
public record TargetFromGroupId(string Name);

public static class TriggerFromGroupIdHandler
Expand Down
1 change: 0 additions & 1 deletion src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ public override ValueTask<IListener> BuildListenerAsync(IWolverineRuntime runtim
if (Mode == EndpointMode.Durable)
{
config.EnableAutoCommit = false;
config.EnableAutoOffsetStore = false;
}

var listener = new KafkaListener(this, config,
Expand Down
Loading