From cf5ff6a9509cb68701554543fa691a4a2eb6f45e Mon Sep 17 00:00:00 2001 From: Wojciech Trefon Date: Mon, 21 Oct 2024 14:48:19 +0200 Subject: [PATCH] SNOW-1692749: Add assertions for records in table (#964) --- .../streaming/iceberg/BaseIcebergIT.java | 7 + .../streaming/iceberg/IcebergIngestionIT.java | 55 ++++-- .../IcebergIngestionNoSchemaEvolutionIT.java | 32 +++ .../IcebergIngestionSchemaEvolutionIT.java | 29 +++ .../streaming/iceberg/sql/MetadataRecord.java | 184 ++++++++++++++++++ .../iceberg/sql/PrimitiveJsonRecord.java | 177 +++++++++++++++++ 6 files changed, 470 insertions(+), 14 deletions(-) create mode 100644 src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/MetadataRecord.java create mode 100644 src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/PrimitiveJsonRecord.java diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/BaseIcebergIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/BaseIcebergIT.java index df0cbdd90..9392c04c3 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/BaseIcebergIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/BaseIcebergIT.java @@ -6,8 +6,10 @@ import com.snowflake.kafka.connector.internal.DescribeTableRow; import com.snowflake.kafka.connector.internal.SnowflakeConnectionService; import com.snowflake.kafka.connector.internal.TestUtils; +import java.sql.ResultSet; import java.sql.SQLException; import java.util.List; +import java.util.function.Function; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -63,6 +65,11 @@ protected static void enableSchemaEvolution(String tableName) { doExecuteQueryWithParameter(query, tableName); } + protected static T select( + String tableName, String query, Function resultCollector) { + return executeQueryAndCollectResult(conn.getConnection(), query, tableName, resultCollector); + } + protected static String describeRecordMetadataType(String tableName) { String query = "describe table identifier(?)"; return executeQueryAndCollectResult( diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java index 3dd8dee2b..26b1d0fa7 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java @@ -11,9 +11,12 @@ import com.snowflake.kafka.connector.internal.TestUtils; import com.snowflake.kafka.connector.internal.streaming.InMemorySinkTaskContext; import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; +import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord.RecordWithMetadata; +import com.snowflake.kafka.connector.streaming.iceberg.sql.PrimitiveJsonRecord; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.data.Schema; @@ -32,15 +35,22 @@ public abstract class IcebergIngestionIT extends BaseIcebergIT { protected TopicPartition topicPartition; protected SnowflakeSinkService service; protected static final String simpleRecordJson = "{\"simple\": \"extra field\"}"; + + protected static final PrimitiveJsonRecord primitiveJsonRecordValue = + // FIXME: there is currently some bug in Iceberg when storing int64 values + new PrimitiveJsonRecord(8L, 16L, 32L, /*64L,*/ "dogs are the best", 0.5, 0.25, true); + protected static final PrimitiveJsonRecord emptyPrimitiveJsonRecordValue = + // FIXME: there is currently some bug in Iceberg when storing int64 values + new PrimitiveJsonRecord(0L, 0L, 0L, /*0L, */ null, 0.0, 0.0, false); protected static final String primitiveJson = "{" - + " \"id_int8\": 0," - + " \"id_int16\": 42," - + " \"id_int32\": 42," - + " \"id_int64\": 42," + + " \"id_int8\": 8," + + " \"id_int16\": 16," + + " \"id_int32\": 32," + + " \"id_int64\": 64," + " \"description\": \"dogs are the best\"," - + " \"rating_float32\": 0.99," - + " \"rating_float64\": 0.99," + + " \"rating_float32\": 0.5," + + " \"rating_float64\": 0.25," + " \"approval\": true" + "}"; @@ -116,14 +126,6 @@ public void setUp() { .build(); } - protected abstract void createIcebergTable(); - - protected abstract Boolean isSchemaEvolutionEnabled(); - - protected void waitForOffset(int targetOffset) throws Exception { - TestUtils.assertWithRetry(() -> service.getOffset(topicPartition) == targetOffset); - } - @AfterEach public void tearDown() { if (service != null) { @@ -132,6 +134,14 @@ public void tearDown() { dropIcebergTable(tableName); } + protected abstract void createIcebergTable(); + + protected abstract Boolean isSchemaEvolutionEnabled(); + + protected void waitForOffset(int targetOffset) throws Exception { + TestUtils.assertWithRetry(() -> service.getOffset(topicPartition) == targetOffset); + } + protected SinkRecord createKafkaRecord(String jsonString, int offset, boolean withSchema) { JsonConverter converter = new JsonConverter(); converter.configure( @@ -147,4 +157,21 @@ protected SinkRecord createKafkaRecord(String jsonString, int offset, boolean wi inputValue.value(), offset); } + + private final String selectAllSortByOffset = + "WITH extracted_data AS (" + + "SELECT *, RECORD_METADATA:\"offset\"::number AS offset_extracted " + + "FROM identifier(?) " + + ") " + + "SELECT * FROM extracted_data " + + "ORDER BY offset_extracted asc;"; + + protected List> selectAllSchematizedRecords() { + + return select(tableName, selectAllSortByOffset, PrimitiveJsonRecord::fromSchematizedResult); + } + + protected List> selectAllFromRecordContent() { + return select(tableName, selectAllSortByOffset, PrimitiveJsonRecord::fromRecordContentColumn); + } } diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionNoSchemaEvolutionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionNoSchemaEvolutionIT.java index b340a4c8f..8e8c63c6f 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionNoSchemaEvolutionIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionNoSchemaEvolutionIT.java @@ -1,8 +1,15 @@ package com.snowflake.kafka.connector.streaming.iceberg; +import static org.assertj.core.api.Assertions.assertThat; + import com.snowflake.kafka.connector.Utils; +import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord; +import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord.RecordWithMetadata; +import com.snowflake.kafka.connector.streaming.iceberg.sql.PrimitiveJsonRecord; import java.util.Arrays; import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.params.ParameterizedTest; @@ -58,5 +65,30 @@ void shouldInsertRecords(String description, String message, boolean withSchema) waitForOffset(2); service.insert(Collections.singletonList(createKafkaRecord(message, 2, withSchema))); waitForOffset(3); + + assertRecordsInTable(); + } + + private void assertRecordsInTable() { + List> recordsWithMetadata = + selectAllFromRecordContent(); + assertThat(recordsWithMetadata) + .hasSize(3) + .extracting(RecordWithMetadata::getRecord) + .containsExactly( + primitiveJsonRecordValue, primitiveJsonRecordValue, primitiveJsonRecordValue); + List metadataRecords = + recordsWithMetadata.stream() + .map(RecordWithMetadata::getMetadata) + .collect(Collectors.toList()); + assertThat(metadataRecords).extracting(MetadataRecord::getOffset).containsExactly(0L, 1L, 2L); + assertThat(metadataRecords) + .hasSize(3) + .allMatch( + record -> + record.getTopic().equals(topicPartition.topic()) + && record.getPartition().equals(topicPartition.partition()) + && record.getKey().equals("test") + && record.getSnowflakeConnectorPushTime() != null); } } diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java index f5b1f79b5..27c35fb09 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java @@ -4,6 +4,9 @@ import com.snowflake.kafka.connector.Utils; import com.snowflake.kafka.connector.internal.DescribeTableRow; +import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord; +import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord.RecordWithMetadata; +import com.snowflake.kafka.connector.streaming.iceberg.sql.PrimitiveJsonRecord; import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -70,6 +73,32 @@ void shouldEvolveSchemaAndInsertRecords( // reinsert record with extra field service.insert(Collections.singletonList(createKafkaRecord(simpleRecordJson, 2, false))); waitForOffset(3); + + assertRecordsInTable(); + } + + private void assertRecordsInTable() { + List> recordsWithMetadata = + selectAllSchematizedRecords(); + + assertThat(recordsWithMetadata) + .hasSize(3) + .extracting(RecordWithMetadata::getRecord) + .containsExactly( + primitiveJsonRecordValue, primitiveJsonRecordValue, emptyPrimitiveJsonRecordValue); + List metadataRecords = + recordsWithMetadata.stream() + .map(RecordWithMetadata::getMetadata) + .collect(Collectors.toList()); + assertThat(metadataRecords).extracting(MetadataRecord::getOffset).containsExactly(0L, 1L, 2L); + assertThat(metadataRecords) + .hasSize(3) + .allMatch( + r -> + r.getTopic().equals(topicPartition.topic()) + && r.getPartition().equals(topicPartition.partition()) + && r.getKey().equals("test") + && r.getSnowflakeConnectorPushTime() != null); } private static Stream prepareData() { diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/MetadataRecord.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/MetadataRecord.java new file mode 100644 index 000000000..11362cc46 --- /dev/null +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/MetadataRecord.java @@ -0,0 +1,184 @@ +package com.snowflake.kafka.connector.streaming.iceberg.sql; + +import com.snowflake.kafka.connector.Utils; +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Objects; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.annotation.JsonCreator; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.annotation.JsonProperty; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; +import org.assertj.core.api.Assertions; + +public class MetadataRecord { + private final Long offset; + private final String topic; + private final Integer partition; + private final String key; + private final Integer schemaId; + private final Integer keySchemaId; + private final Long createTime; + private final Long logAppendTime; + private final Long snowflakeConnectorPushTime; + private final String headers; + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + @JsonCreator + public MetadataRecord( + @JsonProperty("offset") Long offset, + @JsonProperty("topic") String topic, + @JsonProperty("partition") Integer partition, + @JsonProperty("key") String key, + @JsonProperty("schema_id") Integer schemaId, + @JsonProperty("key_schema_id") Integer keySchemaId, + @JsonProperty("CreateTime") Long createTime, + @JsonProperty("LogAppendTime") Long logAppendTime, + @JsonProperty("SnowflakeConnectorPushTime") Long snowflakeConnectorPushTime, + @JsonProperty("headers") String headers) { + this.offset = offset; + this.topic = topic; + this.partition = partition; + this.key = key; + this.schemaId = schemaId; + this.keySchemaId = keySchemaId; + this.createTime = createTime; + this.logAppendTime = logAppendTime; + this.snowflakeConnectorPushTime = snowflakeConnectorPushTime; + this.headers = headers; + } + + public static MetadataRecord fromMetadataSingleRow(ResultSet resultSet) { + try { + String jsonString = resultSet.getString(Utils.TABLE_COLUMN_METADATA); + return MAPPER.readValue(jsonString, MetadataRecord.class); + } catch (SQLException | IOException e) { + Assertions.fail("Couldn't map ResultSet to MetadataRecord: " + e.getMessage()); + } + return null; + } + + // Getters for each field + public Long getOffset() { + return offset; + } + + public String getTopic() { + return topic; + } + + public Integer getPartition() { + return partition; + } + + public String getKey() { + return key; + } + + public Integer getSchemaId() { + return schemaId; + } + + public Integer getKeySchemaId() { + return keySchemaId; + } + + public Long getCreateTime() { + return createTime; + } + + public Long getLogAppendTime() { + return logAppendTime; + } + + public Long getSnowflakeConnectorPushTime() { + return snowflakeConnectorPushTime; + } + + public String getHeaders() { + return headers; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + MetadataRecord that = (MetadataRecord) o; + return Objects.equals(offset, that.offset) + && Objects.equals(topic, that.topic) + && Objects.equals(partition, that.partition) + && Objects.equals(key, that.key) + && Objects.equals(schemaId, that.schemaId) + && Objects.equals(keySchemaId, that.keySchemaId) + && Objects.equals(createTime, that.createTime) + && Objects.equals(logAppendTime, that.logAppendTime) + && Objects.equals(snowflakeConnectorPushTime, that.snowflakeConnectorPushTime) + && Objects.equals(headers, that.headers); + } + + @Override + public int hashCode() { + return Objects.hash( + offset, + topic, + partition, + key, + schemaId, + keySchemaId, + createTime, + logAppendTime, + snowflakeConnectorPushTime, + headers); + } + + @Override + public String toString() { + return "MetadataRecord{" + + "offset=" + + offset + + ", topic='" + + topic + + '\'' + + ", partition=" + + partition + + ", key='" + + key + + '\'' + + ", schemaId=" + + schemaId + + ", keySchemaId=" + + keySchemaId + + ", createTime=" + + createTime + + ", logAppendTime=" + + logAppendTime + + ", snowflakeConnectorPushTime=" + + snowflakeConnectorPushTime + + ", headers='" + + headers + + '\'' + + '}'; + } + + public static class RecordWithMetadata { + private final T record; + private final MetadataRecord metadata; + + private RecordWithMetadata(MetadataRecord metadata, T record) { + this.record = record; + this.metadata = metadata; + } + + public static RecordWithMetadata of(MetadataRecord metadata, T record) { + return new RecordWithMetadata<>(metadata, record); + } + + public T getRecord() { + return record; + } + + public MetadataRecord getMetadata() { + return metadata; + } + } +} diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/PrimitiveJsonRecord.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/PrimitiveJsonRecord.java new file mode 100644 index 000000000..3b4a9983a --- /dev/null +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/PrimitiveJsonRecord.java @@ -0,0 +1,177 @@ +package com.snowflake.kafka.connector.streaming.iceberg.sql; + +import static net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; + +import com.snowflake.kafka.connector.Utils; +import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord.RecordWithMetadata; +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.annotation.JsonCreator; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.annotation.JsonProperty; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; +import org.assertj.core.api.Assertions; + +public class PrimitiveJsonRecord { + + private static final ObjectMapper MAPPER = + new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false); + + private final Long idInt8; + + private final Long idInt16; + + private final Long idInt32; + + private final Long idInt64; + + private final String description; + + private final Double ratingFloat32; + + private final Double ratingFloat64; + + private final Boolean approval; + + @JsonCreator + public PrimitiveJsonRecord( + @JsonProperty("id_int8") Long idInt8, + @JsonProperty("id_int16") Long idInt16, + @JsonProperty("id_int32") Long idInt32, + // FIXME: there is currently some bug in Iceberg when storing int64 values + // @JsonProperty("id_int64") Long idInt64, + @JsonProperty("description") String description, + @JsonProperty("rating_float32") Double ratingFloat32, + @JsonProperty("rating_float64") Double ratingFloat64, + @JsonProperty("approval") Boolean approval) { + this.idInt8 = idInt8; + this.idInt16 = idInt16; + this.idInt32 = idInt32; + this.idInt64 = 64L; + this.description = description; + this.ratingFloat32 = ratingFloat32; + this.ratingFloat64 = ratingFloat64; + this.approval = approval; + } + + public static List> fromSchematizedResult( + ResultSet resultSet) { + List> records = new ArrayList<>(); + try { + while (resultSet.next()) { + PrimitiveJsonRecord record = + new PrimitiveJsonRecord( + resultSet.getLong("ID_INT8"), + resultSet.getLong("ID_INT16"), + resultSet.getLong("ID_INT32"), + // FIXME: there is currently some bug in Iceberg when storing int64 values + // resultSet.getLong("ID_INT64"), + resultSet.getString("DESCRIPTION"), + resultSet.getDouble("RATING_FLOAT32"), + resultSet.getDouble("RATING_FLOAT64"), + resultSet.getBoolean("APPROVAL")); + MetadataRecord metadata = MetadataRecord.fromMetadataSingleRow(resultSet); + records.add(RecordWithMetadata.of(metadata, record)); + } + } catch (SQLException e) { + Assertions.fail("Couldn't map ResultSet to PrimitiveJsonRecord"); + } + return records; + } + + public static List> fromRecordContentColumn( + ResultSet resultSet) { + List> records = new ArrayList<>(); + + try { + while (resultSet.next()) { + String jsonString = resultSet.getString(Utils.TABLE_COLUMN_CONTENT); + PrimitiveJsonRecord record = MAPPER.readValue(jsonString, PrimitiveJsonRecord.class); + MetadataRecord metadata = MetadataRecord.fromMetadataSingleRow(resultSet); + records.add(RecordWithMetadata.of(metadata, record)); + } + } catch (SQLException | IOException e) { + Assertions.fail("Couldn't map ResultSet to PrimitiveJsonRecord: " + e.getMessage()); + } + return records; + } + + public Long getIdInt8() { + return idInt8; + } + + public Long getIdInt16() { + return idInt16; + } + + public Long getIdInt32() { + return idInt32; + } + + public Long getIdInt64() { + return idInt64; + } + + public String getDescription() { + return description; + } + + public Double getRatingFloat32() { + return ratingFloat32; + } + + public Double getRatingFloat64() { + return ratingFloat64; + } + + public Boolean isApproval() { + return approval; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + PrimitiveJsonRecord that = (PrimitiveJsonRecord) o; + return Objects.equals(idInt8, that.idInt8) + && Objects.equals(idInt16, that.idInt16) + && Objects.equals(idInt32, that.idInt32) + && Objects.equals(idInt64, that.idInt64) + && Objects.equals(description, that.description) + && Objects.equals(ratingFloat32, that.ratingFloat32) + && Objects.equals(ratingFloat64, that.ratingFloat64) + && Objects.equals(approval, that.approval); + } + + @Override + public int hashCode() { + return Objects.hash( + idInt8, idInt16, idInt32, idInt64, description, ratingFloat32, ratingFloat64, approval); + } + + @Override + public String toString() { + return "PrimitiveJsonRecord{" + + "idInt8=" + + idInt8 + + ", idInt16=" + + idInt16 + + ", idInt32=" + + idInt32 + + ", idInt64=" + + idInt64 + + ", description='" + + description + + '\'' + + ", ratingFloat32=" + + ratingFloat32 + + ", ratingFloat64=" + + ratingFloat64 + + ", approval=" + + approval + + '}'; + } +}