Skip to content

Commit

Permalink
fix: invoke IKafkaOffsetCommittedCallback also when auto commit is disabled
Browse files Browse the repository at this point in the history

#167
  • Loading branch information
BEagle1984 committed Jul 1, 2022
1 parent ba1ded0 commit 44aca09
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 54 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project>
<PropertyGroup Label="Package information">
<BaseVersionSuffix></BaseVersionSuffix>
<BaseVersion>3.7.1$(BaseVersionSuffix)</BaseVersion>
<BaseVersion>3.7.2$(BaseVersionSuffix)</BaseVersion>
<DatabasePackagesRevision>1</DatabasePackagesRevision>
<DatabasePackagesVersionSuffix>$(BaseVersionSuffix)</DatabasePackagesVersionSuffix>
</PropertyGroup>
Expand Down
6 changes: 6 additions & 0 deletions docs/releases.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ uid: releases

# Releases

## [3.7.2](https://github.com/BEagle1984/silverback/releases/tag/v3.7.2)

### Fixes

* Correctly invoke the [IKafkaOffsetCommittedCallback](xref:Silverback.Messaging.Broker.Callbacks.IKafkaOffsetCommittedCallback) when auto commit is disabled [[#167](https://github.com/BEagle1984/silverback/issues/167)]

## [3.7.1](https://github.com/BEagle1984/silverback/releases/tag/v3.7.1)

### Fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,7 @@ public void Subscribe(IEnumerable<string> topics)

if (string.IsNullOrEmpty(GroupId))
{
throw new ArgumentException(
"'group.id' configuration parameter is required and was not specified.");
throw new ArgumentException("'group.id' configuration parameter is required and was not specified.");
}

var topicsList =
Expand Down Expand Up @@ -214,40 +213,7 @@ public void StoreOffset(TopicPartitionOffset offset)
partitionOffsetDictionary[offset.Partition] = offset.Offset;
}

public List<TopicPartitionOffset> Commit()
{
EnsureNotDisposed();

var topicPartitionOffsets = _storedOffsets.SelectMany(
topicPair => topicPair.Value.Select(
partitionPair => new TopicPartitionOffset(
topicPair.Key,
partitionPair.Key,
partitionPair.Value))).ToList();

var topicPartitionOffsetsByTopic =
topicPartitionOffsets.GroupBy(topicPartitionOffset => topicPartitionOffset.Topic);

var actualCommittedOffsets = new List<TopicPartitionOffset>();
foreach (var group in topicPartitionOffsetsByTopic)
{
actualCommittedOffsets.AddRange(_topics.Get(group.Key, _config).Commit(GroupId, group));
}

if (actualCommittedOffsets.Count > 0)
{
OffsetsCommittedHandler?.Invoke(
this,
new CommittedOffsets(
actualCommittedOffsets
.Select(
topicPartitionOffset =>
new TopicPartitionOffsetError(topicPartitionOffset, null)).ToList(),
null));
}

return actualCommittedOffsets;
}
/// <summary>
///     Commits the offsets stored so far and returns the offsets that were actually committed.
///     This is the manual-commit entry point, so the offsets-committed handler is not raised here
///     (see <c>CommitCore</c>, which only raises it for auto commits).
/// </summary>
/// <returns>The list of committed <see cref="TopicPartitionOffset" /> instances.</returns>
public List<TopicPartitionOffset> Commit()
{
    // Manual commit: delegate to the shared core with the auto-commit flag cleared.
    return CommitCore(isAutoCommit: false);
}

/// <summary>
///     Committing an explicit set of offsets is not supported by this consumer implementation.
/// </summary>
/// <param name="offsets">The offsets to commit (unused; the call always fails).</param>
/// <exception cref="NotSupportedException">Always thrown.</exception>
public void Commit(IEnumerable<TopicPartitionOffset> offsets)
{
    throw new NotSupportedException();
}

Expand Down Expand Up @@ -333,8 +299,7 @@ internal void OnPartitionsRevoked(string topicName)
internal void OnPartitionsAssigned(string topicName, IReadOnlyCollection<Partition> partitions)
{
_temporaryAssignment.RemoveAll(topicPartitionOffset => topicPartitionOffset.Topic == topicName);
_temporaryAssignment.AddRange(
partitions.Select(partition => new TopicPartitionOffset(topicName, partition, Offset.Unset)));
_temporaryAssignment.AddRange(partitions.Select(partition => new TopicPartitionOffset(topicName, partition, Offset.Unset)));

if (!_topicAssignments.Contains(topicName))
_topicAssignments.Add(topicName);
Expand All @@ -354,8 +319,7 @@ internal void OnPartitionsAssigned(string topicName, IReadOnlyCollection<Partiti
PartitionsAssigned = true;
}

private List<TopicPartitionOffset>? InvokePartitionsAssignedHandler(
IEnumerable<TopicPartitionOffset> partitionOffsets) =>
private List<TopicPartitionOffset>? InvokePartitionsAssignedHandler(IEnumerable<TopicPartitionOffset> partitionOffsets) =>
PartitionsAssignedHandler?.Invoke(
this,
partitionOffsets.Select(partitionOffset => partitionOffset.TopicPartition).ToList())
Expand Down Expand Up @@ -482,7 +446,7 @@ private async Task AutoCommitAsync()
{
try
{
Commit();
CommitCore(true);
}
catch (Exception)
{
Expand All @@ -493,6 +457,41 @@ private async Task AutoCommitAsync()
}
}

/// <summary>
///     Commits all stored offsets, topic by topic, and returns the offsets that were actually
///     committed. The offsets-committed handler is raised for auto commits only; on a manual
///     commit the committed offsets are returned to the caller instead (this split appears to be
///     the point of issue #167 — confirm against the real consumer's callback wiring).
/// </summary>
/// <param name="isAutoCommit">
///     Whether the commit originates from the auto-commit loop (raises the handler) or from an
///     explicit <c>Commit()</c> call (does not).
/// </param>
/// <returns>The list of committed <see cref="TopicPartitionOffset" /> instances.</returns>
private List<TopicPartitionOffset> CommitCore(bool isAutoCommit)
{
    EnsureNotDisposed();

    // Flatten the nested topic -> partition -> offset dictionaries into a single list.
    var offsetsToCommit = new List<TopicPartitionOffset>();
    foreach (var topicPair in _storedOffsets)
    {
        foreach (var partitionPair in topicPair.Value)
        {
            offsetsToCommit.Add(new TopicPartitionOffset(topicPair.Key, partitionPair.Key, partitionPair.Value));
        }
    }

    // Commit per topic, collecting the offsets each topic reports as actually committed.
    var committedOffsets = new List<TopicPartitionOffset>();
    foreach (var topicGroup in offsetsToCommit.GroupBy(offset => offset.Topic))
    {
        committedOffsets.AddRange(_topics.Get(topicGroup.Key, _config).Commit(GroupId, topicGroup));
    }

    // Only the auto-commit path raises the event; manual commits report via the return value.
    if (isAutoCommit && committedOffsets.Count > 0)
    {
        var offsetErrors = committedOffsets
            .Select(offset => new TopicPartitionOffsetError(offset, null))
            .ToList();

        OffsetsCommittedHandler?.Invoke(this, new CommittedOffsets(offsetErrors, null));
    }

    return committedOffsets;
}

private bool GetEofMessageIfNeeded(
TopicPartitionOffset topicPartitionOffset,
out ConsumeResult<byte[]?, byte[]?>? result)
Expand All @@ -504,7 +503,7 @@ private bool GetEofMessageIfNeeded(
}

if (!_lastPartitionEof[topicPartitionOffset.Topic]
.ContainsKey(topicPartitionOffset.Partition))
.ContainsKey(topicPartitionOffset.Partition))
{
_lastPartitionEof[topicPartitionOffset.Topic][topicPartitionOffset.Partition] = -1;
}
Expand Down
17 changes: 6 additions & 11 deletions src/Silverback.Integration.Kafka/Messaging/Broker/KafkaConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ internal void OnPartitionsRevoked()
if (!Endpoint.Configuration.IsAutoCommitEnabled)
CommitOffsets();

AsyncHelper.RunSynchronously(
() => SequenceStores.DisposeAllAsync(SequenceAbortReason.ConsumerAborted));
AsyncHelper.RunSynchronously(() => SequenceStores.DisposeAllAsync(SequenceAbortReason.ConsumerAborted));
SequenceStores.Clear();

// The ConsumeLoopHandler needs to be immediately restarted because the partitions will be
Expand Down Expand Up @@ -302,17 +301,14 @@ protected override Task RollbackCoreAsync(IReadOnlyCollection<KafkaOffset> broke

if (IsConsuming)
{
_confluentConsumer?.Pause(
latestTopicPartitionOffsets.Select(
topicPartitionOffset => topicPartitionOffset.TopicPartition));
_confluentConsumer?.Pause(latestTopicPartitionOffsets.Select(topicPartitionOffset => topicPartitionOffset.TopicPartition));
}

var channelsManagerStoppingTasks = new List<Task?>(latestTopicPartitionOffsets.Count);

foreach (var topicPartitionOffset in latestTopicPartitionOffsets)
{
channelsManagerStoppingTasks.Add(
_channelsManager?.StopReadingAsync(topicPartitionOffset.TopicPartition));
channelsManagerStoppingTasks.Add(_channelsManager?.StopReadingAsync(topicPartitionOffset.TopicPartition));
ConfluentConsumer.Seek(topicPartitionOffset);
_logger.LogPartitionOffsetReset(topicPartitionOffset, this);
}
Expand Down Expand Up @@ -387,8 +383,7 @@ private void InitConfluentConsumer()
{
_confluentConsumer.Assign(Endpoint.TopicPartitions);

Endpoint.TopicPartitions.ForEach(
topicPartitionOffset => _logger.LogPartitionManuallyAssigned(topicPartitionOffset, this));
Endpoint.TopicPartitions.ForEach(topicPartitionOffset => _logger.LogPartitionManuallyAssigned(topicPartitionOffset, this));

SetReadyStatus();
}
Expand Down Expand Up @@ -550,10 +545,10 @@ private void CommitOffsetsIfNeeded()
if (++_messagesSinceCommit < Endpoint.Configuration.CommitOffsetEach)
return;

CommitOffsets();

_messagesSinceCommit = 0;
}

ConfluentConsumer.Commit();
}

private void CommitOffsets()
Expand Down

0 comments on commit 44aca09

Please sign in to comment.