Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ public static async Task Main(string[] args)
new RoutingKey(typeof(FarewellEvent).FullName.ToValidSNSTopicName(true)),
bufferSize: 10,
timeOut: TimeSpan.FromMilliseconds(20),
lockTimeout: 30)
lockTimeout: 30,
sqsType: SnsSqsType.Fifo,
snsAttributes: new SnsAttributes
{
Type = SnsSqsType.Fifo
})
};

//create the gateway
Expand Down
12 changes: 11 additions & 1 deletion samples/TaskQueue/AWSTaskQueue/GreetingsSender/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ THE SOFTWARE. */
#endregion

using System;
using System.Threading.Tasks;
using System.Transactions;
using Amazon;
using Amazon.Runtime.CredentialManagement;
Expand All @@ -40,7 +41,7 @@ namespace GreetingsSender
{
class Program
{
static void Main(string[] args)
static async Task Main(string[] args)
{
Log.Logger = new LoggerConfiguration()
.MinimumLevel.Debug()
Expand Down Expand Up @@ -71,6 +72,15 @@ static void Main(string[] args)
{
Topic = new RoutingKey(typeof(GreetingEvent).FullName.ToValidSNSTopicName()),
RequestType = typeof(GreetingEvent)
},
new()
{
Topic = new RoutingKey(typeof(FarewellEvent).FullName.ToValidSNSTopicName(true)),
RequestType = typeof(FarewellEvent),
SnsAttributes = new SnsAttributes
{
Type = SnsSqsType.Fifo
}
}
}
).Create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ public SnsMessagePublisher(string topicArn, AmazonSimpleNotificationServiceClien
new() { StringValue = Convert.ToString(message.Header.MessageId), DataType = "String" },
[HeaderNames.Topic] = new() { StringValue = _topicArn, DataType = "String" },
[HeaderNames.ContentType] = new() { StringValue = message.Header.ContentType, DataType = "String" },
[HeaderNames.CorrelationId] =
new() { StringValue = Convert.ToString(message.Header.CorrelationId), DataType = "String" },
[HeaderNames.HandledCount] =
new() { StringValue = Convert.ToString(message.Header.HandledCount), DataType = "String" },
[HeaderNames.MessageType] =
Expand All @@ -67,6 +65,16 @@ public SnsMessagePublisher(string topicArn, AmazonSimpleNotificationServiceClien
StringValue = Convert.ToString(message.Header.TimeStamp), DataType = "String"
}
};

if (!string.IsNullOrEmpty(message.Header.CorrelationId))
{
messageAttributes[HeaderNames.CorrelationId] = new MessageAttributeValue
{
StringValue = Convert.ToString(message.Header.CorrelationId),
DataType = "String"
};
}


if (_snsSqsType == SnsSqsType.Fifo)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ public SqsMessageSender(string queueUrl, SnsSqsType queueType, AmazonSQSClient c
new() { StringValue = message.Header.MessageId, DataType = "String" },
[HeaderNames.Topic] = new() { StringValue = _queueUrl, DataType = "String" },
[HeaderNames.ContentType] = new() { StringValue = message.Header.ContentType, DataType = "String" },
[HeaderNames.CorrelationId] =
new() { StringValue = message.Header.CorrelationId, DataType = "String" },
[HeaderNames.HandledCount] =
new() { StringValue = Convert.ToString(message.Header.HandledCount), DataType = "String" },
[HeaderNames.MessageType] =
Expand All @@ -102,6 +100,12 @@ public SqsMessageSender(string queueUrl, SnsSqsType queueType, AmazonSQSClient c
messageAttributes.Add(HeaderNames.Subject,
new MessageAttributeValue { StringValue = message.Header.Subject, DataType = "String" });
}

if (!string.IsNullOrEmpty(message.Header.CorrelationId))
{
messageAttributes.Add(HeaderNames.CorrelationId,
new MessageAttributeValue { StringValue = message.Header.CorrelationId, DataType = "String" });
}
Comment on lines +103 to +108

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ Getting worse: Complex Method
SendAsync increases in cyclomatic complexity from 9 to 10, threshold = 9


// we can set up to 10 attributes; we have set 6 above, so use a single JSON object as the bag
var bagJson = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);
Expand Down
43 changes: 27 additions & 16 deletions src/Paramore.Brighter/MessageMapperRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ THE SOFTWARE. */
#endregion

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using Paramore.Brighter.MessageMappers;

Expand All @@ -39,8 +40,8 @@ public class MessageMapperRegistry : IAmAMessageMapperRegistry, IAmAMessageMappe
{
private readonly IAmAMessageMapperFactory? _messageMapperFactory;
private readonly IAmAMessageMapperFactoryAsync? _messageMapperFactoryAsync;
private readonly Dictionary<Type, Type> _messageMappers = new Dictionary<Type, Type>();
private readonly Dictionary<Type, Type> _asyncMessageMappers = new Dictionary<Type, Type>();
private readonly ConcurrentDictionary<Type, Type> _messageMappers = new();
private readonly ConcurrentDictionary<Type, Type> _asyncMessageMappers = new();
private readonly Type? _defaultMessageMapper;
private readonly Type? _defaultMessageMapperAsync;

Expand Down Expand Up @@ -74,11 +75,16 @@ public MessageMapperRegistry(IAmAMessageMapperFactory? messageMapperFactory,
if (_messageMapperFactory is null)
return null;

var messageMapperType = _messageMappers.ContainsKey(typeof(TRequest))
? _messageMappers[typeof(TRequest)]
: _defaultMessageMapper;
if (!_messageMappers.TryGetValue(typeof(TRequest), out var messageMapperType) && _defaultMessageMapper != null)
{
messageMapperType = _defaultMessageMapper.MakeGenericType(typeof(TRequest));
_messageMappers.TryAdd(typeof(TRequest), messageMapperType);
}

if (messageMapperType is null) return null;
if (messageMapperType is null)
{
return null;
}

return (IAmAMessageMapper<TRequest>)_messageMapperFactory.Create(messageMapperType);
}
Expand All @@ -92,12 +98,17 @@ public MessageMapperRegistry(IAmAMessageMapperFactory? messageMapperFactory,
{
if (_messageMapperFactoryAsync is null)
return null;

var messageMapperType = _asyncMessageMappers.ContainsKey(typeof(TRequest))
? _asyncMessageMappers[typeof(TRequest)]
: _defaultMessageMapperAsync;

if (messageMapperType is null) return null;

if (!_asyncMessageMappers.TryGetValue(typeof(TRequest), out var messageMapperType) && _defaultMessageMapperAsync != null)
{
messageMapperType = _defaultMessageMapperAsync.MakeGenericType(typeof(TRequest));
_asyncMessageMappers.TryAdd(typeof(TRequest), messageMapperType);
}

if (messageMapperType is null)
{
return null;
}

return (IAmAMessageMapperAsync<TRequest>)_messageMapperFactoryAsync.Create(messageMapperType);
}
Expand All @@ -113,7 +124,7 @@ public void Register<TRequest, TMessageMapper>() where TRequest : class, IReques
if (_messageMappers.ContainsKey(typeof(TRequest)))
throw new ArgumentException(string.Format("Message type {0} already has a mapper; only one mapper can be registered per type", typeof(TRequest).Name));

_messageMappers.Add(typeof(TRequest), typeof(TMessageMapper));
_messageMappers.TryAdd(typeof(TRequest), typeof(TMessageMapper));
}

/// <summary>
Expand All @@ -127,7 +138,7 @@ public void Register(Type request, Type mapper)
if (_messageMappers.ContainsKey(request))
throw new ArgumentException(string.Format("Message type {0} already has a mapper; only one mapper can be registered per type", request.Name));

_messageMappers.Add(request, mapper);
_messageMappers.TryAdd(request, mapper);
}

/// <summary>
Expand All @@ -141,7 +152,7 @@ public void RegisterAsync<TRequest, TMessageMapper>() where TRequest : class, IR
if (_asyncMessageMappers.ContainsKey(typeof(TRequest)))
throw new ArgumentException(string.Format("Message type {0} already has a mapper; only one mapper can be registered per type", typeof(TRequest).Name));

_asyncMessageMappers.Add(typeof(TRequest), typeof(TMessageMapper));
_asyncMessageMappers.TryAdd(typeof(TRequest), typeof(TMessageMapper));

}

Expand All @@ -156,7 +167,7 @@ public void RegisterAsync(Type request, Type mapper)
if (_asyncMessageMappers.ContainsKey(request))
throw new ArgumentException(string.Format("Message type {0} already has a mapper; only one mapper can be registered per type", request.Name));

_asyncMessageMappers.Add(request, mapper);
_asyncMessageMappers.TryAdd(request, mapper);
}
}
}
4 changes: 2 additions & 2 deletions src/Paramore.Brighter/MessageMappers/JsonMessageMapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

namespace Paramore.Brighter.MessageMappers;

public class JsonMessageMapper<TRequest>(RequestContext? context) : IAmAMessageMapper<TRequest>, IAmAMessageMapperAsync<TRequest> where TRequest : class, IRequest
public class JsonMessageMapper<TRequest> : IAmAMessageMapper<TRequest>, IAmAMessageMapperAsync<TRequest> where TRequest : class, IRequest
{
public IRequestContext? Context { get; set; } = context;
public IRequestContext? Context { get; set; }

public Task<Message> MapToMessageAsync(TRequest request, Publication publication,
CancellationToken cancellationToken = default)
Expand Down