Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Archive DB daily #598

Merged
merged 4 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion cloud
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ function execute_command() {
declare -r cache_bucket_name="${project}_${prefix}cache"
declare -r pick_notifications_trigger_topic="${prefix}pick_notifications_trigger"
declare -r purge_db_trigger_topic="${prefix}purge_db_trigger"
declare -r archive_trigger_topic="${prefix}archive_trigger"
declare -r cache_redirect_function_name="cache_redirect"
declare cache_redirector_url="https://${FUNCTION_REGION}"
declare cache_redirector_url+="-${project}.cloudfunctions.net/"
Expand Down Expand Up @@ -301,6 +302,7 @@ function execute_command() {
--updated-topic="$updated_topic"
--load-queue-trigger-topic="$load_queue_trigger_topic"
--purge-db-trigger-topic="$purge_db_trigger_topic"
--archive-trigger-topic="$archive_trigger_topic"
--updated-urls-topic="$updated_urls_topic"
--spool-collection-path="$spool_collection_path"
--extra-cc="$extra_cc"
Expand Down Expand Up @@ -394,6 +396,7 @@ function execute_command() {
--pick-notifications-trigger-topic \
"$pick_notifications_trigger_topic" \
--purge-db-trigger-topic "$purge_db_trigger_topic" \
--archive-trigger-topic "$archive_trigger_topic" \
--smtp-topic="$smtp_topic" \
--smtp-subscription="$smtp_subscription" \
--cost-topic="$cost_topic" \
Expand All @@ -411,6 +414,7 @@ function execute_command() {
--pick-notifications-trigger-topic \
"$pick_notifications_trigger_topic" \
--purge-db-trigger-topic "$purge_db_trigger_topic" \
--archive-trigger-topic "$archive_trigger_topic" \
--updated-urls-topic="$updated_urls_topic" \
--updated-topic="$updated_topic" \
--cache-redirect-function-name="$cache_redirect_function_name" \
Expand All @@ -422,7 +426,8 @@ function execute_command() {
--load-queue-trigger-topic="$load_queue_trigger_topic" \
--pick-notifications-trigger-topic \
"$pick_notifications_trigger_topic" \
--purge-db-trigger-topic="$purge_db_trigger_topic"
--purge-db-trigger-topic="$purge_db_trigger_topic" \
--archive-trigger-topic="$archive_trigger_topic"
sections_run "$sections" submitters_deploy \
"$project" "$new_topic" "${submitters[@]}"
# Handle "shutdown" command
Expand Down Expand Up @@ -456,6 +461,7 @@ function execute_command() {
--pick-notifications-trigger-topic \
"$pick_notifications_trigger_topic" \
--purge-db-trigger-topic "$purge_db_trigger_topic" \
--archive-trigger-topic "$archive_trigger_topic" \
--new-topic="$new_topic" \
--new-load-subscription="$new_load_subscription" \
--new-debug-subscription="$new_debug_subscription" \
Expand Down
6 changes: 4 additions & 2 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@ def empty_deployment():
for _ in kcidb.mq.IOSubscriber(project, topic, subscription). \
pull_iter(timeout=30):
pass
# Empty the database
kcidb.db.Client(os.environ["KCIDB_DATABASE"]).empty()
# Empty all the databases
kcidb.db.Client(os.environ["KCIDB_OPERATIONAL_DATABASE"]).empty()
kcidb.db.Client(os.environ["KCIDB_SAMPLE_DATABASE"]).empty()
kcidb.db.Client(os.environ["KCIDB_ARCHIVE_DATABASE"]).empty()
# Wipe the spool
kcidb.monitor.spool.Client(
os.environ["KCIDB_SPOOL_COLLECTION_PATH"]
Expand Down
16 changes: 16 additions & 0 deletions kcidb/cloud/functions.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ declare _FUNCTIONS_SH=
# --updated-topic=NAME
# --load-queue-trigger-topic=NAME
# --purge-db-trigger-topic=NAME
# --archive-trigger-topic=NAME
# --updated-urls-topic=NAME
# --cache-bucket-name=NAME
# --cache-redirector-url=URL
Expand All @@ -43,6 +44,7 @@ function functions_env() {
updated_publish updated_topic \
load_queue_trigger_topic \
purge_db_trigger_topic \
archive_trigger_topic \
updated_urls_topic \
spool_collection_path \
extra_cc \
Expand Down Expand Up @@ -78,6 +80,7 @@ function functions_env() {
[KCIDB_UPDATED_QUEUE_TOPIC]="$updated_topic"
[KCIDB_LOAD_QUEUE_TRIGGER_TOPIC]="$load_queue_trigger_topic"
[KCIDB_PURGE_DB_TRIGGER_TOPIC]="$purge_db_trigger_topic"
[KCIDB_ARCHIVE_TRIGGER_TOPIC]="$archive_trigger_topic"
[KCIDB_UPDATED_URLS_TOPIC]="$updated_urls_topic"
[KCIDB_SELECTED_SUBSCRIPTIONS]=""
[KCIDB_SPOOL_COLLECTION_PATH]="$spool_collection_path"
Expand Down Expand Up @@ -137,6 +140,7 @@ function functions_env() {
# --spool-collection-path=PATH
# --cache-redirect-function-name=NAME
# --env-yaml=YAML
# --archive-trigger-topic=NAME
function functions_deploy() {
declare params
params="$(getopt_vars sections project prefix source \
Expand All @@ -148,6 +152,7 @@ function functions_deploy() {
spool_collection_path \
cache_redirect_function_name \
env_yaml \
archive_trigger_topic \
-- "$@")"
eval "$params"

Expand All @@ -171,6 +176,15 @@ function functions_deploy() {
trigger_event+="document.create"
declare trigger_resource="projects/$project/databases/(default)/documents/"
trigger_resource+="${spool_collection_path}/{notification_id}"

function_deploy "$sections" "$source" "$project" "$prefix" \
archive true \
--env-vars-file "$env_yaml_file" \
--trigger-topic "${archive_trigger_topic}" \
--memory 2048MB \
--max-instances=1 \
--timeout 540

function_deploy "$sections" "$source" "$project" "$prefix" \
purge_db true \
--env-vars-file "$env_yaml_file" \
Expand Down Expand Up @@ -243,6 +257,8 @@ function _functions_withdraw_or_shutdown() {
cache_redirect_function_name \
-- "$@")"
eval "$params"
"function_$action" "$sections" "$project" "$prefix" \
archive
"function_$action" "$sections" "$project" "$prefix" \
purge_db
"function_$action" "$sections" "$project" "$prefix" \
Expand Down
6 changes: 6 additions & 0 deletions kcidb/cloud/pubsub.sh
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ function pubsub_subscription_withdraw() {
# --cost-upd-service-account=NAME
# --cost-mon-service=NAME
# --iss-ed-service=NAME
# --archive-trigger-topic=NAME
function pubsub_deploy() {
declare params
params="$(getopt_vars project \
Expand All @@ -160,6 +161,7 @@ function pubsub_deploy() {
cost_upd_service_account \
cost_mon_service \
iss_ed_service \
archive_trigger_topic \
-- "$@")"
eval "$params"
declare project_number
Expand Down Expand Up @@ -198,6 +200,7 @@ function pubsub_deploy() {
--message-retention-duration=12h
pubsub_topic_deploy "$project" "${pick_notifications_trigger_topic}"
pubsub_topic_deploy "$project" "${purge_db_trigger_topic}"
pubsub_topic_deploy "$project" "${archive_trigger_topic}"
pubsub_topic_deploy "$project" "${updated_urls_topic}"
if [ -n "$smtp_topic" ]; then
pubsub_topic_deploy "$project" "$smtp_topic"
Expand Down Expand Up @@ -242,6 +245,7 @@ function pubsub_deploy() {
# --smtp-subscription=NAME
# --cost-topic=NAME
# --cost-upd-service-account=NAME
# --archive-trigger-topic=NAME
function pubsub_withdraw() {
declare params
params="$(getopt_vars project \
Expand All @@ -257,6 +261,7 @@ function pubsub_withdraw() {
smtp_topic smtp_subscription \
cost_topic \
cost_upd_service_account \
archive_trigger_topic \
-- "$@")"
eval "$params"
declare project_number
Expand All @@ -275,6 +280,7 @@ function pubsub_withdraw() {
pubsub_subscription_withdraw "$project" "$new_debug_subscription"
pubsub_subscription_withdraw "$project" "$new_load_subscription"
pubsub_topic_withdraw "$project" "$new_topic"
pubsub_topic_withdraw "$project" "$archive_trigger_topic"
pubsub_topic_withdraw "$project" "$load_queue_trigger_topic"
pubsub_topic_withdraw "$project" "$pick_notifications_trigger_topic"
pubsub_topic_withdraw "$project" "$updated_urls_topic"
Expand Down
7 changes: 7 additions & 0 deletions kcidb/cloud/scheduler.sh
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,15 @@ function scheduler_job_withdraw() {
# --load-queue-trigger-topic=NAME
# --pick-notifications-trigger-topic=NAME
# --purge-db-trigger-topic=NAME
# --archive-trigger-topic=NAME
function scheduler_deploy() {
declare params
params="$(getopt_vars project \
prefix \
load_queue_trigger_topic \
pick_notifications_trigger_topic \
purge_db_trigger_topic \
archive_trigger_topic \
-- "$@")"
eval "$params"
# Deploy the jobs
Expand All @@ -90,6 +92,10 @@ function scheduler_deploy() {
"$project" "${prefix}purge_sm_db_trigger" \
"$purge_db_trigger_topic" '0 7 * * *' \
'{"database": "sm", "timedelta": {"delta": {"days": 30}}}'
scheduler_job_pubsub_deploy \
"$project" "${prefix}archive_trigger" \
"$archive_trigger_topic" '0 12 * * *' \
'{}'
}

# Withdraw from the scheduler
Expand All @@ -101,6 +107,7 @@ function scheduler_withdraw() {
scheduler_job_withdraw "$project" "${prefix}pick_notifications_trigger"
scheduler_job_withdraw "$project" "${prefix}purge_op_db_trigger"
scheduler_job_withdraw "$project" "${prefix}purge_sm_db_trigger"
scheduler_job_withdraw "$project" "${prefix}archive_trigger"
}

fi # _SCHEDULER_SH
1 change: 1 addition & 0 deletions kcidb/cloud/sections.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ declare -A -r SECTIONS=(
["firestore"]="Firestore database"
["storage"]="Google cloud storage"
["functions.purge_db"]="Cloud Functions: kcidb_purge_db()"
["functions.archive"]="Cloud Functions: kcidb_archive()"
["functions.pick_notifications"]="Cloud Functions: kcidb_pick_notifications()"
["functions.send_notification"]="Cloud Functions: kcidb_send_notification()"
["functions.spool_notifications"]="Cloud Functions: kcidb_spool_notifications()"
Expand Down
32 changes: 26 additions & 6 deletions kcidb/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,24 +180,44 @@ def get_current_time(self):
assert current_time.tzinfo
return current_time

def get_first_modified(self):
    """
    Report when data first arrived into the driven database.
    The database must be initialized.

    Returns:
        A timezone-aware datetime object representing the first
        data arrival time, or None if the database is empty.

    Raises:
        NoTimestamps    - The database doesn't have row timestamps, and
                          cannot determine data arrival time.
    """
    assert self.is_initialized()
    earliest = self.driver.get_first_modified()
    # The driver may legitimately report "no data" as None; anything else
    # has to be a datetime carrying timezone information.
    if earliest is not None:
        assert isinstance(earliest, datetime.datetime)
        assert earliest.tzinfo
    return earliest

def get_last_modified(self):
"""
Get the time data has arrived last into the driven database. Can
return the minimum timestamp constant, if the database is empty.
Get the time data has arrived last into the driven database.
The database must be initialized.

Returns:
A timezone-aware datetime object representing the last
data arrival time.
data arrival time, or None if the database is empty.

Raises:
NoTimestamps - The database doesn't have row timestamps, and
cannot determine the last data arrival time.
cannot determine data arrival time.
"""
assert self.is_initialized()
last_modified = self.driver.get_last_modified()
assert isinstance(last_modified, datetime.datetime)
assert last_modified.tzinfo
assert last_modified is None or \
isinstance(last_modified, datetime.datetime) and \
last_modified.tzinfo
return last_modified

def get_schemas(self):
Expand Down
22 changes: 18 additions & 4 deletions kcidb/db/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,20 +110,34 @@ def get_current_time(self):
time on the database server.
"""

@abstractmethod
def get_first_modified(self):
    """
    Report when data first arrived into the driven database.
    The database must be initialized.

    Returns:
        A timezone-aware datetime object representing the first
        data arrival time, or None if the database is empty.

    Raises:
        NoTimestamps    - The database doesn't have row timestamps, and
                          cannot determine data arrival time.
    """

@abstractmethod
def get_last_modified(self):
"""
Get the time data has arrived last into the driven database. Can
return the minimum timestamp constant, if the database is empty.
Get the time data has arrived last into the driven database.
The database must be initialized.

Returns:
A timezone-aware datetime object representing the last
data arrival time.
data arrival time, or None if the database is empty.

Raises:
NoTimestamps - The database doesn't have row timestamps, and
cannot determine the last data arrival time.
cannot determine data arrival time.
"""

@abstractmethod
Expand Down
38 changes: 32 additions & 6 deletions kcidb/db/bigquery/v04_00.py
Original file line number Diff line number Diff line change
Expand Up @@ -1199,19 +1199,46 @@ def load(self, data, with_metadata):
f"ERROR: {error['message']}\n" for error in job.errors
])) from exc

def get_first_modified(self):
    """
    Report when data first arrived into the driven database.
    The database must be initialized.

    Returns:
        A timezone-aware datetime object representing the first
        data arrival time, or None if the database is empty.

    Raises:
        NoTimestamps    - The database doesn't have row timestamps, and
                          cannot determine data arrival time.
    """
    # Every table schema must have a "_timestamp" field for the query
    # below to work, so refuse as soon as one is found lacking it
    for table_schema in self.TABLE_MAP.values():
        timestamp_field = next(
            (field for field in table_schema
             if field.name == "_timestamp"),
            None
        )
        if not timestamp_field:
            raise NoTimestamps(
                "Database is missing timestamps in its schema"
            )

    # One subquery per table, each yielding that table's minimum timestamp
    table_queries = [
        f"SELECT MIN(_timestamp) AS first_modified FROM {table_name}\n"
        for table_name in self.TABLE_MAP
    ]
    # The outer query picks the minimum across all the tables
    query = (
        "SELECT MIN(first_modified) AS first_modified FROM(\n" +
        "UNION ALL\n".join(table_queries) +
        ")\n"
    )
    rows = self.conn.query_create(query).result()
    # The first column of the first row returned is the overall minimum
    first_row = next(iter(rows))
    return first_row[0]

def get_last_modified(self):
"""
Get the time data has arrived last into the driven database. Can
return the minimum timestamp constant, if the database is empty.
Get the time data has arrived last into the driven database.
The database must be initialized.

Returns:
A timezone-aware datetime object representing the last
data arrival time.
data arrival time, or None if the database is empty.

Raises:
NoTimestamps - The database doesn't have row timestamps, and
cannot determine the last data arrival time.
cannot determine data arrival time.
"""
if not all(
next((f for f in table_schema if f.name == "_timestamp"), None)
Expand All @@ -1226,5 +1253,4 @@ def get_last_modified(self):
for table_name in self.TABLE_MAP
) +
")\n"
).result()))[0] or \
datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
).result()))[0]
Loading