From c3136dc1e36909f79cd8178a46ed2e6bb3cefbc8 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Sun, 14 Jun 2026 20:03:00 -0500 Subject: [PATCH] Generalize EnableDeadLetterQueueRecovery to SQS and Azure Service Bus (#3103) RabbitMQ has long had a first-class native-DLQ -> durable-storage bridge via EnableDeadLetterQueueRecovery(). This brings the same one-call feature, with the same syntax, to Amazon SQS and Azure Service Bus so natively dead-lettered messages become queryable/replayable through IDeadLetters (and tools like CritterWatch) instead of being invisible in a broker DLQ. - SqsDeadLetterQueueListener: background service that drains the SQS dead letter queue(s) used by listeners (or explicit names) and writes each message into durable storage via MoveToDeadLetterStorageAsync. - AzureServiceBusDeadLetterQueueListener: drains BOTH the Wolverine-managed dead letter queue (buffered/durable endpoints) AND the native $DeadLetterQueue sub-queues (inline endpoints + ASB-native TTL/max-delivery), reading exception metadata from stamped headers or DeadLetterReason/ DeadLetterErrorDescription. - Shared Wolverine.Transports.DeadLetterRecoveredException implements IDeadLetterExceptionInfo so the original exception type is preserved in durable storage rather than the recovery wrapper. - Messages are only deleted/completed off the broker AFTER being safely persisted, so a transient storage outage never loses a dead letter. Docs: new "Recovering Native Dead Letters to Durable Storage" sections on the SQS, Azure Service Bus, and (previously undocumented) RabbitMQ dead letter queue pages, cross-linked. Tests: per-transport registration tests (no infra) plus end-to-end recovery integration tests verified locally against LocalStack + Postgres (SQS) and the Azure Service Bus emulator + Postgres (ASB). Closes #3103. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- .../azureservicebus/deadletterqueues.md | 55 ++++ .../transports/rabbitmq/deadletterqueues.md | 47 +++ .../transports/sqs/deadletterqueues.md | 52 +++ .../dead_letter_queue_recovery.cs | 106 +++++++ ...dead_letter_queue_recovery_registration.cs | 69 ++++ .../AmazonSqsTransportConfiguration.cs | 61 ++++ .../Internal/SqsDeadLetterQueueListener.cs | 229 ++++++++++++++ .../dead_letter_queue_recovery.cs | 111 +++++++ ...dead_letter_queue_recovery_registration.cs | 68 ++++ .../AzureServiceBusConfiguration.cs | 63 ++++ .../AzureServiceBusDeadLetterQueueListener.cs | 295 ++++++++++++++++++ .../DeadLetterRecoveredException.cs | 30 ++ 12 files changed, 1186 insertions(+) create mode 100644 src/Transports/AWS/Wolverine.AmazonSqs.Tests/dead_letter_queue_recovery.cs create mode 100644 src/Transports/AWS/Wolverine.AmazonSqs.Tests/dead_letter_queue_recovery_registration.cs create mode 100644 src/Transports/AWS/Wolverine.AmazonSqs/Internal/SqsDeadLetterQueueListener.cs create mode 100644 src/Transports/Azure/Wolverine.AzureServiceBus.Tests/dead_letter_queue_recovery.cs create mode 100644 src/Transports/Azure/Wolverine.AzureServiceBus.Tests/dead_letter_queue_recovery_registration.cs create mode 100644 src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusDeadLetterQueueListener.cs create mode 100644 src/Wolverine/Transports/DeadLetterRecoveredException.cs diff --git a/docs/guide/messaging/transports/azureservicebus/deadletterqueues.md b/docs/guide/messaging/transports/azureservicebus/deadletterqueues.md index cfc648069..c634fec4f 100644 --- a/docs/guide/messaging/transports/azureservicebus/deadletterqueues.md +++ b/docs/guide/messaging/transports/azureservicebus/deadletterqueues.md @@ -123,3 +123,58 @@ await host.StartAsync(); ``` snippet source | anchor + +## Recovering Native Dead Letters to Durable Storage + +Azure Service Bus dead letters land in one of two places depending on the endpoint mode: buffered and 
+durable endpoints move failures to a Wolverine-managed dead letter **queue** (default +`wolverine-dead-letter-queue`), while inline endpoints — and Azure Service Bus itself, on TTL or +max-delivery — use the native +[`$DeadLetterQueue` sub-queue](https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dead-letter-queues) +of the source entity. Either way, those messages are only visible through Azure tooling. Tools that +manage Wolverine's *durable* dead letters (for example [CritterWatch](https://github.com/JasperFx/CritterWatch)) +can't see or replay them. + +`EnableDeadLetterQueueRecovery()` starts a background listener that drains **both** kinds of source — +the Wolverine-managed dead letter queue(s) and the native `$DeadLetterQueue` sub-queue of every +listening queue and subscription — copying each message into Wolverine's durable dead letter storage +(the `wolverine_dead_letters` table), where it becomes queryable and replayable through +`IDeadLetters`: + +```csharp +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + // Durable message storage is required — the recovered dead letters + // are written to the wolverine_dead_letters table. + opts.PersistMessagesWithPostgresql(connectionString); + + opts.UseAzureServiceBus(connectionString) + .AutoProvision() + // Drain the Wolverine-managed dead letter queue(s) and the native $DeadLetterQueue + // sub-queue of every listening queue and subscription into durable storage. + .EnableDeadLetterQueueRecovery(); + + opts.ListenToAzureServiceBusQueue("orders"); + }).StartAsync(); +``` + +With no arguments, every managed dead letter queue and every listening queue/subscription's native +sub-queue is drained. 
Pass explicit names (a managed dead letter queue name, a listening queue name, +or a subscription endpoint name) to restrict recovery to a subset: + +```csharp +opts.UseAzureServiceBus(connectionString) + .EnableDeadLetterQueueRecovery("orders", "shipments"); +``` + +The original exception type and message are preserved: from the stamped failure metadata for messages +in the managed dead letter queue, or from the native `DeadLetterReason`/`DeadLetterErrorDescription` +for messages in a native sub-queue. A message is only completed off its source *after* it has been +safely written to durable storage, so a transient database outage never loses a dead letter. + +::: tip +This is the Azure Service Bus equivalent of the +[RabbitMQ dead letter recovery](../rabbitmq/deadletterqueues.html) feature, and uses the same +`EnableDeadLetterQueueRecovery()` syntax across every native-dead-letter transport. +::: diff --git a/docs/guide/messaging/transports/rabbitmq/deadletterqueues.md b/docs/guide/messaging/transports/rabbitmq/deadletterqueues.md index df6584479..e48f7f971 100644 --- a/docs/guide/messaging/transports/rabbitmq/deadletterqueues.md +++ b/docs/guide/messaging/transports/rabbitmq/deadletterqueues.md @@ -126,6 +126,53 @@ using var host = await Host.CreateDefaultBuilder() snippet source | anchor +## Recovering Native Dead Letters to Durable Storage + +With native dead lettering, failed messages land in a RabbitMQ dead letter queue and are only visible +through RabbitMQ tooling. Tools that manage Wolverine's *durable* dead letters (for example +[CritterWatch](https://github.com/JasperFx/CritterWatch)) can't see or replay them. 
+ +`EnableDeadLetterQueueRecovery()` starts a background listener that consumes the native dead letter +queue(s) and copies each message into Wolverine's durable dead letter storage (the +`wolverine_dead_letters` table), where it becomes queryable and replayable through `IDeadLetters`: + +```csharp +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + // Durable message storage is required — the recovered dead letters + // are written to the wolverine_dead_letters table. + opts.PersistMessagesWithPostgresql(connectionString); + + opts.UseRabbitMq() + .AutoProvision() + // Consume the native dead letter queue and copy the messages into + // Wolverine's durable dead letter storage. + .EnableDeadLetterQueueRecovery(); + + opts.ListenToRabbitQueue("orders"); + }).StartAsync(); +``` + +With no arguments, the default `wolverine-dead-letter-queue` is consumed. Pass explicit queue names +to recover from custom-named dead letter queues: + +```csharp +opts.UseRabbitMq() + .EnableDeadLetterQueueRecovery("orders-errors", "shipments-errors"); +``` + +The original exception type and message are reconstructed from the RabbitMQ `x-death` metadata (and +from the [enhanced dead lettering](#enhanced-dead-lettering-with-exception-metadata) headers when +those are present). + +::: tip +The same `EnableDeadLetterQueueRecovery()` syntax is available on the +[Amazon SQS](../sqs/deadletterqueues.html) and +[Azure Service Bus](../azureservicebus/deadletterqueues.html) transports, so "bridge my native dead +letters into durable storage" is a one-call decision on every native-dead-letter transport. 
+::: + diff --git a/docs/guide/messaging/transports/sqs/deadletterqueues.md b/docs/guide/messaging/transports/sqs/deadletterqueues.md index 9dc67f6b0..3219bff3c 100644 --- a/docs/guide/messaging/transports/sqs/deadletterqueues.md +++ b/docs/guide/messaging/transports/sqs/deadletterqueues.md @@ -85,5 +85,57 @@ using var host = await Host.CreateDefaultBuilder() This would force Wolverine to use any persistent envelope storage for dead letter queueing. +## Recovering Native Dead Letters to Durable Storage + +By default an Amazon SQS dead letter queue is just another SQS queue — its messages are only visible +through the AWS console or SDK, and tools that manage Wolverine's *durable* dead letters (for example +[CritterWatch](https://github.com/JasperFx/CritterWatch)) can't see or replay them. + +`EnableDeadLetterQueueRecovery()` starts a background listener that drains the native SQS dead letter +queue(s) and copies each message into Wolverine's durable dead letter storage (the +`wolverine_dead_letters` table), where it becomes queryable and replayable through `IDeadLetters`: + +```csharp +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + // Durable message storage is required — the recovered dead letters + // are written to the wolverine_dead_letters table. + opts.PersistMessagesWithPostgresql(connectionString); + + opts.UseAmazonSqsTransport() + .AutoProvision() + // Drain every dead letter queue used by a listener and copy the + // messages into Wolverine's durable dead letter storage. + .EnableDeadLetterQueueRecovery(); + + opts.ListenToSqsQueue("orders"); + }).StartAsync(); +``` + +With no arguments, every distinct dead letter queue used by a listening SQS queue is drained. 
Pass +explicit names when the dead letter queues you want to recover from aren't directly attached to a +Wolverine listener — for example, queues fed by an SQS [native redrive policy](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html) +that you manage yourself: + +```csharp +opts.UseAmazonSqsTransport() + .EnableDeadLetterQueueRecovery("orders-dlq", "payments-dlq"); +``` + +The names are sanitized with the same `SanitizeSqsName` normalization used everywhere else, so dots +become hyphens consistently. + +Each recovered message is reconstructed back into a Wolverine `Envelope`. When Wolverine itself moved +the message to the dead letter queue, the original exception type and message are preserved (stamped +on the envelope when it failed). A message is only deleted from the SQS dead letter queue *after* it +has been safely written to durable storage, so a transient database outage never loses a dead letter. + +::: tip +This is the SQS equivalent of the [RabbitMQ dead letter recovery](../rabbitmq/deadletterqueues.html) +feature, and uses the same `EnableDeadLetterQueueRecovery()` syntax across every native-dead-letter +transport. 
+::: + diff --git a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/dead_letter_queue_recovery.cs b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/dead_letter_queue_recovery.cs new file mode 100644 index 000000000..41ae99026 --- /dev/null +++ b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/dead_letter_queue_recovery.cs @@ -0,0 +1,106 @@ +using IntegrationTests; +using JasperFx.Core; +using JasperFx.Resources; +using Marten; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine.AmazonSqs.Internal; +using Wolverine.Marten; +using Wolverine.Persistence.Durability; +using Wolverine.Persistence.Durability.DeadLetterManagement; +using Wolverine.Runtime; +using Wolverine.Tracking; +using Xunit; + +namespace Wolverine.AmazonSqs.Tests; + +/// +/// End-to-end coverage for the SQS EnableDeadLetterQueueRecovery() bridge added for #3103: +/// a failing message is moved to the native SQS dead letter queue, the background recovery listener +/// drains it, and the dead letter ends up queryable in Wolverine's durable storage. 
+///
+public class dead_letter_queue_recovery : IAsyncLifetime
+{
+    private readonly string _queueName = $"dlq-recovery-{Guid.NewGuid():N}";
+    private IHost _host = null!;
+
+    public async Task InitializeAsync()
+    {
+        _host = await Host.CreateDefaultBuilder()
+            .UseWolverine(opts =>
+            {
+                opts.ServiceName = "SqsDlqRecoveryTest";
+                opts.Durability.Mode = DurabilityMode.Solo;
+
+                opts.UseAmazonSqsTransportLocally()
+                    .AutoProvision()
+                    .AutoPurgeOnStartup()
+                    .EnableDeadLetterQueueRecovery();
+
+                opts.Services.AddMarten(m =>
+                {
+                    m.Connection(Servers.PostgresConnectionString);
+                    m.DatabaseSchemaName = "sqs_dlq_recovery";
+                    m.DisableNpgsqlLogging = true;
+                }).IntegrateWithWolverine();
+
+                opts.ListenToSqsQueue(_queueName);
+                opts.PublishMessage<SqsRecoveryTestMessage>().ToSqsQueue(_queueName);
+
+                opts.Policies.DisableConventionalLocalRouting();
+                opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState);
+            }).StartAsync();
+
+        await _host.ResetResourceState();
+    }
+
+    public async Task DisposeAsync()
+    {
+        if (_host != null)
+        {
+            await _host.StopAsync();
+            _host.Dispose();
+        }
+    }
+
+    [Fact]
+    public async Task recovers_native_dlq_message_to_durable_storage()
+    {
+        await _host
+            .TrackActivity()
+            .DoNotAssertOnExceptionsDetected()
+            .Timeout(30.Seconds())
+            .PublishMessageAndWaitAsync(new SqsRecoveryTestMessage("test-recovery"));
+
+        var messageStore = _host.Services.GetRequiredService<IMessageStore>();
+        var query = new DeadLetterEnvelopeQuery { PageSize = 100 };
+
+        DeadLetterEnvelopeResults? results = null;
+        var deadline = DateTimeOffset.UtcNow.Add(30.Seconds());
+
+        while (DateTimeOffset.UtcNow < deadline)
+        {
+            results = await messageStore.DeadLetters.QueryAsync(query, CancellationToken.None);
+            if (results.Envelopes.Any()) break;
+            await Task.Delay(500);
+        }
+
+        results.ShouldNotBeNull();
+        results.Envelopes.ShouldNotBeEmpty(
+            "The SQS dead letter recovery listener should have recovered the failed message into durable storage");
+
+        var envelope = results.Envelopes.First();
+        envelope.MessageType.ShouldNotBeNullOrEmpty();
+    }
+}
+
+public record SqsRecoveryTestMessage(string Value);
+
+public static class SqsRecoveryTestMessageHandler
+{
+    public static void Handle(SqsRecoveryTestMessage message)
+    {
+        throw new DivideByZeroException($"SQS recovery test failure: {message.Value}");
+    }
+}
diff --git a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/dead_letter_queue_recovery_registration.cs b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/dead_letter_queue_recovery_registration.cs
new file mode 100644
index 000000000..40b253d66
--- /dev/null
+++ b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/dead_letter_queue_recovery_registration.cs
@@ -0,0 +1,69 @@
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Shouldly;
+using Wolverine.AmazonSqs.Internal;
+using Xunit;
+
+namespace Wolverine.AmazonSqs.Tests;
+
+///
+/// Infrastructure-free coverage of the EnableDeadLetterQueueRecovery() registration wiring
+/// (#3103): the settings holder and the background recovery listener should be registered exactly
+/// once, and explicit queue names should be sanitized and accumulated.
+///
+public class dead_letter_queue_recovery_registration
+{
+    private static (AmazonSqsDeadLetterQueueRecoverySettings settings, int hostedServiceCount) Build(
+        Action<AmazonSqsTransportConfiguration> configure)
+    {
+        var options = new WolverineOptions();
+        var transport = options.Transports.GetOrCreate<AmazonSqsTransport>();
+        var configuration = new AmazonSqsTransportConfiguration(transport, options);
+
+        configure(configuration);
+
+        var settings = options.Services
+            .Where(x => x.ServiceType == typeof(AmazonSqsDeadLetterQueueRecoverySettings))
+            .Select(x => x.ImplementationInstance)
+            .OfType<AmazonSqsDeadLetterQueueRecoverySettings>()
+            .Single();
+
+        var hostedServiceCount = options.Services
+            .Count(x => x.ServiceType == typeof(IHostedService)
+                        && x.ImplementationType == typeof(SqsDeadLetterQueueListener));
+
+        return (settings, hostedServiceCount);
+    }
+
+    [Fact]
+    public void no_arg_overload_registers_listener_with_no_explicit_queue_names()
+    {
+        var (settings, hostedServiceCount) = Build(c => c.EnableDeadLetterQueueRecovery());
+
+        settings.QueueNames.ShouldBeEmpty();
+        hostedServiceCount.ShouldBe(1);
+    }
+
+    [Fact]
+    public void params_overload_sanitizes_and_records_queue_names()
+    {
+        var (settings, _) = Build(c => c.EnableDeadLetterQueueRecovery("acme.payments.dlq", "orders-dlq"));
+
+        // Periods are normalized to hyphens just like every other SQS name.
+        settings.QueueNames.ShouldBe(["acme-payments-dlq", "orders-dlq"]);
+    }
+
+    [Fact]
+    public void repeated_calls_register_the_listener_only_once()
+    {
+        var (settings, hostedServiceCount) = Build(c =>
+        {
+            c.EnableDeadLetterQueueRecovery("first-dlq");
+            c.EnableDeadLetterQueueRecovery("second-dlq");
+            c.EnableDeadLetterQueueRecovery();
+        });
+
+        hostedServiceCount.ShouldBe(1);
+        settings.QueueNames.ShouldBe(["first-dlq", "second-dlq"]);
+    }
+}
diff --git a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/AmazonSqsTransportConfiguration.cs b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/AmazonSqsTransportConfiguration.cs
index 6eb4193a1..08b6dd8b0 100644
--- a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/AmazonSqsTransportConfiguration.cs
+++ b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/AmazonSqsTransportConfiguration.cs
@@ -1,4 +1,5 @@
 using Amazon.Runtime;
+using Microsoft.Extensions.DependencyInjection;
 using Wolverine.Configuration;
 using Wolverine.Runtime;
 using Wolverine.Transports;
@@ -147,6 +148,66 @@ public AmazonSqsTransportConfiguration DefaultDeadLetterQueueName(string deadLet
         return this;
     }
 
+    ///
+    /// Enable a background listener that drains the native Amazon SQS dead letter queue(s) and
+    /// recovers the messages into Wolverine's durable dead letter storage (the
+    /// wolverine_dead_letters table), making natively dead-lettered messages queryable and
+    /// replayable through IDeadLetters and tools like CritterWatch. This is the SQS analogue
+    /// of RabbitMQ's EnableDeadLetterQueueRecovery().
+    ///
+    /// With no arguments, every distinct dead letter queue used by a listening SQS queue is drained.
+    /// Requires Wolverine's durable message storage (a database) to be configured.
+    ///
+    ///
+    public AmazonSqsTransportConfiguration EnableDeadLetterQueueRecovery()
+    {
+        ensureRecoveryServicesRegistered();
+        return this;
+    }
+
+    /// <summary>
+    /// Enable a background listener that drains the named Amazon SQS dead letter queue(s) and
+    /// recovers the messages into Wolverine's durable dead letter storage. Use this overload when
+    /// the dead letter queues you want to recover from are not directly attached to a Wolverine
+    /// listener (for example, queues fed by an SQS native redrive policy you manage yourself).
+    /// </summary>
+    /// <param name="deadLetterQueueNames">The names of the SQS dead letter queues to drain.</param>
+    /// <returns></returns>
+    public AmazonSqsTransportConfiguration EnableDeadLetterQueueRecovery(params string[] deadLetterQueueNames)
+    {
+        var settings = ensureRecoveryServicesRegistered();
+        foreach (var name in deadLetterQueueNames)
+        {
+            var sanitized = AmazonSqsTransport.SanitizeSqsName(name);
+            if (!settings.QueueNames.Contains(sanitized))
+            {
+                settings.QueueNames.Add(sanitized);
+            }
+        }
+
+        return this;
+    }
+
+    private AmazonSqsDeadLetterQueueRecoverySettings ensureRecoveryServicesRegistered()
+    {
+        var existing = Options.Services
+            .Where(s => s.ServiceType == typeof(AmazonSqsDeadLetterQueueRecoverySettings))
+            .Select(s => s.ImplementationInstance)
+            .OfType<AmazonSqsDeadLetterQueueRecoverySettings>()
+            .FirstOrDefault();
+
+        if (existing != null)
+        {
+            return existing;
+        }
+
+        var settings = new AmazonSqsDeadLetterQueueRecoverySettings();
+        Options.Services.AddSingleton(settings);
+        Options.Services.AddSingleton(Transport);
+        Options.Services.AddHostedService<SqsDeadLetterQueueListener>();
+        return settings;
+    }
+
     ///
     /// Enable Wolverine system queues for request/reply support.
     /// Creates a per-node response queue that is automatically cleaned up.
diff --git a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/SqsDeadLetterQueueListener.cs b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/SqsDeadLetterQueueListener.cs
new file mode 100644
index 000000000..64f34f94a
--- /dev/null
+++ b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/SqsDeadLetterQueueListener.cs
@@ -0,0 +1,229 @@
+using Amazon.SQS.Model;
+using JasperFx.Core;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using Wolverine.Runtime;
+using Wolverine.Transports;
+
+namespace Wolverine.AmazonSqs.Internal;
+
+/// <summary>
+/// Configuration holder for Amazon SQS dead letter queue recovery. Registered as a singleton
+/// so the listener can discover which dead letter queues to drain. When no queue names are
+/// supplied, the listener recovers from every distinct dead-letter queue used by a listening
+/// SQS queue.
+/// </summary>
+public class AmazonSqsDeadLetterQueueRecoverySettings
+{
+    public List<string> QueueNames { get; } = new();
+}
+
+/// <summary>
+/// Background service that drains one or more Amazon SQS dead letter queues and recovers the
+/// messages into Wolverine's durable dead letter storage (the wolverine_dead_letters table).
+/// This bridges SQS's native dead-lettering with Wolverine's database-backed dead letter management,
+/// so natively dead-lettered messages become queryable and replayable through <c>IDeadLetters</c>
+/// and tools like CritterWatch. This mirrors the RabbitMQ EnableDeadLetterQueueRecovery() feature.
+/// </summary>
+public class SqsDeadLetterQueueListener : BackgroundService
+{
+    private readonly AmazonSqsTransport _transport;
+    private readonly IWolverineRuntime _runtime;
+    private readonly AmazonSqsDeadLetterQueueRecoverySettings _settings;
+    private readonly ILogger<SqsDeadLetterQueueListener> _logger;
+
+    public SqsDeadLetterQueueListener(AmazonSqsTransport transport, IWolverineRuntime runtime,
+        AmazonSqsDeadLetterQueueRecoverySettings settings, ILogger<SqsDeadLetterQueueListener> logger)
+    {
+        _transport = transport;
+        _runtime = runtime;
+        _settings = settings;
+        _logger = logger;
+    }
+
+    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+    {
+        try
+        {
+            // The transport's SQS client is created when Wolverine connects the transport during
+            // startup. Wait for it rather than building a second client so we share configuration,
+            // credentials, and (for LocalStack) the service URL.
+            await waitForClientAsync(stoppingToken);
+
+            var queueNames = resolveQueueNames();
+            if (queueNames.Count == 0)
+            {
+                _logger.LogInformation(
+                    "Amazon SQS dead letter queue recovery was enabled, but no dead letter queues could be resolved. No recovery listeners were started.");
+                return;
+            }
+
+            var tasks = queueNames.Select(name => drainQueueLoopAsync(name, stoppingToken)).ToArray();
+            await Task.WhenAll(tasks);
+        }
+        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
+        {
+            // Normal shutdown
+        }
+        catch (Exception e)
+        {
+            _logger.LogError(e, "Amazon SQS dead letter queue recovery listener failed");
+        }
+    }
+
+    private async Task waitForClientAsync(CancellationToken token)
+    {
+        while (_transport.Client == null)
+        {
+            token.ThrowIfCancellationRequested();
+            await Task.Delay(250.Milliseconds(), token);
+        }
+    }
+
+    private List<string> resolveQueueNames()
+    {
+        if (_settings.QueueNames.Count > 0)
+        {
+            return _settings.QueueNames.Distinct().ToList();
+        }
+
+        return _transport.Queues
+            .Where(x => x.IsListener)
+            .Select(x => x.DeadLetterQueueName)
+            .Where(x => x.IsNotEmpty())
+            .Distinct()
+            .Select(x => x!)
+            .ToList();
+    }
+
+    private async Task drainQueueLoopAsync(string queueName, CancellationToken stoppingToken)
+    {
+        var queue = _transport.Queues[queueName];
+        var failedCount = 0;
+
+        _logger.LogInformation(
+            "Started Amazon SQS dead letter queue recovery listener on queue '{QueueName}'. Messages will be recovered to durable storage.",
+            queueName);
+
+        while (!stoppingToken.IsCancellationRequested)
+        {
+            try
+            {
+                if (queue.QueueUrl.IsEmpty())
+                {
+                    await queue.InitializeAsync(_logger);
+                }
+
+                var request = new ReceiveMessageRequest(queue.QueueUrl)
+                {
+                    MaxNumberOfMessages = 10,
+                    WaitTimeSeconds = 5,
+                    VisibilityTimeout = 60,
+                    MessageAttributeNames = ["All"]
+                };
+
+                var results = await _transport.Client!.ReceiveMessageAsync(request, stoppingToken);
+
+                failedCount = 0;
+
+                if (results.Messages == null || results.Messages.Count == 0)
+                {
+                    await Task.Delay(250.Milliseconds(), stoppingToken);
+                    continue;
+                }
+
+                foreach (var message in results.Messages)
+                {
+                    await recoverMessageAsync(queue, message, stoppingToken);
+                }
+            }
+            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
+            {
+                break;
+            }
+            catch (Exception e)
+            {
+                failedCount++;
+                var pause = failedCount > 5 ? 5.Seconds() : (failedCount * 200).Milliseconds();
+                _logger.LogError(e,
+                    "Error while recovering dead letters from Amazon SQS queue '{QueueName}'", queueName);
+                await Task.Delay(pause, stoppingToken);
+            }
+        }
+    }
+
+    private async Task recoverMessageAsync(AmazonSqsQueue queue, Message message, CancellationToken token)
+    {
+        try
+        {
+            var envelope = new Envelope();
+            string exceptionType;
+            string exceptionMessage;
+
+            try
+            {
+                var mapper = queue.BuildMapper(_runtime);
+                mapper.ReadEnvelopeData(envelope, message.Body, message.MessageAttributes ?? new Dictionary<string, MessageAttributeValue>());
+
+                // Wolverine stamps these onto the envelope headers when it moves a failed message to
+                // the dead letter queue (see DeadLetterQueueConstants.StampFailureMetadata).
+                exceptionType = headerOrDefault(envelope, DeadLetterQueueConstants.ExceptionTypeHeader, "Unknown");
+                exceptionMessage = headerOrDefault(envelope, DeadLetterQueueConstants.ExceptionMessageHeader,
+                    "Recovered from Amazon SQS dead letter queue");
+            }
+            catch (Exception readError)
+            {
+                // The dead letter body wasn't a Wolverine envelope (for example, a message moved by a
+                // native SQS redrive policy from a non-Wolverine producer). Preserve it with a minimal
+                // envelope so it is still visible and not silently lost.
+                _logger.LogWarning(readError,
+                    "Could not reconstruct a Wolverine envelope from SQS dead letter message {MessageId} on '{QueueName}'. Persisting a minimal envelope.",
+                    message.MessageId, queue.QueueName);
+
+                envelope = new Envelope
+                {
+                    Data = System.Text.Encoding.UTF8.GetBytes(message.Body ?? string.Empty),
+                    ContentType = EnvelopeConstants.JsonContentType,
+                    MessageType = "unknown"
+                };
+                exceptionType = "Unknown";
+                exceptionMessage = "Recovered from Amazon SQS dead letter queue (non-Wolverine message)";
+            }
+
+            if (envelope.Id == Guid.Empty)
+            {
+                envelope.Id = Guid.NewGuid();
+            }
+
+            envelope.Destination ??= queue.Uri;
+            envelope.Source ??= queue.Uri.ToString();
+            if (envelope.SentAt == default)
+            {
+                envelope.SentAt = DateTimeOffset.UtcNow;
+            }
+
+            var exception = new DeadLetterRecoveredException(exceptionType, exceptionMessage);
+            await _runtime.Storage.Inbox.MoveToDeadLetterStorageAsync(envelope, exception);
+
+            // Only delete once the dead letter has been safely persisted.
+            await _transport.Client!.DeleteMessageAsync(queue.QueueUrl, message.ReceiptHandle, token);
+
+            _logger.LogInformation(
+                "Recovered dead letter {MessageId} (type={MessageType}) from Amazon SQS queue '{QueueName}' to durable storage.",
+                envelope.Id, envelope.MessageType ?? "unknown", queue.QueueName);
+        }
+        catch (Exception e) when (e is not OperationCanceledException || !token.IsCancellationRequested)
+        {
+            // Leave the message on the queue (its visibility timeout returns it) so a transient storage
+            // failure doesn't lose the dead letter. Shutdown cancellation propagates to the drain loop instead.
+            _logger.LogError(e,
+                "Failed to recover SQS dead letter message {MessageId} from '{QueueName}'", message.MessageId,
+                queue.QueueName);
+        }
+    }
+
+    private static string headerOrDefault(Envelope envelope, string key, string fallback)
+    {
+        return envelope.Headers.TryGetValue(key, out var value) && value.IsNotEmpty() ? value! : fallback;
+    }
+}
diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/dead_letter_queue_recovery.cs b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/dead_letter_queue_recovery.cs
new file mode 100644
index 000000000..b4ab8683d
--- /dev/null
+++ b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/dead_letter_queue_recovery.cs
@@ -0,0 +1,111 @@
+using IntegrationTests;
+using JasperFx.Core;
+using JasperFx.Resources;
+using Marten;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Shouldly;
+using Wolverine.Marten;
+using Wolverine.Persistence.Durability;
+using Wolverine.Persistence.Durability.DeadLetterManagement;
+using Wolverine.Tracking;
+using Xunit;
+
+namespace Wolverine.AzureServiceBus.Tests;
+
+///
+/// End-to-end coverage for the Azure Service Bus EnableDeadLetterQueueRecovery() bridge added
+/// for #3103. A failing handler makes Wolverine natively dead-letter the message to the queue's
+/// $DeadLetterQueue sub-queue (with DeadLetterReason/DeadLetterErrorDescription
+/// set). The background recovery listener drains the sub-queue and the dead letter ends up queryable
+/// in Wolverine's durable storage.
+///
+[Trait("Category", "Flaky")]
+public class dead_letter_queue_recovery : IAsyncLifetime
+{
+    private readonly string _queueName = $"dlqrecovery{Guid.NewGuid():N}";
+    private IHost _host = null!;
+
+    public async Task InitializeAsync()
+    {
+        _host = await Host.CreateDefaultBuilder()
+            .UseWolverine(opts =>
+            {
+                opts.ServiceName = "AsbDlqRecoveryTest";
+                opts.Durability.Mode = DurabilityMode.Solo;
+
+                opts.UseAzureServiceBusTesting()
+                    .AutoProvision()
+                    .AutoPurgeOnStartup()
+                    .EnableDeadLetterQueueRecovery();
+
+                opts.Services.AddMarten(m =>
+                {
+                    m.Connection(Servers.PostgresConnectionString);
+                    m.DatabaseSchemaName = "asb_dlq_recovery";
+                    m.DisableNpgsqlLogging = true;
+                }).IntegrateWithWolverine();
+
+                opts.ListenToAzureServiceBusQueue(_queueName);
+                opts.PublishMessage<AsbRecoveryTestMessage>().ToAzureServiceBusQueue(_queueName);
+
+                opts.Policies.DisableConventionalLocalRouting();
+                opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState);
+            }).StartAsync();
+
+        await _host.ResetResourceState();
+    }
+
+    public async Task DisposeAsync()
+    {
+        if (_host != null)
+        {
+            await _host.StopAsync();
+            _host.Dispose();
+        }
+    }
+
+    [Fact]
+    public async Task recovers_native_dlq_message_to_durable_storage()
+    {
+        await _host
+            .TrackActivity()
+            .DoNotAssertOnExceptionsDetected()
+            .Timeout(30.Seconds())
+            .PublishMessageAndWaitAsync(new AsbRecoveryTestMessage("test-recovery"));
+
+        var messageStore = _host.Services.GetRequiredService<IMessageStore>();
+        var query = new DeadLetterEnvelopeQuery { PageSize = 100 };
+
+        DeadLetterEnvelopeResults? results = null;
+        var deadline = DateTimeOffset.UtcNow.Add(30.Seconds());
+
+        while (DateTimeOffset.UtcNow < deadline)
+        {
+            results = await messageStore.DeadLetters.QueryAsync(query, CancellationToken.None);
+            if (results.Envelopes.Any()) break;
+            await Task.Delay(500);
+        }
+
+        results.ShouldNotBeNull();
+        results.Envelopes.ShouldNotBeEmpty(
+            "The Azure Service Bus dead letter recovery listener should have recovered the natively dead-lettered message into durable storage");
+
+        var envelope = results.Envelopes.First();
+        envelope.MessageType.ShouldNotBeNullOrEmpty();
+
+        // The native DeadLetterReason (the exception type) should be preserved as the dead letter's
+        // exception type rather than the DeadLetterRecoveredException wrapper.
+        envelope.ExceptionType.ShouldContain(nameof(DivideByZeroException));
+    }
+}
+
+public record AsbRecoveryTestMessage(string Value);
+
+public static class AsbRecoveryTestMessageHandler
+{
+    public static void Handle(AsbRecoveryTestMessage message)
+    {
+        throw new DivideByZeroException($"ASB recovery test failure: {message.Value}");
+    }
+}
diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/dead_letter_queue_recovery_registration.cs b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/dead_letter_queue_recovery_registration.cs
new file mode 100644
index 000000000..59a75ef99
--- /dev/null
+++ b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/dead_letter_queue_recovery_registration.cs
@@ -0,0 +1,68 @@
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Shouldly;
+using Wolverine.AzureServiceBus.Internal;
+using Xunit;
+
+namespace Wolverine.AzureServiceBus.Tests;
+
+///
+/// Infrastructure-free coverage of the Azure Service Bus EnableDeadLetterQueueRecovery()
+/// registration wiring (#3103): the settings holder and the background recovery listener should be
+/// registered exactly once, and explicit queue/subscription names should be accumulated.
+///
+public class dead_letter_queue_recovery_registration
+{
+    // Runs the supplied configuration against a bare WolverineOptions (no host, no broker) and
+    // reports the single registered settings instance plus how many times the recovery listener
+    // was registered as a hosted service.
+    private static (AzureServiceBusDeadLetterQueueRecoverySettings settings, int hostedServiceCount) Build(
+        Action<AzureServiceBusConfiguration> configure)
+    {
+        var wolverineOptions = new WolverineOptions();
+        var configuration = new AzureServiceBusConfiguration(
+            wolverineOptions.Transports.GetOrCreate<AzureServiceBusTransport>(), wolverineOptions);
+
+        configure(configuration);
+
+        var descriptors = wolverineOptions.Services;
+
+        // Exactly one settings instance must be registered; Single() throws otherwise.
+        var recoverySettings = (AzureServiceBusDeadLetterQueueRecoverySettings)descriptors
+            .Single(d => d.ServiceType == typeof(AzureServiceBusDeadLetterQueueRecoverySettings)
+                         && d.ImplementationInstance is AzureServiceBusDeadLetterQueueRecoverySettings)
+            .ImplementationInstance!;
+
+        var listenerRegistrations = descriptors
+            .Where(d => d.ServiceType == typeof(IHostedService))
+            .Count(d => d.ImplementationType == typeof(AzureServiceBusDeadLetterQueueListener));
+
+        return (recoverySettings, listenerRegistrations);
+    }
+
+    [Fact]
+    public void no_arg_overload_registers_listener_with_no_explicit_names()
+    {
+        var result = Build(configuration => configuration.EnableDeadLetterQueueRecovery());
+
+        result.settings.EndpointNames.ShouldBeEmpty();
+        result.hostedServiceCount.ShouldBe(1);
+    }
+
+    [Fact]
+    public void params_overload_records_endpoint_names()
+    {
+        var result = Build(configuration => configuration.EnableDeadLetterQueueRecovery("orders", "shipments"));
+
+        result.settings.EndpointNames.ShouldBe(["orders", "shipments"]);
+    }
+
+    [Fact]
+    public void repeated_calls_register_the_listener_only_once()
+    {
+        var result = Build(configuration =>
+        {
+            configuration.EnableDeadLetterQueueRecovery("orders");
+            configuration.EnableDeadLetterQueueRecovery("shipments");
+            configuration.EnableDeadLetterQueueRecovery();
+        });
+
+        result.hostedServiceCount.ShouldBe(1);
+        result.settings.EndpointNames.ShouldBe(["orders", "shipments"]);
+    }
+}
diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusConfiguration.cs b/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusConfiguration.cs
index 598d46c62..df455e757 100644
--- a/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusConfiguration.cs
+++
b/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusConfiguration.cs
@@ -1,5 +1,6 @@
 using Azure.Messaging.ServiceBus.Administration;
 using JasperFx.Core;
+using Microsoft.Extensions.DependencyInjection;
 using Wolverine.AzureServiceBus.Internal;
 using Wolverine.Configuration;
 using Wolverine.Transports;
@@ -165,6 +166,68 @@ public AzureServiceBusConfiguration UseConventionalRouting(NamingSource namingSo
         return this;
     }
 
+    /// <summary>
+    /// Enable a background listener that recovers Azure Service Bus dead letters into Wolverine's
+    /// durable dead letter storage (the <c>wolverine_dead_letters</c> table). The listener drains
+    /// both the Wolverine-managed dead letter queue(s) of the listening queues and the native
+    /// <c>$DeadLetterQueue</c> sub-queue of every listening queue and subscription. This
+    /// makes dead-lettered messages queryable and replayable through <c>IDeadLetters</c>
+    /// and tools like CritterWatch. It is the Azure Service Bus analogue of RabbitMQ's
+    /// <c>EnableDeadLetterQueueRecovery()</c>. Failure metadata is taken from the exception headers
+    /// stamped on the message when present, otherwise from the native
+    /// <c>DeadLetterReason</c>/<c>DeadLetterErrorDescription</c>.
+    /// <para>
+    /// Requires Wolverine's durable message storage (a database) to be configured.
+    /// </para>
+    /// <para>
+    /// Calling this overload never removes names registered earlier through the overload that
+    /// takes explicit names; if any names were registered, recovery stays restricted to them.
+    /// </para>
+    /// </summary>
+    /// <returns>This configuration, for chaining.</returns>
+    public AzureServiceBusConfiguration EnableDeadLetterQueueRecovery()
+    {
+        // Registration is idempotent; the settings instance is not needed here because this
+        // overload adds no explicit names.
+        _ = ensureRecoveryServicesRegistered();
+        return this;
+    }
+
+    ///
+    /// Enable a background listener that drains the native Azure Service Bus dead letter sub-queues
+    /// of only the named queues (or subscription endpoint names), recovering the messages into
+    /// Wolverine's durable dead letter storage.
+    ///
+    ///
+    /// The queue names — or subscription endpoint names — whose native dead letter sub-queues should
+    /// be drained.
+    ///
+    ///
+    /// <exception cref="ArgumentNullException">The names array itself is null.</exception>
+    /// <exception cref="ArgumentException">One of the names is null, empty or whitespace.</exception>
+    public AzureServiceBusConfiguration EnableDeadLetterQueueRecovery(params string[] queueOrSubscriptionNames)
+    {
+        ArgumentNullException.ThrowIfNull(queueOrSubscriptionNames);
+
+        // Validate everything before touching the service collection. A blank name can never match
+        // a dead letter source, yet it would still switch the listener into "restricted" mode and
+        // silently filter out every source, so reject it at configuration time instead.
+        foreach (var name in queueOrSubscriptionNames)
+        {
+            if (string.IsNullOrWhiteSpace(name))
+            {
+                throw new ArgumentException(
+                    "Queue or subscription names for dead letter queue recovery cannot be null or blank",
+                    nameof(queueOrSubscriptionNames));
+            }
+        }
+
+        var settings = ensureRecoveryServicesRegistered();
+        foreach (var name in queueOrSubscriptionNames)
+        {
+            // Names accumulate across repeated calls without duplicates.
+            if (!settings.EndpointNames.Contains(name))
+            {
+                settings.EndpointNames.Add(name);
+            }
+        }
+
+        return this;
+    }
+
+    /// <summary>
+    /// Registers the recovery settings, the transport and the background listener exactly once.
+    /// The presence of a settings instance in the service collection marks that registration has
+    /// already happened, in which case that same instance is returned.
+    /// </summary>
+    private AzureServiceBusDeadLetterQueueRecoverySettings ensureRecoveryServicesRegistered()
+    {
+        var existing = Options.Services
+            .Where(s => s.ServiceType == typeof(AzureServiceBusDeadLetterQueueRecoverySettings))
+            .Select(s => s.ImplementationInstance)
+            .OfType<AzureServiceBusDeadLetterQueueRecoverySettings>()
+            .FirstOrDefault();
+
+        if (existing != null)
+        {
+            return existing;
+        }
+
+        var settings = new AzureServiceBusDeadLetterQueueRecoverySettings();
+        Options.Services.AddSingleton(settings);
+        Options.Services.AddSingleton(Transport);
+        Options.Services.AddHostedService<AzureServiceBusDeadLetterQueueListener>();
+        return settings;
+    }
+
     ///
     /// Is Wolverine enabled to create system queues automatically for responses and retries? This
     /// should probably be set to false if the application does not have permissions to create queues
diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusDeadLetterQueueListener.cs b/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusDeadLetterQueueListener.cs
new file mode 100644
index 000000000..e6459f81e
--- /dev/null
+++ b/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusDeadLetterQueueListener.cs
@@ -0,0 +1,295 @@
+using Azure.Messaging.ServiceBus;
+using JasperFx.Core;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using Wolverine.Runtime;
+using Wolverine.Transports;
+
+namespace Wolverine.AzureServiceBus.Internal;
+
+///
+/// Configuration holder for Azure Service Bus dead letter queue recovery. Registered as a singleton
+/// so the listener can discover which dead letter sources to drain.
When no names are supplied, the
+/// listener recovers from both the Wolverine-managed dead letter queue(s) and the native
+/// $DeadLetterQueue sub-queue of every listening queue and subscription.
+///
+public class AzureServiceBusDeadLetterQueueRecoverySettings
+{
+    /// <summary>
+    /// Names that recovery is limited to. Each entry may name a Wolverine-managed dead letter
+    /// queue, a listening queue, or a subscription endpoint. An empty list means no restriction:
+    /// every managed dead letter queue and the native dead letter sub-queue of every listening
+    /// queue/subscription is drained.
+    /// </summary>
+    public List<string> EndpointNames { get; } = [];
+}
+
+///
+/// Background service that recovers Azure Service Bus dead letters into Wolverine's durable dead
+/// letter storage (the wolverine_dead_letters table), so natively dead-lettered messages
+/// become queryable and replayable through
+/// and tools like CritterWatch. It drains two kinds of source:
+///
+/// The Wolverine-managed dead letter queue(s) (default wolverine-dead-letter-queue),
+/// where buffered and durable endpoints move failed messages with the exception metadata stamped
+/// onto the message.
+/// The native $DeadLetterQueue sub-queue of each listening queue and subscription,
+/// where inline endpoints and Azure Service Bus itself (TTL / max-delivery) dead-letter messages,
+/// reading the native DeadLetterReason/DeadLetterErrorDescription.
+///
+/// This mirrors the RabbitMQ EnableDeadLetterQueueRecovery() feature.
+///
+public class AzureServiceBusDeadLetterQueueListener : BackgroundService
+{
+    private readonly AzureServiceBusTransport _transport;
+    private readonly IWolverineRuntime _runtime;
+    private readonly AzureServiceBusDeadLetterQueueRecoverySettings _settings;
+    private readonly ILogger<AzureServiceBusDeadLetterQueueListener> _logger;
+
+    public AzureServiceBusDeadLetterQueueListener(AzureServiceBusTransport transport, IWolverineRuntime runtime,
+        AzureServiceBusDeadLetterQueueRecoverySettings settings,
+        ILogger<AzureServiceBusDeadLetterQueueListener> logger)
+    {
+        _transport = transport;
+        _runtime = runtime;
+        _settings = settings;
+        _logger = logger;
+    }
+
+    /// <summary>
+    /// Describes one dead letter source to drain. A managed source is a normal Wolverine dead letter
+    /// queue (full envelope, exception metadata in application properties); a sub-queue source is a
+    /// native $DeadLetterQueue (exception metadata in DeadLetterReason/DeadLetterErrorDescription).
+    /// </summary>
+    private sealed record DrainSource(string Name, bool IsSubQueue, AzureServiceBusEndpoint? Endpoint,
+        Func<ServiceBusReceiver> CreateReceiver);
+
+    /// <summary>
+    /// Resolves the dead letter sources and runs one drain loop per source until the host stops.
+    /// Cancellation during shutdown is treated as normal; any other failure is logged.
+    /// </summary>
+    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+    {
+        try
+        {
+            var sources = resolveSources();
+            if (sources.Count == 0)
+            {
+                _logger.LogInformation(
+                    "Azure Service Bus dead letter queue recovery was enabled, but no dead letter sources were found. No recovery listeners were started.");
+                return;
+            }
+
+            var tasks = sources.Select(source => drainLoopAsync(source, stoppingToken)).ToArray();
+            await Task.WhenAll(tasks);
+        }
+        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
+        {
+            // Normal shutdown
+        }
+        catch (Exception e)
+        {
+            _logger.LogError(e, "Azure Service Bus dead letter queue recovery listener failed");
+        }
+    }
+
+    /// <summary>
+    /// Builds the list of sources to drain: the managed dead letter queue(s) referenced by the
+    /// listening queues, plus the native dead letter sub-queue of every listening queue and
+    /// subscription. When explicit names were configured, only sources whose sanitized name matches
+    /// one of the sanitized configured names are kept.
+    /// </summary>
+    private List<DrainSource> resolveSources()
+    {
+        var listeningQueues = _transport.Queues.Where(x => x.IsListener).ToArray();
+        var listeningSubscriptions = _transport.Subscriptions.Where(x => x.IsListener).ToArray();
+
+        var sources = new List<DrainSource>();
+
+        // Wolverine-managed dead letter queues — where buffered/durable endpoints move failures.
+        var managedDlqNames = listeningQueues
+            .Select(x => x.DeadLetterQueueName)
+            .Where(x => x.IsNotEmpty())
+            .Distinct()
+            .Select(x => x!);
+
+        foreach (var dlqName in managedDlqNames)
+        {
+            // Resolve the managed dead letter queue as a real endpoint so the recovered envelope gets
+            // a non-null Destination/Uri and that queue's own envelope mapper.
+            var dlqEndpoint = _transport.Queues[dlqName];
+            sources.Add(new DrainSource(dlqName, false, dlqEndpoint,
+                () => _transport.BusClient.CreateReceiver(dlqName)));
+        }
+
+        // Native $DeadLetterQueue sub-queues — inline endpoints and ASB-native dead letters.
+        foreach (var queue in listeningQueues)
+        {
+            var queueName = queue.QueueName;
+            sources.Add(new DrainSource(queueName, true, queue,
+                () => _transport.BusClient.CreateReceiver(queueName,
+                    new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter })));
+        }
+
+        foreach (var subscription in listeningSubscriptions)
+        {
+            sources.Add(new DrainSource(subscription.SubscriptionName, true, subscription,
+                () => _transport.BusClient.CreateReceiver(subscription.Topic.TopicName,
+                    subscription.SubscriptionName,
+                    new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter })));
+        }
+
+        if (_settings.EndpointNames.Count > 0)
+        {
+            var allowed = _settings.EndpointNames.Select(x => _transport.SanitizeIdentifier(x)).ToHashSet();
+            sources = sources.Where(x => allowed.Contains(_transport.SanitizeIdentifier(x.Name))).ToList();
+        }
+
+        return sources;
+    }
+
+    /// <summary>
+    /// Receives batches from one dead letter source and recovers each message until shutdown.
+    /// Receive failures are logged, the receiver is rebuilt, and the loop backs off (200ms per
+    /// consecutive failure, 5 seconds once there have been more than five).
+    /// </summary>
+    private async Task drainLoopAsync(DrainSource source, CancellationToken stoppingToken)
+    {
+        var failedCount = 0;
+        ServiceBusReceiver? receiver = null;
+
+        // The managed dead letter queue holds full Wolverine envelopes; use a real endpoint's mapper
+        // when we have one, otherwise fall back to the default mapper.
+        IAzureServiceBusEnvelopeMapper? mapper;
+        try
+        {
+            mapper = (source.Endpoint ?? _transport.Queues.FirstOrDefault(x => x.IsListener))?.BuildMapper(_runtime);
+        }
+        catch (Exception e)
+        {
+            // Report this right away. Left to propagate, the failure would only surface once every
+            // other drain loop had finished, which in practice means at shutdown.
+            _logger.LogError(e,
+                "Could not build an envelope mapper for Azure Service Bus dead letter source '{Source}'. Dead letters from this source will not be recovered.",
+                source.Name);
+            return;
+        }
+
+        _logger.LogInformation(
+            "Started Azure Service Bus dead letter recovery listener on '{Source}' ({Kind}). Messages will be recovered to durable storage.",
+            source.Name, source.IsSubQueue ? "native sub-queue" : "managed queue");
+
+        try
+        {
+            while (!stoppingToken.IsCancellationRequested)
+            {
+                try
+                {
+                    receiver ??= source.CreateReceiver();
+
+                    var messages = await receiver.ReceiveMessagesAsync(20, 5.Seconds(), stoppingToken);
+
+                    failedCount = 0;
+
+                    if (messages == null || messages.Count == 0)
+                    {
+                        continue;
+                    }
+
+                    foreach (var message in messages)
+                    {
+                        // Don't start new recoveries once shutdown was requested; messages that
+                        // were received but not settled simply stay on the broker.
+                        if (stoppingToken.IsCancellationRequested)
+                        {
+                            break;
+                        }
+
+                        await recoverMessageAsync(source, mapper, receiver, message);
+                    }
+                }
+                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
+                {
+                    break;
+                }
+                catch (Exception e)
+                {
+                    failedCount++;
+                    var pause = failedCount > 5 ? 5.Seconds() : (failedCount * 200).Milliseconds();
+                    _logger.LogError(e,
+                        "Error while recovering dead letters from Azure Service Bus source '{Source}'", source.Name);
+
+                    // Rebuild the receiver on the next pass in case the connection went bad
+                    await disposeReceiverAsync(receiver, source);
+                    receiver = null;
+
+                    // Cancellation during the back-off propagates to ExecuteAsync, which treats it
+                    // as a normal shutdown.
+                    await Task.Delay(pause, stoppingToken);
+                }
+            }
+        }
+        finally
+        {
+            await disposeReceiverAsync(receiver, source);
+        }
+    }
+
+    /// <summary>
+    /// Disposes a receiver without letting a disposal failure escape, so a broken connection
+    /// cannot terminate the drain loop that is trying to recover from it.
+    /// </summary>
+    private async Task disposeReceiverAsync(ServiceBusReceiver? receiver, DrainSource source)
+    {
+        if (receiver == null)
+        {
+            return;
+        }
+
+        try
+        {
+            await receiver.DisposeAsync();
+        }
+        catch (Exception e)
+        {
+            _logger.LogDebug(e,
+                "Error while disposing the Azure Service Bus dead letter receiver for '{Source}'", source.Name);
+        }
+    }
+
+    /// <summary>
+    /// Persists one dead letter into durable storage and only then completes it on the broker.
+    /// Any failure is logged and swallowed so the caller keeps draining; the message is left
+    /// unsettled in that case.
+    /// </summary>
+    private async Task recoverMessageAsync(DrainSource source, IAzureServiceBusEnvelopeMapper? mapper,
+        ServiceBusReceiver receiver, ServiceBusReceivedMessage message)
+    {
+        try
+        {
+            var envelope = readEnvelope(source, mapper, message);
+
+            if (envelope.Id == Guid.Empty)
+            {
+                envelope.Id = Guid.NewGuid();
+            }
+
+            envelope.Destination ??= source.Endpoint?.Uri;
+            envelope.Source ??= source.Endpoint?.Uri.ToString() ?? source.Name;
+            if (envelope.SentAt == default)
+            {
+                envelope.SentAt = DateTimeOffset.UtcNow;
+            }
+
+            var (exceptionType, exceptionMessage) = resolveExceptionInfo(envelope, message);
+            var exception = new DeadLetterRecoveredException(exceptionType, exceptionMessage);
+            await _runtime.Storage.Inbox.MoveToDeadLetterStorageAsync(envelope, exception);
+
+            // Only settle the dead letter once it is safely persisted. The settle deliberately does
+            // not observe the shutdown token: cancelling here would leave an already-persisted
+            // message on the broker, to be recovered a second time after the next start.
+            await receiver.CompleteMessageAsync(message, CancellationToken.None);
+
+            _logger.LogInformation(
+                "Recovered dead letter {MessageId} (type={MessageType}) from Azure Service Bus '{Source}' to durable storage. Reason: {Reason}",
+                envelope.Id, envelope.MessageType ?? "unknown", source.Name, exceptionType);
+        }
+        catch (Exception e)
+        {
+            // Leave the message in place (its lock expires and it returns) so a transient storage
+            // failure doesn't lose the dead letter.
+            _logger.LogError(e,
+                "Failed to recover Azure Service Bus dead letter message {MessageId} from '{Source}'",
+                message.MessageId, source.Name);
+        }
+    }
+
+    /// <summary>
+    /// Maps the broker message to an envelope with the supplied mapper. Without a mapper, or when
+    /// mapping throws, a minimal envelope built from the raw message is returned instead.
+    /// </summary>
+    private Envelope readEnvelope(DrainSource source, IAzureServiceBusEnvelopeMapper? mapper,
+        ServiceBusReceivedMessage message)
+    {
+        if (mapper == null)
+        {
+            return buildMinimalEnvelope(message);
+        }
+
+        try
+        {
+            var envelope = new Envelope();
+            mapper.MapIncomingToEnvelope(envelope, message);
+            return envelope;
+        }
+        catch (Exception readError)
+        {
+            _logger.LogWarning(readError,
+                "Could not reconstruct a Wolverine envelope from Azure Service Bus dead letter message {MessageId} on '{Source}'. Persisting a minimal envelope.",
+                message.MessageId, source.Name);
+
+            // Start from a fresh envelope so nothing from the partial mapping leaks through.
+            return buildMinimalEnvelope(message);
+        }
+    }
+
+    /// <summary>
+    /// Builds an envelope from the raw body, content type and subject only. Both callers share it
+    /// so a missing subject is always recorded as "unknown".
+    /// </summary>
+    private static Envelope buildMinimalEnvelope(ServiceBusReceivedMessage message)
+    {
+        return new Envelope
+        {
+            Data = message.Body.ToArray(),
+            ContentType = message.ContentType ?? EnvelopeConstants.JsonContentType,
+            MessageType = message.Subject ?? "unknown"
+        };
+    }
+
+    /// <summary>
+    /// Picks the exception type and message to record. Envelope headers win; the native
+    /// DeadLetterReason/DeadLetterErrorDescription come second; fixed placeholders are the last
+    /// resort.
+    /// </summary>
+    private static (string exceptionType, string exceptionMessage) resolveExceptionInfo(Envelope envelope,
+        ServiceBusReceivedMessage message)
+    {
+        // Wolverine stamps these onto the message (and thus the envelope headers) when it moves a
+        // failed message to its managed dead letter queue.
+        var type = headerOrNull(envelope, DeadLetterQueueConstants.ExceptionTypeHeader)
+                   ?? (message.DeadLetterReason.IsNotEmpty() ? message.DeadLetterReason : null)
+                   ?? "Unknown";
+
+        var description = headerOrNull(envelope, DeadLetterQueueConstants.ExceptionMessageHeader)
+                          ?? (message.DeadLetterErrorDescription.IsNotEmpty() ? message.DeadLetterErrorDescription : null)
+                          ?? "Recovered from Azure Service Bus dead letter queue";
+
+        return (type!, description!);
+    }
+
+    // Returns the header value, or null when the header is missing or empty.
+    private static string? headerOrNull(Envelope envelope, string key)
+    {
+        return envelope.Headers.TryGetValue(key, out var value) && value.IsNotEmpty() ? value : null;
+    }
+}
diff --git a/src/Wolverine/Transports/DeadLetterRecoveredException.cs b/src/Wolverine/Transports/DeadLetterRecoveredException.cs
new file mode 100644
index 000000000..7cfdb9c3a
--- /dev/null
+++ b/src/Wolverine/Transports/DeadLetterRecoveredException.cs
@@ -0,0 +1,30 @@
+using Wolverine.Persistence.Durability;
+
+namespace Wolverine.Transports;
+
+/// <summary>
+/// Synthetic exception used when a natively dead-lettered broker message is recovered into
+/// Wolverine's durable dead-letter storage. It carries the original exception type name and
+/// message reconstructed from the transport's native dead-letter metadata (RabbitMQ x-death,
+/// SQS failure headers, Azure Service Bus DeadLetterReason/DeadLetterErrorDescription),
+/// and implements <see cref="IDeadLetterExceptionInfo"/> so the durable store records the original
+/// exception type rather than this wrapper. That keeps dead letters triageable by type in tools
+/// like CritterWatch.
+/// </summary>
+public class DeadLetterRecoveredException : Exception, IDeadLetterExceptionInfo
+{
+    public DeadLetterRecoveredException(string? originalExceptionType, string message) : base(message)
+    {
+        ExceptionType = string.IsNullOrWhiteSpace(originalExceptionType) ? "Unknown" : originalExceptionType!;
+    }
+
+    /// <summary>
+    /// The original exception type name, as reconstructed from the transport's native dead-letter
+    /// metadata. Persisted to durable storage in place of this wrapper's runtime type.
+    /// </summary>
+    public string ExceptionType { get; }
+
+    public string ExceptionMessage => Message;
+
+    public override string ToString() => $"{ExceptionType}: {Message}";
+}