Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/Paramore.Brighter.DynamoDb/GUIDConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ public class GUIDConverter : IPropertyConverter
{
public DynamoDBEntry ToEntry(object value)
{
var uuid = value as Guid? ??
throw new InvalidOperationException(
$"Supplied type was of type {value.GetType().Name} not Guid");

var uuid = (Guid)value;
var json = uuid.ToString();

DynamoDBEntry entry = new Primitive
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#region Licence
/* The MIT License (MIT)
Copyright © 2024 Ian Cooper <ian_hammond_cooper@yahoo.co.uk>

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the “Software”), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE. */

#endregion

namespace Paramore.Brighter.MessagingGateway.Kafka;

public static class BrighterDefinedHeaders
{
public static readonly string[] HeadersToReset;

static BrighterDefinedHeaders()
{
HeadersToReset = new[] {
HeaderNames.MESSAGE_ID,
HeaderNames.MESSAGE_TYPE,
HeaderNames.TOPIC,
HeaderNames.CORRELATION_ID,
HeaderNames.TIMESTAMP,
HeaderNames.PARTITIONKEY,
HeaderNames.CONTENT_TYPE,
HeaderNames.REPLY_TO,
HeaderNames.DELAYED_MILLISECONDS,
HeaderNames.HANDLED_COUNT
};
}

}
16 changes: 16 additions & 0 deletions src/Paramore.Brighter.MessagingGateway.Kafka/HeaderNames.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ namespace Paramore.Brighter.MessagingGateway.Kafka
{
public class HeaderNames
{
/// <summary>
/// A dictionary of user defined values
/// </summary>
public static string BAG { get; } = "Bag";

/// <summary>
/// What is the content type of the message§
/// </summary>
Expand All @@ -37,6 +42,17 @@ public class HeaderNames
/// </summary>
public const string CORRELATION_ID = "CorrelationId";

/// <summary>
/// If the message was deferred, how long for?
/// </summary>
public static string DELAYED_MILLISECONDS { get; } = "x-delay";

/// <summary>
/// How many times has the message been retried with a delay
/// </summary>
public static string HANDLED_COUNT { get; } = "HandledCount" ;


/// <summary>
/// The message type
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,28 @@
using Confluent.Kafka;
#region Licence
/* The MIT License (MIT)
Copyright © 2024 Ian Cooper <ian_hammond_cooper@yahoo.co.uk>

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the “Software”), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE. */

#endregion

using Confluent.Kafka;

namespace Paramore.Brighter.MessagingGateway.Kafka
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,41 @@
using System;
#region Licence
/* The MIT License (MIT)
Copyright © 2024 Ian Cooper <ian_hammond_cooper@yahoo.co.uk>

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the “Software”), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE. */

#endregion

using System;
using System.Globalization;
using System.Linq;
using Confluent.Kafka;
using Paramore.Brighter.Extensions;

namespace Paramore.Brighter.MessagingGateway.Kafka
{
internal class KafkaDefaultMessageHeaderBuilder : IKafkaMessageHeaderBuilder
/// <summary>
/// This class serializes Brighter headers to Kafka. Kafka uses a byte[] for its header values. We convert all
/// header values into a string, and then get a UTF8 encoded set of bytes for that string.
/// </summary>
public class KafkaDefaultMessageHeaderBuilder : IKafkaMessageHeaderBuilder
{
private static readonly string[] s_headersToReset;

static KafkaDefaultMessageHeaderBuilder()
{
s_headersToReset = new[] {
HeaderNames.MESSAGE_TYPE,
HeaderNames.TOPIC,
HeaderNames.CORRELATION_ID,
HeaderNames.TIMESTAMP
};
}

public static KafkaDefaultMessageHeaderBuilder Instance => new KafkaDefaultMessageHeaderBuilder();

public Headers Build(Message message)
Expand All @@ -31,9 +48,9 @@ public Headers Build(Message message)
};

if (message.Header.TimeStamp != default)
headers.Add(HeaderNames.TIMESTAMP, BitConverter.GetBytes(new DateTimeOffset(message.Header.TimeStamp).ToUnixTimeMilliseconds()));
headers.Add(HeaderNames.TIMESTAMP, new DateTimeOffset(message.Header.TimeStamp).ToString().ToByteArray());
else
headers.Add(HeaderNames.TIMESTAMP, BitConverter.GetBytes(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()));
headers.Add(HeaderNames.TIMESTAMP, DateTimeOffset.UtcNow.ToString().ToByteArray());

if (message.Header.CorrelationId != Guid.Empty)
headers.Add(HeaderNames.CORRELATION_ID, message.Header.CorrelationId.ToString().ToByteArray());
Expand All @@ -47,20 +64,39 @@ public Headers Build(Message message)
if (!string.IsNullOrEmpty(message.Header.ReplyTo))
headers.Add(HeaderNames.REPLY_TO, message.Header.ReplyTo.ToByteArray());

headers.Add(HeaderNames.DELAYED_MILLISECONDS, message.Header.DelayedMilliseconds.ToString().ToByteArray());

headers.Add(HeaderNames.HANDLED_COUNT, message.Header.HandledCount.ToString().ToByteArray());

message.Header.Bag.Each((header) =>
{
if (!s_headersToReset.Any(htr => htr.Equals(header.Key)))
if (!BrighterDefinedHeaders.HeadersToReset.Any(htr => htr.Equals(header.Key)))
{
switch (header.Value)
{
case string stringValue:
headers.Add(header.Key, stringValue.ToByteArray());
break;
case DateTime dateTimeValue:
headers.Add(header.Key, dateTimeValue.ToString(CultureInfo.InvariantCulture).ToByteArray());
break;
case Guid guidValue:
headers.Add(header.Key, guidValue.ToString().ToByteArray());
break;
case bool boolValue:
headers.Add(header.Key, boolValue.ToString().ToByteArray());
break;
case int intValue:
headers.Add(header.Key, BitConverter.GetBytes(intValue));
headers.Add(header.Key, intValue.ToString().ToByteArray());
break;
case double doubleValue:
headers.Add(header.Key, doubleValue.ToString(CultureInfo.InvariantCulture).ToByteArray());
break;
case float floatValue:
headers.Add(header.Key, floatValue.ToString(CultureInfo.InvariantCulture).ToByteArray());
break;
case Guid guidValue:
headers.Add(header.Key, guidValue.ToByteArray());
case long longValue:
headers.Add(header.Key, longValue.ToString().ToByteArray());
break;
case byte[] byteArray:
headers.Add(header.Key, byteArray);
Expand All @@ -71,7 +107,7 @@ public Headers Build(Message message)
}
}
});

return headers;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,29 @@
using System;
using System.Collections.Generic;
#region Licence
/* The MIT License (MIT)
Copyright © 2024 Ian Cooper <ian_hammond_cooper@yahoo.co.uk>

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the “Software”), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE. */

#endregion

using System;
using System.Linq;
using System.Text;
using Confluent.Kafka;

namespace Paramore.Brighter.MessagingGateway.Kafka
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE. */
#endregion

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
Expand Down
Loading