Universal DLQ #4015

Merged
iancooper merged 62 commits into master from universal_dlq
Feb 22, 2026
@iancooper

Member

Whilst we have worked on Kafka and InMemory, this PR extends that work to all of our providers, to ensure that we have consistent behaviour around RejectMessageAction.

iancooper and others added 23 commits February 11, 2026 16:14
Requirement specs for AWS SQS, Redis, MsSql, PostgreSQL, RocketMQ, and
MQTT dead letter queue support. Numbered from 0010 to leave 0003-0009
free for parallel workstreams on other branches.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…design

SQS-specific decisions for replacing ChangeMessageVisibility(0) with
direct SendMessage to DLQ. References ADR 0034 (strategy) and ADR 0036
(routing logic) for shared concerns.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…38 producer creation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…MessageConsumer

Remove the inverted _hasDlq boolean from SqsMessageConsumer and replace with
RoutingKey? deadLetterRoutingKey and RoutingKey? invalidMessageRoutingKey
parameters, preparing for Brighter-managed DLQ support (ADR 0038).

- Add connection and makeChannels parameters to SqsMessageConsumer for later
  use by lazy DLQ producers
- Update SqsMessageConsumerFactory to pass new parameters
- Update reject tests: reject without DLQ now deletes (no longer uses
  ChangeMessageVisibility(0) which was the inverted _hasDlq behavior)
- Applied identically to both AWSSQS and AWSSQS.V4 packages

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- SqsSubscription implements IUseBrighterDeadLetterSupport and
  IUseBrighterInvalidMessageSupport interfaces
- Add DeadLetterRoutingKey and InvalidMessageRoutingKey properties
- Add optional constructor parameters, null by default (backward compatible)
- Applied identically to both AWSSQS and AWSSQS.V4 packages

- Test: When_creating_sqs_subscription_with_dlq_routing_keys_should_expose_properties
- Test: When_creating_sqs_subscription_without_dlq_routing_keys_should_default_to_null

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…onsumer

SqsMessageConsumerFactory.CreateImpl() now extracts routing keys from the
subscription via IUseBrighterDeadLetterSupport/IUseBrighterInvalidMessageSupport
interface checks and passes them to the SqsMessageConsumer constructor, matching
the pattern established by KafkaMessageConsumerFactory.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace ChangeMessageVisibility(0) with direct SendMessage to DLQ per ADR 0038.
SqsMessageConsumer now creates lazy DLQ/invalid-message producers and routes
rejected messages with metadata (originalTopic, rejectionReason, rejectionTimestamp,
originalMessageType) before deleting the original from the source queue.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Integration test confirms DetermineRejectionRoute correctly sends messages
rejected with Unacceptable reason to the invalid message queue (not DLQ)
when both queues are configured.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… channel

Integration test confirms that when only deadLetterRoutingKey is configured
(no invalidMessageRoutingKey), rejecting with Unacceptable reason falls back
to the DLQ per ADR 0036 routing decision tree.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Integration test confirms that when no deadLetterRoutingKey or
invalidMessageRoutingKey is configured, rejecting with a reason still
deletes the original message from the source queue.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Integration test confirms the async/Proactor channel path correctly sends
messages rejected with DeliveryError to the DLQ with metadata, exercising
RejectAsync directly without the sync-over-async BrighterAsyncContext wrapper.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…equeue

The SNS requeue tests were still testing the old ChangeMessageVisibility(0)
behavior. Updated to match the SQS reject test pattern: reject deletes the
message from the queue (MT_NONE) rather than requeuing it.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ADR, and tasks

Redis has no native DLQ support, so ADR 0039 adopts the same Brighter-managed
pattern proven in Kafka (spec 0001) and SQS (spec 0010). Also marks spec 0010
tasks as complete.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
RedisSubscription now implements IUseBrighterDeadLetterSupport and
IUseBrighterInvalidMessageSupport with optional constructor parameters,
following the same pattern as SQS. Backward compatible — defaults to null.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…onsumer

RedisMessageConsumerFactory.Create() and CreateAsync() now extract
deadLetterRoutingKey and invalidMessageRoutingKey from subscriptions via
interface checks and pass them to the RedisMessageConsumer constructor.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Implements the core DLQ routing logic in RedisMessageConsumer.Reject()
and RejectAsync() following the SQS pattern: lazy producer creation,
metadata enrichment, and ADR 0036 rejection routing decision tree.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When both DLQ and invalid message channels are configured, Unacceptable
rejections route to the invalid channel per ADR 0036. DLQ stays empty.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… channel configured

- Test: When_rejecting_message_with_unacceptable_and_no_invalid_channel_should_fallback_to_dlq
- Confirms ADR 0036 fallback: Unacceptable → DLQ when invalidMessageRoutingKey is null

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…om inflight

- Test: When_rejecting_message_with_no_channels_configured_should_remove_from_inflight
- Confirms consumer can receive next message after rejecting without DLQ/invalid channels

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Test: When_rejecting_message_with_delivery_error_should_send_to_dlq_async
- Confirms RejectAsync path sends to DLQ with rejection metadata

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
All 7 implementation tasks and verification complete.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Switch active spec to 0012-mssql-dead-letter-queue and add design
for Brighter-managed DLQ support in the MsSql transport, following
the established pattern from Kafka, SQS, and Redis.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
iancooper self-assigned this Feb 16, 2026
iancooper added the 2 - In Progress, .NET, and V10.X labels Feb 16, 2026
codescene-delta-analysis[bot]

This comment was marked as outdated.

@claude

claude Bot commented Feb 16, 2026


Pull Request Review: Universal DLQ

Summary

This PR extends dead letter queue (DLQ) support across multiple messaging providers (AWS SQS, Redis, and foundation for MS SQL, Postgres, RocketMQ, and MQTT). The implementation follows the pattern established in the Kafka DLQ work (spec 0001) and ensures consistent behavior for RejectMessageAction across all transports.

Strengths

1. Excellent Architecture & Documentation

  • Three well-structured ADRs (0038, 0039, 0040) clearly document design decisions with context, alternatives considered, and consequences
  • Responsibility-Driven Design is properly applied - roles are clearly separated (Subscription knows routing keys, Factory coordinates, Consumer performs rejection)
  • Consistent pattern across transports reduces cognitive load for developers
  • Comprehensive specification docs in specs/ directory with requirements, tasks, and design documents

2. Implementation Quality

  • Lazy producer initialization prevents overhead when DLQ is not configured - good performance consideration
  • Backward compatibility maintained with all new parameters being optional
  • Proper routing logic - DeliveryError → DLQ, Unacceptable → Invalid Message channel (with fallback to DLQ)
  • Metadata enrichment adds essential debugging information (original topic, rejection reason, timestamp)
  • Graceful degradation when no DLQ is configured (logs warning and deletes message)

3. Test Coverage

  • Comprehensive test scenarios covering:
    • Delivery error routing to DLQ
    • Unacceptable messages routing to invalid channel
    • Fallback behavior when invalid channel not configured
    • No-channels-configured scenarios
  • Tests verify both message routing AND metadata enrichment
  • Both sync (Reactor) and async (Proactor) paths tested
  • Duplicate test coverage for both AWS SDK v3 and v4 packages

4. AWS SQS Improvements

  • Fixes inverted logic bug: The old _hasDlq flag was inverted (true when redrive policy was null). This is now properly removed.
  • Direct send to DLQ: Replaces the inefficient ChangeMessageVisibility(0) pattern that caused unnecessary re-processing cycles
  • Clear separation: Brighter-managed DLQ takes precedence when configured; native SQS redrive policy is independent

Issues & Recommendations

Critical Issues

None identified. The implementation is solid.

Code Quality Observations

1. Duplicate Code Between SDK Versions (Minor)

Location: Paramore.Brighter.MessagingGateway.AWSSQS and Paramore.Brighter.MessagingGateway.AWSSQS.V4

Both packages have identical SqsMessageConsumer implementations. While this is acknowledged in ADR 0038 as a "Dual Package Constraint," consider whether shared code could be extracted to reduce maintenance burden.

Recommendation: Low priority - acceptable as-is given the packages target different SDK versions. Consider for future refactoring if the duplication becomes a maintenance issue.

2. Test File Naming Consistency

Location: Test files like When_rejecting_message_with_delivery_error_should_send_to_dlq.cs

Test class names are very descriptive but quite long. This is consistent with existing patterns in the codebase, so no change needed.

3. Error Handling in Reject Methods

Location: SqsMessageConsumer.RejectAsync(), around lines 284-320

The error handling logs errors but continues to delete the original message even if DLQ production fails. This is documented behavior per ADR 0036, but could lead to message loss.

From ADR 0038:

Risk: SendMessage to DLQ fails, then DeleteMessage also fails — message stuck.
Mitigation: Per ADR 0036, log the error and continue.

Recommendation: Acceptable as designed. The alternative (blocking the consumer) would be worse. Consider documenting this in user-facing documentation so operators understand the trade-off and can set up appropriate monitoring.

Documentation Suggestions

1. Add Migration Guide (Enhancement)

For users currently relying on native SQS redrive policy with the ChangeMessageVisibility(0) pattern, provide guidance on:

  • How to migrate to Brighter-managed DLQ
  • Whether they should keep native redrive policy configured
  • Performance implications (immediate routing vs. retry cycles)

2. Observability Guidance (Enhancement)

The PR adds structured logging (message ID, rejection reason). Consider documenting:

  • What log events to monitor for DLQ health
  • Recommended metrics (DLQ depth, rejection rates by reason)
  • How to set up alerts for DLQ production failures

Performance Considerations

Lazy Producer Creation

The lazy producer pattern is well-implemented:

if (_deadLetterRoutingKey != null)
{
    _deadLetterProducer = new Lazy<SqsMessageProducer?>(CreateDeadLetterProducer);
}

Concern: If DLQ production is rare, the first rejection will pay the cost of producer initialization. This is acceptable, but worth noting for high-throughput scenarios.

Recommendation: No change needed - the trade-off is correct (zero overhead for the common path when DLQ is not used).
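
As a rough illustration of the trade-off described above, here is a minimal sketch of lazy producer creation. SketchProducer and SketchConsumer are hypothetical stand-ins, not Brighter's SqsMessageProducer or SqsMessageConsumer, and the static counter exists only to make the construction cost visible.

```csharp
#nullable enable
using System;

// Hypothetical stand-in for a transport producer; not a Brighter type.
public sealed class SketchProducer : IDisposable
{
    public static int Created;                 // counts constructions, for illustration only
    public bool Disposed { get; private set; }
    public SketchProducer() => Created++;
    public void Dispose() => Disposed = true;
}

// Hypothetical stand-in for a consumer that owns an optional DLQ producer.
public sealed class SketchConsumer : IDisposable
{
    private readonly Lazy<SketchProducer?>? _deadLetterProducer;

    public SketchConsumer(string? deadLetterRoutingKey)
    {
        // Wrap a factory only when a DLQ is configured; nothing is constructed yet.
        if (deadLetterRoutingKey != null)
            _deadLetterProducer = new Lazy<SketchProducer?>(() => new SketchProducer());
    }

    // Returns true when a DLQ producer was available for the rejected message.
    public bool Reject()
    {
        var producer = _deadLetterProducer?.Value; // first access runs the factory
        return producer != null;
    }

    public void Dispose()
    {
        // Guard with IsValueCreated so disposal never triggers creation.
        if (_deadLetterProducer?.IsValueCreated == true)
            _deadLetterProducer.Value?.Dispose();
    }
}
```

In this sketch a consumer that never rejects never constructs a producer, and only the first rejection pays the construction cost.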

Security Considerations

Metadata in Message Headers

Rejection metadata (original topic, timestamps, reasons) is added to MessageHeader.Bag. This is good for debugging but could potentially expose internal routing information.

Recommendation: Acceptable for DLQ messages (they're error scenarios). If DLQ messages are ever forwarded to external systems, review what metadata should be included.

Test Coverage Assessment

Covered Scenarios ✅

  • Delivery error → DLQ routing
  • Unacceptable message → Invalid channel routing
  • Fallback behavior (Unacceptable with no invalid channel → DLQ)
  • No channels configured (logs and deletes)
  • Metadata enrichment verification
  • Original message deletion after routing
  • Both sync and async code paths

Potential Gaps

  1. Concurrent rejection scenarios: What happens if multiple messages are rejected simultaneously? (Likely fine given lazy producer initialization is thread-safe, but not explicitly tested)
  2. DLQ production failure scenarios: No tests explicitly simulate DLQ send failing. The code handles this, but integration tests could verify the error path.
  3. Queue creation on MakeChannels.Create: Tests use OnMissingChannel.Create but don't verify that DLQ queues are actually created if missing.

Recommendation: These are edge cases. Current test coverage is good. Consider adding failure injection tests in a future PR if flaky DLQ behavior is observed in production.

Specification & Planning Quality

The specs/ directory structure is exemplary:

  • Clear separation between requirements, design, and tasks
  • Approval gates tracked with .design-approved, .requirements-approved files
  • Cross-references between ADRs and specs
  • Future work clearly documented (specs 0013-0015 for Postgres, RocketMQ, MQTT)

This demonstrates excellent engineering discipline and makes the codebase maintainable.

Consistency with Codebase Standards

Following CLAUDE.md Guidelines ✅

  • ADRs follow the template in .agent_instructions/
  • Responsibility-Driven Design principles applied correctly
  • XML documentation present on public APIs
  • Test-driven approach evident from comprehensive test coverage

Code Style ✅

  • Consistent with existing C# conventions in the codebase
  • Proper use of nullable reference types (RoutingKey?)
  • Async/sync method pairs follow established patterns
  • Logging uses structured logging with message templates

Final Recommendation

Approve and Merge

This is high-quality work that:

  1. Solves real problems (inverted SQS DLQ logic, missing DLQ support in Redis)
  2. Follows established patterns (Kafka DLQ implementation)
  3. Maintains backward compatibility
  4. Has comprehensive test coverage
  5. Is thoroughly documented with ADRs and specs

Merge Checklist

  • ✅ All new parameters are optional (backward compatible)
  • ✅ Tests cover core scenarios
  • ✅ ADRs document design decisions
  • ✅ Both AWS SDK v3 and v4 packages updated identically
  • ✅ Follows Responsibility-Driven Design principles
  • ⏳ CI checks should pass (unable to verify in this environment)

Post-Merge Suggestions

  1. Monitor DLQ production failure rates in production to validate error handling strategy
  2. Consider user-facing documentation for migration from native SQS DLQ to Brighter-managed DLQ
  3. Track the Redis/MS SQL/Postgres DLQ implementations (specs 0011-0013) for completion

Excellent work on this PR! The consistent pattern across transports and thorough documentation will make this feature maintainable and understandable for future contributors.

@claude

claude Bot commented Feb 22, 2026


PR Review: Universal DLQ

This PR extends Brighter-managed DLQ support to SQS (v3 + v4), Redis, MSSQL, PostgreSQL, RocketMQ, and MQTT. The architecture is consistent and the ADRs are thorough. The self-review in specs/dlq-review-findings.md shows good practice — most high/medium findings are marked as fixed. The comments below add a few things not captured there.


Potential Bug: DisposeAsync missing async disposal in SQS v3 and v4

In both Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageConsumer.cs and the V4 variant, DisposeAsync calls the synchronous Dispose() on the producer rather than DisposeAsync():

public ValueTask DisposeAsync()
{
    if (_deadLetterProducer?.IsValueCreated == true)
        _deadLetterProducer.Value?.Dispose(); // sync Dispose in async path
    ...
}

The Redis implementation correctly checks for IAsyncDisposable first:

if (_deadLetterProducer?.IsValueCreated == true && _deadLetterProducer.Value is IAsyncDisposable deadLetterAsync)
    await deadLetterAsync.DisposeAsync();
else if (_deadLetterProducer?.IsValueCreated == true)
    (_deadLetterProducer.Value as IDisposable)?.Dispose();

The Redis pattern is correct; the SQS pattern should match it.


Breaking Change: SqsMessageConsumer constructor signature

The hasDlq: bool parameter is replaced with deadLetterRoutingKey, invalidMessageRoutingKey, and makeChannels. The factory uses named parameters so it's fine, but anyone constructing SqsMessageConsumer directly (which is a public class) will get a compile error. Worth a changelog note or an [Obsolete] bridge if this is meant to be a non-breaking minor version.
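
One possible shape for such an [Obsolete] bridge is sketched below. SketchSqsConsumer is a hypothetical, heavily simplified class: the real SqsMessageConsumer constructor takes more parameters, and whether the old flag can safely be ignored is an assumption, not something the PR states.

```csharp
#nullable enable
using System;

// Hypothetical, simplified consumer used only to illustrate the bridge.
public sealed class SketchSqsConsumer
{
    public string QueueName { get; }
    public string? DeadLetterRoutingKey { get; }
    public string? InvalidMessageRoutingKey { get; }

    // New-style constructor: routing keys replace the old boolean flag.
    public SketchSqsConsumer(
        string queueName,
        string? deadLetterRoutingKey = null,
        string? invalidMessageRoutingKey = null)
    {
        QueueName = queueName;
        DeadLetterRoutingKey = deadLetterRoutingKey;
        InvalidMessageRoutingKey = invalidMessageRoutingKey;
    }

    // Bridge for callers still passing hasDlq. The flag is ignored here
    // (an assumption); callers get a compiler warning instead of an error.
    [Obsolete("hasDlq is ignored; pass deadLetterRoutingKey instead.")]
    public SketchSqsConsumer(string queueName, bool hasDlq)
        : this(queueName, deadLetterRoutingKey: null, invalidMessageRoutingKey: null)
    {
    }
}
```

With a bridge like this, existing call sites that pass a bool keep compiling and see warning CS0618, which gives a deprecation window before the overload is removed.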


MQTT: Return value contract change in Reject / RejectAsync

The old implementation returned false from both methods:

public bool Reject(Message message, MessageRejectionReason? reason = null) => false;
public Task<bool> RejectAsync(...) => Task.FromResult(false);

The new implementation returns true in all cases, including when no channels are configured. If callers use the return value to determine whether the message was actually dispatched somewhere, this is a silent semantic change. Worth confirming whether the interface contract specifies what true/false mean here — if it means "rejection was handled" rather than "message was routed to a DLQ", then true is correct for all paths.


Code quality: Duplication in Reject / RejectAsync across transports

The routing logic block is copy-pasted verbatim between the sync and async Reject paths in Redis (and similarly in MSSQL/Postgres). The MQTT implementation partially avoids this with ResolveRejectionProducer. Given how many transports now share this logic, a common helper or base class method would reduce the maintenance surface. Not blocking, but worth considering for a follow-up.


Open finding L2: Hardcoded DLQ metadata key strings

Noted in specs/dlq-review-findings.md as not yet fixed:

message.Header.Bag["originalTopic"] = message.Header.Topic.Value;
message.Header.Bag["rejectionReason"] = reason.RejectionReason.ToString();

These string literals are duplicated across every transport implementation. A typo in one transport would silently produce inconsistent metadata. Adding constants to HeaderNames would be low-effort and prevent drift. This is worth addressing before shipping so DLQ consumers can rely on consistent key names without transport-specific knowledge.
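
A minimal sketch of what shared constants could look like follows. DlqHeaderNames and DlqMetadata are hypothetical names (the review suggests putting the constants on HeaderNames), and a plain dictionary stands in for MessageHeader.Bag; the key strings are the ones quoted in this PR.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical constants class; key values are those quoted in the PR.
public static class DlqHeaderNames
{
    public const string OriginalTopic = "originalTopic";
    public const string RejectionReason = "rejectionReason";
    public const string RejectionTimestamp = "rejectionTimestamp";
    public const string RejectionMessage = "rejectionMessage";
}

// Hypothetical helper that every transport could call instead of
// repeating string literals.
public static class DlqMetadata
{
    public static void Enrich(
        IDictionary<string, object> bag,
        string originalTopic,
        string rejectionReason,
        string? description,
        DateTimeOffset rejectedAt)
    {
        bag[DlqHeaderNames.OriginalTopic] = originalTopic;
        bag[DlqHeaderNames.RejectionReason] = rejectionReason;
        // ISO 8601 round-trip format, as described elsewhere in the review.
        bag[DlqHeaderNames.RejectionTimestamp] = rejectedAt.ToString("o");
        // Never store null in the bag; fall back to an empty string.
        bag[DlqHeaderNames.RejectionMessage] = description ?? string.Empty;
    }
}
```

A DLQ reader could then look up DlqHeaderNames.OriginalTopic rather than a literal, so a typo becomes a compile error instead of silently inconsistent metadata.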


Minor: _deadLetterProducer and _invalidMessageProducer not readonly in Redis

// RedisMessageConsumer.cs
private Lazy<RedisMessageProducer?>? _deadLetterProducer;
private Lazy<RedisMessageProducer?>? _invalidMessageProducer;

These should be readonly like the SQS and MQTT equivalents, since they are assigned only in the constructor and never mutated afterward.


MQTT producer TopicPrefix semantics

In CreateDeadLetterProducer, the DLQ routing key is used as the TopicPrefix of the producer configuration. Given that the consumer subscribes to {TopicPrefix}/# and the producer publishes to {TopicPrefix}/{messageId} (or similar), this needs care: a DLQ consumer would need to be configured with the same topic prefix. This is consistent with how MQTT works, but it's worth documenting in the subscription XML docs or a note in ADR 0043, since the MQTT DLQ pattern is less obvious than for queue-based transports.


Good patterns to call out

  • The Lazy<T> producer initialization with IsValueCreated guard in Dispose is correct and avoids unnecessary allocation.
  • The comment in CreateDeadLetterProducer explaining why sync ConfirmQueueExists must not be called (deadlock risk with BrighterAsyncContext.Run) is exactly the kind of note that prevents future regressions.
  • The ADRs (0038-0043) are well-structured and clearly motivated. The decision table in ADR 0038 for DLQ coexistence with native SQS redrive policy is particularly useful.
  • Test naming follows the established Brighter convention cleanly (When_rejecting_message_with_delivery_error_should_send_to_dlq).
  • The .slopwatch/config.json suppression with a justification comment is a good practice.

Overall this is a well-executed feature with good test coverage across transports. The DisposeAsync issue in SQS and the open L2 (hardcoded metadata keys) are the most worth addressing before merge.

iancooper and others added 2 commits February 22, 2026 19:35
Resolves conflicts in MQTT, MsSql, and Redis transports where both the
scheduler (delay requeue) and DLQ (rejection routing) features modified
the same consumer and factory files. Combined both sets of constructor
parameters, fields, and methods so both features coexist.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
codescene-delta-analysis[bot]

This comment was marked as outdated.

@claude

claude Bot commented Feb 22, 2026


PR Review: Universal DLQ

This is a large, well-structured PR that extends Brighter-managed DLQ support consistently across Redis, MsSql, Postgres, RocketMQ, MQTT, and SQS (v3 + v4). The ADRs are thorough, the specs/dlq-review-findings.md shows disciplined self-review, and the test coverage across all transports is solid. Below are observations ranging from bugs to suggestions.


Bug: DisposeAsync calls sync Dispose on producers instead of DisposeAsync

Files: SqsMessageConsumer.cs (both V3 and V4)

Both DisposeAsync implementations call _deadLetterProducer.Value?.Dispose() rather than awaiting DisposeAsync. If SqsMessageProducer implements IAsyncDisposable, this defeats the purpose of the async path and may suppress async-disposal cleanup (e.g. flushing in-flight sends).

// Current (both V3 and V4)
public ValueTask DisposeAsync()
{
    if (_deadLetterProducer?.IsValueCreated == true)
        _deadLetterProducer.Value?.Dispose();   // ← sync Dispose in an async method
    ...
}

// Should be
public async ValueTask DisposeAsync()
{
    if (_deadLetterProducer?.IsValueCreated == true && _deadLetterProducer.Value is IAsyncDisposable ad)
        await ad.DisposeAsync();
    else if (_deadLetterProducer?.IsValueCreated == true)
        _deadLetterProducer.Value?.Dispose();
    ...
}

Bug: Null-forgiving operator on reason.Description can silently insert null into the bag

All RefreshMetadata implementations across transports

message.Header.Bag["rejectionMessage"] = reason.Description!;

MessageRejectionReason.Description appears to be nullable. The ! operator suppresses the compiler warning but does not prevent a runtime null value entering the bag. DLQ consumers expecting a string will get a null object. Consider a null-coalescing fallback:

message.Header.Bag["rejectionMessage"] = reason.Description ?? string.Empty;

Inconsistency: DetermineRejectionRoute handles RejectionReason.None differently across transports

File: MQTTMessageConsumer.cs vs all others

MQTT's switch expression includes:

_ when hasDeadLetterProducer => (_deadLetterRoutingKey, true, false),   // routes None to DLQ

All other transports (SqsMessageConsumer, RedisMessageConsumer, MsSqlMessageConsumer, PostgresMessageConsumer) fall through to _ => (null, false, false) for RejectionReason.None, meaning the message is deleted/dropped without routing. This behavioural difference is not documented and is likely unintentional. A None rejection reason silently routing to the DLQ in MQTT but being dropped on SQS/Redis/SQL will surprise users.

Decision needed: should None route to DLQ as a safe default, or always drop? Whichever is chosen, it should be uniform across all transports.
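
To make the decision concrete, here is a simplified sketch of one uniform routing function that every transport could share. The enum, the names, and the single-value return are assumptions for illustration: the real DetermineRejectionRoute returns a tuple whose exact shape is not shown in this PR. The sketch picks the "drop on None" option, and the alternative is a one-line change.

```csharp
#nullable enable

// Hypothetical mirror of the rejection reasons discussed in the review.
public enum SketchRejectionReason { None, DeliveryError, Unacceptable }

public static class RejectionRouting
{
    // Returns the routing key to send the rejected message to,
    // or null when the message should be dropped without routing.
    public static string? DetermineRoute(
        SketchRejectionReason reason,
        string? deadLetterKey,
        string? invalidMessageKey)
        => reason switch
        {
            // Unacceptable prefers the invalid-message channel...
            SketchRejectionReason.Unacceptable when invalidMessageKey != null => invalidMessageKey,
            // ...and falls back to the DLQ (which may itself be null).
            SketchRejectionReason.Unacceptable => deadLetterKey,
            SketchRejectionReason.DeliveryError => deadLetterKey,
            // None: dropped here. Returning deadLetterKey instead would
            // give the "route None to DLQ" behaviour seen in MQTT.
            _ => null
        };
}
```

Keeping the decision in one place like this would mean the None behaviour is chosen once rather than per transport.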


Missing integration tests: SQS FIFO DLQ routing

Files: tests/Paramore.Brighter.AWS.Tests/MessagingGateway/Sqs/Fifo/

The FIFO tests were renamed (requeue → delete) but no new DLQ routing tests were added for the FIFO path (neither V3 nor V4). FIFO queues have additional constraints (MessageGroupId and MessageDeduplicationId) which are not exercised by the standard-queue DLQ tests. Specifically worth testing: does the DLQ producer correctly propagate or generate a MessageGroupId when routing a FIFO message?


No tests for producer creation failure paths (L5 from findings doc — remains open)

CreateDeadLetterProducer() swallows exceptions and returns null, but no test covers what happens at rejection time when the lazy factory returns null. The code path falls through to Log.NoChannelsConfiguredForRejection and deletes the source. This is the correct fallback, but it's untested — a future refactor could silently break the degraded path.


Metadata key casing inconsistency between Kafka and other transports (L3 — acknowledged as intentional)

The specs/dlq-review-findings.md marks L3 as "investigated: intentional", but the inconsistency is a footgun for users who consume DLQ messages from multiple transports. Kafka uses PascalCase (OriginalTopic, RejectionReason) via IKafkaMessageHeaderBuilder; all new transports use camelCase (originalTopic, rejectionReason). A consumer that reads the DLQ must know which transport produced the message to parse the metadata.

Consider introducing HeaderNames constants (referenced in multiple ADRs but not yet added as L2) and either unifying the casing or documenting the transport-specific casing contract explicitly in a single place users can find.


Minor: RocketMQ Reject is sync-over-async wrapping an async CreateProducerAsync

public bool Reject(Message message, MessageRejectionReason? reason)
    => BrighterAsyncContext.Run(() => RejectAsync(message, reason));

RejectAsync calls GetProducerForRouteAsync which calls CreateProducerAsync. The comment in SqsMessageConsumer explicitly warns about nesting BrighterAsyncContext.Run to avoid deadlocks. Verify that the RocketMQ CreateProducerAsync builder does not internally use a synchronisation context that would deadlock when called via BrighterAsyncContext.Run. If it does, the lazy initialization approach (using a thread-safe but non-async factory) used by SQS should be applied here too.


Minor: rejectionTimestamp format lacks timezone test coverage

rejectionTimestamp is serialized as DateTimeOffset.UtcNow.ToString("o") (ISO 8601 round-trip). The test assertions only verify the key exists (Assert.True(ContainsKey("rejectionTimestamp"))) rather than validating the format. This is fine for initial coverage but leaves format regressions undetected.
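
A stricter assertion could parse the value rather than only checking that the key exists. The helper below is a hypothetical sketch (RejectionTimestampCheck is not part of Brighter) that accepts only a string in the "o" round-trip format with a zero UTC offset.

```csharp
#nullable enable
using System;
using System.Globalization;

// Hypothetical test helper for validating the rejectionTimestamp bag value.
public static class RejectionTimestampCheck
{
    // True only when the value is a string in ISO 8601 round-trip ("o")
    // format and its offset is +00:00, as DateTimeOffset.UtcNow produces.
    public static bool IsUtcRoundTrip(object? value)
    {
        return value is string text
            && DateTimeOffset.TryParseExact(
                text, "o", CultureInfo.InvariantCulture, DateTimeStyles.None, out var parsed)
            && parsed.Offset == TimeSpan.Zero;
    }
}
```

A test could then assert RejectionTimestampCheck.IsUtcRoundTrip(bag["rejectionTimestamp"]) in place of the ContainsKey check, which would catch both a format change and an accidental switch to local time.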


Nit: .current-spec and specs/.current-spec committed to repo root

Two spec-tracking files were committed:

  • .current-spec (repo root, contains 0015-mqtt-dead-letter-queue)
  • specs/.current-spec

These appear to be workflow artifacts from the spec tooling. Are they intentionally part of the public repository, or should they be gitignored? The specs/dlq-review-findings.md is also a working document that may not belong in the repo history.


Summary

| Category | Count |
| --- | --- |
| Bugs | 2 (DisposeAsync sync Dispose, null-forgiving on Description) |
| Behavioural inconsistency | 1 (MQTT None reason routing) |
| Test gaps | 2 (FIFO DLQ, producer creation failure) |
| Design/maintenance | 2 (HeaderNames constants, async nesting risk) |
| Minor/nit | 2 |

The core pattern is sound and consistently applied. The main items to address before merging are the DisposeAsync disposal bug, the MQTT/other inconsistency in None reason handling, and verifying FIFO queue DLQ behaviour.

iancooper and others added 2 commits February 22, 2026 19:52
SqsMessageConsumer.DisposeAsync() was calling sync Dispose() on
producers that implement IAsyncDisposable, bypassing async cleanup.
Now checks for IAsyncDisposable and awaits it, matching the pattern
used by Kafka and Redis consumers.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… RefreshMetadata

reason.Description is nullable; the `!` operator suppressed the compiler
warning but could silently insert null into the message bag. Use
`?? string.Empty` to provide a safe runtime fallback across all 8
transport consumers.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@iancooper iancooper marked this pull request as ready for review February 22, 2026 20:08
iancooper and others added 2 commits February 22, 2026 20:11
FIFO queues require MessageGroupId (from PartitionKey) and optionally
MessageDeduplicationId. These tests verify that rejected FIFO messages
arrive at a FIFO DLQ with FIFO attributes preserved, covering both
Reactor (sync) and Proactor (async) paths for V3 and V4 SDK variants.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The DLQ and invalid message producers were created without queue
attributes, so FIFO queues were created as standard queues (missing
.fifo suffix). This caused DLQ sends to fail silently for FIFO
messages because MessageGroupId is invalid on standard queues.

Pass SqsAttributes from the subscription through the consumer to the
DLQ/invalid message producer publications, ensuring FIFO DLQ queues
are created correctly.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
codescene-delta-analysis[bot]

This comment was marked as outdated.

@claude

claude Bot commented Feb 22, 2026


PR Review: Universal DLQ

Great scope and execution on this PR. Extending DLQ support consistently across all transports is a significant reliability improvement, and the spec-driven approach with ADRs per transport is well-structured. The comprehensive integration test coverage (reactor + proactor paths, all four rejection scenarios per transport) is particularly valuable. A few things worth examining before merge:


Issues

1. MsSql "no channels" leaves the message in the queue

The Postgres no-channels path calls DeleteSourceMessage(receiptHandle) before returning true. The MsSql equivalent test is named When_rejecting_message_with_no_channels_configured_should_log_warning, implying the message is only logged and not removed. Since MsSql is a Brighter-managed queue (poll-based, no native nack), leaving the message would cause it to be re-delivered on the next Receive call — an infinite loop. Postgres deletes it; MsSql should too for consistency.

2. Lazy<T> caches thrown exceptions by default

new Lazy<SqsMessageProducer?>(CreateDeadLetterProducer) uses LazyThreadSafetyMode.ExecutionAndPublication (the default). If the factory delegate throws on first access (e.g., invalid queue name, transient AWS error), .NET caches that exception and re-throws it on every subsequent .Value access for the lifetime of the object. This means a single transient failure during producer creation permanently disables DLQ routing until the consumer is restarted. Consider LazyThreadSafetyMode.PublicationOnly, which does not cache factory exceptions and so allows a retry on the next access (LazyThreadSafetyMode.None still caches them when a factory delegate is supplied), or handle the exception inside the factory and return null with a warning log.

This affects SQS V3, SQS V4, Redis, MsSql, Postgres, and MQTT consumers.
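
The exception-caching behaviour is easy to check in isolation. The sketch below uses only System.Lazy<T> and a hypothetical factory that fails on its first call; it reports how many times the factory runs across two .Value reads for a given mode.

```csharp
using System;
using System.Threading;

public static class LazyRetryDemo
{
    // Reads Value twice from a Lazy whose factory throws on the first
    // call only, and returns how many times the factory actually ran.
    public static int CountFactoryCalls(LazyThreadSafetyMode mode)
    {
        var calls = 0;
        var lazy = new Lazy<string>(() =>
        {
            calls++;
            if (calls == 1)
                throw new InvalidOperationException("transient failure");
            return "producer";
        }, mode);

        for (var attempt = 0; attempt < 2; attempt++)
        {
            try { _ = lazy.Value; }
            catch (InvalidOperationException) { /* simulate log-and-continue */ }
        }

        return calls;
    }
}
```

With ExecutionAndPublication and with None the factory runs once, because the exception is cached and re-thrown; with PublicationOnly it runs twice, because exceptions are not cached in that mode. Note that a factory which catches the failure and returns null has its null cached as the value, so that approach on its own does not retry either.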

3. Postgres finally deletes source message even when DLQ send fails

In the updated Reject/RejectAsync, the finally block calls DeleteSourceMessage unconditionally. If the DLQ send throws, the catch logs the error and returns true, and the finally block then deletes the source anyway. The message is permanently lost with no record in the DLQ. The ADR presumably documents this trade-off, but a Log.ErrorSendingToRejectionChannel at Error severity before discarding would at minimum leave an observable trace. Same pattern applies to RejectAsync.
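
The control flow in question can be reproduced with a small stand-alone sketch. RejectOrderingDemo is hypothetical and records step names instead of touching a queue; it only shows the order in which try, catch, and finally run.

```csharp
using System;
using System.Collections.Generic;

public static class RejectOrderingDemo
{
    // Records the order of operations for a reject whose DLQ send may fail.
    public static List<string> Reject(bool dlqSendFails)
    {
        var steps = new List<string>();
        try
        {
            if (dlqSendFails)
                throw new InvalidOperationException("DLQ unavailable");
            steps.Add("sent to DLQ");
        }
        catch (InvalidOperationException)
        {
            // Stand-in for logging the failed DLQ send.
            steps.Add("logged error");
        }
        finally
        {
            // Runs on both paths, so the source is removed even after a failure.
            steps.Add("deleted source");
        }
        return steps;
    }
}
```

On the failure path the recorded steps are "logged error" followed by "deleted source", which is the message-loss case: the log entry is the only trace left.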


Moderate Concerns

4. MQTT: DLQ send failure is silent

In MQTTMessageConsumer.Reject, when producer.Send(message) fails, the catch block returns true without logging. The reasoning (MQTT is fire-and-forget, message only lives in memory) is documented in the ADR, but an Error-level log before discarding would make incidents observable without changing behaviour.

5. RocketMQ sync-over-async in Reject

Reject now calls BrighterAsyncContext.Run(() => RejectAsync(...)). This is a known pattern in Brighter for bridging sync interfaces but carries deadlock risk if the execution context changes. Worth a comment referencing the same justification used elsewhere in the codebase (e.g., Acknowledge), so the intent is clear to future maintainers.

6. RocketMQ: DLQ producer fields not disposed on teardown

Unlike other transports that use Lazy<T?> and dispose in Dispose/DisposeAsync, RocketMQ uses plain nullable RocketMqMessageProducer? fields initialised lazily via GetProducerForRouteAsync. Verify that Dispose / DisposeAsync on RocketMessageConsumer handles these. If Reject is never called (message processed successfully), the fields remain null and there is nothing to dispose — but if they are created, they should be cleaned up.


Minor

7. Mutable set vs init on routing key properties

DeadLetterRoutingKey and InvalidMessageRoutingKey on SqsSubscription (and the other subscription types) use { get; set; }. These are configuration-time values that should not change after construction. init setters would better express that intent and prevent accidental mutation.
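
As an illustration of the suggestion, here is a minimal sketch using hypothetical stand-in types (`RoutingKeySketch`, `SubscriptionSketch`) rather than the real Brighter classes. It assumes C# 9 or later; projects that also target older frameworks may need extra support for `init`.

```csharp
#nullable enable
using System;

// Hypothetical stand-ins for illustration; not the Brighter types.
public sealed class RoutingKeySketch
{
    public RoutingKeySketch(string value) => Value = value;
    public string Value { get; }
}

public sealed class SubscriptionSketch
{
    // init-only: assignable in a constructor or object initializer,
    // read-only once construction has finished.
    public RoutingKeySketch? DeadLetterRoutingKey { get; init; }
    public RoutingKeySketch? InvalidMessageRoutingKey { get; init; }
}
```

With this shape, `new SubscriptionSketch { DeadLetterRoutingKey = new RoutingKeySketch("orders-dlq") }` compiles, while a later `subscription.DeadLetterRoutingKey = ...` assignment is rejected by the compiler.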

8. Inconsistent "no channels" behaviour across transports

| Transport | No-channels behaviour |
|---|---|
| SQS | Acknowledge + return true |
| RocketMQ | Ack + log warning + return true |
| Redis | Remove from inflight + return true |
| Postgres | Delete + log warning + return true |
| MsSql | Log warning + return true (no delete?) |
| MQTT | Return true (fire-and-forget) |

The differences are mostly justified by transport semantics, but the MsSql/Postgres divergence (both SQL-based, poll-based) is likely a defect rather than an intentional design difference (see issue #1 above).


Positives worth calling out

  • The _hasDlq: bool to _deadLetterRoutingKey: RoutingKey? refactor is a clear improvement — the routing key carries meaning that the boolean flag did not.
  • Adding structured logging to RejectMessageOnErrorHandler / RejectMessageOnErrorHandlerAsync via [LoggerMessage] is the right approach and fills an observability gap.
  • The fix to await DisposeAsync on SQS producers (commit 1ad490e) and to pass SqsAttributes to DLQ producers for FIFO support (commit 41db952) are both correct and would have been subtle production bugs.
  • Lazy producer initialisation avoids creating AWS/Redis/SQL connections at consumer startup for queues that may never see a rejection.

Review generated with Claude Code

… exceptions

The default LazyThreadSafetyMode.ExecutionAndPublication caches thrown
exceptions permanently. A single transient failure during DLQ producer
creation would disable DLQ routing for the lifetime of the consumer.
LazyThreadSafetyMode.None does not cache exceptions, allowing retry on
the next .Value access. This is safe because message pumps are
single-threaded per consumer.

Affects all 7 Brighter-managed DLQ transports: SQS (V3+V4), Kafka,
MQTT, Redis, PostgreSQL, MsSql.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

@codescene-delta-analysis codescene-delta-analysis Bot left a comment


Gates Failed
Prevent hotspot decline (2 hotspots with Complex Method, Bumpy Road Ahead, Code Duplication, Overall Code Complexity, Constructor Over-Injection)
Enforce critical code health rules (6 files with Bumpy Road Ahead)
Enforce advisory code health rules (8 files with Complex Method, Code Duplication, Overall Code Complexity, Constructor Over-Injection)

Gates Passed
1 Quality Gates Passed

See analysis details in CodeScene

Reason for failure

Prevent hotspot decline

| File | Violations | Code Health Impact |
|---|---|---|
| RedisMessageConsumer.cs | 5 rules in this hotspot | 8.57 → 6.72 |
| SqsMessageConsumer.cs | 2 rules in this hotspot | 9.69 → 9.14 |

Enforce critical code health rules

| File | Violations | Code Health Impact |
|---|---|---|
| RedisMessageConsumer.cs | 1 critical rule | 8.57 → 6.72 |
| PostgresMessageConsumer.cs | 1 critical rule | 8.28 → 6.86 |
| MsSqlMessageConsumer.cs | 1 critical rule | 9.39 → 8.04 |
| SqsMessageConsumer.cs | 1 critical rule | 9.69 → 9.14 |
| SqsMessageConsumer.cs | 1 critical rule | 9.69 → 9.14 |
| RocketMessageConsumer.cs | 1 critical rule | 10.00 → 9.54 |

Enforce advisory code health rules

| File | Violations | Code Health Impact |
|---|---|---|
| RedisMessageConsumer.cs | 4 advisory rules | 8.57 → 6.72 |
| PostgresMessageConsumer.cs | 3 advisory rules | 8.28 → 6.86 |
| MsSqlMessageConsumer.cs | 3 advisory rules | 9.39 → 8.04 |
| SqsMessageConsumer.cs | 1 advisory rule | 9.69 → 9.14 |
| SqsMessageConsumer.cs | 1 advisory rule | 9.69 → 9.14 |
| RocketMessageConsumer.cs | 1 advisory rule | 10.00 → 9.54 |
| MqttSubscription.cs | 1 advisory rule | 9.69 |
| MQTTMessageConsumer.cs | 1 advisory rule | 9.39 → 9.10 |

Quality Gate Profile: Clean Code Collective

_sqlMessageQueue.Send(message, topic);
return true;
}


❌ Getting worse: Code Duplication
Introduced similar code in: CreateDeadLetterProducer, CreateInvalidMessageProducer, Reject, RejectAsync

@iancooper

Member Author

Re: Issue #1 & #8 — MsSql "no channels" behaviour

The MsSql implementation is correct as-is, and the apparent divergence from PostgreSQL is intentional, driven by fundamentally different queue semantics.

MsSql uses atomic read-and-delete. The Receive() SQL is a DELETE...OUTPUT statement:

DELETE FROM cte OUTPUT deleted.Payload, deleted.MessageType, deleted.Topic, deleted.Id

The message is permanently removed from the queue table in the same operation that reads it. By the time Reject() is called, the message is already gone — there is nothing left to delete. This is also why Acknowledge() is a no-op (empty method body). There is no risk of infinite redelivery because the message no longer exists in the source table.

PostgreSQL uses a visibility timeout model. Receive() marks a row with a visibility timeout but leaves it in the table. Without an explicit DeleteSourceMessage(), the row would become visible again after the timeout expires — causing redelivery. That's why PostgreSQL must delete in every path.

So the outcome is identical in both transports: the message is removed from the source queue and will not be redelivered. They differ only in when the removal happens (receive-time for MsSql vs. reject/ack-time for PostgreSQL), which is dictated by their underlying queue mechanics.

The Acknowledge method signatures confirm this pattern:

  • MsSql: public void Acknowledge(Message message) {} — empty, nothing to do
  • PostgreSQL: public void Acknowledge(Message message) { DeleteSourceMessage(...); } — must explicitly delete

No change needed here.
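
The two receive models described above can be contrasted with a small in-memory sketch. The `ReadAndDeleteQueue` and `VisibilityTimeoutQueue` classes are hypothetical simplifications written for this comparison; they are not the Brighter MsSql or PostgreSQL implementations and ignore concurrency, ordering guarantees, and persistence.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// MsSql-style model (assumed simplification): Receive removes the row
// in the same step that reads it, like DELETE ... OUTPUT.
public sealed class ReadAndDeleteQueue
{
    private readonly Queue<string> _rows = new Queue<string>();

    public int Count => _rows.Count;
    public void Send(string payload) => _rows.Enqueue(payload);
    public string? Receive() => _rows.Count > 0 ? _rows.Dequeue() : null;
}

// PostgreSQL-style model (assumed simplification): Receive hides the row
// for a timeout but leaves it in the table until it is explicitly deleted.
public sealed class VisibilityTimeoutQueue
{
    private sealed class Row
    {
        public long Id;
        public string Payload = "";
        public DateTimeOffset VisibleAt;
    }

    private readonly List<Row> _rows = new List<Row>();
    private long _nextId;

    public int Count => _rows.Count;

    public void Send(string payload) =>
        _rows.Add(new Row { Id = ++_nextId, Payload = payload, VisibleAt = DateTimeOffset.MinValue });

    public (long Id, string Payload)? Receive(DateTimeOffset now, TimeSpan timeout)
    {
        var row = _rows.FirstOrDefault(r => r.VisibleAt <= now);
        if (row is null) return null;
        row.VisibleAt = now + timeout; // hidden, but still stored
        return (row.Id, row.Payload);
    }

    // Without this call the row becomes visible again once the timeout expires.
    public void Delete(long id) => _rows.RemoveAll(r => r.Id == id);
}
```

In the first model there is nothing left to delete after `Receive`, which matches an empty `Acknowledge`. In the second, skipping `Delete` on any reject or acknowledge path leads to redelivery after the timeout.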

@claude

claude Bot commented Feb 22, 2026


Code Review: Universal DLQ (#4015)

This is a substantial and well-structured PR that brings consistent DLQ behaviour across all Brighter transport providers. The ADRs, test coverage per rejection scenario, and the use of Lazy<T> with LazyThreadSafetyMode.None are all good design choices. Below are the findings, ordered by severity.


Breaking Changes / Correctness

1. SQS: native DLQ users will silently lose messages

The old hasDlq: false path called ChangeMessageVisibilityAsync(0), making the message immediately redeliverable so SQS could route it to the native DLQ after maxReceiveCount. The new code removes that path entirely: when deadLetterRoutingKey == null && invalidMessageRoutingKey == null, RejectAsync logs a warning and then deletes the source message.

Any existing user who configured a native SQS redrive policy but does not set deadLetterRoutingKey on their SqsSubscription will have rejected messages silently discarded rather than forwarded to the redrive-configured DLQ. This is a data-loss breaking change that needs a migration guide or at least a prominent note in the PR description / changelog.

2. MsSql consumer does not delete the source row on rejection

The Postgres consumer correctly calls DeleteSourceMessage() in a finally block. The MsSql RejectAsync diff shown does not include an equivalent delete — it calls SendAsync to the DLQ but appears to leave the row in the source table. If that is correct, messages could accumulate and be redelivered indefinitely. Please confirm whether the underlying MsSqlMessageQueue removes the row on read, or whether an explicit delete is required here (as in Postgres).


Design / DRY

3. DetermineRejectionRoute and RefreshMetadata are duplicated in every consumer

Both methods are copied with near-identical bodies into Redis, MsSql, Postgres, MQTT, RocketMQ, and both SQS variants (at least seven copies). A shared internal static helper class or a base class would eliminate the duplication and make future routing logic changes a single-site edit. Even a MessageRejectionRouter static class in Paramore.Brighter holding DetermineRejectionRoute and RefreshMetadata would be a significant improvement, especially given V9/V10 already has the Kafka implementation to pattern-match against.
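
A shared helper along these lines might look like the sketch below. Everything in it is an assumption for illustration: the type names carry a `Sketch` suffix, routing keys and topics are plain strings, the decision tree is inferred from the commit descriptions (Unacceptable prefers the invalid-message channel and falls back to the DLQ), and the format of the `rejectionReason` value is invented. The real signatures in the consumers may differ.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical stand-in for Brighter's rejection reason.
public enum RejectionReasonSketch { DeliveryError, Unacceptable }

public static class MessageRejectionRouterSketch
{
    // Assumed routing: Unacceptable goes to the invalid-message channel when one
    // is configured, otherwise to the DLQ; any other reason goes to the DLQ.
    // A null result means no channel is configured for this rejection.
    public static string? DetermineRejectionRoute(
        RejectionReasonSketch reason,
        string? deadLetterRoutingKey,
        string? invalidMessageRoutingKey)
    {
        if (reason == RejectionReasonSketch.Unacceptable && invalidMessageRoutingKey != null)
            return invalidMessageRoutingKey;
        return deadLetterRoutingKey;
    }

    // Returns a copy of the bag with rejection metadata added, leaving the
    // caller's bag untouched. The four keys are those named in the commit messages.
    public static Dictionary<string, object> RefreshMetadata(
        IDictionary<string, object> bag,
        string originalTopic,
        string originalMessageType,
        RejectionReasonSketch reason,
        string? description,
        DateTimeOffset rejectedAt)
    {
        return new Dictionary<string, object>(bag)
        {
            ["originalTopic"] = originalTopic,
            ["originalMessageType"] = originalMessageType,
            ["rejectionReason"] = $"{reason}: {description ?? string.Empty}", // format is an assumption
            ["rejectionTimestamp"] = rejectedAt.ToString("O"),
        };
    }
}
```

Keeping both methods static and free of transport types is one way to let every consumer call the same code; a base class would be an alternative if the consumers shared more state.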

4. RocketMQ does not use Lazy<T>

All other providers use Lazy<T> with LazyThreadSafetyMode.None for DLQ producers. RocketMQ uses plain nullable fields with null-coalescing assignment inside GetProducerForRouteAsync. This is fine in practice because RocketMQ consumers are single-threaded, but the inconsistency will confuse future maintainers and the intent of the LazyThreadSafetyMode.None comment (exception retry on next access) is lost.

5. SQS factory uses if/if with explicit casts; all others use null-conditional ?.

// SqsMessageConsumerFactory (both V3 and V4)
if (sqsSubscription is IUseBrighterDeadLetterSupport dlqSupport)
    deadLetterRoutingKey = dlqSupport.DeadLetterRoutingKey;
if (sqsSubscription is IUseBrighterInvalidMessageSupport invalidSupport)
    invalidMessageRoutingKey = invalidSupport.InvalidMessageRoutingKey;

// Every other factory
var deadLetterRoutingKey = (subscription as IUseBrighterDeadLetterSupport)?.DeadLetterRoutingKey;
var invalidMessageRoutingKey = (subscription as IUseBrighterInvalidMessageSupport)?.InvalidMessageRoutingKey;

The SQS pattern is more verbose for no gain. Align with the other factories for consistency.


Minor Issues

6. message.Header.Topic is mutated in-place

RefreshMetadata records the original topic into the header bag, then the caller overwrites message.Header.Topic with the DLQ routing key. The caller's reference to the message is silently mutated. This is probably safe today (message is not used after Reject returns), but it is a side-effect that is easy to miss. Consider working on a copy, or at least adding a code comment to each site where the mutation is intentional.

7. Reject return value changed for MQTT (false → true with no channels)

Previously Reject returned false in MQTT. It now returns true even when no channels are configured and the message is effectively discarded. The ADR justification (prevent requeue loops) is sound, but returning true from Reject when nothing was done is a subtle semantic change — callers checking the return value to decide whether to requeue will now no longer requeue. Ensure the callers in the message pump handle this correctly and document the convention.

8. Fragile .Single() assertions in tests

Several new tests use .Single() directly on the Receive() result array (e.g., Redis DLQ test). .Single() throws InvalidOperationException on empty or multi-element sequences, which can mask the real assertion failure with a confusing exception. Prefer an explicit count assertion followed by indexed access, or use .First(m => m.Header.MessageType != MessageType.MT_NONE) as is done in the MQTT tests.
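
One way to get a clearer failure is a small helper that checks the count explicitly before indexing. This is a sketch only: `ReceiveAssertionsSketch` is a hypothetical name, and it throws a plain exception so that it does not assume any particular test framework.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical test helper, not part of the Brighter test suite.
public static class ReceiveAssertionsSketch
{
    // Fails with a message naming the actual count, then returns the only element.
    public static T ExactlyOne<T>(IReadOnlyList<T> received, string what)
    {
        if (received.Count != 1)
            throw new InvalidOperationException(
                $"Expected exactly one {what} but received {received.Count}.");
        return received[0];
    }
}
```

Arrays implement `IReadOnlyList<T>`, so a call such as `ReceiveAssertionsSketch.ExactlyOne(messages, "DLQ message")` works on an array result, where `messages` is whatever the test received.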

9. Kafka RefreshMetadata null-forgiving fix

The change from reason.Description! to reason.Description ?? string.Empty is correct — if the null-forgiving operator was needed before, the field was already nullable and the ! was suppressing a legitimate warning. The fix is good; worth noting the original was a latent bug.
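
The difference between the two operators can be shown in isolation. This is a minimal sketch with a hypothetical `NullForgivingSketch` class and a plain dictionary standing in for the message bag.

```csharp
#nullable enable
using System.Collections.Generic;

// Hypothetical illustration; the dictionary stands in for the message bag.
public static class NullForgivingSketch
{
    // The ! operator only silences the compiler warning; if description is
    // null at runtime, null is what gets stored.
    public static void WithNullForgiving(IDictionary<string, object> bag, string? description)
        => bag["rejectionReason"] = description!;

    // The ?? operator supplies a real runtime fallback.
    public static void WithCoalescing(IDictionary<string, object> bag, string? description)
        => bag["rejectionReason"] = description ?? string.Empty;
}
```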

10. Copyright year in MqttMessageConsumerFactory.cs

The copyright header was changed from 2017 to 2026. Confirm this is intentional and consistent with the project's policy on copyright year updates.


Summary

| # | Severity | Area |
|---|---|---|
| 1 | High | SQS breaking change / potential data loss for native DLQ users |
| 2 | High | MsSql source message not deleted on rejection |
| 3 | Medium | DRY violation: rejection helpers duplicated across 7+ providers |
| 4 | Low | RocketMQ inconsistency with Lazy<T> pattern |
| 5 | Low | SQS factory style inconsistency |
| 6 | Low | In-place message mutation |
| 7 | Low | MQTT Reject return semantics change |
| 8 | Low | Fragile .Single() in tests |
| 9 | Info | Kafka null-forgiving fix (correct) |
| 10 | Info | Copyright year change |

Items 1 and 2 should be addressed before merge. The DRY concern (item 3) is worth at least a follow-up issue given the maintenance cost of keeping seven copies of the same routing logic in sync.

Overall the approach is sound and the test coverage is thorough — good work bringing consistency across all providers.

@iancooper iancooper merged commit edc6b77 into master Feb 22, 2026
28 of 93 checks passed
@iancooper iancooper deleted the universal_dlq branch February 22, 2026 22:58
DevJonny pushed a commit to DevJonny/Brighter that referenced this pull request Feb 28, 2026
* docs: add DLQ requirement specs for remaining transports (0010-0015)

Requirement specs for AWS SQS, Redis, MsSql, PostgreSQL, RocketMQ, and
MQTT dead letter queue support. Numbered from 0010 to leave 0003-0009
free for parallel workstreams on other branches.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: add ADR 0038 for SQS DLQ direct send, approve requirements and design

SQS-specific decisions for replacing ChangeMessageVisibility(0) with
direct SendMessage to DLQ. References ADR 0034 (strategy) and ADR 0036
(routing logic) for shared concerns.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: add implementation tasks for SQS DLQ (spec 0010), update ADR 0038 producer creation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* refactor: replace _hasDlq flag with DLQ routing key parameters in SqsMessageConsumer

Remove the inverted _hasDlq boolean from SqsMessageConsumer and replace with
RoutingKey? deadLetterRoutingKey and RoutingKey? invalidMessageRoutingKey
parameters, preparing for Brighter-managed DLQ support (ADR 0038).

- Add connection and makeChannels parameters to SqsMessageConsumer for later
  use by lazy DLQ producers
- Update SqsMessageConsumerFactory to pass new parameters
- Update reject tests: reject without DLQ now deletes (no longer uses
  ChangeMessageVisibility(0) which was the inverted _hasDlq behavior)
- Applied identically to both AWSSQS and AWSSQS.V4 packages

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: SqsSubscription exposes DLQ and invalid message routing keys

- SqsSubscription implements IUseBrighterDeadLetterSupport and
  IUseBrighterInvalidMessageSupport interfaces
- Add DeadLetterRoutingKey and InvalidMessageRoutingKey properties
- Add optional constructor parameters, null by default (backward compatible)
- Applied identically to both AWSSQS and AWSSQS.V4 packages

- Test: When_creating_sqs_subscription_with_dlq_routing_keys_should_expose_properties
- Test: When_creating_sqs_subscription_without_dlq_routing_keys_should_default_to_null

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: consumer factory passes DLQ routing keys from subscription to consumer

SqsMessageConsumerFactory.CreateImpl() now extracts routing keys from the
subscription via IUseBrighterDeadLetterSupport/IUseBrighterInvalidMessageSupport
interface checks and passes them to the SqsMessageConsumer constructor, matching
the pattern established by KafkaMessageConsumerFactory.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: reject SQS messages with DeliveryError by sending to DLQ queue

Replace ChangeMessageVisibility(0) with direct SendMessage to DLQ per ADR 0038.
SqsMessageConsumer now creates lazy DLQ/invalid-message producers and routes
rejected messages with metadata (originalTopic, rejectionReason, rejectionTimestamp,
originalMessageType) before deleting the original from the source queue.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test: verify Unacceptable rejection routes to invalid message queue

Integration test confirms DetermineRejectionRoute correctly sends messages
rejected with Unacceptable reason to the invalid message queue (not DLQ)
when both queues are configured.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test: verify Unacceptable rejection falls back to DLQ when no invalid channel

Integration test confirms that when only deadLetterRoutingKey is configured
(no invalidMessageRoutingKey), rejecting with Unacceptable reason falls back
to the DLQ per ADR 0036 routing decision tree.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test: verify rejection with no channels configured acknowledges message

Integration test confirms that when no deadLetterRoutingKey or
invalidMessageRoutingKey is configured, rejecting with a reason still
deletes the original message from the source queue.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test: verify async Proactor path sends rejected messages to DLQ

Integration test confirms the async/Proactor channel path correctly sends
messages rejected with DeliveryError to the DLQ with metadata, exercising
RejectAsync directly without the sync-over-async BrighterAsyncContext wrapper.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test: update SNS reject tests to assert message deletion instead of requeue

The SNS requeue tests were still testing the old ChangeMessageVisibility(0)
behavior. Updated to match the SQS reject test pattern: reject deletes the
message from the queue (MT_NONE) rather than requeuing it.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* spec: add Redis DLQ specification (0011) with approved requirements, ADR, and tasks

Redis has no native DLQ support, so ADR 0039 adopts the same Brighter-managed
pattern proven in Kafka (spec 0001) and SQS (spec 0010). Also marks spec 0010
tasks as complete.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: RedisSubscription exposes DLQ and invalid message routing keys

RedisSubscription now implements IUseBrighterDeadLetterSupport and
IUseBrighterInvalidMessageSupport with optional constructor parameters,
following the same pattern as SQS. Backward compatible — defaults to null.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: consumer factory passes DLQ routing keys from subscription to consumer

RedisMessageConsumerFactory.Create() and CreateAsync() now extract
deadLetterRoutingKey and invalidMessageRoutingKey from subscriptions via
interface checks and pass them to the RedisMessageConsumer constructor.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test: verify DeliveryError rejection routes message to DLQ

Implements the core DLQ routing logic in RedisMessageConsumer.Reject()
and RejectAsync() following the SQS pattern: lazy producer creation,
metadata enrichment, and ADR 0036 rejection routing decision tree.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test: verify Unacceptable rejection routes to invalid message channel

When both DLQ and invalid message channels are configured, Unacceptable
rejections route to the invalid channel per ADR 0036. DLQ stays empty.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test: verify Unacceptable rejection falls back to DLQ when no invalid channel configured

- Test: When_rejecting_message_with_unacceptable_and_no_invalid_channel_should_fallback_to_dlq
- Confirms ADR 0036 fallback: Unacceptable → DLQ when invalidMessageRoutingKey is null

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test: verify rejection with no channels configured removes message from inflight

- Test: When_rejecting_message_with_no_channels_configured_should_remove_from_inflight
- Confirms consumer can receive next message after rejecting without DLQ/invalid channels

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test: verify async DeliveryError rejection routes message to DLQ

- Test: When_rejecting_message_with_delivery_error_should_send_to_dlq_async
- Confirms RejectAsync path sends to DLQ with rejection metadata

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: mark all Redis DLQ tasks complete in spec 0011

All 7 implementation tasks and verification complete.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: add ADR 0040 for MsSql DLQ brighter-managed

Switch active spec to 0012-mssql-dead-letter-queue and add design
for Brighter-managed DLQ support in the MsSql transport, following
the established pattern from Kafka, SQS, and Redis.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* approve mssal specs

* fix: use stored receipt handle in SQS RejectAsync to delete source message

RefreshMetadata removes ReceiptHandle from the message bag before the
subsequent AcknowledgeAsync calls, which silently no-op because the
handle is gone. Extract DeleteSourceMessageAsync helper that takes the
receipt handle directly, and use it in RejectAsync with the locally
stored variable captured before RefreshMetadata runs. Applied to both
V3 and V4 SQS consumers.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: MsSqlSubscription exposes DLQ and invalid message routing keys

MsSqlSubscription now implements IUseBrighterDeadLetterSupport and
IUseBrighterInvalidMessageSupport with optional deadLetterRoutingKey
and invalidMessageRoutingKey constructor parameters. Both the base
class and generic MsSqlSubscription<T> support the new parameters.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: MsSql consumer factory passes DLQ routing keys to consumer

MsSqlMessageConsumer now accepts optional deadLetterRoutingKey and
invalidMessageRoutingKey parameters. MsSqlMessageConsumerFactory
extracts routing keys from subscription via IUseBrighterDeadLetterSupport
and IUseBrighterInvalidMessageSupport interface casts, passing them
through to the consumer constructor.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: MsSql consumer rejects messages to DLQ on delivery error

Implement Reject()/RejectAsync() with full DLQ routing in
MsSqlMessageConsumer following the same pattern as Redis/SQS.
Adds lazy MsSqlMessageProducer creation, RefreshMetadata() for
rejection metadata enrichment, and DetermineRejectionRoute() for
ADR 0036 routing decision tree.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test: MsSql rejects unacceptable messages to invalid message channel

Verifies that when both DLQ and invalid message routing keys are
configured, rejecting with Unacceptable routes to the invalid
message channel (not the DLQ). Confirms DetermineRejectionRoute()
correctly prioritizes the invalid message producer per ADR 0036.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test: MsSql unacceptable rejection falls back to DLQ when no invalid channel

Verifies that when only deadLetterRoutingKey is configured (no
invalidMessageRoutingKey), rejecting with Unacceptable falls back
to the DLQ per ADR 0036 routing decision tree.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test: MsSql reject with no channels configured returns true

Verifies that when neither deadLetterRoutingKey nor
invalidMessageRoutingKey is configured, Reject() returns true
and the consumer can continue receiving subsequent messages.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test: MsSql async reject with delivery error sends to DLQ

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: add ADR 0041 and tasks for PostgreSQL DLQ (spec 0013)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: PostgresSubscription exposes DLQ and invalid message routing keys

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: Postgres consumer factory passes DLQ routing keys to consumer

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: Postgres Reject() forwards messages to DLQ with metadata and deletes source

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test: Postgres rejects unacceptable messages to invalid message channel

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test: Postgres unacceptable rejection falls back to DLQ when no invalid channel

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test: Postgres rejection with no channels configured deletes source and continues

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test: Postgres async DeliveryError rejection sends message to DLQ

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: ADR 0042 and tasks for RocketMQ DLQ (Spec 0014)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: RocketSubscription exposes DLQ and invalid message routing keys

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: RocketMQ consumer factory passes DLQ routing keys to consumer

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: RocketMQ Reject() forwards messages to DLQ with metadata and Acks source

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* ensure we can run rocketmq on a mac

* test: RocketMQ DLQ rejection routing tests and docker-compose fixes

Add integration tests for remaining rejection routing paths:
- Unacceptable reason routes to invalid message channel
- Unacceptable falls back to DLQ when no invalid channel configured
- No channels configured Acks source and breaks requeue loop
- Async DeliveryError routes to DLQ via Proactor path

Fix docker-compose for RocketMQ 5.4.0: add platform linux/amd64 for
Rosetta compatibility, update mqadmin path, reduce JVM heap sizes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: add ADR 0043 for MQTT dead letter queue (Brighter-managed)

Approve requirements and design for spec 0015. MQTT DLQ follows the
Redis pattern — Lazy<T> producers, no source message cleanup (fire-and-
forget), new MqttSubscription and MqttMessageConsumerFactory classes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: MqttSubscription exposes DLQ and invalid message routing keys

- MqttSubscription implements IUseBrighterDeadLetterSupport and IUseBrighterInvalidMessageSupport
- Generic MqttSubscription<T> variant for typed subscriptions
- Test: When_creating_mqtt_subscription_with_dlq_routing_keys_should_expose_properties
- Test: When_creating_mqtt_subscription_without_dlq_routing_keys_should_default_to_null

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: MqttMessageConsumerFactory passes DLQ routing keys to consumer

- MqttMessageConsumerFactory implements IAmAMessageConsumerFactory
- Create()/CreateAsync() extract routing keys via IUseBrighterDeadLetterSupport/IUseBrighterInvalidMessageSupport
- MqttMessageConsumer constructor accepts optional deadLetterRoutingKey and invalidMessageRoutingKey
- MqttSubscription.ChannelFactoryType overridden to MqttMessageConsumerFactory

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: MQTT Reject() forwards messages to DLQ with metadata

- Reject() and RejectAsync() forward rejected messages to configurable DLQ topic
- RefreshMetadata() adds originalTopic, rejectionReason, rejectionTimestamp, originalMessageType
- DetermineRejectionRoute() implements ADR 0036 routing (DeliveryError/Unacceptable/fallback)
- Lazy<MqttMessageProducer> for DLQ and invalid message producers
- Separate sync/async Reject implementations to avoid nested sync-over-async deadlock
- Added .slopwatch/config.json to suppress SW004 in test projects

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test: MQTT Reject with Unacceptable routes to invalid message channel

- Verifies Unacceptable reason routes to invalid channel (not DLQ)
- Confirms DLQ does not receive the message when invalid channel is configured

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test: MQTT DLQ rejection routing tests and async variant

- Unacceptable fallback to DLQ when no invalid channel configured
- No channels configured returns true (breaking change from false)
- Async DeliveryError rejection routes to DLQ with metadata

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: mark all MQTT DLQ tasks complete (spec 0015)

All 7 tasks + regression verification complete:
- 11/11 tests pass (existing flaky 1000-msg async test is pre-existing)
- Backward compatible: existing MqttMessageConsumer constructor still works

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: switch current spec to 0015-mqtt-dead-letter-queue

- Update .current-spec to MQTT DLQ spec
- Update RocketMQ tasks.md with completed checkboxes
- Add .tasks-approved for MQTT spec

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: address DLQ cross-spec review findings (H1, H2, M1-M3, L6-L7)

- Redis: dispose DLQ producers in Dispose/DisposeAsync to prevent connection leaks
- MQTT: add ClientID with -dlq/-invalid suffix to DLQ producer config
- MQTT: fix misleading log in ResolveRejectionProducer, return tuple for null safety
- Backstop: add source-generated logging per ADR 0037 in both sync/async handlers
- RocketMQ: replace throw-in-finally with AckSourceMessageSafeAsync to avoid masking DLQ result
- RocketMQ: add thread-safety comment on producer caching fields
- Remove stale RELIABILITY-IMPROVEMENTS-SUMMARY.md
- Add cross-spec review findings document with checklist

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* chore: update collection type

* fix: await DisposeAsync on DLQ producers in SQS consumers

SqsMessageConsumer.DisposeAsync() was calling sync Dispose() on
producers that implement IAsyncDisposable, bypassing async cleanup.
Now checks for IAsyncDisposable and awaits it, matching the pattern
used by Kafka and Redis consumers.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: replace null-forgiving operator with null-coalescing fallback in RefreshMetadata

reason.Description is nullable; the `!` operator suppressed the compiler
warning but could silently insert null into the message bag. Use
`?? string.Empty` to provide a safe runtime fallback across all 8
transport consumers.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test: add FIFO DLQ routing integration tests for SQS (V3 and V4)

FIFO queues require MessageGroupId (from PartitionKey) and optionally
MessageDeduplicationId. These tests verify that rejected FIFO messages
arrive at a FIFO DLQ with FIFO attributes preserved, covering both
Reactor (sync) and Proactor (async) paths for V3 and V4 SDK variants.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: pass SqsAttributes to DLQ producers for FIFO queue support

The DLQ and invalid message producers were created without queue
attributes, so FIFO queues were created as standard queues (missing
.fifo suffix). This caused DLQ sends to fail silently for FIFO
messages because MessageGroupId is invalid on standard queues.

Pass SqsAttributes from the subscription through the consumer to the
DLQ/invalid message producer publications, ensuring FIFO DLQ queues
are created correctly.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: use LazyThreadSafetyMode.None for DLQ producers to avoid caching exceptions

The default LazyThreadSafetyMode.ExecutionAndPublication caches thrown
exceptions permanently. A single transient failure during DLQ producer
creation would disable DLQ routing for the lifetime of the consumer.
LazyThreadSafetyMode.None does not cache exceptions, allowing retry on
the next .Value access. This is safe because message pumps are
single-threaded per consumer.

Affects all 7 Brighter-managed DLQ transports: SQS (V3+V4), Kafka,
MQTT, Redis, PostgreSQL, MsSql.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>

Labels

3 - Done .NET Pull requests that update .net code V10.X
