diff --git a/src/Paramore.Brighter.Transformers.JustSaying/JustSayingMessageMapper.cs b/src/Paramore.Brighter.Transformers.JustSaying/JustSayingMessageMapper.cs index 8714230322..b3d73a0e8c 100644 --- a/src/Paramore.Brighter.Transformers.JustSaying/JustSayingMessageMapper.cs +++ b/src/Paramore.Brighter.Transformers.JustSaying/JustSayingMessageMapper.cs @@ -1,11 +1,14 @@ using System; +using System.Collections.Generic; using System.Net.Mime; using System.Text.Json; using System.Threading; using System.Threading.Tasks; +using Paramore.Brighter.Extensions; using Paramore.Brighter.JsonConverters; using Paramore.Brighter.Transformers.JustSaying.Extensions; using Paramore.Brighter.Transformers.JustSaying.JsonConverters; +using Paramore.Brighter.Transforms.Attributes; namespace Paramore.Brighter.Transformers.JustSaying; @@ -38,6 +41,7 @@ public JustSayingMessageMapper() public IRequestContext? Context { get; set; } /// + [CloudEvents(0)] public Task MapToMessageAsync(TMessage request, Publication publication, CancellationToken cancellationToken = default) { return Task.FromResult(MapToMessage(request, publication)); @@ -50,6 +54,7 @@ public Task MapToRequestAsync(Message message, CancellationToken cance } /// + [CloudEvents(0)] public Message MapToMessage(TMessage request, Publication publication) { var messageType = request switch @@ -64,6 +69,7 @@ public Message MapToMessage(TMessage request, Publication publication) private Message JustSayingToMessage(TMessage request, MessageType messageType, Publication publication) { + var defaultHeaders = publication.DefaultHeaders ?? new Dictionary(); var justSaying = (IJustSayingRequest)request; justSaying.Id = GetId(justSaying.Id); justSaying.Conversation = GetCorrelationId(justSaying.Conversation); @@ -80,16 +86,19 @@ private Message JustSayingToMessage(TMessage request, MessageType messageType, P messageType: messageType, subject: GetSubject(publication), timeStamp: justSaying.TimeStamp, - topic: publication.Topic!), + topic: publication.Topic!, + partitionKey: Context.GetPartitionKey()) + { + Bag = defaultHeaders.Merge(Context.GetHeaders()) + }, new MessageBody( JsonSerializer.SerializeToUtf8Bytes(request, JsonSerialisationOptions.Options), s_justSaying)); - - } private Message GenericToMessage(TMessage request, MessageType messageType, Publication publication) { + var defaultHeaders = publication.DefaultHeaders ?? new Dictionary(); var doc = JsonSerializer.SerializeToNode(request, JsonSerialisationOptions.Options)!; var messageId = GetId(doc.GetId(nameof(IJustSayingRequest.Id))); var correlationId = GetCorrelationId(doc.GetId(nameof(IJustSayingRequest.Conversation))); @@ -110,7 +119,11 @@ private Message GenericToMessage(TMessage request, MessageType messageType, Publ messageType: messageType, subject: GetSubject(publication), timeStamp: timestamp, - topic: publication.Topic!), + topic: publication.Topic!, + partitionKey: Context.GetPartitionKey()) + { + Bag = defaultHeaders.Merge(Context.GetHeaders()) + }, new MessageBody( doc.ToJsonString(JsonSerialisationOptions.Options), s_justSaying)); diff --git a/src/Paramore.Brighter.Transformers.MassTransit/MassTransitMessageMapper.cs b/src/Paramore.Brighter.Transformers.MassTransit/MassTransitMessageMapper.cs index fbd0f1b0ac..c20f14990d 100644 --- a/src/Paramore.Brighter.Transformers.MassTransit/MassTransitMessageMapper.cs +++ b/src/Paramore.Brighter.Transformers.MassTransit/MassTransitMessageMapper.cs @@ -7,7 +7,9 @@ using System.Text.Json; using System.Threading; using System.Threading.Tasks; +using Paramore.Brighter.Extensions; using Paramore.Brighter.JsonConverters; +using Paramore.Brighter.Transforms.Attributes; namespace Paramore.Brighter.Transformers.MassTransit; @@ -50,6 +52,7 @@ public class MassTransitMessageMapper : IAmAMessageMapper, I public IRequestContext? Context { get; set; } /// + [CloudEvents(0)] public Task MapToMessageAsync(TMessage request, Publication publication, CancellationToken cancellationToken = default) { @@ -63,21 +66,14 @@ public Task MapToRequestAsync(Message message, CancellationToken cance } /// + [CloudEvents(0)] public virtual Message MapToMessage(TMessage request, Publication publication) { var timestamp = DateTimeOffset.UtcNow; - var bag = new Dictionary(); - if (Context is { Bag.Count: > 0 }) - { - foreach (var pair in Context.Bag) - { - if (!pair.Key.StartsWith(MassTransitHeaderNames.HeaderPrefix)) - { - bag[pair.Key] = pair.Value; - } - } - } - + + var defaultHeaders = publication.DefaultHeaders ?? new Dictionary(); + var headers = defaultHeaders.Merge(Context.GetHeaders()); + var envelop = new MassTransitMessageEnvelop { ConversationId = GetConversationId(), @@ -85,7 +81,7 @@ public virtual Message MapToMessage(TMessage request, Publication publication) DestinationAddress = GetDestinationAddress(), ExpirationTime = GetExpirationTime(), FaultAddress = GetFaultAddress(), - Headers = null, + Headers = headers!, Host = s_hostInfo, InitiatorId = GetInitiatorId(), Message = request, @@ -109,9 +105,10 @@ public virtual Message MapToMessage(TMessage request, Publication publication) _ => MessageType.MT_DOCUMENT }, timeStamp: timestamp, - topic: publication.Topic!) + topic: publication.Topic!, + partitionKey: Context.GetPartitionKey()) { - Bag = bag + Bag = headers }, new MessageBody(JsonSerializer.SerializeToUtf8Bytes(envelop, JsonSerialisationOptions.Options), MassTransitContentType) diff --git a/src/Paramore.Brighter/Extensions/DictionaryExtensions.cs b/src/Paramore.Brighter/Extensions/DictionaryExtensions.cs new file mode 100644 index 0000000000..8ef7a1007b --- /dev/null +++ b/src/Paramore.Brighter/Extensions/DictionaryExtensions.cs @@ -0,0 +1,67 @@ +using System.Collections.Generic; + +namespace Paramore.Brighter.Extensions; + +/// +/// Provides extension methods for dictionaries. +/// +public static class DictionaryExtensions +{ + /// + /// Merges two dictionaries into a new dictionary, with values from the second dictionary overwriting those from the first. + /// + /// The primary dictionary to merge (will be copied first) + /// The secondary dictionary to merge (may be null) + /// + /// A new dictionary containing the combined key-value pairs from both dictionaries. + /// When keys exist in both dictionaries, the value from takes precedence. + /// + /// + /// + /// This method performs a shallow merge where: + /// + /// All entries from are copied to the new dictionary + /// Entries from are then added or overwrite existing keys + /// Null values in will overwrite existing keys with null + /// + /// + /// + /// Special cases: + /// + /// If is null, returns a copy of + /// Original dictionaries remain unmodified + /// Performs a shallow copy (values are not cloned) + /// + /// + /// + /// Basic merge with key override: + /// + /// var dict1 = new Dictionary<string, object> { ["a"] = 1, ["b"] = 2 }; + /// var dict2 = new Dictionary<string, object> { ["b"] = 99, ["c"] = 3 }; + /// + /// var merged = dict1.Merge(dict2); + /// // Result: { "a": 1, "b": 99, "c": 3 } + /// + /// + /// Handling null merge: + /// + /// var merged = dict1.Merge(null); + /// // Returns copy of dict1 + /// + /// + /// + public static Dictionary Merge(this IDictionary dict1, + IDictionary? dict2) + { + var result = new Dictionary(dict1); + if(dict2 != null) + { + foreach (var val in dict2) + { + result[val.Key] = val.Value; + } + } + + return result; + } +} diff --git a/src/Paramore.Brighter/Extensions/RequestContextExtensions.cs b/src/Paramore.Brighter/Extensions/RequestContextExtensions.cs new file mode 100644 index 0000000000..7d08bfea95 --- /dev/null +++ b/src/Paramore.Brighter/Extensions/RequestContextExtensions.cs @@ -0,0 +1,152 @@ +using System.Collections.Generic; + +namespace Paramore.Brighter.Extensions; + +/// +/// Provides extension methods for to simplify access to common context bag values. +/// +/// +/// These extensions offer type-safe access to Brighter's reserved context bag values, +/// handling type conversion and null checks internally. They are safe to call with null contexts. +/// +public static class RequestContextExtensions +{ + /// + /// Retrieves the partition key from the request context bag. + /// + /// The request context (may be null) + /// + /// The partition key if present and valid, otherwise . + /// + /// + /// + /// Handles two valid value types in the context bag: + /// + /// : Converted to a instance + /// : Returned directly + /// + /// + /// + /// Returns for: + /// + /// Null context + /// Missing partition key entry + /// Unsupported value types + /// + /// + /// + /// Usage: + /// + /// var partitionKey = requestContext.GetPartitionKey(); + /// if (!partitionKey.IsEmpty) + /// { + /// // Use partition key + /// } + /// + /// + /// + public static PartitionKey GetPartitionKey(this IRequestContext? context) + { + if (context == null || !context.Bag.TryGetValue(RequestContextBagNames.PartitionKey, out var tmp)) + { + return PartitionKey.Empty; + } + + return tmp switch + { + string partitionKeyAsString => new PartitionKey(partitionKeyAsString), + PartitionKey partitionKey => partitionKey, + _ => PartitionKey.Empty + }; + } + + /// + /// Retrieves the dynamic headers dictionary from the request context bag. + /// + /// The request context (may be null) + /// + /// The headers dictionary if present and valid, otherwise null. + /// + /// + /// + /// The value must be stored in the context bag as a + /// where TKey is and TValue is . + /// + /// + /// Returns null for: + /// + /// Null context + /// Missing headers entry + /// Type mismatch (not Dictionary<string, object>) + /// + /// + /// + /// Usage: + /// + /// var headers = requestContext.GetHeaders(); + /// if (headers != null) + /// { + /// foreach (var header in headers) + /// { + /// // Process headers + /// } + /// } + /// + /// + /// + public static Dictionary? GetHeaders(this IRequestContext? context) + { + if (context != null + && context.Bag.TryGetValue(RequestContextBagNames.Headers, out var tmp) + && tmp is Dictionary headers) + { + return headers; + } + + return null; + } + + /// + /// Retrieves CloudEvent additional properties from the request context bag. + /// + /// The request context (may be null) + /// + /// The CloudEvent extensions dictionary if present and valid, otherwise null. + /// + /// + /// + /// The value must be stored in the context bag as a + /// where TKey is and TValue is . + /// + /// + /// Returns null for: + /// + /// Null context + /// Missing CloudEventsAdditionalProperties entry + /// Type mismatch (not Dictionary<string, object>) + /// + /// + /// + /// Usage: + /// + /// var cloudEventProps = requestContext.GetCloudEventAdditionalProperties(); + /// if (cloudEventProps != null) + /// { + /// // Add to CloudEvent extensions + /// } + /// + /// + /// + /// + public static Dictionary? GetCloudEventAdditionalProperties(this IRequestContext? context) + { + if (context != null + && context.Bag.TryGetValue(RequestContextBagNames.CloudEventsAdditionalProperties, out var tmp) + && tmp is Dictionary cloudEventAdditionalProperties) + { + return cloudEventAdditionalProperties; + } + + return null; + } +} diff --git a/src/Paramore.Brighter/IRequestContext.cs b/src/Paramore.Brighter/IRequestContext.cs index efa0976f3b..e5bd33ea98 100644 --- a/src/Paramore.Brighter/IRequestContext.cs +++ b/src/Paramore.Brighter/IRequestContext.cs @@ -23,7 +23,6 @@ THE SOFTWARE. */ #endregion using System.Collections.Concurrent; -using System.Collections.Generic; using System.Diagnostics; using Paramore.Brighter.FeatureSwitch; using Polly.Registry; @@ -63,7 +62,7 @@ public interface IRequestContext /// Gets the Feature Switches /// IAmAFeatureSwitchRegistry? FeatureSwitches { get; } - + /// /// Create a new copy of the Request Context /// diff --git a/src/Paramore.Brighter/MessageMappers/CloudEventJsonMessageMapper.cs b/src/Paramore.Brighter/MessageMappers/CloudEventJsonMessageMapper.cs index 1da40ee8ac..2116f0c23d 100644 --- a/src/Paramore.Brighter/MessageMappers/CloudEventJsonMessageMapper.cs +++ b/src/Paramore.Brighter/MessageMappers/CloudEventJsonMessageMapper.cs @@ -5,8 +5,8 @@ using System.Text.Json.Serialization; using System.Threading; using System.Threading.Tasks; +using Paramore.Brighter.Extensions; using Paramore.Brighter.JsonConverters; -using Paramore.Brighter.Transforms.Attributes; namespace Paramore.Brighter.MessageMappers; @@ -22,7 +22,6 @@ public class CloudEventJsonMessageMapper : IAmAMessageMapper public IRequestContext? Context { get; set; } /// - [CloudEvents(0)] public Task MapToMessageAsync(TRequest request, Publication publication, CancellationToken cancellationToken = default) => Task.FromResult(MapToMessage(request, publication)); @@ -32,10 +31,9 @@ public Task MapToRequestAsync(Message message, CancellationToken cance => Task.FromResult(MapToRequest(message)); /// - [CloudEvents(0)] public Message MapToMessage(TRequest request, Publication publication) { - MessageType messageType = request switch + var messageType = request switch { ICommand => MessageType.MT_COMMAND, IEvent => MessageType.MT_EVENT, @@ -46,24 +44,40 @@ public Message MapToMessage(TRequest request, Publication publication) { throw new ArgumentException($"No Topic Defined for {publication}"); } - + + var defaultHeaders = publication.DefaultHeaders ?? new Dictionary(); var headerContentType = new ContentType("application/cloudevents+json"); - var header = new MessageHeader(messageId: request.Id, topic: publication.Topic, messageType: messageType, contentType: headerContentType); + var header = new MessageHeader( + messageId: request.Id, + topic: publication.Topic, + messageType: messageType, + contentType: headerContentType, + partitionKey: Context.GetPartitionKey(), + source: publication.Source, + type: publication.Type, + dataSchema: publication.DataSchema, + subject: publication.Subject + ) + { + Bag = defaultHeaders.Merge(Context.GetHeaders()), + }; + #if NETSTANDARD2_0 - var bodyContentType = new ContentType("application/json"); + var bodyContentType = new ContentType("application/json"); #else var bodyContentType = new ContentType(MediaTypeNames.Application.Json); -#endif +#endif + var defaultCloudEventsAdditionalProperties = publication.CloudEventsAdditionalProperties ?? new Dictionary(); var body = new MessageBody(JsonSerializer.Serialize(new CloudEventMessage { Id = request.Id, Source = publication.Source, Type = publication.Type, - DataContentType = bodyContentType!.ToString(), + DataContentType = bodyContentType.ToString(), Subject = publication.Subject, DataSchema = publication.DataSchema, - AdditionalProperties = publication.CloudEventsAdditionalProperties, + AdditionalProperties = defaultCloudEventsAdditionalProperties.Merge(Context.GetCloudEventAdditionalProperties()), Time = DateTimeOffset.UtcNow, Data = request }, JsonSerialisationOptions.Options)); diff --git a/src/Paramore.Brighter/MessageMappers/JsonMessageMapper.cs b/src/Paramore.Brighter/MessageMappers/JsonMessageMapper.cs index c0db62a7e0..015e55edbd 100644 --- a/src/Paramore.Brighter/MessageMappers/JsonMessageMapper.cs +++ b/src/Paramore.Brighter/MessageMappers/JsonMessageMapper.cs @@ -1,8 +1,10 @@ using System; +using System.Collections.Generic; using System.Net.Mime; using System.Text.Json; using System.Threading; using System.Threading.Tasks; +using Paramore.Brighter.Extensions; using Paramore.Brighter.JsonConverters; using Paramore.Brighter.Transforms.Attributes; @@ -42,12 +44,22 @@ public Message MapToMessage(TRequest request, Publication publication) if(publication.Topic is null) throw new ArgumentException($"No Topic Defined for {publication}"); - - #if NETSTANDARD2_0 - var header = new MessageHeader(messageId: request.Id, topic: publication.Topic, messageType: messageType, contentType: new ContentType("application/json")); - #else - var header = new MessageHeader(messageId: request.Id, topic: publication.Topic, messageType: messageType, contentType: new ContentType(MediaTypeNames.Application.Json)); + + var defaultHeaders = publication.DefaultHeaders ?? new Dictionary(); + var header = new MessageHeader( + messageId: request.Id, + topic: publication.Topic, + messageType: messageType, + partitionKey: Context.GetPartitionKey(), +#if NETSTANDARD2_0 + contentType: new ContentType("application/json") +#else + contentType: new ContentType(MediaTypeNames.Application.Json) #endif + ) + { + Bag = defaultHeaders.Merge(Context.GetHeaders()) + }; var body = new MessageBody(JsonSerializer.Serialize(request, JsonSerialisationOptions.Options)); var message = new Message(header, body); diff --git a/src/Paramore.Brighter/Publication.cs b/src/Paramore.Brighter/Publication.cs index cea79b7a06..b9c34e5f17 100644 --- a/src/Paramore.Brighter/Publication.cs +++ b/src/Paramore.Brighter/Publication.cs @@ -95,6 +95,41 @@ public class Publication /// public string Type { get; set; } = "goparamore.io.Paramore.Brighter.Message"; + /// + /// Gets or sets the default headers to be included in published messages when using default message mappers. + /// + /// + /// These headers will be automatically added to all messages published through Brighter's message producers. + /// + /// Default message mappers will use these headers when constructing the outgoing message envelope. + /// + /// + /// Headers should be structured as key-value pairs where: + /// + /// Key: Header name (string) + /// Value: Header value (object) + /// + /// + /// + /// Setting default headers: + /// + /// publication.DefaultHeaders = new Dictionary<string, object> + /// { + /// ["x-correlation-id"] = Guid.NewGuid(), + /// ["x-message-type"] = typeof(MyEvent).FullName + /// }; + /// + /// + /// + /// If no default headers are required, this property can be left as null. + /// + /// + /// Note: These headers are only applied when using Brighter's default message mapping pipeline. + /// Custom mappers may ignore this property. + /// + /// + public IDictionary? DefaultHeaders { get; set; } + /// /// Gets or sets a dictionary of additional properties related to CloudEvents. /// This property enables the inclusion of custom or vendor-specific metadata beyond the standard CloudEvents attributes. @@ -109,10 +144,15 @@ public class Publication /// During serialization to a CloudEvent JSON structure, the key-value pairs within this dictionary are added as top-level properties in the resulting JSON. /// This mechanism facilitates forward compatibility and allows for seamless extensibility in accordance with the CloudEvents specification and its extensions. /// - /// This property is utilized by the for mapping and by the for transforming messages into CloudEvents. + /// This property is utilized by the + /// for mapping and by the for + /// transforming messages into CloudEvents. /// /// - /// **Important:** If any key in this dictionary conflicts with the name of a standard CloudEvents JSON property (e.g., "id", "source", "type"), the serializer (System.Text.Json) will prioritize the value present in this CloudEventsAdditionalProperties dictionary, effectively overriding the standard property's value during serialization. Exercise caution to avoid unintended overwrites of core CloudEvents attributes. + /// **Important:** If any key in this dictionary conflicts with the name of a standard CloudEvents JSON property (e.g., "id", "source", "type"), + /// the serializer (System.Text.Json) will prioritize the value present in this CloudEventsAdditionalProperties dictionary, + /// effectively overriding the standard property's value during serialization. Exercise caution to avoid unintended + /// overwrites of core CloudEvents attributes. /// /// public IDictionary? CloudEventsAdditionalProperties { get; set; } diff --git a/src/Paramore.Brighter/RequestContext.cs b/src/Paramore.Brighter/RequestContext.cs index 165b7c77f2..c881f5a7dd 100644 --- a/src/Paramore.Brighter/RequestContext.cs +++ b/src/Paramore.Brighter/RequestContext.cs @@ -27,6 +27,7 @@ THE SOFTWARE. */ using System.Collections.Generic; using System.Diagnostics; using Paramore.Brighter.FeatureSwitch; +using Paramore.Brighter.Observability; using Polly.Registry; namespace Paramore.Brighter @@ -45,7 +46,7 @@ public RequestContext() { } private RequestContext(ConcurrentDictionary bag) { - Bag = bag; + Bag = new ConcurrentDictionary(bag); } /// @@ -72,7 +73,7 @@ public IRequestContext CreateCopy() Span = Span, Policies = Policies, FeatureSwitches = FeatureSwitches, - OriginatingMessage = OriginatingMessage + OriginatingMessage = OriginatingMessage, }; /// diff --git a/src/Paramore.Brighter/RequestContextBagNames.cs b/src/Paramore.Brighter/RequestContextBagNames.cs new file mode 100644 index 0000000000..52f71408c5 --- /dev/null +++ b/src/Paramore.Brighter/RequestContextBagNames.cs @@ -0,0 +1,120 @@ +using System.Collections.Generic; + +namespace Paramore.Brighter; + +/// +/// Contains well-known keys used for accessing values in Brighter's internal request context bag. +/// +/// +/// These constants represent reserved names used within Brighter's message processing pipeline to store +/// and retrieve specific contextual information during message handling. +/// +public static class RequestContextBagNames +{ + + /// + /// Key used to store additional extension properties for CloudEvents in the request context bag. + /// + /// + /// This reserved name is used by Brighter's default CloudEvent mapper to handle custom CloudEvent extensions. + /// + /// Important: The value associated with this key in the context bag must be of type + /// where TKey is and TValue is . + /// + /// + /// Additional properties can be provided through: + /// + /// Programmatic configuration of the message mapper + /// Declarative [CloudEvent] attributes applied to message types + /// + /// + /// + /// These properties will be serialized as CloudEvent extensions in the generated message envelope. + /// + /// + /// Setting additional properties via context bag: + /// + /// var additionalProps = new Dictionary<string, object> + /// { + /// ["myextension"] = "value", + /// ["numericExtension"] = 42 + /// }; + /// + /// requestContext.Bag[RequestContextBagNames.CloudEventsAdditionalProperties] = additionalProps; + /// + /// + /// + public const string CloudEventsAdditionalProperties = "Brighter-CloudEvents-AdditionalProperties"; + + /// + /// Key used to specify a custom partition key override in the request context bag. + /// + /// + /// This reserved name allows setting a custom partition key for message routing. + /// + /// The value associated with this key can be either: + /// + /// A value representing the partition key + /// A value + /// + /// + /// + /// Important notes: + /// + /// This value is optional and may be ignored by the transport if partitioning isn't supported + /// Custom message mappers may choose to ignore this value entirely + /// When not set, the transport's default partitioning strategy will be used + /// + /// + /// + /// Setting a string partition key: + /// + /// requestContext.Bag[RequestContextBagNames.PartitionKey] = "customer-1234"; + /// + /// + /// Setting a Brighter partition key: + /// + /// requestContext.Bag[RequestContextBagNames.PartitionKey] = new PartitionKey("customer-1234"); + /// + /// + /// + /// Default Brighter mappers will use this value when present. + /// + /// + public const string PartitionKey = "Brighter-PartitionKey"; + + /// + /// Key used to dynamically set or override message headers via the request context bag. + /// + /// + /// This reserved name allows runtime configuration of message headers during publication. + /// + /// Value must be: An where TKey is + /// and TValue is representing header name-value pairs. + /// + /// + /// Important notes: + /// + /// Headers set here take precedence over static header configurations + /// Custom message mappers may ignore these headers entirely + /// Merged with by default Brighter mappers + /// + /// + /// + /// Setting dynamic headers: + /// + /// requestContext.Bag[RequestContextBagNames.Headers] = new Dictionary<string, object> + /// { + /// ["x-custom-header"] = "runtime-value", + /// ["x-timestamp"] = DateTime.UtcNow, + /// }; + /// + /// + /// + /// Default Brighter message mappers will apply these headers after any static headers, + /// allowing runtime values to override configuration. Custom mappers may implement + /// different merging strategies or ignore this context completely. + /// + /// + public const string Headers = "Brighter-Headers"; +} diff --git a/src/Paramore.Brighter/Transforms/Transformers/CloudEventsTransformer.cs b/src/Paramore.Brighter/Transforms/Transformers/CloudEventsTransformer.cs index ed57e59445..491650dc15 100644 --- a/src/Paramore.Brighter/Transforms/Transformers/CloudEventsTransformer.cs +++ b/src/Paramore.Brighter/Transforms/Transformers/CloudEventsTransformer.cs @@ -6,6 +6,7 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; +using Paramore.Brighter.Extensions; using Paramore.Brighter.JsonConverters; using Paramore.Brighter.Logging; using Paramore.Brighter.Transforms.Attributes; @@ -119,7 +120,7 @@ public Task UnwrapAsync(Message message, CancellationToken cancellation public Message Wrap(Message message, Publication publication) { var msg = WritePublicationHeaders(message, publication); - return _format == CloudEventFormat.Binary ? msg : WriteJsonMessage(msg); + return _format == CloudEventFormat.Binary ? msg : WriteJsonMessage(msg, publication); } @@ -127,7 +128,9 @@ public Message Wrap(Message message, Publication publication) public Message Unwrap(Message message) { if (_format == CloudEventFormat.Binary) + { return message; + } return ReadCloudEventJsonMessage(message); } @@ -206,24 +209,16 @@ private Message WritePublicationHeaders(Message message, Publication publication message.Header.DataSchema = _dataSchema ?? publication.DataSchema; message.Header.Subject = _subject ?? publication.Subject; message.Header.SpecVersion = _specVersion ?? message.Header.SpecVersion; - - foreach (var additional in publication.CloudEventsAdditionalProperties ?? new Dictionary()) - { - if (!message.Header.Bag.ContainsKey(additional.Key)) - { - message.Header.Bag[additional.Key] = additional.Value; - } - } return message; } - private static Message WriteJsonMessage(Message message) + private Message WriteJsonMessage(Message message, Publication publication) { try { JsonElement? data = null; string? dataBase64 = null; - var contentType = message.Header.ContentType?.ToString()?? string.Empty; + var contentType = message.Header.ContentType.ToString()?? string.Empty; if (message.Body.Value.Length > 0) { if (contentType.Contains("application/json") || contentType.Contains("text/json")) @@ -242,6 +237,8 @@ private static Message WriteJsonMessage(Message message) data = JsonDocument.Parse($"\"{encoded.ToString()}\"").RootElement; } } + + var defaultCloudEventsAdditionalProperties = publication.CloudEventsAdditionalProperties ?? new Dictionary(); var cloudEvent = new JsonEvent { @@ -253,7 +250,7 @@ private static Message WriteJsonMessage(Message message) DataSchema = message.Header.DataSchema, Subject = message.Header.Subject, Time = message.Header.TimeStamp, - AdditionalProperties = message.Header.Bag, + AdditionalProperties = defaultCloudEventsAdditionalProperties.Merge(Context.GetCloudEventAdditionalProperties()), Data = data, DataBase64 = dataBase64 // Add this property to CloudEventMessage }; @@ -284,7 +281,7 @@ public class JsonEvent /// [JsonRequired] [JsonPropertyName("id")] - public string Id { get; set; } = string.Empty; + public Id Id { get; set; } = Id.Empty; /// /// Gets or sets the specification version of the CloudEvents specification which the event uses. diff --git a/tests/Paramore.Brighter.Core.Tests/CloudEvents/When_a_message_uses_cloud_events.cs b/tests/Paramore.Brighter.Core.Tests/CloudEvents/When_a_message_uses_cloud_events.cs index 9ba558755b..afddc9ec24 100644 --- a/tests/Paramore.Brighter.Core.Tests/CloudEvents/When_a_message_uses_cloud_events.cs +++ b/tests/Paramore.Brighter.Core.Tests/CloudEvents/When_a_message_uses_cloud_events.cs @@ -4,6 +4,7 @@ using System.Text.Json; using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles; using Paramore.Brighter.Extensions; +using Paramore.Brighter.JsonConverters; using Paramore.Brighter.Transforms.Transformers; using Xunit; @@ -72,7 +73,7 @@ public void When_a_message_uses_cloud_events_via_attribute_and_format_as_json() Assert.Equal(_subject, cloudEvents.Header.Subject); Assert.NotEqual(body.Bytes, cloudEvents.Body.Bytes); - var json = JsonSerializer.Deserialize(cloudEvents.Body.Bytes); + var json = JsonSerializer.Deserialize(cloudEvents.Body.Bytes, JsonSerialisationOptions.Options); Assert.NotNull(json); Assert.Equal(_source, json.Source); Assert.Equal(_type, json.Type); @@ -108,7 +109,7 @@ public void When_a_empty_message_uses_cloud_events_via_attribute_and_format_as_j Assert.Equal(_subject, cloudEvents.Header.Subject); Assert.NotEqual(body.Bytes, cloudEvents.Body.Bytes); - var json = JsonSerializer.Deserialize(cloudEvents.Body.Bytes); + var json = JsonSerializer.Deserialize(cloudEvents.Body.Bytes, JsonSerialisationOptions.Options); Assert.NotNull(json); Assert.Equal(_source, json.Source); Assert.Equal(_type, json.Type); @@ -234,7 +235,7 @@ public void When_a_message_uses_cloud_events_and_a_publication_and_format_as_jso Assert.Equal(subject, cloudEvents.Header.Subject); Assert.NotEqual(body.Bytes, cloudEvents.Body.Bytes); - var json = JsonSerializer.Deserialize(cloudEvents.Body.Bytes); + var json = JsonSerializer.Deserialize(cloudEvents.Body.Bytes, JsonSerialisationOptions.Options); Assert.NotNull(json); Assert.Equal(source, json.Source.ToString()); Assert.Equal(type, json.Type);