From b310a928d2fa70564177e77259a0db0d7b4d8743 Mon Sep 17 00:00:00 2001 From: pratiksha badheka Date: Wed, 18 Feb 2026 08:45:05 -0800 Subject: [PATCH] add pytest openlineage fixture and related files in google system tests --- .../google/tests/system/google/conftest.py | 11 +- .../google/openlineage_tests_operator.py | 248 ++++++++++++++++++ .../google/openlineage_tests_variable.py | 60 +++++ 3 files changed, 318 insertions(+), 1 deletion(-) create mode 100644 providers/google/tests/system/google/openlineage_tests_operator.py create mode 100644 providers/google/tests/system/google/openlineage_tests_variable.py diff --git a/providers/google/tests/system/google/conftest.py b/providers/google/tests/system/google/conftest.py index 1c377b7a34880..debdf2392452e 100644 --- a/providers/google/tests/system/google/conftest.py +++ b/providers/google/tests/system/google/conftest.py @@ -17,8 +17,9 @@ from __future__ import annotations import pytest +from openlineage_tests_variable import VariableTransport -from system.openlineage.conftest import set_transport_variable # noqa: F401 +from airflow.providers.openlineage.plugins.listener import OpenLineageListener REQUIRED_ENV_VARS = ("SYSTEM_TESTS_GCP_PROJECT",) @@ -26,3 +27,11 @@ @pytest.fixture def provider_env_vars(): return REQUIRED_ENV_VARS + + +@pytest.fixture(autouse=True) +def set_transport_variable(listener_manager): + listener = OpenLineageListener() + listener.adapter._client = listener.adapter.get_or_create_openlineage_client() + listener.adapter._client.transport = VariableTransport({}) + listener_manager(listener) diff --git a/providers/google/tests/system/google/openlineage_tests_operator.py b/providers/google/tests/system/google/openlineage_tests_operator.py new file mode 100644 index 0000000000000..0bc8aa7c003b5 --- /dev/null +++ b/providers/google/tests/system/google/openlineage_tests_operator.py @@ -0,0 +1,248 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import json +import logging +import os +import re +import time +import uuid +from typing import TYPE_CHECKING, Any +from urllib.parse import urlparse + +from dateutil.parser import parse +from jinja2 import Environment + +from airflow.providers.common.compat.sdk import BaseOperator, Variable + +if TYPE_CHECKING: + from airflow.providers.common.compat.sdk import Context + +log = logging.getLogger(__name__) + + +def any(result: Any) -> Any: + return result + + +def is_datetime(result: Any) -> str: + try: + parse(result) + return "true" + except Exception: + pass + return "false" + + +def is_uuid(result: Any) -> str: + try: + uuid.UUID(result) + return "true" + except Exception: + pass + return "false" + + +def regex_match(result: Any, pattern: str) -> str: + try: + if re.match(pattern=pattern, string=result): + return "true" + except Exception: + pass + return "false" + + +def env_var(var: str, default: str | None = None) -> str: + """ + Use this jinja method to access the environment variable named 'var'. + + If there is no such environment variable set, return the default. + If the default is None, raise an exception for an undefined variable. + """ + if var in os.environ: + return os.environ[var] + if default is not None: + return default + raise ValueError(f"Env var required but not provided: '{var}'") + + +def not_match(result: str, pattern: str) -> str: + if pattern in result: + raise ValueError(f"Found {pattern} in {result}") + return "true" + + +def url_scheme_authority(url: str) -> str: + parsed = urlparse(url) + return f"{parsed.scheme}://{parsed.netloc}" + + +def url_path(url: str) -> str: + return urlparse(url).path + + +def setup_jinja() -> Environment: + env = Environment() + env.globals["any"] = any + env.globals["is_datetime"] = is_datetime + env.globals["is_uuid"] = is_uuid + env.globals["regex_match"] = regex_match + env.globals["env_var"] = env_var + env.globals["not_match"] = not_match + env.filters["url_scheme_authority"] = url_scheme_authority + env.filters["url_path"] = url_path + return env + + +def match(expected, result, env: Environment) -> bool: + """ + Check if result is "equal" to expected value. + + Omits keys not specified in expected value and resolves any jinja templates found. + """ + if isinstance(expected, dict): + # Take a look only at keys present at expected dictionary + if not isinstance(result, dict): + log.error("Not a dict: %s\nExpected %s", result, expected) + return False + for k, v in expected.items(): + if k not in result: + log.error("Key %s not in received event %s\nExpected %s", k, result, expected) + return False + if not match(v, result[k], env): + log.error( + "For key %s, expected value %s not equals received %s\nExpected: %s, request: %s", + k, + v, + result[k], + expected, + result, + ) + return False + elif isinstance(expected, list): + if len(expected) != len(result): + log.error("Length does not match: expected %d, result: %d", len(expected), len(result)) + return False + for i, x in enumerate(expected): + if not match(x, result[i], env): + log.error( + "List not matched at %d\nexpected:\n%s\nresult: \n%s", + i, + json.dumps(x), + json.dumps(result[i]), + ) + return False + elif isinstance(expected, str): + if "{{" in expected: + # Evaluate jinja: in some cases, we want to check only if key exists, or if + # value has the right type + try: + rendered = env.from_string(expected).render(result=result) + except ValueError as e: + log.error("Error rendering jinja template %s: %s", expected, e) + return False + if str(rendered).lower() == "true" or rendered == result: + return True + log.error("Rendered value %s does not equal 'true' or %s", rendered, result) + return False + if expected != result: + log.error("Expected value %s does not equal result %s", expected, result) + return False + elif expected != result: + log.error("Object of type %s: %s does not match %s", type(expected), expected, result) + return False + return True + + +class OpenLineageTestOperator(BaseOperator): + """ + This operator is added for system testing purposes. + + It compares expected event templates set on initialization with ones emitted by OpenLineage integration + and stored in Variables by VariableTransport. + + Note: + If `clear_variables` is True, only the Airflow Variables listed in `event_templates` + (or derived from `file_path`) will be deleted - those that are supposed to be checked by the Operator. + We won't remove all Airflow Variables to avoid interfering with other instances of this Operator + running in parallel on different DAGs. Running continuous system tests without clearing Variables + may lead to leftover or growing Variables size. We recommend implementing a process to remove all + Airflow Variables after all system tests have run to ensure a clean environment for each test run. + + :param event_templates: dictionary where key is the key used by VariableTransport in format of ..event., and value is event template (fragment) that need to be in received events. + :param file_path: alternatively, file_path pointing to file with event templates will be used + :param env: jinja environment used to render event templates + :param allow_duplicate_events_regex: regex pattern; keys matching it are allowed to have multiple events. + :param clear_variables: if set to True, clears only variables to be checked after all events are checked or if any check fails + :raises: ValueError if the received events do not match with expected ones. + """ + + def __init__( + self, + event_templates: dict[str, dict] | None = None, + file_path: str | None = None, + env: Environment = setup_jinja(), + allow_duplicate_events_regex: str | None = None, + clear_variables: bool = True, + **kwargs, + ): + super().__init__(**kwargs) + self.event_templates = event_templates + self.file_path = file_path + self.env = env + self.allow_duplicate_events_regex = allow_duplicate_events_regex + self.clear_variables = clear_variables + if self.event_templates and self.file_path: + raise ValueError("Can't pass both event_templates and file_path") + + def execute(self, context: Context) -> None: + time.sleep(10) # Wait for all variables to update properly + if self.file_path is not None: + self.event_templates = {} + self.log.info("Reading OpenLineage event templates from file `%s`", self.file_path) + with open(self.file_path) as f: + events = json.load(f) + for event in events: + # Just a single event per job and event type is loaded as this is the most common scenario + key = event["job"]["name"] + ".event." + event["eventType"].lower() + self.event_templates[key] = event + try: + for key, template in self.event_templates.items(): # type: ignore[union-attr] + log.info("Checking key: `%s`", key) + actual_events = Variable.get(key=key, deserialize_json=True) + self.log.info( + "Events: len=`%s`, type=`%s`, value=%s", + len(actual_events), + type(actual_events), + actual_events, + ) + if len(actual_events) == 0: + raise ValueError(f"No event for key {key}") + if len(actual_events) != 1: + regex = self.allow_duplicate_events_regex + if regex is None or not re.fullmatch(regex, key): + raise ValueError(f"Expected one event for key {key}, got {len(actual_events)}") + # Last event is checked against the template, this will allow to f.e. check change in try_num + if not match(template, json.loads(actual_events[-1]), self.env): + raise ValueError("Event received does not match one specified in test") + finally: + if self.clear_variables: + for key in self.event_templates: # type: ignore[union-attr] + log.info("Removing variable `%s`", key) + Variable.delete(key=key) diff --git a/providers/google/tests/system/google/openlineage_tests_variable.py b/providers/google/tests/system/google/openlineage_tests_variable.py new file mode 100644 index 0000000000000..57b4bbbd4e8da --- /dev/null +++ b/providers/google/tests/system/google/openlineage_tests_variable.py @@ -0,0 +1,60 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING + +from openlineage.client.serde import Serde +from openlineage.client.transport import Config, Transport, get_default_factory + +# We use `airflow.models.variable.Variable` instead of `airflow.sdk.Variable`, even for Airflow 3, as it also +# works on scheduler. It may emit DeprecationWarning on workers, but it's needed for DAG events to be emitted. +from airflow.models.variable import Variable + +if TYPE_CHECKING: + from openlineage.client.client import Event + +log = logging.getLogger(__name__) + + +class VariableTransport(Transport): + """ + This transport sends OpenLineage events to Variables. + + Key schema is ..event.. + It's made to be used in system tests, stored data read by OpenLineageTestOperator. + """ + + kind = "variable" + + def __init__(self, config: Config) -> None: + log.debug("Constructing OpenLineage transport that will send events to Airflow Variables.") + + def emit(self, event: Event) -> None: + key = f"{event.job.name}.event.{event.eventType.value.lower()}" # type: ignore[union-attr] + event_str = Serde.to_json(event) + if (var := Variable.get(key=key, default_var=None, deserialize_json=True)) is not None: + log.debug("Appending OL event to Airflow Variable `%s`", key) + Variable.set(key=key, value=var + [event_str], serialize_json=True) + else: + log.debug("Emitting OL event to new Airflow Variable `%s`", key) + Variable.set(key=key, value=[event_str], serialize_json=True) + + +get_default_factory().register_transport(VariableTransport.kind, VariableTransport)