Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions docs/guide/messaging/transports/nats.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,41 @@ opts.UseNats("nats://localhost:4222")
});
```

### Consumer Deliver Policy

When Wolverine auto-provisions a JetStream consumer for a listener it leaves the consumer config's `DeliverPolicy` unset, which falls through to NATS's own default of `DeliverPolicy.All` — every message currently in the stream is replayed when the consumer first connects. For new listeners attached to a long-running stream that's usually not what you want.

Set a transport-wide default through `JetStreamDefaults.DeliverPolicy` so every auto-provisioned consumer under this transport starts at the same position:

```csharp
opts.UseNats("nats://localhost:4222")
.UseJetStream(js =>
{
js.DeliverPolicy = ConsumerConfigDeliverPolicy.New; // only messages
// from now on
});
```

Override per-listener with `DeliverFrom(...)` when a single endpoint needs a different position:

```csharp
opts.ListenToNatsSubject("orders.received")
.UseJetStream("ORDERS")
.DeliverFrom(ConsumerConfigDeliverPolicy.New);
```

The per-listener override always wins over the transport-wide default. When neither is set Wolverine writes nothing to the consumer config and the NATS server default (`All`) applies.

The override only applies to consumers Wolverine itself auto-provisions. If you reference a pre-created consumer by name with `UseJetStream(streamName, consumerName)`, Wolverine reuses that consumer's existing configuration regardless of `DeliverFrom(...)` — pre-creating the consumer with the desired policy via the NATS CLI or `JetStream` API is the right tool there.

| `ConsumerConfigDeliverPolicy` | Effect |
|---|---|
| `All` | Replay every message currently in the stream (NATS-server default). |
| `New` | Only deliver messages that arrive **after** the consumer is created. |
| `Last` | Deliver only the latest message in the stream. |
| `LastPerSubject` | Deliver the latest message per matching subject filter. |
| `ByStartSequence` / `ByStartTime` | Start from a specific sequence number or timestamp. Requires pre-creating the consumer outside Wolverine — `OptStartSeq` / `OptStartTime` have no listener-configuration surface. |

### Defining Streams

#### Work Queue Stream (Retention by Interest)
Expand Down
116 changes: 116 additions & 0 deletions src/Transports/NATS/Wolverine.Nats.Tests/NatsDeliverPolicyTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
using NATS.Client.JetStream.Models;
using Shouldly;
using Wolverine.Nats.Configuration;
using Wolverine.Nats.Internal;
using Xunit;

namespace Wolverine.Nats.Tests;

/// <summary>
/// Coverage for the JetStream consumer <c>DeliverPolicy</c> override surface
/// (transport-wide via <see cref="JetStreamDefaults.DeliverPolicy"/>,
/// per-listener via <see cref="NatsListenerConfiguration.DeliverFrom"/>) and
/// the precedence rule that resolves the two through
/// <see cref="NatsEndpoint.EffectiveDeliverPolicy"/>.
/// </summary>
public class NatsDeliverPolicyTests
{
private NatsEndpoint EndpointFor(string subject = "orders.created")
{
var transport = new NatsTransport();
return (NatsEndpoint)transport.GetOrCreateEndpoint(NatsEndpointUri.Subject(subject));
}

[Fact]
public void endpoint_default_deliver_policy_is_null()
{
// Null means "leave the ConsumerConfig alone and accept the NATS
// server default of DeliverPolicy.All". Source-compatible with the
// pre-PR behaviour for hosts that don't opt in.
EndpointFor().DeliverPolicy.ShouldBeNull();
}

[Fact]
public void transport_default_deliver_policy_is_null()
{
new JetStreamDefaults().DeliverPolicy.ShouldBeNull();
}

[Fact]
public void effective_deliver_policy_is_null_when_neither_endpoint_nor_transport_set_it()
{
EndpointFor().EffectiveDeliverPolicy.ShouldBeNull();
}

[Fact]
public void effective_deliver_policy_falls_back_to_transport_when_endpoint_is_null()
{
var transport = new NatsTransport();
transport.Configuration.JetStreamDefaults.DeliverPolicy = ConsumerConfigDeliverPolicy.New;

var endpoint = (NatsEndpoint)transport.GetOrCreateEndpoint(NatsEndpointUri.Subject("topic"));

endpoint.DeliverPolicy.ShouldBeNull();
endpoint.EffectiveDeliverPolicy.ShouldBe(ConsumerConfigDeliverPolicy.New);
}

[Fact]
public void endpoint_override_wins_over_transport_default()
{
var transport = new NatsTransport();
transport.Configuration.JetStreamDefaults.DeliverPolicy = ConsumerConfigDeliverPolicy.New;

var endpoint = (NatsEndpoint)transport.GetOrCreateEndpoint(NatsEndpointUri.Subject("topic"));
endpoint.DeliverPolicy = ConsumerConfigDeliverPolicy.Last;

endpoint.EffectiveDeliverPolicy.ShouldBe(ConsumerConfigDeliverPolicy.Last);
}

[Fact]
public void listener_configuration_deliver_from_sets_endpoint_override()
{
var endpoint = EndpointFor();
var configuration = new NatsListenerConfiguration(endpoint);

configuration.DeliverFrom(ConsumerConfigDeliverPolicy.New);

// Configuration callbacks are buffered in the IDelayedEndpointConfiguration
// base — applying them here mirrors what the runtime does at endpoint
// compile time before BuildListenerAsync runs.
((Wolverine.Configuration.IDelayedEndpointConfiguration)configuration).Apply();

endpoint.DeliverPolicy.ShouldBe(ConsumerConfigDeliverPolicy.New);
endpoint.EffectiveDeliverPolicy.ShouldBe(ConsumerConfigDeliverPolicy.New);
}

[Fact]
public void listener_configuration_deliver_from_overrides_transport_default()
{
var transport = new NatsTransport();
transport.Configuration.JetStreamDefaults.DeliverPolicy = ConsumerConfigDeliverPolicy.All;

var endpoint = (NatsEndpoint)transport.GetOrCreateEndpoint(NatsEndpointUri.Subject("topic"));
var configuration = new NatsListenerConfiguration(endpoint);

configuration.DeliverFrom(ConsumerConfigDeliverPolicy.New);
((Wolverine.Configuration.IDelayedEndpointConfiguration)configuration).Apply();

endpoint.EffectiveDeliverPolicy.ShouldBe(ConsumerConfigDeliverPolicy.New);
}

[Theory]
[InlineData(ConsumerConfigDeliverPolicy.All)]
[InlineData(ConsumerConfigDeliverPolicy.Last)]
[InlineData(ConsumerConfigDeliverPolicy.New)]
[InlineData(ConsumerConfigDeliverPolicy.LastPerSubject)]
public void deliver_from_round_trips_through_endpoint(ConsumerConfigDeliverPolicy policy)
{
var endpoint = EndpointFor();
var configuration = new NatsListenerConfiguration(endpoint);

configuration.DeliverFrom(policy);
((Wolverine.Configuration.IDelayedEndpointConfiguration)configuration).Apply();

endpoint.EffectiveDeliverPolicy.ShouldBe(policy);
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using NATS.Client.JetStream.Models;
using Wolverine.Configuration;
using Wolverine.Nats.Internal;

Expand Down Expand Up @@ -83,4 +84,42 @@ public NatsListenerConfiguration DeadLetterTo(string deadLetterSubject)

return this;
}

/// <summary>
/// Override the JetStream consumer's <c>DeliverPolicy</c> for this listener
/// only — wins over any transport-wide default set via
/// <c>UseJetStream(d =&gt; d.DeliverPolicy = ...)</c>.
///
/// Use <see cref="ConsumerConfigDeliverPolicy.New"/> to start an
/// auto-provisioned consumer at "only messages that arrive after this
/// consumer is created" — the typical answer when standing up a new
/// listener against an existing stream you don't want to replay from the
/// beginning. Other useful values include
/// <see cref="ConsumerConfigDeliverPolicy.Last"/> ("only the latest
/// message"), <see cref="ConsumerConfigDeliverPolicy.LastPerSubject"/>
/// (compaction-style: latest per subject filter), and the explicit
/// <see cref="ConsumerConfigDeliverPolicy.All"/> (the NATS-server default
/// when nothing is configured — replay every message currently in the
/// stream). For <see cref="ConsumerConfigDeliverPolicy.ByStartSequence"/>
/// or <see cref="ConsumerConfigDeliverPolicy.ByStartTime"/> you must
/// pre-create the consumer outside Wolverine and reference it by name in
/// <c>UseJetStream(...)</c> — the supplemental
/// <c>OptStartSeq</c> / <c>OptStartTime</c> properties have no
/// listener-configuration surface here.
///
/// Only applies to consumers Wolverine itself auto-provisions; if you
/// reference a pre-created consumer by name via
/// <c>UseJetStream(streamName, consumerName)</c>, Wolverine will reuse
/// that consumer's existing config and ignore this override (matches the
/// existing reuse-by-name behaviour in <c>JetStreamSubscriber</c>).
/// </summary>
public NatsListenerConfiguration DeliverFrom(ConsumerConfigDeliverPolicy deliverPolicy)
{
add(endpoint =>
{
endpoint.DeliverPolicy = deliverPolicy;
});

return this;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;

namespace Wolverine.Nats.Configuration;

Expand Down Expand Up @@ -92,6 +93,22 @@ public class JetStreamDefaults
public TimeSpan AckWait { get; set; } = TimeSpan.FromSeconds(30);
public int MaxDeliver { get; set; } = 3;
public TimeSpan DuplicateWindow { get; set; } = TimeSpan.FromMinutes(2);

/// <summary>
/// Transport-wide default for the JetStream consumer's <c>DeliverPolicy</c>.
/// When <c>null</c> (the default) Wolverine leaves the consumer config's
/// <c>DeliverPolicy</c> unset, which falls through to the NATS server's
/// own default — <see cref="ConsumerConfigDeliverPolicy.All"/> — replaying
/// every message currently in the stream when an auto-provisioned consumer
/// first connects.
///
/// Per-listener overrides via <c>NatsListenerConfiguration.DeliverFrom(...)</c>
/// always win over this transport-wide default. The override only applies
/// to consumers Wolverine itself auto-provisions; pre-created consumers
/// referenced by name keep whatever <c>DeliverPolicy</c> they were
/// originally created with.
/// </summary>
public ConsumerConfigDeliverPolicy? DeliverPolicy { get; set; }
}

public interface ITenantIdResolver
Expand Down
10 changes: 10 additions & 0 deletions src/Transports/NATS/Wolverine.Nats/Internal/JetStreamSubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ CancellationToken cancellation
AckWait = TimeSpan.FromSeconds(30)
};

// Apply the per-endpoint or transport-wide DeliverPolicy override when set.
// Leaving the property unset on the ConsumerConfig instance falls through to
// the NATS server default of DeliverPolicy.All — which replays every existing
// message in the stream when an auto-provisioned consumer first connects, so
// hosts that want "only new messages from now on" need to opt in here.
if (_endpoint.EffectiveDeliverPolicy is { } deliverPolicy)
{
config.DeliverPolicy = deliverPolicy;
}

if (string.IsNullOrEmpty(_endpoint.ConsumerName))
{
config.FilterSubject = _subscriptionPattern;
Expand Down
25 changes: 25 additions & 0 deletions src/Transports/NATS/Wolverine.Nats/Internal/NatsEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,30 @@ public NatsEndpoint(string subject, NatsTransport transport, EndpointRole role)
public string? DeadLetterSubject { get; set; }
public int MaxDeliveryAttempts { get; set; } = 5;

/// <summary>
/// Per-endpoint override for the JetStream consumer's <c>DeliverPolicy</c>.
/// When non-null this wins over
/// <see cref="Configuration.JetStreamDefaults.DeliverPolicy"/>; when null the
/// transport-wide default applies, and when both are null Wolverine leaves
/// <c>DeliverPolicy</c> unset on the auto-provisioned <c>ConsumerConfig</c>
/// — falling through to the NATS server default of
/// <see cref="ConsumerConfigDeliverPolicy.All"/>. See
/// <see cref="EffectiveDeliverPolicy"/> for the resolved value.
/// </summary>
public ConsumerConfigDeliverPolicy? DeliverPolicy { get; set; }

/// <summary>
/// Resolved <c>DeliverPolicy</c> for this endpoint: per-endpoint
/// <see cref="DeliverPolicy"/> wins over the transport-wide
/// <see cref="Configuration.JetStreamDefaults.DeliverPolicy"/>, with
/// <c>null</c> meaning "leave the consumer config alone and let the NATS
/// server default apply". Computed at access time so override mutations
/// performed during host bootstrap are picked up regardless of ordering
/// between transport / listener configuration calls.
/// </summary>
public ConsumerConfigDeliverPolicy? EffectiveDeliverPolicy =>
DeliverPolicy ?? _transport.Configuration.JetStreamDefaults.DeliverPolicy;

protected override bool supportsMode(EndpointMode mode)
{
return mode switch
Expand Down Expand Up @@ -104,6 +128,7 @@ protected override ISender CreateSender(IWolverineRuntime runtime)
DeadLetterQueueEnabled = DeadLetterQueueEnabled,
DeadLetterSubject = DeadLetterSubject,
MaxDeliveryAttempts = MaxDeliveryAttempts,
DeliverPolicy = DeliverPolicy,
MessageType = MessageType,
CustomHeaders = CustomHeaders,
NatsSerializer = NatsSerializer
Expand Down
Loading