Optimise Dynamo DB outbox usage#3761
Conversation
iancooper
left a comment
There was a problem hiding this comment.
I made a few comments. I suspect that the might be your actual intent here, so probably it is just clarifying.
|
|
||
| ### `OutstandingMessages` operation | ||
|
|
||
| Given that the `Outstanding` index is a sparse index, and we wish to pull out the entirety of that index when we perform the operation, this can be a `Scan` operation on the index instead of a `Query`. This removes the need to iterate over topics and shards, and can instead be done as a single HTTP call if the number of outstanding messages allows it (with paging if it doesn't). |
There was a problem hiding this comment.
What would happen if the partition key (hash key) was the status instead of the topic (as today) i.e. Outstanding or Delivered? This would be sparse as the record would not exist the value was null. As ticks are numeric then hashing them ought to ensure that we had high cardinality to avoid hotspots on writes. Then the only items in the GSI ought to be the Outstanding or Delivered rows. You could then Scan and filter by Topic as, as you allude to here, you want all the results back for a topic anyway. Assuming that the Scan is no more painful than the Query that needs to get all rows anyway?
There was a problem hiding this comment.
Scans are only more painful than a Query if:
- You need to be able to sort results (since you need to fetch everything to sort in memory)
- You want to apply a filter that filters out a substantial number of records.
Filter expressions are applied after the data has been read from the partition, but before the data is returned to the client. So while they reduce the data being sent over the wire, they don't reduce the number of read units being consumed or the amount of data that has to be read out of the table/index.
Using OutstandingCreatedTime and DeliveredTime as the hash key for the Outstanding and Delivered indices would certainly solve the cardinality problem, but could make things problematic for users who have a use case requiring retrieval of records for a specific topic. If, for example, an outbox was being used for one high throughput topic, and one low throughput topic, then attempts to get the outstanding messages on the low throughput topic would require reading everything from the high throughput topic as well.
Another thing to note is that the limit applied to both Query and Scan operations applies to the number of items read from the table, not the number of items returned to the client. So if one was applying a filter expression that filters out the majority of a table, the operation may well return no items at all even if there are items in the table that would be returned in subsequent pages.
That's why I left the partition structure in place here - it isn't as clean when it comes to cardinality, but is much cleaner for the use case where a user may only want outstanding/delivered messages for a particular topic.
There was a problem hiding this comment.
We could have both GSIs that let you query by topic and those that don't. However, if we end up paging in the records, as shown below, we should not expose a different way to handle high-throughput and low-throughput topics. If we do (and I missed it) or intend to (which, if we don't, might be a good idea), then we could explicitly use the GSI that uses the topic for the hash key if you expressly ask us to clear a given topic. We could note that the topic based version is preferred for clearing the low throughput topic if it's being drowned by a 'noisy neighbor'
There was a problem hiding this comment.
Ah yes I hadn't considered effective parallel scanning. Having both makes sense. To avoid requiring people to add a new GSI if they're using v9, I'll update the ADR to reflect:
In v9:
- Scan the existing outstanding index and only do a single scan for now
In v10:
- Add a new GSI for outstanding messages indexed by
OutstandingCreatedTime(with message ID as the range key for completeness) - When scanning for all outstanding messages, use the new index with a config option for the number of parallel scans to perform at once
- When querying for all outstanding messages for a particular topic, use the old index
|
|
||
| Given that the `Outstanding` index is a sparse index, and we wish to pull out the entirety of that index when we perform the operation, this can be a `Scan` operation on the index instead of a `Query`. This removes the need to iterate over topics and shards, and can instead be done as a single HTTP call if the number of outstanding messages allows it (with paging if it doesn't). | ||
|
|
||
| The one downside of this is that we cannot specify the ordering of results from a `Scan` operation. If the results are paged, we will not be able to specify that the oldest messages should be retrieved first. Given the performance issues using `Query` operations, and the limitations of Dynamo DB as a storage platform, this feels like a reasonable comprompise to make. |
There was a problem hiding this comment.
If we are retrieving all the outstanding items (without a limit), we can order them ourselves in memory and then dispatch them. That would preserve ordering. This also set us up to use a parallel scan to accelerate execution. It's relatively safe to read them all into a buffer, because if we fail, we don't dispatch them.
There was a problem hiding this comment.
I think this is the way to go. Preserving ordering at the database level can only be done with a Query, and since we have requirements to both increase the cardinality of partition keys (via sharding) and reduce the number of requests sent over the network, Queries just don't work here.
The only potential issue is when there is a large number of messages in the Outstanding index, necessitating the use of paging to avoid issues with memory consumption or latency. Since the order of records returned by a Scan is essentially random, we just need to ensure it's clear somewhere in documentation or in comments that ordering of published messages is only guaranteed if Number of outstanding messages < Page size.
There was a problem hiding this comment.
This does in theory negate the need for #3606, but if we keep the existing index structure for the Outstanding and Delivered indices then I still think it's worth making shard assignment deterministic - it would at least mean users could preserve ordering even if there are a large number of messages in the Outstanding index by querying topics individually.
There was a problem hiding this comment.
For convenience, I note above, that we could possibly have both, if we choose to have an optimal structure for a parallel scan, and then let you give us a specific index if noisy neighbor is a problem.
|
|
||
| ### Marking messages as dispatched | ||
|
|
||
| When marking a collection of messages as dispatched, us a `BatchWrite` operation to update all of them at once. If any of the updates fail, throw an exception. |
There was a problem hiding this comment.
If we fail to mark dispatched in a batch, do we want to throw an exception, or log? In principle, an undispatched message would just be picked up again and resent. The main issue would be increased latency. Perhaps a boolean to indicate if you throw on an partial write, or just log, and the from a Sweeper we would just let it try again next time, but from an immediate write, you would choose by setting an arg
There was a problem hiding this comment.
Yeah I thought about this one for a while. I ended up with the suggestion to throw an exception because that would effectively reproduce the existing behaviour, but logging makes a lot of sense. I'm inclined to say:
- Default to throwing
- Have an optional bool in the
argsbag to allow logging instead - Use the logging option when invoking the outbox from the Sweeper
This commit adds two new indices to the Dynamo DB outbox structure, allowing for effective scanning of all outstanding or delivered messages regardless of topic.
This commit adds support for parallel scans when scanning for outstanding or delivered messages in the "all topics" indices. It also adds some semaphores to ensure that the paging state of either all topics or a specific topic is only operated on by one thread at a time.
This commit adds sharding to the delivered index, which is used when querying for delivered messages for a specific topic. It also takes advantage of this to refactor the code that iterates through shards in a sharded index to use common code for both the outstanding and delivered indices.
This commit adds both sync and async methods for getting a batch of messages from the outbox, and adds an implementation using BatchGet for the Dynamo DB outbox. It also updates some method signatures for other outbox implementations to align to the new interface, and adds implementations where missing. Finally, it copies all the optimisations made so far over to the AWS SDK v4 version of the Dynamo outbox.
This fixes a project reference so that it uses v4 of the AWS SDK for both references and not just one of them.
This updates the OutboxProducerMediator to take advantage of BatchGet operations when clearing a specific collection of IDs from the outbox, to minimise the number of concurrent db operations.
This changes `MarkDispatchedAsync` to do so using an Update expression. This enables the ability to do it using a single call to the DB, instead of reading everything out and writing it all back in again. To facilitate this, a reference to the Dynamo DB client has been added as a member variable, as update expressions can only be used when using the low level API. This has also meant deleting one of the (unused) constructors that takes the Dynamo DB context directly, as the client is not accessible from within the context object. Finally, the ADR has been updated to reflect this change as this wasn't the original proposed change.
This updates `DeleteAsync` to use batch write operations to delete items in batches of up to 25 items. It does this using the low level API as the method exposed by `DynamoDBContext` doesn't expose a response object with the unprocessed items.
This adds a migration guide to the ADR for these changes, for users who are currently using the Dynamo DB outbox with v9 and need to upgrade to v10. The upgrade can be done in place without needing a new outbox table.
This fixes the observability tests, as they were looking for events in the wrong place (they're now on the per-operation create span instead of the per-message clear span as we pre-fetch all required messages in a batch). It also adds the missing span name for the new Count operation on outboxes.
iancooper
left a comment
There was a problem hiding this comment.
Approved as of meeting on Wed 1 Oct
This adds a couple of clarifying comments: - When calling the outstanding messages method on the Dynamo DB outbox, it makes it clear that only one query can run concurrently when querying all topics, and one query per topic can run when querying specific topics. - Updated the example index name in the guide for upgrading to the v10 outbox from a v9 outbox, to be clearer about what change is being made
A new outbox implementation was added after this work was started - this commit aligns the new outbox with the changes made to the outbox interfaces.
There was a problem hiding this comment.
Gates Failed
Prevent hotspot decline
(1 hotspot with Complex Conditional, Primitive Obsession)
Enforce advisory code health rules
(6 files with Complex Conditional, Primitive Obsession, Missing Arguments Abstractions, Code Duplication, Complex Method)
Gates Passed
2 Quality Gates Passed
See analysis details in CodeScene
Reason for failure
| Prevent hotspot decline | Violations | Code Health Impact | |
|---|---|---|---|
| DynamoDbOutbox.cs | 2 rules in this hotspot | 7.01 → 6.90 | Suppress |
| Enforce advisory code health rules | Violations | Code Health Impact | |
|---|---|---|---|
| InMemoryOutbox.cs | 1 advisory rule | 8.55 → 7.79 | Suppress |
| MongoDbOutbox.cs | 1 advisory rule | 7.79 → 7.55 | Suppress |
| FirestoreOutbox.cs | 1 advisory rule | 5.74 → 5.58 | Suppress |
| DynamoDbOutbox.cs | 2 advisory rules | 7.01 → 6.90 | Suppress |
| DynamoDbOutbox.cs | 2 advisory rules | 7.01 → 6.90 | Suppress |
| BrighterSpanExtensions.cs | 1 advisory rule | 9.45 → 9.42 | Suppress |
Quality Gate Profile: Clean Code Collective
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.
| } | ||
| } | ||
|
|
||
| /// <inheritdoc /> |
There was a problem hiding this comment.
❌ New issue: Missing Arguments Abstractions
The average number of function arguments in this module is 4.04 across 26 functions. The average arguments threshold is 4.00
This PR makes a number of improvements to the Dynamo DB outbox implementation, focused around performance improvement in high throughout scenarios.
The details of the changes made are covered in detail in the included ADR, but can be summarised as:
Delieveredindex, to prevent backpressure being applied to the main table when a particular topic creates a hot partition in the indexCOUNToption when determining how many outstanding messages are in the outboxSome of these changes have meant changing the sync and async outbox interfaces. These have been reflected in other outbox implementations, in completely non-breaking updates.