Support I/O schema v4.3 along with purging
spbnick committed Nov 2, 2023
1 parent acac307 commit bf159d8
Showing 8 changed files with 374 additions and 7 deletions.
2 changes: 1 addition & 1 deletion kcidb/db/bigquery/__init__.py
@@ -2,7 +2,7 @@

import textwrap
from kcidb.db.schematic import Driver as SchematicDriver
-from kcidb.db.bigquery.v04_01 import Schema as LatestSchema
+from kcidb.db.bigquery.v04_02 import Schema as LatestSchema


class Driver(SchematicDriver):
111 changes: 111 additions & 0 deletions kcidb/db/bigquery/v04_02.py
@@ -0,0 +1,111 @@
"""Kernel CI report database - BigQuery schema v4.2"""

import datetime
import logging
from google.cloud.bigquery import ScalarQueryParameter
from google.cloud.bigquery.schema import SchemaField as Field
from google.cloud.bigquery.table import Table
import kcidb.io as io
from .v04_01 import Schema as PreviousSchema

# Module's logger
LOGGER = logging.getLogger(__name__)

# The new _timestamp field added to every table in this schema
TIMESTAMP_FIELD = Field(
"_timestamp", "TIMESTAMP",
default_value_expression="CURRENT_TIMESTAMP",
description="The time the row was added"
)


class Schema(PreviousSchema):
"""BigQuery database schema v4.2"""

# The schema's version.
version = (4, 2)
# The I/O schema the database schema supports
io = io.schema.V4_3

# A map of table names to their BigQuery schemas
TABLE_MAP = {
name: [TIMESTAMP_FIELD, *fields]
for name, fields in PreviousSchema.TABLE_MAP.items()
}

# A map of table names to the dictionary of fields and the names of their
# aggregation function, if any (the default is "ANY_VALUE").
AGGS_MAP = {
name: {TIMESTAMP_FIELD.name: "MAX"}
for name in TABLE_MAP
}

@classmethod
def _inherit(cls, conn):
"""
        Inherit the database data from the previous schema version (if any).
Args:
conn: Connection to the database to inherit. The database must
comply with the previous version of the schema.
"""
assert isinstance(conn, cls.Connection)
# For all tables/views
for table_name, table_fields in cls.TABLE_MAP.items():
# Add the _timestamp field to the raw table
conn.query_create(f"""
ALTER TABLE `_{table_name}`
ADD COLUMN IF NOT EXISTS
`{TIMESTAMP_FIELD.name}` {TIMESTAMP_FIELD.field_type}
OPTIONS(description={TIMESTAMP_FIELD.description!r})
""").result()
# Set the _timestamp field default to current timestamp
conn.query_create(f"""
ALTER TABLE `_{table_name}`
ALTER COLUMN `{TIMESTAMP_FIELD.name}`
SET DEFAULT {TIMESTAMP_FIELD.default_value_expression}
""").result()
# Set missing _timestamp fields to start_time, or current time
expr = "CURRENT_TIMESTAMP"
if any(f.name == "start_time" for f in table_fields):
expr = f"IFNULL(start_time, {expr})"
conn.query_create(f"""
UPDATE `_{table_name}`
SET `{TIMESTAMP_FIELD.name}` = {expr}
WHERE `{TIMESTAMP_FIELD.name}` IS NULL
""").result()
# Update the view
view_ref = conn.dataset_ref.table(table_name)
view = Table(view_ref)
view.view_query = cls._format_view_query(conn, table_name)
conn.client.update_table(view, ["view_query"])

def purge(self, before):
"""
Remove all the data from the database that arrived before the
specified time, if the database supports that.
Args:
            before: An "aware" datetime.datetime object specifying the
                    earliest (database server) time the data to be
*preserved* should've arrived. Any other data will be
purged.
Can be None to have nothing removed. The latter can be
used to test if the database supports purging.
Returns:
True if the database supports purging, and the requested data was
purged. False if the database doesn't support purging.
"""
assert before is None or \
isinstance(before, datetime.datetime) and before.tzinfo
if before is not None:
for table_name in self.TABLE_MAP:
self.conn.query_create(
f"DELETE FROM `_{table_name}`"
f"WHERE `{TIMESTAMP_FIELD.name}` < @before",
[ScalarQueryParameter(
"before", TIMESTAMP_FIELD.field_type, before
)]
).result()
return True
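
The purge() method above only accepts "aware" datetimes. A minimal standalone sketch of building a cutoff that satisfies its assertion (standard library only; the variable names are illustrative, not from the commit):

import datetime

# An "aware" UTC cutoff: purge() asserts that before.tzinfo is set, so a
# naive datetime.datetime.now() would fail the assertion above.
before = datetime.datetime(2023, 11, 1, tzinfo=datetime.timezone.utc)
assert before.tzinfo is not None

# Passing None instead of a datetime only reports whether purging is
# supported, without deleting anything (per the docstring above).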
2 changes: 1 addition & 1 deletion kcidb/db/postgresql/__init__.py
@@ -2,7 +2,7 @@

import textwrap
from kcidb.db.schematic import Driver as SchematicDriver
-from kcidb.db.postgresql.v04_01 import Schema as LatestSchema
+from kcidb.db.postgresql.v04_02 import Schema as LatestSchema


class Driver(SchematicDriver):
106 changes: 106 additions & 0 deletions kcidb/db/postgresql/v04_02.py
@@ -0,0 +1,106 @@
"""Kernel CI report database - PostgreSQL schema v4.2"""

import datetime
import logging
import kcidb.io as io
from kcidb.misc import merge_dicts
from kcidb.db.postgresql.schema import \
TimestampColumn, Table
from .v04_01 import Schema as PreviousSchema

# Module's logger
LOGGER = logging.getLogger(__name__)

# The new _timestamp column added to every table in this schema
TIMESTAMP_COLUMN = TimestampColumn(
conflict_func="GREATEST",
metadata_expr="CURRENT_TIMESTAMP"
)


class Schema(PreviousSchema):
"""PostgreSQL database schema v4.2"""

# The schema's version.
version = (4, 2)
# The I/O schema the database schema supports
io = io.schema.V4_3

# A map of table names and Table constructor arguments
# For use by descendants
TABLES_ARGS = {
name: merge_dicts(
args,
columns=dict(_timestamp=TIMESTAMP_COLUMN, **args["columns"])
)
for name, args in PreviousSchema.TABLES_ARGS.items()
}

# A map of table names and schemas
TABLES = {
name: Table(**args) for name, args in TABLES_ARGS.items()
}

@classmethod
def _inherit(cls, conn):
"""
        Inherit the database data from the previous schema version (if any).
Args:
conn: Connection to the database to inherit. The database must
comply with the previous version of the schema.
"""
assert isinstance(conn, cls.Connection)
with conn, conn.cursor() as cursor:
# For all tables
for name, schema in cls.TABLES.items():
# Get the _timestamp table column
column = schema.columns["_timestamp"]
# Add the _timestamp column
cursor.execute(f"""
ALTER TABLE {name} ADD COLUMN {column.format_def()}
""")
# Set missing _timestamps to start_time, or current time
expr = column.schema.metadata_expr
if "start_time" in schema.columns:
expr = f"COALESCE(start_time, {expr})"
cursor.execute(f"""
UPDATE {name}
SET {column.name} = {expr}
WHERE {column.name} IS NULL
""")

def purge(self, before):
"""
Remove all the data from the database that arrived before the
specified time, if the database supports that.
Args:
            before: An "aware" datetime.datetime object specifying the
                    earliest (database server) time the data to be
*preserved* should've arrived. Any other data will be
purged.
Can be None to have nothing removed. The latter can be
used to test if the database supports purging.
Returns:
True if the database supports purging, and the requested data was
purged. False if the database doesn't support purging.
"""
assert before is None or \
isinstance(before, datetime.datetime) and before.tzinfo
if before is not None:
before_json = before.isoformat(timespec='microseconds')
with self.conn, self.conn.cursor() as cursor:
for name, schema in self.TABLES.items():
# Get the _timestamp table column
column = schema.columns["_timestamp"]
# Purge
cursor.execute(
f"""
DELETE FROM {name}
WHERE {column.name} < {schema.placeholder}
""",
(column.schema.pack(before_json),)
)
return True
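
The TABLES_ARGS comprehension above prepends the new _timestamp column to every inherited table definition. A rough, self-contained sketch of the assumed merge_dicts behaviour (the real helper lives in kcidb.misc; this stand-in and the toy column values are illustrative only):

def merge_dicts(base, **overrides):
    # Assumed behaviour: a copy of "base" with the keyword overrides applied.
    merged = dict(base)
    merged.update(overrides)
    return merged

prev_args = {"columns": {"id": "TEXT", "start_time": "TIMESTAMP"}}  # toy stand-in
new_args = merge_dicts(
    prev_args,
    columns=dict(_timestamp="TIMESTAMP DEFAULT CURRENT_TIMESTAMP",
                 **prev_args["columns"]),
)
# new_args["columns"] now lists "_timestamp" first, then the inherited columns.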
2 changes: 1 addition & 1 deletion kcidb/db/sqlite/__init__.py
@@ -2,7 +2,7 @@

import textwrap
from kcidb.db.schematic import Driver as SchematicDriver
-from kcidb.db.sqlite.v04_01 import Schema as LatestSchema
+from kcidb.db.sqlite.v04_02 import Schema as LatestSchema


class Driver(SchematicDriver):
Expand Down
115 changes: 115 additions & 0 deletions kcidb/db/sqlite/v04_02.py
@@ -0,0 +1,115 @@
"""Kernel CI report database - SQLite schema v4.2"""

import datetime
import logging
import kcidb.io as io
from kcidb.misc import merge_dicts
from kcidb.db.sqlite.schema import \
TimestampColumn, Table
from .v04_01 import Schema as PreviousSchema

# Module's logger
LOGGER = logging.getLogger(__name__)

# The new _timestamp column added to every table in this schema
TIMESTAMP_COLUMN = TimestampColumn(
conflict_func="MAX",
metadata_expr="strftime('%Y-%m-%dT%H:%M:%f000Z', 'now')"
)


class Schema(PreviousSchema):
"""SQLite database schema v4.2"""

# The schema's version.
version = (4, 2)
# The I/O schema the database schema supports
io = io.schema.V4_3

# A map of table names and Table constructor arguments
# For use by descendants
TABLES_ARGS = {
name: merge_dicts(
args,
columns=dict(_timestamp=TIMESTAMP_COLUMN, **args["columns"])
)
for name, args in PreviousSchema.TABLES_ARGS.items()
}

# A map of table names and schemas
TABLES = {
name: Table(**args) for name, args in TABLES_ARGS.items()
}

@classmethod
def _inherit(cls, conn):
"""
        Inherit the database data from the previous schema version (if any).
Args:
conn: Connection to the database to inherit. The database must
comply with the previous version of the schema.
"""
assert isinstance(conn, cls.Connection)
with conn:
cursor = conn.cursor()
try:
# For all tables
for name, schema in cls.TABLES.items():
# Get the _timestamp table column
column = schema.columns["_timestamp"]
# Add the _timestamp column
cursor.execute(f"""
ALTER TABLE {name} ADD COLUMN {column.format_def()}
""")
# Set missing _timestamps to start_time, or current time
expr = column.schema.metadata_expr
if "start_time" in schema.columns:
expr = f"COALESCE(start_time, {expr})"
cursor.execute(f"""
UPDATE {name}
SET {column.name} = {expr}
WHERE {column.name} IS NULL
""")
finally:
cursor.close()

def purge(self, before):
"""
Remove all the data from the database that arrived before the
specified time, if the database supports that.
Args:
            before: An "aware" datetime.datetime object specifying the
                    earliest (database server) time the data to be
*preserved* should've arrived. Any other data will be
purged.
Can be None to have nothing removed. The latter can be
used to test if the database supports purging.
Returns:
True if the database supports purging, and the requested data was
purged. False if the database doesn't support purging.
"""
assert before is None or \
isinstance(before, datetime.datetime) and before.tzinfo
if before is not None:
before_json = before.isoformat(timespec='microseconds')
with self.conn:
cursor = self.conn.cursor()
try:
# For all tables
for name, schema in self.TABLES.items():
# Get the _timestamp table column
column = schema.columns["_timestamp"]
# Purge
cursor.execute(
f"""
DELETE FROM {name}
WHERE {column.name} < {schema.placeholder}
""",
(column.schema.pack(before_json),)
)
finally:
cursor.close()
return True
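
The metadata_expr above formats the current time with SQLite's strftime, whose %f gives millisecond precision; the literal "000Z" in the format string pads the value to a microsecond, UTC-suffixed timestamp. A quick standalone check using only the standard sqlite3 module:

import sqlite3

# Prints something like 2023-11-02T12:34:56.789000Z
with sqlite3.connect(":memory:") as db:
    print(db.execute(
        "SELECT strftime('%Y-%m-%dT%H:%M:%f000Z', 'now')"
    ).fetchone()[0])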
2 changes: 1 addition & 1 deletion kcidb/io.py
@@ -6,4 +6,4 @@
from kcidb_io import * # noqa: F403

# The I/O schema version used by KCIDB
-SCHEMA = schema.V4_2 # noqa: F405
+SCHEMA = schema.V4_3 # noqa: F405
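
With this change the package-level SCHEMA constant tracks I/O schema v4.3. A one-line sanity check (assuming kcidb and its kcidb_io dependency are installed):

import kcidb.io as io

# After this commit the re-exported constant points at I/O schema v4.3.
assert io.SCHEMA is io.schema.V4_3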