From 90fddf7a9003f21d007324fbcf983a41f9934c94 Mon Sep 17 00:00:00 2001 From: Ian Cooper Date: Sat, 20 Apr 2024 11:19:22 +0200 Subject: [PATCH 1/2] Fix some issues with schema creation in the Sqlite example --- .../Migrations/20220527_InitialCreate.cs | 9 ++- .../KafkaMessageConsumer.cs | 74 ++++++++++++++----- 2 files changed, 63 insertions(+), 20 deletions(-) diff --git a/samples/WebAPI_Dapper/Greetings_Migrations/Migrations/20220527_InitialCreate.cs b/samples/WebAPI_Dapper/Greetings_Migrations/Migrations/20220527_InitialCreate.cs index 2957c97da3..8a032c2c14 100644 --- a/samples/WebAPI_Dapper/Greetings_Migrations/Migrations/20220527_InitialCreate.cs +++ b/samples/WebAPI_Dapper/Greetings_Migrations/Migrations/20220527_InitialCreate.cs @@ -26,9 +26,12 @@ public override void Up() .WithColumn("Message").AsString() .WithColumn("Recipient_Id").AsInt32(); - Create.ForeignKey() - .FromTable("Greeting").ForeignColumn("Recipient_Id") - .ToTable("Person").PrimaryColumn("Id"); + if (_configuration.DbType != "Sqlite") + { + Create.ForeignKey() + .FromTable("Greeting").ForeignColumn("Recipient_Id") + .ToTable("Person").PrimaryColumn("Id"); + } } public override void Down() diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs index eda8923066..8763dd1142 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs @@ -30,7 +30,6 @@ THE SOFTWARE. */ using System.Threading.Tasks; using Confluent.Kafka; using Microsoft.Extensions.Logging; -using Paramore.Brighter.Logging; namespace Paramore.Brighter.MessagingGateway.Kafka { @@ -178,10 +177,12 @@ public KafkaMessageConsumer( }) .SetPartitionsRevokedHandler((consumer, list) => { - consumer.Commit(list); - var revokedPartitions = list.Select(tpo => $"{tpo.Topic} : {tpo.Partition}").ToList(); + //We should commit any offsets we have stored for these partitions + CommitOffsetsFor(list); - s_logger.LogInformation("Partitions for consumer revoked {Channels}", string.Join(",", revokedPartitions)); + var revokedPartitionInfo = list.Select(tpo => $"{tpo.Topic} : {tpo.Partition}").ToList(); + + s_logger.LogInformation("Partitions for consumer revoked {Channels}", string.Join(",", revokedPartitionInfo)); _partitions = _partitions.Where(tp => list.All(tpo => tpo.TopicPartition != tp)).ToList(); }) @@ -237,7 +238,6 @@ public void Acknowledge(Message message) s_logger.LogInformation("Storing offset {Offset} to topic {Topic} for partition {ChannelName}", new Offset(topicPartitionOffset.Offset + 1).Value, topicPartitionOffset.TopicPartition.Topic, topicPartitionOffset.TopicPartition.Partition.Value); - _consumer.StoreOffset(offset); _offsetStorage.Add(offset); if (_offsetStorage.Count % _maxBatchSize == 0) @@ -294,14 +294,14 @@ public Message[] Receive(int timeoutInMilliseconds) { CheckHasPartitions(); - s_logger.LogDebug("No messages available from Kafka stream"); - return new Message[] {new Message()}; + s_logger.LogDebug($"No messages available from Kafka stream"); + return new[] {new Message()}; } if (consumeResult.IsPartitionEOF) { s_logger.LogDebug("Consumer {ConsumerMemberId} has reached the end of the partition", _consumer.MemberId); - return new Message[] {new Message()}; + return new[] {new Message()}; } s_logger.LogDebug("Usable message retrieved from Kafka stream: {Request}", consumeResult.Message.Value); @@ -358,15 +358,10 @@ public bool Requeue(Message message, int delayMilliseconds) return false; } - private bool CheckHasPartitions() + private void CheckHasPartitions() { if (_partitions.Count <= 0) - { s_logger.LogDebug("Consumer is not allocated any partitions"); - return false; - } - - return true; } @@ -447,13 +442,58 @@ private void CommitOffsets() _flushToken.Release(1); } } + + //Called during a revoke, we are passed the partitions that we are revoking and their last offset and we need to + //commit anything we have not stored. + private void CommitOffsetsFor(List partitionsToCommit) + { + try + { + //find the provided set of partitions amongst our stored offsets + var offsets = _offsetStorage.ToArray(); + var revokedOffsetsToCommit = + offsets.Where(tpo => + partitionsToCommit.Any(ptc => + ptc.TopicPartition == tpo.TopicPartition + && ptc.Offset != Offset.Unset + && ptc.Offset.Value < tpo.Offset.Value + ) + ) + .ToList(); + //determine if we have offsets still to commit + if (revokedOffsetsToCommit.Any()) + { + //commit them + LogOffSetCommitRevokedPartitions(revokedOffsetsToCommit); + _consumer.Commit(revokedOffsetsToCommit); + } + } + catch (KafkaException error) + { + s_logger.LogError( + "Error Committing Offsets During Partition Revoke: {Message} Code: {ErrorCode}, Reason: {ErrorMessage}, Fatal: {FatalError}", + error.Message, error.Error.Code, error.Error.Reason, error.Error.IsFatal + ); + } + } + + [Conditional("DEBUG")] + [DebuggerStepThrough] + private void LogOffSetCommitRevokedPartitions(List revokedOffsetsToCommit) + { + s_logger.LogDebug("Saving revoked partition offsets: {OffSetCount}", revokedOffsetsToCommit.Count); + foreach (var offset in revokedOffsetsToCommit) + { + s_logger.LogDebug("Saving revoked partition offset: {Offset} on partition: {Partition} for topic: {Topic}", + offset.Offset.Value.ToString(), offset.Partition.Value.ToString(), offset.Topic); + } + } + //Just flush everything private void CommitAllOffsets(DateTime flushTime) { try { - - var listOffsets = new List(); var currentOffsetsInBag = _offsetStorage.Count; for (int i = 0; i < currentOffsetsInBag; i++) @@ -491,7 +531,7 @@ private void FlushOffsets() { //This is expensive, so use a background thread Task.Factory.StartNew( - action: state => CommitOffsets(), + action: _ => CommitOffsets(), state: now, cancellationToken: CancellationToken.None, creationOptions: TaskCreationOptions.DenyChildAttach, From 225de326c1c41ab9e5c2679e6bfa1a8e0d55557e Mon Sep 17 00:00:00 2001 From: Ian Cooper Date: Sat, 20 Apr 2024 20:14:02 +0100 Subject: [PATCH 2/2] Ensure we don't commit work that is already committed on a revoke --- .../KafkaMessageConsumer.cs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs index 8763dd1142..b077494c5a 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs @@ -445,18 +445,18 @@ private void CommitOffsets() //Called during a revoke, we are passed the partitions that we are revoking and their last offset and we need to //commit anything we have not stored. - private void CommitOffsetsFor(List partitionsToCommit) + private void CommitOffsetsFor(List revokedPartitions) { try { //find the provided set of partitions amongst our stored offsets - var offsets = _offsetStorage.ToArray(); + var partitionOffsets = _offsetStorage.ToArray(); var revokedOffsetsToCommit = - offsets.Where(tpo => - partitionsToCommit.Any(ptc => + partitionOffsets.Where(tpo => + revokedPartitions.Any(ptc => ptc.TopicPartition == tpo.TopicPartition - && ptc.Offset != Offset.Unset - && ptc.Offset.Value < tpo.Offset.Value + && ptc.Offset.Value != Offset.Unset.Value + && tpo.Offset.Value > ptc.Offset.Value ) ) .ToList();