Skip to content

Commit

Permalink
KAFKA-18616; Refactor Tools's ApiMessageFormatter
Browse files Browse the repository at this point in the history
  • Loading branch information
dajac committed Jan 24, 2025
1 parent 80d2a8a commit 013a98f
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 160 deletions.
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@

<subpackage name="consumer">
<allow pkg="org.apache.kafka.tools"/>
<allow pkg="org.apache.kafka.coordinator.common.runtime" />
<subpackage name="group">
<allow pkg="org.apache.kafka.coordinator.group"/>
<allow pkg="kafka.api"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;

import java.io.IOException;
import java.io.PrintStream;
Expand All @@ -31,44 +34,44 @@

import static java.nio.charset.StandardCharsets.UTF_8;

public abstract class ApiMessageFormatter implements MessageFormatter {
public abstract class CoordinatorRecordMessageFormatter implements MessageFormatter {

private static final String TYPE = "type";
private static final String VERSION = "version";
private static final String DATA = "data";
private static final String KEY = "key";
private static final String VALUE = "value";
static final String UNKNOWN = "unknown";

@Override
public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
if (Objects.isNull(consumerRecord.key())) return;

ObjectNode json = new ObjectNode(JsonNodeFactory.instance);
try {
CoordinatorRecord record = deserialize(
consumerRecord.key() != null ? ByteBuffer.wrap(consumerRecord.key()) : null,
consumerRecord.value() != null ? ByteBuffer.wrap(consumerRecord.value()) : null
);
if (!shouldPrint(record.key().apiKey())) return;

byte[] key = consumerRecord.key();
if (Objects.nonNull(key)) {
short keyVersion = ByteBuffer.wrap(key).getShort();
JsonNode dataNode = readToKeyJson(ByteBuffer.wrap(key));
json
.putObject(KEY)
.put(TYPE, record.key().apiKey())
.set(DATA, keyAsJson(record.key()));

if (dataNode instanceof NullNode) {
return;
if (Objects.nonNull(record.value())) {
json
.putObject(VALUE)
.put(VERSION, record.value().version())
.set(DATA, valueAsJson(record.value().message(), record.value().version()));
} else {
json.set(VALUE, NullNode.getInstance());
}
json.putObject(KEY)
.put(TYPE, keyVersion)
.set(DATA, dataNode);
} else {
} catch (CoordinatorLoader.UnknownRecordTypeException ex) {
return;
}

byte[] value = consumerRecord.value();
if (Objects.nonNull(value)) {
short valueVersion = ByteBuffer.wrap(value).getShort();
JsonNode dataNode = readToValueJson(ByteBuffer.wrap(value));

json.putObject(VALUE)
.put(VERSION, valueVersion)
.set(DATA, dataNode);
} else {
json.set(VALUE, NullNode.getInstance());
} catch (RuntimeException ex) {
throw new RuntimeException("Could not read record at offset " + consumerRecord.offset() +
" due to: " + ex.getMessage(), ex);
}

try {
Expand All @@ -78,6 +81,8 @@ public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream o
}
}

protected abstract JsonNode readToKeyJson(ByteBuffer byteBuffer);
protected abstract JsonNode readToValueJson(ByteBuffer byteBuffer);
protected abstract CoordinatorRecord deserialize(ByteBuffer key, ByteBuffer value);
protected abstract boolean shouldPrint(short recordType);
protected abstract JsonNode keyAsJson(ApiMessage message);
protected abstract JsonNode valueAsJson(ApiMessage message, short version);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,45 +16,37 @@
*/
package org.apache.kafka.tools.consumer;

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde;
import org.apache.kafka.coordinator.group.generated.CoordinatorRecordJsonConverters;
import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKeyJsonConverter;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValueJsonConverter;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.TextNode;

import java.nio.ByteBuffer;

public class GroupMetadataMessageFormatter extends ApiMessageFormatter {
public class GroupMetadataMessageFormatter extends CoordinatorRecordMessageFormatter {
private CoordinatorRecordSerde serde = new GroupCoordinatorRecordSerde();

@Override
protected CoordinatorRecord deserialize(ByteBuffer key, ByteBuffer value) {
return serde.deserialize(key, value);
}

@Override
protected boolean shouldPrint(short recordType) {
return CoordinatorRecordType.GROUP_METADATA.id() == recordType;
}

@Override
protected JsonNode readToKeyJson(ByteBuffer byteBuffer) {
try {
switch (CoordinatorRecordType.fromId(byteBuffer.getShort())) {
case GROUP_METADATA:
return GroupMetadataKeyJsonConverter.write(
new GroupMetadataKey(new ByteBufferAccessor(byteBuffer), (short) 0),
(short) 0
);

default:
return NullNode.getInstance();
}
} catch (UnsupportedVersionException ex) {
return NullNode.getInstance();
}
protected JsonNode keyAsJson(ApiMessage message) {
return CoordinatorRecordJsonConverters.writeRecordKeyAsJson(message);
}

@Override
protected JsonNode readToValueJson(ByteBuffer byteBuffer) {
short version = byteBuffer.getShort();
if (version >= GroupMetadataValue.LOWEST_SUPPORTED_VERSION && version <= GroupMetadataValue.HIGHEST_SUPPORTED_VERSION) {
return GroupMetadataValueJsonConverter.write(new GroupMetadataValue(new ByteBufferAccessor(byteBuffer), version), version);
}
return new TextNode(UNKNOWN);
protected JsonNode valueAsJson(ApiMessage message, short version) {
return CoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,50 +16,41 @@
*/
package org.apache.kafka.tools.consumer;

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde;
import org.apache.kafka.coordinator.group.generated.CoordinatorRecordJsonConverters;
import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKeyJsonConverter;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValueJsonConverter;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.TextNode;

import java.nio.ByteBuffer;

/**
* Formatter for use with tools such as console consumer: Consumer should also set exclude.internal.topics to false.
*/
public class OffsetsMessageFormatter extends ApiMessageFormatter {
public class OffsetsMessageFormatter extends CoordinatorRecordMessageFormatter {
private CoordinatorRecordSerde serde = new GroupCoordinatorRecordSerde();

@Override
protected CoordinatorRecord deserialize(ByteBuffer key, ByteBuffer value) {
return serde.deserialize(key, value);
}

@Override
protected JsonNode readToKeyJson(ByteBuffer byteBuffer) {
try {
switch (CoordinatorRecordType.fromId(byteBuffer.getShort())) {
// We can read both record types with the offset commit one.
case LEGACY_OFFSET_COMMIT:
case OFFSET_COMMIT:
return OffsetCommitKeyJsonConverter.write(
new OffsetCommitKey(new ByteBufferAccessor(byteBuffer), (short) 0),
(short) 0
);
protected boolean shouldPrint(short recordType) {
return CoordinatorRecordType.OFFSET_COMMIT.id() == recordType ||
CoordinatorRecordType.LEGACY_OFFSET_COMMIT.id() == recordType;
}

default:
return NullNode.getInstance();
}
} catch (UnsupportedVersionException ex) {
return NullNode.getInstance();
}
@Override
protected JsonNode keyAsJson(ApiMessage message) {
return CoordinatorRecordJsonConverters.writeRecordKeyAsJson(message);
}

@Override
protected JsonNode readToValueJson(ByteBuffer byteBuffer) {
short version = byteBuffer.getShort();
if (version >= OffsetCommitValue.LOWEST_SUPPORTED_VERSION && version <= OffsetCommitValue.HIGHEST_SUPPORTED_VERSION) {
return OffsetCommitValueJsonConverter.write(new OffsetCommitValue(new ByteBufferAccessor(byteBuffer), version), version);
}
return new TextNode(UNKNOWN);
protected JsonNode valueAsJson(ApiMessage message, short version) {
return CoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,45 +16,36 @@
*/
package org.apache.kafka.tools.consumer;

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.coordinator.transaction.generated.CoordinatorRecordType;
import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
import org.apache.kafka.coordinator.transaction.generated.TransactionLogKeyJsonConverter;
import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.kafka.coordinator.transaction.generated.CoordinatorRecordJsonConverters;
import org.apache.kafka.coordinator.transaction.TransactionCoordinatorRecordSerde;

import java.nio.ByteBuffer;

public class TransactionLogMessageFormatter extends ApiMessageFormatter {
public class TransactionLogMessageFormatter extends CoordinatorRecordMessageFormatter {
private CoordinatorRecordSerde serde = new TransactionCoordinatorRecordSerde();

@Override
protected CoordinatorRecord deserialize(ByteBuffer key, ByteBuffer value) {
return serde.deserialize(key, value);
}

@Override
protected boolean shouldPrint(short recordType) {
return true;
}

@Override
protected JsonNode readToKeyJson(ByteBuffer byteBuffer) {
try {
switch (CoordinatorRecordType.fromId(byteBuffer.getShort())) {
case TRANSACTION_LOG:
return TransactionLogKeyJsonConverter.write(
new TransactionLogKey(new ByteBufferAccessor(byteBuffer), (short) 0),
(short) 0
);

default:
return NullNode.getInstance();
}
} catch (UnsupportedVersionException ex) {
return NullNode.getInstance();
}
protected JsonNode keyAsJson(ApiMessage message) {
return CoordinatorRecordJsonConverters.writeRecordKeyAsJson(message);
}

@Override
protected JsonNode readToValueJson(ByteBuffer byteBuffer) {
short version = byteBuffer.getShort();
if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION && version <= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) {
return TransactionLogValueJsonConverter.write(new TransactionLogValue(new ByteBufferAccessor(byteBuffer), version), version);
}
return new TextNode(UNKNOWN);
protected JsonNode valueAsJson(ApiMessage message, short version) {
return CoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.kafka.tools.consumer.CoordinatorRecordMessageFormatter;

import java.io.IOException;
import java.io.PrintStream;
Expand Down Expand Up @@ -132,7 +133,7 @@ private JsonNode transferKeyMessageToJsonNode(ApiMessage logKey, short keyVersio
* as per RPC spec.
* To differentiate, we need to use the corresponding key versions. This is acceptable as
* the records will always appear in pairs (key, value). However, this means that we cannot
* extend {@link org.apache.kafka.tools.consumer.ApiMessageFormatter} as it requires overriding
* extend {@link CoordinatorRecordMessageFormatter} as it requires overriding
* readToValueJson whose signature does not allow for passing keyversion.
*
* @param byteBuffer - Represents the raw data read from the topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,6 @@ public class GroupMetadataMessageFormatterTest {

private static Stream<Arguments> parameters() {
return Stream.of(
Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 10, GROUP_METADATA_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 10, GROUP_METADATA_VALUE).array(),
""
),
Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 0, GROUP_METADATA_VALUE).array(),
Expand Down Expand Up @@ -126,11 +121,13 @@ private static Stream<Arguments> parameters() {
Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
null,
"{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":null}"),
"{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":null}"
),
Arguments.of(
null,
MessageUtil.toVersionPrefixedByteBuffer((short) 4, GROUP_METADATA_VALUE).array(),
""),
""
),
Arguments.of(null, null, ""),
Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
Expand All @@ -142,7 +139,7 @@ private static Stream<Arguments> parameters() {

@ParameterizedTest
@MethodSource("parameters")
public void testTransactionLogMessageFormatter(byte[] keyBuffer, byte[] valueBuffer, String expectedOutput) {
public void testMessageFormatter(byte[] keyBuffer, byte[] valueBuffer, String expectedOutput) {
ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
TOPIC, 0, 0,
0L, TimestampType.CREATE_TIME, 0,
Expand Down
Loading

0 comments on commit 013a98f

Please sign in to comment.