diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 05d659f471..5e31da4781 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -433,3 +433,37 @@ jobs:
run: dotnet restore
- name: Azure Tests
run: dotnet test ./tests/Paramore.Brighter.AzureServiceBus.Tests/Paramore.Brighter.AzureServiceBus.Tests.csproj --filter "Fragile!=CI" --configuration Release --logger "console;verbosity=normal" --blame -v n
+
+# MongoDB tool too long time to run
+# mongodb-ci:
+# runs-on: ubuntu-latest
+# timeout-minutes: 5
+# needs: [build]
+#
+# services:
+# mongo:
+# image: mongo
+# ports:
+# - 27017:27017
+# env:
+# MONGO_INITDB_ROOT_USERNAME: root
+# MONGO_INITDB_ROOT_PASSWORD: example
+# MONGO_INITDB_DATABASE: brighter
+# options: >-
+# --health-cmd mongo
+# --health-interval 20s
+# --health-timeout 10s
+# --health-retries 10
+# steps:
+# - uses: actions/checkout@v4
+# - name: Setup dotnet
+# uses: actions/setup-dotnet@v4
+# with:
+# dotnet-version: |
+# 8.0.x
+# 9.0.x
+# - name: Install dependencies
+# run: dotnet restore
+# - name: MongoDB Tests
+# run: dotnet test ./tests/Paramore.Brighter.MongoDb.Tests/Paramore.Brighter.MongoDb.Tests.csproj --filter "Fragile!=CI" --configuration Release --logger "console;verbosity=normal" --blame -v n
+
diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml
index 94694e069e..eeb640034a 100644
--- a/.github/workflows/codeql-analysis.yml
+++ b/.github/workflows/codeql-analysis.yml
@@ -41,8 +41,8 @@ jobs:
uses: actions/setup-dotnet@v4
with:
dotnet-version: |
- 6.0.x
8.0.x
+ 9.0.x
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
diff --git a/Brighter.sln b/Brighter.sln
index f912ad9c7f..436741f9e4 100644
--- a/Brighter.sln
+++ b/Brighter.sln
@@ -351,6 +351,16 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.MessageSc
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.Hangfire.Tests", "tests\Paramore.Brighter.Hangfire.Tests\Paramore.Brighter.Hangfire.Tests.csproj", "{517D302F-46FC-4CDF-A190-D1E8A4F2E082}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.Outbox.MongoDb", "src\Paramore.Brighter.Outbox.MongoDb\Paramore.Brighter.Outbox.MongoDb.csproj", "{4295A571-4653-43FD-971D-6C8F6E1B301B}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.Inbox.MongoDb", "src\Paramore.Brighter.Inbox.MongoDb\Paramore.Brighter.Inbox.MongoDb.csproj", "{34487FF5-FD63-4C64-9A33-9249B0C814AA}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.MongoDb", "src\Paramore.Brighter.MongoDb\Paramore.Brighter.MongoDb.csproj", "{9389F329-ED2B-45EB-B87F-E25304C82277}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.Locking.MongoDb", "src\Paramore.Brighter.Locking.MongoDb\Paramore.Brighter.Locking.MongoDb.csproj", "{EE92EF65-BBA8-4601-A4F6-84A695548BD3}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.MongoDb.Tests", "tests\Paramore.Brighter.MongoDb.Tests\Paramore.Brighter.MongoDb.Tests.csproj", "{E988767C-A8F0-4EF1-B3CA-1822500F18DF}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -1981,6 +1991,66 @@ Global
{517D302F-46FC-4CDF-A190-D1E8A4F2E082}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{517D302F-46FC-4CDF-A190-D1E8A4F2E082}.Release|x86.ActiveCfg = Release|Any CPU
{517D302F-46FC-4CDF-A190-D1E8A4F2E082}.Release|x86.Build.0 = Release|Any CPU
+ {4295A571-4653-43FD-971D-6C8F6E1B301B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {4295A571-4653-43FD-971D-6C8F6E1B301B}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {4295A571-4653-43FD-971D-6C8F6E1B301B}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
+ {4295A571-4653-43FD-971D-6C8F6E1B301B}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
+ {4295A571-4653-43FD-971D-6C8F6E1B301B}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {4295A571-4653-43FD-971D-6C8F6E1B301B}.Debug|x86.Build.0 = Debug|Any CPU
+ {4295A571-4653-43FD-971D-6C8F6E1B301B}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {4295A571-4653-43FD-971D-6C8F6E1B301B}.Release|Any CPU.Build.0 = Release|Any CPU
+ {4295A571-4653-43FD-971D-6C8F6E1B301B}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
+ {4295A571-4653-43FD-971D-6C8F6E1B301B}.Release|Mixed Platforms.Build.0 = Release|Any CPU
+ {4295A571-4653-43FD-971D-6C8F6E1B301B}.Release|x86.ActiveCfg = Release|Any CPU
+ {4295A571-4653-43FD-971D-6C8F6E1B301B}.Release|x86.Build.0 = Release|Any CPU
+ {34487FF5-FD63-4C64-9A33-9249B0C814AA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {34487FF5-FD63-4C64-9A33-9249B0C814AA}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {34487FF5-FD63-4C64-9A33-9249B0C814AA}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
+ {34487FF5-FD63-4C64-9A33-9249B0C814AA}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
+ {34487FF5-FD63-4C64-9A33-9249B0C814AA}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {34487FF5-FD63-4C64-9A33-9249B0C814AA}.Debug|x86.Build.0 = Debug|Any CPU
+ {34487FF5-FD63-4C64-9A33-9249B0C814AA}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {34487FF5-FD63-4C64-9A33-9249B0C814AA}.Release|Any CPU.Build.0 = Release|Any CPU
+ {34487FF5-FD63-4C64-9A33-9249B0C814AA}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
+ {34487FF5-FD63-4C64-9A33-9249B0C814AA}.Release|Mixed Platforms.Build.0 = Release|Any CPU
+ {34487FF5-FD63-4C64-9A33-9249B0C814AA}.Release|x86.ActiveCfg = Release|Any CPU
+ {34487FF5-FD63-4C64-9A33-9249B0C814AA}.Release|x86.Build.0 = Release|Any CPU
+ {9389F329-ED2B-45EB-B87F-E25304C82277}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {9389F329-ED2B-45EB-B87F-E25304C82277}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {9389F329-ED2B-45EB-B87F-E25304C82277}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
+ {9389F329-ED2B-45EB-B87F-E25304C82277}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
+ {9389F329-ED2B-45EB-B87F-E25304C82277}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {9389F329-ED2B-45EB-B87F-E25304C82277}.Debug|x86.Build.0 = Debug|Any CPU
+ {9389F329-ED2B-45EB-B87F-E25304C82277}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {9389F329-ED2B-45EB-B87F-E25304C82277}.Release|Any CPU.Build.0 = Release|Any CPU
+ {9389F329-ED2B-45EB-B87F-E25304C82277}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
+ {9389F329-ED2B-45EB-B87F-E25304C82277}.Release|Mixed Platforms.Build.0 = Release|Any CPU
+ {9389F329-ED2B-45EB-B87F-E25304C82277}.Release|x86.ActiveCfg = Release|Any CPU
+ {9389F329-ED2B-45EB-B87F-E25304C82277}.Release|x86.Build.0 = Release|Any CPU
+ {EE92EF65-BBA8-4601-A4F6-84A695548BD3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {EE92EF65-BBA8-4601-A4F6-84A695548BD3}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {EE92EF65-BBA8-4601-A4F6-84A695548BD3}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
+ {EE92EF65-BBA8-4601-A4F6-84A695548BD3}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
+ {EE92EF65-BBA8-4601-A4F6-84A695548BD3}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {EE92EF65-BBA8-4601-A4F6-84A695548BD3}.Debug|x86.Build.0 = Debug|Any CPU
+ {EE92EF65-BBA8-4601-A4F6-84A695548BD3}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {EE92EF65-BBA8-4601-A4F6-84A695548BD3}.Release|Any CPU.Build.0 = Release|Any CPU
+ {EE92EF65-BBA8-4601-A4F6-84A695548BD3}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
+ {EE92EF65-BBA8-4601-A4F6-84A695548BD3}.Release|Mixed Platforms.Build.0 = Release|Any CPU
+ {EE92EF65-BBA8-4601-A4F6-84A695548BD3}.Release|x86.ActiveCfg = Release|Any CPU
+ {EE92EF65-BBA8-4601-A4F6-84A695548BD3}.Release|x86.Build.0 = Release|Any CPU
+ {E988767C-A8F0-4EF1-B3CA-1822500F18DF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {E988767C-A8F0-4EF1-B3CA-1822500F18DF}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {E988767C-A8F0-4EF1-B3CA-1822500F18DF}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
+ {E988767C-A8F0-4EF1-B3CA-1822500F18DF}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
+ {E988767C-A8F0-4EF1-B3CA-1822500F18DF}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {E988767C-A8F0-4EF1-B3CA-1822500F18DF}.Debug|x86.Build.0 = Debug|Any CPU
+ {E988767C-A8F0-4EF1-B3CA-1822500F18DF}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {E988767C-A8F0-4EF1-B3CA-1822500F18DF}.Release|Any CPU.Build.0 = Release|Any CPU
+ {E988767C-A8F0-4EF1-B3CA-1822500F18DF}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
+ {E988767C-A8F0-4EF1-B3CA-1822500F18DF}.Release|Mixed Platforms.Build.0 = Release|Any CPU
+ {E988767C-A8F0-4EF1-B3CA-1822500F18DF}.Release|x86.ActiveCfg = Release|Any CPU
+ {E988767C-A8F0-4EF1-B3CA-1822500F18DF}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -2078,6 +2148,7 @@ Global
{915BD9FD-2E29-4E2B-8DA3-A74055F32A20} = {C6B17EFD-4F05-4D45-AF3E-C4F3F790B994}
{7D8CE752-CCBB-4868-ADF0-30FF94CA611C} = {11935469-A062-4CFF-9F72-F4F41E14C2B4}
{FBAF452E-C0AB-4C4B-9A81-F1ED9616DE2A} = {202BA107-89D5-4868-AC5A-3527114C0109}
+ {E988767C-A8F0-4EF1-B3CA-1822500F18DF} = {329736D2-BF92-4D06-A7BF-19F4B6B64EDD}
{4469AEE3-B460-4948-A0A5-B9480EE70EA4} = {329736D2-BF92-4D06-A7BF-19F4B6B64EDD}
{8E414D7F-8DF0-4608-B47C-19213DB7E2B0} = {235DE1F1-E71B-4817-8E27-3B34FF006E4C}
{D5F5A100-F524-4020-B157-298EDC0910F4} = {8E414D7F-8DF0-4608-B47C-19213DB7E2B0}
diff --git a/Directory.Packages.props b/Directory.Packages.props
index 2177a21e25..be3e0aaf9d 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -54,6 +54,7 @@
+
@@ -128,11 +129,13 @@
+
-
+
+
all
runtime; build; native; contentfiles; analyzers
diff --git a/docker-compose-mongodb.yaml b/docker-compose-mongodb.yaml
new file mode 100644
index 0000000000..bd84167c61
--- /dev/null
+++ b/docker-compose-mongodb.yaml
@@ -0,0 +1,9 @@
+services:
+ mongo:
+ image: mongo
+ environment:
+ MONGO_INITDB_ROOT_USERNAME: root
+ MONGO_INITDB_ROOT_PASSWORD: example
+ MONGO_INITDB_DATABASE: brighter
+ ports:
+ - "27017:27017"
\ No newline at end of file
diff --git a/src/Paramore.Brighter.Inbox.MongoDb/InboxMessage.cs b/src/Paramore.Brighter.Inbox.MongoDb/InboxMessage.cs
new file mode 100644
index 0000000000..ec263157a6
--- /dev/null
+++ b/src/Paramore.Brighter.Inbox.MongoDb/InboxMessage.cs
@@ -0,0 +1,85 @@
+using System.Text.Json;
+using MongoDB.Bson.Serialization.Attributes;
+using Paramore.Brighter.MongoDb;
+
+namespace Paramore.Brighter.Inbox.MongoDb;
+
+///
+/// The MongoDb inbox message
+///
+public class InboxMessage : IMongoDbCollectionTTL
+{
+ ///
+ /// Initialize new instance of
+ ///
+ public InboxMessage()
+ {
+ }
+
+ ///
+ /// Initialize new instance of
+ ///
+ /// The command.
+ /// The command id.
+ /// The context key.
+ /// The time stamp of when the message was created.
+ /// The expires after X seconds.
+ public InboxMessage(object command, string id, string contextKey, DateTimeOffset timeStamp, long? expireAfterSeconds)
+ {
+ Id = new InboxMessageId { Id = id, ContextKey = contextKey };
+ TimeStamp = timeStamp;
+ CommandType = command.GetType().FullName!;
+ CommandBody = JsonSerializer.Serialize(command, JsonSerialisationOptions.Options);
+ ExpireAfterSeconds = expireAfterSeconds;
+ }
+
+ ///
+ /// The Message ID
+ ///
+ [BsonId]
+ public InboxMessageId Id { get; set; } = new();
+
+ ///
+ /// The when the message was crated
+ ///
+ public DateTimeOffset TimeStamp { get; set; }
+
+ ///
+ /// The command type(the full name)
+ ///
+ public string CommandType { get; set; } = string.Empty;
+
+ ///
+ /// The command body
+ ///
+ public string CommandBody { get; set; } = string.Empty;
+
+ ///
+ /// The TTL for this message
+ ///
+ public long? ExpireAfterSeconds { get; set; }
+
+ ///
+ /// The inbox message id
+ ///
+ public class InboxMessageId
+ {
+ ///
+ /// The id.
+ ///
+ public string Id { get; set; } = string.Empty;
+
+ ///
+ /// The context key.
+ ///
+ public string? ContextKey { get; set; }
+ }
+
+ ///
+ /// Convert the to
+ ///
+ /// The .
+ /// New instance of .
+ public T ToCommand()
+ => JsonSerializer.Deserialize(CommandBody, JsonSerialisationOptions.Options)!;
+}
diff --git a/src/Paramore.Brighter.Inbox.MongoDb/MongoDbInbox.cs b/src/Paramore.Brighter.Inbox.MongoDb/MongoDbInbox.cs
new file mode 100644
index 0000000000..b68e652d03
--- /dev/null
+++ b/src/Paramore.Brighter.Inbox.MongoDb/MongoDbInbox.cs
@@ -0,0 +1,122 @@
+using MongoDB.Driver;
+using Paramore.Brighter.Inbox.Exceptions;
+using Paramore.Brighter.MongoDb;
+
+namespace Paramore.Brighter.Inbox.MongoDb;
+
+///
+/// The inbox implementation to MongoDB
+///
+public class MongoDbInbox : BaseMongoDb, IAmAnInboxAsync, IAmAnInboxSync
+{
+ ///
+ /// Initialize a new instance of .
+ ///
+ /// The configuration.
+ public MongoDbInbox(MongoDbConfiguration configuration)
+ : base(configuration)
+ {
+ }
+
+ ///
+ public bool ContinueOnCapturedContext { get; set; }
+
+ ///
+ public async Task AddAsync(T command, string contextKey, int timeoutInMilliseconds = -1,
+ CancellationToken cancellationToken = default) where T : class, IRequest
+ {
+ var message = new InboxMessage(command, command.Id, contextKey, Configuration.TimeProvider.GetUtcNow(),
+ ExpireAfterSeconds);
+
+ try
+ {
+ await Collection.InsertOneAsync(message, cancellationToken: cancellationToken)
+ .ConfigureAwait(ContinueOnCapturedContext);
+ }
+ catch (MongoWriteException e)
+ {
+ if (e.WriteError.Category == ServerErrorCategory.DuplicateKey)
+ {
+ return;
+ }
+
+ throw;
+ }
+ }
+
+
+ ///
+ public async Task GetAsync(string id, string contextKey, int timeoutInMilliseconds = -1,
+ CancellationToken cancellationToken = default) where T : class, IRequest
+ {
+ var commandId = new InboxMessage.InboxMessageId { Id = id, ContextKey = contextKey };
+ var filter = Builders.Filter.Eq(x => x.Id, commandId);
+
+ var command = await Collection.Find(filter)
+ .FirstOrDefaultAsync(cancellationToken)
+ .ConfigureAwait(ContinueOnCapturedContext);
+
+ if (command == null)
+ {
+ throw new RequestNotFoundException(id);
+ }
+
+ return command.ToCommand();
+ }
+
+ ///
+ public async Task ExistsAsync(string id, string contextKey, int timeoutInMilliseconds = -1,
+ CancellationToken cancellationToken = default) where T : class, IRequest
+ {
+ var commandId = new InboxMessage.InboxMessageId { Id = id, ContextKey = contextKey };
+ var filter = Builders.Filter.Eq("Id", commandId);
+ return await Collection.Find(filter)
+ .AnyAsync(cancellationToken: cancellationToken)
+ .ConfigureAwait(ContinueOnCapturedContext);
+ }
+
+ ///
+ public void Add(T command, string contextKey, int timeoutInMilliseconds = -1) where T : class, IRequest
+ {
+ var message = new InboxMessage(command, command.Id, contextKey, Configuration.TimeProvider.GetUtcNow(),
+ ExpireAfterSeconds);
+
+ try
+ {
+ Collection.InsertOne(message);
+ }
+ catch (MongoWriteException e)
+ {
+ if (e.WriteError.Category == ServerErrorCategory.DuplicateKey)
+ {
+ return;
+ }
+
+ throw;
+ }
+ }
+
+ ///
+ public T Get(string id, string contextKey, int timeoutInMilliseconds = -1) where T : class, IRequest
+ {
+ var commandId = new InboxMessage.InboxMessageId { Id = id, ContextKey = contextKey };
+ var filter = Builders.Filter.Eq(x => x.Id, commandId);
+
+ var command = Collection.Find(filter).FirstOrDefault();
+ if (command == null)
+ {
+ throw new RequestNotFoundException(id);
+ }
+
+ return command.ToCommand();
+ }
+
+ ///
+ public bool Exists(string id, string contextKey, int timeoutInMilliseconds = -1) where T : class, IRequest
+ {
+ var commandId = new InboxMessage.InboxMessageId { Id = id, ContextKey = contextKey };
+ var filter = Builders.Filter.Eq("Id", commandId);
+ return Collection.Find(filter)
+ .Any();
+ }
+}
diff --git a/src/Paramore.Brighter.Inbox.MongoDb/MongoDbUnitOfWork.cs b/src/Paramore.Brighter.Inbox.MongoDb/MongoDbUnitOfWork.cs
new file mode 100644
index 0000000000..03f4e032ee
--- /dev/null
+++ b/src/Paramore.Brighter.Inbox.MongoDb/MongoDbUnitOfWork.cs
@@ -0,0 +1,108 @@
+using MongoDB.Driver;
+
+namespace Paramore.Brighter.Inbox.MongoDb;
+
+///
+/// The MongoDB Unit of work
+///
+///
+public class MongoDbUnitOfWork(IMongoClient client) : IAmABoxTransactionProvider
+{
+ private IClientSessionHandle? _session;
+
+ ///
+ public void Close()
+ {
+ if (_session != null)
+ {
+ _session.Dispose();
+ _session = null;
+ }
+ }
+
+ ///
+ public void Commit()
+ {
+ _session?.CommitTransaction();
+ _session?.Dispose();
+ _session = null;
+ }
+
+ ///
+ public async Task CommitAsync(CancellationToken cancellationToken = default)
+ {
+ if (_session != null)
+ {
+ await _session.CommitTransactionAsync(cancellationToken);
+ _session.Dispose();
+ }
+
+ _session = null;
+ }
+
+ ///
+ public IClientSessionHandle GetTransaction()
+ {
+ if (_session != null)
+ {
+ _session = client.StartSession();
+ }
+
+ return _session!;
+ }
+
+ ///
+ public async Task GetTransactionAsync(CancellationToken cancellationToken = default)
+ {
+ if (_session != null)
+ {
+ _session = await client.StartSessionAsync(cancellationToken: cancellationToken);
+ }
+
+ return _session!;
+ }
+
+ ///
+ public bool HasOpenTransaction => _session != null;
+
+ ///
+ public bool IsSharedConnection => false;
+
+ ///
+ public void Rollback()
+ {
+ if (_session != null)
+ {
+ try
+ {
+ _session.AbortTransaction();
+ }
+ catch
+ {
+ // Ignore
+ }
+
+ _session.Dispose();
+ _session = null;
+ }
+ }
+
+ ///
+ public async Task RollbackAsync(CancellationToken cancellationToken = default)
+ {
+ if (_session != null)
+ {
+ try
+ {
+ await _session.AbortTransactionAsync(cancellationToken);
+ }
+ catch
+ {
+ // Ignore
+ }
+
+ _session.Dispose();
+ _session = null;
+ }
+ }
+}
diff --git a/src/Paramore.Brighter.Inbox.MongoDb/Paramore.Brighter.Inbox.MongoDb.csproj b/src/Paramore.Brighter.Inbox.MongoDb/Paramore.Brighter.Inbox.MongoDb.csproj
new file mode 100644
index 0000000000..cc8ecac1c7
--- /dev/null
+++ b/src/Paramore.Brighter.Inbox.MongoDb/Paramore.Brighter.Inbox.MongoDb.csproj
@@ -0,0 +1,17 @@
+
+
+
+ net472;$(BrighterCoreTargetFrameworks)
+ enable
+ enable
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Paramore.Brighter.Locking.MongoDb/LockMessage.cs b/src/Paramore.Brighter.Locking.MongoDb/LockMessage.cs
new file mode 100644
index 0000000000..583c4fab83
--- /dev/null
+++ b/src/Paramore.Brighter.Locking.MongoDb/LockMessage.cs
@@ -0,0 +1,21 @@
+using MongoDB.Bson.Serialization.Attributes;
+using Paramore.Brighter.MongoDb;
+
+namespace Paramore.Brighter.Locking.MongoDb;
+
+///
+/// The lock message
+///
+public class LockMessage : IMongoDbCollectionTTL
+{
+ ///
+ /// The Lock id
+ ///
+ [BsonId] public string Id { get; set; } = string.Empty;
+
+ ///
+ public DateTimeOffset TimeStamp { get; set; }
+
+ ///
+ public long? ExpireAfterSeconds { get; set; }
+}
diff --git a/src/Paramore.Brighter.Locking.MongoDb/MongoDbLockingProvider.cs b/src/Paramore.Brighter.Locking.MongoDb/MongoDbLockingProvider.cs
new file mode 100644
index 0000000000..3c2bd564ac
--- /dev/null
+++ b/src/Paramore.Brighter.Locking.MongoDb/MongoDbLockingProvider.cs
@@ -0,0 +1,49 @@
+using System.Collections.ObjectModel;
+using MongoDB.Driver;
+using Paramore.Brighter.MongoDb;
+
+namespace Paramore.Brighter.Locking.MongoDb;
+
+///
+/// The MongoDb implementation to
+///
+public class MongoDbLockingProvider : BaseMongoDb, IDistributedLock
+{
+ ///
+ /// Initialize new instance of
+ ///
+ /// The
+ public MongoDbLockingProvider(MongoDbConfiguration configuration)
+ : base(configuration)
+ {
+ }
+
+ ///
+ public async Task ObtainLockAsync(string resource, CancellationToken cancellationToken)
+ {
+ try
+ {
+ await Collection.InsertOneAsync(new LockMessage
+ {
+ Id = resource,
+ TimeStamp = Configuration.TimeProvider.GetUtcNow(),
+ ExpireAfterSeconds = ExpireAfterSeconds
+ },
+ cancellationToken: cancellationToken);
+ return resource;
+ }
+ catch (MongoWriteException e)
+ {
+ if (e.WriteError.Category == ServerErrorCategory.DuplicateKey)
+ {
+ return null;
+ }
+
+ throw;
+ }
+ }
+
+ ///
+ public async Task ReleaseLockAsync(string resource, string lockId, CancellationToken cancellationToken)
+ => await Collection.DeleteOneAsync(Builders.Filter.Eq(x => x.Id, lockId), cancellationToken);
+}
diff --git a/src/Paramore.Brighter.Locking.MongoDb/Paramore.Brighter.Locking.MongoDb.csproj b/src/Paramore.Brighter.Locking.MongoDb/Paramore.Brighter.Locking.MongoDb.csproj
new file mode 100644
index 0000000000..ad27e8d8ed
--- /dev/null
+++ b/src/Paramore.Brighter.Locking.MongoDb/Paramore.Brighter.Locking.MongoDb.csproj
@@ -0,0 +1,16 @@
+
+
+ net472;$(BrighterCoreTargetFrameworks)
+ enable
+ enable
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Paramore.Brighter.MongoDb/BaseMongoDb.cs b/src/Paramore.Brighter.MongoDb/BaseMongoDb.cs
new file mode 100644
index 0000000000..36420a3519
--- /dev/null
+++ b/src/Paramore.Brighter.MongoDb/BaseMongoDb.cs
@@ -0,0 +1,121 @@
+using MongoDB.Bson;
+using MongoDB.Driver;
+
+namespace Paramore.Brighter.MongoDb;
+
+///
+/// The base class for any class that need to access mongodb.
+///
+/// The Collection type
+public abstract class BaseMongoDb
+ where TCollection : IMongoDbCollectionTTL
+{
+ private readonly SemaphoreSlim _semaphore = new(1, 1);
+ private readonly MongoClient _client;
+ private readonly IMongoDatabase _database;
+ private IMongoCollection? _collection;
+
+ ///
+ /// Initializer the
+ ///
+ /// The configuration.
+ protected BaseMongoDb(MongoDbConfiguration configuration)
+ {
+ _client = configuration.Client;
+ _database = _client.GetDatabase(configuration.DatabaseName, configuration.DatabaseSettings);
+ Configuration = configuration;
+ }
+
+ ///
+ /// The .
+ ///
+ protected MongoDbConfiguration Configuration { get; }
+
+ ///
+ /// The provided TTL in seconds
+ ///
+ protected long? ExpireAfterSeconds
+ {
+ get
+ {
+ if (Configuration.TimeToLive.HasValue)
+ {
+ return (long)Configuration.TimeToLive.Value.TotalSeconds;
+ }
+
+ return null;
+ }
+ }
+
+ ///
+ /// The
+ ///
+ protected IMongoCollection Collection => _collection ??= CreateCollection();
+
+ ///
+ /// Get or create a collection.
+ ///
+ /// The
+ ///
+ private IMongoCollection CreateCollection()
+ {
+ _semaphore.Wait();
+ try
+ {
+ if (_collection != null)
+ {
+ return _collection;
+ }
+
+ if (Configuration.MakeCollection == OnResolvingACollection.Assume)
+ {
+ _collection =
+ _database.GetCollection(Configuration.CollectionName,
+ Configuration.CollectionSettings);
+ return _collection;
+ }
+
+ var filter = new BsonDocument("name", Configuration.CollectionName);
+ var options = new ListCollectionNamesOptions { Filter = filter };
+
+ var collections = _database.ListCollectionNames(options);
+ if (collections.Any())
+ {
+ _collection =
+ _database.GetCollection(Configuration.CollectionName,
+ Configuration.CollectionSettings);
+ return _collection;
+ }
+
+ if (Configuration.MakeCollection == OnResolvingACollection.Validate)
+ {
+ throw new InvalidOperationException("collection not exits");
+ }
+
+ using var session = _client.StartSession();
+
+ _database
+ .CreateCollection(session, Configuration.CollectionName, Configuration.CreateCollectionOptions);
+
+ _collection =
+ _database.GetCollection(Configuration.CollectionName, Configuration.CollectionSettings);
+
+ if (Configuration.TimeToLive != null)
+ {
+ _collection.Indexes.CreateOne(
+ new CreateIndexModel(Builders.IndexKeys.Ascending(x => x.TimeStamp),
+ new CreateIndexOptions
+ {
+ Name = $"brighter_ttl_{Configuration.CollectionName}",
+ ExpireAfter = Configuration.TimeToLive
+ }));
+ }
+
+ return _collection;
+ }
+ finally
+ {
+ _semaphore.Release();
+ }
+ }
+}
diff --git a/src/Paramore.Brighter.MongoDb/IMongoDbCollectionTTL.cs b/src/Paramore.Brighter.MongoDb/IMongoDbCollectionTTL.cs
new file mode 100644
index 0000000000..0a9f2336b1
--- /dev/null
+++ b/src/Paramore.Brighter.MongoDb/IMongoDbCollectionTTL.cs
@@ -0,0 +1,17 @@
+namespace Paramore.Brighter.MongoDb;
+
+///
+/// The MongoDB collection TTL
+///
+public interface IMongoDbCollectionTTL
+{
+ ///
+ /// The timestamp of when the message was created
+ ///
+ DateTimeOffset TimeStamp { get; set; }
+
+ ///
+ /// For how long a doc should live
+ ///
+ long? ExpireAfterSeconds { get; set; }
+}
diff --git a/src/Paramore.Brighter.MongoDb/MongoDbConfiguration.cs b/src/Paramore.Brighter.MongoDb/MongoDbConfiguration.cs
new file mode 100644
index 0000000000..a6510230da
--- /dev/null
+++ b/src/Paramore.Brighter.MongoDb/MongoDbConfiguration.cs
@@ -0,0 +1,85 @@
+using MongoDB.Driver;
+using Paramore.Brighter.Observability;
+
+namespace Paramore.Brighter.MongoDb;
+
+///
+/// The MongoDB configuration
+///
+public class MongoDbConfiguration
+{
+ ///
+ /// Initialize new instance of
+ ///
+ /// The Mongo client.
+ /// The database name.
+ /// The collection name.
+ public MongoDbConfiguration(MongoClient client, string databaseName, string collectionName)
+ {
+ Client = client;
+ DatabaseName = databaseName;
+ CollectionName = collectionName;
+ }
+
+ ///
+ /// Initialize new instance of
+ ///
+ /// The Mongo db connection string.
+ /// The database name.
+ /// The collection name.
+ public MongoDbConfiguration(string connectionString, string databaseName, string collectionName)
+ : this(new MongoClient(connectionString), databaseName, collectionName)
+ {
+ }
+
+ ///
+ /// The
+ ///
+ public MongoClient Client { get; set; }
+
+ ///
+ /// The mongodb database name
+ ///
+ public string DatabaseName { get; }
+
+ ///
+ /// The mongodb collection
+ ///
+ public string CollectionName { get; }
+
+ ///
+ /// The
+ ///
+ public TimeProvider TimeProvider { get; set; } = TimeProvider.System;
+
+ ///
+ /// Action to be performed when it's resolving a collection
+ ///
+ public OnResolvingACollection MakeCollection { get; set; } = OnResolvingACollection.Assume;
+
+ ///
+ /// The used when access the database.
+ ///
+ public MongoDatabaseSettings? DatabaseSettings { get; set; }
+
+ ///
+ /// The used to get collection
+ ///
+ public MongoCollectionSettings? CollectionSettings { get; set; }
+
+ ///
+ /// The .
+ ///
+ public CreateCollectionOptions? CreateCollectionOptions { get; set; }
+
+ ///
+ /// The .
+ ///
+ public InstrumentationOptions InstrumentationOptions { get; set; } = InstrumentationOptions.All;
+
+ ///
+ /// Optional time to live for the messages in the outbox
+ /// By default, messages will not expire
+ ///
+ public TimeSpan? TimeToLive { get; set; }
+}
diff --git a/src/Paramore.Brighter.MongoDb/OnResolvingACollection.cs b/src/Paramore.Brighter.MongoDb/OnResolvingACollection.cs
new file mode 100644
index 0000000000..15edfa53e8
--- /dev/null
+++ b/src/Paramore.Brighter.MongoDb/OnResolvingACollection.cs
@@ -0,0 +1,22 @@
+namespace Paramore.Brighter.MongoDb;
+
+///
+/// Action to be performed when it's resolving a collection
+///
+public enum OnResolvingACollection
+{
+ ///
+ /// Assume the collection exists
+ ///
+ Assume,
+
+ ///
+ /// Check if the collection, if not throw an exception.
+ ///
+ Validate,
+
+ ///
+ /// Check if the collection, if not created
+ ///
+ CreateIfNotExists
+}
diff --git a/src/Paramore.Brighter.MongoDb/Paramore.Brighter.MongoDb.csproj b/src/Paramore.Brighter.MongoDb/Paramore.Brighter.MongoDb.csproj
new file mode 100644
index 0000000000..5edcd4806f
--- /dev/null
+++ b/src/Paramore.Brighter.MongoDb/Paramore.Brighter.MongoDb.csproj
@@ -0,0 +1,15 @@
+
+
+ net472;$(BrighterCoreTargetFrameworks)
+ enable
+ enable
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Paramore.Brighter.Outbox.MongoDb/MongoDbOutbox.cs b/src/Paramore.Brighter.Outbox.MongoDb/MongoDbOutbox.cs
new file mode 100644
index 0000000000..630e35240b
--- /dev/null
+++ b/src/Paramore.Brighter.Outbox.MongoDb/MongoDbOutbox.cs
@@ -0,0 +1,780 @@
+using MongoDB.Bson;
+using MongoDB.Driver;
+using Paramore.Brighter.MongoDb;
+using Paramore.Brighter.Observability;
+
+namespace Paramore.Brighter.Outbox.MongoDb;
+
+///
+/// The implementation for MongoDB for outbox
+///
+public class MongoDbOutbox : BaseMongoDb, IAmAnOutboxAsync,
+ IAmAnOutboxSync
+{
+ ///
+ /// Initialize MongoDbOutbox
+ ///
+ /// The .
+ public MongoDbOutbox(MongoDbConfiguration configuration)
+ : base(configuration)
+ {
+ }
+
+ ///
+ public IAmABrighterTracer? Tracer { get; set; }
+
+ ///
+ public bool ContinueOnCapturedContext { get; set; }
+
+ ///
+ /// Returns all messages in the store
+ ///
+ /// Number of messages to return in search results (default = 100)
+ /// Page number of results to return (default = 1)
+ /// Additional parameters required for search, if any
+ /// The cancellation token
+ /// A list of messages
+ public async Task> GetAsync(int pageSize = 100,
+ int pageNumber = 1,
+ Dictionary? args = null,
+ CancellationToken cancellationToken = default)
+ {
+ var span = Tracer?.CreateDbSpan(
+ new OutboxSpanInfo(DbSystem.Mongodb,
+ Configuration.DatabaseName,
+ OutboxDbOperation.Get,
+ Configuration.CollectionName),
+ null,
+ options: Configuration.InstrumentationOptions);
+
+ try
+ {
+ var filter = Builders.Filter.Empty;
+ if (args != null && args.TryGetValue("Topic", out var topic))
+ {
+ filter &= Builders.Filter.Eq(x => x.Topic, topic);
+ }
+
+ var cursor = await Collection.FindAsync(filter,
+ new FindOptions { Skip = pageSize * Math.Max(pageNumber - 1, 0), Limit = pageSize },
+ cancellationToken)
+ .ConfigureAwait(ContinueOnCapturedContext);
+
+ var messages = new List(pageSize);
+ while (await cursor.MoveNextAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext))
+ {
+ messages.AddRange(cursor.Current.Select(x => x.ConvertToMessage()));
+ }
+
+ return messages;
+ }
+ finally
+ {
+ Tracer?.EndSpan(span);
+ }
+ }
+
+ ///
+ /// Returns messages specified by the Ids
+ ///
+ /// The Ids of the messages
+ /// What is the context for this request; used to access the Span
+ /// The Timeout of the outbox.
+ /// Cancellation Token.
+ ///
+ public async Task> GetAsync(
+ IEnumerable messageIds,
+ RequestContext requestContext,
+ int outBoxTimeout = -1,
+ CancellationToken cancellationToken = default
+ )
+ {
+ var span = Tracer?.CreateDbSpan(
+ new OutboxSpanInfo(DbSystem.Mongodb,
+ Configuration.DatabaseName,
+ OutboxDbOperation.Get,
+ Configuration.CollectionName),
+ null,
+ options: Configuration.InstrumentationOptions);
+
+ try
+ {
+ var filter = Builders.Filter.In(x => x.MessageId, messageIds);
+
+ var cursor = await Collection.FindAsync(filter,
+ cancellationToken: cancellationToken)
+ .ConfigureAwait(ContinueOnCapturedContext);
+
+ var messages = new List();
+ while (await cursor.MoveNextAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext))
+ {
+ messages.AddRange(cursor.Current.Select(x => x.ConvertToMessage()));
+ }
+
+ return messages;
+ }
+ finally
+ {
+ Tracer?.EndSpan(span);
+ }
+ }
+
+ ///
+ /// Get the number of messages in the Outbox that are not dispatched
+ ///
+ /// Cancel the async operation
+ ///
+ public async Task GetNumberOfOutstandingMessagesAsync(CancellationToken cancellationToken = default)
+ {
+ var span = Tracer?.CreateDbSpan(
+ new OutboxSpanInfo(DbSystem.Mongodb,
+ Configuration.DatabaseName,
+ OutboxDbOperation.Get,
+ Configuration.CollectionName),
+ null,
+ options: Configuration.InstrumentationOptions);
+
+ try
+ {
+ return await Collection.CountDocumentsAsync(
+ Builders.Filter.Eq(x => x.Dispatched, null),
+ cancellationToken: cancellationToken)
+ .ConfigureAwait(ContinueOnCapturedContext);
+ }
+ finally
+ {
+ Tracer?.EndSpan(span);
+ }
+ }
+
+ ///
+ public async Task AddAsync(Message message,
+ RequestContext requestContext,
+ int outBoxTimeout = -1,
+ IAmABoxTransactionProvider? transactionProvider = null,
+ CancellationToken cancellationToken = default)
+ {
+ var span = Tracer?.CreateDbSpan(
+ new OutboxSpanInfo(DbSystem.Mongodb,
+ Configuration.DatabaseName,
+ OutboxDbOperation.Add,
+ Configuration.CollectionName),
+ requestContext?.Span,
+ options: Configuration.InstrumentationOptions);
+
+ try
+ {
+ var messageToStore = new OutboxMessage(message, ExpireAfterSeconds);
+
+ if (transactionProvider != null)
+ {
+ var session = await transactionProvider.GetTransactionAsync(cancellationToken)
+ .ConfigureAwait(ContinueOnCapturedContext);
+
+ await Collection
+ .InsertOneAsync(session, messageToStore, cancellationToken: cancellationToken)
+ .ConfigureAwait(ContinueOnCapturedContext);
+ }
+ else
+ {
+ await Collection
+ .InsertOneAsync(messageToStore, cancellationToken: cancellationToken)
+ .ConfigureAwait(ContinueOnCapturedContext);
+ }
+ }
+ catch (MongoWriteException e)
+ {
+ if (e.WriteError.Category != ServerErrorCategory.DuplicateKey)
+ {
+ throw;
+ }
+ }
+ finally
+ {
+ Tracer?.EndSpan(span);
+ }
+ }
+
+ ///
+ public async Task AddAsync(IEnumerable messages,
+ RequestContext? requestContext,
+ int outBoxTimeout = -1,
+ IAmABoxTransactionProvider? transactionProvider = null,
+ CancellationToken cancellationToken = default)
+ {
+ var span = Tracer?.CreateDbSpan(
+ new OutboxSpanInfo(DbSystem.Mongodb,
+ Configuration.DatabaseName,
+ OutboxDbOperation.Add,
+ Configuration.CollectionName),
+ requestContext?.Span,
+ options: Configuration.InstrumentationOptions);
+ try
+ {
+ var messageItems = messages.Select(message => new OutboxMessage(message, ExpireAfterSeconds));
+ if (transactionProvider != null)
+ {
+ var session = await transactionProvider.GetTransactionAsync(cancellationToken)
+ .ConfigureAwait(ContinueOnCapturedContext);
+ await Collection
+ .InsertManyAsync(session, messageItems, cancellationToken: cancellationToken)
+ .ConfigureAwait(ContinueOnCapturedContext);
+ }
+ else
+ {
+ await Collection
+ .InsertManyAsync(messageItems, cancellationToken: cancellationToken)
+ .ConfigureAwait(ContinueOnCapturedContext);
+ }
+ }
+ finally
+ {
+ Tracer?.EndSpan(span);
+ }
+ }
+
+ ///
+ public async Task DeleteAsync(string[] messageIds,
+ RequestContext requestContext,
+ Dictionary? args = null,
+ CancellationToken cancellationToken = default)
+ {
+ var span = Tracer?.CreateDbSpan(
+ new OutboxSpanInfo(DbSystem.Mongodb,
+ Configuration.DatabaseName,
+ OutboxDbOperation.Delete,
+ Configuration.CollectionName),
+ requestContext?.Span,
+ options: Configuration.InstrumentationOptions);
+
+ try
+ {
+ var filter = Builders.Filter.In(x => x.MessageId, messageIds);
+ await Collection.DeleteManyAsync(filter, cancellationToken).ConfigureAwait(ContinueOnCapturedContext);
+ }
+ finally
+ {
+ Tracer?.EndSpan(span);
+ }
+ }
+
+ ///
+ public async Task> DispatchedMessagesAsync(TimeSpan dispatchedSince,
+ RequestContext requestContext,
+ int pageSize = 100,
+ int pageNumber = 1, int outboxTimeout = -1, Dictionary? args = null,
+ CancellationToken cancellationToken = default)
+ {
+ var span = Tracer?.CreateDbSpan(
+ new OutboxSpanInfo(DbSystem.Mongodb,
+ Configuration.DatabaseName,
+ OutboxDbOperation.DispatchedMessages,
+ Configuration.CollectionName),
+ requestContext?.Span,
+ options: Configuration.InstrumentationOptions);
+ try
+ {
+ var olderThan = Configuration.TimeProvider.GetLocalNow() - dispatchedSince;
+ var filter = Builders.Filter.Lt(x => x.Dispatched, olderThan);
+ if (args != null && args.TryGetValue("Topic", out var topic))
+ {
+ filter &= Builders.Filter.Eq(x => x.Topic, topic);
+ }
+
+ var cursor = await Collection.FindAsync(filter,
+ new FindOptions
+ {
+ Limit = pageSize,
+ Skip = pageSize * Math.Max(pageNumber - 1, 0),
+ Sort = Builders.Sort.Ascending(x => x.TimeStamp)
+ }, cancellationToken: cancellationToken)
+ .ConfigureAwait(ContinueOnCapturedContext);
+
+ var messages = new List(pageSize);
+ while (await cursor.MoveNextAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext))
+ {
+ messages.AddRange(cursor.Current.Select(x => x.ConvertToMessage()));
+ }
+
+ return messages;
+ }
+ finally
+ {
+ Tracer?.EndSpan(span);
+ }
+ }
+
+ ///
+ public async Task GetAsync(string messageId, RequestContext requestContext, int outBoxTimeout = -1,
+ Dictionary? args = null,
+ CancellationToken cancellationToken = default)
+ {
+ var span = Tracer?.CreateDbSpan(
+ new OutboxSpanInfo(DbSystem.Mongodb,
+ Configuration.DatabaseName,
+ OutboxDbOperation.Get,
+ Configuration.CollectionName),
+ requestContext?.Span,
+ options: Configuration.InstrumentationOptions);
+
+ try
+ {
+ var find = await Collection
+ .FindAsync(x => x.MessageId == messageId, cancellationToken: cancellationToken)
+ .ConfigureAwait(ContinueOnCapturedContext);
+
+ var first = await find
+ .FirstOrDefaultAsync(cancellationToken: cancellationToken)
+ .ConfigureAwait(ContinueOnCapturedContext);
+
+ return first == null ? new Message() : first.ConvertToMessage();
+ }
+ finally
+ {
+ Tracer?.EndSpan(span);
+ }
+ }
+
+ ///
+ public async Task MarkDispatchedAsync(string id, RequestContext requestContext, DateTimeOffset? dispatchedAt = null,
+ Dictionary? args = null, CancellationToken cancellationToken = default)
+ {
+ var span = Tracer?.CreateDbSpan(
+ new OutboxSpanInfo(DbSystem.Mongodb,
+ Configuration.DatabaseName,
+ OutboxDbOperation.MarkDispatched,
+ Configuration.CollectionName),
+ requestContext?.Span,
+ options: Configuration.InstrumentationOptions);
+
+ try
+ {
+ var filter = Builders.Filter.Eq(x => x.MessageId, id);
+
+ dispatchedAt ??= Configuration.TimeProvider.GetUtcNow();
+ var update = Builders.Update
+ .Set(x => x.Dispatched, dispatchedAt.Value);
+
+ await Collection.UpdateOneAsync(filter, update, cancellationToken: cancellationToken)
+ .ConfigureAwait(ContinueOnCapturedContext);
+ }
+ finally
+ {
+ Tracer?.EndSpan(span);
+ }
+ }
+
+ ///
+ public async Task MarkDispatchedAsync(IEnumerable ids,
+ RequestContext requestContext,
+ DateTimeOffset? dispatchedAt = null,
+ Dictionary? args = null, CancellationToken cancellationToken = default)
+ {
+ var span = Tracer?.CreateDbSpan(
+ new OutboxSpanInfo(DbSystem.Mongodb,
+ Configuration.DatabaseName,
+ OutboxDbOperation.MarkDispatched,
+ Configuration.CollectionName),
+ requestContext?.Span,
+ options: Configuration.InstrumentationOptions);
+ try
+ {
+ var filter = Builders.Filter.In(x => x.MessageId, ids);
+
+ dispatchedAt ??= Configuration.TimeProvider.GetUtcNow();
+ var update = Builders.Update
+ .Set(x => x.Dispatched, dispatchedAt.Value);
+
+ await Collection.UpdateManyAsync(filter, update, cancellationToken: cancellationToken)
+ .ConfigureAwait(ContinueOnCapturedContext);
+ }
+ finally
+ {
+ Tracer?.EndSpan(span);
+ }
+ }
+
+ ///
+ public async Task> OutstandingMessagesAsync(TimeSpan dispatchedSince,
+ RequestContext requestContext,
+ int pageSize = 100,
+ int pageNumber = 1, Dictionary? args = null,
+ CancellationToken cancellationToken = default)
+ {
+ var span = Tracer?.CreateDbSpan(
+ new OutboxSpanInfo(DbSystem.Mongodb,
+ Configuration.DatabaseName,
+ OutboxDbOperation.OutStandingMessages,
+ Configuration.CollectionName),
+ requestContext?.Span,
+ options: Configuration.InstrumentationOptions);
+ try
+ {
+ var olderThan = Configuration.TimeProvider.GetLocalNow() - dispatchedSince;
+ var filter = Builders.Filter.Eq(x => x.Dispatched, null);
+ filter &= Builders.Filter.Lt(x => x.TimeStamp, olderThan);
+ if (args != null && args.TryGetValue("Topic", out var topic))
+ {
+ filter &= Builders.Filter.Eq(x => x.Topic, topic);
+ }
+
+ var cursor = await Collection.FindAsync(filter,
+ new FindOptions
+ {
+ Limit = pageSize,
+ Skip = pageSize * Math.Max(pageNumber - 1, 0),
+ Sort = Builders.Sort.Ascending(x => x.TimeStamp)
+ }, cancellationToken: cancellationToken)
+ .ConfigureAwait(ContinueOnCapturedContext);
+
+ var messages = new List(pageSize);
+ while (await cursor.MoveNextAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext))
+ {
+ messages.AddRange(cursor.Current.Select(x => x.ConvertToMessage()));
+ }
+
+ return messages;
+ }
+ finally
+ {
+ Tracer?.EndSpan(span);
+ }
+ }
+
+ ///
+ /// Returns all messages in the store
+ ///
+ /// Number of messages to return in search results (default = 100)
+ /// Page number of results to return (default = 1)
+ /// Additional parameters required for search, if any
+ /// A list of messages
+ public IList Get(int pageSize = 100, int pageNumber = 1, Dictionary? args = null)
+ {
+ var span = Tracer?.CreateDbSpan(
+ new OutboxSpanInfo(DbSystem.Mongodb,
+ Configuration.DatabaseName,
+ OutboxDbOperation.Get,
+ Configuration.CollectionName),
+ null,
+ options: Configuration.InstrumentationOptions);
+
+ try
+ {
+ var filter = Builders.Filter.Empty;
+ if (args != null && args.TryGetValue("Topic", out var topic))
+ {
+ filter &= Builders.Filter.Eq(x => x.Topic, topic);
+ }
+
+ var cursor = Collection.FindSync(filter,
+ new FindOptions { Skip = pageSize * Math.Max(pageNumber - 1, 0), Limit = pageSize });
+
+ var messages = new List(pageSize);
+ while (cursor.MoveNext())
+ {
+ messages.AddRange(cursor.Current.Select(x => x.ConvertToMessage()));
+ }
+
+ return messages;
+ }
+ finally
+ {
+ Tracer?.EndSpan(span);
+ }
+ }
+
+ ///
+ /// Returns messages specified by the Ids
+ ///
+ /// The Ids of the messages
+ /// What is the context for this request; used to access the Span
+ /// The Timeout of the outbox.
+ ///
+ public IEnumerable Get(
+ IEnumerable messageIds,
+ RequestContext? requestContext = null,
+ int outBoxTimeout = -1
+ )
+ {
+ var span = Tracer?.CreateDbSpan(
+ new OutboxSpanInfo(DbSystem.Mongodb,
+ Configuration.DatabaseName,
+ OutboxDbOperation.Get,
+ Configuration.CollectionName),
+ requestContext?.Span,
+ options: Configuration.InstrumentationOptions);
+
+ try
+ {
+ var filter = Builders.Filter.In(x => x.MessageId, messageIds);
+
+ var cursor = Collection.FindSync(filter);
+
+ var messages = new List();
+ while (cursor.MoveNext())
+ {
+ messages.AddRange(cursor.Current.Select(x => x.ConvertToMessage()));
+ }
+
+ return messages;
+ }
+ finally
+ {
+ Tracer?.EndSpan(span);
+ }
+ }
+
+ ///
+ /// Get the number of messages in the Outbox that are not dispatched
+ ///
+ ///
+ public long GetNumberOfOutstandingMessages()
+ {
+ var span = Tracer?.CreateDbSpan(
+ new OutboxSpanInfo(DbSystem.Mongodb,
+ Configuration.DatabaseName,
+ OutboxDbOperation.Get,
+ Configuration.CollectionName),
+ null,
+ options: Configuration.InstrumentationOptions);
+
+ try
+ {
+ return Collection.CountDocuments(Builders.Filter.Eq(x => x.Dispatched, null));
+ }
+ finally
+ {
+ Tracer?.EndSpan(span);
+ }
+ }
+
+ ///
+ public void Add(Message message, RequestContext requestContext, int outBoxTimeout = -1,
+ IAmABoxTransactionProvider? transactionProvider = null)
+ {
+ var span = Tracer?.CreateDbSpan(
+ new OutboxSpanInfo(DbSystem.Mongodb,
+ Configuration.DatabaseName,
+ OutboxDbOperation.Add,
+ Configuration.CollectionName),
+ requestContext?.Span,
+ options: Configuration.InstrumentationOptions);
+
+ try
+ {
+ var messageToStore = new OutboxMessage(message, ExpireAfterSeconds);
+
+ if (transactionProvider != null)
+ {
+ var session = transactionProvider.GetTransaction();
+ Collection.InsertOne(session, messageToStore);
+ }
+ else
+ {
+ Collection.InsertOne(messageToStore);
+ }
+ }
+ catch (MongoWriteException e)
+ {
+ if (e.WriteError.Category != ServerErrorCategory.DuplicateKey)
+ {
+ throw;
+ }
+ }
+ finally
+ {
+ Tracer?.EndSpan(span);
+ }
+ }
+
+ ///
+ public void Add(IEnumerable messages, RequestContext? requestContext, int outBoxTimeout = -1,
+ IAmABoxTransactionProvider? transactionProvider = null)
+ {
+ var span = Tracer?.CreateDbSpan(
+ new OutboxSpanInfo(DbSystem.Mongodb,
+ Configuration.DatabaseName,
+ OutboxDbOperation.Add,
+ Configuration.CollectionName),
+ requestContext?.Span,
+ options: Configuration.InstrumentationOptions);
+ try
+ {
+ var messageItems = messages.Select(message => new OutboxMessage(message, ExpireAfterSeconds));
+ if (transactionProvider != null)
+ {
+ var session = transactionProvider.GetTransaction();
+ Collection.InsertMany(session, messageItems);
+ }
+ else
+ {
+ Collection.InsertMany(messageItems);
+ }
+ }
+ finally
+ {
+ Tracer?.EndSpan(span);
+ }
+ }
+
+ ///
+ public void Delete(string[] messageIds, RequestContext? requestContext, Dictionary? args = null)
+ {
+ var span = Tracer?.CreateDbSpan(
+ new OutboxSpanInfo(DbSystem.Mongodb,
+ Configuration.DatabaseName,
+ OutboxDbOperation.Delete,
+ Configuration.CollectionName),
+ requestContext?.Span,
+ options: Configuration.InstrumentationOptions);
+ try
+ {
+ var filter = Builders.Filter.In(x => x.MessageId, messageIds);
+ Collection.DeleteMany(filter);
+ }
+ finally
+ {
+ Tracer?.EndSpan(span);
+ }
+ }
+
+ ///
+ public IEnumerable DispatchedMessages(TimeSpan dispatchedSince, RequestContext requestContext,
+ int pageSize = 100,
+ int pageNumber = 1, int outBoxTimeout = -1, Dictionary? args = null)
+ {
+ var span = Tracer?.CreateDbSpan(
+ new OutboxSpanInfo(DbSystem.Mongodb,
+ Configuration.DatabaseName,
+ OutboxDbOperation.DispatchedMessages,
+ Configuration.CollectionName),
+ requestContext?.Span,
+ options: Configuration.InstrumentationOptions);
+
+ try
+ {
+ var olderThan = Configuration.TimeProvider.GetLocalNow() - dispatchedSince;
+ var filter = Builders.Filter.Lt(x => x.Dispatched, olderThan);
+ if (args != null && args.TryGetValue("Topic", out var topic))
+ {
+ filter &= Builders.Filter.Eq(x => x.Topic, topic);
+ }
+
+ var cursor = Collection.FindSync(filter,
+ new FindOptions
+ {
+ Limit = pageSize,
+ Skip = pageSize * Math.Max(pageNumber - 1, 0),
+ Sort = Builders.Sort.Ascending(x => x.TimeStamp)
+ });
+
+ var messages = new List(pageSize);
+ while (cursor.MoveNext())
+ {
+ messages.AddRange(cursor.Current.Select(x => x.ConvertToMessage()));
+ }
+
+ return messages;
+ }
+ finally
+ {
+ Tracer?.EndSpan(span);
+ }
+ }
+
+ ///
+ public Message Get(string messageId, RequestContext requestContext, int outBoxTimeout = -1,
+ Dictionary? args = null)
+ {
+ var span = Tracer?.CreateDbSpan(
+ new OutboxSpanInfo(DbSystem.Mongodb,
+ Configuration.DatabaseName,
+ OutboxDbOperation.Get,
+ Configuration.CollectionName),
+ requestContext?.Span,
+ options: Configuration.InstrumentationOptions);
+
+ try
+ {
+ var find = Collection.FindSync(x => x.MessageId == messageId);
+ var first = find.FirstOrDefault();
+ return first?.ConvertToMessage() ?? new Message();
+ }
+ finally
+ {
+ Tracer?.EndSpan(span);
+ }
+ }
+
+ ///
+ public void MarkDispatched(string id, RequestContext requestContext, DateTimeOffset? dispatchedAt = null,
+ Dictionary? args = null)
+ {
+ var span = Tracer?.CreateDbSpan(
+ new OutboxSpanInfo(DbSystem.Mongodb,
+ Configuration.DatabaseName,
+ OutboxDbOperation.MarkDispatched,
+ Configuration.CollectionName),
+ requestContext?.Span,
+ options: Configuration.InstrumentationOptions);
+
+ try
+ {
+ var filter = Builders.Filter.Eq(x => x.MessageId, id);
+
+ dispatchedAt ??= Configuration.TimeProvider.GetUtcNow();
+ var update = Builders.Update
+ .Set(x => x.Dispatched, dispatchedAt.Value);
+
+ Collection.UpdateOne(filter, update);
+ }
+ finally
+ {
+ Tracer?.EndSpan(span);
+ }
+ }
+
+ ///
+ public IEnumerable OutstandingMessages(TimeSpan dispatchedSince, RequestContext? requestContext,
+ int pageSize = 100,
+ int pageNumber = 1, Dictionary? args = null)
+ {
+ var span = Tracer?.CreateDbSpan(
+ new OutboxSpanInfo(DbSystem.Mongodb,
+ Configuration.DatabaseName,
+ OutboxDbOperation.OutStandingMessages,
+ Configuration.CollectionName),
+ requestContext?.Span,
+ options: Configuration.InstrumentationOptions);
+ try
+ {
+ var olderThan = Configuration.TimeProvider.GetLocalNow() - dispatchedSince;
+ var filter = Builders.Filter.Eq(x => x.Dispatched, null);
+ filter &= Builders.Filter.Lt(x => x.TimeStamp, olderThan);
+ if (args != null && args.TryGetValue("Topic", out var topic))
+ {
+ filter &= Builders.Filter.Eq(x => x.Topic, topic);
+ }
+
+ var cursor = Collection.FindSync(filter,
+ new FindOptions
+ {
+ Limit = pageSize,
+ Skip = pageSize * Math.Max(pageNumber - 1, 0),
+ Sort = Builders.Sort.Ascending(x => x.TimeStamp)
+ });
+
+ var messages = new List(pageSize);
+ while (cursor.MoveNext())
+ {
+ messages.AddRange(cursor.Current.Select(x => x.ConvertToMessage()));
+ }
+
+ return messages;
+ }
+ finally
+ {
+ Tracer?.EndSpan(span);
+ }
+ }
+}
diff --git a/src/Paramore.Brighter.Outbox.MongoDb/OutboxMessage.cs b/src/Paramore.Brighter.Outbox.MongoDb/OutboxMessage.cs
new file mode 100644
index 0000000000..b074397221
--- /dev/null
+++ b/src/Paramore.Brighter.Outbox.MongoDb/OutboxMessage.cs
@@ -0,0 +1,157 @@
+using System.Text.Json;
+using MongoDB.Bson.Serialization.Attributes;
+using Paramore.Brighter.MongoDb;
+
+namespace Paramore.Brighter.Outbox.MongoDb;
+
+///
+/// The MongoDb outbox message
+///
+public class OutboxMessage : IMongoDbCollectionTTL
+{
+ ///
+ /// Initialize new instance of
+ ///
+ public OutboxMessage()
+ {
+ }
+
+ ///
+ /// Initialize new instance of
+ ///
+ /// The message to be store.
+ /// When it should be expired.
+ public OutboxMessage(Message message, long? expireAfterSeconds = null)
+ {
+ TimeStamp = message.Header.TimeStamp == DateTimeOffset.MinValue
+ ? DateTimeOffset.UtcNow
+ : message.Header.TimeStamp;
+ Body = message.Body.Bytes;
+ BodyContentType = message.Body.ContentType;
+ ContentType = message.Header.ContentType;
+ CorrelationId = message.Header.CorrelationId;
+ HeaderBag = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);
+ MessageId = message.Id;
+ MessageType = message.Header.MessageType.ToString();
+ PartitionKey = message.Header.PartitionKey;
+ ReplyTo = message.Header.ReplyTo;
+ Topic = message.Header.Topic;
+ ExpireAfterSeconds = expireAfterSeconds;
+ }
+
+ ///
+ /// The Id of the Message. Used as a Global Secondary Index
+ ///
+ [BsonId]
+ public string MessageId { get; set; } = string.Empty;
+
+ ///
+ /// The Topic the message was published to
+ ///
+ public string Topic { get; set; } = string.Empty;
+
+ ///
+ /// The type of message i.e. MT_COMMAND, MT_EVENT etc. An enumeration rendered as a string
+ ///
+ public string MessageType { get; set; } = string.Empty;
+
+ ///
+ /// The of the message was created
+ ///
+ public DateTimeOffset TimeStamp { get; set; } = DateTimeOffset.UtcNow;
+
+ ///
+ /// The correlation id.
+ ///
+ public string? CorrelationId { get; set; }
+
+ ///
+ /// The reply to
+ ///
+ public string? ReplyTo { get; set; }
+
+ ///
+ /// The message content type
+ ///
+ public string? ContentType { get; set; }
+
+ ///
+ /// The message content type
+ ///
+ public string BodyContentType { get; set; } = MessageBody.APPLICATION_JSON;
+
+ ///
+ /// The message partition key
+ ///
+ public string? PartitionKey { get; set; }
+
+ ///
+ /// The of when the message was dispatched
+ ///
+ public DateTimeOffset? Dispatched { get; set; }
+
+ ///
+ /// The message header
+ ///
+ public string? HeaderBag { get; set; }
+
+ ///
+ /// The message body
+ ///
+ public byte[]? Body { get; set; }
+
+ ///
+ /// The body encoding
+ ///
+ public string? CharacterEncoding { get; set; }
+
+ ///
+ /// The document TTL
+ ///
+ public long? ExpireAfterSeconds { get; set; }
+
+ ///
+ /// Convert the outbox message to
+ ///
+ /// New instance of .
+ public Message ConvertToMessage()
+ {
+ //following type may be missing on older data
+ var characterEncoding = CharacterEncoding != null
+ ? (CharacterEncoding)Enum.Parse(typeof(CharacterEncoding), CharacterEncoding)
+ : Brighter.CharacterEncoding.UTF8;
+ var messageType = (MessageType)Enum.Parse(typeof(MessageType), MessageType);
+
+ var header = new MessageHeader(
+ messageId: MessageId,
+ topic: new RoutingKey(Topic),
+ messageType: messageType,
+ timeStamp: TimeStamp,
+ correlationId: CorrelationId,
+ replyTo: ReplyTo == null ? RoutingKey.Empty : new RoutingKey(ReplyTo));
+
+ if (!string.IsNullOrEmpty(PartitionKey))
+ {
+ header.PartitionKey = PartitionKey!;
+ }
+
+ if (!string.IsNullOrEmpty(ContentType))
+ {
+ header.ContentType = ContentType!;
+ }
+
+ if (!string.IsNullOrEmpty(HeaderBag))
+ {
+ var bag = JsonSerializer.Deserialize>(HeaderBag!,
+ JsonSerialisationOptions.Options)!;
+ foreach (var key in bag.Keys)
+ {
+ header.Bag.Add(key, bag[key]);
+ }
+ }
+
+ var body = new MessageBody(Body, BodyContentType, characterEncoding);
+
+ return new Message(header, body);
+ }
+}
diff --git a/src/Paramore.Brighter.Outbox.MongoDb/Paramore.Brighter.Outbox.MongoDb.csproj b/src/Paramore.Brighter.Outbox.MongoDb/Paramore.Brighter.Outbox.MongoDb.csproj
new file mode 100644
index 0000000000..c93aa84d16
--- /dev/null
+++ b/src/Paramore.Brighter.Outbox.MongoDb/Paramore.Brighter.Outbox.MongoDb.csproj
@@ -0,0 +1,18 @@
+
+
+
+ net472;$(BrighterCoreTargetFrameworks)
+ enable
+ enable
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/tests/Paramore.Brighter.MongoDb.Tests/Catch.cs b/tests/Paramore.Brighter.MongoDb.Tests/Catch.cs
new file mode 100644
index 0000000000..20bfb26718
--- /dev/null
+++ b/tests/Paramore.Brighter.MongoDb.Tests/Catch.cs
@@ -0,0 +1,66 @@
+#region License NUnit.Specifications
+/* From https://raw.githubusercontent.com/derekgreer/NUnit.Specifications/master/license.txt
+Copyright(c) 2015 Derek B.Greer
+
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+*/
+#endregion
+
+using System;
+using System.Diagnostics;
+using System.Threading.Tasks;
+
+namespace Paramore.Brighter.MongoDb.Tests
+{
+ [DebuggerStepThrough]
+ public static class Catch
+ {
+ public static Exception Exception(Action action)
+ {
+ Exception exception = null;
+
+ try
+ {
+ action();
+ }
+ catch (Exception e)
+ {
+ exception = e;
+ }
+
+ return exception;
+ }
+ public static async Task ExceptionAsync(Func action)
+ {
+ Exception exception = null;
+
+ try
+ {
+ await action();
+ }
+ catch (Exception e)
+ {
+ exception = e;
+ }
+
+ return exception;
+ }
+ }
+}
diff --git a/tests/Paramore.Brighter.MongoDb.Tests/Configuration.cs b/tests/Paramore.Brighter.MongoDb.Tests/Configuration.cs
new file mode 100644
index 0000000000..5a6e5cbe51
--- /dev/null
+++ b/tests/Paramore.Brighter.MongoDb.Tests/Configuration.cs
@@ -0,0 +1,17 @@
+using Paramore.Brighter.MongoDb;
+
+namespace Paramore.Brighter.MongoDb.Tests;
+
+public class Configuration
+{
+ public static MongoDbConfiguration Create(string collection)
+ {
+ return new MongoDbConfiguration("mongodb://root:example@localhost:27017", "brighter", collection);
+ }
+
+ public static void Cleanup(string collection)
+ {
+ var config = Create(collection);
+ config.Client.GetDatabase(config.DatabaseName).DropCollection(collection);
+ }
+}
diff --git a/tests/Paramore.Brighter.MongoDb.Tests/Inbox/When_the_message_Is_already_in_the_Inbox_async.cs b/tests/Paramore.Brighter.MongoDb.Tests/Inbox/When_the_message_Is_already_in_the_Inbox_async.cs
new file mode 100644
index 0000000000..817e223039
--- /dev/null
+++ b/tests/Paramore.Brighter.MongoDb.Tests/Inbox/When_the_message_Is_already_in_the_Inbox_async.cs
@@ -0,0 +1,80 @@
+#region Licence
+
+/* The MIT License (MIT)
+Copyright © 2020 Ian Cooper
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the “Software”), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE. */
+
+#endregion
+
+
+using System;
+using System.Threading.Tasks;
+using FluentAssertions;
+using Paramore.Brighter.Inbox.MongoDb;
+using Paramore.Brighter.MongoDb.Tests.TestDoubles;
+using Xunit;
+
+namespace Paramore.Brighter.MongoDb.Tests.Inbox;
+
+[Trait("Category", "MongoDb")]
+public class MongoDbInboxDuplicateMessageAsyncTests : IDisposable
+{
+ private readonly string _collection;
+ private readonly MongoDbInbox _inbox;
+ private readonly MyCommand _raisedCommand;
+ private readonly string _contextKey;
+
+ public MongoDbInboxDuplicateMessageAsyncTests()
+ {
+ _collection = $"inbox-{Guid.NewGuid():N}";
+ _inbox = new MongoDbInbox(Configuration.Create(_collection));
+ _raisedCommand = new MyCommand { Value = "Test" };
+ _contextKey = "test-context";
+ }
+
+ [Fact]
+ public async Task When_The_Message_Is_Already_In_The_Inbox_Async()
+ {
+ await _inbox.AddAsync(_raisedCommand, _contextKey);
+
+ var exception = await Catch.ExceptionAsync(() => _inbox.AddAsync(_raisedCommand, _contextKey));
+
+ //_should_succeed_even_if_the_message_is_a_duplicate
+ exception.Should().BeNull();
+ var exists = await _inbox.ExistsAsync(_raisedCommand.Id, _contextKey);
+ exists.Should().BeTrue();
+ }
+
+ [Fact]
+ public async Task When_The_Message_Is_Already_In_The_Inbox_Different_Context()
+ {
+ await _inbox.AddAsync(_raisedCommand, "some other key");
+
+ var storedCommand = _inbox.Get(_raisedCommand.Id, "some other key");
+
+ //_should_read_the_command_from_the__dynamo_db_inbox
+ storedCommand.Should().NotBeNull();
+ }
+
+ public void Dispose()
+ {
+ Configuration.Cleanup(_collection);
+ }
+}
diff --git a/tests/Paramore.Brighter.MongoDb.Tests/Inbox/When_the_message_is_already_in_the_inbox.cs b/tests/Paramore.Brighter.MongoDb.Tests/Inbox/When_the_message_is_already_in_the_inbox.cs
new file mode 100644
index 0000000000..0b5480464f
--- /dev/null
+++ b/tests/Paramore.Brighter.MongoDb.Tests/Inbox/When_the_message_is_already_in_the_inbox.cs
@@ -0,0 +1,81 @@
+#region Licence
+
+/* The MIT License (MIT)
+Copyright © 2020 Ian Cooper
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the “Software”), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE. */
+
+#endregion
+
+
+using System;
+using FluentAssertions;
+using Paramore.Brighter.Inbox.MongoDb;
+using Paramore.Brighter.MongoDb.Tests.TestDoubles;
+using Xunit;
+
+namespace Paramore.Brighter.MongoDb.Tests.Inbox;
+
+[Trait("Category", "MongoDb")]
+public class MongoDbInboxDuplicateMessageTests : IDisposable
+{
+ private readonly string _collection;
+ private readonly MongoDbInbox _inbox;
+ private readonly MyCommand _raisedCommand;
+ private readonly string _contextKey;
+
+ public MongoDbInboxDuplicateMessageTests()
+ {
+ _collection = $"inbox-{Guid.NewGuid():N}";
+ _inbox = new MongoDbInbox(Configuration.Create(_collection));
+ _raisedCommand = new MyCommand { Value = "Test" };
+ _contextKey = Guid.NewGuid().ToString();
+ }
+
+ [Fact]
+ public void When_The_Message_Is_Already_In_The_Inbox()
+ {
+ _inbox.Add(_raisedCommand, _contextKey);
+
+ var exception = Catch.Exception(() => _inbox.Add(_raisedCommand, _contextKey));
+
+ //_should_succeed_even_if_the_message_is_a_duplicate
+ exception.Should().BeNull();
+ _inbox.Exists(_raisedCommand.Id, _contextKey).Should().BeTrue();
+ }
+
+ [Fact]
+ public void When_The_Message_Is_Already_In_The_Inbox_Different_Context()
+ {
+ _inbox.Add(_raisedCommand, _contextKey);
+
+ var newcontext = Guid.NewGuid().ToString();
+ _inbox.Add(_raisedCommand, newcontext);
+
+ var storedCommand = _inbox.Get(_raisedCommand.Id, newcontext);
+
+ //_should_read_the_command_from_the__dynamo_db_inbox
+ storedCommand.Should().NotBeNull();
+ }
+
+ public void Dispose()
+ {
+ Configuration.Cleanup(_collection);
+ }
+}
diff --git a/tests/Paramore.Brighter.MongoDb.Tests/Inbox/When_there_Is_no_message_in_the_sql_inbox_async.cs b/tests/Paramore.Brighter.MongoDb.Tests/Inbox/When_there_Is_no_message_in_the_sql_inbox_async.cs
new file mode 100644
index 0000000000..de38cc4fa7
--- /dev/null
+++ b/tests/Paramore.Brighter.MongoDb.Tests/Inbox/When_there_Is_no_message_in_the_sql_inbox_async.cs
@@ -0,0 +1,69 @@
+#region Licence
+
+/* The MIT License (MIT)
+Copyright © 2020 Ian Cooper
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the “Software”), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE. */
+
+#endregion
+
+
+using System;
+using System.Threading.Tasks;
+using FluentAssertions;
+using Paramore.Brighter.Inbox.Exceptions;
+using Paramore.Brighter.Inbox.MongoDb;
+using Paramore.Brighter.MongoDb.Tests.TestDoubles;
+using Xunit;
+
+namespace Paramore.Brighter.MongoDb.Tests.Inbox;
+
+[Trait("Category", "MongoDb")]
+public class MongoDbInboxEmptyWhenSearchedAsyncTests : IDisposable
+{
+ private readonly string _collection;
+ private readonly MongoDbInbox _inbox;
+
+ public MongoDbInboxEmptyWhenSearchedAsyncTests()
+ {
+ _collection = $"inbox-{Guid.NewGuid():N}";
+ _inbox = new MongoDbInbox(Configuration.Create(_collection));
+ }
+
+ [Fact]
+ public async Task When_There_Is_No_Message_In_The_Sql_Inbox_And_I_Get_Async()
+ {
+ string commandId = Guid.NewGuid().ToString();
+ var exception = await Catch.ExceptionAsync(() => _inbox.GetAsync(commandId, "some-key"));
+ exception.Should().BeOfType>();
+ }
+
+ [Fact]
+ public async Task When_There_Is_No_Message_In_The_Sql_Inbox_And_I_Check_Exists_Async()
+ {
+ string commandId = Guid.NewGuid().ToString();
+ bool exists = await _inbox.ExistsAsync(commandId, "some-key");
+ exists.Should().BeFalse();
+ }
+
+ public void Dispose()
+ {
+ Configuration.Cleanup(_collection);
+ }
+}
diff --git a/tests/Paramore.Brighter.MongoDb.Tests/Inbox/When_there_is_no_message_in_the_sql_inbox.cs b/tests/Paramore.Brighter.MongoDb.Tests/Inbox/When_there_is_no_message_in_the_sql_inbox.cs
new file mode 100644
index 0000000000..0cb49a3e32
--- /dev/null
+++ b/tests/Paramore.Brighter.MongoDb.Tests/Inbox/When_there_is_no_message_in_the_sql_inbox.cs
@@ -0,0 +1,70 @@
+#region Licence
+
+/* The MIT License (MIT)
+Copyright © 2020 Ian Cooper
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the “Software”), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE. */
+
+#endregion
+
+
+using System;
+using FluentAssertions;
+using Paramore.Brighter.Inbox.Exceptions;
+using Paramore.Brighter.Inbox.MongoDb;
+using Paramore.Brighter.MongoDb.Tests.TestDoubles;
+using Xunit;
+
+namespace Paramore.Brighter.MongoDb.Tests.Inbox;
+
+[Trait("Category", "MongoDb")]
+public class MongoDbInboxEmptyWhenSearchedTests : IDisposable
+{
+ private readonly string _collection;
+ private readonly MongoDbInbox _inbox;
+ private readonly string _contextKey;
+
+ public MongoDbInboxEmptyWhenSearchedTests()
+ {
+ _collection = $"inbox-{Guid.NewGuid():N}";
+ _inbox = new MongoDbInbox(Configuration.Create(_collection));
+ _contextKey = "context-key";
+ }
+
+ [Fact]
+ public void When_There_Is_No_Message_In_The_Sql_Inbox_And_Call_Get()
+ {
+ string commandId = Guid.NewGuid().ToString();
+ var exception = Catch.Exception(() => _ = _inbox.Get(commandId, _contextKey));
+
+ exception.Should().BeOfType>();
+ }
+
+ [Fact]
+ public void When_There_Is_No_Message_In_The_Sql_Inbox_And_Call_Exists()
+ {
+ string commandId = Guid.NewGuid().ToString();
+ _inbox.Exists(commandId, _contextKey).Should().BeFalse();
+ }
+
+ public void Dispose()
+ {
+ Configuration.Cleanup(_collection);
+ }
+}
diff --git a/tests/Paramore.Brighter.MongoDb.Tests/Inbox/When_writing_a_message_to_the_inbox.cs b/tests/Paramore.Brighter.MongoDb.Tests/Inbox/When_writing_a_message_to_the_inbox.cs
new file mode 100644
index 0000000000..ae0a769653
--- /dev/null
+++ b/tests/Paramore.Brighter.MongoDb.Tests/Inbox/When_writing_a_message_to_the_inbox.cs
@@ -0,0 +1,79 @@
+#region Licence
+
+/* The MIT License (MIT)
+Copyright © 2020 Ian Cooper
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the “Software”), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE. */
+
+#endregion
+
+
+using System;
+using FluentAssertions;
+using Paramore.Brighter.Inbox.Exceptions;
+using Paramore.Brighter.Inbox.MongoDb;
+using Paramore.Brighter.MongoDb.Tests.TestDoubles;
+using Xunit;
+
+namespace Paramore.Brighter.MongoDb.Tests.Inbox;
+
+[Trait("Category", "MongoDb")]
+public class MongoDbInboxAddMessageTests : IDisposable
+{
+ private readonly string _collection;
+ private readonly MongoDbInbox _inbox;
+ private readonly MyCommand _raisedCommand;
+ private readonly string _contextKey;
+
+ public MongoDbInboxAddMessageTests()
+ {
+ _collection = $"inbox-{Guid.NewGuid():N}";
+ _inbox = new MongoDbInbox(Configuration.Create(_collection));
+
+ _raisedCommand = new MyCommand { Value = "Test" };
+ _contextKey = "context-key";
+ _inbox.Add(_raisedCommand, _contextKey);
+ }
+
+ [Fact]
+ public void When_Writing_A_Message_To_The_Inbox()
+ {
+ var storedCommand = _inbox.Get(_raisedCommand.Id, _contextKey);
+
+ //_should_read_the_command_from_the__sql_inbox
+ storedCommand.Should().NotBeNull();
+ //_should_read_the_command_value
+ storedCommand.Value.Should().Be(_raisedCommand.Value);
+ //_should_read_the_command_id
+ storedCommand.Id.Should().Be(_raisedCommand.Id);
+ }
+
+ [Fact]
+ public void When_Reading_A_Message_From_The_Inbox_And_ContextKey_IsNull()
+ {
+ var exception = Catch.Exception(() => _ = _inbox.Get(_raisedCommand.Id, null));
+ //should_not_read_message
+ exception.Should().BeOfType>();
+ }
+
+ public void Dispose()
+ {
+ Configuration.Cleanup(_collection);
+ }
+}
diff --git a/tests/Paramore.Brighter.MongoDb.Tests/Inbox/When_writing_a_message_to_the_inbox_async.cs b/tests/Paramore.Brighter.MongoDb.Tests/Inbox/When_writing_a_message_to_the_inbox_async.cs
new file mode 100644
index 0000000000..9edae80ba0
--- /dev/null
+++ b/tests/Paramore.Brighter.MongoDb.Tests/Inbox/When_writing_a_message_to_the_inbox_async.cs
@@ -0,0 +1,72 @@
+#region Licence
+
+/* The MIT License (MIT)
+Copyright © 2020 Ian Cooper
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the “Software”), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE. */
+
+#endregion
+
+
+using System;
+using System.Threading.Tasks;
+using FluentAssertions;
+using Paramore.Brighter.Inbox.MongoDb;
+using Paramore.Brighter.MongoDb.Tests.TestDoubles;
+using Xunit;
+
+namespace Paramore.Brighter.MongoDb.Tests.Inbox;
+
+[Trait("Category", "MongoDb")]
+public class MongoDbInboxAddMessageAsyncTests : IDisposable
+{
+ private readonly string _collection;
+ private readonly MongoDbInbox _inbox;
+ private readonly MyCommand _raisedCommand;
+ private readonly string _contextKey;
+
+ public MongoDbInboxAddMessageAsyncTests()
+ {
+ _collection = $"inbox-{Guid.NewGuid():N}";
+ _inbox = new MongoDbInbox(Configuration.Create(_collection));
+
+ _raisedCommand = new MyCommand { Value = "Test" };
+ _contextKey = "context-key";
+ }
+
+ [Fact]
+ public async Task When_Writing_A_Message_To_The_Inbox_Async()
+ {
+ await _inbox.AddAsync(_raisedCommand, _contextKey);
+
+ var storedCommand = await _inbox.GetAsync(_raisedCommand.Id, _contextKey);
+
+ //_should_read_the_command_from_the__sql_inbox
+ storedCommand.Should().NotBeNull();
+ //_should_read_the_command_value
+ storedCommand.Value.Should().Be(_raisedCommand.Value);
+ //_should_read_the_command_id
+ storedCommand.Id.Should().Be(_raisedCommand.Id);
+ }
+
+ public void Dispose()
+ {
+ Configuration.Cleanup(_collection);
+ }
+}
diff --git a/tests/Paramore.Brighter.MongoDb.Tests/MongoDbLockingProviderTest.cs b/tests/Paramore.Brighter.MongoDb.Tests/MongoDbLockingProviderTest.cs
new file mode 100644
index 0000000000..4fc0341068
--- /dev/null
+++ b/tests/Paramore.Brighter.MongoDb.Tests/MongoDbLockingProviderTest.cs
@@ -0,0 +1,46 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Paramore.Brighter.Locking.MongoDb;
+using Xunit;
+
+namespace Paramore.Brighter.MongoDb.Tests;
+
+[Trait("Category", "MongoDb")]
+public class MongoDbLockingProviderTest
+{
+ private readonly MongoDbLockingProvider _locking;
+
+ public MongoDbLockingProviderTest()
+ {
+ _locking = new MongoDbLockingProvider(Configuration.Create("locking"));
+ }
+
+ [Fact]
+ public async Task GivenAnPostgresLockingProvider_WhenLockIsCalled_ItCanOnlyBeObtainedOnce()
+ {
+ var resourceName = $"TestLock-{Guid.NewGuid()}";
+
+ var first = await _locking.ObtainLockAsync(resourceName, CancellationToken.None);
+ var second = await _locking.ObtainLockAsync(resourceName, CancellationToken.None);
+
+ Assert.Equal(first, resourceName);
+ Assert.Null(second);
+ }
+
+ [Fact]
+ public async Task GivenAnPostgresLockingProviderWithALockedBlob_WhenReleaseLockIsCalled_ItCanOnlyBeLockedAgain()
+ {
+ var resourceName = $"TestLock-{Guid.NewGuid()}";
+
+ var first = await _locking.ObtainLockAsync(resourceName, CancellationToken.None);
+ await _locking.ReleaseLockAsync(resourceName, first, CancellationToken.None);
+
+ var second = await _locking.ObtainLockAsync(resourceName, CancellationToken.None);
+ var third = await _locking.ObtainLockAsync(resourceName, CancellationToken.None);
+
+ Assert.Equal(first, resourceName);
+ Assert.Equal(second, resourceName);
+ Assert.Null(third);
+ }
+}
diff --git a/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_Removing_Messages_From_The_Outbox.cs b/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_Removing_Messages_From_The_Outbox.cs
new file mode 100644
index 0000000000..69706da3ff
--- /dev/null
+++ b/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_Removing_Messages_From_The_Outbox.cs
@@ -0,0 +1,94 @@
+#region Licence
+
+/* The MIT License (MIT)
+Copyright © 2014 Francesco Pighi
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the “Software”), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE. */
+
+#endregion
+
+using System;
+using System.Linq;
+using FluentAssertions;
+using Paramore.Brighter.Outbox.MongoDb;
+using Xunit;
+
+namespace Paramore.Brighter.MongoDb.Tests.Outbox;
+
+[Trait("Category", "MongoDb")]
+public class MongoDbOutboxDeletingMessagesTests : IDisposable
+{
+ private readonly string _collection;
+ private readonly Message _firstMessage;
+ private readonly Message _secondMessage;
+ private readonly Message _thirdMessage;
+ private readonly MongoDbOutbox _outbox;
+
+ public MongoDbOutboxDeletingMessagesTests()
+ {
+ _collection = $"outbox-{Guid.NewGuid():N}";
+ _outbox = new MongoDbOutbox(Configuration.Create(_collection));
+
+ _firstMessage = new Message(new MessageHeader(Guid.NewGuid().ToString(), new RoutingKey("Test"),
+ MessageType.MT_COMMAND,
+ timeStamp: DateTime.UtcNow.AddHours(-3)), new MessageBody("Body")
+ );
+ _secondMessage = new Message(new MessageHeader(Guid.NewGuid().ToString(), new RoutingKey("Test2"),
+ MessageType.MT_COMMAND,
+ timeStamp: DateTime.UtcNow.AddHours(-2)), new MessageBody("Body2")
+ );
+ _thirdMessage = new Message(new MessageHeader(Guid.NewGuid().ToString(), new RoutingKey("Test3"),
+ MessageType.MT_COMMAND,
+ timeStamp: DateTime.UtcNow.AddHours(-1)), new MessageBody("Body3")
+ );
+ }
+
+ [Fact]
+ public void When_Removing_Messages_From_The_Outbox()
+ {
+ //arrange
+ var context = new RequestContext();
+
+ //act
+ _outbox.Add(_firstMessage, context);
+ _outbox.Add(_secondMessage, context);
+ _outbox.Add(_thirdMessage, context);
+
+ _outbox.Delete([_firstMessage.Id], context);
+
+ //assert
+ var remainingMessages = _outbox.OutstandingMessages(TimeSpan.Zero, context);
+
+ var msgs = remainingMessages as Message[] ?? remainingMessages.ToArray();
+ msgs.Should().HaveCount(2);
+ msgs.Should().Contain(_secondMessage);
+ msgs.Should().Contain(_thirdMessage);
+
+ _outbox.Delete([_secondMessage.Id, _thirdMessage.Id], context);
+
+ var messages = _outbox.OutstandingMessages(TimeSpan.Zero, context);
+
+ messages.Should().HaveCount(0);
+ }
+
+ public void Dispose()
+ {
+ Configuration.Cleanup(_collection);
+ }
+}
diff --git a/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_The_Message_Is_Already_In_The_Outbox.cs b/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_The_Message_Is_Already_In_The_Outbox.cs
new file mode 100644
index 0000000000..e914d4aa2d
--- /dev/null
+++ b/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_The_Message_Is_Already_In_The_Outbox.cs
@@ -0,0 +1,65 @@
+#region Licence
+
+/* The MIT License (MIT)
+Copyright © 2014 Francesco Pighi
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the “Software”), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE. */
+
+#endregion
+
+using System;
+using FluentAssertions;
+using Paramore.Brighter.Inbox.MongoDb;
+using Paramore.Brighter.Outbox.MongoDb;
+using Xunit;
+
+namespace Paramore.Brighter.MongoDb.Tests.Outbox;
+
+[Trait("Category", "MongoDb")]
+public class MongoDbOutboxMessageAlreadyExistsTests : IDisposable
+{
+ private readonly string _collection;
+ private readonly Message _messageEarliest;
+ private readonly MongoDbOutbox _outbox;
+
+ public MongoDbOutboxMessageAlreadyExistsTests()
+ {
+ _collection = $"outbox-{Guid.NewGuid():N}";
+ _outbox = new (Configuration.Create(_collection));
+ _messageEarliest = new Message(
+ new MessageHeader(Guid.NewGuid().ToString(), new RoutingKey("test_topic"), MessageType.MT_DOCUMENT),
+ new MessageBody("message body")
+ );
+ _outbox.Add(_messageEarliest, new RequestContext());
+ }
+
+ [Fact]
+ public void When_The_Message_Is_Already_In_The_Outbox()
+ {
+ var exception = Catch.Exception(() => _outbox.Add(_messageEarliest, new RequestContext()));
+
+ //should ignore the duplicate key and still succeed
+ exception.Should().BeNull();
+ }
+
+ public void Dispose()
+ {
+ Configuration.Cleanup(_collection);
+ }
+}
diff --git a/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_There_Is_No_Message_In_The_Sql_Outbox.cs b/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_There_Is_No_Message_In_The_Sql_Outbox.cs
new file mode 100644
index 0000000000..0afedfe722
--- /dev/null
+++ b/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_There_Is_No_Message_In_The_Sql_Outbox.cs
@@ -0,0 +1,63 @@
+#region Licence
+
+/* The MIT License (MIT)
+Copyright © 2014 Francesco Pighi
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the “Software”), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE. */
+
+#endregion
+
+using System;
+using FluentAssertions;
+using Paramore.Brighter.Outbox.MongoDb;
+using Xunit;
+
+namespace Paramore.Brighter.MongoDb.Tests.Outbox;
+
+[Trait("Category", "MongoDb")]
+public class MongoDbOutboxEmptyStoreTests : IDisposable
+{
+ private readonly string _collection;
+ private readonly Message _messageEarliest;
+ private readonly MongoDbOutbox _outbox;
+
+ public MongoDbOutboxEmptyStoreTests()
+ {
+ _collection = $"outbox-{Guid.NewGuid():N}";
+ _outbox = new (Configuration.Create(_collection));
+ _messageEarliest = new Message(
+ new MessageHeader(Guid.NewGuid().ToString(), new RoutingKey("test_topic"), MessageType.MT_DOCUMENT),
+ new MessageBody("message body")
+ );
+ }
+
+ [Fact]
+ public void When_There_Is_No_Message_In_The_Sql_Outbox()
+ {
+ var storedMessage = _outbox.Get(_messageEarliest.Id, new RequestContext());
+
+ //should return a empty message
+ storedMessage.Header.MessageType.Should().Be(MessageType.MT_NONE);
+ }
+
+ public void Dispose()
+ {
+ Configuration.Cleanup(_collection);
+ }
+}
diff --git a/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_Writing_A_Message_To_A_Binary_Body_Outbox.cs b/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_Writing_A_Message_To_A_Binary_Body_Outbox.cs
new file mode 100644
index 0000000000..f4b734f602
--- /dev/null
+++ b/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_Writing_A_Message_To_A_Binary_Body_Outbox.cs
@@ -0,0 +1,90 @@
+using System;
+using System.Net.Mime;
+using FluentAssertions;
+using Paramore.Brighter.Outbox.MongoDb;
+using Xunit;
+
+namespace Paramore.Brighter.MongoDb.Tests.Outbox;
+
+[Trait("Category", "MongoDb")]
+public class MongoDbBinaryOutboxWritingMessageTests : IDisposable
+{
+ private readonly string _collection;
+ private const string Key1 = "name1";
+ private const string Key2 = "name2";
+ private const string Key3 = "name3";
+ private const string Key4 = "name4";
+ private const string Key5 = "name5";
+ private readonly Message _messageEarliest;
+ private readonly MongoDbOutbox _outbox;
+ private const string Value1 = "value1";
+ private const string Value2 = "value2";
+ private const int Value3 = 123;
+ private readonly Guid _value4 = Guid.NewGuid();
+ private readonly DateTime _value5 = DateTime.UtcNow;
+ private readonly RequestContext _context;
+
+ public MongoDbBinaryOutboxWritingMessageTests()
+ {
+ _collection = $"outbox-{Guid.NewGuid():N}";
+ _outbox = new(Configuration.Create(_collection));
+ var messageHeader = new MessageHeader(
+ messageId: Guid.NewGuid().ToString(),
+ topic: new RoutingKey("test_topic"),
+ messageType: MessageType.MT_DOCUMENT,
+ timeStamp: DateTime.UtcNow.AddDays(-1),
+ handledCount: 5,
+ delayed: TimeSpan.FromMilliseconds(5),
+ correlationId: Guid.NewGuid().ToString(),
+ replyTo: new RoutingKey("ReplyTo"),
+ contentType: "text/plain",
+ partitionKey: Guid.NewGuid().ToString());
+ messageHeader.Bag.Add(Key1, Value1);
+ messageHeader.Bag.Add(Key2, Value2);
+ messageHeader.Bag.Add(Key3, Value3);
+ messageHeader.Bag.Add(Key4, _value4);
+ messageHeader.Bag.Add(Key5, _value5);
+
+ _context = new RequestContext();
+
+ _messageEarliest = new Message(messageHeader, new MessageBody("message body"));
+ _outbox.Add(_messageEarliest, _context);
+ }
+
+ [Fact]
+ public void When_Writing_A_Message_To_A_Binary_Body_Outbox()
+ {
+ var storedMessage = _outbox.Get(_messageEarliest.Id, _context);
+
+ //should read the message from the sql outbox
+ storedMessage.Body.Value.Should().Be(_messageEarliest.Body.Value);
+ //should read the header from the sql outbox
+ storedMessage.Header.Topic.Should().Be(_messageEarliest.Header.Topic);
+ storedMessage.Header.MessageType.Should().Be(_messageEarliest.Header.MessageType);
+ storedMessage.Header.TimeStamp.ToString("yyyy-MM-ddTHH:mm:ss.fZ")
+ .Should().Be(_messageEarliest.Header.TimeStamp.ToString("yyyy-MM-ddTHH:mm:ss.fZ"));
+ storedMessage.Header.HandledCount.Should().Be(0); // -- should be zero when read from outbox
+ storedMessage.Header.Delayed.Should().Be(TimeSpan.Zero); // -- should be zero when read from outbox
+ storedMessage.Header.CorrelationId.Should().Be(_messageEarliest.Header.CorrelationId);
+ storedMessage.Header.ReplyTo.Should().Be(_messageEarliest.Header.ReplyTo);
+ storedMessage.Header.ContentType.Should().Be(_messageEarliest.Header.ContentType);
+ storedMessage.Header.PartitionKey.Should().Be(_messageEarliest.Header.PartitionKey);
+
+ //Bag serialization
+ storedMessage.Header.Bag.ContainsKey(Key1).Should().BeTrue();
+ storedMessage.Header.Bag[Key1].Should().Be(Value1);
+ storedMessage.Header.Bag.ContainsKey(Key2).Should().BeTrue();
+ storedMessage.Header.Bag[Key2].Should().Be(Value2);
+ storedMessage.Header.Bag.ContainsKey(Key3).Should().BeTrue();
+ storedMessage.Header.Bag[Key3].Should().Be(Value3);
+ storedMessage.Header.Bag.ContainsKey(Key4).Should().BeTrue();
+ storedMessage.Header.Bag[Key4].Should().Be(_value4);
+ storedMessage.Header.Bag.ContainsKey(Key5).Should().BeTrue();
+ storedMessage.Header.Bag[Key5].Should().Be(_value5);
+ }
+
+ public void Dispose()
+ {
+ Configuration.Cleanup(_collection);
+ }
+}
diff --git a/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_Writing_A_Message_To_The_Outbox.cs b/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_Writing_A_Message_To_The_Outbox.cs
new file mode 100644
index 0000000000..84c97da6c5
--- /dev/null
+++ b/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_Writing_A_Message_To_The_Outbox.cs
@@ -0,0 +1,115 @@
+#region Licence
+
+/* The MIT License (MIT)
+Copyright © 2014 Francesco Pighi
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the “Software”), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE. */
+
+#endregion
+
+using System;
+using System.Net.Mime;
+using FluentAssertions;
+using Paramore.Brighter.Outbox.MongoDb;
+using Xunit;
+
+namespace Paramore.Brighter.MongoDb.Tests.Outbox;
+
+[Trait("Category", "MongoDb")]
+public class MongoDbOutboxWritingMessageTests : IDisposable
+{
+ private readonly string _collection;
+ private const string Key1 = "name1";
+ private const string Key2 = "name2";
+ private const string Key3 = "name3";
+ private const string Key4 = "name4";
+ private const string Key5 = "name5";
+ private readonly Message _messageEarliest;
+ private readonly MongoDbOutbox _outbox;
+ private const string Value1 = "value1";
+ private const string Value2 = "value2";
+ private const int Value3 = 123;
+ private readonly Guid _value4 = Guid.NewGuid();
+ private readonly DateTime _value5 = DateTime.UtcNow;
+ private readonly RequestContext _context;
+
+ public MongoDbOutboxWritingMessageTests()
+ {
+ _collection = $"outbox-{Guid.NewGuid():N}";
+ _outbox = new(Configuration.Create(_collection));
+ var messageHeader = new MessageHeader(
+ messageId: Guid.NewGuid().ToString(),
+ topic: new RoutingKey("test_topic"),
+ messageType: MessageType.MT_DOCUMENT,
+ timeStamp: DateTime.UtcNow.AddDays(-1),
+ handledCount: 5,
+ delayed: TimeSpan.FromMilliseconds(5),
+ correlationId: Guid.NewGuid().ToString(),
+ replyTo: new RoutingKey("ReplyTo"),
+ contentType: "text/plain",
+ partitionKey: Guid.NewGuid().ToString());
+ messageHeader.Bag.Add(Key1, Value1);
+ messageHeader.Bag.Add(Key2, Value2);
+ messageHeader.Bag.Add(Key3, Value3);
+ messageHeader.Bag.Add(Key4, _value4);
+ messageHeader.Bag.Add(Key5, _value5);
+
+ _context = new RequestContext();
+
+ _messageEarliest = new Message(messageHeader, new MessageBody("message body"));
+ _outbox.Add(_messageEarliest, _context);
+ }
+
+ [Fact]
+ public void When_Writing_A_Message_To_The_PostgreSql_Outbox()
+ {
+ var storedMessage = _outbox.Get(_messageEarliest.Id, _context);
+
+ //should read the message from the sql outbox
+ storedMessage.Body.Value.Should().Be(_messageEarliest.Body.Value);
+ //should read the header from the sql outbox
+ storedMessage.Header.Topic.Should().Be(_messageEarliest.Header.Topic);
+ storedMessage.Header.MessageType.Should().Be(_messageEarliest.Header.MessageType);
+ storedMessage.Header.TimeStamp.ToString("yyyy-MM-ddTHH:mm:ss.fZ")
+ .Should().Be(_messageEarliest.Header.TimeStamp.ToString("yyyy-MM-ddTHH:mm:ss.fZ"));
+ storedMessage.Header.HandledCount.Should().Be(0); // -- should be zero when read from outbox
+ storedMessage.Header.Delayed.Should().Be(TimeSpan.Zero); // -- should be zero when read from outbox
+ storedMessage.Header.CorrelationId.Should().Be(_messageEarliest.Header.CorrelationId);
+ storedMessage.Header.ReplyTo.Should().Be(_messageEarliest.Header.ReplyTo);
+ storedMessage.Header.ContentType.Should().Be(_messageEarliest.Header.ContentType);
+ storedMessage.Header.PartitionKey.Should().Be(_messageEarliest.Header.PartitionKey);
+
+ //Bag serialization
+ storedMessage.Header.Bag.ContainsKey(Key1).Should().BeTrue();
+ storedMessage.Header.Bag[Key1].Should().Be(Value1);
+ storedMessage.Header.Bag.ContainsKey(Key2).Should().BeTrue();
+ storedMessage.Header.Bag[Key2].Should().Be(Value2);
+ storedMessage.Header.Bag.ContainsKey(Key3).Should().BeTrue();
+ storedMessage.Header.Bag[Key3].Should().Be(Value3);
+ storedMessage.Header.Bag.ContainsKey(Key4).Should().BeTrue();
+ storedMessage.Header.Bag[Key4].Should().Be(_value4);
+ storedMessage.Header.Bag.ContainsKey(Key5).Should().BeTrue();
+ storedMessage.Header.Bag[Key5].Should().Be(_value5);
+ }
+
+ public void Dispose()
+ {
+ Configuration.Cleanup(_collection);
+ }
+}
diff --git a/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_retrieving_messages.cs b/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_retrieving_messages.cs
new file mode 100644
index 0000000000..ebd16d0fce
--- /dev/null
+++ b/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_retrieving_messages.cs
@@ -0,0 +1,87 @@
+using System;
+using System.Linq;
+using FluentAssertions;
+using Paramore.Brighter.Outbox.MongoDb;
+using Xunit;
+
+namespace Paramore.Brighter.MongoDb.Tests.Outbox;
+
+[Trait("Category", "MongoDb")]
+public class MongoDbFetchMessageTests : IDisposable
+{
+ private readonly string _collection;
+ private readonly Message _messageEarliest;
+ private readonly Message _messageDispatched;
+ private readonly Message _messageUnDispatched;
+ private readonly MongoDbOutbox _outbox;
+
+ public MongoDbFetchMessageTests()
+ {
+ _collection = $"outbox-{Guid.NewGuid():N}";
+ _outbox = new MongoDbOutbox(Configuration.Create(_collection));
+ var routingKey = new RoutingKey("test_topic");
+
+ _messageEarliest = new Message(
+ new MessageHeader(Guid.NewGuid().ToString(), routingKey, MessageType.MT_DOCUMENT),
+ new MessageBody("message body"));
+ _messageDispatched = new Message(
+ new MessageHeader(Guid.NewGuid().ToString(), routingKey, MessageType.MT_DOCUMENT),
+ new MessageBody("message body"));
+ _messageUnDispatched = new Message(
+ new MessageHeader(Guid.NewGuid().ToString(), routingKey, MessageType.MT_DOCUMENT),
+ new MessageBody("message body"));
+ }
+
+ [Fact]
+ public void When_Retrieving_Messages()
+ {
+ var context = new RequestContext();
+ _outbox.Add([_messageEarliest, _messageDispatched, _messageUnDispatched], context);
+ _outbox.MarkDispatched(_messageEarliest.Id, context, DateTime.UtcNow.AddHours(-3));
+ _outbox.MarkDispatched(_messageDispatched.Id, context);
+
+ var messages = _outbox.Get();
+
+ //Assert
+ messages.Should().HaveCount(3);
+ }
+
+ [Fact]
+ public void When_Retrieving_Messages_By_Id()
+ {
+ var context = new RequestContext();
+ _outbox.Add([_messageEarliest, _messageDispatched, _messageUnDispatched], context);
+ _outbox.MarkDispatched(_messageEarliest.Id, context, DateTime.UtcNow.AddHours(-3));
+ _outbox.MarkDispatched(_messageDispatched.Id, context);
+
+ var messages = _outbox.Get(
+ [_messageEarliest.Id, _messageUnDispatched.Id],
+ context);
+
+ //Assert
+ messages = messages.ToList();
+ messages.Should().HaveCount(2);
+ messages.Should().Contain(x => x.Id == _messageEarliest.Id);
+ messages.Should().Contain(x => x.Id == _messageUnDispatched.Id);
+ messages.Should().NotContain(x => x.Id == _messageDispatched.Id);
+ }
+
+ [Fact]
+ public void When_Retrieving_Message_By_Id()
+ {
+ var context = new RequestContext();
+ _outbox.Add([_messageEarliest, _messageDispatched, _messageUnDispatched], context);
+ _outbox.MarkDispatched(_messageEarliest.Id, context, DateTime.UtcNow.AddHours(-3));
+ _outbox.MarkDispatched(_messageDispatched.Id, context);
+
+ var messages = _outbox.Get(_messageDispatched.Id, context);
+
+ //Assert
+ messages.Id.Should().Be(_messageDispatched.Id);
+ }
+
+ public void Dispose()
+ {
+ Configuration.Cleanup(_collection);
+ }
+}
diff --git a/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_retrieving_messages_async.cs b/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_retrieving_messages_async.cs
new file mode 100644
index 0000000000..df5b8dc942
--- /dev/null
+++ b/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_retrieving_messages_async.cs
@@ -0,0 +1,88 @@
+using System;
+using System.Linq;
+using System.Threading.Tasks;
+using FluentAssertions;
+using Paramore.Brighter.Outbox.MongoDb;
+using Xunit;
+
+namespace Paramore.Brighter.MongoDb.Tests.Outbox;
+
+[Trait("Category", "MongoDb")]
+public class MongoDbFetchMessageAsyncTests : IDisposable
+{
+ private readonly string _collection;
+ private readonly Message _messageEarliest;
+ private readonly Message _messageDispatched;
+ private readonly Message _messageUnDispatched;
+ private readonly MongoDbOutbox _outbox;
+
+ public MongoDbFetchMessageAsyncTests()
+ {
+ _collection = $"outbox-{Guid.NewGuid():N}";
+ _outbox = new MongoDbOutbox(Configuration.Create(_collection));
+ var routingKey = new RoutingKey("test_topic");
+
+ _messageEarliest = new Message(
+ new MessageHeader(Guid.NewGuid().ToString(), routingKey, MessageType.MT_DOCUMENT),
+ new MessageBody("message body"));
+ _messageDispatched = new Message(
+ new MessageHeader(Guid.NewGuid().ToString(), routingKey, MessageType.MT_DOCUMENT),
+ new MessageBody("message body"));
+ _messageUnDispatched = new Message(
+ new MessageHeader(Guid.NewGuid().ToString(), routingKey, MessageType.MT_DOCUMENT),
+ new MessageBody("message body"));
+ }
+
+ [Fact]
+ public async Task When_Retrieving_Messages_Async()
+ {
+ var context = new RequestContext();
+ await _outbox.AddAsync([_messageEarliest, _messageDispatched, _messageUnDispatched], context);
+ await _outbox.MarkDispatchedAsync(_messageEarliest.Id, context, DateTime.UtcNow.AddHours(-3));
+ await _outbox.MarkDispatchedAsync(_messageDispatched.Id, context);
+
+ var messages = await _outbox.GetAsync();
+
+ //Assert
+ messages.Should().HaveCount(3);
+ }
+
+ [Fact]
+ public async Task When_Retrieving_Messages_By_Id_Async()
+ {
+ var context = new RequestContext();
+ await _outbox.AddAsync([_messageEarliest, _messageDispatched, _messageUnDispatched], context);
+ await _outbox.MarkDispatchedAsync(_messageEarliest.Id, context, DateTime.UtcNow.AddHours(-3));
+ await _outbox.MarkDispatchedAsync(_messageDispatched.Id, context);
+
+ var messages = await _outbox.GetAsync(
+ [_messageEarliest.Id, _messageUnDispatched.Id],
+ context);
+
+ //Assert
+ messages = messages.ToList();
+ messages.Should().HaveCount(2);
+ messages.Should().Contain(x => x.Id == _messageEarliest.Id);
+ messages.Should().Contain(x => x.Id == _messageUnDispatched.Id);
+ messages.Should().NotContain(x => x.Id == _messageDispatched.Id);
+ }
+
+ [Fact]
+ public async Task When_Retrieving_Message_By_Id_Async()
+ {
+ var context = new RequestContext();
+ await _outbox.AddAsync([_messageEarliest, _messageDispatched, _messageUnDispatched], context);
+ await _outbox.MarkDispatchedAsync(_messageEarliest.Id, context, DateTime.UtcNow.AddHours(-3));
+ await _outbox.MarkDispatchedAsync(_messageDispatched.Id, context);
+
+ var messages = await _outbox.GetAsync(_messageDispatched.Id, context);
+
+ //Assert
+ messages.Id.Should().Be(_messageDispatched.Id);
+ }
+
+ public void Dispose()
+ {
+ Configuration.Cleanup(_collection);
+ }
+}
diff --git a/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_retrieving_messages_to_archive.cs b/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_retrieving_messages_to_archive.cs
new file mode 100644
index 0000000000..f56d1eaf61
--- /dev/null
+++ b/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_retrieving_messages_to_archive.cs
@@ -0,0 +1,56 @@
+using System;
+using FluentAssertions;
+using Paramore.Brighter.Outbox.MongoDb;
+using Xunit;
+
+namespace Paramore.Brighter.MongoDb.Tests.Outbox;
+
+[Trait("Category", "MongoDb")]
+public class MongoDbArchiveFetchTests : IDisposable
+{
+ private readonly string _collection;
+ private readonly Message _messageEarliest;
+ private readonly Message _messageDispatched;
+ private readonly Message _messageUnDispatched;
+ private readonly MongoDbOutbox _outbox;
+
+ public MongoDbArchiveFetchTests()
+ {
+ _collection = $"outbox-{Guid.NewGuid():N}";
+ _outbox = new MongoDbOutbox(Configuration.Create(_collection));
+ var routingKey = new RoutingKey("test_topic");
+
+ _messageEarliest = new Message(
+ new MessageHeader(Guid.NewGuid().ToString(), routingKey, MessageType.MT_DOCUMENT),
+ new MessageBody("message body"));
+ _messageDispatched = new Message(
+ new MessageHeader(Guid.NewGuid().ToString(), routingKey, MessageType.MT_DOCUMENT),
+ new MessageBody("message body"));
+ _messageUnDispatched = new Message(
+ new MessageHeader(Guid.NewGuid().ToString(), routingKey, MessageType.MT_DOCUMENT),
+ new MessageBody("message body"));
+ }
+
+ [Fact]
+ public void When_Retrieving_Messages_To_Archive_UsingTimeSpan()
+ {
+ var context = new RequestContext();
+ _outbox.Add([_messageEarliest, _messageDispatched, _messageUnDispatched], context);
+ _outbox.MarkDispatched(_messageEarliest.Id, context, DateTime.UtcNow.AddHours(-3));
+ _outbox.MarkDispatched(_messageDispatched.Id, context);
+
+ var allDispatched = _outbox.DispatchedMessages(TimeSpan.Zero, context);
+ var messagesOverAnHour = _outbox.DispatchedMessages(TimeSpan.FromHours(2), context);
+ var messagesOver4Hours = _outbox.DispatchedMessages(TimeSpan.FromHours(4), context);
+
+ //Assert
+ allDispatched.Should().HaveCount(2);
+ messagesOverAnHour.Should().ContainSingle();
+ messagesOver4Hours.Should().BeEmpty();
+ }
+
+ public void Dispose()
+ {
+ Configuration.Cleanup(_collection);
+ }
+}
diff --git a/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_retrieving_messages_to_archive_async.cs b/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_retrieving_messages_to_archive_async.cs
new file mode 100644
index 0000000000..4ae2c0e423
--- /dev/null
+++ b/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_retrieving_messages_to_archive_async.cs
@@ -0,0 +1,64 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using FluentAssertions;
+using Paramore.Brighter.Outbox.MongoDb;
+using Xunit;
+
+namespace Paramore.Brighter.MongoDb.Tests.Outbox;
+
+[Trait("Category", "MongoDb")]
+public class MongoDbArchiveFetchAsyncTests : IDisposable
+{
+ private readonly string _collection;
+ private readonly Message _messageEarliest;
+ private readonly Message _messageDispatched;
+ private readonly Message _messageUnDispatched;
+ private readonly MongoDbOutbox _outbox;
+
+ public MongoDbArchiveFetchAsyncTests()
+ {
+ _collection = $"outbox-{Guid.NewGuid():N}";
+ _outbox = new MongoDbOutbox(Configuration.Create(_collection));
+ var routingKey = new RoutingKey("test_topic");
+
+ _messageEarliest = new Message(
+ new MessageHeader(Guid.NewGuid().ToString(), routingKey, MessageType.MT_DOCUMENT),
+ new MessageBody("message body"));
+ _messageDispatched = new Message(
+ new MessageHeader(Guid.NewGuid().ToString(), routingKey, MessageType.MT_DOCUMENT),
+ new MessageBody("message body"));
+ _messageUnDispatched = new Message(
+ new MessageHeader(Guid.NewGuid().ToString(), routingKey, MessageType.MT_DOCUMENT),
+ new MessageBody("message body"));
+ }
+
+ [Fact]
+ public async Task When_Retrieving_Messages_To_Archive_UsingTimeSpan_Async()
+ {
+ var context = new RequestContext();
+ await _outbox.AddAsync([_messageEarliest, _messageDispatched, _messageUnDispatched], context);
+ await _outbox.MarkDispatchedAsync(_messageEarliest.Id, context, DateTime.UtcNow.AddHours(-3));
+ await _outbox.MarkDispatchedAsync(_messageDispatched.Id, context);
+
+ var allDispatched =
+ await _outbox.DispatchedMessagesAsync(TimeSpan.Zero, context,
+ cancellationToken: CancellationToken.None);
+ var messagesOverAnHour =
+ await _outbox.DispatchedMessagesAsync(TimeSpan.FromHours(2), context,
+ cancellationToken: CancellationToken.None);
+ var messagesOver4Hours =
+ await _outbox.DispatchedMessagesAsync(TimeSpan.FromHours(4), context,
+ cancellationToken: CancellationToken.None);
+
+ //Assert
+ allDispatched.Should().HaveCount(2);
+ messagesOverAnHour.Should().ContainSingle();
+ messagesOver4Hours.Should().BeEmpty();
+ }
+
+ public void Dispose()
+ {
+ Configuration.Cleanup(_collection);
+ }
+}
diff --git a/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_retrieving_outstanding_messages.cs b/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_retrieving_outstanding_messages.cs
new file mode 100644
index 0000000000..0f8920cf4a
--- /dev/null
+++ b/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_retrieving_outstanding_messages.cs
@@ -0,0 +1,61 @@
+using System;
+using FluentAssertions;
+using Paramore.Brighter.Outbox.MongoDb;
+using Xunit;
+
+namespace Paramore.Brighter.MongoDb.Tests.Outbox;
+
+[Trait("Category", "MongoDb")]
+public class MongoDbFetchOutStandingMessageTests : IDisposable
+{
+ private readonly string _collection;
+ private readonly Message _messageEarliest;
+ private readonly Message _messageDispatched;
+ private readonly Message _messageUnDispatched;
+ private readonly MongoDbOutbox _outbox;
+
+ public MongoDbFetchOutStandingMessageTests()
+ {
+ _collection = $"outbox-{Guid.NewGuid():N}";
+ _outbox = new MongoDbOutbox(Configuration.Create(_collection));
+ var routingKey = new RoutingKey("test_topic");
+
+ _messageEarliest = new Message(
+ new MessageHeader(Guid.NewGuid().ToString(), routingKey, MessageType.MT_DOCUMENT)
+ {
+ TimeStamp = DateTimeOffset.UtcNow.AddHours(-3)
+ },
+ new MessageBody("message body"));
+ _messageDispatched = new Message(
+ new MessageHeader(Guid.NewGuid().ToString(), routingKey, MessageType.MT_DOCUMENT),
+ new MessageBody("message body"));
+ _messageUnDispatched = new Message(
+ new MessageHeader(Guid.NewGuid().ToString(), routingKey, MessageType.MT_DOCUMENT),
+ new MessageBody("message body"));
+ }
+
+ [Fact]
+ public void When_Retrieving_Not_Dispatched_Messages()
+ {
+ var context = new RequestContext();
+ _outbox.Add([_messageEarliest, _messageDispatched, _messageUnDispatched], context);
+ _outbox.MarkDispatched(_messageDispatched.Id, context);
+
+ var total = _outbox.GetNumberOfOutstandingMessages();
+
+ var allUnDispatched = _outbox.OutstandingMessages(TimeSpan.Zero, context);
+ var messagesOverAnHour = _outbox.OutstandingMessages(TimeSpan.FromHours(1), context);
+ var messagesOver4Hours = _outbox.OutstandingMessages(TimeSpan.FromHours(4), context);
+
+ //Assert
+ total.Should().Be(2);
+ allUnDispatched.Should().HaveCount(2);
+ messagesOverAnHour.Should().ContainSingle();
+ messagesOver4Hours.Should().BeEmpty();
+ }
+
+ public void Dispose()
+ {
+ Configuration.Cleanup(_collection);
+ }
+}
diff --git a/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_retrieving_outstanding_messages_async.cs b/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_retrieving_outstanding_messages_async.cs
new file mode 100644
index 0000000000..c59bd7dc9b
--- /dev/null
+++ b/tests/Paramore.Brighter.MongoDb.Tests/Outbox/When_retrieving_outstanding_messages_async.cs
@@ -0,0 +1,62 @@
+using System;
+using System.Threading.Tasks;
+using FluentAssertions;
+using Paramore.Brighter.Outbox.MongoDb;
+using Xunit;
+
+namespace Paramore.Brighter.MongoDb.Tests.Outbox;
+
+[Trait("Category", "MongoDb")]
+public class MongoDbFetchOutStandingMessageAsyncTests : IDisposable
+{
+ private readonly string _collection;
+ private readonly Message _messageEarliest;
+ private readonly Message _messageDispatched;
+ private readonly Message _messageUnDispatched;
+ private readonly MongoDbOutbox _outbox;
+
+ public MongoDbFetchOutStandingMessageAsyncTests()
+ {
+ _collection = $"outbox-{Guid.NewGuid():N}";
+ _outbox = new MongoDbOutbox(Configuration.Create(_collection));
+ var routingKey = new RoutingKey("test_topic");
+
+ _messageEarliest = new Message(
+ new MessageHeader(Guid.NewGuid().ToString(), routingKey, MessageType.MT_DOCUMENT)
+ {
+ TimeStamp = DateTimeOffset.UtcNow.AddHours(-3)
+ },
+ new MessageBody("message body"));
+ _messageDispatched = new Message(
+ new MessageHeader(Guid.NewGuid().ToString(), routingKey, MessageType.MT_DOCUMENT),
+ new MessageBody("message body"));
+ _messageUnDispatched = new Message(
+ new MessageHeader(Guid.NewGuid().ToString(), routingKey, MessageType.MT_DOCUMENT),
+ new MessageBody("message body"));
+ }
+
+ [Fact]
+ public async Task When_Retrieving_Not_Dispatched_Messages_Async()
+ {
+ var context = new RequestContext();
+ await _outbox.AddAsync([_messageEarliest, _messageDispatched, _messageUnDispatched], context);
+ await _outbox.MarkDispatchedAsync(_messageDispatched.Id, context);
+
+ var total = await _outbox.GetNumberOfOutstandingMessagesAsync();
+
+ var allUnDispatched = await _outbox.OutstandingMessagesAsync(TimeSpan.Zero, context);
+ var messagesOverAnHour = await _outbox.OutstandingMessagesAsync(TimeSpan.FromHours(1), context);
+ var messagesOver4Hours = await _outbox.OutstandingMessagesAsync(TimeSpan.FromHours(4), context);
+
+ //Assert
+ total.Should().Be(2);
+ allUnDispatched.Should().HaveCount(2);
+ messagesOverAnHour.Should().ContainSingle();
+ messagesOver4Hours.Should().BeEmpty();
+ }
+
+ public void Dispose()
+ {
+ Configuration.Cleanup(_collection);
+ }
+}
diff --git a/tests/Paramore.Brighter.MongoDb.Tests/Paramore.Brighter.MongoDb.Tests.csproj b/tests/Paramore.Brighter.MongoDb.Tests/Paramore.Brighter.MongoDb.Tests.csproj
new file mode 100644
index 0000000000..f39b4a270d
--- /dev/null
+++ b/tests/Paramore.Brighter.MongoDb.Tests/Paramore.Brighter.MongoDb.Tests.csproj
@@ -0,0 +1,30 @@
+
+
+
+ net9.0
+ false
+
+
+
+
+ all
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+
+
+
+
+
+
+ all
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+
+
+
+
+
+
+
+
+
+
+
diff --git a/tests/Paramore.Brighter.MongoDb.Tests/TestDoubles/MyCommand.cs b/tests/Paramore.Brighter.MongoDb.Tests/TestDoubles/MyCommand.cs
new file mode 100644
index 0000000000..db7f31672a
--- /dev/null
+++ b/tests/Paramore.Brighter.MongoDb.Tests/TestDoubles/MyCommand.cs
@@ -0,0 +1,39 @@
+#region Licence
+/* The MIT License (MIT)
+Copyright © 2014 Ian Cooper
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the “Software”), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE. */
+
+#endregion
+
+using System;
+
+namespace Paramore.Brighter.MongoDb.Tests.TestDoubles;
+
+internal class MyCommand : Command
+{
+ public MyCommand()
+ :base(Guid.NewGuid())
+
+ {}
+
+ public string Value { get; set; }
+ public bool WasCancelled { get; set; }
+ public bool TaskCompleted { get; set; }
+}