Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ public override void Up()
.WithColumn("Message").AsString()
.WithColumn("Recipient_Id").AsInt32();

Create.ForeignKey()
.FromTable("Greeting").ForeignColumn("Recipient_Id")
.ToTable("Person").PrimaryColumn("Id");
if (_configuration.DbType != "Sqlite")
{
Create.ForeignKey()
.FromTable("Greeting").ForeignColumn("Recipient_Id")
.ToTable("Person").PrimaryColumn("Id");
}
}

public override void Down()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ THE SOFTWARE. */
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using Paramore.Brighter.Logging;

namespace Paramore.Brighter.MessagingGateway.Kafka
{
Expand Down Expand Up @@ -178,21 +177,12 @@ public KafkaMessageConsumer(
})
.SetPartitionsRevokedHandler((consumer, list) =>
{
try
{
_consumer?.Commit(list);
}
catch (KafkaException error)
{
s_logger.LogError(
"Error Committing Offsets During Partition Revoke: {Message} Code: {ErrorCode}, Reason: {ErrorMessage}, Fatal: {FatalError}",
error.Message, error.Error.Code, error.Error.Reason, error.Error.IsFatal
);
}
//We should commit any offsets we have stored for these partitions
CommitOffsetsFor(list);

var revokedPartitions = list.Select(tpo => $"{tpo.Topic} : {tpo.Partition}").ToList();
var revokedPartitionInfo = list.Select(tpo => $"{tpo.Topic} : {tpo.Partition}").ToList();

s_logger.LogInformation("Partitions for consumer revoked {Channels}", string.Join(",", revokedPartitions));
s_logger.LogInformation("Partitions for consumer revoked {Channels}", string.Join(",", revokedPartitionInfo));

_partitions = _partitions.Where(tp => list.All(tpo => tpo.TopicPartition != tp)).ToList();
})
Expand Down Expand Up @@ -304,14 +294,14 @@ public Message[] Receive(int timeoutInMilliseconds)
{
CheckHasPartitions();

s_logger.LogDebug("No messages available from Kafka stream");
return new Message[] {new Message()};
s_logger.LogDebug($"No messages available from Kafka stream");
return new[] {new Message()};
}

if (consumeResult.IsPartitionEOF)
{
s_logger.LogDebug("Consumer {ConsumerMemberId} has reached the end of the partition", _consumer.MemberId);
return new Message[] {new Message()};
return new[] {new Message()};
}

s_logger.LogDebug("Usable message retrieved from Kafka stream: {Request}", consumeResult.Message.Value);
Expand Down Expand Up @@ -368,15 +358,10 @@ public bool Requeue(Message message, int delayMilliseconds)
return false;
}

private bool CheckHasPartitions()
private void CheckHasPartitions()
{
if (_partitions.Count <= 0)
{
s_logger.LogDebug("Consumer is not allocated any partitions");
return false;
}

return true;
}


Expand Down Expand Up @@ -457,13 +442,58 @@ private void CommitOffsets()
_flushToken.Release(1);
}
}

//Called during a revoke, we are passed the partitions that we are revoking and their last offset and we need to
//commit anything we have not stored.
private void CommitOffsetsFor(List<TopicPartitionOffset> revokedPartitions)
{
try
{
//find the provided set of partitions amongst our stored offsets
var partitionOffsets = _offsetStorage.ToArray();
var revokedOffsetsToCommit =
partitionOffsets.Where(tpo =>
revokedPartitions.Any(ptc =>
ptc.TopicPartition == tpo.TopicPartition
&& ptc.Offset.Value != Offset.Unset.Value
&& tpo.Offset.Value > ptc.Offset.Value
)
)
.ToList();
//determine if we have offsets still to commit
if (revokedOffsetsToCommit.Any())
{
//commit them
LogOffSetCommitRevokedPartitions(revokedOffsetsToCommit);
_consumer.Commit(revokedOffsetsToCommit);
}
}
catch (KafkaException error)
{
s_logger.LogError(
"Error Committing Offsets During Partition Revoke: {Message} Code: {ErrorCode}, Reason: {ErrorMessage}, Fatal: {FatalError}",
error.Message, error.Error.Code, error.Error.Reason, error.Error.IsFatal
);
}
}

[Conditional("DEBUG")]
[DebuggerStepThrough]
private void LogOffSetCommitRevokedPartitions(List<TopicPartitionOffset> revokedOffsetsToCommit)
{
s_logger.LogDebug("Saving revoked partition offsets: {OffSetCount}", revokedOffsetsToCommit.Count);
foreach (var offset in revokedOffsetsToCommit)
{
s_logger.LogDebug("Saving revoked partition offset: {Offset} on partition: {Partition} for topic: {Topic}",
offset.Offset.Value.ToString(), offset.Partition.Value.ToString(), offset.Topic);
}
}

//Just flush everything
private void CommitAllOffsets(DateTime flushTime)
{
try
{


var listOffsets = new List<TopicPartitionOffset>();
var currentOffsetsInBag = _offsetStorage.Count;
for (int i = 0; i < currentOffsetsInBag; i++)
Expand Down Expand Up @@ -501,7 +531,7 @@ private void FlushOffsets()
{
//This is expensive, so use a background thread
Task.Factory.StartNew(
action: state => CommitOffsets(),
action: _ => CommitOffsets(),
state: now,
cancellationToken: CancellationToken.None,
creationOptions: TaskCreationOptions.DenyChildAttach,
Expand Down