Unify executor workload queues with tier-based scheduling#63482
anishgiri wants to merge 1 commit into apache:main
Conversation
self.team_name: str | None = team_name
self.queued_tasks: dict[TaskInstanceKey, workloads.ExecuteTask] = {}
self.queued_callbacks: dict[str, workloads.ExecuteCallback] = {}
self.executor_queues: dict[str, dict] = defaultdict(dict)
Sorry, this is not possible. This breaks compatibility between providers and core.
@jscheffl Thank you very much for the quick heads up. Closing this PR as I messed up while rebasing; I will create a new PR with that in mind. Thank you!
@jscheffl - I really like the direction this PR was going and it's a huge step toward simplifying the scheduler. Once he fixes the rebase, are you willing to chat about what he needs to do to make this back-compat?
I think we can work something out where the old parameters are retained and flagged as deprecated, and assemble the new executor_queues from those if they are present
I am not sure and have not taken a deep look into the potential simplification, but you are renaming fields in the BaseExecutor class, which all executors inherit from.
Yes, the PR also adjusts the fields in the executors. But you can mix versions of providers and core: upgrade Airflow core and leave providers on old versions (this is actually a best practice when upgrading, to keep triage simple), or upgrade providers while keeping the same core version. In either case the renamed fields break the executors. So renaming the fields (and changing the semantics of how they are used) breaks the (public) interface for executors.
He actually had a better solution already, I'll see you over on the new PR (#63491) when you get time. He's using an @property to cover that backcompat. I think that should work.
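The `@property` back-compat approach mentioned above could look roughly like this. This is a minimal sketch; the class name and attribute layout are assumptions based on this thread, not the actual code in #63491:

```python
from collections import defaultdict


class BaseExecutorSketch:
    """Sketch: unified queue dict with deprecated read-only aliases."""

    def __init__(self) -> None:
        # single unified queue, keyed by workload type name
        self.executor_queues: dict[str, dict] = defaultdict(dict)

    @property
    def queued_tasks(self) -> dict:
        # deprecated alias: old code reading executor.queued_tasks keeps working
        return self.executor_queues["ExecuteTask"]

    @property
    def queued_callbacks(self) -> dict:
        # deprecated alias for the callback queue
        return self.executor_queues["ExecuteCallback"]
```

Because the aliases return the live inner dicts, even mutations through the old names (e.g. `executor.queued_tasks[key] = workload`) land in `executor_queues`.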
db054a0 to 9026c79
Was generative AI tooling used to co-author this PR?
Summary
Refactors executor workload queue management for extensibility. No behavioral change: scheduling order, slot accounting, and all provider executors work identically to before.
Follows the direction proposed by @ferruzzi #62343 (comment).
Problem
Adding a new workload type (like `ExecuteCallback` or `TestConnection`) required touching ~6 places in `BaseExecutor`: a new queue dict, a new `supports_*` flag, slots calculation, an `isinstance` branch in `queue_workload`, a dedicated scheduling method, and `isinstance` branches in dequeue/trigger logic. Each provider executor that overrode `queue_workload` also needed updating. This made extending the executor interface unnecessarily painful.

What this does
Replaces the per-type queue dicts and boolean capability flags with three simple primitives:

- `executor_queues`: a single dict keyed by workload type string (e.g. `"ExecuteTask"`, `"ExecuteCallback"`) instead of separate `queued_tasks`, `queued_callbacks`, `queued_connection_tests` dicts
- `supported_workload_types`: a frozenset of type strings instead of individual `supports_callbacks`, `supports_connection_test` booleans
- `WorkloadQueueDef(scheduling_tier, sort_key)`: a small NamedTuple on each workload class that controls scheduling priority. Callbacks get tier 0, tasks get tier 1: the same order as before, just explicit now.

The base class
`queue_workload` is now generic: validate the type, store by key. Four provider executors (K8s, ECS, Batch, Lambda) no longer need their own `queue_workload` overrides. `trigger_tasks` becomes `trigger_workloads` since it handles all workload types now.

Adding a new workload type after this refactor
- `key` and `queue_def` on the workload dataclass
- `supported_workload_types` on supporting executors
- `_process_workloads`
- done

No changes needed in `BaseExecutor` itself.
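Sketched end to end, the extension pattern might look like this. All names here are illustrative assumptions drawn from this description, not the real Airflow classes:

```python
from collections import defaultdict
from typing import NamedTuple


class WorkloadQueueDef(NamedTuple):
    scheduling_tier: int  # lower tier is drained first
    sort_key: str


class ExecuteCallback:
    queue_def = WorkloadQueueDef(scheduling_tier=0, sort_key="id")


class ExecuteTask:
    queue_def = WorkloadQueueDef(scheduling_tier=1, sort_key="priority_weight")


# illustrative registry so the executor can look up queue_def by type name
WORKLOAD_TYPES = {"ExecuteCallback": ExecuteCallback, "ExecuteTask": ExecuteTask}


class SketchExecutor:
    supported_workload_types = frozenset(WORKLOAD_TYPES)

    def __init__(self) -> None:
        self.executor_queues: dict[str, dict] = defaultdict(dict)

    def queue_workload(self, workload, key) -> None:
        # generic: validate the type, then store by key
        type_name = type(workload).__name__
        if type_name not in self.supported_workload_types:
            raise ValueError(f"unsupported workload type: {type_name}")
        self.executor_queues[type_name][key] = workload

    def trigger_workloads(self) -> list:
        # drain queues in tier order: callbacks (tier 0) before tasks (tier 1)
        ordered = sorted(
            self.executor_queues,
            key=lambda name: WORKLOAD_TYPES[name].queue_def.scheduling_tier,
        )
        return [w for name in ordered for w in self.executor_queues[name].values()]
```

An executor that wants to support a new workload type only adds it to `supported_workload_types`; the base queueing and tier-ordered triggering need no changes.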
{pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.