Skip to content

Commit

Permalink
db: Get last modified timestamp from table rows
Browse files Browse the repository at this point in the history
Instead of solely targeting the get_last_modified() method at databases
which have a load rate limit, make all of them support it, and return the
maximum `_timestamp` value across all tables, if the schema has it.
  • Loading branch information
spbnick committed Nov 2, 2024
1 parent 534d6d6 commit bbe6529
Show file tree
Hide file tree
Showing 10 changed files with 220 additions and 84 deletions.
14 changes: 9 additions & 5 deletions kcidb/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,15 +182,19 @@ def get_current_time(self):

def get_last_modified(self):
"""
Get the time the data in the connected database was last modified.
Can return the minimum timestamp constant, if the database is not
initialized or its data loading interface is not limited in the amount
of load() method calls.
Get the time data has arrived last into the driven database. Can
return the minimum timestamp constant, if the database is empty.
The database must be initialized.
Returns:
A timezone-aware datetime object representing the last
modification time.
data arrival time.
Raises:
NoTimestamps - The database doesn't have row timestamps, and
cannot determine the last data arrival time.
"""
assert self.is_initialized()
last_modified = self.driver.get_last_modified()
assert isinstance(last_modified, datetime.datetime)
assert last_modified.tzinfo
Expand Down
13 changes: 8 additions & 5 deletions kcidb/db/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,17 @@ def get_current_time(self):
@abstractmethod
def get_last_modified(self):
"""
Get the time the data in the driven database was last modified.
Can return the minimum timestamp constant, if the database is not
initialized, or its data loading interface is not limited in the
amount of load() method calls.
Get the time data has arrived last into the driven database. Can
return the minimum timestamp constant, if the database is empty.
The database must be initialized.
Returns:
A timezone-aware datetime object representing the last
modification time.
data arrival time.
Raises:
NoTimestamps - The database doesn't have row timestamps, and
cannot determine the last data arrival time.
"""

@abstractmethod
Expand Down
47 changes: 30 additions & 17 deletions kcidb/db/bigquery/v04_00.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,23 +161,6 @@ def get_current_time(self):
self.query_create("SELECT CURRENT_TIMESTAMP").result()
))[0]

def get_last_modified(self):
    """
    Get the time the data in the connected database was last modified.

    Asks BigQuery for the greatest "last_modified_time" recorded in the
    dataset's "__TABLES__" metadata. Returns the minimum timestamp
    constant when the query yields no value.

    Returns:
        A timezone-aware datetime object representing the last
        modification time.
    """
    query = (
        "SELECT TIMESTAMP_MILLIS(MAX(last_modified_time)) "
        "FROM __TABLES__"
    )
    # The aggregate query produces a single row with a single column
    first_row = next(iter(self.query_create(query).result()))
    modified_at = first_row[0]
    if modified_at:
        return modified_at
    return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)


class Schema(AbstractSchema):
"""BigQuery database schema v4.0"""
Expand Down Expand Up @@ -1215,3 +1198,33 @@ def load(self, data, with_metadata):
raise Exception("".join([
f"ERROR: {error['message']}\n" for error in job.errors
])) from exc

def get_last_modified(self):
    """
    Get the time data has arrived last into the driven database.

    Can return the minimum timestamp constant, if the database is empty.
    The database must be initialized.

    Returns:
        A timezone-aware datetime object representing the last
        data arrival time.

    Raises:
        NoTimestamps - The database doesn't have row timestamps, and
                       cannot determine the last data arrival time.
    """
    # Every table must have a "_timestamp" field for the query to work
    for table_schema in self.TABLE_MAP.values():
        timestamp_field = next(
            (field for field in table_schema
             if field.name == "_timestamp"),
            None
        )
        if not timestamp_field:
            raise NoTimestamps(
                "Database is missing timestamps in its schema"
            )

    # Take the latest timestamp of each table, then the latest of those
    per_table_queries = [
        f"SELECT MAX(_timestamp) AS last_modified FROM {table_name}\n"
        for table_name in self.TABLE_MAP
    ]
    query = (
        "SELECT MAX(last_modified) AS last_modified FROM(\n" +
        "UNION ALL\n".join(per_table_queries) +
        ")\n"
    )
    first_row = next(iter(self.conn.query_create(query).result()))
    latest = first_row[0]
    if latest:
        return latest
    # No rows in any of the tables
    return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
13 changes: 8 additions & 5 deletions kcidb/db/mux.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,14 +291,17 @@ def get_current_time(self):

def get_last_modified(self):
"""
Get the time the data in the driven databases was last modified.
Can return the minimum timestamp constant, if the databases are not
initialized, or their data loading interface is not limited in the
amount of load() method calls.
Get the time data has arrived last into the driven database. Can
return the minimum timestamp constant, if the database is empty.
The database must be initialized.
Returns:
A timezone-aware datetime object representing the last
modification time.
data arrival time.
Raises:
NoTimestamps - The database doesn't have row timestamps, and
cannot determine the last data arrival time.
"""
return max(driver.get_last_modified() for driver in self.drivers)

Expand Down
13 changes: 8 additions & 5 deletions kcidb/db/null.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,17 @@ def get_current_time(self):

def get_last_modified(self):
"""
Get the time the data in the driven database was last modified.
Can return the minimum timestamp constant, if the database is not
initialized, or its data loading interface is not limited in the
amount of load() method calls.
Get the time data has arrived last into the driven database. Can
return the minimum timestamp constant, if the database is empty.
The database must be initialized.
Returns:
A timezone-aware datetime object representing the last
modification time.
data arrival time.
Raises:
NoTimestamps - The database doesn't have row timestamps, and
cannot determine the last data arrival time.
"""
return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)

Expand Down
43 changes: 30 additions & 13 deletions kcidb/db/postgresql/v04_00.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,19 +184,6 @@ def get_current_time(self):
cursor.execute("SELECT CURRENT_TIMESTAMP")
return cursor.fetchone()[0]

def get_last_modified(self):
    """
    Get the time the data in the connected database was last modified.

    This implementation doesn't query the database, and always returns
    the minimum timestamp constant.

    Returns:
        A timezone-aware datetime object representing the last
        modification time.
    """
    earliest = datetime.datetime.min
    return earliest.replace(tzinfo=datetime.timezone.utc)


class Schema(AbstractSchema):
"""PostgreSQL database schema v4.0"""
Expand Down Expand Up @@ -950,3 +937,33 @@ def load(self, data, with_metadata):
# Flip priority for the next load to maintain (rough)
# parity with non-determinism of BigQuery's ANY_VALUE()
self.conn.load_prio_db = not self.conn.load_prio_db

def get_last_modified(self):
    """
    Get the time data has arrived last into the driven database.

    Can return the minimum timestamp constant, if the database is empty.
    The database must be initialized.

    Returns:
        A timezone-aware datetime object representing the last
        data arrival time.

    Raises:
        NoTimestamps - The database doesn't have row timestamps, and
                       cannot determine the last data arrival time.
    """
    # One "latest timestamp" query per table
    table_queries = [
        table_schema.format_get_last_modified(table_name)
        for table_name, table_schema in self.TABLES.items()
    ]
    subquery = textwrap.indent(
        "\nUNION ALL\n".join(table_queries), " " * 4
    )
    # Pick the latest timestamp across all the tables
    statement = "\n".join([
        "SELECT MAX(last_modified) AS last_modified",
        "FROM (",
        subquery,
        ") AS tables",
        "",
    ])
    with self.conn, self.conn.cursor() as cursor:
        cursor.execute(statement)
        latest = cursor.fetchone()[0]
    if latest:
        return latest
    # No rows in any of the tables
    return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
45 changes: 26 additions & 19 deletions kcidb/db/schematic.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,19 +89,6 @@ def get_current_time(self):
time on the database server.
"""

@abstractmethod
def get_last_modified(self):
    """
    Get the time the data in the connected database was last modified.

    Can return the minimum timestamp constant, if the database is not
    initialized, or its data loading interface is not limited in the
    amount of load() method calls.

    This is an abstract method with no implementation here.

    Returns:
        A timezone-aware datetime object representing the last
        modification time.
    """

def is_initialized(self):
"""
Check if the connected database is initialized.
Expand Down Expand Up @@ -381,6 +368,22 @@ def load(self, data, with_metadata):
"""
# Relying on the driver to check compatibility/validity

@abstractmethod
def get_last_modified(self):
    """
    Get the time data has arrived last into the database.

    Can return the minimum timestamp constant, if the database is empty.
    The database must be initialized.

    This is an abstract method with no implementation here.

    Returns:
        A timezone-aware datetime object representing the last
        data arrival time.

    Raises:
        NoTimestamps - The database doesn't have row timestamps, and
                       cannot determine the last data arrival time.
    """


class MetaDriver(ABCMeta):
"""A schematic metadriver"""
Expand Down Expand Up @@ -546,16 +549,20 @@ def get_current_time(self):

def get_last_modified(self):
"""
Get the time the data in the driven database was last modified.
Can return the minimum timestamp constant, if the database is not
initialized, or its data loading interface is not limited in the
amount of load() method calls.
Get the time data has arrived last into the driven database. Can
return the minimum timestamp constant, if the database is empty.
The database must be initialized.
Returns:
A timezone-aware datetime object representing the last
modification time.
data arrival time.
Raises:
NoTimestamps - The database doesn't have row timestamps, and
cannot determine the last data arrival time.
"""
return self.conn.get_last_modified()
assert self.is_initialized()
return self.schema.get_last_modified()

def get_schemas(self):
"""
Expand Down
22 changes: 22 additions & 0 deletions kcidb/db/sql/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,28 @@ def format_dump(self, name, with_metadata, after, until):
]
)

def format_get_last_modified(self, name):
    """
    Format the "SELECT" command returning the timestamp of last data
    written to the table, or NULL, if the table is empty.

    Args:
        name:   The name of the target table of the command.

    Returns:
        The formatted "SELECT" command, returning the timestamp in
        "last_modified" column.

    Raises:
        NoTimestamps - The table doesn't have row timestamps.
    """
    assert isinstance(name, str)
    timestamp_column = self.timestamp
    if timestamp_column:
        # MAX() over an empty table produces NULL
        return "SELECT MAX({}) AS last_modified FROM {}".format(
            timestamp_column.name, name
        )
    raise NoTimestamps("Table has no timestamp column")

def format_delete(self, name):
"""
Format the "DELETE" command for emptying the table (removing all
Expand Down
49 changes: 36 additions & 13 deletions kcidb/db/sqlite/v04_00.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,19 +149,6 @@ def get_current_time(self):
finally:
cursor.close()

def get_last_modified(self):
    """
    Get the time the data in the connected database was last modified.

    This implementation doesn't query the database, and always returns
    the minimum timestamp constant.

    Returns:
        A timezone-aware datetime object representing the last
        modification time.
    """
    # Same value as datetime.datetime.min, but with the UTC timezone
    return datetime.datetime(
        datetime.MINYEAR, 1, 1, tzinfo=datetime.timezone.utc
    )


class Schema(AbstractSchema):
"""SQLite database schema v4.0"""
Expand Down Expand Up @@ -905,3 +892,39 @@ def load(self, data, with_metadata):
# Flip priority for the next load to maintain (rough)
# parity with non-determinism of BigQuery's ANY_VALUE()
self.conn.load_prio_db = not self.conn.load_prio_db

def get_last_modified(self):
    """
    Get the time data has arrived last into the driven database.

    Can return the minimum timestamp constant, if the database is empty.
    The database must be initialized.

    Returns:
        A timezone-aware datetime object representing the last
        data arrival time.

    Raises:
        NoTimestamps - The database doesn't have row timestamps, and
                       cannot determine the last data arrival time.
    """
    # One "latest timestamp" query per table
    table_queries = [
        table_schema.format_get_last_modified(table_name)
        for table_name, table_schema in self.TABLES.items()
    ]
    subquery = textwrap.indent(
        "\nUNION ALL\n".join(table_queries), " " * 4
    )
    # Pick the latest timestamp across all the tables
    statement = "\n".join([
        "SELECT MAX(last_modified) AS last_modified",
        "FROM (",
        subquery,
        ") AS tables",
        "",
    ])
    parsed = None
    with self.conn:
        cursor = self.conn.cursor()
        try:
            cursor.execute(statement)
            latest = cursor.fetchone()[0]
            if latest:
                # The timestamp is fetched as an ISO 8601 string
                parsed = dateutil.parser.isoparse(latest)
        finally:
            cursor.close()
    if parsed is not None:
        return parsed
    # No rows in any of the tables
    return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
Loading

0 comments on commit bbe6529

Please sign in to comment.