From 9aa081b8acfd01a40f50bd9c3face3c0d2c530b1 Mon Sep 17 00:00:00 2001 From: guyinyou <36399867+guyinyou@users.noreply.github.com> Date: Tue, 10 Dec 2024 20:05:44 +0800 Subject: [PATCH] [ISSUE #8988] Support dispatchBehindMilliseconds (#8989) * support dispatchBehindMilliseconds * Modify the initial value of currentReputTimestamp --------- Co-authored-by: guyinyou --- .../rocketmq/store/DefaultMessageStore.java | 25 ++++++++++++++++++- .../apache/rocketmq/store/MessageStore.java | 7 ++++++ .../plugin/AbstractPluginMessageStore.java | 5 ++++ 3 files changed, 36 insertions(+), 1 deletion(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 6b8ea0ee8ad..9d3c46a438a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -1556,6 +1556,10 @@ public boolean checkInStoreByConsumeOffset(String topic, int queueId, long consu public long dispatchBehindBytes() { return this.reputMessageService.behind(); } + @Override + public long dispatchBehindMilliseconds() { + return this.reputMessageService.behindMs(); + } public long flushBehindBytes() { if (this.messageStoreConfig.isTransientStorePoolEnable()) { @@ -2793,6 +2797,7 @@ public void notifyMessageArriveIfNecessary(DispatchRequest dispatchRequest) { class ReputMessageService extends ServiceThread { protected volatile long reputFromOffset = 0; + protected volatile long currentReputTimestamp = System.currentTimeMillis(); public long getReputFromOffset() { return reputFromOffset; @@ -2802,6 +2807,10 @@ public void setReputFromOffset(long reputFromOffset) { this.reputFromOffset = reputFromOffset; } + public long getCurrentReputTimestamp() { + return currentReputTimestamp; + } + @Override public void shutdown() { for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) { @@ -2824,6 +2833,15 @@ public long behind() { return DefaultMessageStore.this.getConfirmOffset() - this.reputFromOffset; } + public long behindMs() { + long lastCommitLogFileTimeStamp = System.currentTimeMillis(); + MappedFile lastMappedFile = DefaultMessageStore.this.commitLog.getMappedFileQueue().getLastMappedFile(); + if (lastMappedFile != null) { + lastCommitLogFileTimeStamp = lastMappedFile.getStoreTimestamp(); + } + return Math.max(0, lastCommitLogFileTimeStamp - this.currentReputTimestamp); + } + public boolean isCommitLogAvailable() { return this.reputFromOffset < getReputEndOffset(); } @@ -2838,7 +2856,11 @@ public void doReput() { this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset()); this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset(); } - for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { + boolean isCommitLogAvailable = isCommitLogAvailable(); + if (!isCommitLogAvailable) { + currentReputTimestamp = System.currentTimeMillis(); + } + for (boolean doNext = true; isCommitLogAvailable && doNext; ) { SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset); @@ -2861,6 +2883,7 @@ public void doReput() { if (dispatchRequest.isSuccess()) { if (size > 0) { + currentReputTimestamp = dispatchRequest.getStoreTimestamp(); DefaultMessageStore.this.doDispatch(dispatchRequest); if (!notifyMessageArriveInBatch) { diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index 5c3984e5b2c..4bbee142a17 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -511,6 +511,13 @@ CompletableFuture queryMessageAsync(final String topic, fina */ long dispatchBehindBytes(); + /** + * Get number of the milliseconds that have been stored in commit log and not yet dispatched to consume queue. + * + * @return number of the milliseconds to dispatch. + */ + long dispatchBehindMilliseconds(); + /** * Flush the message store to persist all data. * diff --git a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java index 0f57a17d463..d5d6236458e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java @@ -293,6 +293,11 @@ public long dispatchBehindBytes() { return next.dispatchBehindBytes(); } + @Override + public long dispatchBehindMilliseconds() { + return next.dispatchBehindMilliseconds(); + } + @Override public long flush() { return next.flush();