From 62de00453359aed1813ad327281cd1b2a9e1d873 Mon Sep 17 00:00:00 2001 From: Nikolai Kondrashov Date: Tue, 5 Nov 2024 20:46:58 +0200 Subject: [PATCH 1/4] db: Return None from get_last_modified() on no-data --- kcidb/db/__init__.py | 10 +++++----- kcidb/db/abstract.py | 5 ++--- kcidb/db/bigquery/v04_00.py | 8 +++----- kcidb/db/mux.py | 7 ++++++- kcidb/db/null.py | 7 +++---- kcidb/db/postgresql/v04_00.py | 9 +++------ kcidb/db/schematic.py | 10 ++++------ kcidb/db/sqlite/v04_00.py | 12 ++++-------- kcidb/test_db.py | 3 +-- 9 files changed, 31 insertions(+), 40 deletions(-) diff --git a/kcidb/db/__init__.py b/kcidb/db/__init__.py index 590cb52b..e5dc7bd6 100644 --- a/kcidb/db/__init__.py +++ b/kcidb/db/__init__.py @@ -182,13 +182,12 @@ def get_current_time(self): def get_last_modified(self): """ - Get the time data has arrived last into the driven database. Can - return the minimum timestamp constant, if the database is empty. + Get the time data has arrived last into the driven database. The database must be initialized. Returns: A timezone-aware datetime object representing the last - data arrival time. + data arrival time, or None if the database is empty. Raises: NoTimestamps - The database doesn't have row timestamps, and @@ -196,8 +195,9 @@ def get_last_modified(self): """ assert self.is_initialized() last_modified = self.driver.get_last_modified() - assert isinstance(last_modified, datetime.datetime) - assert last_modified.tzinfo + assert last_modified is None or \ + isinstance(last_modified, datetime.datetime) and \ + last_modified.tzinfo return last_modified def get_schemas(self): diff --git a/kcidb/db/abstract.py b/kcidb/db/abstract.py index 2f04420c..b3368584 100644 --- a/kcidb/db/abstract.py +++ b/kcidb/db/abstract.py @@ -113,13 +113,12 @@ def get_current_time(self): @abstractmethod def get_last_modified(self): """ - Get the time data has arrived last into the driven database. Can - return the minimum timestamp constant, if the database is empty. + Get the time data has arrived last into the driven database. The database must be initialized. Returns: A timezone-aware datetime object representing the last - data arrival time. + data arrival time, or None if the database is empty. Raises: NoTimestamps - The database doesn't have row timestamps, and diff --git a/kcidb/db/bigquery/v04_00.py b/kcidb/db/bigquery/v04_00.py index f13689bb..6d1683d9 100644 --- a/kcidb/db/bigquery/v04_00.py +++ b/kcidb/db/bigquery/v04_00.py @@ -1201,13 +1201,12 @@ def load(self, data, with_metadata): def get_last_modified(self): """ - Get the time data has arrived last into the driven database. Can - return the minimum timestamp constant, if the database is empty. + Get the time data has arrived last into the driven database. The database must be initialized. Returns: A timezone-aware datetime object representing the last - data arrival time. + data arrival time, or None if the database is empty. Raises: NoTimestamps - The database doesn't have row timestamps, and @@ -1226,5 +1225,4 @@ def get_last_modified(self): for table_name in self.TABLE_MAP ) + ")\n" - ).result()))[0] or \ - datetime.datetime.min.replace(tzinfo=datetime.timezone.utc) + ).result()))[0] diff --git a/kcidb/db/mux.py b/kcidb/db/mux.py index d1ca6e68..5d71d509 100644 --- a/kcidb/db/mux.py +++ b/kcidb/db/mux.py @@ -303,7 +303,12 @@ def get_last_modified(self): NoTimestamps - The database doesn't have row timestamps, and cannot determine the last data arrival time. 
""" - return max(driver.get_last_modified() for driver in self.drivers) + assert self.is_initialized() + min_ts = datetime.datetime.min.replace(tzinfo=datetime.timezone.utc) + return max( + (driver.get_last_modified() for driver in self.drivers), + key=lambda ts: ts or min_ts + ) def get_schemas(self): """ diff --git a/kcidb/db/null.py b/kcidb/db/null.py index ad82b4fa..1869dbbc 100644 --- a/kcidb/db/null.py +++ b/kcidb/db/null.py @@ -118,19 +118,18 @@ def get_current_time(self): def get_last_modified(self): """ - Get the time data has arrived last into the driven database. Can - return the minimum timestamp constant, if the database is empty. + Get the time data has arrived last into the driven database. The database must be initialized. Returns: A timezone-aware datetime object representing the last - data arrival time. + data arrival time, or None if the database is empty. Raises: NoTimestamps - The database doesn't have row timestamps, and cannot determine the last data arrival time. """ - return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc) + return None def dump_iter(self, objects_per_report, with_metadata, after, until): """ diff --git a/kcidb/db/postgresql/v04_00.py b/kcidb/db/postgresql/v04_00.py index 8e37ea4b..4ebf2b9c 100644 --- a/kcidb/db/postgresql/v04_00.py +++ b/kcidb/db/postgresql/v04_00.py @@ -3,7 +3,6 @@ import random import logging import textwrap -import datetime from collections import namedtuple from itertools import chain import psycopg2 @@ -940,13 +939,12 @@ def load(self, data, with_metadata): def get_last_modified(self): """ - Get the time data has arrived last into the driven database. Can - return the minimum timestamp constant, if the database is empty. + Get the time data has arrived last into the driven database. The database must be initialized. Returns: A timezone-aware datetime object representing the last - data arrival time. + data arrival time, or None if the database is empty. Raises: NoTimestamps - The database doesn't have row timestamps, and @@ -965,5 +963,4 @@ def get_last_modified(self): ) with self.conn, self.conn.cursor() as cursor: cursor.execute(statement) - return cursor.fetchone()[0] or \ - datetime.datetime.min.replace(tzinfo=datetime.timezone.utc) + return cursor.fetchone()[0] diff --git a/kcidb/db/schematic.py b/kcidb/db/schematic.py index d734d76c..57033f83 100644 --- a/kcidb/db/schematic.py +++ b/kcidb/db/schematic.py @@ -371,13 +371,12 @@ def load(self, data, with_metadata): @abstractmethod def get_last_modified(self): """ - Get the time data has arrived last into the database. Can return the - minimum timestamp constant, if the database is empty. + Get the time data has arrived last into the driven database. The database must be initialized. Returns: A timezone-aware datetime object representing the last - data arrival time. + data arrival time, or None if the database is empty. Raises: NoTimestamps - The database doesn't have row timestamps, and @@ -549,13 +548,12 @@ def get_current_time(self): def get_last_modified(self): """ - Get the time data has arrived last into the driven database. Can - return the minimum timestamp constant, if the database is empty. + Get the time data has arrived last into the driven database. The database must be initialized. Returns: A timezone-aware datetime object representing the last - data arrival time. + data arrival time, or None if the database is empty. 
Raises: NoTimestamps - The database doesn't have row timestamps, and diff --git a/kcidb/db/sqlite/v04_00.py b/kcidb/db/sqlite/v04_00.py index 202fd50d..dfd8695a 100644 --- a/kcidb/db/sqlite/v04_00.py +++ b/kcidb/db/sqlite/v04_00.py @@ -3,7 +3,6 @@ import random import textwrap from functools import reduce -import datetime from collections import namedtuple from itertools import chain import logging @@ -895,13 +894,12 @@ def load(self, data, with_metadata): def get_last_modified(self): """ - Get the time data has arrived last into the driven database. Can - return the minimum timestamp constant, if the database is empty. + Get the time data has arrived last into the driven database. The database must be initialized. Returns: A timezone-aware datetime object representing the last - data arrival time. + data arrival time, or None if the database is empty. Raises: NoTimestamps - The database doesn't have row timestamps, and @@ -922,9 +920,7 @@ def get_last_modified(self): cursor = self.conn.cursor() try: cursor.execute(statement) - timestamp = cursor.fetchone()[0] - if timestamp: - return dateutil.parser.isoparse(timestamp) + ts_str = cursor.fetchone()[0] finally: cursor.close() - return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc) + return ts_str and dateutil.parser.isoparse(ts_str) diff --git a/kcidb/test_db.py b/kcidb/test_db.py index eece79e4..362cf017 100644 --- a/kcidb/test_db.py +++ b/kcidb/test_db.py @@ -448,8 +448,7 @@ def test_get_last_modified(clean_database): time.sleep(1) client.init() timestamp = client.get_last_modified() - assert timestamp == \ - datetime.datetime.min.replace(tzinfo=datetime.timezone.utc) + assert timestamp is None before_load = client.get_current_time() client.load(COMPREHENSIVE_IO_DATA) timestamp = client.get_last_modified() From 422cf831d498df242c05a873c414924468095855 Mon Sep 17 00:00:00 2001 From: Nikolai Kondrashov Date: Wed, 6 Nov 2024 11:32:18 +0200 Subject: [PATCH 2/4] db: Add get_first_modified() --- kcidb/db/__init__.py | 22 +++++++++++++++++++++- kcidb/db/abstract.py | 17 ++++++++++++++++- kcidb/db/bigquery/v04_00.py | 30 +++++++++++++++++++++++++++++- kcidb/db/mux.py | 23 ++++++++++++++++++++++- kcidb/db/null.py | 17 ++++++++++++++++- kcidb/db/postgresql/v04_00.py | 30 +++++++++++++++++++++++++++++- kcidb/db/schematic.py | 35 +++++++++++++++++++++++++++++++++-- kcidb/db/sql/schema.py | 22 ++++++++++++++++++++++ kcidb/db/sqlite/v04_00.py | 35 ++++++++++++++++++++++++++++++++++- kcidb/test_db.py | 30 +++++++++++++++++++++++------- 10 files changed, 245 insertions(+), 16 deletions(-) diff --git a/kcidb/db/__init__.py b/kcidb/db/__init__.py index e5dc7bd6..67ff2a3c 100644 --- a/kcidb/db/__init__.py +++ b/kcidb/db/__init__.py @@ -180,6 +180,26 @@ def get_current_time(self): assert current_time.tzinfo return current_time + def get_first_modified(self): + """ + Get the time data has arrived first into the driven database. + The database must be initialized. + + Returns: + A timezone-aware datetime object representing the first + data arrival time, or None if the database is empty. + + Raises: + NoTimestamps - The database doesn't have row timestamps, and + cannot determine data arrival time. + """ + assert self.is_initialized() + first_modified = self.driver.get_first_modified() + assert first_modified is None or \ + isinstance(first_modified, datetime.datetime) and \ + first_modified.tzinfo + return first_modified + def get_last_modified(self): """ Get the time data has arrived last into the driven database. 
@@ -191,7 +211,7 @@ def get_last_modified(self):
 
         Raises:
             NoTimestamps - The database doesn't have row timestamps, and
-                           cannot determine the last data arrival time.
+                           cannot determine data arrival time.
         """
         assert self.is_initialized()
         last_modified = self.driver.get_last_modified()
diff --git a/kcidb/db/abstract.py b/kcidb/db/abstract.py
index b3368584..31ecfbac 100644
--- a/kcidb/db/abstract.py
+++ b/kcidb/db/abstract.py
@@ -110,6 +110,21 @@ def get_current_time(self):
             time on the database server.
         """
 
+    @abstractmethod
+    def get_first_modified(self):
+        """
+        Get the time data has arrived first into the driven database.
+        The database must be initialized.
+
+        Returns:
+            A timezone-aware datetime object representing the first
+            data arrival time, or None if the database is empty.
+
+        Raises:
+            NoTimestamps - The database doesn't have row timestamps, and
+                           cannot determine data arrival time.
+        """
+
     @abstractmethod
     def get_last_modified(self):
         """
@@ -122,7 +137,7 @@ def get_last_modified(self):
 
         Raises:
             NoTimestamps - The database doesn't have row timestamps, and
-                           cannot determine the last data arrival time.
+                           cannot determine data arrival time.
         """
 
     @abstractmethod
diff --git a/kcidb/db/bigquery/v04_00.py b/kcidb/db/bigquery/v04_00.py
index 6d1683d9..72a4c4b7 100644
--- a/kcidb/db/bigquery/v04_00.py
+++ b/kcidb/db/bigquery/v04_00.py
@@ -1199,6 +1199,34 @@ def load(self, data, with_metadata):
                 f"ERROR: {error['message']}\n"
                 for error in job.errors
             ])) from exc
 
+    def get_first_modified(self):
+        """
+        Get the time data has arrived first into the driven database.
+        The database must be initialized.
+
+        Returns:
+            A timezone-aware datetime object representing the first
+            data arrival time, or None if the database is empty.
+
+        Raises:
+            NoTimestamps - The database doesn't have row timestamps, and
+                           cannot determine data arrival time.
+        """
+        if not all(
+            next((f for f in table_schema if f.name == "_timestamp"), None)
+            for table_schema in self.TABLE_MAP.values()
+        ):
+            raise NoTimestamps("Database is missing timestamps in its schema")
+
+        return next(iter(self.conn.query_create(
+            "SELECT MIN(first_modified) AS first_modified FROM(\n" +
+            "UNION ALL\n".join(
+                f"SELECT MIN(_timestamp) AS first_modified FROM {table_name}\n"
+                for table_name in self.TABLE_MAP
+            ) +
+            ")\n"
+        ).result()))[0]
+
     def get_last_modified(self):
         """
         Get the time data has arrived last into the driven database.
@@ -1210,7 +1238,7 @@ def get_last_modified(self):
 
         Raises:
             NoTimestamps - The database doesn't have row timestamps, and
-                           cannot determine the last data arrival time.
+                           cannot determine data arrival time.
         """
         if not all(
             next((f for f in table_schema if f.name == "_timestamp"), None)
diff --git a/kcidb/db/mux.py b/kcidb/db/mux.py
index 5d71d509..b6bc8789 100644
--- a/kcidb/db/mux.py
+++ b/kcidb/db/mux.py
@@ -289,6 +289,27 @@ def get_current_time(self):
         """
         return max(driver.get_current_time() for driver in self.drivers)
 
+    def get_first_modified(self):
+        """
+        Get the time data has arrived first into the driven database,
+        i.e. the earliest arrival time across the member databases.
+        The database must be initialized.
+
+        Returns:
+            A timezone-aware datetime object representing the first
+            data arrival time, or None if the database is empty.
+
+        Raises:
+            NoTimestamps - The database doesn't have row timestamps, and
+                           cannot determine data arrival time.
+ """ + assert self.is_initialized() + max_ts = datetime.datetime.max.replace(tzinfo=datetime.timezone.utc) + return min( + (driver.get_first_modified() for driver in self.drivers), + key=lambda ts: ts or max_ts + ) + def get_last_modified(self): """ Get the time data has arrived last into the driven database. Can @@ -301,7 +322,7 @@ def get_last_modified(self): Raises: NoTimestamps - The database doesn't have row timestamps, and - cannot determine the last data arrival time. + cannot determine data arrival time. """ assert self.is_initialized() min_ts = datetime.datetime.min.replace(tzinfo=datetime.timezone.utc) diff --git a/kcidb/db/null.py b/kcidb/db/null.py index 1869dbbc..bcfc0dff 100644 --- a/kcidb/db/null.py +++ b/kcidb/db/null.py @@ -116,6 +116,21 @@ def get_current_time(self): """ return datetime.datetime.now(datetime.timezone.utc) + def get_first_modified(self): + """ + Get the time data has arrived first into the driven database. + The database must be initialized. + + Returns: + A timezone-aware datetime object representing the first + data arrival time, or None if the database is empty. + + Raises: + NoTimestamps - The database doesn't have row timestamps, and + cannot determine data arrival time. + """ + return None + def get_last_modified(self): """ Get the time data has arrived last into the driven database. @@ -127,7 +142,7 @@ def get_last_modified(self): Raises: NoTimestamps - The database doesn't have row timestamps, and - cannot determine the last data arrival time. + cannot determine data arrival time. """ return None diff --git a/kcidb/db/postgresql/v04_00.py b/kcidb/db/postgresql/v04_00.py index 4ebf2b9c..981f07aa 100644 --- a/kcidb/db/postgresql/v04_00.py +++ b/kcidb/db/postgresql/v04_00.py @@ -937,6 +937,34 @@ def load(self, data, with_metadata): # parity with non-determinism of BigQuery's ANY_VALUE() self.conn.load_prio_db = not self.conn.load_prio_db + def get_first_modified(self): + """ + Get the time data has arrived first into the driven database. + The database must be initialized. + + Returns: + A timezone-aware datetime object representing the first + data arrival time, or None if the database is empty. + + Raises: + NoTimestamps - The database doesn't have row timestamps, and + cannot determine data arrival time. + """ + statement = ( + "SELECT MIN(first_modified) AS first_modified\n" + + "FROM (\n" + + textwrap.indent( + "\nUNION ALL\n".join( + table_schema.format_get_first_modified(table_name) + for table_name, table_schema in self.TABLES.items() + ), + " " * 4 + ) + "\n) AS tables\n" + ) + with self.conn, self.conn.cursor() as cursor: + cursor.execute(statement) + return cursor.fetchone()[0] + def get_last_modified(self): """ Get the time data has arrived last into the driven database. @@ -948,7 +976,7 @@ def get_last_modified(self): Raises: NoTimestamps - The database doesn't have row timestamps, and - cannot determine the last data arrival time. + cannot determine data arrival time. """ statement = ( "SELECT MAX(last_modified) AS last_modified\n" + diff --git a/kcidb/db/schematic.py b/kcidb/db/schematic.py index 57033f83..8ac3daaf 100644 --- a/kcidb/db/schematic.py +++ b/kcidb/db/schematic.py @@ -368,6 +368,21 @@ def load(self, data, with_metadata): """ # Relying on the driver to check compatibility/validity + @abstractmethod + def get_first_modified(self): + """ + Get the time data has arrived first into the driven database. + The database must be initialized. 
+ + Returns: + A timezone-aware datetime object representing the first + data arrival time, or None if the database is empty. + + Raises: + NoTimestamps - The database doesn't have row timestamps, and + cannot determine data arrival time. + """ + @abstractmethod def get_last_modified(self): """ @@ -380,7 +395,7 @@ def get_last_modified(self): Raises: NoTimestamps - The database doesn't have row timestamps, and - cannot determine the last data arrival time. + cannot determine data arrival time. """ @@ -546,6 +561,22 @@ def get_current_time(self): """ return self.conn.get_current_time() + def get_first_modified(self): + """ + Get the time data has arrived first into the driven database. + The database must be initialized. + + Returns: + A timezone-aware datetime object representing the first + data arrival time, or None if the database is empty. + + Raises: + NoTimestamps - The database doesn't have row timestamps, and + cannot determine data arrival time. + """ + assert self.is_initialized() + return self.schema.get_first_modified() + def get_last_modified(self): """ Get the time data has arrived last into the driven database. @@ -557,7 +588,7 @@ def get_last_modified(self): Raises: NoTimestamps - The database doesn't have row timestamps, and - cannot determine the last data arrival time. + cannot determine data arrival time. """ assert self.is_initialized() return self.schema.get_last_modified() diff --git a/kcidb/db/sql/schema.py b/kcidb/db/sql/schema.py index 803aca19..1a96c4e2 100644 --- a/kcidb/db/sql/schema.py +++ b/kcidb/db/sql/schema.py @@ -343,6 +343,28 @@ def format_dump(self, name, with_metadata, after, until): ] ) + def format_get_first_modified(self, name): + """ + Format the "SELECT" command returning the timestamp of first data + written to the table, or NULL, if the table is empty. + + Args: + name: The name of the target table of the command. + + Returns: + The formatted "SELECT" command, returning the timestamp in + "first_modified" column. + + Raises: + NoTimestamps - The table doesn't have row timestamps. + """ + assert isinstance(name, str) + if not self.timestamp: + raise NoTimestamps("Table has no timestamp column") + return ( + f"SELECT MIN({self.timestamp.name}) AS first_modified FROM {name}" + ) + def format_get_last_modified(self, name): """ Format the "SELECT" command returning the timestamp of last data diff --git a/kcidb/db/sqlite/v04_00.py b/kcidb/db/sqlite/v04_00.py index dfd8695a..f195358f 100644 --- a/kcidb/db/sqlite/v04_00.py +++ b/kcidb/db/sqlite/v04_00.py @@ -892,6 +892,39 @@ def load(self, data, with_metadata): # parity with non-determinism of BigQuery's ANY_VALUE() self.conn.load_prio_db = not self.conn.load_prio_db + def get_first_modified(self): + """ + Get the time data has arrived first into the driven database. + The database must be initialized. + + Returns: + A timezone-aware datetime object representing the first + data arrival time, or None if the database is empty. + + Raises: + NoTimestamps - The database doesn't have row timestamps, and + cannot determine data arrival time. 
+ """ + statement = ( + "SELECT MIN(first_modified) AS first_modified\n" + + "FROM (\n" + + textwrap.indent( + "\nUNION ALL\n".join( + table_schema.format_get_first_modified(table_name) + for table_name, table_schema in self.TABLES.items() + ), + " " * 4 + ) + "\n) AS tables\n" + ) + with self.conn: + cursor = self.conn.cursor() + try: + cursor.execute(statement) + ts_str = cursor.fetchone()[0] + finally: + cursor.close() + return ts_str and dateutil.parser.isoparse(ts_str) + def get_last_modified(self): """ Get the time data has arrived last into the driven database. @@ -903,7 +936,7 @@ def get_last_modified(self): Raises: NoTimestamps - The database doesn't have row timestamps, and - cannot determine the last data arrival time. + cannot determine data arrival time. """ statement = ( "SELECT MAX(last_modified) AS last_modified\n" + diff --git a/kcidb/test_db.py b/kcidb/test_db.py index 362cf017..15029f6b 100644 --- a/kcidb/test_db.py +++ b/kcidb/test_db.py @@ -408,13 +408,15 @@ def test_get_current_time(clean_database): assert client.get_current_time() > timestamp -def test_get_last_modified(clean_database): +def test_get_modified(clean_database): """ - Check get_last_modified() works correctly + Check get_first_modified() and get_last_modified() work correctly """ client = clean_database # Check a pre-timestamp schema version client.init(kcidb.io.schema.V4_2) + with pytest.raises(kcidb.db.misc.NoTimestamps): + client.get_first_modified() with pytest.raises(kcidb.db.misc.NoTimestamps): client.get_last_modified() client.load({ @@ -440,6 +442,8 @@ def test_get_last_modified(clean_database): ) ] }) + with pytest.raises(kcidb.db.misc.NoTimestamps): + client.get_first_modified() with pytest.raises(kcidb.db.misc.NoTimestamps): client.get_last_modified() client.cleanup() @@ -447,15 +451,27 @@ def test_get_last_modified(clean_database): # Check a post-timestamp schema version time.sleep(1) client.init() + timestamp = client.get_first_modified() + assert timestamp is None timestamp = client.get_last_modified() assert timestamp is None before_load = client.get_current_time() client.load(COMPREHENSIVE_IO_DATA) - timestamp = client.get_last_modified() - assert timestamp is not None - assert isinstance(timestamp, datetime.datetime) - assert timestamp.tzinfo is not None - assert timestamp >= before_load + + first_modified = client.get_first_modified() + last_modified = client.get_last_modified() + + assert first_modified is not None + assert isinstance(first_modified, datetime.datetime) + assert first_modified.tzinfo is not None + assert first_modified >= before_load + + assert last_modified is not None + assert isinstance(last_modified, datetime.datetime) + assert last_modified.tzinfo is not None + assert last_modified >= before_load + + assert last_modified >= first_modified client.cleanup() From 2628fb31fd381ef4a47ab0c37459c4be6628e9bd Mon Sep 17 00:00:00 2001 From: Nikolai Kondrashov Date: Wed, 6 Nov 2024 16:12:55 +0200 Subject: [PATCH 3/4] tests: Make sure to empty all deployed databases --- conftest.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/conftest.py b/conftest.py index cdf58c34..c87b80fb 100644 --- a/conftest.py +++ b/conftest.py @@ -64,8 +64,10 @@ def empty_deployment(): for _ in kcidb.mq.IOSubscriber(project, topic, subscription). 
\ pull_iter(timeout=30): pass - # Empty the database - kcidb.db.Client(os.environ["KCIDB_DATABASE"]).empty() + # Empty all the databases + kcidb.db.Client(os.environ["KCIDB_OPERATIONAL_DATABASE"]).empty() + kcidb.db.Client(os.environ["KCIDB_SAMPLE_DATABASE"]).empty() + kcidb.db.Client(os.environ["KCIDB_ARCHIVE_DATABASE"]).empty() # Wipe the spool kcidb.monitor.spool.Client( os.environ["KCIDB_SPOOL_COLLECTION_PATH"] From a756f4662b326fb55da4d37095e6ba07f79293c4 Mon Sep 17 00:00:00 2001 From: Nikolai Kondrashov Date: Fri, 1 Nov 2024 20:47:05 +0200 Subject: [PATCH 4/4] Copy pieces of operational DB to archive DB daily Add a Cloud Scheduler job posting messages to a Pub/Sub topic every day at 12:00, which triggers a Cloud Function, which copies one week of (missing) data from the operational DB (PostgreSQL) to archive DB (BigQuery). This replaces the previously-used writing through a mux with rate limiting. --- cloud | 8 +++- kcidb/cloud/functions.sh | 16 ++++++++ kcidb/cloud/pubsub.sh | 6 +++ kcidb/cloud/scheduler.sh | 7 ++++ kcidb/cloud/sections.sh | 1 + main.py | 49 +++++++++++++++++++++++ test_main.py | 86 ++++++++++++++++++++++++++++++++++++++++ 7 files changed, 172 insertions(+), 1 deletion(-) diff --git a/cloud b/cloud index 000772aa..fa49c71d 100755 --- a/cloud +++ b/cloud @@ -126,6 +126,7 @@ function execute_command() { declare -r cache_bucket_name="${project}_${prefix}cache" declare -r pick_notifications_trigger_topic="${prefix}pick_notifications_trigger" declare -r purge_db_trigger_topic="${prefix}purge_db_trigger" + declare -r archive_trigger_topic="${prefix}archive_trigger" declare -r cache_redirect_function_name="cache_redirect" declare cache_redirector_url="https://${FUNCTION_REGION}" declare cache_redirector_url+="-${project}.cloudfunctions.net/" @@ -301,6 +302,7 @@ function execute_command() { --updated-topic="$updated_topic" --load-queue-trigger-topic="$load_queue_trigger_topic" --purge-db-trigger-topic="$purge_db_trigger_topic" + --archive-trigger-topic="$archive_trigger_topic" --updated-urls-topic="$updated_urls_topic" --spool-collection-path="$spool_collection_path" --extra-cc="$extra_cc" @@ -394,6 +396,7 @@ function execute_command() { --pick-notifications-trigger-topic \ "$pick_notifications_trigger_topic" \ --purge-db-trigger-topic "$purge_db_trigger_topic" \ + --archive-trigger-topic "$archive_trigger_topic" \ --smtp-topic="$smtp_topic" \ --smtp-subscription="$smtp_subscription" \ --cost-topic="$cost_topic" \ @@ -411,6 +414,7 @@ function execute_command() { --pick-notifications-trigger-topic \ "$pick_notifications_trigger_topic" \ --purge-db-trigger-topic "$purge_db_trigger_topic" \ + --archive-trigger-topic "$archive_trigger_topic" \ --updated-urls-topic="$updated_urls_topic" \ --updated-topic="$updated_topic" \ --cache-redirect-function-name="$cache_redirect_function_name" \ @@ -422,7 +426,8 @@ function execute_command() { --load-queue-trigger-topic="$load_queue_trigger_topic" \ --pick-notifications-trigger-topic \ "$pick_notifications_trigger_topic" \ - --purge-db-trigger-topic="$purge_db_trigger_topic" + --purge-db-trigger-topic="$purge_db_trigger_topic" \ + --archive-trigger-topic="$archive_trigger_topic" sections_run "$sections" submitters_deploy \ "$project" "$new_topic" "${submitters[@]}" # Handle "shutdown" command @@ -456,6 +461,7 @@ function execute_command() { --pick-notifications-trigger-topic \ "$pick_notifications_trigger_topic" \ --purge-db-trigger-topic "$purge_db_trigger_topic" \ + --archive-trigger-topic "$archive_trigger_topic" \ 
--new-topic="$new_topic" \ --new-load-subscription="$new_load_subscription" \ --new-debug-subscription="$new_debug_subscription" \ diff --git a/kcidb/cloud/functions.sh b/kcidb/cloud/functions.sh index d9eff196..6a35c8a4 100644 --- a/kcidb/cloud/functions.sh +++ b/kcidb/cloud/functions.sh @@ -18,6 +18,7 @@ declare _FUNCTIONS_SH= # --updated-topic=NAME # --load-queue-trigger-topic=NAME # --purge-db-trigger-topic=NAME +# --archive-trigger-topic=NAME # --updated-urls-topic=NAME # --cache-bucket-name=NAME # --cache-redirector-url=URL @@ -43,6 +44,7 @@ function functions_env() { updated_publish updated_topic \ load_queue_trigger_topic \ purge_db_trigger_topic \ + archive_trigger_topic \ updated_urls_topic \ spool_collection_path \ extra_cc \ @@ -78,6 +80,7 @@ function functions_env() { [KCIDB_UPDATED_QUEUE_TOPIC]="$updated_topic" [KCIDB_LOAD_QUEUE_TRIGGER_TOPIC]="$load_queue_trigger_topic" [KCIDB_PURGE_DB_TRIGGER_TOPIC]="$purge_db_trigger_topic" + [KCIDB_ARCHIVE_TRIGGER_TOPIC]="$archive_trigger_topic" [KCIDB_UPDATED_URLS_TOPIC]="$updated_urls_topic" [KCIDB_SELECTED_SUBSCRIPTIONS]="" [KCIDB_SPOOL_COLLECTION_PATH]="$spool_collection_path" @@ -137,6 +140,7 @@ function functions_env() { # --spool-collection-path=PATH # --cache-redirect-function-name=NAME # --env-yaml=YAML +# --archive-trigger-topic=NAME function functions_deploy() { declare params params="$(getopt_vars sections project prefix source \ @@ -148,6 +152,7 @@ function functions_deploy() { spool_collection_path \ cache_redirect_function_name \ env_yaml \ + archive_trigger_topic \ -- "$@")" eval "$params" @@ -171,6 +176,15 @@ function functions_deploy() { trigger_event+="document.create" declare trigger_resource="projects/$project/databases/(default)/documents/" trigger_resource+="${spool_collection_path}/{notification_id}" + + function_deploy "$sections" "$source" "$project" "$prefix" \ + archive true \ + --env-vars-file "$env_yaml_file" \ + --trigger-topic "${archive_trigger_topic}" \ + --memory 2048MB \ + --max-instances=1 \ + --timeout 540 + function_deploy "$sections" "$source" "$project" "$prefix" \ purge_db true \ --env-vars-file "$env_yaml_file" \ @@ -243,6 +257,8 @@ function _functions_withdraw_or_shutdown() { cache_redirect_function_name \ -- "$@")" eval "$params" + "function_$action" "$sections" "$project" "$prefix" \ + archive "function_$action" "$sections" "$project" "$prefix" \ purge_db "function_$action" "$sections" "$project" "$prefix" \ diff --git a/kcidb/cloud/pubsub.sh b/kcidb/cloud/pubsub.sh index 3d26c772..791b7070 100644 --- a/kcidb/cloud/pubsub.sh +++ b/kcidb/cloud/pubsub.sh @@ -143,6 +143,7 @@ function pubsub_subscription_withdraw() { # --cost-upd-service-account=NAME # --cost-mon-service=NAME # --iss-ed-service=NAME +# --archive-trigger-topic=NAME function pubsub_deploy() { declare params params="$(getopt_vars project \ @@ -160,6 +161,7 @@ function pubsub_deploy() { cost_upd_service_account \ cost_mon_service \ iss_ed_service \ + archive_trigger_topic \ -- "$@")" eval "$params" declare project_number @@ -198,6 +200,7 @@ function pubsub_deploy() { --message-retention-duration=12h pubsub_topic_deploy "$project" "${pick_notifications_trigger_topic}" pubsub_topic_deploy "$project" "${purge_db_trigger_topic}" + pubsub_topic_deploy "$project" "${archive_trigger_topic}" pubsub_topic_deploy "$project" "${updated_urls_topic}" if [ -n "$smtp_topic" ]; then pubsub_topic_deploy "$project" "$smtp_topic" @@ -242,6 +245,7 @@ function pubsub_deploy() { # --smtp-subscription=NAME # --cost-topic=NAME # 
--cost-upd-service-account=NAME +# --archive-trigger-topic=NAME function pubsub_withdraw() { declare params params="$(getopt_vars project \ @@ -257,6 +261,7 @@ function pubsub_withdraw() { smtp_topic smtp_subscription \ cost_topic \ cost_upd_service_account \ + archive_trigger_topic \ -- "$@")" eval "$params" declare project_number @@ -275,6 +280,7 @@ function pubsub_withdraw() { pubsub_subscription_withdraw "$project" "$new_debug_subscription" pubsub_subscription_withdraw "$project" "$new_load_subscription" pubsub_topic_withdraw "$project" "$new_topic" + pubsub_topic_withdraw "$project" "$archive_trigger_topic" pubsub_topic_withdraw "$project" "$load_queue_trigger_topic" pubsub_topic_withdraw "$project" "$pick_notifications_trigger_topic" pubsub_topic_withdraw "$project" "$updated_urls_topic" diff --git a/kcidb/cloud/scheduler.sh b/kcidb/cloud/scheduler.sh index d8373173..8bb135d0 100644 --- a/kcidb/cloud/scheduler.sh +++ b/kcidb/cloud/scheduler.sh @@ -67,6 +67,7 @@ function scheduler_job_withdraw() { # --load-queue-trigger-topic=NAME # --pick-notifications-trigger-topic=NAME # --purge-db-trigger-topic=NAME +# --archive-trigger-topic=NAME function scheduler_deploy() { declare params params="$(getopt_vars project \ @@ -74,6 +75,7 @@ function scheduler_deploy() { load_queue_trigger_topic \ pick_notifications_trigger_topic \ purge_db_trigger_topic \ + archive_trigger_topic \ -- "$@")" eval "$params" # Deploy the jobs @@ -90,6 +92,10 @@ function scheduler_deploy() { "$project" "${prefix}purge_sm_db_trigger" \ "$purge_db_trigger_topic" '0 7 * * *' \ '{"database": "sm", "timedelta": {"delta": {"days": 30}}}' + scheduler_job_pubsub_deploy \ + "$project" "${prefix}archive_trigger" \ + "$archive_trigger_topic" '0 12 * * *' \ + '{}' } # Withdraw from the scheduler @@ -101,6 +107,7 @@ function scheduler_withdraw() { scheduler_job_withdraw "$project" "${prefix}pick_notifications_trigger" scheduler_job_withdraw "$project" "${prefix}purge_op_db_trigger" scheduler_job_withdraw "$project" "${prefix}purge_sm_db_trigger" + scheduler_job_withdraw "$project" "${prefix}archive_trigger" } fi # _SCHEDULER_SH diff --git a/kcidb/cloud/sections.sh b/kcidb/cloud/sections.sh index f14e8fbb..fd2a72f4 100644 --- a/kcidb/cloud/sections.sh +++ b/kcidb/cloud/sections.sh @@ -16,6 +16,7 @@ declare -A -r SECTIONS=( ["firestore"]="Firestore database" ["storage"]="Google cloud storage" ["functions.purge_db"]="Cloud Functions: kcidb_purge_db()" + ["functions.archive"]="Cloud Functions: kcidb_archive()" ["functions.pick_notifications"]="Cloud Functions: kcidb_pick_notifications()" ["functions.send_notification"]="Cloud Functions: kcidb_send_notification()" ["functions.spool_notifications"]="Cloud Functions: kcidb_spool_notifications()" diff --git a/main.py b/main.py index 59dc4105..c1385207 100644 --- a/main.py +++ b/main.py @@ -417,6 +417,55 @@ def kcidb_pick_notifications(data, context): spool_client.ack(notification_id) +def kcidb_archive(event, context): + """ + Transfer data from the operational database into the archive database, + that is out of the editing window (to be enforced), and hasn't been + transferred yet. 
+ """ + op_client = get_db_client(OPERATIONAL_DATABASE) + op_now = op_client.get_current_time() + op_first_modified = op_client.get_first_modified() + if not op_first_modified: + LOGGER.info("Operational database is empty, nothing to archive") + return + + ar_client = get_db_client(ARCHIVE_DATABASE) + ar_last_modified = ar_client.get_last_modified() + + after = ar_last_modified or \ + (op_first_modified - datetime.timedelta(seconds=1)) + until = min( + # Add a timespan we can fit in memory and exec time limits + after + datetime.timedelta(days=7), + # Subtract editing window (to be enforced) + op_now - datetime.timedelta(days=14) + ) + if until <= after: + LOGGER.info("No data old enough to archive") + return + + after_str = after.isoformat(timespec='microseconds') + until_str = until.isoformat(timespec='microseconds') + + # TODO: Transfer data in multiple smaller pieces + + # Fetch the data from operational database + # Preserve timestamps! + LOGGER.info("FETCHING operational database dump for (%s, %s] range", + after_str, until_str) + dump = op_client.dump(with_metadata=True, after=after, until=until) + + # Load data into archive database + # Preserve timestamps! + LOGGER.info("LOADING a dump of %u objects into archive database", + kcidb.io.SCHEMA.count(dump)) + ar_client.load(dump, with_metadata=True) + + LOGGER.info("ARCHIVED %u objects in (%s, %s] range", + kcidb.io.SCHEMA.count(dump), after_str, until_str) + + def kcidb_purge_db(event, context): """ Purge data from the operational database, older than the optional delta diff --git a/test_main.py b/test_main.py index 59b3083f..8e6e98db 100644 --- a/test_main.py +++ b/test_main.py @@ -356,3 +356,89 @@ def filter_test_data(data): assert dump == client.get_schema()[1].upgrade( data_after if purging else data ) + + +def test_archive(empty_deployment): + """Check kcidb_archive() works correctly""" + # Make empty_deployment appear used to silence pylint warning + assert empty_deployment is None + + op_client = kcidb.db.Client(os.environ["KCIDB_OPERATIONAL_DATABASE"]) + op_schema = op_client.get_schema()[1] + ar_client = kcidb.db.Client(os.environ["KCIDB_ARCHIVE_DATABASE"]) + ar_schema = ar_client.get_schema()[1] + publisher = kcidb.mq.JSONPublisher( + os.environ["GCP_PROJECT"], + os.environ["KCIDB_ARCHIVE_TRIGGER_TOPIC"] + ) + + # Empty the archive + ar_client.empty() + + # Generate timestamps + ts_now = op_client.get_current_time() + ts_3w = ts_now - timedelta(days=7 * 3) + ts_4w = ts_now - timedelta(days=7 * 4) + + def gen_data(id, ts): + """ + Generate a dataset with one object per type, all using the specified + timestamp, ID, and origin extracted from the ID. 
+ """ + assert isinstance(id, str) + assert isinstance(ts, datetime) and ts.tzinfo + origin = id.split(":")[0] + assert origin + assert origin != id + base = dict(id=id, origin=origin, + _timestamp=ts.isoformat(timespec='microseconds')) + return dict( + checkouts=[base | dict()], + builds=[base | dict(checkout_id=id)], + tests=[base | dict(build_id=id)], + issues=[base | dict(version=1)], + incidents=[base | dict(issue_id=id, issue_version=1)], + **op_schema.new(), + ) + + # Generate datasets + data_now = gen_data("archive:now", ts_now) + data_3w = gen_data("archive:3w", ts_3w) + data_4w = gen_data("archive:4w", ts_4w) + + # Load data_now into the operational DB + op_client.load(data_now, with_metadata=True) + # Trigger and wait for archival (ignore possibility of actual trigger) + publisher.publish({}) + time.sleep(30) + # Check data_now doesn't end up in the archive DB + assert ar_schema.count(ar_client.dump()) == 0 + + # Load data_3w and data_4w + op_client.load(op_schema.merge(data_3w, [data_4w]), with_metadata=True) + # Trigger and wait for archival (ignore possibility of actual trigger) + publisher.publish({}) + time.sleep(30) + # Check data_4w is in the archive database + dump = ar_client.dump() + assert all( + any(obj["id"] == "archive:4w" + for obj in dump.get(obj_list_name, [])) + for obj_list_name in op_schema.id_fields + ), "No complete four-week old data in the archive" + # Check data_3w is not in the archive database + assert not any( + any(obj["id"] == "archive:3w" + for obj in dump.get(obj_list_name, [])) + for obj_list_name in op_schema.id_fields + ), "Some three-week old data in the archive" + # Trigger and wait for another archival (ignore chance of actual trigger) + publisher.publish({}) + time.sleep(30) + # Check data_3w is now in the archive database + dump = ar_client.dump() + assert all( + any(obj["id"] == "archive:3w" + for obj in dump.get(obj_list_name, [])) + for obj_list_name in op_schema.id_fields + ), "No complete three-week old data in the archive"
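Note: as a quick illustration of the archival window selection implemented by kcidb_archive() in PATCH 4/4 (copy at most one week of data per run, and never anything newer than the two-week editing window), here is a minimal standalone sketch. It is not part of the patches; the timestamps are hypothetical, and the snippet only mirrors the "after"/"until" arithmetic from main.py.

    import datetime

    # Hypothetical state: the archive is empty and the oldest operational
    # database row arrived 30 days ago.
    now = datetime.datetime.now(datetime.timezone.utc)
    op_first_modified = now - datetime.timedelta(days=30)
    ar_last_modified = None

    # Start just before the oldest unarchived row, or right after the
    # newest already-archived one.
    after = ar_last_modified or \
        (op_first_modified - datetime.timedelta(seconds=1))
    # Copy at most one week of data, and nothing newer than two weeks old.
    until = min(
        after + datetime.timedelta(days=7),
        now - datetime.timedelta(days=14)
    )

    if until <= after:
        print("No data old enough to archive")
    else:
        print(f"Would archive ({after.isoformat()}, {until.isoformat()}]")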