diff --git a/e2e_test/source_inline/kafka/alter_table_drop_connector.slt b/e2e_test/source_inline/kafka/alter_table_drop_connector.slt index 6b12a0d3f1c18..ed867867a5c57 100644 --- a/e2e_test/source_inline/kafka/alter_table_drop_connector.slt +++ b/e2e_test/source_inline/kafka/alter_table_drop_connector.slt @@ -63,7 +63,66 @@ SELECT count(*) FROM plain_students; ---- 5 +# ===== test with schema registry ===== + +system ok +rpk topic delete 'avro_drop_table_connector_test' || true; \ +(rpk sr subject delete 'avro_drop_table_connector_test-value' && rpk sr subject delete 'avro_drop_table_connector_test-value' --permanent) || true; + +system ok +rpk topic create 'avro_drop_table_connector_test' + +system ok +sr_register avro_drop_table_connector_test-value AVRO <<< '{"type":"record","name":"Root","fields":[{"name":"bar","type":"int","default":0},{"name":"foo","type":"string"}]}' + +system ok +echo '{"foo":"ABC", "bar":1}' | rpk topic produce --schema-id=topic avro_drop_table_connector_test + +statement ok +create table avro_drop_table_connector_test_table +WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'avro_drop_table_connector_test' +) +FORMAT PLAIN ENCODE AVRO ( + schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}' +); + +sleeps 1s + +query ?? +select * from avro_drop_table_connector_test_table +---- +ABC 1 + +statement ok +alter table avro_drop_table_connector_test_table drop connector; + +query ?? +select * from avro_drop_table_connector_test_table +---- +ABC 1 + +# produce another message +system ok +echo '{"foo":"DEF", "bar":2}' | rpk topic produce --schema-id=topic avro_drop_table_connector_test + +sleep 1s + +# the new message is not ingested +query ?? +select * from avro_drop_table_connector_test_table +---- +ABC 1 + # ===== clean up ===== statement ok DROP TABLE plain_students; + +statement ok +drop table avro_drop_table_connector_test_table; + +system ok +rpk topic delete 'avro_drop_table_connector_test' || true; \ +(rpk sr subject delete 'avro_drop_table_connector_test-value' && rpk sr subject delete 'avro_drop_table_connector_test-value' --permanent) || true;