Skip to content

Commit

Permalink
[ISSUE #7585] Support message filtering in rocketmq tiered storage
Browse files Browse the repository at this point in the history
  • Loading branch information
lizhimins committed Nov 29, 2023
1 parent 56e886b commit 956fdd8
Show file tree
Hide file tree
Showing 13 changed files with 478 additions and 249 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,10 @@ public CompletableFuture<GetMessageResult> getMessageAsync(String group, String
// so there is no need to update the maximum offset to the local cq offset here,
// otherwise it will cause repeated consumption after next begin offset over commit offset.

logger.trace("GetMessageAsync result, group: {}, topic: {}, queueId: {}, offset: {}, count:{}, {}",
group, topic, queueId, offset, maxMsgNums, result);
if (storeConfig.isRecordGetMessageResult()) {
logger.info("GetMessageAsync result, {}, group: {}, topic: {}, queueId: {}, offset: {}, count:{}",
result, group, topic, queueId, offset, maxMsgNums);
}

return result;
}).exceptionally(e -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.tieredstore.common;

import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;

public class GetMessageResultExt extends GetMessageResult {

private final List<Long> tagCodeList;

public GetMessageResultExt() {
this.tagCodeList = new ArrayList<>();
}

public void addMessageExt(SelectMappedBufferResult bufferResult, long queueOffset, long tagCode) {
super.addMessage(bufferResult, queueOffset);
this.tagCodeList.add(tagCode);
}

public List<Long> getTagCodeList() {
return tagCodeList;
}

public GetMessageResult doFilterMessage(MessageFilter messageFilter) {
if (GetMessageStatus.FOUND != super.getStatus() || messageFilter == null) {
return this;
}

GetMessageResult result = new GetMessageResult();
result.setStatus(GetMessageStatus.FOUND);
result.setMinOffset(this.getMinOffset());
result.setMaxOffset(this.getMaxOffset());
result.setNextBeginOffset(this.getNextBeginOffset());

for (int i = 0; i < this.getMessageMapedList().size(); i++) {
if (!messageFilter.isMatchedByConsumeQueue(this.tagCodeList.get(i), null)) {
continue;
}

SelectMappedBufferResult bufferResult = this.getMessageMapedList().get(i);
if (!messageFilter.isMatchedByCommitLog(bufferResult.getByteBuffer().slice(), null)) {
continue;
}

result.addMessage(new SelectMappedBufferResult(bufferResult.getStartOffset(),
bufferResult.getByteBuffer(), bufferResult.getSize(), null),
MessageBufferUtil.getQueueOffset(bufferResult.getByteBuffer()));
}

if (result.getBufferTotalSize() == 0) {
result.setStatus(GetMessageStatus.NO_MATCHED_MESSAGE);
}
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.tieredstore.common;

import java.nio.ByteBuffer;

public class SelectBufferResult {

private final ByteBuffer byteBuffer;
private final long startOffset;
private final int size;
private final long tagCode;

public SelectBufferResult(ByteBuffer byteBuffer, long startOffset, int size, long tagCode) {
this.startOffset = startOffset;
this.byteBuffer = byteBuffer;
this.size = size;
this.tagCode = tagCode;
}

public ByteBuffer getByteBuffer() {
return byteBuffer;
}

public long getStartOffset() {
return startOffset;
}

public int getSize() {
return size;
}

public long getTagCode() {
return tagCode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,21 @@
*/
package org.apache.rocketmq.tieredstore.common;

import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.store.SelectMappedBufferResult;

public class SelectMappedBufferResultWrapper {
public class SelectBufferResultWrapper {

private final SelectMappedBufferResult result;
private final LongAdder accessCount;

private final long curOffset;
private final long minOffset;
private final long maxOffset;
private final long size;

public SelectMappedBufferResultWrapper(
SelectMappedBufferResult result, long curOffset, long minOffset, long maxOffset, long size) {
private final long offset;
private final long tagCode;
private final AtomicInteger accessCount;

public SelectBufferResultWrapper(SelectMappedBufferResult result, long offset, long tagCode, boolean used) {
this.result = result;
this.accessCount = new LongAdder();
this.curOffset = curOffset;
this.minOffset = minOffset;
this.maxOffset = maxOffset;
this.size = size;
}

public SelectMappedBufferResult getResult() {
return result;
this.offset = offset;
this.tagCode = tagCode;
this.accessCount = new AtomicInteger(used ? 1 : 0);
}

public SelectMappedBufferResult getDuplicateResult() {
Expand All @@ -53,27 +42,23 @@ public SelectMappedBufferResult getDuplicateResult() {
result.getMappedFile());
}

public long getCurOffset() {
return curOffset;
}

public long getMinOffset() {
return minOffset;
public long getOffset() {
return offset;
}

public long getMaxOffset() {
return maxOffset;
public int getBufferSize() {
return this.result.getSize();
}

public long getSize() {
return size;
public long getTagCode() {
return tagCode;
}

public void addAccessCount() {
accessCount.increment();
public int incrementAndGet() {
return accessCount.incrementAndGet();
}

public long getAccessCount() {
return accessCount.sum();
public int getAccessCount() {
return accessCount.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public boolean check(TieredStorageLevel targetLevel) {

private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
private boolean messageIndexEnable = true;
private boolean recordGetMessageResult = false;

// CommitLog file size, default is 1G
private long tieredStoreCommitLogMaxSize = 1024 * 1024 * 1024;
Expand Down Expand Up @@ -182,6 +183,14 @@ public void setMessageIndexEnable(boolean messageIndexEnable) {
this.messageIndexEnable = messageIndexEnable;
}

public boolean isRecordGetMessageResult() {
return recordGetMessageResult;
}

public void setRecordGetMessageResult(boolean recordGetMessageResult) {
this.recordGetMessageResult = recordGetMessageResult;
}

public long getTieredStoreCommitLogMaxSize() {
return tieredStoreCommitLogMaxSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import org.apache.rocketmq.tieredstore.TieredMessageFetcher;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.common.MessageCacheKey;
import org.apache.rocketmq.tieredstore.common.SelectMappedBufferResultWrapper;
import org.apache.rocketmq.tieredstore.common.SelectBufferResultWrapper;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile;
import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager;
Expand Down Expand Up @@ -265,7 +265,7 @@ public static void init(Meter meter, Supplier<AttributesBuilder> attributesBuild
.setUnit("bytes")
.ofLongs()
.buildWithCallback(measurement -> {
Optional<Policy.Eviction<MessageCacheKey, SelectMappedBufferResultWrapper>> eviction = fetcher.getMessageCache().policy().eviction();
Optional<Policy.Eviction<MessageCacheKey, SelectBufferResultWrapper>> eviction = fetcher.getMessageCache().policy().eviction();
eviction.ifPresent(resultEviction -> measurement.record(resultEviction.weightedSize().orElse(0), newAttributesBuilder().build()));
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ public CompletableFuture<ByteBuffer> readAsync(long position, int length) {
return future;
}
if (position + length > commitPosition) {
logger.warn("TieredFileSegment#readAsync request position + length is greater than commit position," +
logger.debug("TieredFileSegment#readAsync request position + length is greater than commit position," +
" correct length using commit position, file: {}, request position: {}, commit position:{}, change length from {} to {}",
getPath(), position, commitPosition, length, commitPosition - position);
length = (int) (commitPosition - position);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.tieredstore.common.SelectBufferResult;
import org.apache.rocketmq.tieredstore.file.TieredCommitLog;
import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue;

Expand Down Expand Up @@ -113,53 +113,72 @@ public static Map<String, String> getProperties(ByteBuffer message) {
return MessageDecoder.decodeProperties(slice);
}

public static List<Pair<Integer/* offset of msgBuffer */, Integer/* msg size */>> splitMessageBuffer(
ByteBuffer cqBuffer, ByteBuffer msgBuffer) {
public static List<SelectBufferResult> splitMessageBuffer(ByteBuffer cqBuffer, ByteBuffer msgBuffer) {

cqBuffer.rewind();
msgBuffer.rewind();
List<Pair<Integer, Integer>> messageList = new ArrayList<>(cqBuffer.remaining() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);

List<SelectBufferResult> bufferResultList = new ArrayList<>(
cqBuffer.remaining() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);

if (msgBuffer.remaining() == 0) {
logger.error("MessageBufferUtil#splitMessage, msg buffer length is zero");
return bufferResultList;
}

if (cqBuffer.remaining() % TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE != 0) {
logger.warn("MessageBufferUtil#splitMessage: consume queue buffer size {} is not an integer multiple of CONSUME_QUEUE_STORE_UNIT_SIZE {}",
cqBuffer.remaining(), TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
return messageList;
logger.error("MessageBufferUtil#splitMessage, consume queue buffer size incorrect, {}", cqBuffer.remaining());
return bufferResultList;
}

try {
long startCommitLogOffset = CQItemBufferUtil.getCommitLogOffset(cqBuffer);
for (int pos = cqBuffer.position(); pos < cqBuffer.limit(); pos += TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE) {
cqBuffer.position(pos);
int diff = (int) (CQItemBufferUtil.getCommitLogOffset(cqBuffer) - startCommitLogOffset);
int size = CQItemBufferUtil.getSize(cqBuffer);
if (diff + size > msgBuffer.limit()) {
logger.error("MessageBufferUtil#splitMessage: message buffer size is incorrect: record in consume queue: {}, actual: {}", diff + size, msgBuffer.remaining());
return messageList;
long firstCommitLogOffset = CQItemBufferUtil.getCommitLogOffset(cqBuffer);

for (int position = cqBuffer.position(); position < cqBuffer.limit();
position += TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE) {

cqBuffer.position(position);
long logOffset = CQItemBufferUtil.getCommitLogOffset(cqBuffer);
int bufferSize = CQItemBufferUtil.getSize(cqBuffer);
long tagCode = CQItemBufferUtil.getTagCode(cqBuffer);

int offset = (int) (logOffset - firstCommitLogOffset);
if (offset + bufferSize > msgBuffer.limit()) {
logger.error("MessageBufferUtil#splitMessage, message buffer size incorrect. " +
"Expect length in consume queue: {}, actual length: {}", offset + bufferSize, msgBuffer.limit());
break;
}
msgBuffer.position(diff);

msgBuffer.position(offset);
int magicCode = getMagicCode(msgBuffer);
if (magicCode == TieredCommitLog.BLANK_MAGIC_CODE) {
logger.warn("MessageBufferUtil#splitMessage: message decode error: blank magic code, this message may be coda, try to fix offset");
diff = diff + TieredCommitLog.CODA_SIZE;
msgBuffer.position(diff);
offset += TieredCommitLog.CODA_SIZE;
msgBuffer.position(offset);
magicCode = getMagicCode(msgBuffer);
}
if (magicCode != MessageDecoder.MESSAGE_MAGIC_CODE && magicCode != MessageDecoder.MESSAGE_MAGIC_CODE_V2) {
logger.warn("MessageBufferUtil#splitMessage: message decode error: unknown magic code");
if (magicCode != MessageDecoder.MESSAGE_MAGIC_CODE &&
magicCode != MessageDecoder.MESSAGE_MAGIC_CODE_V2) {
logger.warn("MessageBufferUtil#splitMessage, found unknown magic code. " +
"Message offset: {}, wrong magic code: {}", offset, magicCode);
continue;
}

if (getTotalSize(msgBuffer) != size) {
logger.warn("MessageBufferUtil#splitMessage: message size is not right: except: {}, actual: {}", size, getTotalSize(msgBuffer));
if (bufferSize != getTotalSize(msgBuffer)) {
logger.warn("MessageBufferUtil#splitMessage, message length in commitlog incorrect. " +
"Except length in commitlog: {}, actual: {}", getTotalSize(msgBuffer), bufferSize);
continue;
}

messageList.add(Pair.of(diff, size));
ByteBuffer sliceBuffer = msgBuffer.slice();
sliceBuffer.limit(bufferSize);
bufferResultList.add(new SelectBufferResult(sliceBuffer, offset, bufferSize, tagCode));
}
} catch (Exception e) {
logger.error("MessageBufferUtil#splitMessage: split message failed, maybe decode consume queue item failed", e);
logger.error("MessageBufferUtil#splitMessage, split message buffer error", e);
} finally {
cqBuffer.rewind();
msgBuffer.rewind();
}
return messageList;
return bufferResultList;
}
}
Loading

0 comments on commit 956fdd8

Please sign in to comment.