From 68e53e8f329e12782d6ef28844e1ebcccb56af9a Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 16 Jan 2025 16:50:30 +0800 Subject: [PATCH 1/3] refactor(frontend): use purified definition for altering --- src/frontend/src/catalog/source_catalog.rs | 9 ++++++ .../rw_catalog/rw_relation_info.rs | 2 +- .../src/handler/alter_source_column.rs | 31 ++----------------- .../src/handler/alter_source_with_sr.rs | 21 ++++--------- 4 files changed, 19 insertions(+), 44 deletions(-) diff --git a/src/frontend/src/catalog/source_catalog.rs b/src/frontend/src/catalog/source_catalog.rs index bf02092c1cc0a..c4856e329d605 100644 --- a/src/frontend/src/catalog/source_catalog.rs +++ b/src/frontend/src/catalog/source_catalog.rs @@ -142,6 +142,15 @@ impl SourceCatalog { self.create_sql_ast() } + + /// Fills the `definition` field with the purified SQL definition. + /// + /// There's no need to call this method for correctness because we automatically purify the + /// SQL definition at the time of querying. However, this helps to maintain more accurate + /// `definition` field in the catalog when directly inspected for debugging purposes. + pub fn fill_purified_create_sql(&mut self) { + self.definition = self.create_sql_purified(); + } } impl From<&PbSource> for SourceCatalog { diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_relation_info.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_relation_info.rs index e5276c4f4f6bc..a4b3eb18387d8 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_relation_info.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_relation_info.rs @@ -174,7 +174,7 @@ async fn read_relation_info(reader: &SysCatalogReaderImpl) -> Result unreachable!(), } // update version catalog.version += 1; + catalog.fill_purified_create_sql(); let catalog_writer = session.catalog_writer()?; if catalog.info.is_shared() { @@ -148,27 +144,6 @@ pub async fn handle_alter_source_column( Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE)) } -/// `alter_definition_add_column` adds a new column to the definition of the relation. -#[inline(always)] -pub fn alter_definition_add_column(definition: &str, column: ColumnDef) -> Result { - let ast = Parser::parse_sql(definition).expect("failed to parse relation definition"); - let mut stmt = ast - .into_iter() - .exactly_one() - .expect("should contains only one statement"); - - match &mut stmt { - Statement::CreateSource { - stmt: CreateSourceStatement { columns, .. }, - } => { - columns.push(column); - } - _ => unreachable!(), - } - - Ok(stmt.to_string()) -} - #[cfg(test)] pub mod tests { use std::collections::BTreeMap; diff --git a/src/frontend/src/handler/alter_source_with_sr.rs b/src/frontend/src/handler/alter_source_with_sr.rs index e90a250e34bfa..c71f35d82606a 100644 --- a/src/frontend/src/handler/alter_source_with_sr.rs +++ b/src/frontend/src/handler/alter_source_with_sr.rs @@ -14,7 +14,6 @@ use std::sync::Arc; -use anyhow::Context; use either::Either; use itertools::Itertools; use pgwire::pg_response::StatementType; @@ -28,7 +27,6 @@ use risingwave_sqlparser::ast::{ CompatibleFormatEncode, CreateSourceStatement, Encode, Format, FormatEncodeOptions, ObjectName, SqlOption, Statement, }; -use risingwave_sqlparser::parser::Parser; use super::create_source::{ generate_stream_graph_for_source, schema_has_schema_registry, validate_compatibility, @@ -198,10 +196,7 @@ pub async fn refresh_sr_and_get_columns_diff( } fn get_format_encode_from_source(source: &SourceCatalog) -> Result { - let [stmt]: [_; 1] = Parser::parse_sql(&source.definition) - .context("unable to parse original source definition")? - .try_into() - .unwrap(); + let stmt = source.create_sql_ast()?; let Statement::CreateSource { stmt: CreateSourceStatement { format_encode, .. }, } = stmt @@ -263,8 +258,10 @@ pub async fn handle_alter_source_with_sr( source.info = source_info; source.columns.extend(added_columns); - source.definition = - alter_definition_format_encode(&source.definition, format_encode.row_options.clone())?; + source.definition = alter_definition_format_encode( + source.create_sql_ast()?, + format_encode.row_options.clone(), + )?; let (format_encode_options, format_encode_secret_ref) = resolve_secret_ref_in_with_options( WithOptions::try_from(format_encode.row_options())?, @@ -313,15 +310,9 @@ pub async fn handle_alter_source_with_sr( /// Apply the new `format_encode_options` to the source/table definition. pub fn alter_definition_format_encode( - definition: &str, + mut stmt: Statement, format_encode_options: Vec, ) -> Result { - let ast = Parser::parse_sql(definition).expect("failed to parse relation definition"); - let mut stmt = ast - .into_iter() - .exactly_one() - .expect("should contain only one statement"); - match &mut stmt { Statement::CreateSource { stmt: CreateSourceStatement { format_encode, .. }, From 191fa18893001410b567a25690b5939c825fcf85 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 16 Jan 2025 17:53:18 +0800 Subject: [PATCH 2/3] add more tests Signed-off-by: Bugen Zhao --- .../source_inline/kafka/alter/add_column_shared.slt | 6 ++++++ e2e_test/source_inline/kafka/avro/alter_source.slt | 6 ++++++ .../kafka/protobuf/alter_source_shared.slt | 10 ++++++++++ src/frontend/src/handler/alter_source_with_sr.rs | 2 +- 4 files changed, 23 insertions(+), 1 deletion(-) diff --git a/e2e_test/source_inline/kafka/alter/add_column_shared.slt b/e2e_test/source_inline/kafka/alter/add_column_shared.slt index bbb03c178fa2f..210f17bad63ab 100644 --- a/e2e_test/source_inline/kafka/alter/add_column_shared.slt +++ b/e2e_test/source_inline/kafka/alter/add_column_shared.slt @@ -48,6 +48,12 @@ select * from mv_before_alter; statement ok alter source s add column v3 varchar; +# Demonstrate definition change. +query T +SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_sources WHERE name = 's'; +---- +CREATE SOURCE s (v1 INT, v2 CHARACTER VARYING, v3 CHARACTER VARYING) + # New MV will have v3. # Check it should still be shared source diff --git a/e2e_test/source_inline/kafka/avro/alter_source.slt b/e2e_test/source_inline/kafka/avro/alter_source.slt index 8bce7f4efd5cf..bb36e116279e4 100644 --- a/e2e_test/source_inline/kafka/avro/alter_source.slt +++ b/e2e_test/source_inline/kafka/avro/alter_source.slt @@ -52,6 +52,12 @@ No tracking issue yet. Feel free to submit a feature request at https://github.c statement ok alter source s format plain encode avro (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'); +# Demonstrate definition change. +query T +SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_sources WHERE name = 's'; +---- +CREATE SOURCE s (foo CHARACTER VARYING, bar INT) + query ?? select * from s ---- diff --git a/e2e_test/source_inline/kafka/protobuf/alter_source_shared.slt b/e2e_test/source_inline/kafka/protobuf/alter_source_shared.slt index 658d4fa95c6a0..19ded681789cb 100644 --- a/e2e_test/source_inline/kafka/protobuf/alter_source_shared.slt +++ b/e2e_test/source_inline/kafka/protobuf/alter_source_shared.slt @@ -20,6 +20,11 @@ FORMAT PLAIN ENCODE PROTOBUF( message = 'test.User' ); +query T +SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_sources WHERE name = 'src_user'; +---- +CREATE SOURCE src_user (id INT, name CHARACTER VARYING, address CHARACTER VARYING, city CHARACTER VARYING, gender CHARACTER VARYING, sc STRUCT) INCLUDE timestamp + statement ok CREATE MATERIALIZED VIEW mv_user AS SELECT * FROM src_user; @@ -45,6 +50,11 @@ set streaming_use_shared_source to false; statement ok ALTER SOURCE src_user REFRESH SCHEMA; +query T +SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_sources WHERE name = 'src_user'; +---- +CREATE SOURCE src_user (id INT, name CHARACTER VARYING, address CHARACTER VARYING, city CHARACTER VARYING, gender CHARACTER VARYING, sc STRUCT, age INT) INCLUDE timestamp + # Check it should still be shared source query EXPLAIN CREATE MATERIALIZED VIEW mv_user_more AS SELECT * FROM src_user; diff --git a/src/frontend/src/handler/alter_source_with_sr.rs b/src/frontend/src/handler/alter_source_with_sr.rs index c71f35d82606a..66d38f769337e 100644 --- a/src/frontend/src/handler/alter_source_with_sr.rs +++ b/src/frontend/src/handler/alter_source_with_sr.rs @@ -259,7 +259,7 @@ pub async fn handle_alter_source_with_sr( source.info = source_info; source.columns.extend(added_columns); source.definition = alter_definition_format_encode( - source.create_sql_ast()?, + source.create_sql_ast_purified()?, format_encode.row_options.clone(), )?; From e1f410dcba3668323cbdd71025bfe3dc80f2b895 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Fri, 17 Jan 2025 12:38:10 +0800 Subject: [PATCH 3/3] fix unit test Signed-off-by: Bugen Zhao --- src/frontend/src/handler/alter_source_with_sr.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/frontend/src/handler/alter_source_with_sr.rs b/src/frontend/src/handler/alter_source_with_sr.rs index 66d38f769337e..472ce8772b99b 100644 --- a/src/frontend/src/handler/alter_source_with_sr.rs +++ b/src/frontend/src/handler/alter_source_with_sr.rs @@ -382,6 +382,9 @@ pub mod tests { .clone() }; + let source = get_source(); + expect_test::expect!["CREATE SOURCE src (id INT, country STRUCT
, zipcode CHARACTER VARYING>, zipcode BIGINT, rate REAL) WITH (connector = 'kafka', topic = 'test-topic', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://')"].assert_eq(&source.create_sql_purified().replace(proto_file.path().to_str().unwrap(), "")); + let sql = format!( r#"ALTER SOURCE src FORMAT UPSERT ENCODE PROTOBUF ( message = '.test.TestRecord', @@ -425,10 +428,6 @@ pub mod tests { .unwrap(); assert_eq!(name_column.column_desc.data_type, DataType::Varchar); - let altered_sql = format!( - r#"CREATE SOURCE src WITH (connector = 'kafka', topic = 'test-topic', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecordExt', schema.location = 'file://{}')"#, - proto_file.path().to_str().unwrap() - ); - assert_eq!(altered_sql, altered_source.definition); + expect_test::expect!["CREATE SOURCE src (id INT, country STRUCT
, zipcode CHARACTER VARYING>, zipcode BIGINT, rate REAL, name CHARACTER VARYING) WITH (connector = 'kafka', topic = 'test-topic', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecordExt', schema.location = 'file://')"].assert_eq(&altered_source.create_sql_purified().replace(proto_file.path().to_str().unwrap(), "")); } }