Skip to content

Commit

Permalink
[ISSUE #9025] [RIP-73] Fix Pop Consumption reset offset (#9087)
Browse files Browse the repository at this point in the history
  • Loading branch information
lizhimins authored Dec 30, 2024
1 parent 7722ce7 commit f32fe78
Showing 1 changed file with 6 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2187,9 +2187,13 @@ private RemotingCommand resetOffsetInner(String topic, String group, int queueId
ResetOffsetBody body = new ResetOffsetBody();
String brokerName = brokerController.getBrokerConfig().getBrokerName();
for (Map.Entry<Integer, Long> entry : queueOffsetMap.entrySet()) {
brokerController.getPopInflightMessageCounter().clearInFlightMessageNum(topic, group, entry.getKey());
if (brokerController.getPopInflightMessageCounter() != null) {
brokerController.getPopInflightMessageCounter().clearInFlightMessageNum(topic, group, entry.getKey());
}
if (brokerController.getBrokerConfig().isPopConsumerKVServiceEnable()) {
brokerController.getPopConsumerService().clearCache(group, topic, queueId);
brokerController.getPopConsumerService().clearCache(group, topic, entry.getKey());
brokerController.getConsumerOffsetManager().commitPullOffset(
"ResetOffsetInner", group, topic, entry.getKey(), entry.getValue());
}
body.getOffsetTable().put(new MessageQueue(topic, brokerName, entry.getKey()), entry.getValue());
}
Expand Down

0 comments on commit f32fe78

Please sign in to comment.