Skip to content

Commit

Permalink
split definition for plan & definition to persist
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Jan 14, 2025
1 parent db328ec commit fa85b46
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 19 deletions.
70 changes: 57 additions & 13 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ pub async fn get_new_table_definition_for_cdc_table(
);

// Retrieve the original table definition.
// TODO(purify): use purified definition.
let mut definition = original_catalog.create_sql_ast()?;
let mut definition = original_catalog.create_sql_ast_purified()?;

// Clear the original columns field, so that we'll follow `new_columns` to generate a
// purified definition.
Expand Down Expand Up @@ -89,7 +88,8 @@ pub async fn get_new_table_definition_for_cdc_table(
pub async fn get_replace_table_plan(
session: &Arc<SessionImpl>,
table_name: ObjectName,
new_definition: Statement,
new_definition_for_plan: Statement,
new_definition_to_persist: Option<Statement>,
old_catalog: &Arc<TableCatalog>,
) -> Result<(
Option<Source>,
Expand All @@ -98,8 +98,12 @@ pub async fn get_replace_table_plan(
ColIndexMapping,
TableJobType,
)> {
let new_definition_to_persist = new_definition_to_persist
.as_ref()
.unwrap_or(&new_definition_for_plan);
// Create handler args as if we're creating a new table with the altered definition.
let handler_args = HandlerArgs::new(session.clone(), &new_definition, Arc::from(""))?;
let handler_args = HandlerArgs::new(session.clone(), new_definition_to_persist, Arc::from(""))?;

let col_id_gen = ColumnIdGenerator::new_alter(old_catalog);
let Statement::CreateTable {
columns,
Expand All @@ -114,9 +118,9 @@ pub async fn get_replace_table_plan(
include_column_options,
engine,
..
} = new_definition
} = new_definition_for_plan
else {
panic!("unexpected statement type: {:?}", new_definition);
panic!("unexpected statement type: {:?}", new_definition_for_plan);
};

let format_encode = format_encode
Expand Down Expand Up @@ -258,8 +262,7 @@ pub async fn handle_alter_table_column(
}

// Retrieve the original table definition and parse it to AST.
// TODO(purify): use purified definition.
let mut definition = original_catalog.create_sql_ast()?;
let mut definition = original_catalog.create_sql_ast_purified()?;
let Statement::CreateTable {
columns,
format_encode,
Expand All @@ -272,11 +275,12 @@ pub async fn handle_alter_table_column(
let format_encode = format_encode
.clone()
.map(|format_encode| format_encode.into_v2_with_warning());
let has_schema_registry = format_encode
.as_ref()
.is_some_and(schema_has_schema_registry);

let fail_if_has_schema_registry = || {
if let Some(format_encode) = &format_encode
&& schema_has_schema_registry(format_encode)
{
if has_schema_registry {
Err(ErrorCode::NotSupported(
"alter table with schema registry".to_owned(),
"try `ALTER TABLE .. FORMAT .. ENCODE .. (...)` instead".to_owned(),
Expand All @@ -286,6 +290,20 @@ pub async fn handle_alter_table_column(
}
};

// If there's schema registry, we also operate on the raw definition (besides the purified one)
// to maintain a definition for refreshing use.
let mut raw_definition = if has_schema_registry {
Some(original_catalog.create_sql_ast()?)
} else {
None
};
let raw_columns = raw_definition.as_mut().map(|raw_definition| {
let Statement::CreateTable { columns, .. } = raw_definition else {
panic!("unexpected statement: {:?}", raw_definition);
};
columns
});

if columns.is_empty() {
Err(ErrorCode::NotSupported(
"alter a table with empty column definitions".to_owned(),
Expand Down Expand Up @@ -331,6 +349,8 @@ pub async fn handle_alter_table_column(

// Add the new column to the table definition if it is not created by `create table (*)` syntax.
columns.push(new_column);
// Currently we don't support adding columns to table with schema registry.
assert!(raw_columns.is_none());
}

AlterTableOperation::DropColumn {
Expand Down Expand Up @@ -372,6 +392,24 @@ pub async fn handle_alter_table_column(
.ok()
.unwrap();

if let Some(raw_columns) = raw_columns {
let raw_removed_column = raw_columns
.extract_if(|c| c.name.real_value() == column_name)
.at_most_one()
.ok()
.unwrap();

// Currently the removed column from schema registry enabled table must be a generated column.
// So we must find it in the raw definition as well.
assert_eq!(
removed_column.is_some(),
raw_removed_column.is_some(),
"mismatched raw column definition, purified: {:?}, raw: {:?}",
removed_column,
raw_removed_column
);
}

if removed_column.is_some() {
// PASS
} else if if_exists {
Expand All @@ -392,8 +430,14 @@ pub async fn handle_alter_table_column(
_ => unreachable!(),
};

let (source, table, graph, col_index_mapping, job_type) =
get_replace_table_plan(&session, table_name, definition, &original_catalog).await?;
let (source, table, graph, col_index_mapping, job_type) = get_replace_table_plan(
&session,
table_name,
definition,
raw_definition,
&original_catalog,
)
.await?;

let catalog_writer = session.catalog_writer()?;

Expand Down
5 changes: 2 additions & 3 deletions src/frontend/src/handler/alter_table_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ use crate::error::{ErrorCode, Result};
use crate::TableCatalog;

fn get_format_encode_from_table(table: &TableCatalog) -> Result<Option<FormatEncodeOptions>> {
// TODO(purify): use purified definition.
let stmt = table.create_sql_ast()?;
let stmt = table.create_sql_ast_purified()?;
let Statement::CreateTable { format_encode, .. } = stmt else {
unreachable!()
};
Expand Down Expand Up @@ -65,7 +64,7 @@ pub async fn handle_refresh_schema(

let (source, table, graph, col_index_mapping, job_type) = {
let result =
get_replace_table_plan(&session, table_name, definition, &original_table).await;
get_replace_table_plan(&session, table_name, definition, None, &original_table).await;
match result {
Ok((source, table, graph, col_index_mapping, job_type)) => {
Ok((source, table, graph, col_index_mapping, job_type))
Expand Down
9 changes: 6 additions & 3 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,8 +502,11 @@ pub(crate) async fn reparse_table_for_sink(
table_catalog: &Arc<TableCatalog>,
) -> Result<(StreamFragmentGraph, Table, Option<PbSource>)> {
// Retrieve the original table definition and parse it to AST.
// TODO(purify): use purified definition.
let definition = table_catalog.create_sql_ast()?;
let definition = table_catalog.create_sql_ast_purified()?;
let raw_definition = table_catalog
.create_sql_ast()
.unwrap_or_else(|_| definition.clone() /* create table as */);

let Statement::CreateTable {
name,
format_encode,
Expand All @@ -519,7 +522,7 @@ pub(crate) async fn reparse_table_for_sink(
.map(|format_encode| format_encode.into_v2_with_warning());

// Create handler args as if we're creating a new table with the altered definition.
let handler_args = HandlerArgs::new(session.clone(), &definition, Arc::from(""))?;
let handler_args = HandlerArgs::new(session.clone(), &raw_definition, Arc::from(""))?;
let col_id_gen = ColumnIdGenerator::new_alter(table_catalog);
let Statement::CreateTable {
columns,
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ async fn get_new_table_plan(
&session,
table_name,
new_table_definition,
None,
&original_catalog,
)
.await?;
Expand Down

0 comments on commit fa85b46

Please sign in to comment.