11 changes: 8 additions & 3 deletions pyproject.toml
@@ -11,13 +11,18 @@ authors = [
dependencies = [
"berdl-notebook-utils",
"biopython>=1.86",
"cdm-schema",
"click>=8.1.8",
"ruff>=0.14.8",
"click>=8.3.1",
"xmlschema>=4.2.0",
"xsdata[cli,lxml]>=25.7",
"lxml>=6.0.2",
"pytest-asyncio>=1.3.0",
"ruff>=0.14.13",
]

[dependency-groups]
dev = [
"hypothesis>=6.148.9",
"mutmut>=3.4.0",
"pytest>=9.0.2",
"pytest-cov>=7.0.0",
"pytest-env>=1.2.0",
Empty file.
109 changes: 109 additions & 0 deletions src/cdm_data_loader_utils/audit/schema.py
@@ -0,0 +1,109 @@
"""Schema for audit tables."""

from pyspark.sql.types import ArrayType, IntegerType, LongType, StringType, StructField, StructType, TimestampType

from cdm_data_loader_utils.core.pipeline_run import PipelineRun

CHECKPOINT = "checkpoint"
METRICS = "metrics"
REJECTS = "rejects"
RUN = "run"

RUN_ID = "run_id"
PIPELINE = "pipeline"
SOURCE = "source_path"
NAMESPACE = "namespace"

END_TIME = "end_time"
ERROR = "error"
LAST_ENTRY_ID = "last_entry_id"
PARSED_ROW = "parsed_record"
RAW_ROW = "raw_record"
RECORDS_PROCESSED = "records_processed"
START_TIME = "start_time"
STATUS = "status"
TABLE = "table"
TIMESTAMP = "timestamp"
UPDATED = "updated"

STATUS_ERROR = "ERROR"
STATUS_RUNNING = "RUNNING"
STATUS_SUCCESS = "SUCCESS"

N_INVALID = "records_invalid"
N_READ = "records_read"
N_VALID = "records_valid"
ROW_ERRORS = "errors_in_record"
VALIDATION_ERRORS = "validation_errors"


FIELDS = {
RUN_ID: StructField(RUN_ID, StringType(), nullable=False),
PIPELINE: StructField(PIPELINE, StringType(), nullable=False),
SOURCE: StructField(SOURCE, StringType(), nullable=False),
STATUS: StructField(STATUS, StringType(), nullable=False),
UPDATED: StructField(UPDATED, TimestampType(), nullable=False),
}

PIPELINE_FIELDS = [FIELDS[RUN_ID], FIELDS[PIPELINE], FIELDS[SOURCE]]

AUDIT_SCHEMA = {
# represents the whole run
RUN: StructType(
[
*PIPELINE_FIELDS,
FIELDS[STATUS],
StructField(RECORDS_PROCESSED, LongType(), nullable=True),
StructField(START_TIME, TimestampType(), nullable=False),
StructField(END_TIME, TimestampType(), nullable=True),
StructField(ERROR, StringType(), nullable=True),
]
),
# checkpoint during processing to allow parsing to be restarted after an error
CHECKPOINT: StructType(
[
*PIPELINE_FIELDS,
FIELDS[STATUS],
StructField(RECORDS_PROCESSED, LongType(), nullable=True),
StructField(LAST_ENTRY_ID, StringType(), nullable=True),
FIELDS[UPDATED],
]
),
# per-source metrics on data ingested
METRICS: StructType(
[
*PIPELINE_FIELDS,
StructField(N_READ, IntegerType(), nullable=False),
StructField(N_VALID, IntegerType(), nullable=False),
StructField(N_INVALID, IntegerType(), nullable=False),
StructField(VALIDATION_ERRORS, ArrayType(StringType(), containsNull=False), nullable=False),
FIELDS[UPDATED],
]
),
# invalid data
REJECTS: StructType(
[
*PIPELINE_FIELDS,
StructField(RAW_ROW, StringType(), nullable=False),
StructField(PARSED_ROW, StringType(), nullable=True),
StructField(ROW_ERRORS, ArrayType(StringType()), nullable=False),
StructField(TIMESTAMP, TimestampType(), nullable=False),
]
),
}


def current_run_expr(target: str = "t", source: str = "s") -> str:
"""SQL expression for matching the current run in any of the audit tables."""
return " AND ".join([f"{target or 't'}.{param} = {source or 's'}.{param}" for param in [RUN_ID, SOURCE, PIPELINE]])


def match_run(run: PipelineRun) -> str:
"""SQL expression for matching the current run in any of the audit tables.

:param run: current pipeline run
:type run: PipelineRun
:return: SQL expression string
:rtype: str
"""
return " AND ".join([f"{p} = '{getattr(run, p)}'" for p in [RUN_ID, PIPELINE, SOURCE]])

Empty file.
4 changes: 4 additions & 0 deletions src/cdm_data_loader_utils/core/constants.py
@@ -0,0 +1,4 @@
"""Constants used in various places throughout the codebase."""

INVALID_DATA_FIELD_NAME = "__invalid_data__"
D = "delta"
13 changes: 13 additions & 0 deletions src/cdm_data_loader_utils/core/pipeline_run.py
@@ -0,0 +1,13 @@
"""Pipeline run class."""

from dataclasses import dataclass


@dataclass(frozen=True)
class PipelineRun:
"""Dataclass for capturing ingestion run information."""

run_id: str
pipeline: str
source_path: str
namespace: str
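
For illustration only (the values here are hypothetical), the frozen dataclass gives immutable, hashable run metadata:

from cdm_data_loader_utils.core.pipeline_run import PipelineRun

run = PipelineRun(run_id="r-001", pipeline="gff_loader", source_path="/data/in.gff", namespace="demo")
print(run.run_id)  # "r-001"
# run.run_id = "r-002"  # would raise dataclasses.FrozenInstanceError: the instance is immutable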
Empty file.
130 changes: 130 additions & 0 deletions src/cdm_data_loader_utils/readers/dsv.py
@@ -0,0 +1,130 @@
"""Generic DSV file reader with validation of incoming data."""

from typing import Any

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import StringType, StructField, StructType

from cdm_data_loader_utils.core.constants import INVALID_DATA_FIELD_NAME
from cdm_data_loader_utils.utils.cdm_logger import get_cdm_logger, log_and_die

# mapping of delimiters to format names (for logging)
# spark defaults to separating on commas if nothing is specified
FORMAT_NAME = {None: "CSV", ",": "CSV", "\t": "TSV"}

# schema field to catch input errors
INVALID_DATA_FIELD = StructField(INVALID_DATA_FIELD_NAME, StringType(), nullable=True)

# read modes
PERMISSIVE = "PERMISSIVE"
FAILFAST = "FAILFAST"
DROP = "DROPMALFORMED"

# default options: enforce schema, catch parse errors in INVALID_DATA_FIELD_NAME column
DEFAULT_DSV_OPTIONS = {
"inferSchema": False,
"enforceSchema": True,
"mode": PERMISSIVE,
"columnNameOfCorruptRecord": INVALID_DATA_FIELD_NAME,
}


logger = get_cdm_logger()


def get_format_name(delimiter: str | None) -> str:
"""Get the nice name of the format being parsed, given the string used as delimiter."""
return FORMAT_NAME.get(delimiter, "DSV")


def read(
spark: SparkSession,
path: str,
schema_fields: list[StructField],
options: dict[str, Any] | None = None,
) -> DataFrame:
"""Read in a delimiter-separated file with spark.

    :param spark: Spark session
:type spark: SparkSession
:param path: location of the file to parse
:type path: str
:param schema_fields: list of StructFields describing the expected input
:type schema_fields: list[StructField]
:param options: dictionary of options
:type options: dict[str, Any]
:return: dataframe of parsed rows
:rtype: DataFrame
"""
if not isinstance(schema_fields, list) or not all(isinstance(field, StructField) for field in schema_fields):
log_and_die("schema_fields must be specified as a list of StructFields", TypeError)

if not options:
options = {}

dsv_schema = StructType([*schema_fields, INVALID_DATA_FIELD])
dsv_options = {
**DEFAULT_DSV_OPTIONS,
**options,
}

if dsv_options["mode"] not in (PERMISSIVE, FAILFAST):
msg = "The only permitted read modes are PERMISSIVE and FAILFAST."
log_and_die(msg, ValueError)

format_name = get_format_name(options.get("delimiter", options.get("sep")))

try:
df = spark.read.options(**dsv_options).csv(path, schema=dsv_schema)
except Exception:
# Log the full stack trace and re-raise to be handled by the caller
logger.exception("Failed to load %s from %s", format_name, path)
raise

# count will not trigger an error even if in FAILFAST mode and all records are corrupt.
logger.info("Loaded %d %s records from %s", df.count(), format_name, path)
return df


def read_tsv(
spark: SparkSession, path: str, schema_fields: list[StructField], options: dict[str, Any] | None = None
) -> DataFrame:
"""Shortcut for reading in a tab-separated file.

    :param spark: Spark session
:type spark: SparkSession
:param path: location of the file to parse
:type path: str
:param schema_fields: list of StructFields describing the expected input
:type schema_fields: list[StructField]
:param options: dictionary of options
:type options: dict[str, Any]
:return: dataframe of parsed rows
:rtype: DataFrame
"""
    if not options:
        options = {}
    # Spark's CSV reader takes the delimiter via the "sep" option
    options["sep"] = "\t"
return read(spark, path, schema_fields, options)


def read_csv(
spark: SparkSession, path: str, schema_fields: list[StructField], options: dict[str, Any] | None = None
) -> DataFrame:
"""Shortcut for reading in a comma-separated file.

    :param spark: Spark session
:type spark: SparkSession
:param path: location of the file to parse
:type path: str
:param schema_fields: list of StructFields describing the expected input
:type schema_fields: list[StructField]
:param options: dictionary of options
:type options: dict[str, Any]
:return: dataframe of parsed rows
:rtype: DataFrame
"""
    if not options:
        options = {}
    # Spark's CSV reader takes the delimiter via the "sep" option
    options["sep"] = ","
return read(spark, path, schema_fields, options)
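
A minimal usage sketch for the readers (the path and column names are hypothetical, not from this PR): in PERMISSIVE mode, malformed rows are kept with their raw text in the __invalid_data__ column, so valid rows and rejects can be split after the read:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, StringType, StructField

from cdm_data_loader_utils.core.constants import INVALID_DATA_FIELD_NAME
from cdm_data_loader_utils.readers.dsv import read_tsv

spark = SparkSession.builder.getOrCreate()
fields = [
    StructField("gene_id", StringType(), nullable=False),
    StructField("length", IntegerType(), nullable=True),
]

# Spark restricts queries that reference only the corrupt-record column, so cache before filtering on it.
df = read_tsv(spark, "/data/genes.tsv", fields).cache()

valid = df.filter(col(INVALID_DATA_FIELD_NAME).isNull()).drop(INVALID_DATA_FIELD_NAME)
rejects = df.filter(col(INVALID_DATA_FIELD_NAME).isNotNull())
print(valid.count(), rejects.count())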

116 changes: 116 additions & 0 deletions src/cdm_data_loader_utils/utils/cdm_logger.py
@@ -0,0 +1,116 @@
"""
Provides structured logging with contextual metadata for CDM data import pipelines.
"""

import logging
import logging.handlers
import os
import sys
from pathlib import Path

DEFAULT_LOGGER_NAME = "cdm_data_loader"

GENERIC_ERROR_MESSAGE = "An error of unknown origin occurred."

LOG_FILENAME = "cdm_data_loader.log"
MAX_LOG_FILE_SIZE = 2**30 # 1 GiB
MAX_LOG_BACKUPS = 5


# TODO: adopt this logging config via logging.config.dictConfig so handlers are set up just once
LOGGING_CONFIG = {
"root": {"name": "cdm_data_loader", "level": "INFO", "handlers": ["console", "file"]},
"version": 1,
"handlers": {
"console": {
"class": "logging.StreamHandler",
"formatter": "json",
"level": "INFO",
"stream": "ext://sys.stdout",
},
"file": {
"class": "logging.handlers.RotatingFileHandler",
"formatter": "json",
"filename": LOG_FILENAME,
"maxBytes": MAX_LOG_FILE_SIZE,
"backupCount": MAX_LOG_BACKUPS,
},
},
"formatters": {
"json": {
"format": '{"time": "%(asctime)s", "level": "%(levelname)s", "module": "%(module)s", "msg": "%(message)s"}'
}
},
}
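# A minimal sketch of the TODO above (an assumption about the intended wiring, not part of this PR),
# kept as comments so importing this module does not configure logging as a side effect:
#   import logging.config
#   logging.config.dictConfig(LOGGING_CONFIG)
#   logging.getLogger(DEFAULT_LOGGER_NAME).info("handlers configured once via dictConfig")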


def get_cdm_logger(
logger_name: str | None = None, log_level: str | None = None, log_dir: str | None = None
) -> logging.Logger:
"""Initialise the logger for the module.

If the logger name is not set, the default name "cdm_data_loader" will be used.

:param logger_name: name for the logger, defaults to None
:type logger_name: str | None, optional
:param log_level: logger level, defaults to None
:type log_level: str | None, optional
:param log_dir: directory to save log files to, optional. If no directory is specified, logs will just be emitted to the console.
:type log_dir: str | None
:return: initialised logger
:rtype: logging.Logger
"""
if not logger_name:
logger_name = DEFAULT_LOGGER_NAME
# Always get the same logger by name
logger = logging.getLogger(logger_name)

# Determine log level (argument > env var > default)
effective_log_level = (log_level or os.getenv("LOG_LEVEL", "INFO")).upper()
logger.setLevel(getattr(logging, effective_log_level, logging.DEBUG))

# JSON-style structured formatter
formatter = logging.Formatter(
'{"time": "%(asctime)s", "level": "%(levelname)s", "module": "%(module)s", "msg": "%(message)s"}'
)

    # Console handler (guarded so that repeated calls don't attach duplicate handlers)
    if not logger.handlers:
        ch = logging.StreamHandler(sys.stdout)
        ch.setFormatter(formatter)
        logger.addHandler(ch)

if log_dir:
log_dir_path = Path(log_dir)
        if not log_dir_path.is_dir():
            msg = f"{log_dir} does not exist or is not a directory."
            raise FileNotFoundError(msg)

        # Add the rotating file handler to the logger, writing into the requested directory
        # (guarded so that repeated calls don't attach duplicate file handlers)
        if not any(isinstance(h, logging.handlers.RotatingFileHandler) for h in logger.handlers):
            file_handler = logging.handlers.RotatingFileHandler(
                log_dir_path / LOG_FILENAME, maxBytes=MAX_LOG_FILE_SIZE, backupCount=MAX_LOG_BACKUPS
            )
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)

return logger


def log_and_die(error_msg: str, error_class: type[Exception], logger_name: str | None = None) -> None:
"""Log an error message and then raise the error.

:param error_msg: error message string
:type error_msg: str
:param error_class: class of error to throw
:type error_class: type[Exception]
:param logger_name: name of the logger to use, defaults to None
:type logger_name: str | None, optional
"""
logger = get_cdm_logger(logger_name)

if not error_msg:
logger.warning("No error supplied to log_and_die. Using generic error message.")
error_msg = GENERIC_ERROR_MESSAGE

if not isinstance(error_class, type) or not issubclass(error_class, BaseException):
error_class = RuntimeError

logger.error(error_msg)
raise error_class(error_msg)
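
A short usage sketch (the messages and the DEBUG level are arbitrary examples): get_cdm_logger returns the shared, JSON-formatted logger, and log_and_die logs the message at ERROR level before raising the given exception class:

from cdm_data_loader_utils.utils.cdm_logger import get_cdm_logger, log_and_die

logger = get_cdm_logger(log_level="DEBUG")
logger.info("starting ingest")  # emitted as a JSON-formatted line on stdout

threshold = -1
if threshold < 0:
    # logs "threshold must be non-negative" at ERROR level, then raises ValueError with the same message
    log_and_die("threshold must be non-negative", ValueError)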
Empty file.