diff --git a/src/Paramore.Brighter.DynamoDb/GUIDConverter.cs b/src/Paramore.Brighter.DynamoDb/GUIDConverter.cs index 857269f2eb..779f24b72a 100644 --- a/src/Paramore.Brighter.DynamoDb/GUIDConverter.cs +++ b/src/Paramore.Brighter.DynamoDb/GUIDConverter.cs @@ -8,10 +8,8 @@ public class GUIDConverter : IPropertyConverter { public DynamoDBEntry ToEntry(object value) { - var uuid = value as Guid? ?? - throw new InvalidOperationException( - $"Supplied type was of type {value.GetType().Name} not Guid"); + var uuid = (Guid)value; var json = uuid.ToString(); DynamoDBEntry entry = new Primitive diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/BrighterDefinedHeaders.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/BrighterDefinedHeaders.cs new file mode 100644 index 0000000000..3bb3c08052 --- /dev/null +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/BrighterDefinedHeaders.cs @@ -0,0 +1,47 @@ +#region Licence +/* The MIT License (MIT) +Copyright © 2024 Ian Cooper + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the “Software”), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. */ + +#endregion + +namespace Paramore.Brighter.MessagingGateway.Kafka; + +public static class BrighterDefinedHeaders +{ + public static readonly string[] HeadersToReset; + + static BrighterDefinedHeaders() + { + HeadersToReset = new[] { + HeaderNames.MESSAGE_ID, + HeaderNames.MESSAGE_TYPE, + HeaderNames.TOPIC, + HeaderNames.CORRELATION_ID, + HeaderNames.TIMESTAMP, + HeaderNames.PARTITIONKEY, + HeaderNames.CONTENT_TYPE, + HeaderNames.REPLY_TO, + HeaderNames.DELAYED_MILLISECONDS, + HeaderNames.HANDLED_COUNT + }; + } + +} diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/HeaderNames.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/HeaderNames.cs index 753fbbfe7a..e78cc459a2 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/HeaderNames.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/HeaderNames.cs @@ -27,6 +27,11 @@ namespace Paramore.Brighter.MessagingGateway.Kafka { public class HeaderNames { + /// + /// A dictionary of user defined values + /// + public static string BAG { get; } = "Bag"; + /// /// What is the content type of the message§ /// @@ -37,6 +42,17 @@ public class HeaderNames /// public const string CORRELATION_ID = "CorrelationId"; + /// + /// If the message was deferred, how long for? + /// + public static string DELAYED_MILLISECONDS { get; } = "x-delay"; + + /// + /// How many times has the message been retried with a delay + /// + public static string HANDLED_COUNT { get; } = "HandledCount" ; + + /// /// The message type /// diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/IKafkaMessageHeaderBuilder.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/IKafkaMessageHeaderBuilder.cs index dc4a8794b6..147d5d170f 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/IKafkaMessageHeaderBuilder.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/IKafkaMessageHeaderBuilder.cs @@ -1,4 +1,28 @@ -using Confluent.Kafka; +#region Licence +/* The MIT License (MIT) +Copyright © 2024 Ian Cooper + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the “Software”), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. */ + +#endregion + +using Confluent.Kafka; namespace Paramore.Brighter.MessagingGateway.Kafka { diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaDefaultMessageHeaderBuilder.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaDefaultMessageHeaderBuilder.cs index ee63098801..055cacbe73 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaDefaultMessageHeaderBuilder.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaDefaultMessageHeaderBuilder.cs @@ -1,24 +1,41 @@ -using System; +#region Licence +/* The MIT License (MIT) +Copyright © 2024 Ian Cooper + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the “Software”), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. */ + +#endregion + +using System; +using System.Globalization; using System.Linq; using Confluent.Kafka; using Paramore.Brighter.Extensions; namespace Paramore.Brighter.MessagingGateway.Kafka { - internal class KafkaDefaultMessageHeaderBuilder : IKafkaMessageHeaderBuilder + /// + /// This class serializes Brighter headers to Kafka. Kafka uses a byte[] for its header values. We convert all + /// header values into a string, and then get a UTF8 encoded set of bytes for that string. + /// + public class KafkaDefaultMessageHeaderBuilder : IKafkaMessageHeaderBuilder { - private static readonly string[] s_headersToReset; - - static KafkaDefaultMessageHeaderBuilder() - { - s_headersToReset = new[] { - HeaderNames.MESSAGE_TYPE, - HeaderNames.TOPIC, - HeaderNames.CORRELATION_ID, - HeaderNames.TIMESTAMP - }; - } - public static KafkaDefaultMessageHeaderBuilder Instance => new KafkaDefaultMessageHeaderBuilder(); public Headers Build(Message message) @@ -31,9 +48,9 @@ public Headers Build(Message message) }; if (message.Header.TimeStamp != default) - headers.Add(HeaderNames.TIMESTAMP, BitConverter.GetBytes(new DateTimeOffset(message.Header.TimeStamp).ToUnixTimeMilliseconds())); + headers.Add(HeaderNames.TIMESTAMP, new DateTimeOffset(message.Header.TimeStamp).ToString().ToByteArray()); else - headers.Add(HeaderNames.TIMESTAMP, BitConverter.GetBytes(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds())); + headers.Add(HeaderNames.TIMESTAMP, DateTimeOffset.UtcNow.ToString().ToByteArray()); if (message.Header.CorrelationId != Guid.Empty) headers.Add(HeaderNames.CORRELATION_ID, message.Header.CorrelationId.ToString().ToByteArray()); @@ -47,20 +64,39 @@ public Headers Build(Message message) if (!string.IsNullOrEmpty(message.Header.ReplyTo)) headers.Add(HeaderNames.REPLY_TO, message.Header.ReplyTo.ToByteArray()); + headers.Add(HeaderNames.DELAYED_MILLISECONDS, message.Header.DelayedMilliseconds.ToString().ToByteArray()); + + headers.Add(HeaderNames.HANDLED_COUNT, message.Header.HandledCount.ToString().ToByteArray()); + message.Header.Bag.Each((header) => { - if (!s_headersToReset.Any(htr => htr.Equals(header.Key))) + if (!BrighterDefinedHeaders.HeadersToReset.Any(htr => htr.Equals(header.Key))) { switch (header.Value) { case string stringValue: headers.Add(header.Key, stringValue.ToByteArray()); break; + case DateTime dateTimeValue: + headers.Add(header.Key, dateTimeValue.ToString(CultureInfo.InvariantCulture).ToByteArray()); + break; + case Guid guidValue: + headers.Add(header.Key, guidValue.ToString().ToByteArray()); + break; + case bool boolValue: + headers.Add(header.Key, boolValue.ToString().ToByteArray()); + break; case int intValue: - headers.Add(header.Key, BitConverter.GetBytes(intValue)); + headers.Add(header.Key, intValue.ToString().ToByteArray()); + break; + case double doubleValue: + headers.Add(header.Key, doubleValue.ToString(CultureInfo.InvariantCulture).ToByteArray()); + break; + case float floatValue: + headers.Add(header.Key, floatValue.ToString(CultureInfo.InvariantCulture).ToByteArray()); break; - case Guid guidValue: - headers.Add(header.Key, guidValue.ToByteArray()); + case long longValue: + headers.Add(header.Key, longValue.ToString().ToByteArray()); break; case byte[] byteArray: headers.Add(header.Key, byteArray); @@ -71,7 +107,7 @@ public Headers Build(Message message) } } }); - + return headers; } } diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaHeadersTools.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaHeadersTools.cs index 526ccd370a..b409f74307 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaHeadersTools.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaHeadersTools.cs @@ -1,7 +1,29 @@ -using System; -using System.Collections.Generic; +#region Licence +/* The MIT License (MIT) +Copyright © 2024 Ian Cooper + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the “Software”), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. */ + +#endregion + +using System; using System.Linq; -using System.Text; using Confluent.Kafka; namespace Paramore.Brighter.MessagingGateway.Kafka diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs index ca6352dd64..eda8923066 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs @@ -20,6 +20,7 @@ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ #endregion + using System; using System.Collections.Concurrent; using System.Collections.Generic; diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageCreator.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageCreator.cs index 6828c7c813..42462655ae 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageCreator.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageCreator.cs @@ -1,4 +1,28 @@ -using System; +#region Licence +/* The MIT License (MIT) +Copyright © 2024 Ian Cooper + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the “Software”), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. */ + +#endregion + +using System; using System.Globalization; using System.Linq; using System.Text; @@ -9,13 +33,19 @@ namespace Paramore.Brighter.MessagingGateway.Kafka { - internal class KafkaMessageCreator + /// + /// Turns a Kafka message into a Brighter message + /// Kafka header values are a key and a byte[]. For known header values we can coerce back into the expected + /// type. For unknown values, we just add them into the MessageHeader's Bag with a type of string. You will need + /// to coerce them to the appropriate type yourself. You can set the serializer, so your alternative is to + /// override the bag creation code to use more specific types. + /// + public class KafkaMessageCreator { private static readonly ILogger s_logger = ApplicationLogging.CreateLogger(); public Message CreateMessage(ConsumeResult consumeResult) { - var headers = consumeResult.Message.Headers; var topic = HeaderResult.Empty(); var messageId = HeaderResult.Empty(); var timeStamp = HeaderResult.Empty(); @@ -24,6 +54,8 @@ public Message CreateMessage(ConsumeResult consumeResult) var partitionKey = HeaderResult.Empty(); var replyTo = HeaderResult.Empty(); var contentType = HeaderResult.Empty(); + var delayMilliseconds = HeaderResult.Empty(); + var handledCount = HeaderResult.Empty(); Message message; try @@ -35,7 +67,9 @@ public Message CreateMessage(ConsumeResult consumeResult) correlationId = ReadCorrelationId(consumeResult.Message.Headers); partitionKey = ReadPartitionKey(consumeResult.Message.Headers); replyTo = ReadReplyTo(consumeResult.Message.Headers); - contentType = ReadContentType(consumeResult.Message.Headers); + contentType = ReadContentType(consumeResult.Message.Headers); + delayMilliseconds = ReadDelayMilliseconds(consumeResult.Message.Headers); + handledCount = ReadHandledCount(consumeResult.Message.Headers); if (false == (topic.Success && messageId.Success && messageType.Success && timeStamp.Success)) { @@ -55,19 +89,29 @@ public Message CreateMessage(ConsumeResult consumeResult) messageHeader.PartitionKey = partitionKey.Result; else messageHeader.PartitionKey = consumeResult.Message.Key; - + if (contentType.Success) - messageHeader.ContentType =contentType.Result; + messageHeader.ContentType = contentType.Result; if (replyTo.Success) messageHeader.ReplyTo = replyTo.Result; - message = new Message(messageHeader, new MessageBody(consumeResult.Message.Value, messageHeader.ContentType)); + if (delayMilliseconds.Success) + messageHeader.DelayedMilliseconds = delayMilliseconds.Result; + + if (handledCount.Success) + messageHeader.HandledCount = handledCount.Result; + + message = new Message(messageHeader, + new MessageBody(consumeResult.Message.Value, messageHeader.ContentType)); - headers.Each(header => message.Header.Bag.Add(header.Key, ParseHeaderValue(header))); - if (!message.Header.Bag.ContainsKey(HeaderNames.PARTITION_OFFSET)) message.Header.Bag.Add(HeaderNames.PARTITION_OFFSET, consumeResult.TopicPartitionOffset); + + consumeResult.Message.Headers.Each(header => + { + ReadBagEntry(header, message); + }); } } catch (Exception e) @@ -88,11 +132,10 @@ private Message FailureMessage(HeaderResult topic, HeaderResult me var message = new Message(header, new MessageBody(string.Empty)); return message; } - - + private HeaderResult ReadContentType(Headers headers) { - return ReadHeader(headers, HeaderNames.CONTENT_TYPE,false); + return ReadHeader(headers, HeaderNames.CONTENT_TYPE, false); } private HeaderResult ReadCorrelationId(Headers headers) @@ -116,19 +159,61 @@ private HeaderResult ReadCorrelationId(Headers headers) }); } + private HeaderResult ReadDelayMilliseconds(Headers headers) + { + return ReadHeader(headers, HeaderNames.DELAYED_MILLISECONDS) + .Map(s => + { + if (string.IsNullOrEmpty(s)) + { + s_logger.LogDebug("No delay milliseconds found in message"); + return new HeaderResult(0, true); + } + + if (int.TryParse(s, out int delayMilliseconds)) + { + return new HeaderResult(delayMilliseconds, true); + } + + s_logger.LogDebug("Could not parse message delayMilliseconds: {DelayMillisecondsValue}", s); + return new HeaderResult(0, false); + }); + } + + private HeaderResult ReadHandledCount(Headers headers) + { + return ReadHeader(headers, HeaderNames.HANDLED_COUNT) + .Map(s => + { + if (string.IsNullOrEmpty(s)) + { + s_logger.LogDebug("No handled count found in message"); + return new HeaderResult(0, true); + } + + if (int.TryParse(s, out int handledCount)) + { + return new HeaderResult(handledCount, true); + } + + s_logger.LogDebug("Could not parse message handled count: {HandledCountValue}", s); + return new HeaderResult(0, false); + }); + } + private HeaderResult ReadReplyTo(Headers headers) { return ReadHeader(headers, HeaderNames.REPLY_TO) - .Map(s => - { - if (string.IsNullOrEmpty(s)) - { - s_logger.LogDebug("No reply to found in message"); - return new HeaderResult(string.Empty, true); - } - - return new HeaderResult(s, true); - }); + .Map(s => + { + if (string.IsNullOrEmpty(s)) + { + s_logger.LogDebug("No reply to found in message"); + return new HeaderResult(string.Empty, true); + } + + return new HeaderResult(s, true); + }); } private HeaderResult ReadTimeStamp(Headers headers) @@ -136,14 +221,16 @@ private HeaderResult ReadTimeStamp(Headers headers) if (headers.TryGetLastBytesIgnoreCase(HeaderNames.TIMESTAMP, out byte[] lastHeader)) { //Additional testing for a non unixtimestamp string - if (DateTime.TryParse(lastHeader.FromByteArray(), DateTimeFormatInfo.CurrentInfo, DateTimeStyles.AdjustToUniversal, out DateTime timestamp)) + if (DateTime.TryParse(lastHeader.FromByteArray(), DateTimeFormatInfo.CurrentInfo, + DateTimeStyles.AdjustToUniversal, out DateTime timestamp)) { return new HeaderResult(timestamp, true); } try { - return new HeaderResult(DateTimeOffset.FromUnixTimeMilliseconds(BitConverter.ToInt64(lastHeader, 0)).DateTime, true); + return new HeaderResult( + DateTimeOffset.FromUnixTimeMilliseconds(BitConverter.ToInt64(lastHeader, 0)).DateTime, true); } catch (Exception) { @@ -183,7 +270,8 @@ private HeaderResult ReadMessageId(Headers headers) { if (string.IsNullOrEmpty(s)) { - s_logger.LogDebug("No message id found in message MessageId, new message id is {NewMessageId}", newMessageId); + s_logger.LogDebug("No message id found in message MessageId, new message id is {NewMessageId}", + newMessageId); return new HeaderResult(newMessageId, true); } @@ -212,12 +300,6 @@ private HeaderResult ReadPartitionKey(Headers headers) }); } - - private static object ParseHeaderValue(object value) - { - return value is byte[] bytes ? Encoding.UTF8.GetString(bytes) : value; - } - private HeaderResult ReadHeader(Headers headers, string key, bool dieOnMissing = false) { if (headers.TryGetLastBytesIgnoreCase(key, out byte[] lastHeader)) @@ -230,12 +312,26 @@ private HeaderResult ReadHeader(Headers headers, string key, bool dieOnM catch (Exception e) { var firstTwentyBytes = BitConverter.ToString(lastHeader.Take(20).ToArray()); - s_logger.LogWarning(e, "Failed to read the value of header {Topic} as UTF-8, first 20 byes follow: \n\t{1}", key, firstTwentyBytes); + s_logger.LogWarning(e, + "Failed to read the value of header {Topic} as UTF-8, first 20 byes follow: \n\t{1}", key, + firstTwentyBytes); return new HeaderResult(null, false); } } - + return new HeaderResult(string.Empty, !dieOnMissing); } + + /// + /// Override this in a derived class if you want to coerce specific user defined header values to the correct + /// type in the bag + /// + /// The Kafka message header + /// The Brighter message + protected virtual void ReadBagEntry(IHeader header, Message message) + { + if (!BrighterDefinedHeaders.HeadersToReset.Any(htr => htr.Equals(header.Key))) + message.Header.Bag.Add(header.Key, Encoding.UTF8.GetString(header.GetValueBytes())); + } } } diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessagePublisher.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessagePublisher.cs index bbf1f1f610..6579a27ded 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessagePublisher.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessagePublisher.cs @@ -1,4 +1,28 @@ -using System; +#region Licence +/* The MIT License (MIT) +Copyright © 2024 Ian Cooper + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the “Software”), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. */ + +#endregion + +using System; using System.Threading.Tasks; using Confluent.Kafka; diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessagingGateway.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessagingGateway.cs index 53176c8b0a..963a30b871 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessagingGateway.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessagingGateway.cs @@ -1,4 +1,28 @@ -using System; +#region Licence +/* The MIT License (MIT) +Copyright © 2024 Ian Cooper + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the “Software”), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. */ + +#endregion + +using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; diff --git a/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Deserializing_A_Message_Header_Bag.cs b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Deserializing_A_Message_Header_Bag.cs index 05ab7bf0f0..9aff2cbc30 100644 --- a/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Deserializing_A_Message_Header_Bag.cs +++ b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Deserializing_A_Message_Header_Bag.cs @@ -50,7 +50,10 @@ public void When_deserializing_a_message_header_bag() //Assert foreach (var key in expectedBag.Keys) { - if (key != "myArrayKey") deserializedHeader.Bag[key].Should().Be(expectedBag[key]); + if (key != "myArrayKey") + { + deserializedHeader.Bag[key].Should().Be(expectedBag[key]); + } if (key == "myArrayKey") { var expectedVals = (int[])expectedBag[key]; diff --git a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_converting_brighterheader_to_kafkaheader.cs b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_converting_brighterheader_to_kafkaheader.cs new file mode 100644 index 0000000000..a5ba3bfd8e --- /dev/null +++ b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_converting_brighterheader_to_kafkaheader.cs @@ -0,0 +1,72 @@ +using System; +using System.Collections.Generic; +using System.Globalization; +using System.Text; +using System.Text.Json; +using Confluent.Kafka; +using FluentAssertions; +using Paramore.Brighter.MessagingGateway.Kafka; +using Xunit; + +namespace Paramore.Brighter.Kafka.Tests.MessagingGateway; + +public class KafkaDefaultMessageHeaderBuilderTests +{ + [Fact] + public void When_converting_brighterheader_to_kafkaheader() + { + //arrange + var message = new Message( + new MessageHeader( + messageId: Guid.NewGuid(), + topic: "test", + messageType: MessageType.MT_COMMAND, + timeStamp: DateTime.UtcNow, + correlationId: Guid.NewGuid(), + replyTo: "test", + contentType: "application/octet", + partitionKey: "mykey" + ), + new MessageBody("test content") + ); + + message.Header.DelayedMilliseconds = 500; + message.Header.HandledCount = 2; + + Dictionary bag = message.Header.Bag; + bag.Add("myguid", Guid.NewGuid()); + bag.Add("mystring", "string value"); + bag.Add("myint", 7); + bag.Add("mydouble", 3.56); + bag.Add("mydatetime", DateTime.UtcNow); + + //act + var builder = new KafkaDefaultMessageHeaderBuilder(); + Headers headers = builder.Build(message); + + //assert + + //known properties + headers.GetLastBytes(HeaderNames.MESSAGE_TYPE).Should().Equal(message.Header.MessageType.ToString().ToByteArray()); + headers.GetLastBytes(HeaderNames.MESSAGE_ID).Should().Equal(message.Header.Id.ToString().ToByteArray()); + headers.GetLastBytes(HeaderNames.TOPIC).Should().Equal(message.Header.Topic.ToByteArray()); + headers.GetLastBytes(HeaderNames.TIMESTAMP).Should() + .Equal(new DateTimeOffset(message.Header.TimeStamp).ToUnixTimeMilliseconds().ToString().ToByteArray()); + headers.GetLastBytes(HeaderNames.CORRELATION_ID).Should() + .Equal(message.Header.CorrelationId.ToString().ToByteArray()); + headers.GetLastBytes(HeaderNames.PARTITIONKEY).Should().Equal(message.Header.PartitionKey.ToByteArray()); + headers.GetLastBytes(HeaderNames.CONTENT_TYPE).Should().Equal(message.Header.ContentType.ToByteArray()); + headers.GetLastBytes(HeaderNames.REPLY_TO).Should().Equal(message.Header.ReplyTo.ToByteArray()); + headers.GetLastBytes(HeaderNames.DELAYED_MILLISECONDS).Should() + .Equal(message.Header.DelayedMilliseconds.ToString().ToByteArray()); + headers.GetLastBytes(HeaderNames.HANDLED_COUNT).Should() + .Equal(message.Header.HandledCount.ToString().ToByteArray()); + + //bag properties + headers.GetLastBytes("myguid").Should().Equal(bag["myguid"].ToString().ToByteArray()); + headers.GetLastBytes("mystring").Should().Equal(bag["mystring"].ToString().ToByteArray()); + headers.GetLastBytes("myint").Should().Equal(bag["myint"].ToString().ToByteArray()); + headers.GetLastBytes("mydouble").Should().Equal(bag["mydouble"].ToString().ToByteArray()); + headers.GetLastBytes("mydatetime").Should().Equal(((DateTime)bag["mydatetime"]).ToString(CultureInfo.InvariantCulture).ToByteArray()); + } +} diff --git a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_converting_kafkaheader_to_brighterheader.cs b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_converting_kafkaheader_to_brighterheader.cs new file mode 100644 index 0000000000..1a58f1e989 --- /dev/null +++ b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_converting_kafkaheader_to_brighterheader.cs @@ -0,0 +1,76 @@ +using System; +using System.Collections.Generic; +using System.Globalization; +using Confluent.Kafka; +using FluentAssertions; +using Paramore.Brighter.MessagingGateway.Kafka; +using Xunit; + +namespace Paramore.Brighter.Kafka.Tests.MessagingGateway; + +public class KafkaHeaderToBrighterTests +{ + [Fact] + public void When_converting_kafkaheader_to_brighterheader() + { + //arrange + + var message = new Message( + new MessageHeader( + messageId: Guid.NewGuid(), + topic: "test", + messageType: MessageType.MT_COMMAND, + timeStamp: DateTime.UtcNow, + correlationId: Guid.NewGuid(), + replyTo: "test", + contentType: "application/octet", + partitionKey: "mykey" + ), + new MessageBody("test content") + ); + + message.Header.DelayedMilliseconds = 500; + message.Header.HandledCount = 2; + + Dictionary bag = message.Header.Bag; + bag.Add("myguid", Guid.NewGuid()); + bag.Add("mystring", "string value"); + bag.Add("myint", 7); + bag.Add("mydouble", 3.56); + bag.Add("mydatetime", DateTime.UtcNow); + + var builder = new KafkaDefaultMessageHeaderBuilder(); + Headers headers = builder.Build(message); + + var result = new ConsumeResult(); + result.Topic = "test"; + result.Message = new Message + { + Headers = headers, + Key = message.Id.ToString(), + Value = "test content"u8.ToArray() + }; + + //act + var readMessage = new KafkaMessageCreator().CreateMessage(result); + + //assert + readMessage.Id.Should().Be(message.Id); + readMessage.Header.MessageType.Should().Be(message.Header.MessageType); + readMessage.Header.Id.Should().Be(message.Header.Id); + readMessage.Header.CorrelationId.Should().Be(message.Header.CorrelationId); + readMessage.Header.ContentType.Should().Be(message.Header.ContentType); + readMessage.Header.Topic.Should().Be(message.Header.Topic); + readMessage.Header.DelayedMilliseconds.Should().Be(message.Header.DelayedMilliseconds); + readMessage.Header.HandledCount.Should().Be(message.Header.HandledCount); + readMessage.Header.TimeStamp.ToString("u").Should().Be(message.Header.TimeStamp.ToString("u")); + + //NOTE: Because we can only coerce the byte[] to a string for a unknown bag key, coercing to a specific + //type has to be done by the user of the bag. + readMessage.Header.Bag["myguid"].Should().Be(bag["myguid"].ToString()); + readMessage.Header.Bag["mystring"].Should().Be(bag["mystring"].ToString()); + readMessage.Header.Bag["myint"].Should().Be(bag["myint"].ToString()); + readMessage.Header.Bag["mydouble"].Should().Be(bag["mydouble"].ToString()); + readMessage.Header.Bag["mydatetime"].Should().Be(((DateTime)bag["mydatetime"]).ToString(CultureInfo.InvariantCulture)); + } +} diff --git a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_posting_a_message.cs b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_posting_a_message.cs index 9572d24943..2e146727a9 100644 --- a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_posting_a_message.cs +++ b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_posting_a_message.cs @@ -119,8 +119,8 @@ public void When_posting_a_message() receivedMessage.Header.PartitionKey.Should().Be(_partitionKey); receivedMessage.Body.Bytes.Should().Equal(message.Body.Bytes); receivedMessage.Body.Value.Should().Be(message.Body.Value); - receivedMessage.Header.TimeStamp.ToString("yyyy-MM-ddTHH:mm:ss.fffZ") - .Should().Be(message.Header.TimeStamp.ToString("yyyy-MM-ddTHH:mm:ss.fffZ")); + receivedMessage.Header.TimeStamp.ToString("yyyy-MM-ddTHH:mm:ssZ") + .Should().Be(message.Header.TimeStamp.ToString("yyyy-MM-ddTHH:mm:ssZ")); receivedCommand.Id.Should().Be(command.Id); receivedCommand.Value.Should().Be(command.Value); } diff --git a/tests/Paramore.Brighter.Kafka.Tests/README.md b/tests/Paramore.Brighter.Kafka.Tests/README.md new file mode 100644 index 0000000000..cd7ea22534 --- /dev/null +++ b/tests/Paramore.Brighter.Kafka.Tests/README.md @@ -0,0 +1,22 @@ +# Kafka Test Dependencies + +## Running the docker-based tests + +Tests which talk to a Docker hosted Kafka instance assume the following: + +* No security +* A Kafka instance running on `localhost:9092` +* A Zookeeper instance running on `localhost:2181` +* A schema registry running on `localhost:8081` + + +## Running the confluent-based tests + +Tests which talk to a Confluent hosted Kafka instance have an attribute trait of "Confluent". They assume the following: + +* You have set environment variables for the following: + * CONFLUENT_BOOSTRAP_SERVER - the Kafka instance to connect to + * CONFLUENT_SASL_USERNAME - the username to use for SASL + * CONFLUENT_SASL_PASSWORD - the password to use for SASL + +