Improve support to AWS SQS/SNS#3437
Conversation
|
Hi @lillo42 - thanks for this. To lift out of inline above, the usual strategy would be: Folders for each queue type:
Put the existing tests in Standard and duplicate the tests in Fifo, but with the changed semantics. I tend to prefer being explicit in tests, over worrying about duplication there. |
|
PS @lillo42 if you have time. One weakness in Brighter is that you have to produce to SNS and can't target a queue. It would be good to allow the queue to be the target. We now do something similar for ASB. Probably pop that in a separate PR though? |
iancooper
left a comment
There was a problem hiding this comment.
This looks great. You may also need to support async approaches, you'll note the increased footprint when you merge in master. It is a little crude, as we just tend to duplicate in tests, but I sort of prefer tests to be explicit
Sure no problem, I can add it to this PR. |
|
Thanks @lillo42. Apologies in advance for the merge issues caused by the Reactor(Blocking)/Proactor(Non-Blocking) enhancement |
… localstack and rename SQS to SNS
|
We still have a credentials issue on the AWS tests, that means they don't show up here. I need to fix that too. Let me see if I can fix that and merge to main, so that we can at least see what is passing |
| RoutingKey topic, | ||
| TopicFindBy topicFindBy, | ||
| SnsAttributes? attributes, | ||
| OnMissingChannel makeTopic = OnMissingChannel.Create, |
There was a problem hiding this comment.
Can we make makeTopic the last parameter? That keeps it in sync with other producers
| || contentType == CompressPayloadTransformerAsync.BROTLI) | ||
| return new MessageBody(sqsMessage.Body, contentType, CharacterEncoding.Base64); | ||
|
|
||
| public Message CreateMessage(Amazon.SQS.Model.Message sqsMessage) |
There was a problem hiding this comment.
❌ Getting worse: Complex Method
CreateMessage increases in cyclomatic complexity from 10 to 13, threshold = 9
| RoutingKey topic, | ||
| TopicFindBy topicFindBy, | ||
| SnsAttributes? attributes, | ||
| OnMissingChannel makeTopic = OnMissingChannel.Create, |
There was a problem hiding this comment.
❌ New issue: Code Duplication
The module contains 2 functions with similar structure: EnsureQueueAsync,EnsureTopicAsync
| private async Task<string> CreateQueueAsync( | ||
| string queueName, | ||
| SqsAttributes? sqsAttributes, | ||
| OnMissingChannel makeChannel, | ||
| CancellationToken cancellationToken) |
There was a problem hiding this comment.
❌ New issue: Complex Method
CreateQueueAsync has a cyclomatic complexity of 17, threshold = 9
| private async Task<string> CreateDeadLetterQueueAsync( | ||
| SqsAttributes sqsAttributes, | ||
| CancellationToken cancellationToken) | ||
| { | ||
| using var sqsClient = _awsClientFactory.CreateSqsClient(); | ||
|
|
||
| var queueName = sqsAttributes.RedrivePolicy!.DeadlLetterQueueName; | ||
|
|
||
| var tags = new Dictionary<string, string> { { "Source", "Brighter" } }; | ||
| var attributes = new Dictionary<string, string?>(); | ||
| if (sqsAttributes.Type == SnsSqsType.Fifo) | ||
| { | ||
| queueName = queueName.ToValidSQSQueueName(true); | ||
|
|
||
| attributes.Add(QueueAttributeName.FifoQueue, "true"); | ||
| if (sqsAttributes.ContentBasedDeduplication) | ||
| { | ||
| attributes.Add(QueueAttributeName.ContentBasedDeduplication, "true"); | ||
| } | ||
|
|
||
| if (sqsAttributes is { DeduplicationScope: not null, FifoThroughputLimit: not null }) | ||
| { | ||
| attributes.Add(QueueAttributeName.FifoThroughputLimit, | ||
| sqsAttributes.FifoThroughputLimit.Value.ToString()); | ||
| attributes.Add(QueueAttributeName.DeduplicationScope, sqsAttributes.DeduplicationScope switch | ||
| { | ||
| DeduplicationScope.MessageGroup => "messageGroup", | ||
| _ => "queue" | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| string queueUrl; | ||
|
|
||
| try | ||
| { | ||
| var request = new CreateQueueRequest(queueName) { Attributes = attributes, Tags = tags }; | ||
| // create queue is idempotent, so safe to call even if queue already exists | ||
| var response = await sqsClient.CreateQueueAsync(request, cancellationToken); | ||
|
|
||
| queueUrl = response.QueueUrl ?? throw new InvalidOperationException( | ||
| $"Could not find create DLQ, status: {response.HttpStatusCode}"); | ||
| } | ||
| catch (QueueNameExistsException) | ||
| { | ||
| var response = await sqsClient.GetQueueUrlAsync(queueName, cancellationToken); | ||
| queueUrl = response.QueueUrl; | ||
| } | ||
|
|
||
| var attributesResponse = await sqsClient.GetQueueAttributesAsync( | ||
| new GetQueueAttributesRequest { QueueUrl = queueUrl, AttributeNames = [QueueAttributeName.QueueArn] }, | ||
| cancellationToken); | ||
|
|
||
| if (attributesResponse.HttpStatusCode != HttpStatusCode.OK) | ||
| { | ||
| throw new InvalidOperationException( | ||
| $"Could not find ARN of DLQ, status: {attributesResponse.HttpStatusCode}"); | ||
| } | ||
|
|
||
| return attributesResponse.QueueARN; | ||
| } |
There was a problem hiding this comment.
❌ New issue: Complex Method
CreateDeadLetterQueueAsync has a cyclomatic complexity of 10, threshold = 9
| private async Task<string> CreateQueueAsync( | ||
| string queueName, | ||
| SqsAttributes? sqsAttributes, | ||
| OnMissingChannel makeChannel, | ||
| CancellationToken cancellationToken) |
There was a problem hiding this comment.
❌ New issue: Bumpy Road Ahead
CreateQueueAsync has 3 blocks with nested conditional logic. Any nesting of 2 or deeper is considered. Threshold is one single, nested block per function
| private async Task SubscribeToTopicAsync( | ||
| string topicArn, | ||
| string queueUrl, | ||
| SqsAttributes? sqsAttributes, | ||
| AmazonSQSClient sqsClient, | ||
| AmazonSimpleNotificationServiceClient snsClient) | ||
| { | ||
| var arn = await snsClient.SubscribeQueueAsync(topicArn, sqsClient, queueUrl); | ||
| if (string.IsNullOrEmpty(arn)) | ||
| { | ||
| throw new InvalidOperationException( | ||
| $"Could not subscribe to topic: {topicArn} from queue: {queueUrl} in region {AwsConnection.Region}"); | ||
| } | ||
|
|
||
| var response = await snsClient.SetSubscriptionAttributesAsync( | ||
| new SetSubscriptionAttributesRequest(arn, | ||
| "RawMessageDelivery", | ||
| sqsAttributes?.RawMessageDelivery.ToString()) | ||
| ); | ||
|
|
||
| if (response.HttpStatusCode != HttpStatusCode.OK) | ||
| { | ||
| throw new InvalidOperationException("Unable to set subscription attribute for raw message delivery"); | ||
| } | ||
| } |
There was a problem hiding this comment.
ℹ New issue: Excess Number of Function Arguments
SubscribeToTopicAsync has 5 arguments, threshold = 4
iancooper
left a comment
There was a problem hiding this comment.
There is a lot here, and it is a great addition. A couple of minor tweaks, which are more o do with style over anything else.
|
|
||
| var arnElements = s!.Split(':'); | ||
| var topic = arnElements[(int)ARNAmazonSNS.TopicName]; | ||
| var topic = topicArn.GetValueInString() ?? string.Empty; |
There was a problem hiding this comment.
The earlier version reduces nesting by returning early, which is generally preferable
| } | ||
| } | ||
|
|
||
| var messageAttributes = new Dictionary<string, MessageAttributeValue> |
There was a problem hiding this comment.
Typically we factor this into a publisher, to keep the producer clean of all the header manipulation
| public SqsSubscription( | ||
| SubscriptionName? name = null, | ||
| ChannelName? channelName = null, | ||
| RoutingKey? routingKey = null, | ||
| int bufferSize = 1, | ||
| int noOfPerformers = 1, | ||
| TimeSpan? timeOut = null, | ||
| int requeueCount = -1, | ||
| TimeSpan? requeueDelay = null, | ||
| int unacceptableMessageLimit = 0, | ||
| MessagePumpType messagePumpType = MessagePumpType.Proactor, | ||
| IAmAChannelFactory? channelFactory = null, | ||
| int lockTimeout = 10, | ||
| int delaySeconds = 0, | ||
| int messageRetentionPeriod = 345600, | ||
| TopicFindBy findTopicBy = TopicFindBy.Name, | ||
| string? iAmPolicy = null, | ||
| RedrivePolicy? redrivePolicy = null, | ||
| SnsAttributes? snsAttributes = null, | ||
| Dictionary<string, string>? tags = null, | ||
| OnMissingChannel makeChannels = OnMissingChannel.Create, | ||
| bool rawMessageDelivery = true, | ||
| TimeSpan? emptyChannelDelay = null, | ||
| TimeSpan? channelFailureDelay = null, | ||
| SnsSqsType sqsType = SnsSqsType.Standard, | ||
| bool contentBasedDeduplication = true, | ||
| DeduplicationScope? deduplicationScope = null, | ||
| int? fifoThroughputLimit = null, | ||
| ChannelType channelType = ChannelType.PubSub, | ||
| QueueFindBy findQueueBy = QueueFindBy.Name | ||
| ) | ||
| : base(typeof(T), name, channelName, routingKey, bufferSize, noOfPerformers, timeOut, requeueCount, | ||
| requeueDelay, | ||
| unacceptableMessageLimit, messagePumpType, channelFactory, lockTimeout, delaySeconds, | ||
| messageRetentionPeriod, findTopicBy, | ||
| iAmPolicy, redrivePolicy, snsAttributes, tags, makeChannels, rawMessageDelivery, emptyChannelDelay, | ||
| channelFailureDelay, sqsType, contentBasedDeduplication, deduplicationScope, fifoThroughputLimit, | ||
| channelType, findQueueBy) |
There was a problem hiding this comment.
ℹ Getting worse: Constructor Over-Injection
SqsSubscription increases from 23 to 29 arguments, threshold = 5
| public SqsSubscription( | ||
| Type dataType, | ||
| SubscriptionName? name = null, | ||
| ChannelName? channelName = null, | ||
| RoutingKey? routingKey = null, | ||
| int bufferSize = 1, | ||
| int noOfPerformers = 1, | ||
| TimeSpan? timeOut = null, | ||
| int requeueCount = -1, | ||
| TimeSpan? requeueDelay = null, | ||
| int unacceptableMessageLimit = 0, | ||
| MessagePumpType messagePumpType = MessagePumpType.Unknown, | ||
| IAmAChannelFactory? channelFactory = null, | ||
| int lockTimeout = 10, | ||
| int delaySeconds = 0, | ||
| int messageRetentionPeriod = 345600, | ||
| TopicFindBy findTopicBy = TopicFindBy.Name, | ||
| string? iAmPolicy = null, | ||
| RedrivePolicy? redrivePolicy = null, | ||
| SnsAttributes? snsAttributes = null, | ||
| Dictionary<string, string>? tags = null, | ||
| OnMissingChannel makeChannels = OnMissingChannel.Create, | ||
| bool rawMessageDelivery = true, | ||
| TimeSpan? emptyChannelDelay = null, | ||
| TimeSpan? channelFailureDelay = null, | ||
| SnsSqsType sqsType = SnsSqsType.Standard, | ||
| bool contentBasedDeduplication = true, | ||
| DeduplicationScope? deduplicationScope = null, | ||
| int? fifoThroughputLimit = null, | ||
| ChannelType channelType = ChannelType.PubSub, | ||
| QueueFindBy findQueueBy = QueueFindBy.Name | ||
| ) | ||
| : base(dataType, name, channelName, routingKey, bufferSize, noOfPerformers, timeOut, requeueCount, | ||
| requeueDelay, unacceptableMessageLimit, messagePumpType, channelFactory, makeChannels, emptyChannelDelay, | ||
| channelFailureDelay) |
There was a problem hiding this comment.
ℹ Getting worse: Constructor Over-Injection
SqsSubscription increases from 24 to 30 arguments, threshold = 5
| public async Task<string?> SendAsync(Message message, TimeSpan? delay, CancellationToken cancellationToken) | ||
| { | ||
| var request = new SendMessageRequest | ||
| { | ||
| QueueUrl = _queueUrl, | ||
| MessageBody = message.Body.Value | ||
| }; | ||
|
|
||
| delay ??= TimeSpan.Zero; | ||
| if (delay > TimeSpan.Zero) | ||
| { | ||
| // SQS has a hard limit of 15min for Delay in Seconds | ||
| if (delay.Value > s_maxDelay) | ||
| { | ||
| delay = s_maxDelay; | ||
| s_logger.LogWarning("Set delay from {CurrentDelay} to 15min (SQS support up to 15min)", delay); | ||
| } | ||
|
|
||
| request.DelaySeconds = (int)delay.Value.TotalSeconds; | ||
| } | ||
|
|
||
| if (_queueType == SnsSqsType.Fifo) | ||
| { | ||
| request.MessageGroupId = message.Header.PartitionKey; | ||
| if (message.Header.Bag.TryGetValue(HeaderNames.DeduplicationId, out var deduplicationId)) | ||
| { | ||
| request.MessageDeduplicationId = (string)deduplicationId; | ||
| } | ||
| } | ||
|
|
||
| var messageAttributes = new Dictionary<string, MessageAttributeValue> | ||
| { | ||
| [HeaderNames.Id] = | ||
| new() { StringValue = message.Header.MessageId, DataType = "String" }, | ||
| [HeaderNames.Topic] = new() { StringValue = _queueUrl, DataType = "String" }, | ||
| [HeaderNames.ContentType] = new() { StringValue = message.Header.ContentType, DataType = "String" }, | ||
| [HeaderNames.CorrelationId] = | ||
| new() { StringValue = message.Header.CorrelationId, DataType = "String" }, | ||
| [HeaderNames.HandledCount] = | ||
| new() { StringValue = Convert.ToString(message.Header.HandledCount), DataType = "String" }, | ||
| [HeaderNames.MessageType] = | ||
| new() { StringValue = message.Header.MessageType.ToString(), DataType = "String" }, | ||
| [HeaderNames.Timestamp] = new() | ||
| { | ||
| StringValue = Convert.ToString(message.Header.TimeStamp), DataType = "String" | ||
| } | ||
| }; | ||
|
|
||
| if (!string.IsNullOrEmpty(message.Header.ReplyTo)) | ||
| { | ||
| messageAttributes.Add(HeaderNames.ReplyTo, | ||
| new MessageAttributeValue { StringValue = message.Header.ReplyTo, DataType = "String" }); | ||
| } | ||
|
|
||
| if (!string.IsNullOrEmpty(message.Header.Subject)) | ||
| { | ||
| messageAttributes.Add(HeaderNames.Subject, | ||
| new MessageAttributeValue { StringValue = message.Header.Subject, DataType = "String" }); | ||
| } | ||
|
|
||
| // we can set up to 10 attributes; we have set 6 above, so use a single JSON object as the bag | ||
| var bagJson = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options); | ||
| messageAttributes[HeaderNames.Bag] = new() { StringValue = bagJson, DataType = "String" }; | ||
| request.MessageAttributes = messageAttributes; | ||
|
|
||
| var response = await _client.SendMessageAsync(request, cancellationToken); | ||
| if (response.HttpStatusCode is HttpStatusCode.OK or HttpStatusCode.Created | ||
| or HttpStatusCode.Accepted) | ||
| { | ||
| return response.MessageId; | ||
| } | ||
|
|
||
| return null; | ||
| } |
There was a problem hiding this comment.
❌ New issue: Complex Method
SendAsync has a cyclomatic complexity of 9, threshold = 9
| public async Task<string?> SendAsync(Message message, TimeSpan? delay, CancellationToken cancellationToken) | ||
| { | ||
| var request = new SendMessageRequest | ||
| { | ||
| QueueUrl = _queueUrl, | ||
| MessageBody = message.Body.Value | ||
| }; | ||
|
|
||
| delay ??= TimeSpan.Zero; | ||
| if (delay > TimeSpan.Zero) | ||
| { | ||
| // SQS has a hard limit of 15min for Delay in Seconds | ||
| if (delay.Value > s_maxDelay) | ||
| { | ||
| delay = s_maxDelay; | ||
| s_logger.LogWarning("Set delay from {CurrentDelay} to 15min (SQS support up to 15min)", delay); | ||
| } | ||
|
|
||
| request.DelaySeconds = (int)delay.Value.TotalSeconds; | ||
| } | ||
|
|
||
| if (_queueType == SnsSqsType.Fifo) | ||
| { | ||
| request.MessageGroupId = message.Header.PartitionKey; | ||
| if (message.Header.Bag.TryGetValue(HeaderNames.DeduplicationId, out var deduplicationId)) | ||
| { | ||
| request.MessageDeduplicationId = (string)deduplicationId; | ||
| } | ||
| } | ||
|
|
||
| var messageAttributes = new Dictionary<string, MessageAttributeValue> | ||
| { | ||
| [HeaderNames.Id] = | ||
| new() { StringValue = message.Header.MessageId, DataType = "String" }, | ||
| [HeaderNames.Topic] = new() { StringValue = _queueUrl, DataType = "String" }, | ||
| [HeaderNames.ContentType] = new() { StringValue = message.Header.ContentType, DataType = "String" }, | ||
| [HeaderNames.CorrelationId] = | ||
| new() { StringValue = message.Header.CorrelationId, DataType = "String" }, | ||
| [HeaderNames.HandledCount] = | ||
| new() { StringValue = Convert.ToString(message.Header.HandledCount), DataType = "String" }, | ||
| [HeaderNames.MessageType] = | ||
| new() { StringValue = message.Header.MessageType.ToString(), DataType = "String" }, | ||
| [HeaderNames.Timestamp] = new() | ||
| { | ||
| StringValue = Convert.ToString(message.Header.TimeStamp), DataType = "String" | ||
| } | ||
| }; | ||
|
|
||
| if (!string.IsNullOrEmpty(message.Header.ReplyTo)) | ||
| { | ||
| messageAttributes.Add(HeaderNames.ReplyTo, | ||
| new MessageAttributeValue { StringValue = message.Header.ReplyTo, DataType = "String" }); | ||
| } | ||
|
|
||
| if (!string.IsNullOrEmpty(message.Header.Subject)) | ||
| { | ||
| messageAttributes.Add(HeaderNames.Subject, | ||
| new MessageAttributeValue { StringValue = message.Header.Subject, DataType = "String" }); | ||
| } | ||
|
|
||
| // we can set up to 10 attributes; we have set 6 above, so use a single JSON object as the bag | ||
| var bagJson = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options); | ||
| messageAttributes[HeaderNames.Bag] = new() { StringValue = bagJson, DataType = "String" }; | ||
| request.MessageAttributes = messageAttributes; | ||
|
|
||
| var response = await _client.SendMessageAsync(request, cancellationToken); | ||
| if (response.HttpStatusCode is HttpStatusCode.OK or HttpStatusCode.Created | ||
| or HttpStatusCode.Accepted) | ||
| { | ||
| return response.MessageId; | ||
| } | ||
|
|
||
| return null; | ||
| } |
There was a problem hiding this comment.
❌ New issue: Bumpy Road Ahead
SendAsync has 2 blocks with nested conditional logic. Any nesting of 2 or deeper is considered. Threshold is one single, nested block per function
|
Thanks @lillo42 |
* BrighterCommandGH-1294 Add support to FIFO * BrighterCommandGH-1294 fixes build * BrighterCommandGH-1294 Add Fifo test * BrighterCommandGH-1294 fixes unit test * BrighterCommandGH-1294 Improve support to LocalStack * BrighterCommandGH-1294 Use AwsFactory * BrighterCommandGH-1294 Fixes the majority of test, improve support to localstack and rename SQS to SNS * BrighterCommandGH-1294 Fixes http test * Add Fifo test * BrighterCommandGH-1294 fixes DLQ, Get System Attribute & Unit tests * BrighterCommandGH-1294 Improve Valid SQS/SNS name logic * BrighterCommandGH-1294 Improve Valid SQS/SNS name logic * Add support to SQS Publication and add FIFO tests * Fixes SQS unit tests * BrighterCommandGH-1294 Add Fifo sample * Improvements * Rename ChannelType * Set max delay
Hohohoho 🎅
How should I handle the tests? My current approach is to duplicate them. Should I continue on this path?Duplicate the test