From 028e39520d47f63ab8a1772d58d439762322c29e Mon Sep 17 00:00:00 2001 From: bugraoz93 Date: Sat, 11 Apr 2026 22:15:21 +0200 Subject: [PATCH] sync airflowctl from main to v3-2-test --- .../installation/installing-from-pypi.rst | 34 ++++++- airflow-ctl/pyproject.toml | 2 + airflow-ctl/src/airflowctl/api/client.py | 31 +++++-- airflow-ctl/src/airflowctl/api/operations.py | 24 +++-- .../airflowctl/ctl/commands/auth_command.py | 6 +- .../tests/airflow_ctl/api/test_client.py | 18 +++- .../tests/airflow_ctl/api/test_operations.py | 91 ++++++++++++------- .../ctl/commands/test_auth_command.py | 18 ++++ 8 files changed, 171 insertions(+), 53 deletions(-) diff --git a/airflow-ctl/docs/installation/installing-from-pypi.rst b/airflow-ctl/docs/installation/installing-from-pypi.rst index 9ec3a5e73ab84..826cbee7e957c 100644 --- a/airflow-ctl/docs/installation/installing-from-pypi.rst +++ b/airflow-ctl/docs/installation/installing-from-pypi.rst @@ -21,8 +21,38 @@ Installation from PyPI This page describes installations using the ``apache-airflow-ctl`` package `published in PyPI `__. -Installation tools -'''''''''''''''''' +Installation via ``pipx`` or ``uv`` as tool +''''''''''''''''''''''''''''''''''''''''''' + +You can locally deploy or run airflowctl without installing it in your environment using tools like `pipx `_ or `uv `_. + +Via ``pipx`` it is possible to install airflowctl directly from PyPI using the command below: + +.. code-block:: bash + + pipx install "apache-airflow-ctl==|version|" + +As well as directly run w/o installing it first: + +.. code-block:: bash + + pipx run "apache-airflow-ctl --help" + +Same via Astral ``uv`` to install airflowctl from PyPI using the command below: + +.. code-block:: bash + + uv tool install "apache-airflow-ctl==|version|" + +Additionally to jump-start using it you can also use the shortcut via ``uvx`` command and directly run it without installing it first: + +.. code-block:: bash + + uvx apache-airflow-ctl --help + + +Installation in your environment +'''''''''''''''''''''''''''''''' Only ``pip`` and ``uv`` installation is currently officially supported. diff --git a/airflow-ctl/pyproject.toml b/airflow-ctl/pyproject.toml index a14d42ecb15fd..06278d01cd602 100644 --- a/airflow-ctl/pyproject.toml +++ b/airflow-ctl/pyproject.toml @@ -67,6 +67,8 @@ YouTube = "https://www.youtube.com/channel/UCSXwxpWZQ7XZ1WL3wqevChA/" [project.scripts] airflowctl = "airflowctl.__main__:main" +# Redundant definition allowing direct execution via `uvx apache-airflow-ctl ...` +apache-airflow-ctl = "airflowctl.__main__:main" [build-system] requires = [ diff --git a/airflow-ctl/src/airflowctl/api/client.py b/airflow-ctl/src/airflowctl/api/client.py index 06e78a6a32a90..51786bfcfcbff 100644 --- a/airflow-ctl/src/airflowctl/api/client.py +++ b/airflow-ctl/src/airflowctl/api/client.py @@ -169,6 +169,11 @@ def input_cli_config_file(self) -> str: """Generate path for the CLI config file.""" return f"{self.api_environment}.json" + @staticmethod + def token_key_for_environment(api_environment: str) -> str: + """Build the keyring/debug token key for a given environment name.""" + return f"api_token_{api_environment}" + def save(self, skip_keyring: bool = False): """ Save the credentials to keyring and URL to disk as a file. @@ -186,7 +191,7 @@ def save(self, skip_keyring: bool = False): with open( os.path.join(default_config_dir, f"debug_creds_{self.input_cli_config_file}"), "w" ) as f: - json.dump({f"api_token_{self.api_environment}": self.api_token}, f) + json.dump({self.token_key_for_environment(self.api_environment): self.api_token}, f) else: if skip_keyring: return @@ -199,7 +204,11 @@ def save(self, skip_keyring: bool = False): for candidate in candidates: if hasattr(candidate, "_get_new_password"): candidate._get_new_password = _bounded_get_new_password - keyring.set_password("airflowctl", f"api_token_{self.api_environment}", self.api_token) # type: ignore[arg-type] + keyring.set_password( + "airflowctl", + self.token_key_for_environment(self.api_environment), + self.api_token, # type: ignore[arg-type] + ) except (NoKeyringError, NotImplementedError) as e: log.error(e) raise AirflowCtlKeyringException( @@ -228,13 +237,23 @@ def load(self) -> Credentials: debug_creds_path = os.path.join( default_config_dir, f"debug_creds_{self.input_cli_config_file}" ) - with open(debug_creds_path) as df: - debug_credentials = json.load(df) - self.api_token = debug_credentials.get(f"api_token_{self.api_environment}") + try: + with open(debug_creds_path) as df: + debug_credentials = json.load(df) + self.api_token = debug_credentials.get( + self.token_key_for_environment(self.api_environment) + ) + except FileNotFoundError as e: + if self.client_kind == ClientKind.CLI: + raise AirflowCtlCredentialNotFoundException( + f"Debug credentials file not found: {debug_creds_path}. " + "Set AIRFLOW_CLI_DEBUG_MODE=false or log in with debug mode enabled first." + ) from e + self.api_token = None else: try: self.api_token = keyring.get_password( - "airflowctl", f"api_token_{self.api_environment}" + "airflowctl", self.token_key_for_environment(self.api_environment) ) except ValueError as e: # Incorrect keyring password diff --git a/airflow-ctl/src/airflowctl/api/operations.py b/airflow-ctl/src/airflowctl/api/operations.py index 2890304743b4c..3ce196c10cb32 100644 --- a/airflow-ctl/src/airflowctl/api/operations.py +++ b/airflow-ctl/src/airflowctl/api/operations.py @@ -195,6 +195,9 @@ def __init_subclass__(cls, **kwargs): setattr(cls, attr, _check_flag_and_exit_if_server_response_error(value)) def execute_list(self, *, path, data_model, offset=0, limit=50, params=None): + if limit <= 0: + raise ValueError(f"limit must be a positive integer, got {limit}") + shared_params = {"limit": limit, **(params or {})} def safe_validate(content: bytes) -> BaseModel: @@ -610,32 +613,33 @@ def list( dag_id: str | None = None, ) -> DAGRunCollectionResponse | ServerResponseError: """ - List all dag runs. + List dag runs (at most `limit` results). Args: state: Filter dag runs by state start_date: Filter dag runs by start date (optional) end_date: Filter dag runs by end date (optional) - state: Filter dag runs by state - limit: Limit the number of results + limit: Limit the number of results returned dag_id: The DAG ID to filter by. If None, retrieves dag runs for all DAGs (using "~"). """ # Use "~" for all DAGs if dag_id is not specified if not dag_id: dag_id = "~" - params: dict[str, object] = { - "state": state, + params: dict[str, Any] = { + "state": str(state), "limit": limit, } if start_date is not None: - params["start_date"] = start_date + params["start_date"] = start_date.isoformat() if end_date is not None: - params["end_date"] = end_date + params["end_date"] = end_date.isoformat() - return super().execute_list( - path=f"/dags/{dag_id}/dagRuns", data_model=DAGRunCollectionResponse, params=params - ) + try: + self.response = self.client.get(f"/dags/{dag_id}/dagRuns", params=params) + return DAGRunCollectionResponse.model_validate_json(self.response.content) + except ServerResponseError as e: + raise e class JobsOperations(BaseOperations): diff --git a/airflow-ctl/src/airflowctl/ctl/commands/auth_command.py b/airflow-ctl/src/airflowctl/ctl/commands/auth_command.py index 236b8d5c6b8de..cf521cbe7eea0 100644 --- a/airflow-ctl/src/airflowctl/ctl/commands/auth_command.py +++ b/airflow-ctl/src/airflowctl/ctl/commands/auth_command.py @@ -144,7 +144,7 @@ def list_envs(args) -> None: if filename.startswith("debug_creds_") or filename.endswith("_generated.json"): continue - env_name = filename.replace(".json", "") + env_name, _ = os.path.splitext(filename) # Try to read config file api_url = None @@ -168,11 +168,11 @@ def list_envs(args) -> None: if os.path.exists(debug_path): with open(debug_path) as f: debug_creds = json.load(f) - if f"api_token_{env_name}" in debug_creds: + if Credentials.token_key_for_environment(env_name) in debug_creds: token_status = "authenticated" else: # Check keyring - token = keyring.get_password("airflowctl", f"api_token_{env_name}") + token = keyring.get_password("airflowctl", Credentials.token_key_for_environment(env_name)) if token: token_status = "authenticated" except NoKeyringError: diff --git a/airflow-ctl/tests/airflow_ctl/api/test_client.py b/airflow-ctl/tests/airflow_ctl/api/test_client.py index 0617d62276a1c..f495b357d8d4b 100644 --- a/airflow-ctl/tests/airflow_ctl/api/test_client.py +++ b/airflow-ctl/tests/airflow_ctl/api/test_client.py @@ -30,7 +30,10 @@ from airflowctl.api.client import Client, ClientKind, Credentials, _bounded_get_new_password from airflowctl.api.operations import ServerResponseError -from airflowctl.exceptions import AirflowCtlCredentialNotFoundException, AirflowCtlKeyringException +from airflowctl.exceptions import ( + AirflowCtlCredentialNotFoundException, + AirflowCtlKeyringException, +) def make_client_w_responses(responses: list[httpx.Response]) -> Client: @@ -376,3 +379,16 @@ def test_retry_handling_ok(self): response = client.get("http://error") assert response.status_code == 200 assert len(responses) == 1 + + def test_debug_mode_missing_debug_creds_reports_correct_error(self, monkeypatch, tmp_path): + monkeypatch.setenv("AIRFLOW_HOME", str(tmp_path)) + monkeypatch.setenv("AIRFLOW_CLI_DEBUG_MODE", "true") + monkeypatch.setenv("AIRFLOW_CLI_ENVIRONMENT", "TEST_DEBUG") + + config_path = tmp_path / "TEST_DEBUG.json" + config_path.write_text(json.dumps({"api_url": "http://localhost:8080"}), encoding="utf-8") + # Intentionally do not create debug_creds_TEST_DEBUG.json to simulate a missing file + + creds = Credentials(client_kind=ClientKind.CLI, api_environment="TEST_DEBUG") + with pytest.raises(AirflowCtlCredentialNotFoundException, match="Debug credentials file not found"): + creds.load() diff --git a/airflow-ctl/tests/airflow_ctl/api/test_operations.py b/airflow-ctl/tests/airflow_ctl/api/test_operations.py index 2f2e0b0f54738..aa559f174214b 100644 --- a/airflow-ctl/tests/airflow_ctl/api/test_operations.py +++ b/airflow-ctl/tests/airflow_ctl/api/test_operations.py @@ -230,6 +230,16 @@ def test_execute_list_sends_limit_on_subsequent_pages(self): for call in mock_client.get.call_args_list: assert call.kwargs["params"]["limit"] == 2 + @pytest.mark.parametrize("limit", [0, -1]) + def test_execute_list_rejects_non_positive_limit(self, limit): + mock_client = Mock() + base_operation = BaseOperations(client=mock_client) + + with pytest.raises(ValueError, match="limit must be a positive integer"): + base_operation.execute_list(path="hello", data_model=HelloCollectionResponse, limit=limit) + + mock_client.get.assert_not_called() + class TestAssetsOperations: asset_id: int = 1 @@ -1068,44 +1078,63 @@ def handle_request(request: httpx.Request) -> httpx.Response: ) assert response == self.dag_run_collection_response - def test_list_all_dags(self): - """Test listing dag runs for all DAGs using default dag_id='~'.""" - - def handle_request(request: httpx.Request) -> httpx.Response: - # When dag_id is "~", it should query all DAGs - assert request.url.path == "/api/v2/dags/~/dagRuns" + @pytest.mark.parametrize( + ( + "dag_id_input", + "state", + "limit", + "start_date", + "end_date", + "expected_path_suffix", + "expected_params_subset", + ), + [ + # Test --limit with various values and configurations (covers CLI --limit flag) + ("dag1", "queued", 5, None, None, "dag1", {"state": "queued", "limit": "5"}), + (None, "running", 1, None, None, "~", {"state": "running", "limit": "1"}), + ( + "example_dag", + "success", + 10, + None, + None, + "example_dag", + {"state": "success", "limit": "10"}, + ), + ("dag2", "failed", 0, None, None, "dag2", {"state": "failed", "limit": "0"}), + ], + ids=["limit-5", "all-dags-limit-1", "string-state-limit-10", "limit-zero"], + ) + def test_list_with_various_limits( + self, + dag_id_input: str | None, + state: str | DagRunState, + limit: int, + start_date: datetime.datetime | None, + end_date: datetime.datetime | None, + expected_path_suffix: str, + expected_params_subset: dict, + ) -> None: + """Test listing dag runs with various limit values (especially --limit flag).""" + + def handle_request(request: httpx.Request) -> httpx.Response: + assert request.url.path.endswith(f"/dags/{expected_path_suffix}/dagRuns") + params = dict(request.url.params) + for key, value in expected_params_subset.items(): + assert key in params + assert str(params[key]) == str(value) return httpx.Response(200, json=json.loads(self.dag_run_collection_response.model_dump_json())) client = make_api_client(transport=httpx.MockTransport(handle_request)) - # Call without specifying dag_id - should use default "~" response = client.dag_runs.list( - start_date=datetime.datetime(2025, 1, 1, 0, 0, 0), - end_date=datetime.datetime(2025, 1, 1, 0, 0, 0), - state="running", - limit=1, + state=state, + limit=limit, + start_date=start_date, + end_date=end_date, + dag_id=dag_id_input, ) assert response == self.dag_run_collection_response - def test_list_with_optional_parameters(self): - """Test listing dag runs with only some optional parameters.""" - - def handle_request(request: httpx.Request) -> httpx.Response: - assert request.url.path == "/api/v2/dags/dag1/dagRuns" - # Verify that only state and limit are in query params - params = dict(request.url.params) - assert "state" in params - assert params["state"] == "queued" - assert "limit" in params - assert params["limit"] == "5" - # start_date and end_date should not be present - assert "start_date" not in params - assert "end_date" not in params - return httpx.Response(200, json=json.loads(self.dag_run_collection_response.model_dump_json())) - - client = make_api_client(transport=httpx.MockTransport(handle_request)) - response = client.dag_runs.list(state="queued", limit=5, dag_id="dag1") - assert response == self.dag_run_collection_response - class TestJobsOperations: job_response = JobResponse( diff --git a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_auth_command.py b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_auth_command.py index e76fafc28adf9..2bda56b0fdc18 100644 --- a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_auth_command.py +++ b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_auth_command.py @@ -477,3 +477,21 @@ def test_list_envs_filters_special_files(self, monkeypatch): # Only production environment should be checked, not the special files mock_get_password.assert_called_once_with("airflowctl", "api_token_production") + + def test_list_envs_environment_name_with_json_substring(self, monkeypatch): + """Test list-envs keeps '.json' substrings in environment name for key lookup.""" + with ( + tempfile.TemporaryDirectory() as temp_airflow_home, + patch("keyring.get_password") as mock_get_password, + ): + monkeypatch.setenv("AIRFLOW_HOME", temp_airflow_home) + + with open(os.path.join(temp_airflow_home, "prod.json.region.json"), "w") as f: + json.dump({"api_url": "http://localhost:8080"}, f) + + mock_get_password.return_value = "test_token" + + args = self.parser.parse_args(["auth", "list-envs"]) + auth_command.list_envs(args) + + mock_get_password.assert_called_once_with("airflowctl", "api_token_prod.json.region")