Skip to content

Commit

Permalink
Second round
Browse files Browse the repository at this point in the history
  • Loading branch information
fabriziomello committed Dec 12, 2024
1 parent f9bd929 commit 814754f
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 22 deletions.
8 changes: 4 additions & 4 deletions src/ts_catalog/continuous_agg.c
Original file line number Diff line number Diff line change
Expand Up @@ -384,12 +384,12 @@ continuous_agg_init(ContinuousAgg *cagg, const Form_continuous_agg fd)
Assert(OidIsValid(cagg->relid));
Assert(OidIsValid(cagg->partition_type));

ObjectAddress direct_view =
get_and_lock_rel_by_name(&fd->direct_view_schema, &fd->direct_view_name, AccessShareLock);
Assert(OidIsValid(direct_view.objectId));
ObjectAddress partial_view =
get_and_lock_rel_by_name(&fd->partial_view_schema, &fd->partial_view_name, AccessShareLock);
Assert(OidIsValid(partial_view.objectId));

cagg->bucket_function =
ts_cm_functions->continuous_agg_get_bucket_function_info_internal(direct_view.objectId);
ts_cm_functions->continuous_agg_get_bucket_function_info_internal(partial_view.objectId);
}

TSDLLEXPORT CaggsInfo
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/continuous_aggs/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -1512,7 +1512,7 @@ tsl_cagg_get_bucket_function_info(Oid view_oid)
Assert(query->commandType == CMD_SELECT);

ContinuousAggsBucketFunction *bf = palloc0(sizeof(ContinuousAggsBucketFunction));
TIMESTAMP_NOBEGIN(bf->bucket_time_origin);
TIMESTAMP_NOBEGIN(bf->bucket_time_origin);

ListCell *l;
foreach (l, query->groupClause)
Expand Down
39 changes: 22 additions & 17 deletions tsl/src/continuous_aggs/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ typedef struct TimeBucketInfoContext
/* Was the defined origin added during the migration and needs
* to be added to the function parameters during rewrite? */
bool origin_added_during_migration;
TimestampTz bucket_time_origin_to_replace;

/* Do we need to flip the timezone and the origin parameter during migration? */
bool need_parameter_order_change;
Expand All @@ -318,19 +319,19 @@ build_const_value_for_origin(TimeBucketInfoContext *context, Oid origin_type)
switch (origin_type)
{
case TIMESTAMPTZOID:
const_datum = TimestampTzGetDatum(context->cagg->bucket_function->bucket_time_origin);
const_datum = TimestampTzGetDatum(context->bucket_time_origin_to_replace);
break;
case TIMESTAMPOID:
const_datum =
DirectFunctionCall1(timestamptz_timestamp,
TimestampTzGetDatum(
context->cagg->bucket_function->bucket_time_origin));
context->bucket_time_origin_to_replace));
break;
case DATEOID:
const_datum =
DirectFunctionCall1(timestamptz_date,
TimestampTzGetDatum(
context->cagg->bucket_function->bucket_time_origin));
context->bucket_time_origin_to_replace));
break;
default:
elog(ERROR,
Expand Down Expand Up @@ -429,7 +430,7 @@ cagg_user_query_mutator(Node *node, TimeBucketInfoContext *context)
* Rewrite the given CAgg view and replace the bucket function
*/
static void
continuous_agg_rewrite_view(Oid view_oid, const ContinuousAgg *cagg, TimeBucketInfoContext *context)
continuous_agg_rewrite_view(Oid view_oid, TimeBucketInfoContext *context)
{
int sec_ctx;
Oid uid, saved_uid;
Expand All @@ -446,7 +447,7 @@ continuous_agg_rewrite_view(Oid view_oid, const ContinuousAgg *cagg, TimeBucketI
Query *updated_direct_query = (Query *) cagg_user_query_mutator((Node *) direct_query, context);

/* Store updated CAgg query */
SWITCH_TO_TS_USER(NameStr(cagg->data.user_view_schema), uid, saved_uid, sec_ctx);
SWITCH_TO_TS_USER(NameStr(context->cagg->data.user_view_schema), uid, saved_uid, sec_ctx);
StoreViewQuery(view_oid, updated_direct_query, true);
CommandCounterIncrement();
RESTORE_USER(uid, saved_uid, sec_ctx);
Expand All @@ -456,29 +457,32 @@ continuous_agg_rewrite_view(Oid view_oid, const ContinuousAgg *cagg, TimeBucketI
* Replace the bucket function in the CAgg view definition
*/
static void
continuous_agg_replace_function(const ContinuousAgg *cagg, Oid function_to_replace,
continuous_agg_replace_function(ContinuousAgg *cagg, Oid function_to_replace,
bool origin_added_during_migration,
TimestampTz bucket_time_origin_to_replace,
bool need_parameter_order_change)
{
TimeBucketInfoContext context = { 0 };
context.cagg = cagg;
context.function_to_replace = function_to_replace;
context.origin_added_during_migration = origin_added_during_migration;
context.need_parameter_order_change = need_parameter_order_change;
TimeBucketInfoContext context = {
.cagg = cagg,
.function_to_replace = function_to_replace,
.origin_added_during_migration = origin_added_during_migration,
.bucket_time_origin_to_replace = bucket_time_origin_to_replace,
.need_parameter_order_change = need_parameter_order_change
};

/* Rewrite the direct_view */
Oid direct_view_oid = ts_get_relation_relid(NameStr(cagg->data.direct_view_schema),
NameStr(cagg->data.direct_view_name),
false);

continuous_agg_rewrite_view(direct_view_oid, cagg, &context);
continuous_agg_rewrite_view(direct_view_oid, &context);

/* Rewrite the partial_view */
Oid partial_view_oid = ts_get_relation_relid(NameStr(cagg->data.partial_view_schema),
NameStr(cagg->data.partial_view_name),
false);

continuous_agg_rewrite_view(partial_view_oid, cagg, &context);
continuous_agg_rewrite_view(partial_view_oid, &context);

/* Rewrite the user facing view if needed */
if (!cagg->data.materialized_only)
Expand All @@ -487,7 +491,7 @@ continuous_agg_replace_function(const ContinuousAgg *cagg, Oid function_to_repla
NameStr(cagg->data.user_view_name),
false);

continuous_agg_rewrite_view(user_view_oid, cagg, &context);
continuous_agg_rewrite_view(user_view_oid, &context);
}
}

Expand Down Expand Up @@ -594,28 +598,29 @@ continuous_agg_migrate_to_time_bucket(PG_FUNCTION_ARGS)
/* Update the time_bucket_fuction */
cagg->bucket_function->bucket_function = new_bucket_function;
bool origin_added_during_migration = false;
TimestampTz bucket_time_origin_to_replace;

/* Set new origin if not already present in the function definition. This is needed since
* time_bucket and time_bucket_ng use different origin default values.
*/
if (cagg->bucket_function->bucket_time_based &&
TIMESTAMP_NOT_FINITE(cagg->bucket_function->bucket_time_origin))
{
cagg->bucket_function->bucket_time_origin =
continuous_agg_get_default_origin(new_bucket_function);
bucket_time_origin_to_replace = continuous_agg_get_default_origin(new_bucket_function);
origin_added_during_migration = true;
}

/* Modify the CAgg view definition */
continuous_agg_replace_function(cagg,
old_bucket_function,
origin_added_during_migration,
bucket_time_origin_to_replace,
need_parameter_order_change);

/* Fetch new CAgg definition from catalog */
ContinuousAgg PG_USED_FOR_ASSERTS_ONLY *new_cagg_definition =
cagg_get_by_relid_or_fail(cagg_relid);
Assert(new_cagg_definition->bucket_function->bucket_function == old_bucket_function);
Assert(new_cagg_definition->bucket_function->bucket_function == new_bucket_function);
Assert(cagg->bucket_function->bucket_time_origin ==
new_cagg_definition->bucket_function->bucket_time_origin);

Expand Down

0 comments on commit 814754f

Please sign in to comment.