diff --git a/airflow-ctl/docs/installation/installing-from-pypi.rst b/airflow-ctl/docs/installation/installing-from-pypi.rst
index 9ec3a5e73ab84..826cbee7e957c 100644
--- a/airflow-ctl/docs/installation/installing-from-pypi.rst
+++ b/airflow-ctl/docs/installation/installing-from-pypi.rst
@@ -21,8 +21,38 @@ Installation from PyPI
This page describes installations using the ``apache-airflow-ctl`` package `published in
PyPI <https://pypi.org/project/apache-airflow-ctl/>`__.
-Installation tools
-''''''''''''''''''
+Installation via ``pipx`` or ``uv`` as tool
+'''''''''''''''''''''''''''''''''''''''''''
+
+You can locally deploy or run airflowctl without installing it in your environment using tools like `pipx <https://pipx.pypa.io/>`_ or `uv <https://docs.astral.sh/uv/>`_.
+
+Via ``pipx`` it is possible to install airflowctl directly from PyPI using the command below:
+
+.. code-block:: bash
+
+ pipx install "apache-airflow-ctl==|version|"
+
+You can also run it directly without installing it first:
+
+.. code-block:: bash
+
+ pipx run apache-airflow-ctl --help
+
+Same via Astral ``uv`` to install airflowctl from PyPI using the command below:
+
+.. code-block:: bash
+
+ uv tool install "apache-airflow-ctl==|version|"
+
+Additionally, to jump-start, you can use the ``uvx`` shortcut command to run it directly without installing it first:
+
+.. code-block:: bash
+
+ uvx apache-airflow-ctl --help
+
+
+Installation in your environment
+''''''''''''''''''''''''''''''''
Only ``pip`` and ``uv`` installation is currently officially supported.
diff --git a/airflow-ctl/pyproject.toml b/airflow-ctl/pyproject.toml
index a14d42ecb15fd..06278d01cd602 100644
--- a/airflow-ctl/pyproject.toml
+++ b/airflow-ctl/pyproject.toml
@@ -67,6 +67,8 @@ YouTube = "https://www.youtube.com/channel/UCSXwxpWZQ7XZ1WL3wqevChA/"
[project.scripts]
airflowctl = "airflowctl.__main__:main"
+# Redundant definition allowing direct execution via `uvx apache-airflow-ctl ...`
+apache-airflow-ctl = "airflowctl.__main__:main"
[build-system]
requires = [
diff --git a/airflow-ctl/src/airflowctl/api/client.py b/airflow-ctl/src/airflowctl/api/client.py
index 06e78a6a32a90..51786bfcfcbff 100644
--- a/airflow-ctl/src/airflowctl/api/client.py
+++ b/airflow-ctl/src/airflowctl/api/client.py
@@ -169,6 +169,11 @@ def input_cli_config_file(self) -> str:
"""Generate path for the CLI config file."""
return f"{self.api_environment}.json"
+ @staticmethod
+ def token_key_for_environment(api_environment: str) -> str:
+ """Build the keyring/debug token key for a given environment name."""
+ return f"api_token_{api_environment}"
+
def save(self, skip_keyring: bool = False):
"""
Save the credentials to keyring and URL to disk as a file.
@@ -186,7 +191,7 @@ def save(self, skip_keyring: bool = False):
with open(
os.path.join(default_config_dir, f"debug_creds_{self.input_cli_config_file}"), "w"
) as f:
- json.dump({f"api_token_{self.api_environment}": self.api_token}, f)
+ json.dump({self.token_key_for_environment(self.api_environment): self.api_token}, f)
else:
if skip_keyring:
return
@@ -199,7 +204,11 @@ def save(self, skip_keyring: bool = False):
for candidate in candidates:
if hasattr(candidate, "_get_new_password"):
candidate._get_new_password = _bounded_get_new_password
- keyring.set_password("airflowctl", f"api_token_{self.api_environment}", self.api_token) # type: ignore[arg-type]
+ keyring.set_password(
+ "airflowctl",
+ self.token_key_for_environment(self.api_environment),
+ self.api_token, # type: ignore[arg-type]
+ )
except (NoKeyringError, NotImplementedError) as e:
log.error(e)
raise AirflowCtlKeyringException(
@@ -228,13 +237,23 @@ def load(self) -> Credentials:
debug_creds_path = os.path.join(
default_config_dir, f"debug_creds_{self.input_cli_config_file}"
)
- with open(debug_creds_path) as df:
- debug_credentials = json.load(df)
- self.api_token = debug_credentials.get(f"api_token_{self.api_environment}")
+ try:
+ with open(debug_creds_path) as df:
+ debug_credentials = json.load(df)
+ self.api_token = debug_credentials.get(
+ self.token_key_for_environment(self.api_environment)
+ )
+ except FileNotFoundError as e:
+ if self.client_kind == ClientKind.CLI:
+ raise AirflowCtlCredentialNotFoundException(
+ f"Debug credentials file not found: {debug_creds_path}. "
+ "Set AIRFLOW_CLI_DEBUG_MODE=false or log in with debug mode enabled first."
+ ) from e
+ self.api_token = None
else:
try:
self.api_token = keyring.get_password(
- "airflowctl", f"api_token_{self.api_environment}"
+ "airflowctl", self.token_key_for_environment(self.api_environment)
)
except ValueError as e:
# Incorrect keyring password
diff --git a/airflow-ctl/src/airflowctl/api/operations.py b/airflow-ctl/src/airflowctl/api/operations.py
index 2890304743b4c..3ce196c10cb32 100644
--- a/airflow-ctl/src/airflowctl/api/operations.py
+++ b/airflow-ctl/src/airflowctl/api/operations.py
@@ -195,6 +195,9 @@ def __init_subclass__(cls, **kwargs):
setattr(cls, attr, _check_flag_and_exit_if_server_response_error(value))
def execute_list(self, *, path, data_model, offset=0, limit=50, params=None):
+ if limit <= 0:
+ raise ValueError(f"limit must be a positive integer, got {limit}")
+
shared_params = {"limit": limit, **(params or {})}
def safe_validate(content: bytes) -> BaseModel:
@@ -610,32 +613,33 @@ def list(
dag_id: str | None = None,
) -> DAGRunCollectionResponse | ServerResponseError:
"""
- List all dag runs.
+ List dag runs (at most `limit` results).
Args:
state: Filter dag runs by state
start_date: Filter dag runs by start date (optional)
end_date: Filter dag runs by end date (optional)
- state: Filter dag runs by state
- limit: Limit the number of results
+ limit: Limit the number of results returned
dag_id: The DAG ID to filter by. If None, retrieves dag runs for all DAGs (using "~").
"""
# Use "~" for all DAGs if dag_id is not specified
if not dag_id:
dag_id = "~"
- params: dict[str, object] = {
- "state": state,
+ params: dict[str, Any] = {
+ "state": str(state),
"limit": limit,
}
if start_date is not None:
- params["start_date"] = start_date
+ params["start_date"] = start_date.isoformat()
if end_date is not None:
- params["end_date"] = end_date
+ params["end_date"] = end_date.isoformat()
- return super().execute_list(
- path=f"/dags/{dag_id}/dagRuns", data_model=DAGRunCollectionResponse, params=params
- )
+ try:
+ self.response = self.client.get(f"/dags/{dag_id}/dagRuns", params=params)
+ return DAGRunCollectionResponse.model_validate_json(self.response.content)
+ except ServerResponseError as e:
+ raise e
class JobsOperations(BaseOperations):
diff --git a/airflow-ctl/src/airflowctl/ctl/commands/auth_command.py b/airflow-ctl/src/airflowctl/ctl/commands/auth_command.py
index 236b8d5c6b8de..cf521cbe7eea0 100644
--- a/airflow-ctl/src/airflowctl/ctl/commands/auth_command.py
+++ b/airflow-ctl/src/airflowctl/ctl/commands/auth_command.py
@@ -144,7 +144,7 @@ def list_envs(args) -> None:
if filename.startswith("debug_creds_") or filename.endswith("_generated.json"):
continue
- env_name = filename.replace(".json", "")
+ env_name, _ = os.path.splitext(filename)
# Try to read config file
api_url = None
@@ -168,11 +168,11 @@ def list_envs(args) -> None:
if os.path.exists(debug_path):
with open(debug_path) as f:
debug_creds = json.load(f)
- if f"api_token_{env_name}" in debug_creds:
+ if Credentials.token_key_for_environment(env_name) in debug_creds:
token_status = "authenticated"
else:
# Check keyring
- token = keyring.get_password("airflowctl", f"api_token_{env_name}")
+ token = keyring.get_password("airflowctl", Credentials.token_key_for_environment(env_name))
if token:
token_status = "authenticated"
except NoKeyringError:
diff --git a/airflow-ctl/tests/airflow_ctl/api/test_client.py b/airflow-ctl/tests/airflow_ctl/api/test_client.py
index 0617d62276a1c..f495b357d8d4b 100644
--- a/airflow-ctl/tests/airflow_ctl/api/test_client.py
+++ b/airflow-ctl/tests/airflow_ctl/api/test_client.py
@@ -30,7 +30,10 @@
from airflowctl.api.client import Client, ClientKind, Credentials, _bounded_get_new_password
from airflowctl.api.operations import ServerResponseError
-from airflowctl.exceptions import AirflowCtlCredentialNotFoundException, AirflowCtlKeyringException
+from airflowctl.exceptions import (
+ AirflowCtlCredentialNotFoundException,
+ AirflowCtlKeyringException,
+)
def make_client_w_responses(responses: list[httpx.Response]) -> Client:
@@ -376,3 +379,16 @@ def test_retry_handling_ok(self):
response = client.get("http://error")
assert response.status_code == 200
assert len(responses) == 1
+
+ def test_debug_mode_missing_debug_creds_reports_correct_error(self, monkeypatch, tmp_path):
+ monkeypatch.setenv("AIRFLOW_HOME", str(tmp_path))
+ monkeypatch.setenv("AIRFLOW_CLI_DEBUG_MODE", "true")
+ monkeypatch.setenv("AIRFLOW_CLI_ENVIRONMENT", "TEST_DEBUG")
+
+ config_path = tmp_path / "TEST_DEBUG.json"
+ config_path.write_text(json.dumps({"api_url": "http://localhost:8080"}), encoding="utf-8")
+ # Intentionally do not create debug_creds_TEST_DEBUG.json to simulate a missing file
+
+ creds = Credentials(client_kind=ClientKind.CLI, api_environment="TEST_DEBUG")
+ with pytest.raises(AirflowCtlCredentialNotFoundException, match="Debug credentials file not found"):
+ creds.load()
diff --git a/airflow-ctl/tests/airflow_ctl/api/test_operations.py b/airflow-ctl/tests/airflow_ctl/api/test_operations.py
index 2f2e0b0f54738..aa559f174214b 100644
--- a/airflow-ctl/tests/airflow_ctl/api/test_operations.py
+++ b/airflow-ctl/tests/airflow_ctl/api/test_operations.py
@@ -230,6 +230,16 @@ def test_execute_list_sends_limit_on_subsequent_pages(self):
for call in mock_client.get.call_args_list:
assert call.kwargs["params"]["limit"] == 2
+ @pytest.mark.parametrize("limit", [0, -1])
+ def test_execute_list_rejects_non_positive_limit(self, limit):
+ mock_client = Mock()
+ base_operation = BaseOperations(client=mock_client)
+
+ with pytest.raises(ValueError, match="limit must be a positive integer"):
+ base_operation.execute_list(path="hello", data_model=HelloCollectionResponse, limit=limit)
+
+ mock_client.get.assert_not_called()
+
class TestAssetsOperations:
asset_id: int = 1
@@ -1068,44 +1078,63 @@ def handle_request(request: httpx.Request) -> httpx.Response:
)
assert response == self.dag_run_collection_response
- def test_list_all_dags(self):
- """Test listing dag runs for all DAGs using default dag_id='~'."""
-
- def handle_request(request: httpx.Request) -> httpx.Response:
- # When dag_id is "~", it should query all DAGs
- assert request.url.path == "/api/v2/dags/~/dagRuns"
+ @pytest.mark.parametrize(
+ (
+ "dag_id_input",
+ "state",
+ "limit",
+ "start_date",
+ "end_date",
+ "expected_path_suffix",
+ "expected_params_subset",
+ ),
+ [
+ # Test --limit with various values and configurations (covers CLI --limit flag)
+ ("dag1", "queued", 5, None, None, "dag1", {"state": "queued", "limit": "5"}),
+ (None, "running", 1, None, None, "~", {"state": "running", "limit": "1"}),
+ (
+ "example_dag",
+ "success",
+ 10,
+ None,
+ None,
+ "example_dag",
+ {"state": "success", "limit": "10"},
+ ),
+ ("dag2", "failed", 0, None, None, "dag2", {"state": "failed", "limit": "0"}),
+ ],
+ ids=["limit-5", "all-dags-limit-1", "string-state-limit-10", "limit-zero"],
+ )
+ def test_list_with_various_limits(
+ self,
+ dag_id_input: str | None,
+ state: str | DagRunState,
+ limit: int,
+ start_date: datetime.datetime | None,
+ end_date: datetime.datetime | None,
+ expected_path_suffix: str,
+ expected_params_subset: dict,
+ ) -> None:
+ """Test listing dag runs with various limit values (especially --limit flag)."""
+
+ def handle_request(request: httpx.Request) -> httpx.Response:
+ assert request.url.path.endswith(f"/dags/{expected_path_suffix}/dagRuns")
+ params = dict(request.url.params)
+ for key, value in expected_params_subset.items():
+ assert key in params
+ assert str(params[key]) == str(value)
return httpx.Response(200, json=json.loads(self.dag_run_collection_response.model_dump_json()))
client = make_api_client(transport=httpx.MockTransport(handle_request))
- # Call without specifying dag_id - should use default "~"
response = client.dag_runs.list(
- start_date=datetime.datetime(2025, 1, 1, 0, 0, 0),
- end_date=datetime.datetime(2025, 1, 1, 0, 0, 0),
- state="running",
- limit=1,
+ state=state,
+ limit=limit,
+ start_date=start_date,
+ end_date=end_date,
+ dag_id=dag_id_input,
)
assert response == self.dag_run_collection_response
- def test_list_with_optional_parameters(self):
- """Test listing dag runs with only some optional parameters."""
-
- def handle_request(request: httpx.Request) -> httpx.Response:
- assert request.url.path == "/api/v2/dags/dag1/dagRuns"
- # Verify that only state and limit are in query params
- params = dict(request.url.params)
- assert "state" in params
- assert params["state"] == "queued"
- assert "limit" in params
- assert params["limit"] == "5"
- # start_date and end_date should not be present
- assert "start_date" not in params
- assert "end_date" not in params
- return httpx.Response(200, json=json.loads(self.dag_run_collection_response.model_dump_json()))
-
- client = make_api_client(transport=httpx.MockTransport(handle_request))
- response = client.dag_runs.list(state="queued", limit=5, dag_id="dag1")
- assert response == self.dag_run_collection_response
-
class TestJobsOperations:
job_response = JobResponse(
diff --git a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_auth_command.py b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_auth_command.py
index e76fafc28adf9..2bda56b0fdc18 100644
--- a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_auth_command.py
+++ b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_auth_command.py
@@ -477,3 +477,21 @@ def test_list_envs_filters_special_files(self, monkeypatch):
# Only production environment should be checked, not the special files
mock_get_password.assert_called_once_with("airflowctl", "api_token_production")
+
+ def test_list_envs_environment_name_with_json_substring(self, monkeypatch):
+ """Test list-envs keeps '.json' substrings in environment name for key lookup."""
+ with (
+ tempfile.TemporaryDirectory() as temp_airflow_home,
+ patch("keyring.get_password") as mock_get_password,
+ ):
+ monkeypatch.setenv("AIRFLOW_HOME", temp_airflow_home)
+
+ with open(os.path.join(temp_airflow_home, "prod.json.region.json"), "w") as f:
+ json.dump({"api_url": "http://localhost:8080"}, f)
+
+ mock_get_password.return_value = "test_token"
+
+ args = self.parser.parse_args(["auth", "list-envs"])
+ auth_command.list_envs(args)
+
+ mock_get_password.assert_called_once_with("airflowctl", "api_token_prod.json.region")