@@ -6,16 +6,20 @@

sys.path.append("../../common/lib")

from databricks_ingestion_monitoring.common_ldp import (
    Configuration,
    Constants,
    MonitoringEtlPipeline,
)
from databricks_ingestion_monitoring.standard_tables import (
    EVENTS_TABLE_METRICS,
    TABLE_STATUS,
    TABLE_STATUS_PER_PIPELINE_RUN,
)

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
logger.info("Starting CDC Connector Monitoring ETL Pipeline")
@@ -24,86 +28,98 @@

conf = Configuration(spark.conf)


class CdcConstants:
    CDC_FLOW_TYPE = "cdc"
    SNAPSHOT_FLOW_TYPE = "snapshot"
    CDC_STAGING_TABLE_FLOW_TYPE = "cdc_staging"
    TABLE_STATUS_PER_PIPELINE_RUN = "table_status_per_pipeline_run"
    CDC_STAGING_TABLE = "cdc_staging_table"


class CdcConnectorMonitoringEtlPipeline(MonitoringEtlPipeline):
    def __init__(self, conf: Configuration, spark: SparkSession):
        super().__init__(conf, spark)

    def _get_event_logs_bronze_sql(self, event_log_source: str):
        """
        Override the base definition of the append flows from the event log sources into
        the `event_logs_bronze` table, adding CDC Connector-specific fields (a standalone
        sketch of this extension-point substitution appears at the end of this file).
        """
        sql = super()._get_event_logs_bronze_sql(event_log_source)
        sql = sql.replace(
            Constants.sql_fields_def_extension_point,
            f""", (CASE WHEN endswith(flow_name, "_snapshot_flow") THEN 'snapshot'
            WHEN details:operation_progress.cdc_snapshot.table_name::string is not null THEN '{CdcConstants.SNAPSHOT_FLOW_TYPE}'
            WHEN endswith(flow_name, "_cdc_flow") THEN '{CdcConstants.CDC_FLOW_TYPE}'
            WHEN endswith(flow_name, ".{CdcConstants.CDC_STAGING_TABLE}") THEN '{CdcConstants.CDC_STAGING_TABLE_FLOW_TYPE}'
            END) flow_type{Constants.sql_fields_def_extension_point}
            """,
        )
        return sql

    def _get_events_errors_sql(self):
        sql = super()._get_events_errors_sql()
        sql = sql.replace(
            Constants.sql_fields_def_extension_point,
            f", flow_type{Constants.sql_fields_def_extension_point}",
        )
        return sql

    def _get_events_warnings_sql(self):
        sql = super()._get_events_warnings_sql()
        sql = sql.replace(
            Constants.sql_fields_def_extension_point,
            f", flow_type{Constants.sql_fields_def_extension_point}",
        )
        return sql

    def _get_events_table_metrics_sql(self):
        sql = super()._get_events_table_metrics_sql()
        return sql.replace(
            Constants.sql_fields_def_extension_point,
            f", flow_type{Constants.sql_fields_def_extension_point}",
        )

    def register_base_tables_and_views(self, spark: SparkSession):
        super().register_base_tables_and_views(spark)

    def _get_table_run_processing_state_sql(self):
        sql = super()._get_table_run_processing_state_sql()
        sql = sql.replace(
            Constants.where_clause_extension_point,
            f"AND (table_name not LIKE '%.{CdcConstants.CDC_STAGING_TABLE}') {Constants.where_clause_extension_point}",
        )
        sql = sql.replace(
            Constants.sql_fields_def_extension_point,
            f", flow_type{Constants.sql_fields_def_extension_point}",
        )
        return sql

    def register_table_status(self, spark: SparkSession):
        table_status_per_pipeline_run_cdf = f"{TABLE_STATUS_PER_PIPELINE_RUN.name}_cdf"

        @dlt.view(name=table_status_per_pipeline_run_cdf)
        def table_run_processing_state_cdf():
            return (
                spark.readStream.option("readChangeFeed", "true")
                .table(TABLE_STATUS_PER_PIPELINE_RUN.name)
                .filter("_change_type IN ('insert', 'update_postimage')")
            )

        silver_table_name = f"{TABLE_STATUS.name}_silver"
        dlt.create_streaming_table(
            name=silver_table_name,
            comment="Capture information about the latest state, ingested data and errors for target tables",
            cluster_by=["pipeline_id", "table_name"],
            table_properties={"delta.enableRowTracking": "true"},
        )

        silver_latest_source_view_name = f"{silver_table_name}_latest_source"

        @dlt.view(name=silver_latest_source_view_name)
        def table_latest_run_processing_state_source():
            return spark.sql(f"""
                SELECT pipeline_id,
                       table_name,
                       pipeline_run_id AS latest_pipeline_run_id,
@@ -129,18 +145,22 @@ def table_latest_run_processing_state_source():
                WHERE table_name NOT LIKE '%.{CdcConstants.CDC_STAGING_TABLE}'
            """)

        dlt.create_auto_cdc_flow(
            name=f"{silver_table_name}_apply_latest",
            source=silver_latest_source_view_name,
            target=silver_table_name,
            keys=["pipeline_id", "table_name"],
            sequence_by="updated_at",
            ignore_null_updates=True,
        )

        silver_latest_cdc_changes_source_view_name = (
            f"{silver_table_name}_latest_cdc_changes_source"
        )

        @dlt.view(name=silver_latest_cdc_changes_source_view_name)
        def table_latest_run_processing_state_source():
            return spark.sql(f"""
                SELECT pipeline_id,
                       table_name,
                       null AS latest_pipeline_run_id,
@@ -168,18 +188,22 @@ def table_latest_run_processing_state_source():
                  AND flow_type='cdc'
            """)

        dlt.create_auto_cdc_flow(
            name=f"{silver_table_name}_apply_latest_cdc_changes",
            source=silver_latest_cdc_changes_source_view_name,
            target=silver_table_name,
            keys=["pipeline_id", "table_name"],
            sequence_by="updated_at",
            ignore_null_updates=True,
        )

        silver_latest_snapshot_changes_source_view_name = (
            f"{silver_table_name}_latest_snapshot_changes_source"
        )

        @dlt.view(name=silver_latest_snapshot_changes_source_view_name)
        def table_latest_run_processing_state_source():
            return spark.sql(f"""
                SELECT pipeline_id,
                       table_name,
                       null AS latest_pipeline_run_id,
@@ -207,22 +231,23 @@ def table_latest_run_processing_state_source():
                  AND flow_type='snapshot'
            """)

        dlt.create_auto_cdc_flow(
            name=f"{silver_table_name}_apply_latest_snapshot_changes",
            source=silver_latest_snapshot_changes_source_view_name,
            target=silver_table_name,
            keys=["pipeline_id", "table_name"],
            sequence_by="updated_at",
            ignore_null_updates=True,
        )

        @dlt.table(
            name=TABLE_STATUS.name,
            comment=TABLE_STATUS.table_comment,
            cluster_by=["pipeline_id", "table_name"],
            table_properties={"delta.enableRowTracking": "true"},
        )
        def table_status():
            return spark.sql(f"""
                SELECT s.*,
                       latest_pipeline_run_num_written_cdc_changes,
                       latest_pipeline_run_num_written_snapshot_changes
@@ -240,7 +265,7 @@ def table_status():
                  AND s.latest_pipeline_run_id = etm.pipeline_run_id
                  AND s.table_name = etm.table_name
            """)


pipeline = CdcConnectorMonitoringEtlPipeline(conf, spark)
pipeline.register_base_tables_and_views(spark)
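

# Illustrative sketch (not part of the monitoring pipeline above): the overrides in
# CdcConnectorMonitoringEtlPipeline work by string-splicing extra columns in front of
# placeholder markers (extension points such as Constants.sql_fields_def_extension_point
# and Constants.where_clause_extension_point) that the base MonitoringEtlPipeline leaves
# inside the SQL it generates. The helper below only demonstrates that substitution
# pattern; the marker text, table, and column names here are made up for the example and
# are not the actual values used by the common_ldp library.
def _extension_point_substitution_example() -> str:
    extension_point = "/*__FIELDS_EXTENSION_POINT__*/"  # hypothetical marker
    base_sql = f"SELECT pipeline_id, event_timestamp{extension_point} FROM event_logs_bronze"
    # Subclass-style extension: add a column ahead of the marker and keep the marker so
    # a further subclass could splice in more fields the same way.
    extended_sql = base_sql.replace(extension_point, f", flow_type{extension_point}")
    # Before the statement is actually executed, any leftover marker would be dropped
    # (here we simply strip it to show the finished statement).
    return extended_sql.replace(extension_point, "")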