Commit 8617f47

DBZ-8082: Pass Headers to Key/Value Converters
ryanvanhuuksloot authored and jpechane committed Aug 15, 2024
1 parent 42fbde8 commit 8617f47
Showing 3 changed files with 113 additions and 16 deletions.
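
In summary, the change moves header conversion ahead of key/value conversion: the engine first runs the HeaderConverter over the record headers, collects the raw bytes into a RecordHeaders instance, and then hands that collection to the key and value converters through Kafka Connect's headers-aware overload, byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value). A converter can therefore vary its serialization per record based on headers. Below is a minimal sketch of such a converter; the class name and the "format" header key are illustrative and not part of this commit.

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;

// Hypothetical header-aware converter; only converters that override the
// four-argument fromConnectData ever see the headers the engine now passes.
public class HeaderAwareStringConverter implements Converter {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration needed for this sketch.
    }

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        // The default four-argument overload delegates here, so converters
        // that ignore headers behave exactly as before this commit.
        return value == null ? null : value.toString().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value) {
        // Example use of the headers the embedded engine now supplies:
        // uppercase the payload when a "format=upper" header is present.
        Header format = headers.lastHeader("format");
        if (value != null && format != null
                && "upper".equals(new String(format.value(), StandardCharsets.UTF_8))) {
            return fromConnectData(topic, schema, value.toString().toUpperCase());
        }
        return fromConnectData(topic, schema, value);
    }

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        return new SchemaAndValue(Schema.OPTIONAL_STRING_SCHEMA,
                value == null ? null : new String(value, StandardCharsets.UTF_8));
    }
}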
File 1 of 3

@@ -7,13 +7,13 @@

 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.ConverterConfig;
@@ -100,19 +100,25 @@ public Function<SourceRecord, R> toFormat(HeaderConverter headerConverter) {
             if (topicName == null) {
                 topicName = TOPIC_NAME;
             }
-            final byte[] key = keyConverter.fromConnectData(topicName, record.keySchema(), record.key());
-            final byte[] value = valueConverter.fromConnectData(topicName, record.valueSchema(), record.value());
+            org.apache.kafka.common.header.internals.RecordHeaders recordHeaders = new RecordHeaders();
 
-            List<Header<?>> headers = Collections.emptyList();
             if (headerConverter != null) {
-                List<Header<byte[]>> byteArrayHeaders = convertHeaders(record, topicName, headerConverter);
-                headers = (List) byteArrayHeaders;
-                if (shouldConvertHeadersToString()) {
-                    headers = byteArrayHeaders.stream()
-                            .map(h -> new EmbeddedEngineHeader<>(h.getKey(), new String(h.getValue(), StandardCharsets.UTF_8)))
-                            .collect(Collectors.toList());
+                for (org.apache.kafka.connect.header.Header header : record.headers()) {
+                    byte[] rawHeader = headerConverter.fromConnectHeader(topicName, header.key(), header.schema(), header.value());
+                    recordHeaders.add(header.key(), rawHeader);
                 }
             }
+
+            final byte[] key = keyConverter.fromConnectData(topicName, recordHeaders, record.keySchema(), record.key());
+            final byte[] value = valueConverter.fromConnectData(topicName, recordHeaders, record.valueSchema(), record.value());
+
+            List<Header<byte[]>> byteArrayHeaders = convertHeaders(recordHeaders);
+            List<Header<?>> headers = (List) byteArrayHeaders;
+            if (shouldConvertHeadersToString()) {
+                headers = byteArrayHeaders.stream()
+                        .map(h -> new EmbeddedEngineHeader<>(h.getKey(), new String(h.getValue(), StandardCharsets.UTF_8)))
+                        .collect(Collectors.toList());
+            }
             Object convertedKey = key;
             Object convertedValue = value;
             if (key != null && shouldConvertKeyToString()) {
@@ -148,14 +154,11 @@ private boolean shouldConvertHeadersToString() {
         return isFormat(formatHeader, Json.class);
     }
 
-    private List<Header<byte[]>> convertHeaders(
-            SourceRecord record, String topicName, HeaderConverter headerConverter) {
+    private List<Header<byte[]>> convertHeaders(org.apache.kafka.common.header.Headers recordHeaders) {
         List<Header<byte[]>> headers = new ArrayList<>();
 
-        for (org.apache.kafka.connect.header.Header header : record.headers()) {
-            String headerKey = header.key();
-            byte[] rawHeader = headerConverter.fromConnectHeader(topicName, headerKey, header.schema(), header.value());
-            headers.add(new EmbeddedEngineHeader<>(headerKey, rawHeader));
+        for (org.apache.kafka.common.header.Header header : recordHeaders) {
+            headers.add(new EmbeddedEngineHeader<>(header.key(), header.value()));
         }
 
         return headers;
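
Note that the headers-aware fromConnectData variant is a default method on org.apache.kafka.connect.storage.Converter that simply delegates to the three-argument form, so key and value converters that do not override it behave exactly as before this change. convertHeaders is correspondingly simplified: the HeaderConverter work now happens once, up front, and the method only wraps the already converted byte arrays into EmbeddedEngineHeader instances.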
File 2 of 3

@@ -41,8 +41,12 @@
 import io.debezium.doc.FixFor;
 import io.debezium.embedded.AbstractConnectorTest;
 import io.debezium.embedded.DebeziumEngineTestUtils;
+import io.debezium.embedded.EmbeddedEngineChangeEvent;
 import io.debezium.embedded.EmbeddedEngineConfig;
+import io.debezium.embedded.EmbeddedEngineHeader;
 import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.format.Json;
+import io.debezium.engine.format.KeyValueHeaderChangeEventFormat;
 import io.debezium.junit.logging.LogInterceptor;
 import io.debezium.util.LoggingContext;
 import io.debezium.util.Testing;
@@ -219,6 +223,47 @@ public void testTasksAreStoppedIfSomeFailsToStart() {
         waitForEngineToStop();
     }
 
+    @Test
+    public void testHeaderConverter() throws Exception {
+        final Properties props = new Properties();
+        props.setProperty(ConnectorConfig.NAME_CONFIG, "debezium-engine");
+        props.setProperty(ConnectorConfig.TASKS_MAX_CONFIG, "1");
+        props.setProperty(ConnectorConfig.CONNECTOR_CLASS_CONFIG, FileStreamSourceConnector.class.getName());
+        props.setProperty(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
+        props.setProperty(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "0");
+        props.setProperty(FileStreamSourceConnector.FILE_CONFIG, TEST_FILE_PATH.toAbsolutePath().toString());
+        props.setProperty(FileStreamSourceConnector.TOPIC_CONFIG, "testTopic");
+        props.setProperty("transforms", "header");
+        props.setProperty("transforms.header.type", "io.debezium.embedded.async.FixedValueHeader");
+
+        appendLinesToSource(1);
+        CountDownLatch recordsLatch = new CountDownLatch(1); // 1 count down for headers
+
+        DebeziumEngine.Builder<EmbeddedEngineChangeEvent> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder(
+                KeyValueHeaderChangeEventFormat.of(Json.class, Json.class, Json.class));
+        DebeziumEngine<EmbeddedEngineChangeEvent> embeddedEngine = builder
+                .using(props)
+                .using(new TestEngineConnectorCallback())
+                .notifying((records, committer) -> {
+                    for (EmbeddedEngineChangeEvent r : records) {
+                        committer.markProcessed(r);
+                        assertThat(r.headers().size()).isEqualTo(1);
+                        assertThat(
+                                ((EmbeddedEngineHeader) r.headers().get(0)).getValue()).isEqualTo("{\"schema\":{\"type\":\"int32\",\"optional\":false},\"payload\":2}");
+                    }
+                    committer.markBatchFinished();
+                }).build();
+
+        engineExecSrv.submit(() -> {
+            LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
+            embeddedEngine.run();
+        });
+
+        recordsLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS);
+
+        embeddedEngine.close();
+    }
+
     @Test
     public void testCompletionCallbackCalledUponSuccess() throws Exception {
         final Properties props = new Properties();
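
The test drives the engine through the internal AsyncEngineBuilder; application code would typically go through the public factory instead. Below is a sketch of consuming the converted headers that way, assuming the DebeziumEngine.create overload for KeyValueHeaderChangeEventFormat and a props object configured as in the test above.

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.Header;
import io.debezium.engine.format.Json;
import io.debezium.engine.format.KeyValueHeaderChangeEventFormat;

public class HeaderConsumerExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        // ... connector configuration, as in the test above ...

        DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(
                KeyValueHeaderChangeEventFormat.of(Json.class, Json.class, Json.class))
                .using(props)
                .notifying((records, committer) -> {
                    for (ChangeEvent<String, String> record : records) {
                        // With the Json header format, header values arrive as JSON strings.
                        for (Header<?> header : record.headers()) {
                            System.out.println(header.getKey() + " = " + header.getValue());
                        }
                        committer.markProcessed(record);
                    }
                    committer.markBatchFinished();
                })
                .build();

        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine); // DebeziumEngine implements Runnable
        // ... shut down via engine.close() and executor.shutdown() when done ...
    }
}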
File 3 of 3 (new file): FixedValueHeader.java

@@ -0,0 +1,49 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.embedded.async;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.Transformation;
+
+public class FixedValueHeader<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    @Override
+    public R apply(R record) {
+        Headers headers = new ConnectHeaders();
+        headers.add("fixed-key", 2, Schema.INT32_SCHEMA);
+        headers.forEach(h -> record.headers().add(h));
+
+        return record.newRecord(
+                record.topic(),
+                record.kafkaPartition(),
+                record.keySchema(),
+                record.key(),
+                record.valueSchema(),
+                record.value(),
+                record.timestamp(),
+                headers);
+    }
+
+    @Override
+    public ConfigDef config() {
+        return null;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+    }
+
+    @Override
+    public void close() {
+    }
+
+}
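
FixedValueHeader is a test-only SMT that stamps every record with a fixed-key header carrying the INT32 value 2; serialized by the JSON header converter, that becomes the {"schema":{"type":"int32","optional":false},"payload":2} string the test asserts. Returning null from config() gets by in this test setup, but a real Transformation would normally return a ConfigDef (even an empty new ConfigDef()) so the framework can validate its configuration.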
