Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,58 @@ await host.StartAsync();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/DocumentationSamples.cs#L607-L627' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_disable_asb_dlq' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Recovering Native Dead Letters to Durable Storage <Badge type="tip" text="6.9" />

Azure Service Bus dead letters land in one of two places depending on the endpoint mode: buffered and
durable endpoints move failures to a Wolverine-managed dead letter **queue** (default
`wolverine-dead-letter-queue`), while inline endpoints — and Azure Service Bus itself, on TTL or
max-delivery — use the native
[`$DeadLetterQueue` sub-queue](https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dead-letter-queues)
of the source entity. Either way, those messages are only visible through Azure tooling. Tools that
manage Wolverine's *durable* dead letters (for example [CritterWatch](https://github.com/JasperFx/CritterWatch))
can't see or replay them.

`EnableDeadLetterQueueRecovery()` starts a background listener that drains **both** kinds of source —
the Wolverine-managed dead letter queue(s) and the native `$DeadLetterQueue` sub-queue of every
listening queue and subscription — copying each message into Wolverine's durable dead letter storage
(the `wolverine_dead_letters` table), where it becomes queryable and replayable through
`IDeadLetters`:

```csharp
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
// Durable message storage is required — the recovered dead letters
// are written to the wolverine_dead_letters table.
opts.PersistMessagesWithPostgresql(connectionString);

opts.UseAzureServiceBus(connectionString)
.AutoProvision()
// Drain the native $DeadLetterQueue sub-queue of every listening
// queue and subscription into Wolverine's durable storage.
.EnableDeadLetterQueueRecovery();

opts.ListenToAzureServiceBusQueue("orders");
}).StartAsync();
```

With no arguments, every managed dead letter queue and every listening queue/subscription's native
sub-queue is drained. Pass explicit names (a managed dead letter queue name, a listening queue name,
or a subscription endpoint name) to restrict recovery to a subset:

```csharp
opts.UseAzureServiceBus(connectionString)
.EnableDeadLetterQueueRecovery("orders", "shipments");
```

The original exception type and message are preserved: from the stamped failure metadata for messages
in the managed dead letter queue, or from the native `DeadLetterReason`/`DeadLetterErrorDescription`
for messages in a native sub-queue. A message is only completed off its source *after* it has been
safely written to durable storage, so a transient database outage never loses a dead letter.

::: tip
This is the Azure Service Bus equivalent of the
[RabbitMQ dead letter recovery](../rabbitmq/deadletterqueues.html) feature, and uses the same
`EnableDeadLetterQueueRecovery()` syntax across every native-dead-letter transport.
:::
47 changes: 47 additions & 0 deletions docs/guide/messaging/transports/rabbitmq/deadletterqueues.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,53 @@ using var host = await Host.CreateDefaultBuilder()
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Samples.cs#L469-L489' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_disable_rabbit_mq_dead_letter_queue' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Recovering Native Dead Letters to Durable Storage <Badge type="tip" text="6.9" />

With native dead lettering, failed messages land in a RabbitMQ dead letter queue and are only visible
through RabbitMQ tooling. Tools that manage Wolverine's *durable* dead letters (for example
[CritterWatch](https://github.com/JasperFx/CritterWatch)) can't see or replay them.

`EnableDeadLetterQueueRecovery()` starts a background listener that consumes the native dead letter
queue(s) and copies each message into Wolverine's durable dead letter storage (the
`wolverine_dead_letters` table), where it becomes queryable and replayable through `IDeadLetters`:

```csharp
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
// Durable message storage is required — the recovered dead letters
// are written to the wolverine_dead_letters table.
opts.PersistMessagesWithPostgresql(connectionString);

opts.UseRabbitMq()
.AutoProvision()
// Consume the native dead letter queue and copy the messages into
// Wolverine's durable dead letter storage.
.EnableDeadLetterQueueRecovery();

opts.ListenToRabbitQueue("orders");
}).StartAsync();
```

With no arguments, the default `wolverine-dead-letter-queue` is consumed. Pass explicit queue names
to recover from custom-named dead letter queues:

```csharp
opts.UseRabbitMq()
.EnableDeadLetterQueueRecovery("orders-errors", "shipments-errors");
```

The original exception type and message are reconstructed from the RabbitMQ `x-death` metadata (and
from the [enhanced dead lettering](#enhanced-dead-lettering-with-exception-metadata) headers when
those are present).

::: tip
The same `EnableDeadLetterQueueRecovery()` syntax is available on the
[Amazon SQS](../sqs/deadletterqueues.html) and
[Azure Service Bus](../azureservicebus/deadletterqueues.html) transports, so "bridge my native dead
letters into durable storage" is a one-call decision on every native-dead-letter transport.
:::




52 changes: 52 additions & 0 deletions docs/guide/messaging/transports/sqs/deadletterqueues.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,5 +85,57 @@ using var host = await Host.CreateDefaultBuilder()

This would force Wolverine to use any persistent envelope storage for dead letter queueing.

## Recovering Native Dead Letters to Durable Storage <Badge type="tip" text="6.9" />

By default an Amazon SQS dead letter queue is just another SQS queue — its messages are only visible
through the AWS console or SDK, and tools that manage Wolverine's *durable* dead letters (for example
[CritterWatch](https://github.com/JasperFx/CritterWatch)) can't see or replay them.

`EnableDeadLetterQueueRecovery()` starts a background listener that drains the native SQS dead letter
queue(s) and copies each message into Wolverine's durable dead letter storage (the
`wolverine_dead_letters` table), where it becomes queryable and replayable through `IDeadLetters`:

```csharp
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
// Durable message storage is required — the recovered dead letters
// are written to the wolverine_dead_letters table.
opts.PersistMessagesWithPostgresql(connectionString);

opts.UseAmazonSqsTransport()
.AutoProvision()
// Drain every dead letter queue used by a listener and copy the
// messages into Wolverine's durable dead letter storage.
.EnableDeadLetterQueueRecovery();

opts.ListenToSqsQueue("orders");
}).StartAsync();
```

With no arguments, every distinct dead letter queue used by a listening SQS queue is drained. Pass
explicit names when the dead letter queues you want to recover from aren't directly attached to a
Wolverine listener — for example, queues fed by an SQS [native redrive policy](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html)
that you manage yourself:

```csharp
opts.UseAmazonSqsTransport()
.EnableDeadLetterQueueRecovery("orders-dlq", "payments-dlq");
```

The names are sanitized with the same `SanitizeSqsName` normalization used everywhere else, so dots
become hyphens consistently.

Each recovered message is reconstructed back into a Wolverine `Envelope`. When Wolverine itself moved
the message to the dead letter queue, the original exception type and message are preserved (stamped
on the envelope when it failed). A message is only deleted from the SQS dead letter queue *after* it
has been safely written to durable storage, so a transient database outage never loses a dead letter.

::: tip
This is the SQS equivalent of the [RabbitMQ dead letter recovery](../rabbitmq/deadletterqueues.html)
feature, and uses the same `EnableDeadLetterQueueRecovery()` syntax across every native-dead-letter
transport.
:::



Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
using IntegrationTests;
using JasperFx.Core;
using JasperFx.Resources;
using Marten;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine.AmazonSqs.Internal;
using Wolverine.Marten;
using Wolverine.Persistence.Durability;
using Wolverine.Persistence.Durability.DeadLetterManagement;
using Wolverine.Runtime;
using Wolverine.Tracking;
using Xunit;

namespace Wolverine.AmazonSqs.Tests;

/// <summary>
/// End-to-end coverage for the SQS <c>EnableDeadLetterQueueRecovery()</c> bridge added for #3103:
/// a failing message is moved to the native SQS dead letter queue, the background recovery listener
/// drains it, and the dead letter ends up queryable in Wolverine's durable storage.
/// </summary>
public class dead_letter_queue_recovery : IAsyncLifetime
{
private readonly string _queueName = $"dlq-recovery-{Guid.NewGuid():N}";
private IHost _host = null!;

public async Task InitializeAsync()
{
_host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.ServiceName = "SqsDlqRecoveryTest";
opts.Durability.Mode = DurabilityMode.Solo;

opts.UseAmazonSqsTransportLocally()
.AutoProvision()
.AutoPurgeOnStartup()
.EnableDeadLetterQueueRecovery();

opts.Services.AddMarten(m =>
{
m.Connection(Servers.PostgresConnectionString);
m.DatabaseSchemaName = "sqs_dlq_recovery";
m.DisableNpgsqlLogging = true;
}).IntegrateWithWolverine();

opts.ListenToSqsQueue(_queueName);
opts.PublishMessage<SqsRecoveryTestMessage>().ToSqsQueue(_queueName);

opts.Policies.DisableConventionalLocalRouting();
opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState);
}).StartAsync();

await _host.ResetResourceState();
}

public async Task DisposeAsync()
{
if (_host != null)
{
await _host.StopAsync();
_host.Dispose();
}
}

[Fact]
public async Task recovers_native_dlq_message_to_durable_storage()
{
await _host
.TrackActivity()
.DoNotAssertOnExceptionsDetected()
.Timeout(30.Seconds())
.PublishMessageAndWaitAsync(new SqsRecoveryTestMessage("test-recovery"));

var messageStore = _host.Services.GetRequiredService<IMessageStore>();
var query = new DeadLetterEnvelopeQuery { PageSize = 100 };

DeadLetterEnvelopeResults? results = null;
var deadline = DateTimeOffset.UtcNow.Add(30.Seconds());

while (DateTimeOffset.UtcNow < deadline)
{
results = await messageStore.DeadLetters.QueryAsync(query, CancellationToken.None);
if (results.Envelopes.Any()) break;
await Task.Delay(500);
}

results.ShouldNotBeNull();
results.Envelopes.ShouldNotBeEmpty(
"The SQS dead letter recovery listener should have recovered the failed message into durable storage");

var envelope = results.Envelopes.First();
envelope.MessageType.ShouldNotBeNullOrEmpty();
}
}

public record SqsRecoveryTestMessage(string Value);

public static class SqsRecoveryTestMessageHandler
{
public static void Handle(SqsRecoveryTestMessage message)
{
throw new DivideByZeroException($"SQS recovery test failure: {message.Value}");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine.AmazonSqs.Internal;
using Xunit;

namespace Wolverine.AmazonSqs.Tests;

/// <summary>
/// Infrastructure-free coverage of the <c>EnableDeadLetterQueueRecovery()</c> registration wiring
/// (#3103): the settings holder and the background recovery listener should be registered exactly
/// once, and explicit queue names should be sanitized and accumulated.
/// </summary>
public class dead_letter_queue_recovery_registration
{
private static (AmazonSqsDeadLetterQueueRecoverySettings settings, int hostedServiceCount) Build(
Action<AmazonSqsTransportConfiguration> configure)
{
var options = new WolverineOptions();
var transport = options.Transports.GetOrCreate<AmazonSqsTransport>();
var configuration = new AmazonSqsTransportConfiguration(transport, options);

configure(configuration);

var settings = options.Services
.Where(x => x.ServiceType == typeof(AmazonSqsDeadLetterQueueRecoverySettings))
.Select(x => x.ImplementationInstance)
.OfType<AmazonSqsDeadLetterQueueRecoverySettings>()
.Single();

var hostedServiceCount = options.Services
.Count(x => x.ServiceType == typeof(IHostedService)
&& x.ImplementationType == typeof(SqsDeadLetterQueueListener));

return (settings, hostedServiceCount);
}

[Fact]
public void no_arg_overload_registers_listener_with_no_explicit_queue_names()
{
var (settings, hostedServiceCount) = Build(c => c.EnableDeadLetterQueueRecovery());

settings.QueueNames.ShouldBeEmpty();
hostedServiceCount.ShouldBe(1);
}

[Fact]
public void params_overload_sanitizes_and_records_queue_names()
{
var (settings, _) = Build(c => c.EnableDeadLetterQueueRecovery("acme.payments.dlq", "orders-dlq"));

// Periods are normalized to hyphens just like every other SQS name.
settings.QueueNames.ShouldBe(["acme-payments-dlq", "orders-dlq"]);
}

[Fact]
public void repeated_calls_register_the_listener_only_once()
{
var (settings, hostedServiceCount) = Build(c =>
{
c.EnableDeadLetterQueueRecovery("first-dlq");
c.EnableDeadLetterQueueRecovery("second-dlq");
c.EnableDeadLetterQueueRecovery();
});

hostedServiceCount.ShouldBe(1);
settings.QueueNames.ShouldBe(["first-dlq", "second-dlq"]);
}
}
Loading
Loading