Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Paramore.Brighter.DynamoDb/DynamoDbUnitOfWork.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void Close()
/// Commit a transaction, performing all associated write actions
/// Will block thread and use second thread for callback
/// </summary>
public void Commit() => BrighterAsyncContext.Run(async () => await DynamoDb.TransactWriteItemsAsync(_tx));
public void Commit() => BrighterAsyncContext.Run(async () => await CommitAsync());

/// <summary>
/// Commit a transaction, performing all associated write actions
Expand Down
2 changes: 1 addition & 1 deletion src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbOutbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public void Add(
IAmABoxTransactionProvider<TransactWriteItemsRequest> transactionProvider = null
)
{
AddAsync(message, requestContext, outBoxTimeout).ConfigureAwait(ContinueOnCapturedContext).GetAwaiter().GetResult();
AddAsync(message, requestContext, outBoxTimeout, transactionProvider).ConfigureAwait(ContinueOnCapturedContext).GetAwaiter().GetResult();

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ Getting worse: Code Duplication
introduced similar code in: Add

Suppress

}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading.Tasks;
using Amazon.DynamoDBv2;
using Amazon.DynamoDBv2.DataModel;
using Amazon.DynamoDBv2.Model;
using FluentAssertions;
using Microsoft.Extensions.Time.Testing;
using Paramore.Brighter.DynamoDb;
using Paramore.Brighter.DynamoDB.Tests.TestDoubles;
Expand All @@ -18,6 +20,8 @@ public class DynamoDbOutboxTransactionTests : DynamoDBOutboxBaseTest
private readonly ITestOutputHelper _testOutputHelper;
private readonly DynamoDbOutbox _dynamoDbOutbox;
private readonly string _entityTableName;
private readonly Dictionary<string, AttributeValue?> _entityAttributes;
private readonly Message _message;

public DynamoDbOutboxTransactionTests(ITestOutputHelper testOutputHelper)
{
Expand Down Expand Up @@ -46,14 +50,11 @@ public DynamoDbOutboxTransactionTests(ITestOutputHelper testOutputHelper)
}

_dynamoDbOutbox = new DynamoDbOutbox(Client, new DynamoDbConfiguration(OutboxTableName), fakeTimeProvider);
}

[Fact]
public async void When_There_Is_A_Transaction_Between_Outbox_And_Entity()
{
var context = new DynamoDBContext(Client);
var myItem = new MyEntity { Id = Guid.NewGuid().ToString(), Value = "Test Value for Transaction Checking" };
var attributes = context.ToDocument(myItem).ToAttributeMap();
_entityAttributes = context.ToDocument(myItem).ToAttributeMap();

var myMessageHeader = new MessageHeader(
messageId: Guid.NewGuid().ToString(),
topic: new RoutingKey("test_topic"),
Expand All @@ -64,18 +65,51 @@ public async void When_There_Is_A_Transaction_Between_Outbox_And_Entity()
correlationId: Guid.NewGuid().ToString(),
replyTo: new RoutingKey("ReplyAddress"),
contentType: "text/plain");

var body = new MessageBody(myItem.Value);
var myMessage = new MessageItem(new Message(myMessageHeader, body));
var messageAttributes = context.ToDocument(myMessage).ToAttributeMap();
_message = new Message(myMessageHeader, body);
}

[Fact]
public void When_There_Is_A_Transaction_Between_Outbox_And_Entity()
{
var uow = new DynamoDbUnitOfWork(Client);
TransactWriteItemsResponse response;
try
{
_dynamoDbOutbox.Add(_message, new RequestContext(), transactionProvider: uow);

var transaction = uow.GetTransaction();
transaction.TransactItems.Add(new TransactWriteItem { Put = new Put { TableName = _entityTableName, Item = _entityAttributes, } });

transaction.TransactItems.Count.Should().Be(2);

uow.Commit();
response = uow.LastResponse;
}
catch (Exception e)
{
_testOutputHelper.WriteLine(e.ToString());
throw;
}

response.Should().NotBeNull();
response.HttpStatusCode.Should().Be(HttpStatusCode.OK);
response.ContentLength.Should().Be(2); //number of tables in the transaction
}

[Fact]
public async Task When_There_Is_A_Transaction_Between_Outbox_And_Entity_Async()
{
var uow = new DynamoDbUnitOfWork(Client);
TransactWriteItemsResponse response = null;
TransactWriteItemsResponse response;
try
{
await _dynamoDbOutbox.AddAsync(_message, new RequestContext(), transactionProvider: uow);

var transaction = await uow.GetTransactionAsync();
transaction.TransactItems.Add(new TransactWriteItem { Put = new Put { TableName = _entityTableName, Item = attributes, } });
transaction.TransactItems.Add(new TransactWriteItem { Put = new Put { TableName = OutboxTableName, Item = messageAttributes}});
transaction.TransactItems.Add(new TransactWriteItem { Put = new Put { TableName = _entityTableName, Item = _entityAttributes, } });

transaction.TransactItems.Count.Should().Be(2);

await uow.CommitAsync();
response = uow.LastResponse;
Expand All @@ -86,8 +120,8 @@ public async void When_There_Is_A_Transaction_Between_Outbox_And_Entity()
throw;
}

Assert.NotNull(response);
Assert.Equal(HttpStatusCode.OK, response.HttpStatusCode);
Assert.Equal(2, response.ContentLength); //number of tables in the transaction
response.Should().NotBeNull();
response.HttpStatusCode.Should().Be(HttpStatusCode.OK);
response.ContentLength.Should().Be(2); //number of tables in the transaction
}
}