Skip to content

Commit

Permalink
[ISSUE #8988] Support dispatchBehindMilliseconds (#8989)
Browse files Browse the repository at this point in the history
* support dispatchBehindMilliseconds

* Modify the initial value of currentReputTimestamp

---------

Co-authored-by: guyinyou <[email protected]>
  • Loading branch information
guyinyou and guyinyou authored Dec 10, 2024
1 parent 564e55e commit 9aa081b
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1556,6 +1556,10 @@ public boolean checkInStoreByConsumeOffset(String topic, int queueId, long consu
public long dispatchBehindBytes() {
return this.reputMessageService.behind();
}
@Override
public long dispatchBehindMilliseconds() {
return this.reputMessageService.behindMs();
}

public long flushBehindBytes() {
if (this.messageStoreConfig.isTransientStorePoolEnable()) {
Expand Down Expand Up @@ -2793,6 +2797,7 @@ public void notifyMessageArriveIfNecessary(DispatchRequest dispatchRequest) {
class ReputMessageService extends ServiceThread {

protected volatile long reputFromOffset = 0;
protected volatile long currentReputTimestamp = System.currentTimeMillis();

public long getReputFromOffset() {
return reputFromOffset;
Expand All @@ -2802,6 +2807,10 @@ public void setReputFromOffset(long reputFromOffset) {
this.reputFromOffset = reputFromOffset;
}

public long getCurrentReputTimestamp() {
return currentReputTimestamp;
}

@Override
public void shutdown() {
for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) {
Expand All @@ -2824,6 +2833,15 @@ public long behind() {
return DefaultMessageStore.this.getConfirmOffset() - this.reputFromOffset;
}

public long behindMs() {
long lastCommitLogFileTimeStamp = System.currentTimeMillis();
MappedFile lastMappedFile = DefaultMessageStore.this.commitLog.getMappedFileQueue().getLastMappedFile();
if (lastMappedFile != null) {
lastCommitLogFileTimeStamp = lastMappedFile.getStoreTimestamp();
}
return Math.max(0, lastCommitLogFileTimeStamp - this.currentReputTimestamp);
}

public boolean isCommitLogAvailable() {
return this.reputFromOffset < getReputEndOffset();
}
Expand All @@ -2838,7 +2856,11 @@ public void doReput() {
this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
}
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
boolean isCommitLogAvailable = isCommitLogAvailable();
if (!isCommitLogAvailable) {
currentReputTimestamp = System.currentTimeMillis();
}
for (boolean doNext = true; isCommitLogAvailable && doNext; ) {

SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);

Expand All @@ -2861,6 +2883,7 @@ public void doReput() {

if (dispatchRequest.isSuccess()) {
if (size > 0) {
currentReputTimestamp = dispatchRequest.getStoreTimestamp();
DefaultMessageStore.this.doDispatch(dispatchRequest);

if (!notifyMessageArriveInBatch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,13 @@ CompletableFuture<QueryMessageResult> queryMessageAsync(final String topic, fina
*/
long dispatchBehindBytes();

/**
* Get number of the milliseconds that have been stored in commit log and not yet dispatched to consume queue.
*
* @return number of the milliseconds to dispatch.
*/
long dispatchBehindMilliseconds();

/**
* Flush the message store to persist all data.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,11 @@ public long dispatchBehindBytes() {
return next.dispatchBehindBytes();
}

@Override
public long dispatchBehindMilliseconds() {
return next.dispatchBehindMilliseconds();
}

@Override
public long flush() {
return next.flush();
Expand Down

0 comments on commit 9aa081b

Please sign in to comment.