From 814754fd57d2683f2e9c8af90f9b653dc2a62016 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabr=C3=ADzio=20de=20Royes=20Mello?= Date: Thu, 8 Aug 2024 16:07:53 -0300 Subject: [PATCH] Second round --- src/ts_catalog/continuous_agg.c | 8 +++---- tsl/src/continuous_aggs/common.c | 2 +- tsl/src/continuous_aggs/utils.c | 39 ++++++++++++++++++-------------- 3 files changed, 27 insertions(+), 22 deletions(-) diff --git a/src/ts_catalog/continuous_agg.c b/src/ts_catalog/continuous_agg.c index f2e1a345df6..1eb44bdd298 100644 --- a/src/ts_catalog/continuous_agg.c +++ b/src/ts_catalog/continuous_agg.c @@ -384,12 +384,12 @@ continuous_agg_init(ContinuousAgg *cagg, const Form_continuous_agg fd) Assert(OidIsValid(cagg->relid)); Assert(OidIsValid(cagg->partition_type)); - ObjectAddress direct_view = - get_and_lock_rel_by_name(&fd->direct_view_schema, &fd->direct_view_name, AccessShareLock); - Assert(OidIsValid(direct_view.objectId)); + ObjectAddress partial_view = + get_and_lock_rel_by_name(&fd->partial_view_schema, &fd->partial_view_name, AccessShareLock); + Assert(OidIsValid(partial_view.objectId)); cagg->bucket_function = - ts_cm_functions->continuous_agg_get_bucket_function_info_internal(direct_view.objectId); + ts_cm_functions->continuous_agg_get_bucket_function_info_internal(partial_view.objectId); } TSDLLEXPORT CaggsInfo diff --git a/tsl/src/continuous_aggs/common.c b/tsl/src/continuous_aggs/common.c index b285ad7952b..d4ca0b4d8a7 100644 --- a/tsl/src/continuous_aggs/common.c +++ b/tsl/src/continuous_aggs/common.c @@ -1512,7 +1512,7 @@ tsl_cagg_get_bucket_function_info(Oid view_oid) Assert(query->commandType == CMD_SELECT); ContinuousAggsBucketFunction *bf = palloc0(sizeof(ContinuousAggsBucketFunction)); - TIMESTAMP_NOBEGIN(bf->bucket_time_origin); + TIMESTAMP_NOBEGIN(bf->bucket_time_origin); ListCell *l; foreach (l, query->groupClause) diff --git a/tsl/src/continuous_aggs/utils.c b/tsl/src/continuous_aggs/utils.c index fb9353ce0c1..33f698b7af9 100644 --- a/tsl/src/continuous_aggs/utils.c +++ b/tsl/src/continuous_aggs/utils.c @@ -302,6 +302,7 @@ typedef struct TimeBucketInfoContext /* Was the defined origin added during the migration and needs * to be added to the function parameters during rewrite? */ bool origin_added_during_migration; + TimestampTz bucket_time_origin_to_replace; /* Do we need to flip the timezone and the origin parameter during migration? */ bool need_parameter_order_change; @@ -318,19 +319,19 @@ build_const_value_for_origin(TimeBucketInfoContext *context, Oid origin_type) switch (origin_type) { case TIMESTAMPTZOID: - const_datum = TimestampTzGetDatum(context->cagg->bucket_function->bucket_time_origin); + const_datum = TimestampTzGetDatum(context->bucket_time_origin_to_replace); break; case TIMESTAMPOID: const_datum = DirectFunctionCall1(timestamptz_timestamp, TimestampTzGetDatum( - context->cagg->bucket_function->bucket_time_origin)); + context->bucket_time_origin_to_replace)); break; case DATEOID: const_datum = DirectFunctionCall1(timestamptz_date, TimestampTzGetDatum( - context->cagg->bucket_function->bucket_time_origin)); + context->bucket_time_origin_to_replace)); break; default: elog(ERROR, @@ -429,7 +430,7 @@ cagg_user_query_mutator(Node *node, TimeBucketInfoContext *context) * Rewrite the given CAgg view and replace the bucket function */ static void -continuous_agg_rewrite_view(Oid view_oid, const ContinuousAgg *cagg, TimeBucketInfoContext *context) +continuous_agg_rewrite_view(Oid view_oid, TimeBucketInfoContext *context) { int sec_ctx; Oid uid, saved_uid; @@ -446,7 +447,7 @@ continuous_agg_rewrite_view(Oid view_oid, const ContinuousAgg *cagg, TimeBucketI Query *updated_direct_query = (Query *) cagg_user_query_mutator((Node *) direct_query, context); /* Store updated CAgg query */ - SWITCH_TO_TS_USER(NameStr(cagg->data.user_view_schema), uid, saved_uid, sec_ctx); + SWITCH_TO_TS_USER(NameStr(context->cagg->data.user_view_schema), uid, saved_uid, sec_ctx); StoreViewQuery(view_oid, updated_direct_query, true); CommandCounterIncrement(); RESTORE_USER(uid, saved_uid, sec_ctx); @@ -456,29 +457,32 @@ continuous_agg_rewrite_view(Oid view_oid, const ContinuousAgg *cagg, TimeBucketI * Replace the bucket function in the CAgg view definition */ static void -continuous_agg_replace_function(const ContinuousAgg *cagg, Oid function_to_replace, +continuous_agg_replace_function(ContinuousAgg *cagg, Oid function_to_replace, bool origin_added_during_migration, + TimestampTz bucket_time_origin_to_replace, bool need_parameter_order_change) { - TimeBucketInfoContext context = { 0 }; - context.cagg = cagg; - context.function_to_replace = function_to_replace; - context.origin_added_during_migration = origin_added_during_migration; - context.need_parameter_order_change = need_parameter_order_change; + TimeBucketInfoContext context = { + .cagg = cagg, + .function_to_replace = function_to_replace, + .origin_added_during_migration = origin_added_during_migration, + .bucket_time_origin_to_replace = bucket_time_origin_to_replace, + .need_parameter_order_change = need_parameter_order_change + }; /* Rewrite the direct_view */ Oid direct_view_oid = ts_get_relation_relid(NameStr(cagg->data.direct_view_schema), NameStr(cagg->data.direct_view_name), false); - continuous_agg_rewrite_view(direct_view_oid, cagg, &context); + continuous_agg_rewrite_view(direct_view_oid, &context); /* Rewrite the partial_view */ Oid partial_view_oid = ts_get_relation_relid(NameStr(cagg->data.partial_view_schema), NameStr(cagg->data.partial_view_name), false); - continuous_agg_rewrite_view(partial_view_oid, cagg, &context); + continuous_agg_rewrite_view(partial_view_oid, &context); /* Rewrite the user facing view if needed */ if (!cagg->data.materialized_only) @@ -487,7 +491,7 @@ continuous_agg_replace_function(const ContinuousAgg *cagg, Oid function_to_repla NameStr(cagg->data.user_view_name), false); - continuous_agg_rewrite_view(user_view_oid, cagg, &context); + continuous_agg_rewrite_view(user_view_oid, &context); } } @@ -594,6 +598,7 @@ continuous_agg_migrate_to_time_bucket(PG_FUNCTION_ARGS) /* Update the time_bucket_fuction */ cagg->bucket_function->bucket_function = new_bucket_function; bool origin_added_during_migration = false; + TimestampTz bucket_time_origin_to_replace; /* Set new origin if not already present in the function definition. This is needed since * time_bucket and time_bucket_ng use different origin default values. @@ -601,8 +606,7 @@ continuous_agg_migrate_to_time_bucket(PG_FUNCTION_ARGS) if (cagg->bucket_function->bucket_time_based && TIMESTAMP_NOT_FINITE(cagg->bucket_function->bucket_time_origin)) { - cagg->bucket_function->bucket_time_origin = - continuous_agg_get_default_origin(new_bucket_function); + bucket_time_origin_to_replace = continuous_agg_get_default_origin(new_bucket_function); origin_added_during_migration = true; } @@ -610,12 +614,13 @@ continuous_agg_migrate_to_time_bucket(PG_FUNCTION_ARGS) continuous_agg_replace_function(cagg, old_bucket_function, origin_added_during_migration, + bucket_time_origin_to_replace, need_parameter_order_change); /* Fetch new CAgg definition from catalog */ ContinuousAgg PG_USED_FOR_ASSERTS_ONLY *new_cagg_definition = cagg_get_by_relid_or_fail(cagg_relid); - Assert(new_cagg_definition->bucket_function->bucket_function == old_bucket_function); + Assert(new_cagg_definition->bucket_function->bucket_function == new_bucket_function); Assert(cagg->bucket_function->bucket_time_origin == new_cagg_definition->bucket_function->bucket_time_origin);