
Add pytest OpenLineage fixture and required files to google system tests #62138

Open

Prab-27 wants to merge 1 commit into apache:main from Prab-27:system-tests-fixture-path

Conversation

@Prab-27
Contributor

@Prab-27 Prab-27 commented Feb 18, 2026

Related Slack discussion: here

Add OpenLineage fixture in google conftest.py (an illustrative sketch of such a fixture follows below)
Add files providers/openlineage/tests/system/openlineage/operator.py and providers/openlineage/tests/system/openlineage/transport/variable.py to the google provider
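
For illustration only, the kind of fixture involved might look roughly like this. This is a hypothetical sketch: the fixture name and the transport class path are assumptions, not the exact code in this PR.

```python
# Hypothetical sketch of an OpenLineage fixture in the google provider's
# conftest.py. Fixture name and transport class path are assumptions for
# illustration, not necessarily what this PR adds.
import os

import pytest


@pytest.fixture(autouse=True)
def use_openlineage_variable_transport():
    """Route OpenLineage events through the system-test transport."""
    key = "AIRFLOW__OPENLINEAGE__TRANSPORT"
    old = os.environ.get(key)
    # Point the OpenLineage client at the transport shipped with the
    # system tests (transport/variable.py), addressed by its class path.
    os.environ[key] = (
        '{"type": "tests.system.openlineage.transport.variable.VariableTransport"}'
    )
    yield
    # Restore the previous value so unrelated tests are unaffected.
    if old is None:
        os.environ.pop(key, None)
    else:
        os.environ[key] = old
```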


Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@Prab-27 Prab-27 requested a review from shahar1 as a code owner February 18, 2026 17:00
@boring-cyborg boring-cyborg bot added the area:providers and provider:google labels Feb 18, 2026
@Prab-27
Contributor Author

Prab-27 commented Feb 18, 2026

But I have 2 questions:

1. Not only this provider, but many providers use this type of system-test import. I believe this is an acceptable solution, but it is not ideal to have the same files duplicated in multiple places just for a system test fixture.

Could we handle this type of import dependency in pyproject.toml? Or is there any other idea? (One possible direction is sketched below.)
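
One possible direction, as a purely hypothetical sketch (not something this PR does): instead of copying the files, the google provider's conftest.py could put the openlineage provider's test tree on sys.path and import the helpers in place. The relative path below is an assumption based on the providers/<name>/tests layout:

```python
# Hypothetical alternative to duplicating operator.py and variable.py:
# extend sys.path so the openlineage system-test helpers import in place.
import sys
from pathlib import Path

# Assumed layout: this conftest.py lives somewhere under
# providers/google/tests/, so climbing up to providers/ and descending
# into openlineage/tests works. The parents[...] index would need
# adjusting to the real directory depth.
PROVIDERS_ROOT = Path(__file__).resolve().parents[2]
OPENLINEAGE_TESTS = PROVIDERS_ROOT / "openlineage" / "tests"

if str(OPENLINEAGE_TESTS) not in sys.path:
    sys.path.insert(0, str(OPENLINEAGE_TESTS))
```

A pyproject.toml answer would presumably mean packaging those helpers (or declaring a test-time dependency on the openlineage provider's test code), which avoids path manipulation entirely.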

2. When I try to run this in the breeze container, it shows the output below. I have tried a lot, but I don't understand this error:

Home of the user: /root
Airflow home /root/airflow
Skipping initializing of the DB as it was initialized already.
You can re-initialize the database by adding --with-db-init flag when running tests.
-------------------------------------- Captured stderr setup --------------------------------------
2026-02-18T12:04:27.598659Z [warning  ] Skipping masking for a secret as it's too short (<5 chars) [airflow._shared.secrets_masker.secrets_masker]
2026-02-18T12:04:29.843045Z [warning  ] Couldn't find any OpenLineage transport configuration; will print events to console. [openlineage.client.client]
2026-02-18T12:04:29.843521Z [info     ] OpenLineageClient will use `console` transport [openlineage.client.client]
-------------------------------------- Captured stdout call ---------------------------------------
2026-02-18T12:04:29.915594Z [info     ] creating dag run               [airflow.serialization.definitions.dag] logical_date=DateTime(2026, 2, 18, 12, 4, 29, 899251, tzinfo=Timezone('UTC')) partition_key=None run_after=DateTime(2026, 2, 18, 12, 4, 29, 910741, tzinfo=Timezone('UTC')) run_id=manual__2026-02-18T12:04:29.910741+00:00
2026-02-18T12:04:29.932627Z [info     ] Created dag run.               [airflow.models.dagrun] dagrun=<DagRun example_oracle_to_gcs @ 2026-02-18 12:04:29.899251+00:00: manual__2026-02-18T12:04:29.910741+00:00, state:running, queued_at: None. run_type: manual>
2026-02-18T12:04:33.671669Z [info     ] Task started                   [airflow.api_fastapi.execution_api.routes.task_instances] correlation_id=019c70a3-37ee-7fec-9d1d-aa5594c1d54c hostname=2b0514c4dab4 previous_state=queued ti_id=019c70a3-346e-7a38-9e59-96facc942ad6
2026-02-18T12:04:33.675223Z [info     ] Task instance state updated    [airflow.api_fastapi.execution_api.routes.task_instances] correlation_id=019c70a3-37ee-7fec-9d1d-aa5594c1d54c rows_affected=1 ti_id=019c70a3-346e-7a38-9e59-96facc942ad6
2026-02-18T12:04:33.710586Z [info     ] Updating RenderedTaskInstanceFields [airflow.api_fastapi.execution_api.routes.task_instances] correlation_id=019c70a3-4327-7c16-95f6-94ed764d2eee field_count=6 ti_id=019c70a3-346e-7a38-9e59-96facc942ad6
2026-02-18T12:04:46.818938Z [info     ] Creating Bucket: bucket_example_oracle_to_gcs_0op; Location: US; Storage Class: MULTI_REGIONAL [airflow.task.hooks.airflow.providers.google.cloud.hooks.gcs.GCSHook]
2026-02-18T12:04:46.819230Z [info     ] Getting connection using `google.auth.default()` since no explicit credentials are provided. [airflow.providers.google.cloud.utils.credentials_provider._CredentialProvider]
2026-02-18T12:04:57.413797Z [error    ] Task failed with exception     [task]
Traceback (most recent call last):
  File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py", line 1203, in run
    result = _execute_task(context=context, ti=ti, log=log)
  File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py", line 1617, in _execute_task
    result = ctx.run(execute, context=context)
  File "/opt/airflow/task-sdk/src/airflow/sdk/bases/operator.py", line 443, in wrapper
    return func(self, *args, **kwargs)
  File "/opt/airflow/providers/google/src/airflow/providers/google/cloud/operators/gcs.py", line 154, in execute
    hook.create_bucket(
  File "/opt/airflow/providers/google/src/airflow/providers/google/common/hooks/base_google.py", line 550, in inner_wrapper
    return func(self, *args, **kwargs)
  File "/opt/airflow/providers/google/src/airflow/providers/google/cloud/hooks/gcs.py", line 1129, in create_bucket
    client = self.get_conn()
  File "/opt/airflow/providers/google/src/airflow/providers/google/cloud/hooks/gcs.py", line 157, in get_conn
    credentials=self.get_credentials(), client_info=CLIENT_INFO, project=self.project_id
  File "/opt/airflow/providers/google/src/airflow/providers/google/common/hooks/base_google.py", line 356, in get_credentials
    credentials, _ = self.get_credentials_and_project_id()
  File "/opt/airflow/providers/google/src/airflow/providers/google/common/hooks/base_google.py", line 329, in get_credentials_and_project_id
    credentials, project_id = get_credentials_and_project_id(
  File "/opt/airflow/providers/google/src/airflow/providers/google/cloud/utils/credentials_provider.py", line 438, in get_credentials_and_project_id
    return _CredentialProvider(*args, **kwargs).get_credentials_and_project()
  File "/opt/airflow/providers/google/src/airflow/providers/google/cloud/utils/credentials_provider.py", line 277, in get_credentials_and_project
    credentials, project_id = self._get_credentials_using_adc()
  File "/opt/airflow/providers/google/src/airflow/providers/google/cloud/utils/credentials_provider.py", line 419, in _get_credentials_using_adc
    credentials, project_id = google.auth.default(scopes=scopes)
  File "/usr/python/lib/python3.10/site-packages/google/auth/_default.py", line 752, in default
    raise exceptions.DefaultCredentialsError(_CLOUD_SDK_MISSING_CREDENTIALS)
google.auth.exceptions.DefaultCredentialsError: Your default credentials were not found. To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc for more information.
2026-02-18T12:05:10.875201Z [info     ] Task instance state updated    [airflow.api_fastapi.execution_api.routes.task_instances] correlation_id=019c70a3-d43a-7092-a1fd-12c95e2a2b5e new_state=failed rows_affected=1 ti_id=019c70a3-346e-7a38-9e59-96facc942ad6
2026-02-18T12:05:13.523004Z [info     ] Task started                   [airflow.api_fastapi.execution_api.routes.task_instances] correlation_id=019c70a3-d4f8-7963-a092-49a6b675ea09 hostname=2b0514c4dab4 previous_state=queued ti_id=019c70a3-3471-7bd9-b173-4c5dca8a719f
2026-02-18T12:05:13.523971Z [info     ] Task instance state updated    [airflow.api_fastapi.execution_api.routes.task_instances] correlation_id=019c70a3-d4f8-7963-a092-49a6b675ea09 rows_affected=1 ti_id=019c70a3-3471-7bd9-b173-4c5dca8a719f
2026-02-18T12:05:13.547989Z [info     ] Updating RenderedTaskInstanceFields [airflow.api_fastapi.execution_api.routes.task_instances] correlation_id=019c70a3-dec5-7e0f-a8f2-20d04dcbe08a field_count=3 ti_id=019c70a3-3471-7bd9-b173-4c5dca8a719f
2026-02-18T12:05:26.600466Z [error    ] Task failed with exception     [task]
Traceback (most recent call last):
  File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py", line 1203, in run
    result = _execute_task(context=context, ti=ti, log=log)
  File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py", line 1617, in _execute_task
    result = ctx.run(execute, context=context)
  File "/opt/airflow/task-sdk/src/airflow/sdk/bases/operator.py", line 443, in wrapper
    return func(self, *args, **kwargs)
  File "/opt/airflow/task-sdk/src/airflow/sdk/bases/decorator.py", line 299, in execute
    return_value = super().execute(context)
  File "/opt/airflow/task-sdk/src/airflow/sdk/bases/operator.py", line 443, in wrapper
    return func(self, *args, **kwargs)
  File "/opt/airflow/providers/standard/src/airflow/providers/standard/operators/python.py", line 228, in execute
    return_value = self.execute_callable()
  File "/opt/airflow/providers/standard/src/airflow/providers/standard/operators/python.py", line 251, in execute_callable
    return runner.run(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/callback_runner.py", line 97, in run
    return func(*args, **kwargs)
  File "/opt/airflow/devel-common/src/tests_common/test_utils/watcher.py", line 41, in watcher
    raise AirflowException("Failing task because one or more upstream tasks failed.")
airflow.sdk.exceptions.AirflowException: Failing task because one or more upstream tasks failed.
2026-02-18T12:05:39.654541Z [info     ] Task instance state updated    [airflow.api_fastapi.execution_api.routes.task_instances] correlation_id=019c70a4-44b2-7063-8dad-349dd5b38b52 new_state=failed rows_affected=1 ti_id=019c70a3-3471-7bd9-b173-4c5dca8a719f
2026-02-18T12:05:42.174265Z [info     ] Task started                   [airflow.api_fastapi.execution_api.routes.task_instances] correlation_id=019c70a4-454d-7625-9c8b-b86d0837017b hostname=2b0514c4dab4 previous_state=queued ti_id=019c70a3-3470-7a55-a0e2-0747894fd338
2026-02-18T12:05:42.175260Z [info     ] Task instance state updated    [airflow.api_fastapi.execution_api.routes.task_instances] correlation_id=019c70a4-454d-7625-9c8b-b86d0837017b rows_affected=1 ti_id=019c70a3-3470-7a55-a0e2-0747894fd338
2026-02-18T12:05:42.199044Z [info     ] Updating RenderedTaskInstanceFields [airflow.api_fastapi.execution_api.routes.task_instances] correlation_id=019c70a4-4eb0-761a-9f1d-1065d3786ea1 field_count=4 ti_id=019c70a3-3470-7a55-a0e2-0747894fd338
2026-02-18T12:05:53.645030Z [info     ] Getting connection using `google.auth.default()` since no explicit credentials are provided. [airflow.providers.google.cloud.utils.credentials_provider._CredentialProvider]
2026-02-18T12:06:05.537019Z [error    ] Task failed with exception     [task]
Traceback (most recent call last):
  File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py", line 1203, in run
    result = _execute_task(context=context, ti=ti, log=log)
  File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py", line 1617, in _execute_task
    result = ctx.run(execute, context=context)
  File "/opt/airflow/task-sdk/src/airflow/sdk/bases/operator.py", line 443, in wrapper
    return func(self, *args, **kwargs)
  File "/opt/airflow/providers/google/src/airflow/providers/google/cloud/operators/gcs.py", line 999, in execute
    hook.delete_bucket(bucket_name=self.bucket_name, force=self.force, user_project=self.user_project)
  File "/opt/airflow/providers/google/src/airflow/providers/google/cloud/hooks/gcs.py", line 743, in delete_bucket
    client = self.get_conn()
  File "/opt/airflow/providers/google/src/airflow/providers/google/cloud/hooks/gcs.py", line 157, in get_conn
    credentials=self.get_credentials(), client_info=CLIENT_INFO, project=self.project_id
  File "/opt/airflow/providers/google/src/airflow/providers/google/common/hooks/base_google.py", line 356, in get_credentials
    credentials, _ = self.get_credentials_and_project_id()
  File "/opt/airflow/providers/google/src/airflow/providers/google/common/hooks/base_google.py", line 329, in get_credentials_and_project_id
    credentials, project_id = get_credentials_and_project_id(
  File "/opt/airflow/providers/google/src/airflow/providers/google/cloud/utils/credentials_provider.py", line 438, in get_credentials_and_project_id
    return _CredentialProvider(*args, **kwargs).get_credentials_and_project()
  File "/opt/airflow/providers/google/src/airflow/providers/google/cloud/utils/credentials_provider.py", line 277, in get_credentials_and_project
    credentials, project_id = self._get_credentials_using_adc()
  File "/opt/airflow/providers/google/src/airflow/providers/google/cloud/utils/credentials_provider.py", line 419, in _get_credentials_using_adc
    credentials, project_id = google.auth.default(scopes=scopes)
  File "/usr/python/lib/python3.10/site-packages/google/auth/_default.py", line 752, in default
    raise exceptions.DefaultCredentialsError(_CLOUD_SDK_MISSING_CREDENTIALS)
google.auth.exceptions.DefaultCredentialsError: Your default credentials were not found. To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc for more information.
2026-02-18T12:06:18.597533Z [info     ] Task instance state updated    [airflow.api_fastapi.execution_api.routes.task_instances] correlation_id=019c70a4-dccb-7e9d-aa0d-0614dda91efb new_state=failed rows_affected=1 ti_id=019c70a3-3470-7a55-a0e2-0747894fd338
2026-02-18T12:06:18.632724Z [info     ] Marking run <DagRun example_oracle_to_gcs @ 2026-02-18 12:04:29.899251+00:00: manual__2026-02-18T12:04:29.910741+00:00, state:running, queued_at: None. run_type: manual> failed [airflow.models.dagrun.DagRun]
2026-02-18T12:06:18.639376Z [info     ] DagRun Finished: dag_id=example_oracle_to_gcs, logical_date=2026-02-18 12:04:29.899251+00:00, run_id=manual__2026-02-18T12:04:29.910741+00:00, run_start_date=2026-02-18 12:04:29.899251+00:00, run_end_date=2026-02-18 12:06:18.632893+00:00, run_duration=108.733642, state=failed, run_type=manual, data_interval_start=2026-02-18 12:04:29.899251+00:00, data_interval_end=2026-02-18 12:04:29.899251+00:00, [airflow.models.dagrun.DagRun]
-------------------------------------- Captured stderr call ---------------------------------------
2026-02-18T12:04:29.971464Z [info     ] [DAG TEST] starting task_id=create_bucket map_index=-1 [airflow.sdk.definitions.dag]
2026-02-18T12:04:29.972057Z [info     ] [DAG TEST] running task <TaskInstance: example_oracle_to_gcs.create_bucket manual__2026-02-18T12:04:29.910741+00:00 [scheduled]> [airflow.sdk.definitions.dag]
2026-02-18T12:04:33.753969Z [info     ] Stats instance was created in PID 86 but accessed in PID 101. Re-initializing. [airflow.sdk._shared.observability.metrics.stats]
2026-02-18T12:04:43.764039Z [warning  ] OpenLineage process with pid `101` expired and will be terminated by listener. This has no impact on actual task execution status. [airflow.providers.openlineage.plugins.listener]
2026-02-18T12:04:46.785977Z [warning  ] Connection schemes (type: google_cloud_platform) shall not contain '_' according to RFC3986. [airflow.sdk.definitions.connection]
2026-02-18T12:04:57.801344Z [info     ] Stats instance was created in PID 86 but accessed in PID 106. Re-initializing. [airflow.sdk._shared.observability.metrics.stats]
2026-02-18T12:05:07.825578Z [warning  ] OpenLineage process with pid `106` expired and will be terminated by listener. This has no impact on actual task execution status. [airflow.providers.openlineage.plugins.listener]
2026-02-18T12:05:10.911731Z [info     ] [DAG TEST] end task task_id=create_bucket map_index=-1 [airflow.sdk.definitions.dag]
2026-02-18T12:05:10.932416Z [info     ] [DAG TEST] starting task_id=watcher map_index=-1 [airflow.sdk.definitions.dag]
2026-02-18T12:05:10.932609Z [info     ] [DAG TEST] running task <TaskInstance: example_oracle_to_gcs.watcher manual__2026-02-18T12:04:29.910741+00:00 [scheduled]> [airflow.sdk.definitions.dag]
2026-02-18T12:05:13.578805Z [info     ] Stats instance was created in PID 86 but accessed in PID 112. Re-initializing. [airflow.sdk._shared.observability.metrics.stats]
2026-02-18T12:05:23.589031Z [warning  ] OpenLineage process with pid `112` expired and will be terminated by listener. This has no impact on actual task execution status. [airflow.providers.openlineage.plugins.listener]
2026-02-18T12:05:26.613162Z [info     ] Stats instance was created in PID 86 but accessed in PID 113. Re-initializing. [airflow.sdk._shared.observability.metrics.stats]
2026-02-18T12:05:36.621143Z [warning  ] OpenLineage process with pid `113` expired and will be terminated by listener. This has no impact on actual task execution status. [airflow.providers.openlineage.plugins.listener]
2026-02-18T12:05:39.677809Z [info     ] [DAG TEST] end task task_id=watcher map_index=-1 [airflow.sdk.definitions.dag]
2026-02-18T12:05:39.692325Z [info     ] [DAG TEST] starting task_id=delete_bucket map_index=-1 [airflow.sdk.definitions.dag]
2026-02-18T12:05:39.692546Z [info     ] [DAG TEST] running task <TaskInstance: example_oracle_to_gcs.delete_bucket manual__2026-02-18T12:04:29.910741+00:00 [scheduled]> [airflow.sdk.definitions.dag]
2026-02-18T12:05:42.226634Z [info     ] Stats instance was created in PID 86 but accessed in PID 119. Re-initializing. [airflow.sdk._shared.observability.metrics.stats]
2026-02-18T12:05:52.232068Z [warning  ] OpenLineage process with pid `119` expired and will be terminated by listener. This has no impact on actual task execution status. [airflow.providers.openlineage.plugins.listener]
2026-02-18T12:05:53.644541Z [warning  ] Connection schemes (type: google_cloud_platform) shall not contain '_' according to RFC3986. [airflow.sdk.definitions.connection]
2026-02-18T12:06:05.550598Z [info     ] Stats instance was created in PID 86 but accessed in PID 120. Re-initializing. [airflow.sdk._shared.observability.metrics.stats]
2026-02-18T12:06:15.563995Z [warning  ] OpenLineage process with pid `120` expired and will be terminated by listener. This has no impact on actual task execution status. [airflow.providers.openlineage.plugins.listener]
2026-02-18T12:06:18.623173Z [info     ] [DAG TEST] end task task_id=delete_bucket map_index=-1 [airflow.sdk.definitions.dag]
2026-02-18T12:06:18.634014Z [warning  ] OpenLineage received exception in method on_dag_run_failed [airflow.providers.openlineage.plugins.listener]
Traceback (most recent call last):
  File "/opt/airflow/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py", line 962, in on_dag_run_failed
    **get_airflow_dag_run_facet(dag_run),
  File "/opt/airflow/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py", line 921, in get_airflow_dag_run_facet
    dagRun=DagRunInfo(dag_run),
  File "/opt/airflow/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py", line 601, in __init__
    self._cast_fields()
  File "/opt/airflow/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py", line 628, in _cast_fields
    setattr(self, field, func(self.obj))
  File "/opt/airflow/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py", line 753, in <lambda>
    "dag_bundle_name": lambda dagrun: DagRunInfo.dag_version_info(dagrun, "bundle_name"),
  File "/opt/airflow/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py", line 770, in dag_version_info
    if not getattr(dagrun, "dag_versions", []):
  File "/opt/airflow/airflow-core/src/airflow/models/dagrun.py", line 401, in dag_versions
    for dv in dict.fromkeys(list(self._tih_dag_versions) + list(self._ti_dag_versions))
  File "/usr/python/lib/python3.10/site-packages/sqlalchemy/ext/associationproxy.py", line 451, in __get__
    return inst.get(instance)
  File "/usr/python/lib/python3.10/site-packages/sqlalchemy/ext/associationproxy.py", line 903, in get
    self.collection_class, proxy = self._new(
  File "/usr/python/lib/python3.10/site-packages/sqlalchemy/ext/associationproxy.py", line 955, in _new
    collection_class = util.duck_type_collection(lazy_collection())
  File "/usr/python/lib/python3.10/site-packages/sqlalchemy/ext/associationproxy.py", line 1330, in __call__
    return getattr(self.parent, self.target)  # type: ignore[no-any-return]
  File "/usr/python/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 569, in __get__
    return self.impl.get(state, dict_)  # type: ignore[no-any-return]
  File "/usr/python/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 1096, in get
    value = self._fire_loader_callables(state, key, passive)
  File "/usr/python/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 1131, in _fire_loader_callables
    return self.callable_(state, passive)
  File "/usr/python/lib/python3.10/site-packages/sqlalchemy/orm/strategies.py", line 922, in _load_for_state
    raise orm_exc.DetachedInstanceError(
sqlalchemy.orm.exc.DetachedInstanceError: Parent instance <DagRun at 0x78dbcff06e00> is not bound to a Session; lazy load operation of attribute 'task_instances_histories' cannot proceed (Background on this error at: https://sqlalche.me/e/20/bhk3)
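
The last traceback, at least, is a generic SQLAlchemy failure mode rather than anything Google-specific: a lazy-loaded relationship is accessed after the owning Session is gone. A minimal, self-contained repro with hypothetical models (nothing here mirrors Airflow's actual schema):

```python
# Minimal repro of sqlalchemy.orm.exc.DetachedInstanceError: a lazy-loaded
# relationship is touched after the instance's Session is closed. The models
# are hypothetical; only the error mechanism mirrors the log above.
from sqlalchemy import ForeignKey, create_engine
from sqlalchemy.orm import (
    DeclarativeBase,
    Mapped,
    Session,
    mapped_column,
    relationship,
)


class Base(DeclarativeBase):
    pass


class DagRun(Base):
    __tablename__ = "dag_run"
    id: Mapped[int] = mapped_column(primary_key=True)
    task_instances: Mapped[list["TaskInstance"]] = relationship(
        back_populates="dag_run"
    )


class TaskInstance(Base):
    __tablename__ = "task_instance"
    id: Mapped[int] = mapped_column(primary_key=True)
    dag_run_id: Mapped[int] = mapped_column(ForeignKey("dag_run.id"))
    dag_run: Mapped[DagRun] = relationship(back_populates="task_instances")


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    run = DagRun()
    session.add(run)
    session.commit()
# The Session is closed here, so `run` is now detached.

run.task_instances  # raises DetachedInstanceError: lazy load cannot proceed
```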

@Prab-27 Prab-27 changed the title add pytest openlineage fixture and related files in google system tests Add pytest OpenLineage fixture and required files to google system tests Feb 18, 2026
@kacpermuda
Contributor

I think the OL system tests are broken now when run with breeze, so the google ones relying on OL are broken as well. The DAGs themselves are working fine; I think it has something to do with executing OL in a forked process. When I changed that, it started to work. I will have to look into it more, but I have no bandwidth right now.

