SNOW-1692749: Add assertions for records in table (#964)
sfc-gh-wtrefon authored Oct 21, 2024
1 parent bc8e824 commit cf5ff6a
Showing 6 changed files with 470 additions and 14 deletions.
@@ -6,8 +6,10 @@
import com.snowflake.kafka.connector.internal.DescribeTableRow;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
import com.snowflake.kafka.connector.internal.TestUtils;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.function.Function;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;

@@ -63,6 +65,11 @@ protected static void enableSchemaEvolution(String tableName) {
doExecuteQueryWithParameter(query, tableName);
}

protected static <T> T select(
String tableName, String query, Function<ResultSet, T> resultCollector) {
return executeQueryAndCollectResult(conn.getConnection(), query, tableName, resultCollector);
}
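
As an aside, the new select helper leaves row handling entirely to the caller: the resultCollector function receives the open ResultSet and is responsible for iterating it. A minimal sketch of a hypothetical collector that simply counts rows (not part of this commit; the query and helper name are made up for illustration) could look like this:

  // Illustrative only - a hypothetical caller of select(); not part of this commit.
  protected static long countRows(String tableName) {
    return select(
        tableName,
        "SELECT * FROM identifier(?)",
        resultSet -> {
          try {
            long rows = 0;
            // The collector owns iteration over the ResultSet.
            while (resultSet.next()) {
              rows++;
            }
            return rows;
          } catch (SQLException e) {
            throw new RuntimeException("Failed to iterate result set", e);
          }
        });
  }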

protected static String describeRecordMetadataType(String tableName) {
String query = "describe table identifier(?)";
return executeQueryAndCollectResult(
@@ -11,9 +11,12 @@
import com.snowflake.kafka.connector.internal.TestUtils;
import com.snowflake.kafka.connector.internal.streaming.InMemorySinkTaskContext;
import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord.RecordWithMetadata;
import com.snowflake.kafka.connector.streaming.iceberg.sql.PrimitiveJsonRecord;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
@@ -32,15 +35,22 @@ public abstract class IcebergIngestionIT extends BaseIcebergIT {
protected TopicPartition topicPartition;
protected SnowflakeSinkService service;
protected static final String simpleRecordJson = "{\"simple\": \"extra field\"}";

protected static final PrimitiveJsonRecord primitiveJsonRecordValue =
// FIXME: there is currently some bug in Iceberg when storing int64 values
new PrimitiveJsonRecord(8L, 16L, 32L, /*64L,*/ "dogs are the best", 0.5, 0.25, true);
protected static final PrimitiveJsonRecord emptyPrimitiveJsonRecordValue =
// FIXME: there is currently some bug in Iceberg when storing int64 values
new PrimitiveJsonRecord(0L, 0L, 0L, /*0L, */ null, 0.0, 0.0, false);
protected static final String primitiveJson =
"{"
+ " \"id_int8\": 0,"
+ " \"id_int16\": 42,"
+ " \"id_int32\": 42,"
+ " \"id_int64\": 42,"
+ " \"id_int8\": 8,"
+ " \"id_int16\": 16,"
+ " \"id_int32\": 32,"
+ " \"id_int64\": 64,"
+ " \"description\": \"dogs are the best\","
+ " \"rating_float32\": 0.99,"
+ " \"rating_float64\": 0.99,"
+ " \"rating_float32\": 0.5,"
+ " \"rating_float64\": 0.25,"
+ " \"approval\": true"
+ "}";

@@ -116,14 +126,6 @@ public void setUp() {
.build();
}

protected abstract void createIcebergTable();

protected abstract Boolean isSchemaEvolutionEnabled();

protected void waitForOffset(int targetOffset) throws Exception {
TestUtils.assertWithRetry(() -> service.getOffset(topicPartition) == targetOffset);
}

@AfterEach
public void tearDown() {
if (service != null) {
@@ -132,6 +134,14 @@ public void tearDown() {
dropIcebergTable(tableName);
}

protected abstract void createIcebergTable();

protected abstract Boolean isSchemaEvolutionEnabled();

protected void waitForOffset(int targetOffset) throws Exception {
TestUtils.assertWithRetry(() -> service.getOffset(topicPartition) == targetOffset);
}

protected SinkRecord createKafkaRecord(String jsonString, int offset, boolean withSchema) {
JsonConverter converter = new JsonConverter();
converter.configure(
@@ -147,4 +157,21 @@ protected SinkRecord createKafkaRecord(String jsonString, int offset, boolean withSchema) {
inputValue.value(),
offset);
}

private final String selectAllSortByOffset =
"WITH extracted_data AS ("
+ "SELECT *, RECORD_METADATA:\"offset\"::number AS offset_extracted "
+ "FROM identifier(?) "
+ ") "
+ "SELECT * FROM extracted_data "
+ "ORDER BY offset_extracted asc;";

protected List<RecordWithMetadata<PrimitiveJsonRecord>> selectAllSchematizedRecords() {

return select(tableName, selectAllSortByOffset, PrimitiveJsonRecord::fromSchematizedResult);
}

protected List<RecordWithMetadata<PrimitiveJsonRecord>> selectAllFromRecordContent() {
return select(tableName, selectAllSortByOffset, PrimitiveJsonRecord::fromRecordContentColumn);
}
}
@@ -1,8 +1,15 @@
package com.snowflake.kafka.connector.streaming.iceberg;

import static org.assertj.core.api.Assertions.assertThat;

import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord;
import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord.RecordWithMetadata;
import com.snowflake.kafka.connector.streaming.iceberg.sql.PrimitiveJsonRecord;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;
@@ -58,5 +65,30 @@ void shouldInsertRecords(String description, String message, boolean withSchema)
waitForOffset(2);
service.insert(Collections.singletonList(createKafkaRecord(message, 2, withSchema)));
waitForOffset(3);

assertRecordsInTable();
}

private void assertRecordsInTable() {
List<RecordWithMetadata<PrimitiveJsonRecord>> recordsWithMetadata =
selectAllFromRecordContent();
assertThat(recordsWithMetadata)
.hasSize(3)
.extracting(RecordWithMetadata::getRecord)
.containsExactly(
primitiveJsonRecordValue, primitiveJsonRecordValue, primitiveJsonRecordValue);
List<MetadataRecord> metadataRecords =
recordsWithMetadata.stream()
.map(RecordWithMetadata::getMetadata)
.collect(Collectors.toList());
assertThat(metadataRecords).extracting(MetadataRecord::getOffset).containsExactly(0L, 1L, 2L);
assertThat(metadataRecords)
.hasSize(3)
.allMatch(
record ->
record.getTopic().equals(topicPartition.topic())
&& record.getPartition().equals(topicPartition.partition())
&& record.getKey().equals("test")
&& record.getSnowflakeConnectorPushTime() != null);
}
}
@@ -4,6 +4,9 @@

import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.internal.DescribeTableRow;
import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord;
import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord.RecordWithMetadata;
import com.snowflake.kafka.connector.streaming.iceberg.sql.PrimitiveJsonRecord;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -70,6 +73,32 @@ void shouldEvolveSchemaAndInsertRecords(
// reinsert record with extra field
service.insert(Collections.singletonList(createKafkaRecord(simpleRecordJson, 2, false)));
waitForOffset(3);

assertRecordsInTable();
}

private void assertRecordsInTable() {
List<RecordWithMetadata<PrimitiveJsonRecord>> recordsWithMetadata =
selectAllSchematizedRecords();

assertThat(recordsWithMetadata)
.hasSize(3)
.extracting(RecordWithMetadata::getRecord)
.containsExactly(
primitiveJsonRecordValue, primitiveJsonRecordValue, emptyPrimitiveJsonRecordValue);
List<MetadataRecord> metadataRecords =
recordsWithMetadata.stream()
.map(RecordWithMetadata::getMetadata)
.collect(Collectors.toList());
assertThat(metadataRecords).extracting(MetadataRecord::getOffset).containsExactly(0L, 1L, 2L);
assertThat(metadataRecords)
.hasSize(3)
.allMatch(
r ->
r.getTopic().equals(topicPartition.topic())
&& r.getPartition().equals(topicPartition.partition())
&& r.getKey().equals("test")
&& r.getSnowflakeConnectorPushTime() != null);
}

private static Stream<Arguments> prepareData() {
@@ -0,0 +1,184 @@
package com.snowflake.kafka.connector.streaming.iceberg.sql;

import com.snowflake.kafka.connector.Utils;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Objects;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.annotation.JsonCreator;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.annotation.JsonProperty;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper;
import org.assertj.core.api.Assertions;

public class MetadataRecord {
private final Long offset;
private final String topic;
private final Integer partition;
private final String key;
private final Integer schemaId;
private final Integer keySchemaId;
private final Long createTime;
private final Long logAppendTime;
private final Long snowflakeConnectorPushTime;
private final String headers;

private static final ObjectMapper MAPPER = new ObjectMapper();

@JsonCreator
public MetadataRecord(
@JsonProperty("offset") Long offset,
@JsonProperty("topic") String topic,
@JsonProperty("partition") Integer partition,
@JsonProperty("key") String key,
@JsonProperty("schema_id") Integer schemaId,
@JsonProperty("key_schema_id") Integer keySchemaId,
@JsonProperty("CreateTime") Long createTime,
@JsonProperty("LogAppendTime") Long logAppendTime,
@JsonProperty("SnowflakeConnectorPushTime") Long snowflakeConnectorPushTime,
@JsonProperty("headers") String headers) {
this.offset = offset;
this.topic = topic;
this.partition = partition;
this.key = key;
this.schemaId = schemaId;
this.keySchemaId = keySchemaId;
this.createTime = createTime;
this.logAppendTime = logAppendTime;
this.snowflakeConnectorPushTime = snowflakeConnectorPushTime;
this.headers = headers;
}

public static MetadataRecord fromMetadataSingleRow(ResultSet resultSet) {
try {
String jsonString = resultSet.getString(Utils.TABLE_COLUMN_METADATA);
return MAPPER.readValue(jsonString, MetadataRecord.class);
} catch (SQLException | IOException e) {
Assertions.fail("Couldn't map ResultSet to MetadataRecord: " + e.getMessage());
}
return null;
}
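
For context, fromMetadataSingleRow expects the RECORD_METADATA column to hold a JSON object whose field names match the @JsonProperty annotations above. A hypothetical payload, with illustrative values that are not taken from this commit, could be declared in a test the same way primitiveJson is:

  // Illustrative only: field names follow the @JsonProperty annotations; values are made up.
  private static final String exampleMetadataJson =
      "{"
          + " \"offset\": 0,"
          + " \"topic\": \"test-topic\","
          + " \"partition\": 0,"
          + " \"key\": \"test\","
          + " \"schema_id\": null,"
          + " \"key_schema_id\": null,"
          + " \"CreateTime\": 1729500000000,"
          + " \"LogAppendTime\": null,"
          + " \"SnowflakeConnectorPushTime\": 1729500000123,"
          + " \"headers\": null"
          + "}";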

// Getters for each field
public Long getOffset() {
return offset;
}

public String getTopic() {
return topic;
}

public Integer getPartition() {
return partition;
}

public String getKey() {
return key;
}

public Integer getSchemaId() {
return schemaId;
}

public Integer getKeySchemaId() {
return keySchemaId;
}

public Long getCreateTime() {
return createTime;
}

public Long getLogAppendTime() {
return logAppendTime;
}

public Long getSnowflakeConnectorPushTime() {
return snowflakeConnectorPushTime;
}

public String getHeaders() {
return headers;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MetadataRecord that = (MetadataRecord) o;
return Objects.equals(offset, that.offset)
&& Objects.equals(topic, that.topic)
&& Objects.equals(partition, that.partition)
&& Objects.equals(key, that.key)
&& Objects.equals(schemaId, that.schemaId)
&& Objects.equals(keySchemaId, that.keySchemaId)
&& Objects.equals(createTime, that.createTime)
&& Objects.equals(logAppendTime, that.logAppendTime)
&& Objects.equals(snowflakeConnectorPushTime, that.snowflakeConnectorPushTime)
&& Objects.equals(headers, that.headers);
}

@Override
public int hashCode() {
return Objects.hash(
offset,
topic,
partition,
key,
schemaId,
keySchemaId,
createTime,
logAppendTime,
snowflakeConnectorPushTime,
headers);
}

@Override
public String toString() {
return "MetadataRecord{"
+ "offset="
+ offset
+ ", topic='"
+ topic
+ '\''
+ ", partition="
+ partition
+ ", key='"
+ key
+ '\''
+ ", schemaId="
+ schemaId
+ ", keySchemaId="
+ keySchemaId
+ ", createTime="
+ createTime
+ ", logAppendTime="
+ logAppendTime
+ ", snowflakeConnectorPushTime="
+ snowflakeConnectorPushTime
+ ", headers='"
+ headers
+ '\''
+ '}';
}

public static class RecordWithMetadata<T> {
private final T record;
private final MetadataRecord metadata;

private RecordWithMetadata(MetadataRecord metadata, T record) {
this.record = record;
this.metadata = metadata;
}

public static <T> RecordWithMetadata<T> of(MetadataRecord metadata, T record) {
return new RecordWithMetadata<>(metadata, record);
}

public T getRecord() {
return record;
}

public MetadataRecord getMetadata() {
return metadata;
}
}
}