Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
c812fd7
Add proposal for optimising Dynamo DB outbox usage
dhickie Sep 3, 2025
e1577c8
Improve ADR title
dhickie Sep 3, 2025
6c40914
Update ADR with decision on Dynamo DB optimisation
dhickie Sep 11, 2025
2149e49
Merge remote-tracking branch 'upstream/master' into optimise_dynamo_o…
dhickie Sep 11, 2025
3b149e8
Scan outbox over all topics
dhickie Sep 11, 2025
9bdab43
Add support for parallel scans of all topic indices
dhickie Sep 12, 2025
e8cad1b
Add sharding to delivered index
dhickie Sep 16, 2025
5255850
Add BatchGet method to outboxes
dhickie Sep 17, 2025
4077466
Fix AWS SDK project reference
dhickie Sep 17, 2025
7b73ec1
Use BatchGet when clearing messages from outbox
dhickie Sep 17, 2025
4eb168a
Use update expression to mark messages as dispatched
dhickie Sep 19, 2025
7e8d087
Use BatchWrites for deleting from outbox
dhickie Sep 19, 2025
80ed023
Add outstanding count method to outboxes
dhickie Sep 23, 2025
38b300b
Make shard assignment deterministic
dhickie Sep 23, 2025
ba44724
Port Dynamo DB shard assignment to v4 sdk
dhickie Sep 23, 2025
d28a701
Remove misleading comment
dhickie Sep 23, 2025
3c2f572
Fixes for existing tests
dhickie Sep 25, 2025
96d3960
Ensure accurate segment page sizes
dhickie Sep 25, 2025
f474ebf
Add new tests for new outbox methods
dhickie Sep 25, 2025
bea0163
Add outbox migration guide to ADR
dhickie Sep 26, 2025
9255d71
Merge remote-tracking branch 'upstream/master' into optimise_dynamo_o…
dhickie Sep 26, 2025
7fffa7f
Fix observability tests
dhickie Sep 26, 2025
67e7f5d
Add clarifying comments
dhickie Oct 3, 2025
f1f720d
Merge remote-tracking branch 'upstream/master' into optimise_dynamo_o…
dhickie Oct 3, 2025
d4727c9
Align GCP outbox with interface
dhickie Oct 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 24 additions & 26 deletions Brighter.sln
Original file line number Diff line number Diff line change
Expand Up @@ -2066,6 +2066,18 @@ Global
{5D57A811-A3C2-42BA-A086-A5BFE8B590C7}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{5D57A811-A3C2-42BA-A086-A5BFE8B590C7}.Release|x86.ActiveCfg = Release|Any CPU
{5D57A811-A3C2-42BA-A086-A5BFE8B590C7}.Release|x86.Build.0 = Release|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Debug|x86.ActiveCfg = Debug|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Debug|x86.Build.0 = Debug|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Release|Any CPU.Build.0 = Release|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Release|x86.ActiveCfg = Release|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Release|x86.Build.0 = Release|Any CPU
{022AD920-4E8D-4370-9C6D-CA4D8DA3DB6F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{022AD920-4E8D-4370-9C6D-CA4D8DA3DB6F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{022AD920-4E8D-4370-9C6D-CA4D8DA3DB6F}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
Expand Down Expand Up @@ -2318,6 +2330,18 @@ Global
{D530B147-067A-408D-BB1B-A4290324012F}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{D530B147-067A-408D-BB1B-A4290324012F}.Release|x86.ActiveCfg = Release|Any CPU
{D530B147-067A-408D-BB1B-A4290324012F}.Release|x86.Build.0 = Release|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Debug|Any CPU.Build.0 = Debug|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Debug|x86.ActiveCfg = Debug|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Debug|x86.Build.0 = Debug|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Release|Any CPU.ActiveCfg = Release|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Release|Any CPU.Build.0 = Release|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Release|x86.ActiveCfg = Release|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Release|x86.Build.0 = Release|Any CPU
{7AA5B0BF-3520-45C4-9B8A-7F131EFDA227}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7AA5B0BF-3520-45C4-9B8A-7F131EFDA227}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7AA5B0BF-3520-45C4-9B8A-7F131EFDA227}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
Expand Down Expand Up @@ -2546,18 +2570,6 @@ Global
{3B6D084F-C034-49C6-A8C4-3C23DCC83CF2}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{3B6D084F-C034-49C6-A8C4-3C23DCC83CF2}.Release|x86.ActiveCfg = Release|Any CPU
{3B6D084F-C034-49C6-A8C4-3C23DCC83CF2}.Release|x86.Build.0 = Release|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Debug|Any CPU.Build.0 = Debug|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Debug|x86.ActiveCfg = Debug|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Debug|x86.Build.0 = Debug|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Release|Any CPU.ActiveCfg = Release|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Release|Any CPU.Build.0 = Release|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Release|x86.ActiveCfg = Release|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Release|x86.Build.0 = Release|Any CPU
{4EA5F196-DDA8-4941-956B-D413B03051C9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4EA5F196-DDA8-4941-956B-D413B03051C9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4EA5F196-DDA8-4941-956B-D413B03051C9}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
Expand Down Expand Up @@ -2630,8 +2642,6 @@ Global
{0E6A0B80-58B7-4AA2-9E40-EE0AA5D4719E}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{0E6A0B80-58B7-4AA2-9E40-EE0AA5D4719E}.Release|x86.ActiveCfg = Release|Any CPU
{0E6A0B80-58B7-4AA2-9E40-EE0AA5D4719E}.Release|x86.Build.0 = Release|Any CPU
{F00B137A-C187-4C33-A37B-22AD40B71600}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F00B137A-C187-4C33-A37B-22AD40B71600}.Debug|Any CPU.Build.0 = Debug|Any CPU
{79CA356E-B08C-4D88-88C9-653EC8D8BF4D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{79CA356E-B08C-4D88-88C9-653EC8D8BF4D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{79CA356E-B08C-4D88-88C9-653EC8D8BF4D}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
Expand Down Expand Up @@ -2668,18 +2678,6 @@ Global
{24360989-A956-45E9-BF07-7FD9E7553C7D}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{24360989-A956-45E9-BF07-7FD9E7553C7D}.Release|x86.ActiveCfg = Release|Any CPU
{24360989-A956-45E9-BF07-7FD9E7553C7D}.Release|x86.Build.0 = Release|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Debug|x86.ActiveCfg = Debug|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Debug|x86.Build.0 = Debug|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Release|Any CPU.Build.0 = Release|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Release|x86.ActiveCfg = Release|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Release|x86.Build.0 = Release|Any CPU
{9063F17B-5636-4AD5-999B-C894517DB5FD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9063F17B-5636-4AD5-999B-C894517DB5FD}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9063F17B-5636-4AD5-999B-C894517DB5FD}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
Expand Down
115 changes: 115 additions & 0 deletions docs/adr/0033-optimise-reads-and-writes-from-dynamo-outbox.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# 33. Optimise reads/writes from/to Dynamo outbox

Date: 2025-09-03

## Status

Proposed

## Context

Load testing of APIs using Brighter with a Dynamo DB outbox has shown a performance bottleneck in high throughput scenarios, resulting in excessive CPU usage, high response times, and some HTTP 503 responses to clients. When running load testing locally with the API hooked up to LocalStack, we see an excessive number of `Query` operations being performed on the Dynamo outbox table. This suggests that the issue is some form of resource exhaustion cause by excessive requests being made over the network.

There are several places where we make inefficient use of the Dynamo DB client.

### `OutstandingMessages` operation

The `Outstanding` GSI in the Dynamo outbox uses the topic name as a primary key, which is then sharded according to the number of shards provided in config by the user in order to avoid hot partitions. When the outbox is queried to fetch all outstanding messages, it performs a large number of queries iterating over each topic it knows about, and each shard for each of those partitions. If the outbox was being used as part of publishing to five topics, each with 20 shards in the outbox table, that would mean performing 100 query operations even if the outstanding index was completely empty.

### `DispatchedMessages` operation & the `Delivered` GSI

Similar to the operation above. The `Delivered` index is partitioned by topic name, but unlike the `Outstanding` index it is _not_ sharded. This means that when the `DispatchedMessage` operation is performed, it only has to iterate over the topics it knows about (so following the scenario above, five queries even if the delivered index is empty).

As the `Delivered` index isn't sharded, it can fall victim to hot partitions. When writes to a GSI are throttled, [this also throttles writes to the base table that would affect the GSI](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/gsi-throttling.html). Since the `Delivered` index is a sparse index that isn't written to if the `DeliveryTime` of a message is null, this wouldn't affect the initial writing of messages to the outbox. It _would_, however, throttle the ability to mark messages as delivered, and lead to duplicate publishes as messages become "stuck" in the outbox.

### Batch get and write operations

There are a few operations where the outbox is provided with a collection of messages or message IDs, and instead of performing batch operations with those IDs it iterates through them and performs individual operations sequentially:

* When clearing a collection of message IDs from the outbox, it fetches each of these individually and tries to dispatch it before moving onto the next
* When a collection of messages are deleted from the outbox they're worked through sequentially with separate requests

### Fetching outstanding message count

Every time one or more messages are cleared from the outbox, the `OutboxProducerMediator` checks whether it needs to update it's internal metric for how many outstanding messages currently sit in the outbox, based on when that last check was last run. If it determines a refresh is neccesary, it fetches all outstanding messages from the outbox into memory (but does so asynchronously). This comes with all the queries described above, and if there are a large number of messages outstanding, can eat up a problematic amount of memory. The count of outstanding messages only appears to be used for monitoring purposes.

## Decision

Some of the inefficiencies above can be improved with non-breaking changes. There will, however, be breaking changes required to the delivered index.

### `OutstandingMessages` operation

Introduce a new additional `Outstanding` GSI called `OutstandingAllTopics`. This index will use `OutstandingCreatedTime` as the hash key, and `MessageId` as the range key, making it a sparse, high cardinality index containing outstanding messages for all topics.

When requests are made to the outbox to fetch outstanding messages for all topics, scan the new index using a parallel `Scan` and fetch results up to the provided page size. Order the results by created time in memory before returning them to the calling function.

Add a new configuration option to `DynamoDbConfiguration` called `ScanConcurrency` to allow configurability of how many parallel scan operations are performed concurrently.

When requests are made to the outbox to fetch outstanding message for a specific topic, continue to use a `Query` operation on the existing index and iterate through shards, fetching results up to the page size.

The one downside of this is that we cannot specify the ordering of results from a `Scan` operation. We try to get around this by ordering the results in memory, but if the number of outstanding messages in the outbox is greater than the page size, the ordering of messages returned by the operation cannot be guaranteed.

### `DispatchedMessages` operation

As above:

* Introduce a new `Delivered` index called `DeliveredAllTopics`, which uses `DeliveryTime` as the hash key and `MessageId` as the range key
* Scan the new index when fetching delivered messages for all topics, using the `ScanConcurrency` option for parallel scan concurrency
* Continue to use a `Query` operation when fetching delivered messages for a specific topic, iterating through shards

### The `Delivered` GSI

Introduce sharding to the `Delivered` index, using the same number of shards as configured for the `Outstanding` index.

### Getting messages for dispatch

Add overloads of both `Get` and `GetAsync` to the outbox interfaces that take a collection of message IDs instead of just one. For the Dynamo DB implementation, use a `BatchGet` operation to fetch all of them at once. For the other implementations, they can just iterate over their other `Get` methods for now.

If the `BatchGet` operation only returns a subset of the requested messages, throw an exception.

Update `OutboxProducerMediator` to use the new `Get` methods.

### Marking messages as dispatched

When marking a message as dispatched, use an `UpdateExpression` to only update the attributes we need to instead of reading the whole message out and then writing it all back in again.

### Deleting messages from the outbox

When deleting a collection of messages from the outbox, do so using a `BatchWrite` operation. If any of the deletes fail, throw an exception.

### Outstanding item count

It feels useful to have the number of outstanding messages available in a metric. Dynamo DB doesn't have a `Count` operation, but it does allow `Scan` operations that return only the count of items scanned, minimising the amount of data sent over the wire. As this is still a scan, we still need to specify a page size when this method is invoked:

* If the outbox has a maximum outstanding message count configured, then the page size should be 1 larger than the maximum to ensure the count retreived is at least as big as the configured maximum
* If the outbox does _not_ have a maximum outstanding message count configured, use the default value

Add a new method to the outbox interfaces called called `GetOutstandingMessageCount` and `GetOutstandingMessageCountAsync` that is called from the `OutboxProducerMediator`.

Other outbox implementations can continue to use their implementations of `OutstandingMessages` for now.

### Deterministic shard assignment

Make assignment of messages to shards for each topic deterministic. This makes it possible to preserve ordering of messages within a partition key by ensuring all messages with that key are assigned to the same shard. This can be done by hashing the partition key on the message:

```c#
var keyBytes = Encoding.UTF8.GetBytes(message.Header.PartitionKey);
var sha256 = SHA256.Create();
var keyHash = sha256.ComputeHash(keyBytes);
var shardNumber = BitConverter.ToUInt32(keyHash, 0) % _configuration.NumberOfShards;
```

If the partition key isn't specified for a message, then fall back to random shard assignment.

## Consequences

* When performing the `OutstandingMessages` or `DispatchedMessages` operations for all topics, we will only be able to guarantee the order of the returned messages if the number of outstanding messages is less than the page size for the operation.
* Shards will be assigned to messages deterministically based on their partition key
* The possibility of future improvements to other outbox implementations, to take advantage of the new bulk operation methods
* Users of the Dynamo DB outbox implementation in Brighter v9 will need to update their table as part of their migration to v10:
* Add a new GSI called `OutstandingAllTopics`, that uses `OutstandingCreatedTime` as its HASH key and `MessageId` as its RANGE key
* Add a new GSI called `DeliveredAllTopics`, that uses `DeliveryTime` as its HASH key and `MessageId` as its RANGE key
* Change the HASH key used by the `Delivered` index. This can be achieved by:
* Adding a _new_ GSI, which for the sake of example we'll call `DeliveredSharded`, which uses `TopicShard` as its HASH key and `DeliveryTime` as its RANGE key
* When performing the Brighter v10 upgrade, customise the `DynamoDbConfiguration` during configuration to set `DeliveredIndexName` to `DeliveredSharded`
* Once the v10 upgrade is complete, the old `Delivered` index can be removed if desired
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,39 @@ THE SOFTWARE. */

#endregion

using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Paramore.Brighter.Outbox.DynamoDB.V4;

internal sealed class DispatchedMessagesQueryResult
internal sealed class AllTopicsScanContext(int numSegments)
{
public IEnumerable<MessageItem> Messages { get; private set; }
public string PaginationToken { get; private set; }
public bool QueryComplete { get; private set; }
public int NextPage { get; private set; } = 1;

public DispatchedMessagesQueryResult(IEnumerable<MessageItem> messages, string paginationToken, bool queryComplete)
private string?[] _lastEvaluatedKeys = new string?[numSegments];

private SemaphoreSlim _scanLock = new(1, 1);

public void SetPagingToken(int segmentNumber, string? lastEvaluatedKey)
{
Messages = messages;
PaginationToken = paginationToken;
QueryComplete = queryComplete;
_lastEvaluatedKeys[segmentNumber] = lastEvaluatedKey;
}

public string? GetPagingToken(int segmentNumber) => _lastEvaluatedKeys[segmentNumber];

public void SetNextPage()
{
if (_lastEvaluatedKeys.Any(x => x != null))
{
NextPage++;
}
else
{
NextPage = 1;
}
}

public async Task Lock(CancellationToken cancellationToken) => await _scanLock.WaitAsync(cancellationToken);
public void Release() => _scanLock.Release();
}

This file was deleted.

Loading
Loading