Fix in-process Execution API secrets routing in client contexts #65587

Open
henry3260 wants to merge 3 commits into apache:main from henry3260:fix-infinite-waiting-get

Conversation

@henry3260 henry3260 commented Apr 21, 2026

Contributor

Why

Execution API requests can run inside in-process paths where SUPERVISOR_COMMS is present, which may incorrectly classify server-side code as client-side Task SDK execution.
When that happens, Variable or Connection reads can route back into Task SDK paths instead of staying server-side, creating a recursive call chain that can hang.
The previous context signal was primarily environment-based, which is not always safe for request-scoped behavior in threaded in-process execution.

Because of that, Variable and Connection operations triggered while serving in-process Execution API
requests could still be routed back through Task SDK APIs instead of using the server-side secrets/backend
path, leading to recursive self-calls and hanging lookups.
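
For illustration, the routing loop described above can be sketched as follows. Every name here is a hypothetical stand-in rather than Airflow's actual code, and the depth guard exists only so the sketch terminates; the real failure mode reported in the issue is a hang, not an exception.

```python
# Hypothetical stand-ins for illustration; not Airflow's real API.
SUPERVISOR_COMMS = object()  # process-wide "client context" marker

MAX_DEPTH = 5  # guard so this sketch terminates instead of hanging


class RoutingLoop(RuntimeError):
    """Raised when a lookup keeps routing back to the client path."""


def variable_get(key: str, depth: int = 0) -> str:
    if depth > MAX_DEPTH:
        raise RoutingLoop(f"lookup for {key!r} re-entered the client path")
    if SUPERVISOR_COMMS is not None:
        # Client path: ask the Execution API for the value.
        return execution_api_handler(key, depth + 1)
    # Server path: read from the secrets backend directly.
    return f"value-of-{key}"


def execution_api_handler(key: str, depth: int) -> str:
    # The in-process server shares the same globals, so its own lookup
    # is misclassified as client-side and routes back to itself.
    return variable_get(key, depth)
```

Because the marker is process-wide, the handler has no way to tell that it is already on the server side, which is the misclassification this PR targets.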
closes: #65482

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)


@boring-cyborg boring-cyborg Bot added area:API area:task-sdk labels Apr 21, 2026
@henry3260 henry3260 force-pushed the fix-infinite-waiting-get branch from 6758f3a to c8ece7f Compare April 21, 2026 07:22
@henry3260 henry3260 changed the title fix infinite loop for Variable.get Fix in-process Execution API secrets routing in client contexts Apr 21, 2026
@henry3260 henry3260 force-pushed the fix-infinite-waiting-get branch from c8ece7f to 6dc8178 Compare April 21, 2026 14:14
@henry3260 henry3260 marked this pull request as ready for review April 21, 2026 16:59
@potiuk potiuk added the ready for maintainer review label Apr 23, 2026
@henry3260

Contributor Author

Could someone please take a look at this PR when they have a chance? Thanks!

@henry3260 henry3260 force-pushed the fix-infinite-waiting-get branch from d68c027 to 73528bb Compare April 27, 2026 16:05
@jason810496 jason810496 self-requested a review April 29, 2026 15:28
@henry3260 henry3260 force-pushed the fix-infinite-waiting-get branch 4 times, most recently from ca97e7f to 5fc80b5 Compare May 7, 2026 02:57

@jason810496 jason810496 left a comment

Member

Nice! Thanks for the fix.

Additionally, could we add a test for the scenario described in the issue (#65482) to verify the expected behavior?

Thanks.

Comment thread airflow-core/src/airflow/process_context.py
Comment thread airflow-core/src/airflow/models/variable.py Outdated
@henry3260 henry3260 force-pushed the fix-infinite-waiting-get branch 4 times, most recently from b21962b to 0a6ed36 Compare May 7, 2026 18:53
@henry3260 henry3260 requested a review from jason810496 May 8, 2026 03:22

@jason810496 jason810496 left a comment

Member

Please correct me if my understanding is wrong.

Another direction: How about adding lifespan to ensure SUPERVISOR_COMMS will be initialized for in-process Execution API?

Additionally, it would be nice to add an airflow-e2e test for the airflow dags test command with common scenarios as a follow-up.

Comment thread airflow-core/src/airflow/process_context.py
Comment thread airflow-core/src/airflow/process_context.py
Comment thread airflow-core/src/airflow/models/variable.py
Comment thread airflow-core/src/airflow/api_fastapi/execution_api/app.py
@henry3260

Contributor Author

How about adding lifespan to ensure SUPERVISOR_COMMS will be initialized for in-process Execution API?

I think lifespan is the wrong scope for this. It initializes app-scoped state once at startup, but the issue here is request-scoped: we need to isolate an in-process Execution API request from the outer client context. Initializing SUPERVISOR_COMMS in lifespan would make that client marker app-wide and could leak it into server-side request handling. WDYT?
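
To make the request-scoped idea concrete, here is a minimal sketch of a plain ASGI wrapper that sets a ContextVar for the duration of one request and resets it afterwards. The names `_CONTEXT_OVERRIDE`, `current_context`, and `ServerContextMiddleware` are assumptions for illustration and are not taken from the PR's diff.

```python
import asyncio
from contextvars import ContextVar
from typing import Optional

# Hypothetical names for illustration only.
_CONTEXT_OVERRIDE: ContextVar[Optional[str]] = ContextVar(
    "context_override", default=None
)


def current_context() -> str:
    # Fall back to the process-wide guess when no request override is set.
    return _CONTEXT_OVERRIDE.get() or "client"


class ServerContextMiddleware:
    """Plain ASGI wrapper that marks each request as server-side."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        token = _CONTEXT_OVERRIDE.set("server")
        try:
            await self.app(scope, receive, send)
        finally:
            # Restore the previous value once the request is done.
            _CONTEXT_OVERRIDE.reset(token)
```

Unlike state set up in a lifespan handler, the override here exists only while a request is being served, so code running outside the request still sees the outer context.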

@jason810496 jason810496 left a comment

Member

How about adding lifespan to ensure SUPERVISOR_COMMS will be initialized for in-process Execution API?

I think lifespan is the wrong scope for this. It initializes app-scoped state once at startup, but the issue here is request-scoped: we need to isolate an in-process Execution API request from the outer client context. Initializing SUPERVISOR_COMMS in lifespan would make that client marker app-wide and could leak it into server-side request handling. WDYT?

For the dag.test() command, it uses InProcessTestSupervisor under the hood, which means all requests are processed in-process. I'm not sure whether there could be a race condition in override_process_context (during the _PROCESS_CONTEXT_OVERRIDE.reset teardown).

IMO, there's no harm in setting the lifespan only for the InProcessTestSupervisor app (by overriding the app instance in the _api_client factory method).

@henry3260

Contributor Author

How about adding lifespan to ensure SUPERVISOR_COMMS will be initialized for in-process Execution API?

I think lifespan is the wrong scope for this. It initializes app-scoped state once at startup, but the issue here is request-scoped: we need to isolate an in-process Execution API request from the outer client context. Initializing SUPERVISOR_COMMS in lifespan would make that client marker app-wide and could leak it into server-side request handling. WDYT?

For the dag.test() command, it uses InProcessTestSupervisor under the hood, which means all requests are processed in-process. I'm not sure whether there could be a race condition in override_process_context (during the _PROCESS_CONTEXT_OVERRIDE.reset teardown).

IMO, there's no harm in setting the lifespan only for the InProcessTestSupervisor app (by overriding the app instance in the _api_client factory method).

I understand your concern! Since a ContextVar value is isolated per thread and per asyncio task, there shouldn't be any race conditions during the reset. That said, I completely agree with your idea to minimize the affected area. I will update the PR to apply the override specifically within InProcessTestSupervisor.
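
As a small check of the isolation claim, the sketch below sets and resets a ContextVar in one thread while the main thread reads it. The `marker` variable and `run_demo` helper are hypothetical and exist only for this example.

```python
import threading
from contextvars import ContextVar

# Hypothetical marker; "client" stands in for the outer Task SDK context.
marker: ContextVar[str] = ContextVar("marker", default="client")


def run_demo() -> dict:
    results = {}
    barrier = threading.Barrier(2, timeout=5)

    def request_thread() -> None:
        token = marker.set("server")  # override visible to this thread only
        results["worker_during_override"] = marker.get()
        barrier.wait()  # let main read while the override is live
        barrier.wait()  # wait until main has recorded its view
        marker.reset(token)  # teardown
        results["worker_after_reset"] = marker.get()

    t = threading.Thread(target=request_thread)
    t.start()
    barrier.wait()
    results["main_during_override"] = marker.get()
    barrier.wait()
    t.join()
    return results
```

The main thread keeps seeing its own value while the worker's override is active, and the worker's reset only restores the worker's own previous value.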

@henry3260 henry3260 force-pushed the fix-infinite-waiting-get branch from 0a6ed36 to 7bf1b8e Compare May 17, 2026 17:47
@henry3260 henry3260 requested a review from jason810496 May 18, 2026 04:21

potiuk commented May 18, 2026

Member

@henry3260 — There are 3 unresolved review threads on this PR from @jason810496. Could you either push a fix or reply in each thread explaining why the feedback doesn't apply? Once you believe the feedback is addressed, mark the thread as resolved so the reviewer isn't re-pinged needlessly. Thanks!


Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you.

@henry3260 henry3260 force-pushed the fix-infinite-waiting-get branch 2 times, most recently from 8a56c4b to 321df41 Compare May 19, 2026 09:52
@potiuk potiuk removed the ready for maintainer review label May 24, 2026

Vamsi-klu commented May 30, 2026

Contributor

This early return fixes the hang by avoiding SUPERVISOR_COMMS, but it also drops the mask before it reaches the process that actually redacts virtualenv logs. The virtualenv bootstrap sets PYTHON_OPERATORS_VIRTUAL_ENV_MODE and then calls reinit_supervisor_comms(); that configures logging with sending_to_supervisor=True, which disables the child-side mask_logs processor. The MaskSecret comms model notes that this message is needed because redaction happens in the parent process.

With this return, Variable.get() in the virtualenv child still calls mask_secret() for values from env/secrets backends, but the value is only registered in the child-local masker. If the callable or a library logs that value, the JSON log is forwarded unredacted and the supervisor does not know to redact it.

Can we avoid the deadlock without suppressing mask propagation for the reinitialized virtualenv channel? For example, skip only when supervisor comms are unavailable/stale, or make the send bounded, but continue sending MaskSecret once reinit_supervisor_comms() has installed the child comms.
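
One way to read the "make the send bounded" option is sketched below. It uses a `queue.Queue` as a stand-in for the supervisor channel, which is an assumption; the real comms transport and the `MaskSecret` message shape are not reproduced here, and `mask_secret` and `local_masks` in this sketch are hypothetical helpers, not the Task SDK functions.

```python
import queue
from typing import Optional

# Hypothetical child-local registry of values to redact.
local_masks: set = set()


def mask_secret(
    secret: str, channel: Optional[queue.Queue], timeout: float = 0.5
) -> bool:
    """Register a mask locally, then try to forward it within ``timeout``.

    Returns True if the mask was forwarded, False if it stayed local only.
    """
    local_masks.add(secret)  # child-side masking always happens
    if channel is None:
        return False  # no comms installed, nothing to forward to
    try:
        # Bounded send: give up instead of blocking forever on a stale channel.
        channel.put({"type": "MaskSecret", "value": secret}, timeout=timeout)
    except queue.Full:
        return False
    return True
```

With this shape, a live channel still receives every mask, while a channel nobody drains costs at most `timeout` seconds per call instead of hanging.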

@henry3260 henry3260 force-pushed the fix-infinite-waiting-get branch 2 times, most recently from b81948c to cad3cc5 Compare May 31, 2026 14:34

eladkal commented Jun 4, 2026

Contributor

There are conflicts to resolve

@eladkal eladkal added this to the Airflow 3.3.0 milestone Jun 4, 2026
@henry3260 henry3260 force-pushed the fix-infinite-waiting-get branch 2 times, most recently from 544a6ec to 2adee89 Compare June 4, 2026 17:54
Comment thread airflow-core/src/airflow/api_fastapi/execution_api/app.py Outdated

Copilot AI left a comment

Contributor

Pull request overview

This PR fixes a hang/recursion risk when the Execution API is run in-process and Task SDK “client context” signals (e.g. SUPERVISOR_COMMS) are present, by introducing a request-scoped server-context override so Variable/Connection secret lookups stay on the server-side path.

Changes:

  • Add airflow.process_context helpers (including a request-scoped override via ContextVar) and use them to decide whether core Variable/Connection APIs should route through Task SDK.
  • Add an in-process Execution API wrapper that forces server context per request (request_scoped_server_context=True) and wire task-sdk’s in-process client to use it.
  • Add regression tests covering server-context routing and the virtualenv/supervisor-comms hang scenario.
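
A rough sketch of how such a routing helper could combine the two signals is shown below. The names `should_use_task_sdk_api_path`, `override_process_context`, and `_PROCESS_CONTEXT_OVERRIDE` appear in this thread, but the signatures and bodies here are assumptions for illustration, not the PR's implementation.

```python
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Iterator, Optional

# Assumed shape: None means "no override, use the process-wide signal".
_PROCESS_CONTEXT_OVERRIDE: ContextVar[Optional[str]] = ContextVar(
    "process_context_override", default=None
)


def should_use_task_sdk_api_path(supervisor_comms_present: bool) -> bool:
    """Decide routing; the boolean parameter is an assumption of this sketch."""
    override = _PROCESS_CONTEXT_OVERRIDE.get()
    if override is not None:
        # A request-scoped override wins over the process-wide signal.
        return override == "client"
    return supervisor_comms_present


@contextmanager
def override_process_context(value: str) -> Iterator[None]:
    token = _PROCESS_CONTEXT_OVERRIDE.set(value)
    try:
        yield
    finally:
        _PROCESS_CONTEXT_OVERRIDE.reset(token)
```

Inside `with override_process_context("server"):` the helper returns False even when the supervisor comms signal is present, which is the behavior the per-request wrapper relies on.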

Reviewed changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.

| File | Description |
| --- | --- |
| task-sdk/tests/task_sdk/execution_time/test_supervisor.py | Adds coverage ensuring the in-process client requests server-scoped context. |
| task-sdk/tests/task_sdk/definitions/test_variables.py | Adds regression tests for virtualenv Variable.get hang avoidance + masking behavior. |
| task-sdk/src/airflow/sdk/log.py | Avoids sending MaskSecret via stale virtualenv comms. |
| task-sdk/src/airflow/sdk/execution_time/supervisor.py | Passes request-scoped server context when creating the in-process Execution API server/client. |
| airflow-core/tests/unit/models/test_connection.py | Adds isolation fixture + test ensuring server context uses core Connection.from_json. |
| airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_variables.py | Ensures Execution API variable endpoints do not route through Task SDK in server context. |
| airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_connections.py | Ensures Execution API connection endpoint does not route through Task SDK in server context. |
| airflow-core/src/airflow/process_context.py | Introduces request-scoped process-context overrides and routing decision helper. |
| airflow-core/src/airflow/models/variable.py | Switches Task SDK routing condition to should_use_task_sdk_api_path(). |
| airflow-core/src/airflow/models/connection.py | Switches Task SDK routing condition to should_use_task_sdk_api_path(). |
| airflow-core/src/airflow/api_fastapi/execution_api/app.py | Adds request-scoped server-context ASGI wrapper and exposes it via asgi_app. |

Comment thread airflow-core/src/airflow/models/variable.py
Comment thread airflow-core/src/airflow/models/connection.py

potiuk commented Jun 9, 2026

Member

@henry3260 — There are 3 unresolved review thread(s) on this PR from @jason810496, @jscheffl. Could you either push a fix or reply in each thread explaining why the feedback doesn't apply? When you believe the feedback is addressed, please mark the threads as resolved and ping the reviewer (@jason810496, @jscheffl) for a final look. Thanks!

Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you.

@henry3260 henry3260 force-pushed the fix-infinite-waiting-get branch from 2adee89 to b4c9600 Compare June 11, 2026 08:21
@henry3260 henry3260 force-pushed the fix-infinite-waiting-get branch from b4c9600 to 348bc84 Compare June 11, 2026 09:05
@apache apache deleted a comment from Vamsi-klu Jun 11, 2026

ashb commented Jun 11, 2026

Member

PTAL @kaxil


Labels

area:API area:task-sdk

Projects

None yet

Development

Successfully merging this pull request may close these issues.

PythonVirtualenvOperator hangs indefinitely on Variable.get() (Task SDK) under airflow dags test and dag.test() — no error, no timeout

8 participants