-
Notifications
You must be signed in to change notification settings - Fork 34
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Support I/O schema v4.3 along with purging
- Loading branch information
Showing
8 changed files
with
465 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
"""Kernel CI report database - BigQuery schema v4.2""" | ||
|
||
import datetime | ||
import logging | ||
from google.cloud.bigquery import ScalarQueryParameter | ||
from google.cloud.bigquery.schema import SchemaField as Field | ||
from google.cloud.bigquery.table import Table | ||
import kcidb.io as io | ||
from .v04_01 import Schema as PreviousSchema | ||
|
||
# Module's logger | ||
LOGGER = logging.getLogger(__name__) | ||
|
||
# The new _timestamp field added to every table in this schema.
# BigQuery populates it on insert via the column's default value
# expression (CURRENT_TIMESTAMP), so it records when each row arrived.
TIMESTAMP_FIELD = Field(
    "_timestamp", "TIMESTAMP",
    default_value_expression="CURRENT_TIMESTAMP",
    description="The time the row was added"
)
|
||
|
||
class Schema(PreviousSchema):
    """BigQuery database schema v4.2"""

    # The schema's version.
    version = (4, 2)
    # The I/O schema the database schema supports
    io = io.schema.V4_3

    # A map of table names to their BigQuery schemas:
    # every table inherited from the previous schema version (v4.1)
    # gets the new _timestamp field prepended.
    TABLE_MAP = {
        name: [TIMESTAMP_FIELD, *fields]
        for name, fields in PreviousSchema.TABLE_MAP.items()
    }

    # A map of table names to the dictionary of fields and the names of their
    # aggregation function, if any (the default is "ANY_VALUE").
    # Deduplicated rows keep the latest arrival time.
    AGGS_MAP = {
        name: {TIMESTAMP_FIELD.name: "MAX"}
        for name in TABLE_MAP
    }

    @classmethod
    def _inherit(cls, conn):
        """
        Inherit the database data from the previous schema version (if any).

        Args:
            conn: Connection to the database to inherit. The database must
                  comply with the previous version of the schema.
        """
        assert isinstance(conn, cls.Connection)
        # For all tables/views
        for table_name, table_fields in cls.TABLE_MAP.items():
            # Add the _timestamp field to the raw table
            conn.query_create(f"""
                ALTER TABLE `_{table_name}`
                ADD COLUMN IF NOT EXISTS
                `{TIMESTAMP_FIELD.name}` {TIMESTAMP_FIELD.field_type}
                OPTIONS(description={TIMESTAMP_FIELD.description!r})
            """).result()
            # Set the _timestamp field default to current timestamp.
            # Done in a separate statement, as BigQuery's ADD COLUMN
            # doesn't apply the default to a newly-added column.
            conn.query_create(f"""
                ALTER TABLE `_{table_name}`
                ALTER COLUMN `{TIMESTAMP_FIELD.name}`
                SET DEFAULT {TIMESTAMP_FIELD.default_value_expression}
            """).result()
            # Backfill missing _timestamp fields with start_time (when the
            # table has one), falling back to the current time
            expr = "CURRENT_TIMESTAMP"
            if any(f.name == "start_time" for f in table_fields):
                expr = f"IFNULL(start_time, {expr})"
            conn.query_create(f"""
                UPDATE `_{table_name}`
                SET `{TIMESTAMP_FIELD.name}` = {expr}
                WHERE `{TIMESTAMP_FIELD.name}` IS NULL
            """).result()
            # Regenerate the public view over the raw table so it
            # reflects the updated schema
            view_ref = conn.dataset_ref.table(table_name)
            view = Table(view_ref)
            view.view_query = cls._format_view_query(conn, table_name)
            conn.client.update_table(view, ["view_query"])

    def purge(self, before):
        """
        Remove all the data from the database that arrived before the
        specified time, if the database supports that.

        Args:
            before: An "aware" datetime.datetime object specifying the
                    earliest (database server) time the data to be
                    *preserved* should've arrived. Any other data will be
                    purged.
                    Can be None to have nothing removed. The latter can be
                    used to test if the database supports purging.

        Returns:
            True if the database supports purging, and the requested data was
            purged. False if the database doesn't support purging.
        """
        assert before is None or \
            isinstance(before, datetime.datetime) and before.tzinfo
        if before is not None:
            for table_name in self.TABLE_MAP:
                # NOTE: the trailing space after the table name is required
                # — the adjacent f-strings are concatenated verbatim, and
                # without it the SQL reads "...`_table`WHERE ..."
                self.conn.query_create(
                    f"DELETE FROM `_{table_name}` "
                    f"WHERE `{TIMESTAMP_FIELD.name}` < @before",
                    [ScalarQueryParameter(
                        "before", TIMESTAMP_FIELD.field_type, before
                    )]
                ).result()
        return True
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
"""Kernel CI report database - PostgreSQL schema v4.2""" | ||
|
||
import datetime | ||
import logging | ||
import kcidb.io as io | ||
from kcidb.misc import merge_dicts | ||
from kcidb.db.postgresql.schema import \ | ||
TimestampColumn, Table | ||
from .v04_01 import Schema as PreviousSchema | ||
|
||
# Module's logger | ||
LOGGER = logging.getLogger(__name__) | ||
|
||
# The new _timestamp column added to every table in this schema.
# metadata_expr fills missing values with CURRENT_TIMESTAMP;
# conflict_func="GREATEST" presumably keeps the latest timestamp when
# rows collide — confirm against kcidb.db.postgresql.schema.TimestampColumn.
TIMESTAMP_COLUMN = TimestampColumn(
    conflict_func="GREATEST",
    metadata_expr="CURRENT_TIMESTAMP"
)
|
||
|
||
class Schema(PreviousSchema):
    """PostgreSQL database schema v4.2"""

    # The schema's version.
    version = (4, 2)
    # The I/O schema the database schema supports
    io = io.schema.V4_3

    # A map of table names and Table constructor arguments
    # For use by descendants
    # Every inherited table gets the new _timestamp column first.
    TABLES_ARGS = {
        table_name: merge_dicts(
            table_args,
            columns={"_timestamp": TIMESTAMP_COLUMN, **table_args["columns"]}
        )
        for table_name, table_args in PreviousSchema.TABLES_ARGS.items()
    }

    # A map of table names and schemas
    TABLES = {
        table_name: Table(**table_args)
        for table_name, table_args in TABLES_ARGS.items()
    }

    @classmethod
    def _inherit(cls, conn):
        """
        Inherit the database data from the previous schema version (if any).

        Args:
            conn: Connection to the database to inherit. The database must
                  comply with the previous version of the schema.
        """
        assert isinstance(conn, cls.Connection)
        with conn, conn.cursor() as cursor:
            # Retrofit the _timestamp column onto every table
            for table_name, table_schema in cls.TABLES.items():
                timestamp = table_schema.columns["_timestamp"]
                # Create the new column
                cursor.execute(f"""
                    ALTER TABLE {table_name} ADD COLUMN {timestamp.format_def()}
                """)
                # Backfill pre-existing rows: prefer their start_time when
                # the table has one, otherwise use the current time
                if "start_time" in table_schema.columns:
                    fill_expr = \
                        f"COALESCE(start_time, {timestamp.schema.metadata_expr})"
                else:
                    fill_expr = timestamp.schema.metadata_expr
                cursor.execute(f"""
                    UPDATE {table_name}
                    SET {timestamp.name} = {fill_expr}
                    WHERE {timestamp.name} IS NULL
                """)

    def purge(self, before):
        """
        Remove all the data from the database that arrived before the
        specified time, if the database supports that.

        Args:
            before: An "aware" datetime.datetime object specifying the
                    earliest (database server) time the data to be
                    *preserved* should've arrived. Any other data will be
                    purged.
                    Can be None to have nothing removed. The latter can be
                    used to test if the database supports purging.

        Returns:
            True if the database supports purging, and the requested data was
            purged. False if the database doesn't support purging.
        """
        assert before is None or \
            isinstance(before, datetime.datetime) and before.tzinfo
        if before is not None:
            cutoff_json = before.isoformat(timespec='microseconds')
            with self.conn, self.conn.cursor() as cursor:
                for table_name, table_schema in self.TABLES.items():
                    timestamp = table_schema.columns["_timestamp"]
                    # Drop every row that arrived before the cutoff
                    cursor.execute(
                        f"""
                        DELETE FROM {table_name}
                        WHERE {timestamp.name} < {table_schema.placeholder}
                        """,
                        (timestamp.schema.pack(cutoff_json),)
                    )
        return True
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
"""Kernel CI report database - SQLite schema v4.2""" | ||
|
||
import datetime | ||
import logging | ||
import kcidb.io as io | ||
from kcidb.misc import merge_dicts | ||
from kcidb.db.sqlite.schema import \ | ||
TimestampColumn, Table | ||
from .v04_01 import Schema as PreviousSchema | ||
|
||
# Module's logger | ||
LOGGER = logging.getLogger(__name__) | ||
|
||
# The new _timestamp column added to every table in this schema.
# metadata_expr produces the current UTC time in ISO-8601 format with
# microsecond precision (SQLite's strftime gives milliseconds; the
# literal "000" pads them to microseconds). conflict_func="MAX"
# presumably keeps the latest timestamp when rows collide — confirm
# against kcidb.db.sqlite.schema.TimestampColumn.
TIMESTAMP_COLUMN = TimestampColumn(
    conflict_func="MAX",
    metadata_expr="strftime('%Y-%m-%dT%H:%M:%f000+00:00', 'now')"
)
|
||
|
||
class Schema(PreviousSchema):
    """SQLite database schema v4.2"""

    # The schema's version.
    version = (4, 2)
    # The I/O schema the database schema supports
    io = io.schema.V4_3

    # A map of table names and Table constructor arguments
    # For use by descendants
    # Every inherited table gets the new _timestamp column first.
    TABLES_ARGS = {
        table_name: merge_dicts(
            table_args,
            columns={"_timestamp": TIMESTAMP_COLUMN, **table_args["columns"]}
        )
        for table_name, table_args in PreviousSchema.TABLES_ARGS.items()
    }

    # A map of table names and schemas
    TABLES = {
        table_name: Table(**table_args)
        for table_name, table_args in TABLES_ARGS.items()
    }

    @classmethod
    def _inherit(cls, conn):
        """
        Inherit the database data from the previous schema version (if any).

        Args:
            conn: Connection to the database to inherit. The database must
                  comply with the previous version of the schema.
        """
        assert isinstance(conn, cls.Connection)
        with conn:
            cursor = conn.cursor()
            try:
                # Retrofit the _timestamp column onto every table
                for table_name, table_schema in cls.TABLES.items():
                    timestamp = table_schema.columns["_timestamp"]
                    # Create the new column
                    cursor.execute(f"""
                        ALTER TABLE {table_name} ADD COLUMN {timestamp.format_def()}
                    """)
                    # Backfill pre-existing rows: prefer their start_time
                    # when the table has one, otherwise the current time
                    if "start_time" in table_schema.columns:
                        fill_expr = \
                            f"COALESCE(start_time, {timestamp.schema.metadata_expr})"
                    else:
                        fill_expr = timestamp.schema.metadata_expr
                    cursor.execute(f"""
                        UPDATE {table_name}
                        SET {timestamp.name} = {fill_expr}
                        WHERE {timestamp.name} IS NULL
                    """)
            finally:
                cursor.close()

    def purge(self, before):
        """
        Remove all the data from the database that arrived before the
        specified time, if the database supports that.

        Args:
            before: An "aware" datetime.datetime object specifying the
                    earliest (database server) time the data to be
                    *preserved* should've arrived. Any other data will be
                    purged.
                    Can be None to have nothing removed. The latter can be
                    used to test if the database supports purging.

        Returns:
            True if the database supports purging, and the requested data was
            purged. False if the database doesn't support purging.
        """
        assert before is None or \
            isinstance(before, datetime.datetime) and before.tzinfo
        if before is not None:
            cutoff_json = before.isoformat(timespec='microseconds')
            with self.conn:
                cursor = self.conn.cursor()
                try:
                    # Drop every row that arrived before the cutoff
                    for table_name, table_schema in self.TABLES.items():
                        timestamp = table_schema.columns["_timestamp"]
                        cursor.execute(
                            f"""
                            DELETE FROM {table_name}
                            WHERE {timestamp.name} < {table_schema.placeholder}
                            """,
                            (timestamp.schema.pack(cutoff_json),)
                        )
                finally:
                    cursor.close()
        return True
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.