Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

refactor(frontend): use purified definition for altering source #20182

Merged
merged 3 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions e2e_test/source_inline/kafka/alter/add_column_shared.slt
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ select * from mv_before_alter;
statement ok
alter source s add column v3 varchar;

# Demonstrate definition change.
query T
SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_sources WHERE name = 's';
----
CREATE SOURCE s (v1 INT, v2 CHARACTER VARYING, v3 CHARACTER VARYING)

# New MV will have v3.

# Check it should still be shared source <https://github.com/risingwavelabs/risingwave/issues/19799>
Expand Down
6 changes: 6 additions & 0 deletions e2e_test/source_inline/kafka/avro/alter_source.slt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ No tracking issue yet. Feel free to submit a feature request at https://github.c
statement ok
alter source s format plain encode avro (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}');

# Demonstrate definition change.
query T
SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_sources WHERE name = 's';
----
CREATE SOURCE s (foo CHARACTER VARYING, bar INT)

query ??
select * from s
----
Expand Down
10 changes: 10 additions & 0 deletions e2e_test/source_inline/kafka/protobuf/alter_source_shared.slt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ FORMAT PLAIN ENCODE PROTOBUF(
message = 'test.User'
);

query T
SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_sources WHERE name = 'src_user';
----
CREATE SOURCE src_user (id INT, name CHARACTER VARYING, address CHARACTER VARYING, city CHARACTER VARYING, gender CHARACTER VARYING, sc STRUCT<file_name CHARACTER VARYING>) INCLUDE timestamp

statement ok
CREATE MATERIALIZED VIEW mv_user AS SELECT * FROM src_user;

Expand All @@ -45,6 +50,11 @@ set streaming_use_shared_source to false;
statement ok
ALTER SOURCE src_user REFRESH SCHEMA;

query T
SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_sources WHERE name = 'src_user';
----
CREATE SOURCE src_user (id INT, name CHARACTER VARYING, address CHARACTER VARYING, city CHARACTER VARYING, gender CHARACTER VARYING, sc STRUCT<file_name CHARACTER VARYING>, age INT) INCLUDE timestamp

# Check it should still be shared source <https://github.com/risingwavelabs/risingwave/issues/19799>
query
EXPLAIN CREATE MATERIALIZED VIEW mv_user_more AS SELECT * FROM src_user;
Expand Down
9 changes: 9 additions & 0 deletions src/frontend/src/catalog/source_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,15 @@ impl SourceCatalog {

self.create_sql_ast()
}

/// Fills the `definition` field with the purified SQL definition.
///
/// "Purified" here means the definition text is regenerated from the current catalog
/// state (see `create_sql_purified`), so it reflects schema changes such as added
/// columns rather than the originally-typed SQL.
///
/// There's no need to call this method for correctness because we automatically purify the
/// SQL definition at the time of querying. However, this helps to maintain more accurate
/// `definition` field in the catalog when directly inspected for debugging purposes.
pub fn fill_purified_create_sql(&mut self) {
self.definition = self.create_sql_purified();
}
}

impl From<&PbSource> for SourceCatalog {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ async fn read_relation_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwRelat
schemaname: schema.clone(),
relationname: t.name.clone(),
relationowner: t.owner as i32,
definition: t.definition.clone(),
definition: t.create_sql_purified(),
relationtype: "SOURCE".into(),
relationid: t.id as i32,
relationtimezone: timezone,
Expand Down
31 changes: 3 additions & 28 deletions src/frontend/src/handler/alter_source_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::max_column_id;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_connector::source::{extract_source_struct, SourceEncode, SourceStruct};
use risingwave_sqlparser::ast::{
AlterSourceOperation, ColumnDef, CreateSourceStatement, ObjectName, Statement,
};
use risingwave_sqlparser::parser::Parser;
use risingwave_sqlparser::ast::{AlterSourceOperation, ObjectName};

use super::create_source::generate_stream_graph_for_source;
use super::create_table::bind_sql_columns;
Expand Down Expand Up @@ -107,17 +103,17 @@ pub async fn handle_alter_source_column(
"column \"{new_column_name}\" of source \"{source_name}\" already exists"
)))?
}
catalog.definition =
alter_definition_add_column(&catalog.definition, column_def.clone())?;
let mut bound_column = bind_sql_columns(&[column_def])?.remove(0);
bound_column.column_desc.column_id = max_column_id(columns).next();
columns.push(bound_column);
// No need to update the definition here. It will be done by purification later.
}
_ => unreachable!(),
}

// update version
catalog.version += 1;
catalog.fill_purified_create_sql();

let catalog_writer = session.catalog_writer()?;
if catalog.info.is_shared() {
Expand Down Expand Up @@ -148,27 +144,6 @@ pub async fn handle_alter_source_column(
Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE))
}

/// `alter_definition_add_column` adds a new column to the definition of the relation.
///
/// Parses `definition` as a single `CREATE SOURCE` statement, appends `column` to its
/// column list, and returns the updated SQL text.
///
/// # Panics
/// Panics if `definition` cannot be parsed, does not contain exactly one statement,
/// or is not a `CREATE SOURCE` statement. Callers are expected to pass a definition
/// previously produced by the catalog, which upholds these invariants.
#[inline(always)]
pub fn alter_definition_add_column(definition: &str, column: ColumnDef) -> Result<String> {
let ast = Parser::parse_sql(definition).expect("failed to parse relation definition");
let mut stmt = ast
    .into_iter()
    .exactly_one()
    // Fixed grammar in the panic message ("contains" -> "contain").
    .expect("should contain only one statement");

match &mut stmt {
    Statement::CreateSource {
        stmt: CreateSourceStatement { columns, .. },
    } => {
        columns.push(column);
    }
    // The definition of a source catalog entry is always a CREATE SOURCE statement.
    _ => unreachable!(),
}

Ok(stmt.to_string())
}

#[cfg(test)]
pub mod tests {
use std::collections::BTreeMap;
Expand Down
30 changes: 10 additions & 20 deletions src/frontend/src/handler/alter_source_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::sync::Arc;

use anyhow::Context;
use either::Either;
use itertools::Itertools;
use pgwire::pg_response::StatementType;
Expand All @@ -28,7 +27,6 @@ use risingwave_sqlparser::ast::{
CompatibleFormatEncode, CreateSourceStatement, Encode, Format, FormatEncodeOptions, ObjectName,
SqlOption, Statement,
};
use risingwave_sqlparser::parser::Parser;

use super::create_source::{
generate_stream_graph_for_source, schema_has_schema_registry, validate_compatibility,
Expand Down Expand Up @@ -198,10 +196,7 @@ pub async fn refresh_sr_and_get_columns_diff(
}

fn get_format_encode_from_source(source: &SourceCatalog) -> Result<FormatEncodeOptions> {
let [stmt]: [_; 1] = Parser::parse_sql(&source.definition)
.context("unable to parse original source definition")?
.try_into()
.unwrap();
let stmt = source.create_sql_ast()?;
let Statement::CreateSource {
stmt: CreateSourceStatement { format_encode, .. },
} = stmt
Expand Down Expand Up @@ -263,8 +258,10 @@ pub async fn handle_alter_source_with_sr(

source.info = source_info;
source.columns.extend(added_columns);
source.definition =
alter_definition_format_encode(&source.definition, format_encode.row_options.clone())?;
source.definition = alter_definition_format_encode(
source.create_sql_ast_purified()?,
format_encode.row_options.clone(),
)?;

let (format_encode_options, format_encode_secret_ref) = resolve_secret_ref_in_with_options(
WithOptions::try_from(format_encode.row_options())?,
Expand Down Expand Up @@ -313,15 +310,9 @@ pub async fn handle_alter_source_with_sr(

/// Apply the new `format_encode_options` to the source/table definition.
pub fn alter_definition_format_encode(
definition: &str,
mut stmt: Statement,
format_encode_options: Vec<SqlOption>,
) -> Result<String> {
let ast = Parser::parse_sql(definition).expect("failed to parse relation definition");
let mut stmt = ast
.into_iter()
.exactly_one()
.expect("should contain only one statement");

match &mut stmt {
Statement::CreateSource {
stmt: CreateSourceStatement { format_encode, .. },
Expand Down Expand Up @@ -391,6 +382,9 @@ pub mod tests {
.clone()
};

let source = get_source();
expect_test::expect!["CREATE SOURCE src (id INT, country STRUCT<address CHARACTER VARYING, city STRUCT<address CHARACTER VARYING, zipcode CHARACTER VARYING>, zipcode CHARACTER VARYING>, zipcode BIGINT, rate REAL) WITH (connector = 'kafka', topic = 'test-topic', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://')"].assert_eq(&source.create_sql_purified().replace(proto_file.path().to_str().unwrap(), ""));

let sql = format!(
r#"ALTER SOURCE src FORMAT UPSERT ENCODE PROTOBUF (
message = '.test.TestRecord',
Expand Down Expand Up @@ -434,10 +428,6 @@ pub mod tests {
.unwrap();
assert_eq!(name_column.column_desc.data_type, DataType::Varchar);

let altered_sql = format!(
r#"CREATE SOURCE src WITH (connector = 'kafka', topic = 'test-topic', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecordExt', schema.location = 'file://{}')"#,
proto_file.path().to_str().unwrap()
);
assert_eq!(altered_sql, altered_source.definition);
expect_test::expect!["CREATE SOURCE src (id INT, country STRUCT<address CHARACTER VARYING, city STRUCT<address CHARACTER VARYING, zipcode CHARACTER VARYING>, zipcode CHARACTER VARYING>, zipcode BIGINT, rate REAL, name CHARACTER VARYING) WITH (connector = 'kafka', topic = 'test-topic', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecordExt', schema.location = 'file://')"].assert_eq(&altered_source.create_sql_purified().replace(proto_file.path().to_str().unwrap(), ""));
}
}
Loading