Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cloud
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ function execute_command() {
--database="$database"
--clean-test-databases="$clean_test_databases"
--empty-test-databases="$empty_test_databases"
--auto-sync="$test"
)

# Handle "env" command
Expand Down
9 changes: 5 additions & 4 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,20 @@
EMPTY_SQLITE_FILE = tempfile.mkstemp(suffix=".sqlite3")[1]
atexit.register(os.unlink, EMPTY_SQLITE_FILE)
EMPTY_DATABASE_SPECS = ["sqlite:" + EMPTY_SQLITE_FILE]
EMPTY_SQLITE_DATABASE = kcidb.db.Client(EMPTY_DATABASE_SPECS[0])
EMPTY_SQLITE_DATABASE = kcidb.db.Client(EMPTY_DATABASE_SPECS[0],
auto_sync=True)
EMPTY_SQLITE_DATABASE.init()
del EMPTY_SQLITE_DATABASE

# "Clean" (uninitialized) databases indexed by their specifications
CLEAN_DATABASES = {
spec: kcidb.db.Client(spec)
spec: kcidb.db.Client(spec, auto_sync=True)
for spec in CLEAN_DATABASE_SPECS
}

# "Empty" (no-data) databases indexed by their specifications
EMPTY_DATABASES = {
spec: kcidb.db.Client(spec)
spec: kcidb.db.Client(spec, auto_sync=True)
for spec in EMPTY_DATABASE_SPECS
}

Expand All @@ -65,7 +66,7 @@ def empty_deployment():
pull_iter(timeout=30):
pass
# Empty the database
kcidb.db.Client(os.environ["KCIDB_DATABASE"]).empty()
kcidb.db.Client(os.environ["KCIDB_DATABASE"], auto_sync=True).empty()
# Wipe the spool
kcidb.monitor.spool.Client(
os.environ["KCIDB_SPOOL_COLLECTION_PATH"]
Expand Down
5 changes: 5 additions & 0 deletions kcidb/cloud/functions.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ declare _FUNCTIONS_SH=
# --database=SPEC
# --clean-test-databases=SPEC_LIST
# --empty-test-databases=SPEC_LIST
# --auto-sync=true|false
function functions_env() {
declare params
params="$(getopt_vars format \
Expand All @@ -57,6 +58,7 @@ function functions_env() {
database \
clean_test_databases \
empty_test_databases \
auto_sync \
-- "$@")"
eval "$params"
declare -A env=(
Expand Down Expand Up @@ -107,6 +109,9 @@ function functions_env() {
if "$updated_publish"; then
env[KCIDB_UPDATED_PUBLISH]="1"
fi
if "$auto_sync"; then
env[KCIDB_AUTO_SYNC]="1"
fi
if [ "$format" == "yaml" ]; then
# Silly Python and its significant whitespace
sed -E 's/^[[:blank:]]+//' <<<'
Expand Down
17 changes: 17 additions & 0 deletions kcidb/cloud/psql.sh
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ function psql_instance_deploy() {
cloudsql.iam_authentication=on
max_connections=200
random_page_cost=1.5
cloudsql.enable_pg_cron=on
cloudsql.allow_passwordless_local_connections=on
)
# Get and cache the password in the current shell first
password_get psql_superuser >/dev/null
Expand All @@ -102,6 +104,21 @@ function psql_instance_deploy() {
--database-flags="$(IFS=','; echo "${database_flags[*]}")"
fi

# Deploy pg_cron and its maintenance job
psql_proxy_session "$project" "$name" \
mute psql --dbname postgres -e <<<"
\\set ON_ERROR_STOP on
CREATE EXTENSION IF NOT EXISTS pg_cron;
SELECT cron.schedule(
'purge-old-job-run-details',
'0 12 * * *',
\$\$
DELETE FROM cron.job_run_details
WHERE end_time < now() - interval '7 days'
\$\$
);
"

# Deploy the shared viewer user
exists=$(psql_user_exists "$project" "$name" "$viewer")
updated=$(password_is_updated psql_viewer)
Expand Down
66 changes: 61 additions & 5 deletions kcidb/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def get_drivers(cls):
class Client(kcidb.orm.Source):
"""Kernel CI report database client"""

def __init__(self, database):
def __init__(self, database, auto_sync=False):
"""
Initialize the Kernel CI report database client.

Expand All @@ -64,6 +64,12 @@ def __init__(self, database):
as "<DRIVER>:<PARAMS>" or just "<DRIVER>". Where
"<DRIVER>" is the driver name, and "<PARAMS>" is the
optional driver-specific database parameter string.
auto_sync: True if the client should propagate changes (load,
purge, etc.) through the database before returning
from the corresponding method.
False if the propagation should be left to periodic
processes or explicit propagation using the sync()
method.

Raises:
UnknownDriver - an unknown (sub-)driver encountered in the
Expand All @@ -74,7 +80,9 @@ def __init__(self, database):
driver
"""
assert isinstance(database, str)
assert isinstance(auto_sync, bool)
self.database = database
self.auto_sync = auto_sync
self.driver = None
self.reset()

Expand Down Expand Up @@ -142,6 +150,8 @@ def empty(self):
"""
assert self.is_initialized()
self.driver.empty()
if self.auto_sync:
self.sync()

def purge(self, before=None):
"""
Expand All @@ -164,7 +174,10 @@ def purge(self, before=None):
assert self.is_initialized()
assert before is None or \
isinstance(before, datetime.datetime) and before.tzinfo
return self.driver.purge(before)
purged = self.driver.purge(before)
if purged and self.auto_sync:
self.sync()
return purged

def get_current_time(self):
"""
Expand Down Expand Up @@ -433,6 +446,21 @@ def load(self, data, with_metadata=False):
assert LIGHT_ASSERTS or io_schema.is_valid_exactly(data)
assert isinstance(with_metadata, bool)
self.driver.load(data, with_metadata=with_metadata)
if self.auto_sync:
self.sync()

def sync(self):
"""
Propagate the recent changes (load, purge, etc.) through the
database, immediately, without leaving it to periodic propagation.
Such as updating materialized views. The database must be initialized.

Returns:
True if sync is supported and has succeeded.
False if sync is not supported/required.
"""
assert LIGHT_ASSERTS or self.is_initialized()
return self.driver.sync()


class DBHelpAction(argparse.Action):
Expand Down Expand Up @@ -721,8 +749,13 @@ def load_main():
help='Load metadata fields as well',
action='store_true'
)
parser.add_argument(
'--sync',
help='Propagate database changes immediately, if needed',
action='store_true'
)
args = parser.parse_args()
client = Client(args.database)
client = Client(args.database, auto_sync=args.sync)
if not client.is_initialized():
raise Exception(f"Database {args.database!r} is not initialized")
io_schema = client.get_schema()[1]
Expand Down Expand Up @@ -857,8 +890,13 @@ def empty_main():
description = 'kcidb-db-empty - Remove all data from a ' \
'Kernel CI report database'
parser = ArgumentParser(description=description)
parser.add_argument(
'--sync',
help='Propagate database changes immediately, if needed',
action='store_true'
)
args = parser.parse_args()
client = Client(args.database)
client = Client(args.database, auto_sync=args.sync)
if client.is_initialized():
client.empty()
else:
Expand All @@ -881,8 +919,26 @@ def purge_main():
"be *preserved* should've arrived. "
"No data is removed if not specified."
)
parser.add_argument(
'--sync',
help='Propagate database changes immediately, if needed',
action='store_true'
)
args = parser.parse_args()
client = Client(args.database)
client = Client(args.database, auto_sync=args.sync)
if not client.is_initialized():
raise Exception(f"Database {args.database!r} is not initialized")
return 0 if client.purge(before=args.before) else 2


def sync_main():
"""Execute the kcidb-db-sync command-line tool"""
sys.excepthook = kcidb.misc.log_and_print_excepthook
description = 'kcidb-db-sync - Propagate database changes. ' \
'Exit with status 2, if not needed/supported by database.'
parser = ArgumentParser(description=description)
args = parser.parse_args()
client = Client(args.database)
if not client.is_initialized():
raise Exception(f"Database {args.database!r} is not initialized")
return 0 if client.sync() else 2
13 changes: 13 additions & 0 deletions kcidb/db/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,3 +264,16 @@ def load(self, data, with_metadata):
assert io_schema.is_compatible_directly(data)
assert LIGHT_ASSERTS or io_schema.is_valid_exactly(data)
assert isinstance(with_metadata, bool)

@abstractmethod
def sync(self):
"""
Propagate the recent changes (load, purge, etc.) through the
database, immediately, without leaving it to periodic propagation.
Such as updating materialized views. The database must be initialized.

Returns:
True if sync is supported and has succeeded.
False if sync is not supported/required.
"""
assert self.is_initialized()
30 changes: 22 additions & 8 deletions kcidb/db/bigquery/v04_00.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,14 +579,15 @@ class Schema(AbstractSchema):
' "" AS misc\n'
'FROM UNNEST([])',
incident='SELECT\n'
' "" AS id,\n'
' "" AS origin,\n'
' "" AS issue_id,\n'
' 0 AS issue_version,\n'
' "" AS build_id,\n'
' "" AS test_id,\n'
' "" AS comment,\n'
' "" AS misc\n'
' "" AS id,\n'
' "" AS origin,\n'
' "" AS issue_id,\n'
' 0 AS issue_version,\n'
' "" AS build_id,\n'
' "" AS test_id,\n'
' FALSE AS present,\n'
' "" AS comment,\n'
' "" AS misc\n'
'FROM UNNEST([])',
)

Expand Down Expand Up @@ -1081,3 +1082,16 @@ def load(self, data, with_metadata):
raise Exception("".join([
f"ERROR: {error['message']}\n" for error in job.errors
])) from exc

def sync(self):
"""
Propagate the recent changes (load, purge, etc.) through the
database, immediately, without leaving it to periodic propagation.
Such as updating materialized views. The database must be initialized.

Returns:
True if sync is supported and has succeeded.
False if sync is not supported/required.
"""
# No syncing needed at the moment
return False
14 changes: 12 additions & 2 deletions kcidb/db/bigquery/v04_01.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ class Schema(PreviousSchema):
" issue_version,\n"
" build_id,\n"
" test_id,\n"
" present,\n"
" comment,\n"
" misc\n"
"FROM (\n"
Expand All @@ -221,14 +222,23 @@ class Schema(PreviousSchema):
" present,\n"
" comment,\n"
" misc,\n"
" DENSE_RANK() OVER (\n"
" RANK() OVER (\n"
" PARTITION BY\n"
" issue_id, build_id, test_id\n"
" ORDER BY issue_version DESC\n"
" ) AS precedence\n"
" FROM incidents\n"
" WHERE\n"
" present IS NOT NULL AND\n"
" EXISTS (\n"
" SELECT TRUE\n"
" FROM issues\n"
" WHERE\n"
" incidents.issue_id = issues.id AND\n"
" incidents.issue_version = issues.version\n"
" )\n"
")\n"
"WHERE precedence = 1 AND present",
"WHERE precedence = 1",
)

@classmethod
Expand Down
17 changes: 17 additions & 0 deletions kcidb/db/mux.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,3 +416,20 @@ def load(self, data, with_metadata):
if driver_io_schema != io_schema else data,
with_metadata=with_metadata
)

def sync(self):
"""
Propagate the recent changes (load, purge, etc.) through the
databases, immediately, without leaving it to periodic propagation.
Such as updating materialized views. The databases must be initialized.

Returns:
True if sync is supported and has succeeded.
False if sync is not supported/required.
"""
assert self.is_initialized()
synced = False
# Sync every driver
for driver in self.drivers:
synced = driver.sync() or synced
return synced
13 changes: 13 additions & 0 deletions kcidb/db/null.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,16 @@ def load(self, data, with_metadata):
should be discarded and the database should
generate its metadata itself.
"""

def sync(self):
"""
Propagate the recent changes (load, purge, etc.) through the
database, immediately, without leaving it to periodic propagation.
Such as updating materialized views. The database must be initialized.

Returns:
True if sync is supported and has succeeded.
False if sync is not supported/required.
"""
assert self.is_initialized()
return False
2 changes: 1 addition & 1 deletion kcidb/db/postgresql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import textwrap
from kcidb.db.schematic import Driver as SchematicDriver
from kcidb.db.postgresql.v04_08 import Schema as LatestSchema
from kcidb.db.postgresql.v04_09 import Schema as LatestSchema


class Driver(SchematicDriver):
Expand Down
Loading