Dynamic Request Type#3669
Conversation
There was a problem hiding this comment.
Gates Passed
4 Quality Gates Passed
See analysis details in CodeScene
Quality Gate Profile: Clean Code Collective
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.
There was a problem hiding this comment.
Gates Passed
4 Quality Gates Passed
See analysis details in CodeScene
Quality Gate Profile: Clean Code Collective
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.
…imic existing behaviour.
There was a problem hiding this comment.
Gates Passed
4 Quality Gates Passed
See analysis details in CodeScene
Quality Gate Profile: Clean Code Collective
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.
… request type until after the mapping
There was a problem hiding this comment.
Gates Failed
Enforce critical code health rules
(1 file with Bumpy Road Ahead)
Gates Passed
2 Quality Gates Passed
See analysis details in CodeScene
Reason for failure
| Enforce critical code health rules | Violations | Code Health Impact | |
|---|---|---|---|
| Dispatcher.cs | 1 critical rule | 8.68 → 8.55 | Suppress |
Quality Gate Profile: Clean Code Collective
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.
| { | ||
| Log.CreatingConsumer(s_logger, consumerNumber, subscription.Name); | ||
| var consumerFactoryType = typeof(ConsumerFactory<>).MakeGenericType(subscription.DataType); | ||
There was a problem hiding this comment.
❌ New issue: Bumpy Road Ahead
CreateConsumer has 2 blocks with nested conditional logic. Any nesting of 2 or deeper is considered. Threshold is one single, nested block per function
There was a problem hiding this comment.
Gates Failed
Enforce critical code health rules
(1 file with Bumpy Road Ahead)
Gates Passed
2 Quality Gates Passed
See analysis details in CodeScene
Reason for failure
| Enforce critical code health rules | Violations | Code Health Impact | |
|---|---|---|---|
| Dispatcher.cs | 1 critical rule | 8.68 → 8.55 | Suppress |
Quality Gate Profile: Clean Code Collective
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.
…ption and rethrow
There was a problem hiding this comment.
Gates Failed
Enforce critical code health rules
(1 file with Bumpy Road Ahead)
Gates Passed
2 Quality Gates Passed
See analysis details in CodeScene
Reason for failure
| Enforce critical code health rules | Violations | Code Health Impact | |
|---|---|---|---|
| Dispatcher.cs | 1 critical rule | 8.68 → 8.55 | Suppress |
Quality Gate Profile: Clean Code Collective
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.
There was a problem hiding this comment.
Gates Failed
Enforce critical code health rules
(1 file with Bumpy Road Ahead)
Gates Passed
2 Quality Gates Passed
See analysis details in CodeScene
Reason for failure
| Enforce critical code health rules | Violations | Code Health Impact | |
|---|---|---|---|
| Dispatcher.cs | 1 critical rule | 8.68 → 8.55 | Suppress |
Quality Gate Profile: Clean Code Collective
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.
There was a problem hiding this comment.
Gates Failed
Enforce critical code health rules
(1 file with Bumpy Road Ahead)
Gates Passed
2 Quality Gates Passed
See analysis details in CodeScene
Reason for failure
| Enforce critical code health rules | Violations | Code Health Impact | |
|---|---|---|---|
| Dispatcher.cs | 1 critical rule | 8.68 → 8.55 | Suppress |
Quality Gate Profile: Clean Code Collective
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.
There was a problem hiding this comment.
Gates Failed
Enforce critical code health rules
(1 file with Bumpy Road Ahead)
Gates Passed
2 Quality Gates Passed
See analysis details in CodeScene
Reason for failure
| Enforce critical code health rules | Violations | Code Health Impact | |
|---|---|---|---|
| Dispatcher.cs | 1 critical rule | 8.68 → 8.55 | Suppress |
Quality Gate Profile: Clean Code Collective
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.
There was a problem hiding this comment.
Gates Failed
Enforce critical code health rules
(5 files with Bumpy Road Ahead)
Enforce advisory code health rules
(5 files with Complex Method, Code Duplication)
Gates Passed
2 Quality Gates Passed
See analysis details in CodeScene
Reason for failure
| Enforce critical code health rules | Violations | Code Health Impact | |
|---|---|---|---|
| SnsMessageProducerFactory.cs | 1 critical rule | 10.00 → 9.69 | Suppress |
| SqsMessageProducerFactory.cs | 1 critical rule | 10.00 → 9.69 | Suppress |
| SnsMessageProducerFactory.cs | 1 critical rule | 10.00 → 9.69 | Suppress |
| SqsMessageProducerFactory.cs | 1 critical rule | 10.00 → 9.69 | Suppress |
| Dispatcher.cs | 1 critical rule | 8.68 → 8.55 | Suppress |
| Enforce advisory code health rules | Violations | Code Health Impact | |
|---|---|---|---|
| ProducerRegistry.cs | 1 advisory rule | 10.00 → 9.39 | Suppress |
| JsonMessageMapper.cs | 1 advisory rule | 10.00 → 9.69 | Suppress |
| MessageItem.cs | 1 advisory rule | 9.34 → 9.31 | Suppress |
| MessageItem.cs | 1 advisory rule | 9.34 → 9.31 | Suppress |
| FindPublicationByPublicationTopicOrRequestType.cs | 1 advisory rule | 9.54 → 9.51 | Suppress |
Quality Gate Profile: Clean Code Collective
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.
There was a problem hiding this comment.
Gates Failed
Enforce critical code health rules
(5 files with Bumpy Road Ahead)
Enforce advisory code health rules
(5 files with Complex Method, Code Duplication)
Gates Passed
2 Quality Gates Passed
See analysis details in CodeScene
Reason for failure
| Enforce critical code health rules | Violations | Code Health Impact | |
|---|---|---|---|
| SnsMessageProducerFactory.cs | 1 critical rule | 10.00 → 9.69 | Suppress |
| SqsMessageProducerFactory.cs | 1 critical rule | 10.00 → 9.69 | Suppress |
| SnsMessageProducerFactory.cs | 1 critical rule | 10.00 → 9.69 | Suppress |
| SqsMessageProducerFactory.cs | 1 critical rule | 10.00 → 9.69 | Suppress |
| Dispatcher.cs | 1 critical rule | 8.68 → 8.55 | Suppress |
| Enforce advisory code health rules | Violations | Code Health Impact | |
|---|---|---|---|
| ProducerRegistry.cs | 1 advisory rule | 10.00 → 9.39 | Suppress |
| JsonMessageMapper.cs | 1 advisory rule | 10.00 → 9.69 | Suppress |
| MessageItem.cs | 1 advisory rule | 9.34 → 9.31 | Suppress |
| MessageItem.cs | 1 advisory rule | 9.34 → 9.31 | Suppress |
| FindPublicationByPublicationTopicOrRequestType.cs | 1 advisory rule | 9.54 → 9.51 | Suppress |
Quality Gate Profile: Clean Code Collective
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.
…vents type trying to be mapped which will fail
There was a problem hiding this comment.
Gates Failed
Enforce critical code health rules
(5 files with Bumpy Road Ahead)
Enforce advisory code health rules
(5 files with Complex Method, Code Duplication)
Gates Passed
2 Quality Gates Passed
See analysis details in CodeScene
Reason for failure
| Enforce critical code health rules | Violations | Code Health Impact | |
|---|---|---|---|
| SnsMessageProducerFactory.cs | 1 critical rule | 10.00 → 9.69 | Suppress |
| SqsMessageProducerFactory.cs | 1 critical rule | 10.00 → 9.69 | Suppress |
| SnsMessageProducerFactory.cs | 1 critical rule | 10.00 → 9.69 | Suppress |
| SqsMessageProducerFactory.cs | 1 critical rule | 10.00 → 9.69 | Suppress |
| Dispatcher.cs | 1 critical rule | 8.68 → 8.55 | Suppress |
| Enforce advisory code health rules | Violations | Code Health Impact | |
|---|---|---|---|
| ProducerRegistry.cs | 1 advisory rule | 10.00 → 9.39 | Suppress |
| JsonMessageMapper.cs | 1 advisory rule | 10.00 → 9.69 | Suppress |
| MessageItem.cs | 1 advisory rule | 9.34 → 9.31 | Suppress |
| MessageItem.cs | 1 advisory rule | 9.34 → 9.31 | Suppress |
| FindPublicationByPublicationTopicOrRequestType.cs | 1 advisory rule | 9.54 → 9.51 | Suppress |
Quality Gate Profile: Clean Code Collective
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.
There was a problem hiding this comment.
Gates Failed
Enforce critical code health rules
(5 files with Bumpy Road Ahead)
Enforce advisory code health rules
(5 files with Complex Method, Code Duplication)
Gates Passed
2 Quality Gates Passed
See analysis details in CodeScene
Reason for failure
| Enforce critical code health rules | Violations | Code Health Impact | |
|---|---|---|---|
| SnsMessageProducerFactory.cs | 1 critical rule | 10.00 → 9.69 | Suppress |
| SqsMessageProducerFactory.cs | 1 critical rule | 10.00 → 9.69 | Suppress |
| SnsMessageProducerFactory.cs | 1 critical rule | 10.00 → 9.69 | Suppress |
| SqsMessageProducerFactory.cs | 1 critical rule | 10.00 → 9.69 | Suppress |
| Dispatcher.cs | 1 critical rule | 8.68 → 8.55 | Suppress |
| Enforce advisory code health rules | Violations | Code Health Impact | |
|---|---|---|---|
| ProducerRegistry.cs | 1 advisory rule | 10.00 → 9.39 | Suppress |
| JsonMessageMapper.cs | 1 advisory rule | 10.00 → 9.69 | Suppress |
| MessageItem.cs | 1 advisory rule | 9.34 → 9.31 | Suppress |
| MessageItem.cs | 1 advisory rule | 9.34 → 9.31 | Suppress |
| FindPublicationByPublicationTopicOrRequestType.cs | 1 advisory rule | 9.54 → 9.51 | Suppress |
Quality Gate Profile: Clean Code Collective
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.
There was a problem hiding this comment.
Gates Failed
Enforce critical code health rules
(5 files with Bumpy Road Ahead)
Enforce advisory code health rules
(5 files with Complex Method, Code Duplication)
Gates Passed
2 Quality Gates Passed
See analysis details in CodeScene
Reason for failure
| Enforce critical code health rules | Violations | Code Health Impact | |
|---|---|---|---|
| SnsMessageProducerFactory.cs | 1 critical rule | 10.00 → 9.69 | Suppress |
| SqsMessageProducerFactory.cs | 1 critical rule | 10.00 → 9.69 | Suppress |
| SnsMessageProducerFactory.cs | 1 critical rule | 10.00 → 9.69 | Suppress |
| SqsMessageProducerFactory.cs | 1 critical rule | 10.00 → 9.69 | Suppress |
| Dispatcher.cs | 1 critical rule | 8.68 → 8.55 | Suppress |
| Enforce advisory code health rules | Violations | Code Health Impact | |
|---|---|---|---|
| ProducerRegistry.cs | 1 advisory rule | 10.00 → 9.39 | Suppress |
| JsonMessageMapper.cs | 1 advisory rule | 10.00 → 9.69 | Suppress |
| MessageItem.cs | 1 advisory rule | 9.34 → 9.31 | Suppress |
| MessageItem.cs | 1 advisory rule | 9.34 → 9.31 | Suppress |
| FindPublicationByPublicationTopicOrRequestType.cs | 1 advisory rule | 9.54 → 9.51 | Suppress |
Quality Gate Profile: Clean Code Collective
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.
There was a problem hiding this comment.
Gates Failed
Enforce critical code health rules
(5 files with Bumpy Road Ahead)
Enforce advisory code health rules
(5 files with Complex Method, Code Duplication)
Gates Passed
2 Quality Gates Passed
See analysis details in CodeScene
Reason for failure
| Enforce critical code health rules | Violations | Code Health Impact | |
|---|---|---|---|
| SnsMessageProducerFactory.cs | 1 critical rule | 10.00 → 9.69 | Suppress |
| SqsMessageProducerFactory.cs | 1 critical rule | 10.00 → 9.69 | Suppress |
| SnsMessageProducerFactory.cs | 1 critical rule | 10.00 → 9.69 | Suppress |
| SqsMessageProducerFactory.cs | 1 critical rule | 10.00 → 9.69 | Suppress |
| Dispatcher.cs | 1 critical rule | 8.68 → 8.55 | Suppress |
| Enforce advisory code health rules | Violations | Code Health Impact | |
|---|---|---|---|
| ProducerRegistry.cs | 1 advisory rule | 10.00 → 9.39 | Suppress |
| JsonMessageMapper.cs | 1 advisory rule | 10.00 → 9.69 | Suppress |
| MessageItem.cs | 1 advisory rule | 9.34 → 9.31 | Suppress |
| MessageItem.cs | 1 advisory rule | 9.34 → 9.31 | Suppress |
| FindPublicationByPublicationTopicOrRequestType.cs | 1 advisory rule | 9.54 → 9.51 | Suppress |
Quality Gate Profile: Clean Code Collective
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.
There was a problem hiding this comment.
Pull Request Overview
This PR introduces dynamic request type determination for message pumps, replacing the generic type parameter approach with a runtime function-based strategy. This change enables channels to handle multiple message types and determine the request type dynamically from message metadata like Cloud Events type.
Key changes:
- Replaced generic type parameters on message pumps with dynamic type resolution functions
- Enhanced subscription configuration to support both datatype channels and dynamic type mapping
- Updated producer registry to handle multiple message types per topic using CloudEventsType
- Modified constructors and method signatures throughout the codebase to support the new approach
Reviewed Changes
Copilot reviewed 297 out of 412 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| Subscription.cs | Added MapRequestType function and RequestType property to support dynamic type resolution |
| ProducerRegistry.cs | Enhanced to support ProducerKey with CloudEventsType for multiple message types per topic |
| Publication.cs | Changed Type property from string to CloudEventsType |
| RequestContext.cs | Updated Topic property to Destination with ProducerKey type |
| Various test files | Updated Proactor/Reactor constructors to use dynamic type functions instead of generic parameters |
| messageMapperRegistry.RegisterAsync<MyFailingMapperEvent, FailingEventMessageMapperAsync>(); | ||
|
|
||
| _messagePump = new Proactor<MyFailingMapperEvent>(commandProcessor, messageMapperRegistry, null, new InMemoryRequestContextFactory(), _channel) | ||
| _messagePump = new ServiceActivator.Proactor(commandProcessor, (message) => typeof(FailingEventMessageMapperAsync), |
There was a problem hiding this comment.
The lambda function should return the request type, not the mapper type. It should return typeof(MyFailingMapperEvent) instead of typeof(FailingEventMessageMapperAsync).
| _messagePump = new ServiceActivator.Proactor(commandProcessor, (message) => typeof(FailingEventMessageMapperAsync), | |
| _messagePump = new ServiceActivator.Proactor(commandProcessor, (message) => typeof(MyFailingMapperEvent), |
| #endregion | ||
|
|
||
| using System; | ||
| using Microsoft.VisualStudio.TestPlatform.ObjectModel; |
There was a problem hiding this comment.
This using statement appears to be unused and unrelated to the functionality of this class. It should be removed.
| using Microsoft.VisualStudio.TestPlatform.ObjectModel; |
| //message should not be posted | ||
| Assert.False(_bus.Stream(new RoutingKey(_commandTopic)).Any()); | ||
| Assert.False(_bus.Stream(new RoutingKey(_eventTopic)).Any()); | ||
| Assert.False(_bus.Stream(_commandTopic).Any()); |
There was a problem hiding this comment.
The parameter type has changed from RoutingKey to string. This should be _bus.Stream(new RoutingKey(_commandTopic)).Any() to maintain consistency with the expected type.
| Assert.False(_bus.Stream(_commandTopic).Any()); | |
| Assert.False(_bus.Stream(new RoutingKey(_commandTopic)).Any()); |
| Assert.False(_bus.Stream(new RoutingKey(_commandTopic)).Any()); | ||
| Assert.False(_bus.Stream(new RoutingKey(_eventTopic)).Any()); | ||
| Assert.False(_bus.Stream(_commandTopic).Any()); | ||
| Assert.False(_bus.Stream(_eventTopic).Any()); |
There was a problem hiding this comment.
The parameter type has changed from RoutingKey to string. This should be _bus.Stream(new RoutingKey(_eventTopic)).Any() to maintain consistency with the expected type.
| Assert.False(_bus.Stream(_eventTopic).Any()); | |
| Assert.False(_bus.Stream(new RoutingKey(_eventTopic)).Any()); |
| { | ||
| Publication = new Publication{RequestType = typeof(MyCommand), Topic = routingKey} | ||
| } }, |
There was a problem hiding this comment.
This line appears to be duplicating the Publication assignment. The producer was already constructed with the Publication, so this assignment should be removed.
| { | |
| Publication = new Publication{RequestType = typeof(MyCommand), Topic = routingKey} | |
| } }, | |
| }, |
There was a problem hiding this comment.
Gates Failed
Enforce critical code health rules
(5 files with Bumpy Road Ahead)
Enforce advisory code health rules
(5 files with Complex Method, Code Duplication)
Gates Passed
2 Quality Gates Passed
See analysis details in CodeScene
Reason for failure
| Enforce critical code health rules | Violations | Code Health Impact | |
|---|---|---|---|
| SnsMessageProducerFactory.cs | 1 critical rule | 10.00 → 9.69 | Suppress |
| SqsMessageProducerFactory.cs | 1 critical rule | 10.00 → 9.69 | Suppress |
| SnsMessageProducerFactory.cs | 1 critical rule | 10.00 → 9.69 | Suppress |
| SqsMessageProducerFactory.cs | 1 critical rule | 10.00 → 9.69 | Suppress |
| Dispatcher.cs | 1 critical rule | 8.68 → 8.55 | Suppress |
| Enforce advisory code health rules | Violations | Code Health Impact | |
|---|---|---|---|
| ProducerRegistry.cs | 1 advisory rule | 10.00 → 9.39 | Suppress |
| JsonMessageMapper.cs | 1 advisory rule | 10.00 → 9.69 | Suppress |
| MessageItem.cs | 1 advisory rule | 9.34 → 9.31 | Suppress |
| MessageItem.cs | 1 advisory rule | 9.34 → 9.31 | Suppress |
| FindPublicationByPublicationTopicOrRequestType.cs | 1 advisory rule | 9.54 → 9.51 | Suppress |
Quality Gate Profile: Clean Code Collective
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.
* fix: numbering of ADRs is out. * Adding a dynamic approach to determining the request type * feat: roll dynamic message types out to Azure and SQS * feat: make the dynamic request type change to RedisSubscription * feat: update Kafka for the new dynamic type * feat: make MsSqlSubscriptions dynamic * feat: add support for dynamid routing to SQS * feat: roll out dynamic routing to request, this version should just mimic existing behaviour. * fix: dispatch needs to use late binding because we no longer know the request type until after the mapping * fix: signature of invoke and dispatch when copied from Proactor incorrect * fix: where we invoke via reflection we need to catch the wrapped exception and rethrow * feat: make type optional, as may not be needed if using dynamic mapping * feat: cache pipeline requests * feat: add dispatch method caching * feat: tests a dynamic message flowing out to a command processor * feat: add tests for async dynamic message mapping * feat: <breaks build> adding sample for kakfa topic with multiple event types * feat: add kafka sample showing multiple events on same topic * chore: fix build issues * feat: add dynamic samples with cloud events registration and routing * chore: remove spurious project reference * chore: fix sln issues * chore: provide some comments to help with debugging here * feat: remove primitive obsession for CloudEventsType * feat: move publication to inmemoryproducer constructor * fix: add publicaion with routing key to producer * fix: breaking tests following changes to publication in inmemoryproducer * Move finder to use the ProducerKey * fix: issues with how we do lookup not including cloud events type * fix: ensure that we can disambiguate producers by cloud events key * fix: move some producer registration to the new producer key as they use cloud event type * fix: add default cloud events properties to json mapper; ensure cloud events key is passed through; tests for failing paths * fix: need to scan assemblies for handlers * fix: make the sample a little easier to understand * chore: merge issues * chore: fix merge issues * fix: don't need schema registry for default json mapper * fix: need to use value to force write of string not CloudEventsType * fix: missing mapping of value of cloud events type leading to cloud events type trying to be mapped which will fail * chore: resolve typos * chore: fix typos
Previously we have used a generic parameter on the MessagePump types (Reactor and Proactor) to determine the type that a message maps to. This limits our ability to determine that from the message itself, i.e. the Cloud Events type, and prevents us from supporting channels with multiple message types. This change supports those scenarios. It supports both consuming - we need to dynamically determine the how to map the consumed message to a derived type from
IRequestand producing - we need to allow multiple publications on the same topic - and determine which publication to use based onIRequest, but disambiguate by cloud events type if there is more than one match.