[Feature] Add support to Apache RocketMQ#3650
Conversation
# Conflicts: # Brighter.sln # Directory.Packages.props
# Conflicts: # Brighter.sln # Directory.Packages.props
There was a problem hiding this comment.
Gates Failed
Enforce advisory code health rules
(2 files with Complex Method, Constructor Over-Injection)
Gates Passed
3 Quality Gates Passed
See analysis details in CodeScene
Reason for failure
| Enforce advisory code health rules | Violations | Code Health Impact | |
|---|---|---|---|
| RocketMqMessageProducer.cs | 1 advisory rule | 10.00 → 9.45 | Suppress |
| RocketMqSubscription.cs | 1 advisory rule | 10.00 → 9.69 | Suppress |
Quality Gate Profile: Clean Code Collective
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.
| public RocketMqSubscription(SubscriptionName? subscriptionName = null, ChannelName? channelName = null, | ||
| RoutingKey? routingKey = null, string? consumerGroup = null, int bufferSize = 1, int noOfPerformers = 1, | ||
| TimeSpan? timeOut = null, int requeueCount = -1, TimeSpan? requeueDelay = null, | ||
| int unacceptableMessageLimit = 0, MessagePumpType messagePumpType = MessagePumpType.Unknown, | ||
| IAmAChannelFactory? channelFactory = null, OnMissingChannel makeChannels = OnMissingChannel.Create, | ||
| FilterExpression? filter = null, TimeSpan? emptyChannelDelay = null, | ||
| TimeSpan? channelFailureDelay = null, TimeSpan? receiveMessageTimeout = null, | ||
| TimeSpan? invisibilityTimeout = null) : base(typeof(T), subscriptionName, channelName, routingKey, consumerGroup, bufferSize, | ||
| noOfPerformers, timeOut, requeueCount, requeueDelay, unacceptableMessageLimit, messagePumpType, channelFactory, | ||
| makeChannels, filter, emptyChannelDelay, channelFailureDelay, receiveMessageTimeout, | ||
| invisibilityTimeout) | ||
| { | ||
| } |
There was a problem hiding this comment.
❌ New issue: Constructor Over-Injection
RocketMqSubscription has 18 arguments, threshold = 5
There was a problem hiding this comment.
Gates Failed
Enforce advisory code health rules
(2 files with Complex Method, Constructor Over-Injection)
Gates Passed
3 Quality Gates Passed
See analysis details in CodeScene
Reason for failure
| Enforce advisory code health rules | Violations | Code Health Impact | |
|---|---|---|---|
| RocketMqMessageProducer.cs | 1 advisory rule | 10.00 → 9.45 | Suppress |
| RocketMqSubscription.cs | 1 advisory rule | 10.00 → 9.69 | Suppress |
Quality Gate Profile: Clean Code Collective
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.
| public RocketSubscription(Type dataType, | ||
| SubscriptionName? subscriptionName = null, | ||
| ChannelName? channelName = null, | ||
| RoutingKey? routingKey = null, | ||
| string? consumerGroup = null, | ||
| int bufferSize = 1, | ||
| int noOfPerformers = 1, | ||
| TimeSpan? timeOut = null, | ||
| int requeueCount = -1, | ||
| TimeSpan? requeueDelay = null, | ||
| int unacceptableMessageLimit = 0, | ||
| MessagePumpType messagePumpType = MessagePumpType.Unknown, | ||
| IAmAChannelFactory? channelFactory = null, | ||
| OnMissingChannel makeChannels = OnMissingChannel.Create, | ||
| FilterExpression? filter = null, | ||
| TimeSpan? emptyChannelDelay = null, | ||
| TimeSpan? channelFailureDelay = null, | ||
| TimeSpan? receiveMessageTimeout = null, | ||
| TimeSpan? invisibilityTimeout = null) : base(dataType, subscriptionName, channelName, routingKey, bufferSize, | ||
| noOfPerformers, timeOut, requeueCount, requeueDelay, unacceptableMessageLimit, messagePumpType, channelFactory, | ||
| makeChannels, emptyChannelDelay, channelFailureDelay) | ||
| { | ||
| ConsumerGroup = consumerGroup ?? string.Empty; | ||
| ReceiveMessageTimeout = receiveMessageTimeout ?? TimeSpan.FromMinutes(1); | ||
| InvisibilityTimeout = invisibilityTimeout ?? TimeSpan.FromSeconds(30); | ||
| Filter = filter ?? new FilterExpression("*"); | ||
| } |
There was a problem hiding this comment.
❌ New issue: Constructor Over-Injection
RocketSubscription has 19 arguments, threshold = 5
There was a problem hiding this comment.
Gates Failed
Enforce advisory code health rules
(2 files with Complex Method, Constructor Over-Injection)
Gates Passed
3 Quality Gates Passed
See analysis details in CodeScene
Reason for failure
| Enforce advisory code health rules | Violations | Code Health Impact | |
|---|---|---|---|
| RocketMqMessageProducer.cs | 1 advisory rule | 10.00 → 9.47 | Suppress |
| RocketMqSubscription.cs | 1 advisory rule | 10.00 → 9.69 | Suppress |
Quality Gate Profile: Clean Code Collective
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.
There was a problem hiding this comment.
Pull Request Overview
Adds support for Apache RocketMQ by introducing new gateway implementations, extensive test coverage, and environment setup.
- Introduces RocketMQ-specific connection, subscription, producer, and consumer classes under
src/Paramore.Brighter.MessagingGateway.RocketMQ. - Provides synchronous and asynchronous end-to-end tests for RocketMQ integration in
tests/Paramore.Brighter.RocketMQ.Tests. - Updates solution, package references, and adds a Docker Compose file for local RocketMQ services.
Reviewed Changes
Copilot reviewed 39 out of 39 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/.../Utils/GatewayFactory.cs | Test utility for creating RocketMQ connections, producers, consumers |
| tests/.../TestDoubles/*.cs | Test doubles for message mappers, handlers, and commands |
| src/Paramore.Brighter.MessagingGateway.RocketMQ/RocketMqSubscription.cs | Subscription configuration classes for RocketMQ |
| src/Paramore.Brighter.MessagingGateway.RocketMQ/RocketMqMessageProducer.cs | RocketMQ message producer implementation |
| src/Paramore.Brighter.MessagingGateway.RocketMQ/RocketMessageConsumer.cs | RocketMQ message consumer implementation |
| src/Paramore.Brighter.MessagingGateway.RocketMQ/RocketMqChannelFactory.cs | Factory for creating sync/async channels |
| src/Paramore.Brighter.MessagingGateway.RocketMQ/RocketMessagingGatewayConnection.cs | RocketMQ connection configuration |
| docker-compose-rocketmq.yaml | Docker Compose setup for RocketMQ Nameserver, Broker, Proxy, Dashboard |
| Directory.Packages.props | Added RocketMQ.Client package version |
| Brighter.sln | Included the RocketMQ projects in the solution |
Comments suppressed due to low confidence (1)
src/Paramore.Brighter.MessagingGateway.RocketMQ/RocketMqSubscription.cs:10
- The class name RocketSubscription does not match the file name RocketMqSubscription.cs; consider renaming the class to RocketMqSubscription for consistency.
public class RocketSubscription : Subscription
|
|
||
| if (!string.IsNullOrEmpty(message.Header.Subject)) | ||
| { | ||
| builder.AddProperty(HeaderNames.Subject, message.Header.Subject); |
There was a problem hiding this comment.
The Subject header is added twice; remove the duplicate AddProperty call for Subject to avoid redundant entries.
# Conflicts: # Brighter.sln
There was a problem hiding this comment.
Gates Failed
Enforce advisory code health rules
(2 files with Complex Method, Constructor Over-Injection)
Gates Passed
3 Quality Gates Passed
See analysis details in CodeScene
Reason for failure
| Enforce advisory code health rules | Violations | Code Health Impact | |
|---|---|---|---|
| RocketMqMessageProducer.cs | 1 advisory rule | 10.00 → 9.50 | Suppress |
| RocketMqSubscription.cs | 1 advisory rule | 10.00 → 9.69 | Suppress |
Quality Gate Profile: Clean Code Collective
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.
| public async Task SendWithDelayAsync(Message message, TimeSpan? delay, CancellationToken cancellationToken = default) | ||
| { | ||
| var builder = new Org.Apache.Rocketmq.Message.Builder() | ||
| .SetBody(message.Body.Bytes) | ||
| .SetTopic(mqPublication.Topic!.Value); | ||
|
|
||
| builder.AddProperty(HeaderNames.MessageId, message.Id) | ||
| .AddProperty(HeaderNames.Topic, message.Header.Topic.Value) | ||
| .AddProperty(HeaderNames.HandledCount, message.Header.HandledCount.ToString()) | ||
| .AddProperty(HeaderNames.MessageType, message.Header.MessageType.ToString()) | ||
| .AddProperty(HeaderNames.TimeStamp, message.Header.TimeStamp.ToRfc3339()) | ||
| .AddProperty(HeaderNames.Source, message.Header.Source.ToString()) | ||
| .AddProperty(HeaderNames.SpecVersion, message.Header.SpecVersion) | ||
| .AddProperty(HeaderNames.Type, message.Header.Type); | ||
|
|
||
| if (!string.IsNullOrEmpty(message.Header.Subject)) | ||
| { | ||
| builder.AddProperty(HeaderNames.Subject, message.Header.Subject); | ||
| } | ||
|
|
||
| if (message.Header.DataSchema != null) | ||
| { | ||
| builder.AddProperty(HeaderNames.DataSchema, message.Header.DataSchema.ToString()); | ||
| } | ||
|
|
||
| if (!string.IsNullOrEmpty(message.Header.Type)) | ||
| { | ||
| builder.AddProperty(HeaderNames.Type, message.Header.Type); | ||
| } | ||
|
|
||
| builder.AddProperty(HeaderNames.ContentType, message.Header.ContentType.ToString()); | ||
| builder.AddProperty(HeaderNames.DataContentType, message.Header.ContentType.ToString()); | ||
|
|
||
| if (!string.IsNullOrEmpty(message.Header.CorrelationId)) | ||
| { | ||
| builder.AddProperty(HeaderNames.CorrelationId, message.Header.CorrelationId); | ||
| } | ||
|
|
||
| if (!RoutingKey.IsNullOrEmpty(message.Header.ReplyTo)) | ||
| { | ||
| builder.AddProperty(HeaderNames.ReplyTo, message.Header.ReplyTo); | ||
| } | ||
|
|
||
| if (!string.IsNullOrEmpty(message.Header.DataRef)) | ||
| { | ||
| builder.AddProperty(HeaderNames.DataRef, message.Header.DataRef); | ||
| } | ||
|
|
||
| if (delay.HasValue && delay.Value != TimeSpan.Zero) | ||
| { | ||
| builder | ||
| .SetDeliveryTimestamp(connection.TimerProvider.GetUtcNow().Add(delay.Value).UtcDateTime); | ||
| } | ||
|
|
||
| if (!PartitionKey.IsNullOrEmpty(message.Header.PartitionKey)) | ||
| { | ||
| builder.SetMessageGroup(message.Header.PartitionKey); | ||
| } | ||
|
|
||
| foreach (var (key, val) in message.Header.Bag | ||
| .Where(x => x.Key != HeaderNames.Keys && x.Key != HeaderNames.Tag)) | ||
| { | ||
| builder.AddProperty(key, val.ToString()); | ||
| } | ||
|
|
||
| if (message.Header.Bag.TryGetValue(HeaderNames.Keys, out var keys)) | ||
| { | ||
| if (keys is string[] keysArray) | ||
| { | ||
| builder.SetKeys(keysArray); | ||
| } | ||
| else if (keys is string keyString) | ||
| { | ||
| builder.SetKeys(keyString); | ||
| } | ||
| } | ||
| else | ||
| { | ||
| builder.SetKeys(message.Id); | ||
| } | ||
|
|
||
| if (message.Header.Bag.TryGetValue(HeaderNames.Tag, out var tag) && tag is string tagString) | ||
| { | ||
| builder.SetTag(tagString); | ||
| } | ||
| else if (!string.IsNullOrEmpty(mqPublication.Tag)) | ||
| { | ||
| builder.SetTag(mqPublication.Tag); | ||
| } | ||
|
|
||
| await producer.Send(builder.Build()); | ||
| BrighterTracer.WriteProducerEvent(Span, MessagingSystem.RocketMQ, message, instrumentation); | ||
| } |
There was a problem hiding this comment.
❌ New issue: Complex Method
SendWithDelayAsync has a cyclomatic complexity of 18, threshold = 9
* Add RocketMQ * Fixes issue to build * Init test * Improve test * Add unit test rocketmq * add unit for rocketmq * fix change ChangeInvisibleDuration * fix: Add fragile tag * fix: improve consumer group * fix: add missing file * feat: update csproj and remove content type null check * feat: Code Review remove duplicated property * set ChannelFactoryType for RocketMQ subscription * Add trace
Add support to Apache RocketMQ