From 9efb283b51304ec4878b0a3d3a1626be012ba781 Mon Sep 17 00:00:00 2001 From: Ian Cooper Date: Tue, 6 Feb 2024 15:41:20 +0000 Subject: [PATCH 1/4] create tests for kafka headers: --- .../KafkaDefaultMessageHeaderBuilder.cs | 2 +- ...When_Deserializing_A_Message_Header_Bag.cs | 5 +++- ...onverting_brighterheader_to_kafkaheader.cs | 29 +++++++++++++++++++ 3 files changed, 34 insertions(+), 2 deletions(-) create mode 100644 tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_converting_brighterheader_to_kafkaheader.cs diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaDefaultMessageHeaderBuilder.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaDefaultMessageHeaderBuilder.cs index ee63098801..fe6772fd52 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaDefaultMessageHeaderBuilder.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaDefaultMessageHeaderBuilder.cs @@ -5,7 +5,7 @@ namespace Paramore.Brighter.MessagingGateway.Kafka { - internal class KafkaDefaultMessageHeaderBuilder : IKafkaMessageHeaderBuilder + public class KafkaDefaultMessageHeaderBuilder : IKafkaMessageHeaderBuilder { private static readonly string[] s_headersToReset; diff --git a/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Deserializing_A_Message_Header_Bag.cs b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Deserializing_A_Message_Header_Bag.cs index 05ab7bf0f0..9aff2cbc30 100644 --- a/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Deserializing_A_Message_Header_Bag.cs +++ b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Deserializing_A_Message_Header_Bag.cs @@ -50,7 +50,10 @@ public void When_deserializing_a_message_header_bag() //Assert foreach (var key in expectedBag.Keys) { - if (key != "myArrayKey") deserializedHeader.Bag[key].Should().Be(expectedBag[key]); + if (key != "myArrayKey") + { + deserializedHeader.Bag[key].Should().Be(expectedBag[key]); + } if (key == "myArrayKey") { var expectedVals = (int[])expectedBag[key]; diff --git a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_converting_brighterheader_to_kafkaheader.cs b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_converting_brighterheader_to_kafkaheader.cs new file mode 100644 index 0000000000..328e719025 --- /dev/null +++ b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_converting_brighterheader_to_kafkaheader.cs @@ -0,0 +1,29 @@ +using System; +using Paramore.Brighter.MessagingGateway.Kafka; +using Xunit; + +namespace Paramore.Brighter.Kafka.Tests.MessagingGateway; + +public class KafkaDefaultMessageHeaderBuilderTests +{ + public KafkaDefaultMessageHeaderBuilderTests() + { + var message = new Message( + new MessageHeader( + messageId: Guid.NewGuid(), + topic: "test", + messageType: MessageType.MT_COMMAND, + timeStamp: DateTime.UtcNow, + correlationId: Guid.NewGuid() + ), + new MessageBody("test content") + ); + } + + [Fact] + public void When_converting_brighterheader_to_kafkaheader() + { + var builder = new KafkaDefaultMessageHeaderBuilder(); + + } +} From 76a0f8be98c70d0c54499db343aa44b07f57a0d9 Mon Sep 17 00:00:00 2001 From: Ian Cooper Date: Thu, 8 Feb 2024 20:17:51 +0000 Subject: [PATCH 2/4] Test serialization of Kafka header; fix issue with Dynamo GUID converter. --- .../GUIDConverter.cs | 4 -- .../KafkaDefaultMessageHeaderBuilder.cs | 8 +++- .../KafkaHeadersTools.cs | 2 - ...onverting_brighterheader_to_kafkaheader.cs | 45 ++++++++++++++++--- 4 files changed, 46 insertions(+), 13 deletions(-) diff --git a/src/Paramore.Brighter.DynamoDb/GUIDConverter.cs b/src/Paramore.Brighter.DynamoDb/GUIDConverter.cs index 0728cd8371..9972f51cd8 100644 --- a/src/Paramore.Brighter.DynamoDb/GUIDConverter.cs +++ b/src/Paramore.Brighter.DynamoDb/GUIDConverter.cs @@ -9,10 +9,6 @@ public class GUIDConverter : IPropertyConverter public DynamoDBEntry ToEntry(object value) { var uuid = (Guid)value; - if (uuid == null) - throw new InvalidOperationException( - $"Supplied type was of type {value.GetType().Name} not Accounts.Application.CardDetails"); - var json = uuid.ToString(); DynamoDBEntry entry = new Primitive diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaDefaultMessageHeaderBuilder.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaDefaultMessageHeaderBuilder.cs index fe6772fd52..5811ceaeef 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaDefaultMessageHeaderBuilder.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaDefaultMessageHeaderBuilder.cs @@ -60,11 +60,17 @@ public Headers Build(Message message) headers.Add(header.Key, BitConverter.GetBytes(intValue)); break; case Guid guidValue: - headers.Add(header.Key, guidValue.ToByteArray()); + headers.Add(header.Key, guidValue.ToString().ToByteArray()); break; case byte[] byteArray: headers.Add(header.Key, byteArray); break; + case double doubleValue: + headers.Add(header.Key, BitConverter.GetBytes(doubleValue)); + break; + case DateTime dateTimeValue: + headers.Add(header.Key, dateTimeValue.ToString().ToByteArray()); + break; default: headers.Add(header.Key, header.Value.ToString().ToByteArray()); break; diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaHeadersTools.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaHeadersTools.cs index 526ccd370a..7486824ffc 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaHeadersTools.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaHeadersTools.cs @@ -1,7 +1,5 @@ using System; -using System.Collections.Generic; using System.Linq; -using System.Text; using Confluent.Kafka; namespace Paramore.Brighter.MessagingGateway.Kafka diff --git a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_converting_brighterheader_to_kafkaheader.cs b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_converting_brighterheader_to_kafkaheader.cs index 328e719025..72295877b2 100644 --- a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_converting_brighterheader_to_kafkaheader.cs +++ b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_converting_brighterheader_to_kafkaheader.cs @@ -1,4 +1,8 @@ using System; +using System.Collections.Generic; +using System.Text; +using Confluent.Kafka; +using FluentAssertions; using Paramore.Brighter.MessagingGateway.Kafka; using Xunit; @@ -6,7 +10,8 @@ namespace Paramore.Brighter.Kafka.Tests.MessagingGateway; public class KafkaDefaultMessageHeaderBuilderTests { - public KafkaDefaultMessageHeaderBuilderTests() + [Fact] + public void When_converting_brighterheader_to_kafkaheader() { var message = new Message( new MessageHeader( @@ -14,16 +19,44 @@ public KafkaDefaultMessageHeaderBuilderTests() topic: "test", messageType: MessageType.MT_COMMAND, timeStamp: DateTime.UtcNow, - correlationId: Guid.NewGuid() + correlationId: Guid.NewGuid(), + replyTo: "test", + contentType: "application/octet", + partitionKey: "mykey" ), new MessageBody("test content") ); - } - [Fact] - public void When_converting_brighterheader_to_kafkaheader() - { + Dictionary bag = message.Header.Bag; + bag.Add("myguid", Guid.NewGuid()); + bag.Add("mystring", "string value"); + bag.Add("myint", 7); + bag.Add("mydouble", 3.56); + bag.Add("mybytearray", Encoding.UTF8.GetBytes("mybytes")); + bag.Add("mydatetime", DateTime.UtcNow); + bag.Add("mydateonly", DateOnly.FromDateTime(DateTime.UtcNow)); + var builder = new KafkaDefaultMessageHeaderBuilder(); + Headers headers = builder.Build(message); + + //known properties + headers.GetLastBytes(HeaderNames.MESSAGE_TYPE).Should().Equal(message.Header.MessageType.ToString().ToByteArray()); + headers.GetLastBytes(HeaderNames.MESSAGE_ID).Should().Equal(message.Header.Id.ToString().ToByteArray()); + headers.GetLastBytes(HeaderNames.TOPIC).Should().Equal(message.Header.Topic.ToByteArray()); + headers.GetLastBytes(HeaderNames.TIMESTAMP).Should() + .Equal(BitConverter.GetBytes(new DateTimeOffset(message.Header.TimeStamp).ToUnixTimeMilliseconds())); + headers.GetLastBytes(HeaderNames.CORRELATION_ID).Should() + .Equal(message.Header.CorrelationId.ToString().ToByteArray()); + headers.GetLastBytes(HeaderNames.PARTITIONKEY).Should().Equal(message.Header.PartitionKey.ToByteArray()); + headers.GetLastBytes(HeaderNames.CONTENT_TYPE).Should().Equal(message.Header.ContentType.ToByteArray()); + headers.GetLastBytes(HeaderNames.REPLY_TO).Should().Equal(message.Header.ReplyTo.ToByteArray()); + //bag properties + headers.GetLastBytes("myguid").Should().Equal(bag["myguid"].ToString().ToByteArray()); + headers.GetLastBytes("mystring").Should().Equal(bag["mystring"].ToString().ToByteArray()); + headers.GetLastBytes("myint").Should().Equal(BitConverter.GetBytes((int)bag["myint"])); + headers.GetLastBytes("mydouble").Should().Equal(BitConverter.GetBytes((double)bag["mydouble"])); + headers.GetLastBytes("mybytearray").Should().Equal((byte[])bag["mybytearray"]); + headers.GetLastBytes("mydatetime").Should().Equal(((DateTime)bag["mydatetime"]).ToString().ToByteArray()); } } From f84f7291c38a631e8c33de782d37b069e284f2ac Mon Sep 17 00:00:00 2001 From: Ian Cooper Date: Sun, 11 Feb 2024 11:13:29 +1100 Subject: [PATCH 3/4] Fix up how we deal with Kafka headers. Simplify timestamps. Treat bag value as a string. --- .../BrighterDefinedHeaders.cs | 47 +++++ .../HeaderNames.cs | 16 ++ .../IKafkaMessageHeaderBuilder.cs | 26 ++- .../KafkaDefaultMessageHeaderBuilder.cs | 80 ++++++--- .../KafkaHeadersTools.cs | 26 ++- .../KafkaMessageConsumer.cs | 1 + .../KafkaMessageCreator.cs | 162 ++++++++++++++---- .../KafkaMessagePublisher.cs | 26 ++- .../KafkaMessagingGateway.cs | 26 ++- ...onverting_brighterheader_to_kafkaheader.cs | 24 ++- ...onverting_kafkaheader_to_brighterheader.cs | 76 ++++++++ .../When_posting_a_message.cs | 4 +- tests/Paramore.Brighter.Kafka.Tests/README.md | 22 +++ 13 files changed, 465 insertions(+), 71 deletions(-) create mode 100644 src/Paramore.Brighter.MessagingGateway.Kafka/BrighterDefinedHeaders.cs create mode 100644 tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_converting_kafkaheader_to_brighterheader.cs create mode 100644 tests/Paramore.Brighter.Kafka.Tests/README.md diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/BrighterDefinedHeaders.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/BrighterDefinedHeaders.cs new file mode 100644 index 0000000000..3bb3c08052 --- /dev/null +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/BrighterDefinedHeaders.cs @@ -0,0 +1,47 @@ +#region Licence +/* The MIT License (MIT) +Copyright © 2024 Ian Cooper + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the “Software”), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. */ + +#endregion + +namespace Paramore.Brighter.MessagingGateway.Kafka; + +public static class BrighterDefinedHeaders +{ + public static readonly string[] HeadersToReset; + + static BrighterDefinedHeaders() + { + HeadersToReset = new[] { + HeaderNames.MESSAGE_ID, + HeaderNames.MESSAGE_TYPE, + HeaderNames.TOPIC, + HeaderNames.CORRELATION_ID, + HeaderNames.TIMESTAMP, + HeaderNames.PARTITIONKEY, + HeaderNames.CONTENT_TYPE, + HeaderNames.REPLY_TO, + HeaderNames.DELAYED_MILLISECONDS, + HeaderNames.HANDLED_COUNT + }; + } + +} diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/HeaderNames.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/HeaderNames.cs index 753fbbfe7a..e78cc459a2 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/HeaderNames.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/HeaderNames.cs @@ -27,6 +27,11 @@ namespace Paramore.Brighter.MessagingGateway.Kafka { public class HeaderNames { + /// + /// A dictionary of user defined values + /// + public static string BAG { get; } = "Bag"; + /// /// What is the content type of the message§ /// @@ -37,6 +42,17 @@ public class HeaderNames /// public const string CORRELATION_ID = "CorrelationId"; + /// + /// If the message was deferred, how long for? + /// + public static string DELAYED_MILLISECONDS { get; } = "x-delay"; + + /// + /// How many times has the message been retried with a delay + /// + public static string HANDLED_COUNT { get; } = "HandledCount" ; + + /// /// The message type /// diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/IKafkaMessageHeaderBuilder.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/IKafkaMessageHeaderBuilder.cs index dc4a8794b6..147d5d170f 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/IKafkaMessageHeaderBuilder.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/IKafkaMessageHeaderBuilder.cs @@ -1,4 +1,28 @@ -using Confluent.Kafka; +#region Licence +/* The MIT License (MIT) +Copyright © 2024 Ian Cooper + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the “Software”), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. */ + +#endregion + +using Confluent.Kafka; namespace Paramore.Brighter.MessagingGateway.Kafka { diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaDefaultMessageHeaderBuilder.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaDefaultMessageHeaderBuilder.cs index 5811ceaeef..055cacbe73 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaDefaultMessageHeaderBuilder.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaDefaultMessageHeaderBuilder.cs @@ -1,24 +1,41 @@ -using System; +#region Licence +/* The MIT License (MIT) +Copyright © 2024 Ian Cooper + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the “Software”), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. */ + +#endregion + +using System; +using System.Globalization; using System.Linq; using Confluent.Kafka; using Paramore.Brighter.Extensions; namespace Paramore.Brighter.MessagingGateway.Kafka { + /// + /// This class serializes Brighter headers to Kafka. Kafka uses a byte[] for its header values. We convert all + /// header values into a string, and then get a UTF8 encoded set of bytes for that string. + /// public class KafkaDefaultMessageHeaderBuilder : IKafkaMessageHeaderBuilder { - private static readonly string[] s_headersToReset; - - static KafkaDefaultMessageHeaderBuilder() - { - s_headersToReset = new[] { - HeaderNames.MESSAGE_TYPE, - HeaderNames.TOPIC, - HeaderNames.CORRELATION_ID, - HeaderNames.TIMESTAMP - }; - } - public static KafkaDefaultMessageHeaderBuilder Instance => new KafkaDefaultMessageHeaderBuilder(); public Headers Build(Message message) @@ -31,9 +48,9 @@ public Headers Build(Message message) }; if (message.Header.TimeStamp != default) - headers.Add(HeaderNames.TIMESTAMP, BitConverter.GetBytes(new DateTimeOffset(message.Header.TimeStamp).ToUnixTimeMilliseconds())); + headers.Add(HeaderNames.TIMESTAMP, new DateTimeOffset(message.Header.TimeStamp).ToString().ToByteArray()); else - headers.Add(HeaderNames.TIMESTAMP, BitConverter.GetBytes(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds())); + headers.Add(HeaderNames.TIMESTAMP, DateTimeOffset.UtcNow.ToString().ToByteArray()); if (message.Header.CorrelationId != Guid.Empty) headers.Add(HeaderNames.CORRELATION_ID, message.Header.CorrelationId.ToString().ToByteArray()); @@ -47,29 +64,42 @@ public Headers Build(Message message) if (!string.IsNullOrEmpty(message.Header.ReplyTo)) headers.Add(HeaderNames.REPLY_TO, message.Header.ReplyTo.ToByteArray()); + headers.Add(HeaderNames.DELAYED_MILLISECONDS, message.Header.DelayedMilliseconds.ToString().ToByteArray()); + + headers.Add(HeaderNames.HANDLED_COUNT, message.Header.HandledCount.ToString().ToByteArray()); + message.Header.Bag.Each((header) => { - if (!s_headersToReset.Any(htr => htr.Equals(header.Key))) + if (!BrighterDefinedHeaders.HeadersToReset.Any(htr => htr.Equals(header.Key))) { switch (header.Value) { case string stringValue: headers.Add(header.Key, stringValue.ToByteArray()); break; - case int intValue: - headers.Add(header.Key, BitConverter.GetBytes(intValue)); + case DateTime dateTimeValue: + headers.Add(header.Key, dateTimeValue.ToString(CultureInfo.InvariantCulture).ToByteArray()); break; - case Guid guidValue: + case Guid guidValue: headers.Add(header.Key, guidValue.ToString().ToByteArray()); break; - case byte[] byteArray: - headers.Add(header.Key, byteArray); + case bool boolValue: + headers.Add(header.Key, boolValue.ToString().ToByteArray()); break; + case int intValue: + headers.Add(header.Key, intValue.ToString().ToByteArray()); + break; case double doubleValue: - headers.Add(header.Key, BitConverter.GetBytes(doubleValue)); + headers.Add(header.Key, doubleValue.ToString(CultureInfo.InvariantCulture).ToByteArray()); break; - case DateTime dateTimeValue: - headers.Add(header.Key, dateTimeValue.ToString().ToByteArray()); + case float floatValue: + headers.Add(header.Key, floatValue.ToString(CultureInfo.InvariantCulture).ToByteArray()); + break; + case long longValue: + headers.Add(header.Key, longValue.ToString().ToByteArray()); + break; + case byte[] byteArray: + headers.Add(header.Key, byteArray); break; default: headers.Add(header.Key, header.Value.ToString().ToByteArray()); @@ -77,7 +107,7 @@ public Headers Build(Message message) } } }); - + return headers; } } diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaHeadersTools.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaHeadersTools.cs index 7486824ffc..b409f74307 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaHeadersTools.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaHeadersTools.cs @@ -1,4 +1,28 @@ -using System; +#region Licence +/* The MIT License (MIT) +Copyright © 2024 Ian Cooper + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the “Software”), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. */ + +#endregion + +using System; using System.Linq; using Confluent.Kafka; diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs index f20c162c70..f7b3bfa1f4 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs @@ -20,6 +20,7 @@ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ #endregion + using System; using System.Collections.Concurrent; using System.Collections.Generic; diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageCreator.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageCreator.cs index 6828c7c813..42462655ae 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageCreator.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageCreator.cs @@ -1,4 +1,28 @@ -using System; +#region Licence +/* The MIT License (MIT) +Copyright © 2024 Ian Cooper + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the “Software”), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. */ + +#endregion + +using System; using System.Globalization; using System.Linq; using System.Text; @@ -9,13 +33,19 @@ namespace Paramore.Brighter.MessagingGateway.Kafka { - internal class KafkaMessageCreator + /// + /// Turns a Kafka message into a Brighter message + /// Kafka header values are a key and a byte[]. For known header values we can coerce back into the expected + /// type. For unknown values, we just add them into the MessageHeader's Bag with a type of string. You will need + /// to coerce them to the appropriate type yourself. You can set the serializer, so your alternative is to + /// override the bag creation code to use more specific types. + /// + public class KafkaMessageCreator { private static readonly ILogger s_logger = ApplicationLogging.CreateLogger(); public Message CreateMessage(ConsumeResult consumeResult) { - var headers = consumeResult.Message.Headers; var topic = HeaderResult.Empty(); var messageId = HeaderResult.Empty(); var timeStamp = HeaderResult.Empty(); @@ -24,6 +54,8 @@ public Message CreateMessage(ConsumeResult consumeResult) var partitionKey = HeaderResult.Empty(); var replyTo = HeaderResult.Empty(); var contentType = HeaderResult.Empty(); + var delayMilliseconds = HeaderResult.Empty(); + var handledCount = HeaderResult.Empty(); Message message; try @@ -35,7 +67,9 @@ public Message CreateMessage(ConsumeResult consumeResult) correlationId = ReadCorrelationId(consumeResult.Message.Headers); partitionKey = ReadPartitionKey(consumeResult.Message.Headers); replyTo = ReadReplyTo(consumeResult.Message.Headers); - contentType = ReadContentType(consumeResult.Message.Headers); + contentType = ReadContentType(consumeResult.Message.Headers); + delayMilliseconds = ReadDelayMilliseconds(consumeResult.Message.Headers); + handledCount = ReadHandledCount(consumeResult.Message.Headers); if (false == (topic.Success && messageId.Success && messageType.Success && timeStamp.Success)) { @@ -55,19 +89,29 @@ public Message CreateMessage(ConsumeResult consumeResult) messageHeader.PartitionKey = partitionKey.Result; else messageHeader.PartitionKey = consumeResult.Message.Key; - + if (contentType.Success) - messageHeader.ContentType =contentType.Result; + messageHeader.ContentType = contentType.Result; if (replyTo.Success) messageHeader.ReplyTo = replyTo.Result; - message = new Message(messageHeader, new MessageBody(consumeResult.Message.Value, messageHeader.ContentType)); + if (delayMilliseconds.Success) + messageHeader.DelayedMilliseconds = delayMilliseconds.Result; + + if (handledCount.Success) + messageHeader.HandledCount = handledCount.Result; + + message = new Message(messageHeader, + new MessageBody(consumeResult.Message.Value, messageHeader.ContentType)); - headers.Each(header => message.Header.Bag.Add(header.Key, ParseHeaderValue(header))); - if (!message.Header.Bag.ContainsKey(HeaderNames.PARTITION_OFFSET)) message.Header.Bag.Add(HeaderNames.PARTITION_OFFSET, consumeResult.TopicPartitionOffset); + + consumeResult.Message.Headers.Each(header => + { + ReadBagEntry(header, message); + }); } } catch (Exception e) @@ -88,11 +132,10 @@ private Message FailureMessage(HeaderResult topic, HeaderResult me var message = new Message(header, new MessageBody(string.Empty)); return message; } - - + private HeaderResult ReadContentType(Headers headers) { - return ReadHeader(headers, HeaderNames.CONTENT_TYPE,false); + return ReadHeader(headers, HeaderNames.CONTENT_TYPE, false); } private HeaderResult ReadCorrelationId(Headers headers) @@ -116,19 +159,61 @@ private HeaderResult ReadCorrelationId(Headers headers) }); } + private HeaderResult ReadDelayMilliseconds(Headers headers) + { + return ReadHeader(headers, HeaderNames.DELAYED_MILLISECONDS) + .Map(s => + { + if (string.IsNullOrEmpty(s)) + { + s_logger.LogDebug("No delay milliseconds found in message"); + return new HeaderResult(0, true); + } + + if (int.TryParse(s, out int delayMilliseconds)) + { + return new HeaderResult(delayMilliseconds, true); + } + + s_logger.LogDebug("Could not parse message delayMilliseconds: {DelayMillisecondsValue}", s); + return new HeaderResult(0, false); + }); + } + + private HeaderResult ReadHandledCount(Headers headers) + { + return ReadHeader(headers, HeaderNames.HANDLED_COUNT) + .Map(s => + { + if (string.IsNullOrEmpty(s)) + { + s_logger.LogDebug("No handled count found in message"); + return new HeaderResult(0, true); + } + + if (int.TryParse(s, out int handledCount)) + { + return new HeaderResult(handledCount, true); + } + + s_logger.LogDebug("Could not parse message handled count: {HandledCountValue}", s); + return new HeaderResult(0, false); + }); + } + private HeaderResult ReadReplyTo(Headers headers) { return ReadHeader(headers, HeaderNames.REPLY_TO) - .Map(s => - { - if (string.IsNullOrEmpty(s)) - { - s_logger.LogDebug("No reply to found in message"); - return new HeaderResult(string.Empty, true); - } - - return new HeaderResult(s, true); - }); + .Map(s => + { + if (string.IsNullOrEmpty(s)) + { + s_logger.LogDebug("No reply to found in message"); + return new HeaderResult(string.Empty, true); + } + + return new HeaderResult(s, true); + }); } private HeaderResult ReadTimeStamp(Headers headers) @@ -136,14 +221,16 @@ private HeaderResult ReadTimeStamp(Headers headers) if (headers.TryGetLastBytesIgnoreCase(HeaderNames.TIMESTAMP, out byte[] lastHeader)) { //Additional testing for a non unixtimestamp string - if (DateTime.TryParse(lastHeader.FromByteArray(), DateTimeFormatInfo.CurrentInfo, DateTimeStyles.AdjustToUniversal, out DateTime timestamp)) + if (DateTime.TryParse(lastHeader.FromByteArray(), DateTimeFormatInfo.CurrentInfo, + DateTimeStyles.AdjustToUniversal, out DateTime timestamp)) { return new HeaderResult(timestamp, true); } try { - return new HeaderResult(DateTimeOffset.FromUnixTimeMilliseconds(BitConverter.ToInt64(lastHeader, 0)).DateTime, true); + return new HeaderResult( + DateTimeOffset.FromUnixTimeMilliseconds(BitConverter.ToInt64(lastHeader, 0)).DateTime, true); } catch (Exception) { @@ -183,7 +270,8 @@ private HeaderResult ReadMessageId(Headers headers) { if (string.IsNullOrEmpty(s)) { - s_logger.LogDebug("No message id found in message MessageId, new message id is {NewMessageId}", newMessageId); + s_logger.LogDebug("No message id found in message MessageId, new message id is {NewMessageId}", + newMessageId); return new HeaderResult(newMessageId, true); } @@ -212,12 +300,6 @@ private HeaderResult ReadPartitionKey(Headers headers) }); } - - private static object ParseHeaderValue(object value) - { - return value is byte[] bytes ? Encoding.UTF8.GetString(bytes) : value; - } - private HeaderResult ReadHeader(Headers headers, string key, bool dieOnMissing = false) { if (headers.TryGetLastBytesIgnoreCase(key, out byte[] lastHeader)) @@ -230,12 +312,26 @@ private HeaderResult ReadHeader(Headers headers, string key, bool dieOnM catch (Exception e) { var firstTwentyBytes = BitConverter.ToString(lastHeader.Take(20).ToArray()); - s_logger.LogWarning(e, "Failed to read the value of header {Topic} as UTF-8, first 20 byes follow: \n\t{1}", key, firstTwentyBytes); + s_logger.LogWarning(e, + "Failed to read the value of header {Topic} as UTF-8, first 20 byes follow: \n\t{1}", key, + firstTwentyBytes); return new HeaderResult(null, false); } } - + return new HeaderResult(string.Empty, !dieOnMissing); } + + /// + /// Override this in a derived class if you want to coerce specific user defined header values to the correct + /// type in the bag + /// + /// The Kafka message header + /// The Brighter message + protected virtual void ReadBagEntry(IHeader header, Message message) + { + if (!BrighterDefinedHeaders.HeadersToReset.Any(htr => htr.Equals(header.Key))) + message.Header.Bag.Add(header.Key, Encoding.UTF8.GetString(header.GetValueBytes())); + } } } diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessagePublisher.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessagePublisher.cs index bbf1f1f610..6579a27ded 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessagePublisher.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessagePublisher.cs @@ -1,4 +1,28 @@ -using System; +#region Licence +/* The MIT License (MIT) +Copyright © 2024 Ian Cooper + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the “Software”), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. */ + +#endregion + +using System; using System.Threading.Tasks; using Confluent.Kafka; diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessagingGateway.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessagingGateway.cs index 53176c8b0a..963a30b871 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessagingGateway.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessagingGateway.cs @@ -1,4 +1,28 @@ -using System; +#region Licence +/* The MIT License (MIT) +Copyright © 2024 Ian Cooper + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the “Software”), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. */ + +#endregion + +using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; diff --git a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_converting_brighterheader_to_kafkaheader.cs b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_converting_brighterheader_to_kafkaheader.cs index 72295877b2..a5ba3bfd8e 100644 --- a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_converting_brighterheader_to_kafkaheader.cs +++ b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_converting_brighterheader_to_kafkaheader.cs @@ -1,6 +1,8 @@ using System; using System.Collections.Generic; +using System.Globalization; using System.Text; +using System.Text.Json; using Confluent.Kafka; using FluentAssertions; using Paramore.Brighter.MessagingGateway.Kafka; @@ -13,6 +15,7 @@ public class KafkaDefaultMessageHeaderBuilderTests [Fact] public void When_converting_brighterheader_to_kafkaheader() { + //arrange var message = new Message( new MessageHeader( messageId: Guid.NewGuid(), @@ -27,36 +30,43 @@ public void When_converting_brighterheader_to_kafkaheader() new MessageBody("test content") ); + message.Header.DelayedMilliseconds = 500; + message.Header.HandledCount = 2; + Dictionary bag = message.Header.Bag; bag.Add("myguid", Guid.NewGuid()); bag.Add("mystring", "string value"); bag.Add("myint", 7); bag.Add("mydouble", 3.56); - bag.Add("mybytearray", Encoding.UTF8.GetBytes("mybytes")); bag.Add("mydatetime", DateTime.UtcNow); - bag.Add("mydateonly", DateOnly.FromDateTime(DateTime.UtcNow)); + //act var builder = new KafkaDefaultMessageHeaderBuilder(); Headers headers = builder.Build(message); + //assert + //known properties headers.GetLastBytes(HeaderNames.MESSAGE_TYPE).Should().Equal(message.Header.MessageType.ToString().ToByteArray()); headers.GetLastBytes(HeaderNames.MESSAGE_ID).Should().Equal(message.Header.Id.ToString().ToByteArray()); headers.GetLastBytes(HeaderNames.TOPIC).Should().Equal(message.Header.Topic.ToByteArray()); headers.GetLastBytes(HeaderNames.TIMESTAMP).Should() - .Equal(BitConverter.GetBytes(new DateTimeOffset(message.Header.TimeStamp).ToUnixTimeMilliseconds())); + .Equal(new DateTimeOffset(message.Header.TimeStamp).ToUnixTimeMilliseconds().ToString().ToByteArray()); headers.GetLastBytes(HeaderNames.CORRELATION_ID).Should() .Equal(message.Header.CorrelationId.ToString().ToByteArray()); headers.GetLastBytes(HeaderNames.PARTITIONKEY).Should().Equal(message.Header.PartitionKey.ToByteArray()); headers.GetLastBytes(HeaderNames.CONTENT_TYPE).Should().Equal(message.Header.ContentType.ToByteArray()); headers.GetLastBytes(HeaderNames.REPLY_TO).Should().Equal(message.Header.ReplyTo.ToByteArray()); + headers.GetLastBytes(HeaderNames.DELAYED_MILLISECONDS).Should() + .Equal(message.Header.DelayedMilliseconds.ToString().ToByteArray()); + headers.GetLastBytes(HeaderNames.HANDLED_COUNT).Should() + .Equal(message.Header.HandledCount.ToString().ToByteArray()); //bag properties headers.GetLastBytes("myguid").Should().Equal(bag["myguid"].ToString().ToByteArray()); headers.GetLastBytes("mystring").Should().Equal(bag["mystring"].ToString().ToByteArray()); - headers.GetLastBytes("myint").Should().Equal(BitConverter.GetBytes((int)bag["myint"])); - headers.GetLastBytes("mydouble").Should().Equal(BitConverter.GetBytes((double)bag["mydouble"])); - headers.GetLastBytes("mybytearray").Should().Equal((byte[])bag["mybytearray"]); - headers.GetLastBytes("mydatetime").Should().Equal(((DateTime)bag["mydatetime"]).ToString().ToByteArray()); + headers.GetLastBytes("myint").Should().Equal(bag["myint"].ToString().ToByteArray()); + headers.GetLastBytes("mydouble").Should().Equal(bag["mydouble"].ToString().ToByteArray()); + headers.GetLastBytes("mydatetime").Should().Equal(((DateTime)bag["mydatetime"]).ToString(CultureInfo.InvariantCulture).ToByteArray()); } } diff --git a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_converting_kafkaheader_to_brighterheader.cs b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_converting_kafkaheader_to_brighterheader.cs new file mode 100644 index 0000000000..15430c8a28 --- /dev/null +++ b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_converting_kafkaheader_to_brighterheader.cs @@ -0,0 +1,76 @@ +using System; +using System.Collections.Generic; +using System.Globalization; +using Confluent.Kafka; +using FluentAssertions; +using Paramore.Brighter.MessagingGateway.Kafka; +using Xunit; + +namespace Paramore.Brighter.Kafka.Tests.MessagingGateway; + +public class KafkaHeaderToBrighterTests +{ + [Fact] + public void When_converting_kafkaheader_to_brighterheader() + { + //arrange + + var message = new Message( + new MessageHeader( + messageId: Guid.NewGuid(), + topic: "test", + messageType: MessageType.MT_COMMAND, + timeStamp: DateTime.UtcNow, + correlationId: Guid.NewGuid(), + replyTo: "test", + contentType: "application/octet", + partitionKey: "mykey" + ), + new MessageBody("test content") + ); + + message.Header.DelayedMilliseconds = 500; + message.Header.HandledCount = 2; + + Dictionary bag = message.Header.Bag; + bag.Add("myguid", Guid.NewGuid()); + bag.Add("mystring", "string value"); + bag.Add("myint", 7); + bag.Add("mydouble", 3.56); + bag.Add("mydatetime", DateTime.UtcNow); + + var builder = new KafkaDefaultMessageHeaderBuilder(); + Headers headers = builder.Build(message); + + var result = new ConsumeResult(); + result.Topic = "test"; + result.Message = new Message + { + Headers = headers, + Key = message.Id.ToString(), + Value = "test content"u8.ToArray() + }; + + //act + var readMessage = new KafkaMessageCreator().CreateMessage(result); + + //assert + readMessage.Id.Should().Be(message.Id); + readMessage.Header.MessageType.Should().Be(message.Header.MessageType); + readMessage.Header.Id.Should().Be(message.Header.Id); + readMessage.Header.CorrelationId.Should().Be(message.Header.CorrelationId); + readMessage.Header.ContentType.Should().Be(message.Header.ContentType); + readMessage.Header.Topic.Should().Be(message.Header.Topic); + readMessage.Header.DelayedMilliseconds.Should().Be(message.Header.DelayedMilliseconds); + readMessage.Header.HandledCount.Should().Be(message.Header.HandledCount); + //readMessage.Header.TimeStamp.ToString("u").Should().Be(message.Header.TimeStamp.ToString("u")); + + //NOTE: Because we can only coerce the byte[] to a string for a unknown bag key, coercing to a specific + //type has to be done by the user of the bag. + readMessage.Header.Bag["myguid"].Should().Be(bag["myguid"].ToString()); + readMessage.Header.Bag["mystring"].Should().Be(bag["mystring"].ToString()); + readMessage.Header.Bag["myint"].Should().Be(bag["myint"].ToString()); + readMessage.Header.Bag["mydouble"].Should().Be(bag["mydouble"].ToString()); + readMessage.Header.Bag["mydatetime"].Should().Be(((DateTime)bag["mydatetime"]).ToString(CultureInfo.InvariantCulture)); + } +} diff --git a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_posting_a_message.cs b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_posting_a_message.cs index 9572d24943..2e146727a9 100644 --- a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_posting_a_message.cs +++ b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_posting_a_message.cs @@ -119,8 +119,8 @@ public void When_posting_a_message() receivedMessage.Header.PartitionKey.Should().Be(_partitionKey); receivedMessage.Body.Bytes.Should().Equal(message.Body.Bytes); receivedMessage.Body.Value.Should().Be(message.Body.Value); - receivedMessage.Header.TimeStamp.ToString("yyyy-MM-ddTHH:mm:ss.fffZ") - .Should().Be(message.Header.TimeStamp.ToString("yyyy-MM-ddTHH:mm:ss.fffZ")); + receivedMessage.Header.TimeStamp.ToString("yyyy-MM-ddTHH:mm:ssZ") + .Should().Be(message.Header.TimeStamp.ToString("yyyy-MM-ddTHH:mm:ssZ")); receivedCommand.Id.Should().Be(command.Id); receivedCommand.Value.Should().Be(command.Value); } diff --git a/tests/Paramore.Brighter.Kafka.Tests/README.md b/tests/Paramore.Brighter.Kafka.Tests/README.md new file mode 100644 index 0000000000..cd7ea22534 --- /dev/null +++ b/tests/Paramore.Brighter.Kafka.Tests/README.md @@ -0,0 +1,22 @@ +# Kafka Test Dependencies + +## Running the docker-based tests + +Tests which talk to a Docker hosted Kafka instance assume the following: + +* No security +* A Kafka instance running on `localhost:9092` +* A Zookeeper instance running on `localhost:2181` +* A schema registry running on `localhost:8081` + + +## Running the confluent-based tests + +Tests which talk to a Confluent hosted Kafka instance have an attribute trait of "Confluent". They assume the following: + +* You have set environment variables for the following: + * CONFLUENT_BOOSTRAP_SERVER - the Kafka instance to connect to + * CONFLUENT_SASL_USERNAME - the username to use for SASL + * CONFLUENT_SASL_PASSWORD - the password to use for SASL + + From b5bedd85af071031a59b23456a5dce298a3cd38f Mon Sep 17 00:00:00 2001 From: Ian Cooper Date: Sun, 11 Feb 2024 11:21:31 +1100 Subject: [PATCH 4/4] comment back in time test post change away from unix milliseconds --- .../When_converting_kafkaheader_to_brighterheader.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_converting_kafkaheader_to_brighterheader.cs b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_converting_kafkaheader_to_brighterheader.cs index 15430c8a28..1a58f1e989 100644 --- a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_converting_kafkaheader_to_brighterheader.cs +++ b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_converting_kafkaheader_to_brighterheader.cs @@ -63,7 +63,7 @@ public void When_converting_kafkaheader_to_brighterheader() readMessage.Header.Topic.Should().Be(message.Header.Topic); readMessage.Header.DelayedMilliseconds.Should().Be(message.Header.DelayedMilliseconds); readMessage.Header.HandledCount.Should().Be(message.Header.HandledCount); - //readMessage.Header.TimeStamp.ToString("u").Should().Be(message.Header.TimeStamp.ToString("u")); + readMessage.Header.TimeStamp.ToString("u").Should().Be(message.Header.TimeStamp.ToString("u")); //NOTE: Because we can only coerce the byte[] to a string for a unknown bag key, coercing to a specific //type has to be done by the user of the bag.