diff --git a/src/sqlmesh_openlineage/console.py b/src/sqlmesh_openlineage/console.py index fe878c6..256608e 100644 --- a/src/sqlmesh_openlineage/console.py +++ b/src/sqlmesh_openlineage/console.py @@ -1,9 +1,12 @@ """OpenLineage Console wrapper for SQLMesh.""" from __future__ import annotations +import logging import uuid import typing as t +logger = logging.getLogger(__name__) + if t.TYPE_CHECKING: from sqlmesh.core.console import Console from sqlmesh.core.snapshot import Snapshot, SnapshotInfoLike @@ -72,10 +75,14 @@ def start_snapshot_evaluation_progress( # Store snapshot for later reference self._current_snapshots[snapshot.name] = snapshot - self._emitter.emit_snapshot_start( - snapshot=snapshot, - run_id=run_id, - ) + try: + self._emitter.emit_snapshot_start( + snapshot=snapshot, + run_id=run_id, + snapshots=self._current_snapshots, + ) + except Exception: + logger.warning("Failed to emit START event for %s", snapshot.name, exc_info=True) # Delegate to wrapped console self._wrapped.start_snapshot_evaluation_progress(snapshot, audit_only) @@ -96,20 +103,24 @@ def update_snapshot_evaluation_progress( run_id = self._active_runs.pop(snapshot.name, None) if run_id: - if num_audits_failed > 0: - self._emitter.emit_snapshot_fail( - snapshot=snapshot, - run_id=run_id, - error=f"Audit failed: {num_audits_failed} audit(s) failed", - ) - else: - self._emitter.emit_snapshot_complete( - snapshot=snapshot, - run_id=run_id, - interval=interval, - duration_ms=duration_ms, - execution_stats=execution_stats, - ) + try: + if num_audits_failed > 0: + self._emitter.emit_snapshot_fail( + snapshot=snapshot, + run_id=run_id, + error=f"Audit failed: {num_audits_failed} audit(s) failed", + ) + else: + self._emitter.emit_snapshot_complete( + snapshot=snapshot, + run_id=run_id, + interval=interval, + duration_ms=duration_ms, + execution_stats=execution_stats, + snapshots=self._current_snapshots, + ) + except Exception: + logger.warning("Failed to emit event for %s", snapshot.name, exc_info=True) # Delegate to wrapped console self._wrapped.update_snapshot_evaluation_progress( @@ -130,11 +141,14 @@ def stop_evaluation_progress(self, success: bool = True) -> None: for snapshot_name, run_id in list(self._active_runs.items()): snapshot = self._current_snapshots.get(snapshot_name) if snapshot and run_id: - self._emitter.emit_snapshot_fail( - snapshot=snapshot, - run_id=run_id, - error="Evaluation interrupted" if not success else "Unknown error", - ) + try: + self._emitter.emit_snapshot_fail( + snapshot=snapshot, + run_id=run_id, + error="Evaluation interrupted" if not success else "Unknown error", + ) + except Exception: + logger.warning("Failed to emit FAIL event for %s", snapshot_name, exc_info=True) # Clear tracking state self._active_runs.clear() diff --git a/src/sqlmesh_openlineage/datasets.py b/src/sqlmesh_openlineage/datasets.py index 71e0c5e..836b5ae 100644 --- a/src/sqlmesh_openlineage/datasets.py +++ b/src/sqlmesh_openlineage/datasets.py @@ -2,11 +2,9 @@ from __future__ import annotations import typing as t -from collections import defaultdict if t.TYPE_CHECKING: from sqlmesh.core.snapshot import Snapshot - from sqlmesh.core.model import Model from openlineage.client.event_v2 import InputDataset, OutputDataset @@ -90,19 +88,28 @@ def snapshot_to_column_lineage_facet( # Get column name source_col = exp.to_column(lineage_node.name).name + # Determine transformation type based on whether + # output column name matches source column name + is_identity = col_name == source_col + transformations = [ + column_lineage_dataset.Transformation( + type="DIRECT", + subtype="IDENTITY" if is_identity else "TRANSFORMATION", + ) + ] + input_fields.append( column_lineage_dataset.InputField( namespace=namespace, name=table_name, field=source_col, + transformations=transformations, ) ) if input_fields: fields[col_name] = column_lineage_dataset.Fields( inputFields=input_fields, - transformationType="", - transformationDescription="", ) except Exception: @@ -156,19 +163,28 @@ def snapshot_to_output_dataset( def snapshot_to_input_datasets( snapshot: "Snapshot", namespace: str, + snapshots: t.Optional[t.Dict[str, "Snapshot"]] = None, ) -> t.List["InputDataset"]: - """Get upstream dependencies as input datasets.""" + """Get upstream dependencies as input datasets. + + When a snapshots dict is provided, parent snapshots are looked up to + produce fully qualified table names consistent with output datasets. + """ from openlineage.client.event_v2 import InputDataset inputs: t.List["InputDataset"] = [] # Get parent snapshot IDs for parent_id in snapshot.parents: - # Parent ID contains the name we need + # Try to resolve fully qualified name via the snapshots dict + parent_name = parent_id.name + if snapshots and parent_name in snapshots: + parent_name = snapshot_to_table_name(snapshots[parent_name]) + inputs.append( InputDataset( namespace=namespace, - name=parent_id.name, + name=parent_name, ) ) diff --git a/src/sqlmesh_openlineage/emitter.py b/src/sqlmesh_openlineage/emitter.py index 17e477e..774c707 100644 --- a/src/sqlmesh_openlineage/emitter.py +++ b/src/sqlmesh_openlineage/emitter.py @@ -1,14 +1,19 @@ """OpenLineage event emitter for SQLMesh.""" from __future__ import annotations +import logging import typing as t from datetime import datetime, timezone +logger = logging.getLogger(__name__) + if t.TYPE_CHECKING: from sqlmesh.core.snapshot import Snapshot from sqlmesh.core.snapshot.definition import Interval from sqlmesh.core.snapshot.execution_tracker import QueryExecutionStats +PRODUCER = "https://github.com/sidequery/sqlmesh-openlineage" + class OpenLineageEmitter: """Emits OpenLineage events for SQLMesh operations.""" @@ -20,6 +25,7 @@ def __init__( api_key: t.Optional[str] = None, ): from openlineage.client import OpenLineageClient + from openlineage.client.client import OpenLineageClientOptions self.namespace = namespace self.url = url @@ -32,15 +38,82 @@ def __init__( elif api_key: self.client = OpenLineageClient( url=url, - options={"api_key": api_key}, + options=OpenLineageClientOptions(api_key=api_key), ) else: self.client = OpenLineageClient(url=url) + def _build_job_facets(self, snapshot: "Snapshot") -> t.Dict[str, t.Any]: + """Build job facets including SQL, job type, and source code location.""" + from openlineage.client.facet_v2 import job_type_job, sql_job, source_code_location_job + + facets: t.Dict[str, t.Any] = {} + + # JobTypeJobFacet: identify as SQLMesh batch job + facets["jobType"] = job_type_job.JobTypeJobFacet( + processingType="BATCH", + integration="SQLMESH", + jobType="MODEL", + ) + + # SQLJobFacet: include the model SQL query + try: + if snapshot.is_model and snapshot.model: + query = snapshot.model.query + if query is not None: + sql_str = str(query) + if sql_str: + facets["sql"] = sql_job.SQLJobFacet(query=sql_str) + except Exception: + pass + + # SourceCodeLocationJobFacet: include model file path + try: + if snapshot.is_model and snapshot.model: + model_path = getattr(snapshot.model, "_path", None) + if model_path is not None: + path_str = str(model_path) + if path_str: + facets["sourceCodeLocation"] = ( + source_code_location_job.SourceCodeLocationJobFacet( + type="file", + url=f"file://{path_str}", + ) + ) + except Exception: + pass + + return facets + + def _build_processing_engine_facet(self) -> t.Dict[str, t.Any]: + """Build run facets for processing engine info.""" + from openlineage.client.facet_v2 import processing_engine_run + + facets: t.Dict[str, t.Any] = {} + + try: + from sqlmesh import __version__ as sqlmesh_version + except ImportError: + sqlmesh_version = "unknown" + + try: + from sqlmesh_openlineage import __version__ as adapter_version + except ImportError: + adapter_version = "unknown" + + facets["processing_engine"] = processing_engine_run.ProcessingEngineRunFacet( + version=sqlmesh_version, + name="SQLMesh", + openlineageAdapterVersion=adapter_version, + ) + + return facets + def emit_snapshot_start( self, snapshot: "Snapshot", run_id: str, + snapshots: t.Optional[t.Dict[str, "Snapshot"]] = None, ) -> None: """Emit a START event for snapshot evaluation.""" from openlineage.client.event_v2 import RunEvent, RunState, Run, Job @@ -50,19 +123,25 @@ def emit_snapshot_start( snapshot_to_input_datasets, ) - inputs = snapshot_to_input_datasets(snapshot, self.namespace) + inputs = snapshot_to_input_datasets(snapshot, self.namespace, snapshots=snapshots) output = snapshot_to_output_dataset(snapshot, self.namespace) + job_facets = self._build_job_facets(snapshot) + run_facets = self._build_processing_engine_facet() + event = RunEvent( eventType=RunState.START, eventTime=datetime.now(timezone.utc).isoformat(), - run=Run(runId=run_id), - job=Job(namespace=self.namespace, name=snapshot.name), + run=Run(runId=run_id, facets=run_facets), + job=Job(namespace=self.namespace, name=snapshot.name, facets=job_facets), inputs=inputs, outputs=[output] if output else [], - producer="sqlmesh-openlineage", + producer=PRODUCER, ) - self.client.emit(event) + try: + self.client.emit(event) + except Exception: + logger.warning("Failed to emit %s event for %s", event.eventType, snapshot.name, exc_info=True) def emit_snapshot_complete( self, @@ -71,17 +150,22 @@ def emit_snapshot_complete( interval: t.Optional["Interval"] = None, duration_ms: t.Optional[int] = None, execution_stats: t.Optional["QueryExecutionStats"] = None, + snapshots: t.Optional[t.Dict[str, "Snapshot"]] = None, ) -> None: """Emit a COMPLETE event for snapshot evaluation.""" from openlineage.client.event_v2 import RunEvent, RunState, Run, Job - from sqlmesh_openlineage.datasets import snapshot_to_output_dataset + from sqlmesh_openlineage.datasets import ( + snapshot_to_output_dataset, + snapshot_to_input_datasets, + ) from sqlmesh_openlineage.facets import build_run_facets, build_output_facets run_facets = build_run_facets( duration_ms=duration_ms, execution_stats=execution_stats, ) + run_facets.update(self._build_processing_engine_facet()) output = snapshot_to_output_dataset( snapshot, @@ -89,15 +173,23 @@ def emit_snapshot_complete( facets=build_output_facets(execution_stats), ) + inputs = snapshot_to_input_datasets(snapshot, self.namespace, snapshots=snapshots) + + job_facets = self._build_job_facets(snapshot) + event = RunEvent( eventType=RunState.COMPLETE, eventTime=datetime.now(timezone.utc).isoformat(), run=Run(runId=run_id, facets=run_facets), - job=Job(namespace=self.namespace, name=snapshot.name), + job=Job(namespace=self.namespace, name=snapshot.name, facets=job_facets), + inputs=inputs, outputs=[output] if output else [], - producer="sqlmesh-openlineage", + producer=PRODUCER, ) - self.client.emit(event) + try: + self.client.emit(event) + except Exception: + logger.warning("Failed to emit %s event for %s", event.eventType, snapshot.name, exc_info=True) def emit_snapshot_fail( self, @@ -124,6 +216,9 @@ def emit_snapshot_fail( }, ), job=Job(namespace=self.namespace, name=snapshot.name), - producer="sqlmesh-openlineage", + producer=PRODUCER, ) - self.client.emit(event) + try: + self.client.emit(event) + except Exception: + logger.warning("Failed to emit %s event for %s", event.eventType, snapshot.name, exc_info=True) diff --git a/src/sqlmesh_openlineage/facets.py b/src/sqlmesh_openlineage/facets.py index 34eb1c2..408e3ef 100644 --- a/src/sqlmesh_openlineage/facets.py +++ b/src/sqlmesh_openlineage/facets.py @@ -17,8 +17,8 @@ def build_run_facets( # Add custom SQLMesh facet with execution info if duration_ms is not None or execution_stats is not None: sqlmesh_facet = { - "_producer": "sqlmesh-openlineage", - "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SQLMeshExecutionFacet.json", + "_producer": "https://github.com/sidequery/sqlmesh-openlineage", + "_schemaURL": "https://github.com/sidequery/sqlmesh-openlineage#SQLMeshExecutionFacet", } if duration_ms is not None: diff --git a/src/sqlmesh_openlineage/install.py b/src/sqlmesh_openlineage/install.py index e6d818d..299fb13 100644 --- a/src/sqlmesh_openlineage/install.py +++ b/src/sqlmesh_openlineage/install.py @@ -2,9 +2,11 @@ from __future__ import annotations import os +import threading import typing as t _installed = False +_install_lock = threading.Lock() def install( @@ -37,37 +39,38 @@ def install( """ global _installed - if _installed: - return + with _install_lock: + if _installed: + return - from sqlmesh.core.console import set_console, create_console - from sqlmesh_openlineage.console import OpenLineageConsole + from sqlmesh.core.console import set_console, create_console + from sqlmesh_openlineage.console import OpenLineageConsole - # Resolve config from args or env vars - resolved_url = url or os.environ.get("OPENLINEAGE_URL") - resolved_namespace = namespace or os.environ.get("OPENLINEAGE_NAMESPACE", "sqlmesh") - resolved_api_key = api_key or os.environ.get("OPENLINEAGE_API_KEY") + # Resolve config from args or env vars + resolved_url = url if url is not None else os.environ.get("OPENLINEAGE_URL") + resolved_namespace = namespace if namespace is not None else os.environ.get("OPENLINEAGE_NAMESPACE", "sqlmesh") + resolved_api_key = api_key if api_key is not None else os.environ.get("OPENLINEAGE_API_KEY") - if not resolved_url: - raise ValueError( - "OpenLineage URL required. Pass url= or set OPENLINEAGE_URL env var." - ) + if not resolved_url: + raise ValueError( + "OpenLineage URL required. Pass url= or set OPENLINEAGE_URL env var." + ) - # Create the default console for the current environment - default_console = create_console() + # Create the default console for the current environment + default_console = create_console() - # Wrap it with OpenLineage emission - ol_console = OpenLineageConsole( - wrapped=default_console, - url=resolved_url, - namespace=resolved_namespace, - api_key=resolved_api_key, - ) + # Wrap it with OpenLineage emission + ol_console = OpenLineageConsole( + wrapped=default_console, + url=resolved_url, + namespace=resolved_namespace, + api_key=resolved_api_key, + ) - # Set as global console - SQLMesh's CLI will use this - set_console(ol_console) + # Set as global console - SQLMesh's CLI will use this + set_console(ol_console) - _installed = True + _installed = True def is_installed() -> bool: diff --git a/tests/test_console.py b/tests/test_console.py index 482b313..9da2d43 100644 --- a/tests/test_console.py +++ b/tests/test_console.py @@ -119,3 +119,211 @@ def test_update_snapshot_emits_fail_on_audit_failure( mock_emit.assert_called_once() assert "audit" in mock_emit.call_args[1]["error"].lower() + + def test_start_passes_snapshots_dict_to_emitter( + self, mock_console, mock_snapshot, mock_openlineage_client + ): + """Test that start_snapshot_evaluation_progress passes current_snapshots to emitter.""" + from sqlmesh_openlineage.console import OpenLineageConsole + + console = OpenLineageConsole( + wrapped=mock_console, + url="http://localhost:5000", + namespace="test", + ) + + with patch.object(console._emitter, "emit_snapshot_start") as mock_emit: + console.start_snapshot_evaluation_progress(mock_snapshot, audit_only=False) + + mock_emit.assert_called_once() + call_kwargs = mock_emit.call_args[1] + assert "snapshots" in call_kwargs + assert call_kwargs["snapshots"] is console._current_snapshots + + def test_complete_passes_snapshots_dict_to_emitter( + self, mock_console, mock_snapshot, mock_openlineage_client + ): + """Test that update_snapshot_evaluation_progress passes current_snapshots to emitter.""" + from sqlmesh_openlineage.console import OpenLineageConsole + + console = OpenLineageConsole( + wrapped=mock_console, + url="http://localhost:5000", + namespace="test", + ) + + console._active_runs[mock_snapshot.name] = "test-run-id" + + interval = MagicMock() + + with patch.object(console._emitter, "emit_snapshot_complete") as mock_emit: + console.update_snapshot_evaluation_progress( + snapshot=mock_snapshot, + interval=interval, + batch_idx=0, + duration_ms=1000, + num_audits_passed=1, + num_audits_failed=0, + ) + + mock_emit.assert_called_once() + call_kwargs = mock_emit.call_args[1] + assert "snapshots" in call_kwargs + assert call_kwargs["snapshots"] is console._current_snapshots + + + def test_emit_failure_does_not_crash_console( + self, mock_console, mock_snapshot, mock_openlineage_client + ): + """Transport errors should not propagate to SQLMesh.""" + from sqlmesh_openlineage.console import OpenLineageConsole + + console = OpenLineageConsole( + wrapped=mock_console, + url="http://localhost:5000", + namespace="test", + ) + + console._emitter.emit_snapshot_start = MagicMock( + side_effect=ConnectionError("unreachable") + ) + + # Should not raise + console.start_snapshot_evaluation_progress(mock_snapshot, audit_only=False) + + # Delegation should still happen + mock_console.start_snapshot_evaluation_progress.assert_called_once_with( + mock_snapshot, False + ) + + def test_emit_complete_failure_does_not_crash_console( + self, mock_console, mock_snapshot, mock_openlineage_client + ): + """Transport errors on complete should not propagate.""" + from sqlmesh_openlineage.console import OpenLineageConsole + + console = OpenLineageConsole( + wrapped=mock_console, + url="http://localhost:5000", + namespace="test", + ) + + console._active_runs[mock_snapshot.name] = "test-run-id" + console._emitter.emit_snapshot_complete = MagicMock( + side_effect=ConnectionError("unreachable") + ) + + interval = MagicMock() + + # Should not raise + console.update_snapshot_evaluation_progress( + snapshot=mock_snapshot, + interval=interval, + batch_idx=0, + duration_ms=1000, + num_audits_passed=1, + num_audits_failed=0, + ) + + # Delegation should still happen + mock_console.update_snapshot_evaluation_progress.assert_called_once() + + +class TestEmitterFacets: + """Tests for new emitter facets (job type, SQL, processing engine, source code).""" + + def test_build_job_facets_includes_job_type(self, mock_snapshot, mock_openlineage_client): + """Test that _build_job_facets includes JobTypeJobFacet.""" + from sqlmesh_openlineage.emitter import OpenLineageEmitter + + emitter = OpenLineageEmitter(url="http://localhost:5000", namespace="test") + facets = emitter._build_job_facets(mock_snapshot) + + assert "jobType" in facets + assert facets["jobType"].processingType == "BATCH" + assert facets["jobType"].integration == "SQLMESH" + assert facets["jobType"].jobType == "MODEL" + + def test_build_job_facets_includes_sql(self, mock_snapshot, mock_openlineage_client): + """Test that _build_job_facets includes SQLJobFacet when query is available.""" + from sqlmesh_openlineage.emitter import OpenLineageEmitter + + mock_snapshot.model.query = MagicMock() + mock_snapshot.model.query.__str__ = lambda self: "SELECT id, name FROM source" + + emitter = OpenLineageEmitter(url="http://localhost:5000", namespace="test") + facets = emitter._build_job_facets(mock_snapshot) + + assert "sql" in facets + assert facets["sql"].query == "SELECT id, name FROM source" + + def test_build_job_facets_no_sql_when_no_query(self, mock_snapshot, mock_openlineage_client): + """Test that SQL facet is omitted when model has no query.""" + from sqlmesh_openlineage.emitter import OpenLineageEmitter + + mock_snapshot.model.query = None + + emitter = OpenLineageEmitter(url="http://localhost:5000", namespace="test") + facets = emitter._build_job_facets(mock_snapshot) + + assert "sql" not in facets + + def test_build_job_facets_source_code_location( + self, mock_snapshot, mock_openlineage_client + ): + """Test that _build_job_facets includes SourceCodeLocationJobFacet.""" + from sqlmesh_openlineage.emitter import OpenLineageEmitter + + mock_snapshot.model._path = "/path/to/models/test_model.sql" + + emitter = OpenLineageEmitter(url="http://localhost:5000", namespace="test") + facets = emitter._build_job_facets(mock_snapshot) + + assert "sourceCodeLocation" in facets + assert facets["sourceCodeLocation"].type == "file" + assert facets["sourceCodeLocation"].url == "file:///path/to/models/test_model.sql" + + def test_build_job_facets_no_source_code_when_no_path( + self, mock_snapshot, mock_openlineage_client + ): + """Test that source code location is omitted when model has no _path.""" + from sqlmesh_openlineage.emitter import OpenLineageEmitter + + # Simulate missing _path by raising AttributeError + del mock_snapshot.model._path + + emitter = OpenLineageEmitter(url="http://localhost:5000", namespace="test") + facets = emitter._build_job_facets(mock_snapshot) + + assert "sourceCodeLocation" not in facets + + def test_build_processing_engine_facet(self, mock_openlineage_client): + """Test that _build_processing_engine_facet returns correct facet.""" + from sqlmesh_openlineage.emitter import OpenLineageEmitter + + emitter = OpenLineageEmitter(url="http://localhost:5000", namespace="test") + facets = emitter._build_processing_engine_facet() + + assert "processing_engine" in facets + pe = facets["processing_engine"] + assert pe.name == "SQLMesh" + assert pe.version # should be non-empty + assert pe.openlineageAdapterVersion == "0.1.0" + + def test_emitter_with_api_key_uses_client_options(self): + """Verify api_key creates OpenLineageClientOptions, not dict.""" + from sqlmesh_openlineage.emitter import OpenLineageEmitter + from openlineage.client.client import OpenLineageClientOptions + + with patch("openlineage.client.OpenLineageClient") as mock_cls: + mock_cls.return_value = MagicMock() + emitter = OpenLineageEmitter(url="http://test:5000", api_key="my-key") + call_kwargs = mock_cls.call_args[1] + assert isinstance(call_kwargs["options"], OpenLineageClientOptions) + assert call_kwargs["options"].api_key == "my-key" + + def test_producer_uri(self, mock_openlineage_client): + """Test that PRODUCER constant is a proper URI.""" + from sqlmesh_openlineage.emitter import PRODUCER + + assert PRODUCER == "https://github.com/sidequery/sqlmesh-openlineage" diff --git a/tests/test_datasets.py b/tests/test_datasets.py index f66bb9c..e4f6b26 100644 --- a/tests/test_datasets.py +++ b/tests/test_datasets.py @@ -72,3 +72,49 @@ def test_snapshot_to_input_datasets_empty(self, mock_snapshot): datasets = snapshot_to_input_datasets(mock_snapshot, namespace="test") assert len(datasets) == 0 + + def test_snapshot_to_input_datasets_with_snapshots_dict(self, mock_snapshot): + """Test input datasets use qualified names when snapshots dict is provided.""" + from sqlmesh_openlineage.datasets import snapshot_to_input_datasets + + # Create parent snapshot with qualified view name + parent_snapshot = MagicMock() + parent_snapshot.name = "parent_model" + parent_qvn = MagicMock() + parent_qvn.catalog = "catalog" + parent_qvn.schema_name = "schema" + parent_qvn.table = "parent_model" + parent_snapshot.qualified_view_name = parent_qvn + + # Add parent ID to snapshot + parent_id = MagicMock() + parent_id.name = "parent_model" + mock_snapshot.parents = [parent_id] + + snapshots = {"parent_model": parent_snapshot} + + datasets = snapshot_to_input_datasets( + mock_snapshot, namespace="test", snapshots=snapshots + ) + + assert len(datasets) == 1 + assert datasets[0].name == "catalog.schema.parent_model" + assert datasets[0].namespace == "test" + + def test_snapshot_to_input_datasets_fallback_without_snapshot(self, mock_snapshot): + """Test input datasets fall back to parent_id.name when snapshot not in dict.""" + from sqlmesh_openlineage.datasets import snapshot_to_input_datasets + + parent_id = MagicMock() + parent_id.name = "unknown_parent" + mock_snapshot.parents = [parent_id] + + # Provide a snapshots dict that does not contain this parent + snapshots = {} + + datasets = snapshot_to_input_datasets( + mock_snapshot, namespace="test", snapshots=snapshots + ) + + assert len(datasets) == 1 + assert datasets[0].name == "unknown_parent" diff --git a/tests/test_integration.py b/tests/test_integration.py index e35d08f..f2d422d 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -168,6 +168,47 @@ def capture_emit(event): assert processed_complete.run.facets is not None assert "sqlmesh_execution" in processed_complete.run.facets + # Verify producer URI on all events + for event in captured_events: + assert event.producer == "https://github.com/sidequery/sqlmesh-openlineage" + + # Verify JobTypeJobFacet on START events + for start_event in start_events: + assert start_event.job.facets is not None + assert "jobType" in start_event.job.facets + jt = start_event.job.facets["jobType"] + assert jt.processingType == "BATCH" + assert jt.integration == "SQLMESH" + assert jt.jobType == "MODEL" + + # Verify SQLJobFacet on START events + for start_event in start_events: + assert "sql" in start_event.job.facets + assert start_event.job.facets["sql"].query # non-empty SQL + + # Verify ProcessingEngineRunFacet on START events + for start_event in start_events: + assert start_event.run.facets is not None + assert "processing_engine" in start_event.run.facets + pe = start_event.run.facets["processing_engine"] + assert pe.name == "SQLMesh" + assert pe.version # non-empty version + assert pe.openlineageAdapterVersion == "0.1.0" + + # Verify COMPLETE events also have inputs (improvement #6) + assert len(processed_complete.inputs) == 1, "COMPLETE should also have inputs" + assert "source_data" in processed_complete.inputs[0].name + + # Verify COMPLETE events have job facets too + for complete_event in complete_events: + assert complete_event.job.facets is not None + assert "jobType" in complete_event.job.facets + assert "sql" in complete_event.job.facets + + # Verify ProcessingEngineRunFacet on COMPLETE events + for complete_event in complete_events: + assert "processing_engine" in complete_event.run.facets + def test_audit_failure_emits_fail_event(self, temp_project, captured_events): """Test that audit failures emit FAIL events.""" # Create a model with a failing audit