diff --git a/src/main/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapper.java b/src/main/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapper.java index 6a091f35a..a368c4e44 100644 --- a/src/main/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapper.java +++ b/src/main/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapper.java @@ -19,13 +19,13 @@ class IcebergTableStreamingRecordMapper extends StreamingRecordMapper { private static final TypeReference<Map<String, Object>> OBJECTS_MAP_TYPE_REFERENCE = new TypeReference<Map<String, Object>>() {}; - public IcebergTableStreamingRecordMapper(ObjectMapper objectMapper) { - super(objectMapper); + public IcebergTableStreamingRecordMapper( + ObjectMapper objectMapper, boolean schematizationEnabled) { + super(objectMapper, schematizationEnabled); } @Override - public Map<String, Object> processSnowflakeRecord( - SnowflakeTableRow row, boolean schematizationEnabled, boolean includeMetadata) + public Map<String, Object> processSnowflakeRecord(SnowflakeTableRow row, boolean includeMetadata) throws JsonProcessingException { final Map<String, Object> streamingIngestRow = new HashMap<>(); for (JsonNode node : row.getContent().getData()) { diff --git a/src/main/java/com/snowflake/kafka/connector/records/RecordService.java b/src/main/java/com/snowflake/kafka/connector/records/RecordService.java index 8a72732c9..1a92238e9 100644 --- a/src/main/java/com/snowflake/kafka/connector/records/RecordService.java +++ b/src/main/java/com/snowflake/kafka/connector/records/RecordService.java @@ -69,8 +69,6 @@ public class RecordService { private static final String KEY_SCHEMA_ID = "key_schema_id"; static final String HEADERS = "headers"; - private final boolean enableSchematization; - private final StreamingRecordMapper streamingRecordMapper; // For each task, we require a separate instance of SimpleDataFormat, since they are not @@ -95,23 +93,15 @@ public class RecordService { // This class is designed to work with empty metadata config 
map private SnowflakeMetadataConfig metadataConfig = new SnowflakeMetadataConfig(); - RecordService( - Clock clock, - boolean enableSchematization, - StreamingRecordMapper streamingRecordMapper, - ObjectMapper mapper) { + RecordService(Clock clock, StreamingRecordMapper streamingRecordMapper, ObjectMapper mapper) { this.clock = clock; - this.enableSchematization = enableSchematization; this.streamingRecordMapper = streamingRecordMapper; this.mapper = mapper; } /** Creates a record service with a UTC {@link Clock}. */ - RecordService( - boolean enableSchematization, - StreamingRecordMapper streamingRecordMapper, - ObjectMapper mapper) { - this(Clock.systemUTC(), enableSchematization, streamingRecordMapper, mapper); + RecordService(StreamingRecordMapper streamingRecordMapper, ObjectMapper mapper) { + this(Clock.systemUTC(), streamingRecordMapper, mapper); } public void setMetadataConfig(SnowflakeMetadataConfig metadataConfigIn) { @@ -219,8 +209,7 @@ public Map<String, Object> getProcessedRecordForStreamingIngest(SinkRecord record) throws JsonProcessingException { SnowflakeTableRow row = processRecord(record, clock.instant()); - return streamingRecordMapper.processSnowflakeRecord( - row, enableSchematization, metadataConfig.allFlag); + return streamingRecordMapper.processSnowflakeRecord(row, metadataConfig.allFlag); } /** For now there are two columns one is content and other is metadata. 
Both are Json */ diff --git a/src/main/java/com/snowflake/kafka/connector/records/RecordServiceFactory.java b/src/main/java/com/snowflake/kafka/connector/records/RecordServiceFactory.java index 95a422dc0..ab8d3deac 100644 --- a/src/main/java/com/snowflake/kafka/connector/records/RecordServiceFactory.java +++ b/src/main/java/com/snowflake/kafka/connector/records/RecordServiceFactory.java @@ -8,11 +8,10 @@ public static RecordService createRecordService( ObjectMapper objectMapper = new ObjectMapper(); if (isIcebergEnabled) { return new RecordService( - enableSchematization, new IcebergTableStreamingRecordMapper(objectMapper), objectMapper); + new IcebergTableStreamingRecordMapper(objectMapper, enableSchematization), objectMapper); } else { return new RecordService( - enableSchematization, - new SnowflakeTableStreamingRecordMapper(objectMapper), + new SnowflakeTableStreamingRecordMapper(objectMapper, enableSchematization), objectMapper); } } diff --git a/src/main/java/com/snowflake/kafka/connector/records/SnowflakeTableStreamingRecordMapper.java b/src/main/java/com/snowflake/kafka/connector/records/SnowflakeTableStreamingRecordMapper.java index 52dff1faa..06e83c1c9 100644 --- a/src/main/java/com/snowflake/kafka/connector/records/SnowflakeTableStreamingRecordMapper.java +++ b/src/main/java/com/snowflake/kafka/connector/records/SnowflakeTableStreamingRecordMapper.java @@ -14,15 +14,13 @@ class SnowflakeTableStreamingRecordMapper extends StreamingRecordMapper { - public SnowflakeTableStreamingRecordMapper(ObjectMapper mapper) { - super(mapper); + public SnowflakeTableStreamingRecordMapper(ObjectMapper mapper, boolean schematizationEnabled) { + super(mapper, schematizationEnabled); } @Override public Map<String, Object> processSnowflakeRecord( - RecordService.SnowflakeTableRow row, - boolean schematizationEnabled, - boolean includeAllMetadata) + RecordService.SnowflakeTableRow row, boolean includeAllMetadata) throws JsonProcessingException { final Map<String, Object> streamingIngestRow = new 
HashMap<>(); for (JsonNode node : row.getContent().getData()) { diff --git a/src/main/java/com/snowflake/kafka/connector/records/StreamingRecordMapper.java b/src/main/java/com/snowflake/kafka/connector/records/StreamingRecordMapper.java index 6a997d359..f58b751bf 100644 --- a/src/main/java/com/snowflake/kafka/connector/records/StreamingRecordMapper.java +++ b/src/main/java/com/snowflake/kafka/connector/records/StreamingRecordMapper.java @@ -10,14 +10,15 @@ abstract class StreamingRecordMapper { protected final ObjectMapper mapper; + protected final boolean schematizationEnabled; - public StreamingRecordMapper(ObjectMapper mapper) { + public StreamingRecordMapper(ObjectMapper mapper, boolean schematizationEnabled) { this.mapper = mapper; + this.schematizationEnabled = schematizationEnabled; } abstract Map<String, Object> processSnowflakeRecord( - SnowflakeTableRow row, boolean schematizationEnabled, boolean includeAllMetadata) - throws JsonProcessingException; + SnowflakeTableRow row, boolean includeAllMetadata) throws JsonProcessingException; protected String getTextualValue(JsonNode valueNode) throws JsonProcessingException { String value; diff --git a/src/test/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapperTest.java b/src/test/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapperTest.java index 94deebd3d..226d04824 100644 --- a/src/test/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapperTest.java +++ b/src/test/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapperTest.java @@ -18,8 +18,6 @@ import org.junit.jupiter.params.provider.MethodSource; class IcebergTableStreamingRecordMapperTest { - private final IcebergTableStreamingRecordMapper mapper = - new IcebergTableStreamingRecordMapper(objectMapper); private static final ObjectMapper objectMapper = new ObjectMapper(); private static final ImmutableMap<String, Object> primitiveJsonAsMap = @@ -95,7 +93,9 @@ void 
shouldMapRecord_schematizationEnabled( String description, SnowflakeTableRow row, Map<String, Object> expected) throws JsonProcessingException { // When - Map<String, Object> result = mapper.processSnowflakeRecord(row, true, true); + IcebergTableStreamingRecordMapper mapper = + new IcebergTableStreamingRecordMapper(objectMapper, true); + Map<String, Object> result = mapper.processSnowflakeRecord(row, true); // Then assertThat(result).isEqualTo(expected); @@ -106,8 +106,12 @@ void shouldMapMetadata(String description, SnowflakeTableRow row, Map<String, Object> expected) throws JsonProcessingException { // When - Map<String, Object> result = mapper.processSnowflakeRecord(row, false, true); - Map<String, Object> resultSchematized = mapper.processSnowflakeRecord(row, true, true); + IcebergTableStreamingRecordMapper mapper = + new IcebergTableStreamingRecordMapper(objectMapper, false); + IcebergTableStreamingRecordMapper mapperSchematization = + new IcebergTableStreamingRecordMapper(objectMapper, true); + Map<String, Object> result = mapper.processSnowflakeRecord(row, true); + Map<String, Object> resultSchematized = mapperSchematization.processSnowflakeRecord(row, true); // Then assertThat(result.get(Utils.TABLE_COLUMN_METADATA)).isEqualTo(expected); @@ -120,8 +124,12 @@ void shouldSkipMapMetadata() throws JsonProcessingException { SnowflakeTableRow row = buildRow(primitiveJsonExample); // When - Map<String, Object> result = mapper.processSnowflakeRecord(row, false, false); - Map<String, Object> resultSchematized = mapper.processSnowflakeRecord(row, true, false); + IcebergTableStreamingRecordMapper mapper = + new IcebergTableStreamingRecordMapper(objectMapper, false); + IcebergTableStreamingRecordMapper mapperSchematization = + new IcebergTableStreamingRecordMapper(objectMapper, true); + Map<String, Object> result = mapper.processSnowflakeRecord(row, false); + Map<String, Object> resultSchematized = mapperSchematization.processSnowflakeRecord(row, false); // Then assertThat(result).doesNotContainKey(Utils.TABLE_COLUMN_METADATA); @@ -134,7 +142,9 @@ void shouldMapRecord_schematizationDisabled( String description, 
SnowflakeTableRow row, Map<String, Object> expected) throws JsonProcessingException { // When - Map<String, Object> result = mapper.processSnowflakeRecord(row, false, true); + IcebergTableStreamingRecordMapper mapper = + new IcebergTableStreamingRecordMapper(objectMapper, false); + Map<String, Object> result = mapper.processSnowflakeRecord(row, true); // Then assertThat(result).isEqualTo(expected); diff --git a/src/test/java/com/snowflake/kafka/connector/records/SnowpipeStreamingMetaColumnTest.java b/src/test/java/com/snowflake/kafka/connector/records/SnowpipeStreamingMetaColumnTest.java index 77670f8ce..895d0bacc 100644 --- a/src/test/java/com/snowflake/kafka/connector/records/SnowpipeStreamingMetaColumnTest.java +++ b/src/test/java/com/snowflake/kafka/connector/records/SnowpipeStreamingMetaColumnTest.java @@ -51,7 +51,7 @@ void connectorTimestamp_byDefault_writes() throws JsonProcessingException { ObjectMapper mapper = new ObjectMapper(); RecordService service = new RecordService( - fixedClock, false, new SnowflakeTableStreamingRecordMapper(mapper, false), mapper); + fixedClock, new SnowflakeTableStreamingRecordMapper(mapper, false), mapper); // when Map<String, Object> recordData = service.getProcessedRecordForStreamingIngest(record);