Skip to content

Commit

Permalink
feat: Added TimeMapper Interface and Default Implementation
Browse files Browse the repository at this point in the history
fix some sonarlint issue

test timeMapper class override

Code review changes

Update DefaultEventBridgeMapperTest.java

Test displaynames

Signed-off-by: Jens Rathsman <[email protected]>

Update DefaultEventBridgeMapperTest.java

Signed-off-by: Jens Rathsman <[email protected]>

Update src/main/java/software/amazon/event/kafkaconnector/mapping/DefaultTimeMapper.java

Co-authored-by: Michael Gasch <[email protected]>
Signed-off-by: Jens Rathsman <[email protected]>

Update src/main/java/software/amazon/event/kafkaconnector/EventBridgeSinkConfig.java

Co-authored-by: Michael Gasch <[email protected]>
Signed-off-by: Jens Rathsman <[email protected]>

Code review changes plus test that makes more sense

Code review change: Private internals in SinkRecordJsonMapper

formatting

Update src/test/java/software/amazon/event/kafkaconnector/mapping/DefaultEventBridgeMapperTest.java

Co-authored-by: Michael Gasch <[email protected]>
Signed-off-by: Jens Rathsman <[email protected]>
  • Loading branch information
jensur77 and embano1 committed Aug 22, 2024
1 parent d840ee8 commit c278061
Show file tree
Hide file tree
Showing 8 changed files with 257 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class EventBridgeSinkConfig extends AbstractConfig {
static final String AWS_ROLE_EXTERNAL_ID_CONFIG = "aws.eventbridge.iam.external.id";
static final String AWS_DETAIL_TYPES_CONFIG = "aws.eventbridge.detail.types";
static final String AWS_DETAIL_TYPES_MAPPER_CLASS = "aws.eventbridge.detail.types.mapper.class";
static final String AWS_TIME_MAPPER_CLASS = "aws.eventbridge.time.mapper.class";
static final String AWS_EVENTBUS_RESOURCES_CONFIG = "aws.eventbridge.eventbus.resources";
static final String AWS_OFFLOADING_DEFAULT_S3_BUCKET =
"aws.eventbridge.offloading.default.s3.bucket";
Expand Down Expand Up @@ -76,10 +77,14 @@ public class EventBridgeSinkConfig extends AbstractConfig {
+ "Can be defined per topic e.g., 'topic1:MyDetailType, topic2:MyDetailType', as a single expression "
+ "with a dynamic '${topic}' placeholder for all topics e.g., 'my-detail-type-${topic}', "
+ "or as a static value without additional topic information for all topics e.g., 'my-detail-type'.";

private static final String AWS_DETAIL_TYPES_MAPPER_DOC =
"Define a custom implementation class for the DetailTypeMapper interface to customize the mapping of Kafka topics or records to the EventBridge detail-type. Define full class path e.g. software.amazon.event.kafkaconnector.mapping.DefaultDetailTypeMapper.";

private static final String AWS_TIME_MAPPER_CLASS_DEFAULT =
"software.amazon.event.kafkaconnector.mapping.DefaultTimeMapper";
private static final String AWS_TIME_MAPPER_DOC =
"Provide a custom implementation class for the TimeMapper interface to customize the mapping of records to EventBridge metadata field 'time' e.g. 'software.amazon.event.kafkaconnector.mapping.DefaultTimeMapper'.";

private static final String AWS_EVENTBUS_RESOURCES_DOC =
"An optional comma-separated list of strings to add to "
+ "the resources field in the outgoing EventBridge events.";
Expand All @@ -99,6 +104,7 @@ public class EventBridgeSinkConfig extends AbstractConfig {
public Map<String, String> detailTypeByTopic;
public String detailType;
public String detailTypeMapperClass;
public String timeMapperClass;
public String offloadingDefaultS3Bucket;
public String offloadingDefaultFieldRef;

Expand All @@ -116,6 +122,7 @@ public EventBridgeSinkConfig(final Map<?, ?> originalProps) {
this.retriesDelay = getInt(AWS_RETRIES_DELAY_CONFIG);
this.resources = getList(AWS_EVENTBUS_RESOURCES_CONFIG);
this.detailTypeMapperClass = getString(AWS_DETAIL_TYPES_MAPPER_CLASS);
this.timeMapperClass = getString(AWS_TIME_MAPPER_CLASS);
this.offloadingDefaultS3Bucket = getString(AWS_OFFLOADING_DEFAULT_S3_BUCKET);
this.offloadingDefaultFieldRef = getString(AWS_OFFLOADING_DEFAULT_FIELDREF);

Expand All @@ -132,7 +139,7 @@ public EventBridgeSinkConfig(final Map<?, ?> originalProps) {
"EventBridge properties: connectorId={} eventBusArn={} eventBusRegion={} eventBusEndpointURI={} "
+ "eventBusMaxRetries={} eventBusRetriesDelay={} eventBusResources={} "
+ "eventBusEndpointID={} roleArn={} roleSessionName={} roleExternalID={} "
+ "offloadingDefaultS3Bucket={} offloadingDefaultFieldRef={}",
+ "offloadingDefaultS3Bucket={} offloadingDefaultFieldRef={} detailTypeMapperClass={} timeMapperClass={}",
connectorId,
eventBusArn,
region,
Expand All @@ -145,7 +152,9 @@ public EventBridgeSinkConfig(final Map<?, ?> originalProps) {
connectorId,
externalId,
offloadingDefaultS3Bucket,
offloadingDefaultFieldRef);
offloadingDefaultFieldRef,
detailTypeMapperClass,
timeMapperClass);
}

private static ConfigDef createConfigDef() {
Expand Down Expand Up @@ -189,6 +198,12 @@ private static void addParams(final ConfigDef configDef) {
AWS_DETAIL_TYPES_DEFAULT,
Importance.MEDIUM,
AWS_DETAIL_TYPES_DOC);
configDef.define(
AWS_TIME_MAPPER_CLASS,
Type.STRING,
AWS_TIME_MAPPER_CLASS_DEFAULT,
Importance.MEDIUM,
AWS_TIME_MAPPER_DOC);
configDef.define(
AWS_EVENTBUS_RESOURCES_CONFIG,
Type.LIST,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,13 @@ public static void validate(ConfigValue configValue, EnvVarGetter getenv) {

case AWS_DETAIL_TYPES_MAPPER_CLASS:
{
validateDetailTypeMapperClass(configValue);
validateClassExists(configValue);
break;
}

case AWS_TIME_MAPPER_CLASS:
{
validateClassExists(configValue);
break;
}

Expand All @@ -115,7 +121,7 @@ private static void validateConnectorId(ConfigValue configValue) {
}
}

private static void validateDetailTypeMapperClass(ConfigValue configValue) {
private static void validateClassExists(ConfigValue configValue) {
var mapperClass = (String) configValue.value();
try {
Class.forName(mapperClass);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,13 @@
*/
package software.amazon.event.kafkaconnector.mapping;

import static java.util.Collections.singletonMap;
import static java.util.stream.Collectors.partitioningBy;
import static java.util.stream.Collectors.toList;
import static software.amazon.event.kafkaconnector.EventBridgeResult.Error.reportOnly;
import static software.amazon.event.kafkaconnector.EventBridgeResult.failure;
import static software.amazon.event.kafkaconnector.EventBridgeResult.success;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import java.io.IOException;
import java.util.List;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import software.amazon.awssdk.services.eventbridge.model.PutEventsRequestEntry;
import software.amazon.event.kafkaconnector.EventBridgeResult;
Expand All @@ -28,14 +21,17 @@ public class DefaultEventBridgeMapper implements EventBridgeMapper {
private static final String sourcePrefix = "kafka-connect.";

private final EventBridgeSinkConfig config;
private final JsonConverter jsonConverter = new JsonConverter();
private final ObjectMapper objectMapper = new ObjectMapper();

private final SinkRecordJsonMapper jsonMapper = new SinkRecordJsonMapper();

private final DetailTypeMapper detailTypeMapper;
private final TimeMapper timeMapper;

public DefaultEventBridgeMapper(EventBridgeSinkConfig config) {
jsonConverter.configure(singletonMap("schemas.enable", "false"), false);

this.config = config;
this.detailTypeMapper = getDetailTypeMapper(config);
this.timeMapper = getTimeMapper(config);
}

public EventBridgeMappingResult map(List<SinkRecord> records) {
Expand All @@ -61,77 +57,14 @@ private EventBridgeResult<PutEventsRequestEntry> createPutEventsEntry(SinkRecord
.source(sourcePrefix + config.connectorId)
.detailType(detailTypeMapper.getDetailType(record))
.resources(config.resources)
.detail(createJsonPayload(record))
.detail(jsonMapper.createJsonPayload(record))
.time(timeMapper.getTime(record))
.build());
} catch (Exception e) {
return failure(record, reportOnly("Cannot convert Kafka record to EventBridge.", e));
}
}

private String createJsonPayload(SinkRecord record) throws IOException {
var root = objectMapper.createObjectNode();
root.put("topic", record.topic());
root.put("partition", record.kafkaPartition());
root.put("offset", record.kafkaOffset());
root.put("timestamp", record.timestamp());
root.put("timestampType", record.timestampType().toString());
root.set("headers", createHeaderArray(record));

if (record.key() == null) {
root.set("key", null);
} else {
root.set(
"key",
createJSONFromByteArray(
jsonConverter.fromConnectData(record.topic(), record.keySchema(), record.key())));
}

// tombstone handling
if (record.value() == null) {
root.set("value", null);
} else {
root.set(
"value",
createJSONFromByteArray(
jsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value())));
}

return root.toString();
}

/**
* This method serializes Kafka message headers to JSON.
*
* @param record Kafka record to be sent to EventBridge
* @return headers to be added to EventBridge message
* @throws IOException
*/
private ArrayNode createHeaderArray(SinkRecord record) throws IOException {
var headersArray = objectMapper.createArrayNode();

for (Header header : record.headers()) {
var headerItem = objectMapper.createObjectNode();
headerItem.set(
header.key(),
createJSONFromByteArray(
jsonConverter.fromConnectHeader(
record.topic(), header.key(), header.schema(), header.value())));
headersArray.add(headerItem);
}
return headersArray;
}

/**
* This method converts the byteArray which is returned by the {@link JsonConverter} to JSON.
*
* @param jsonBytes - byteArray to convert to JSON
* @return the JSON representation of jsonBytes
* @throws IOException
*/
private JsonNode createJSONFromByteArray(byte[] jsonBytes) throws IOException {
return objectMapper.readTree(jsonBytes);
}

private DetailTypeMapper getDetailTypeMapper(EventBridgeSinkConfig config) {
try {
var myClass = Class.forName(config.detailTypeMapperClass);
Expand All @@ -144,4 +77,15 @@ private DetailTypeMapper getDetailTypeMapper(EventBridgeSinkConfig config) {
throw new RuntimeException("Topic to Detail-Type Mapper Class can't be loaded.");
}
}

private TimeMapper getTimeMapper(EventBridgeSinkConfig config) {
try {
var myClass = Class.forName(config.timeMapperClass);
var constructor = myClass.getDeclaredConstructor();
return (TimeMapper) constructor.newInstance();
} catch (Exception e) {
// This will already be verified in the Config Validator
throw new RuntimeException("Time Mapper Class can't be loaded.");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package software.amazon.event.kafkaconnector.mapping;

import java.time.Instant;
import org.apache.kafka.connect.sink.SinkRecord;

public class DefaultTimeMapper implements TimeMapper {

@Override
public Instant getTime(SinkRecord sinkRecord) {
// As described in AWS documentation
// https://docs.aws.amazon.com/eventbridge/latest/APIReference/API_PutEventsRequestEntry.html
// If no timestamp is provided, the timestamp of the PutEvents call is used.
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package software.amazon.event.kafkaconnector.mapping;

import static java.util.Collections.singletonMap;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import java.io.IOException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;

public class SinkRecordJsonMapper {
private final JsonConverter jsonConverter = new JsonConverter();
private final ObjectMapper objectMapper = new ObjectMapper();

public SinkRecordJsonMapper() {
jsonConverter.configure(singletonMap("schemas.enable", "false"), false);
}

public String createJsonPayload(SinkRecord sinkRecord) throws IOException {
var root = objectMapper.createObjectNode();
root.put("topic", sinkRecord.topic());
root.put("partition", sinkRecord.kafkaPartition());
root.put("offset", sinkRecord.kafkaOffset());
root.put("timestamp", sinkRecord.timestamp());
root.put("timestampType", sinkRecord.timestampType().toString());
root.set("headers", createHeaderArray(sinkRecord));

if (sinkRecord.key() == null) {
root.set("key", null);
} else {
root.set(
"key",
createJSONFromByteArray(
jsonConverter.fromConnectData(
sinkRecord.topic(), sinkRecord.keySchema(), sinkRecord.key())));
}

// tombstone handling
if (sinkRecord.value() == null) {
root.set("value", null);
} else {
root.set(
"value",
createJSONFromByteArray(
jsonConverter.fromConnectData(
sinkRecord.topic(), sinkRecord.valueSchema(), sinkRecord.value())));
}
return root.toString();
}

/**
* This method serializes Kafka message headers to JSON.
*
* @param sinkRecord Kafka record to be sent to EventBridge
* @return headers to be added to EventBridge message
* @throws IOException
*/
private ArrayNode createHeaderArray(SinkRecord sinkRecord) throws IOException {
var headersArray = objectMapper.createArrayNode();

for (Header header : sinkRecord.headers()) {
var headerItem = objectMapper.createObjectNode();
headerItem.set(
header.key(),
createJSONFromByteArray(
jsonConverter.fromConnectHeader(
sinkRecord.topic(), header.key(), header.schema(), header.value())));
headersArray.add(headerItem);
}
return headersArray;
}

/**
* This method converts the byteArray which is returned by the {@link JsonConverter} to JSON.
*
* @param jsonBytes - byteArray to convert to JSON
* @return the JSON representation of jsonBytes
* @throws IOException
*/
private JsonNode createJSONFromByteArray(byte[] jsonBytes) throws IOException {
return objectMapper.readTree(jsonBytes);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package software.amazon.event.kafkaconnector.mapping;

import java.time.Instant;
import org.apache.kafka.connect.sink.SinkRecord;

public interface TimeMapper {
Instant getTime(SinkRecord sinkRecord);
}
Loading

0 comments on commit c278061

Please sign in to comment.