
[Bug]: cagg_migrate() does not work on caggs that have a time_bucket column with an interval added as offset #7236

Open
Streamlinesx opened this issue Sep 5, 2024 · 6 comments


Streamlinesx commented Sep 5, 2024

What type of bug is this?

Unexpected error

What subsystems and features are affected?

Other

What happened?

What happened:
I tried to migrate our caggs from the 'old' format to the 'new' format. However, when executing the cagg_migrate() function on the affected cagg, it throws an error saying the column time_partition_col does not exist.

This prevents us from updating our database to the most recent version without losing data, and we are stuck on PG12.

What is expected to happen:
cagg_migrate() should execute successfully and migrate the cagg.

TimescaleDB version affected

2.9.3

PostgreSQL version used

12

What operating system did you use?

Windows 10 x64

What installation method did you use?

Docker

What platform did you run on?

Not applicable

Relevant log output and stack trace

CALL cagg_migrate('public.data_aggregation_hour_max'::regclass, override=>TRUE, drop_old=>FALSE);
ERROR:  column "time_partition_col" does not exist
LINE 3:             SELECT min(time_partition_col), max(time_partiti...
                               ^
QUERY:  
        WITH boundaries AS (
            SELECT min(time_partition_col), max(time_partition_col), 'time_partition_col' AS bucket_column_name, 'timestamp without time zone' AS bucket_column_type, 'data_aggregation_hour_max_new' AS cagg_name_new
            FROM public.data_aggregation_hour_max
            WHERE time_partition_col < CAST('4714-11-24 00:00:00 BC' AS timestamp without time zone)
        )
        INSERT INTO
            _timescaledb_catalog.continuous_agg_migrate_plan_step (mat_hypertable_id, type, config)
        SELECT
            '2',
            'COPY DATA',
            jsonb_build_object (
                'start_ts', start::text,
                'end_ts', (start + CAST('10 days' AS interval))::text,
                'bucket_column_name', bucket_column_name,
                'bucket_column_type', bucket_column_type,
                'cagg_name_new', cagg_name_new
            )
        FROM boundaries,
             LATERAL generate_series(min, max, CAST('10 days' AS interval)) AS start;
        
CONTEXT:  PL/pgSQL function _timescaledb_internal.cagg_migrate_create_plan(_timescaledb_catalog.continuous_agg,text,boolean,boolean) line 116 at EXECUTE
SQL statement "CALL _timescaledb_internal.cagg_migrate_create_plan(_cagg_data, _cagg_name_new, override, drop_old)"
PL/pgSQL function cagg_migrate(regclass,boolean,boolean) line 25 at CALL

How can we reproduce the bug?

In my case I arrived at the bug by creating a cagg on a version of TSDB that uses the 'old' format and adding an interval to the time_bucket column, i.e.

time_bucket(INTERVAL '1 hour', timestamp) + '1 hour' AS timestamp

To do this I brought up a Docker container running tsdb 1.7.5 on PG12, with a local folder mapped to the PostgreSQL data directory, in my case via a docker-compose file.

Content of Dockerfile/image used:

FROM timescale/timescaledb:1.7.5-pg12

Content of docker-compose.yaml:

services:
  test-postgres-server: # The Postgres Database Service
    container_name: timescaledb
    build: PayaraPostgres
    restart: unless-stopped
    networks:
      - default
    environment: # Username, password and database name variables
      POSTGRES_USER: testuser
      POSTGRES_DB: testdb
      POSTGRES_PASSWORD: test
    volumes: # Volumes for scripts and related files you can add
      - ./PayaraPostgres/volumes/test-postgres-server/var/testdb/data/postgresdata:/var/lib/postgresql/data
    ports:
      - 5435:5432


Here is the script used to create the initial database:

--Create base table

BEGIN TRANSACTION;

CREATE TABLE IF NOT EXISTS data
(
    dongle_id     integer      not null,
    ingestion     timestamp(3) not null,
    timestamp     timestamp(0) not null,
    tracked_value integer,
    constraint data_pkey primary key (dongle_id, timestamp)
);

-- Create hypertable
SELECT create_hypertable('data', 'timestamp', partitioning_column := 'dongle_id', number_partitions := 2,
                         chunk_time_interval := INTERVAL '1 day');

COMMIT;

-- Create hour aggregation
CREATE VIEW data_aggregation_hour_max
            WITH ( timescaledb.continuous, timescaledb.ignore_invalidation_older_than='6 days', timescaledb.refresh_lag = '10 minutes', timescaledb.refresh_interval = '1 hour')
AS
SELECT dongle_id, time_bucket(INTERVAL '1 hour', timestamp) + '1 hour' AS timestamp, max(tracked_value) AS tracked_value_max
FROM data
GROUP BY dongle_id, time_bucket(INTERVAL '1 hour', timestamp);

SELECT add_drop_chunks_policy('data', INTERVAL '7 days', false);
SELECT add_drop_chunks_policy('data_aggregation_hour_max', INTERVAL '62 days', false);


Then update to tsdb 2.9.3 by mounting the volume to the new container and updating the extension inside the database:

docker pull timescale/timescaledb:2.9.3-pg12
docker stop timescaledb
docker rm timescaledb

docker run -v {absolute path to your postgres postgresdata folder}:/var/lib/postgresql/data -d --name timescaledb -p 5435:5432 timescale/timescaledb:2.9.3-pg12

Connect to the docker container shell and log in to the DB:

docker exec -it {containerId} bash
psql -U testuser -d testdb -X

Update the timescaledb extension:

ALTER EXTENSION timescaledb UPDATE;

At this point timescaledb informs about the outdated cagg format:

WARNING:  Continuous Aggregate: public.data_aggregation_hour_max with old format will not be supported with PG15. You should upgrade to the new format

To find outdated caggs and prepare the necessary query:

SELECT format('CALL cagg_migrate(%L::regclass, override=>TRUE, drop_old=>FALSE);', format('%I.%I', view_schema, view_name)) FROM timescaledb_information.continuous_aggregates WHERE finalized IS FALSE;

In this example it returns:

                                              format                                               
---------------------------------------------------------------------------------------------------
 CALL cagg_migrate('public.data_aggregation_hour_max'::regclass, override=>TRUE, drop_old=>FALSE);
(1 row)

Executing that call produces the error documented above.

When following the same procedure without adding the interval (+ '1 hour') to the time_bucket column during the initial creation of the cagg, the error does not occur:

CREATE VIEW data_aggregation_hour_max
            WITH ( timescaledb.continuous, timescaledb.ignore_invalidation_older_than='6 days', timescaledb.refresh_lag = '10 minutes', timescaledb.refresh_interval = '1 hour')
AS
SELECT dongle_id, time_bucket(INTERVAL '1 hour', timestamp) AS timestamp, max(tracked_value) AS tracked_value_max
FROM data
GROUP BY dongle_id, time_bucket(INTERVAL '1 hour', timestamp);

After updating to 2.9.3 and calling the cagg_migrate function on this cagg, it executes successfully.
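To see the state after a migration, the same information view used earlier can be queried for each cagg's finalized flag (a quick check; a cagg migrated to the new format should report finalized as true):

SELECT view_name, finalized
FROM timescaledb_information.continuous_aggregates;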

Streamlinesx commented Sep 5, 2024

Digging a bit further into this, I looked at how cagg_migrate.sql works and what the queries used in the script return when reading tables from the different _timescaledb schemas. I created two caggs, one with the offset and one without:

CREATE VIEW data_aggregation_hour_max_offset
            WITH ( timescaledb.continuous, timescaledb.ignore_invalidation_older_than='6 days', timescaledb.refresh_lag = '10 minutes', timescaledb.refresh_interval = '1 hour')
AS
SELECT dongle_id, time_bucket(INTERVAL '1 hour', timestamp) + '1 hour' AS timestamp, max(tracked_value) AS tracked_value_max
FROM data
GROUP BY dongle_id, time_bucket(INTERVAL '1 hour', timestamp);

CREATE VIEW data_aggregation_hour_max_not_offset
            WITH ( timescaledb.continuous, timescaledb.ignore_invalidation_older_than='6 days', timescaledb.refresh_lag = '10 minutes', timescaledb.refresh_interval = '1 hour')
AS
SELECT dongle_id, time_bucket(INTERVAL '1 hour', timestamp) AS timestamp, max(tracked_value) AS tracked_value_max
FROM data
GROUP BY dongle_id, time_bucket(INTERVAL '1 hour', timestamp);

This produces two different materialized_hypertable tables in the _timescaledb_internal schema:

WITH offset:

create table _timescaledb_internal._materialized_hypertable_2
(
    dongle_id          integer,
    timestamp          timestamp,
    agg_3_3            bytea,
    time_partition_col timestamp not null, --note this column
    chunk_id           integer
);

WITHOUT offset:

create table _timescaledb_internal._materialized_hypertable_3
(
    dongle_id integer,
    timestamp timestamp not null, --here the time_partition_column does not exist
    agg_3_3   bytea,
    chunk_id  integer
);

Then I queried

select * from timescaledb_information.dimensions;

The column_name column of this view is assigned to the declared variables in the cagg_migrate script:

SELECT time_interval, integer_interval, column_name, column_type
INTO _time_interval, _integer_interval, _bucket_column_name, _bucket_column_type
FROM timescaledb_information.dimensions
WHERE hypertable_schema = _matht.schema_name
  AND hypertable_name = _matht.table_name
  AND dimension_type = 'Time';

Looking at the column_name of the two caggs:
the one WITH the offset has column_name value time_partition_col
the one WITHOUT the offset (the one that later migrates without error) has column_name value timestamp

I imagine that timescaledb puts information about the caggs into the timescaledb_information.dimensions view, and column_name points to the column that holds the timestamp used for time bucketing. However, since we created a cagg with an offset added to the time_bucket, an extra time_partition_col column is added to the materialized hypertable, presumably because time buckets need to partition on a timestamp at the start of the bucket. So, to function properly, the time_partition_col is added.
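The difference also shows up directly in the dimensions view (a quick check, assuming the two materialized hypertables _materialized_hypertable_2 and _materialized_hypertable_3 from above):

SELECT hypertable_name, column_name, column_type
FROM timescaledb_information.dimensions
WHERE dimension_type = 'Time'
  AND hypertable_name IN ('_materialized_hypertable_2', '_materialized_hypertable_3');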


mkindahl commented Sep 6, 2024

@Streamlinesx Thank you for the bug report, this indeed looks like a bug.


Streamlinesx commented Sep 6, 2024

Hi @mkindahl, thanks for acknowledging the bug. @fabriziomello, is this a possible fix? Replace the statement at line 163 in cagg_migrate.sql so that the boundaries are selected directly from the materialized hypertable instead of the user-defined view. I couldn't find a way to test it yet since I am having trouble with functions not existing in the _timescaledb_functions schema.

_sql := format (
            $$
    WITH
    boundaries AS (
        SELECT min(%1$I), max(%1$I), %1$L AS bucket_column_name, %2$L AS bucket_column_type, %3$L AS cagg_name_new
        FROM _timescaledb_internal._materialized_hypertable_%7$s
        WHERE %1$I < CAST(%6$L AS %2$s)
    )
    INSERT INTO
        _timescaledb_catalog.continuous_agg_migrate_plan_step (mat_hypertable_id, type, config)
    SELECT
        %7$L,
        'COPY DATA',
        jsonb_build_object (
            'start_ts', start::text,
            'end_ts', (start + CAST(%8$L AS %9$s))::text,
            'bucket_column_name', bucket_column_name,
            'bucket_column_type', bucket_column_type,
            'cagg_name_new', cagg_name_new
        )
    FROM boundaries,
         LATERAL generate_series(min, max, CAST(%8$L AS %9$s)) AS start;
    $$,
            _bucket_column_name, _bucket_column_type, _cagg_name_new, _cagg_data.user_view_schema,
            _cagg_data.user_view_name, _watermark, _cagg_data.mat_hypertable_id, _interval_value, _interval_type
            );


Streamlinesx commented Sep 10, 2024

@fabriziomello I have continued working on this issue and found a way that works for our specific database structure, but I believe these ideas could be formed into a generic fix by adding the space dimension identifier, if present, to the table joins in the copy_data procedure. I had to work on two procedures:

The cagg_migrate_create_plan procedure and the cagg_migrate_execute_copy_data procedure.

Both had issues with the 'time_partition_col'. The modifications to the cagg_migrate_create_plan procedure should be generic, but the modifications in cagg_migrate_execute_copy_data are not. Also, while testing my solution I always had to manually add the column 'user_view_definition' of type 'text' to the _timescaledb_catalog.continuous_agg_migrate_plan table after bringing up a fresh container with tsdb version 2.9.3 and PG12; this column is important for the copy_data procedure.
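
That manual step looks roughly like this (a sketch of the workaround described above, not part of the official upgrade path; IF NOT EXISTS guards against the column already being present):

ALTER TABLE _timescaledb_catalog.continuous_agg_migrate_plan
    ADD COLUMN IF NOT EXISTS user_view_definition TEXT;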

CREATE OR REPLACE PROCEDURE _timescaledb_internal.cagg_migrate_create_plan(
    _cagg_data _timescaledb_catalog.continuous_agg,
    _cagg_name_new TEXT,
    _override BOOLEAN DEFAULT FALSE,
    _drop_old BOOLEAN DEFAULT FALSE
)
    LANGUAGE plpgsql AS
$BODY$
DECLARE
    _sql                TEXT;
    _matht              RECORD;
    _time_interval      INTERVAL;
    _integer_interval   BIGINT;
    _watermark          TEXT;
    _policies           JSONB;
    _bucket_column_name TEXT;
    _bucket_column_type TEXT;
    _interval_type      TEXT;
    _interval_value     TEXT;
BEGIN
    IF _timescaledb_internal.cagg_migrate_plan_exists(_cagg_data.mat_hypertable_id) IS TRUE THEN
        RAISE EXCEPTION 'plan already exists for materialized hypertable %', _cagg_data.mat_hypertable_id;
    END IF;

    -- If exist steps for this migration means that it's resuming the execution
    IF EXISTS (SELECT 1
               FROM _timescaledb_catalog.continuous_agg_migrate_plan_step
               WHERE mat_hypertable_id = _cagg_data.mat_hypertable_id) THEN
        RAISE WARNING 'resuming the migration of the continuous aggregate "%.%"', _cagg_data.user_view_schema, _cagg_data.user_view_name;
        RETURN;
    END IF;

    INSERT INTO _timescaledb_catalog.continuous_agg_migrate_plan (mat_hypertable_id, user_view_definition)
    VALUES (_cagg_data.mat_hypertable_id,
            pg_get_viewdef(format('%I.%I', _cagg_data.user_view_schema, _cagg_data.user_view_name)::regclass));

    SELECT schema_name, table_name INTO _matht FROM _timescaledb_catalog.hypertable WHERE id = _cagg_data.mat_hypertable_id;

    SELECT time_interval, integer_interval, column_name, column_type
    INTO _time_interval, _integer_interval, _bucket_column_name, _bucket_column_type
    FROM timescaledb_information.dimensions
    WHERE hypertable_schema = _matht.schema_name
      AND hypertable_name = _matht.table_name
      AND dimension_type = 'Time';

    IF _integer_interval IS NOT NULL THEN
        _interval_value := _integer_interval::TEXT;
        _interval_type := _bucket_column_type;
        IF _bucket_column_type = 'bigint' THEN
            _watermark := COALESCE(_timescaledb_internal.cagg_watermark(_cagg_data.mat_hypertable_id)::bigint,
                                   '-9223372036854775808'::bigint)::TEXT;
        ELSIF _bucket_column_type = 'integer' THEN
            _watermark :=
                    COALESCE(_timescaledb_internal.cagg_watermark(_cagg_data.mat_hypertable_id)::integer, '-2147483648'::integer)::TEXT;
        ELSE
            _watermark := COALESCE(_timescaledb_internal.cagg_watermark(_cagg_data.mat_hypertable_id)::smallint, '-32768'::smallint)::TEXT;
        END IF;
    ELSE
        _interval_value := _time_interval::TEXT;
        _interval_type := 'interval';

        -- We expect an ISO date later in parsing (i.e., min value has to be '4714-11-24 00:53:28+00:53:28 BC')
        SET LOCAL datestyle = 'ISO, MDY';
        IF _bucket_column_type = 'timestamp with time zone' THEN
            _watermark := COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(_cagg_data.mat_hypertable_id)),
                                   '-infinity'::timestamptz)::TEXT;
        ELSE
            _watermark := COALESCE(
                    _timescaledb_internal.to_timestamp_without_timezone(_timescaledb_internal.cagg_watermark(_cagg_data.mat_hypertable_id)),
                    '-infinity'::timestamp)::TEXT;
        END IF;
    END IF;

    -- get all scheduled policies except the refresh
    SELECT jsonb_build_object('policies', array_agg(id))
    INTO _policies
    FROM _timescaledb_config.bgw_job
    WHERE hypertable_id = _cagg_data.mat_hypertable_id
      AND proc_name IS DISTINCT FROM 'policy_refresh_continuous_aggregate'
      AND scheduled IS TRUE
      AND id >= 1000;

    INSERT INTO _timescaledb_catalog.continuous_agg_migrate_plan_step (mat_hypertable_id, type, config)
    VALUES (_cagg_data.mat_hypertable_id, 'SAVE WATERMARK', jsonb_build_object('watermark', _watermark)),
           (_cagg_data.mat_hypertable_id, 'CREATE NEW CAGG', jsonb_build_object('cagg_name_new', _cagg_name_new)),
           (_cagg_data.mat_hypertable_id, 'DISABLE POLICIES', _policies),
           (_cagg_data.mat_hypertable_id, 'REFRESH NEW CAGG',
            jsonb_build_object('cagg_name_new', _cagg_name_new, 'window_start', _watermark, 'window_start_type', _bucket_column_type));

-- Finish the step because don't require any extra step
    UPDATE _timescaledb_catalog.continuous_agg_migrate_plan_step
    SET status   = 'FINISHED',
        start_ts = now(),
        end_ts   = clock_timestamp()
    WHERE type = 'SAVE WATERMARK';

    _sql := format($$
    WITH
    boundaries AS (
        SELECT min(%1$I), max(%1$I), %1$L AS bucket_column_name, %2$L AS bucket_column_type, %3$L AS cagg_name_new
        FROM _timescaledb_internal._materialized_hypertable_%7$s
        WHERE %1$I < CAST(%6$L AS %2$s)
    )
    INSERT INTO
        _timescaledb_catalog.continuous_agg_migrate_plan_step (mat_hypertable_id, type, config)
    SELECT
        %7$L,
        'COPY DATA',
        jsonb_build_object (
            'start_ts', start::text,
            'end_ts', (start + CAST(%8$L AS %9$s))::text,
            'bucket_column_name', bucket_column_name,
            'bucket_column_type', bucket_column_type,
            'cagg_name_new', cagg_name_new
        )
    FROM boundaries,
         LATERAL generate_series(min, max, CAST(%8$L AS %9$s)) AS start;
    $$, _bucket_column_name, _bucket_column_type, _cagg_name_new, _cagg_data.user_view_schema, _cagg_data.user_view_name, _watermark,
                   _cagg_data.mat_hypertable_id, _interval_value, _interval_type);


    EXECUTE _sql;

-- get all scheduled policies
    SELECT jsonb_build_object('policies', array_agg(id))
    INTO _policies
    FROM _timescaledb_config.bgw_job
    WHERE hypertable_id = _cagg_data.mat_hypertable_id
      AND scheduled IS TRUE
      AND id >= 1000;

    INSERT INTO _timescaledb_catalog.continuous_agg_migrate_plan_step (mat_hypertable_id, type, config)
    VALUES (_cagg_data.mat_hypertable_id, 'COPY POLICIES', _policies || jsonb_build_object('cagg_name_new', _cagg_name_new)),
           (_cagg_data.mat_hypertable_id, 'OVERRIDE CAGG',
            jsonb_build_object('cagg_name_new', _cagg_name_new, 'override', _override, 'drop_old', _drop_old)),
           (_cagg_data.mat_hypertable_id, 'DROP OLD CAGG',
            jsonb_build_object('cagg_name_new', _cagg_name_new, 'override', _override, 'drop_old', _drop_old)),
           (_cagg_data.mat_hypertable_id, 'ENABLE POLICIES', NULL);
END;
$BODY$ SET search_path TO pg_catalog, pg_temp;
CREATE OR REPLACE PROCEDURE _timescaledb_internal.cagg_migrate_execute_copy_data(
    _cagg_data _timescaledb_catalog.continuous_agg,
    _plan_step _timescaledb_catalog.continuous_agg_migrate_plan_step
)
    LANGUAGE plpgsql AS
$BODY$
DECLARE
    _column_list     TEXT;
    _stmt            TEXT;
    _mat_schema_name TEXT;
    _mat_table_name  TEXT;
BEGIN
    SELECT string_agg(format('%1$s.%2$s',_cagg_data.user_view_name, column_name), ', ')
    INTO _column_list
    FROM information_schema.columns
    WHERE table_name = _cagg_data.user_view_name;

    SELECT h.schema_name, h.table_name
    INTO _mat_schema_name, _mat_table_name
    FROM _timescaledb_catalog.continuous_agg ca
             JOIN _timescaledb_catalog.hypertable h ON (h.id = ca.mat_hypertable_id)
    WHERE user_view_schema = _cagg_data.user_view_schema
      AND user_view_name = _plan_step.config ->> 'cagg_name_new';

    _stmt := format(
            'INSERT INTO %1$I.%2$I SELECT %3$s, _timescaledb_internal._materialized_hypertable_%6$s.time_partition_col FROM %4$I.%5$I LEFT JOIN _timescaledb_internal._materialized_hypertable_%6$s ON %4$I.%5$I.dongle_id = _timescaledb_internal._materialized_hypertable_%6$s.dongle_id AND %4$I.%5$I.timestamp = _timescaledb_internal._materialized_hypertable_%6$s.timestamp WHERE %7$I >= %8$L AND %7$I < %9$L',
            _mat_schema_name,
            _mat_table_name,
            _column_list,
            _cagg_data.user_view_schema,
            _cagg_data.user_view_name,
            _cagg_data.mat_hypertable_id,
            _plan_step.config->>'bucket_column_name',
            _plan_step.config->>'start_ts',
            _plan_step.config->>'end_ts'
             );
    EXECUTE _stmt;
END;
$BODY$ SET search_path TO pg_catalog, pg_temp;

mkindahl commented

@Streamlinesx Looks like you're on the way to fix it. :) If you have a proposal for a fix, do not hesitate to submit a pull request and we can do a proper review of it and suggest improvements and find problems.


Streamlinesx commented Sep 11, 2024

@mkindahl For our specific database I have fixed it, and with the above modifications I was able to migrate the affected caggs to the new format without issue. However, with these modifications in place, migration no longer works for caggs that do not have an offset, so I had to migrate the non-offset caggs first, then modify the procedures, and then migrate the offset caggs. I have an idea for how to make the fix more generic, but I don't think I will have time to spend on it. Maybe I'll get to it in my private time, since it does interest me and I feel I have a good understanding of the situation.
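For reference, a rough way to split the caggs into the two groups (a sketch; it relies on the observation above that only offset caggs get a time_partition_col time dimension) would be something like:

SELECT ca.user_view_schema, ca.user_view_name, d.column_name
FROM _timescaledb_catalog.continuous_agg ca
JOIN _timescaledb_catalog.hypertable h ON h.id = ca.mat_hypertable_id
JOIN timescaledb_information.dimensions d
  ON d.hypertable_schema = h.schema_name AND d.hypertable_name = h.table_name
WHERE d.dimension_type = 'Time';

Caggs whose time dimension is named time_partition_col would need the modified procedures; the rest can be migrated with the stock ones.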
