Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions airflow-ctl/docs/installation/installing-from-pypi.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,38 @@ Installation from PyPI
This page describes installations using the ``apache-airflow-ctl`` package `published in
PyPI <https://pypi.org/project/apache-airflow-ctl/>`__.

Installation tools
''''''''''''''''''
Installation via ``pipx`` or ``uv`` as tool
'''''''''''''''''''''''''''''''''''''''''''

You can locally deploy or run airflowctl without installing it in your environment using tools like `pipx <https://pypi.org/project/pipx/>`_ or `uv <https://astral.sh/uv/>`_.

Via ``pipx`` it is possible to install airflowctl directly from PyPI using the command below:

.. code-block:: bash

pipx install "apache-airflow-ctl==|version|"

As well as directly run w/o installing it first:

.. code-block:: bash

pipx run "apache-airflow-ctl --help"

Same via Astral ``uv`` to install airflowctl from PyPI using the command below:

.. code-block:: bash

uv tool install "apache-airflow-ctl==|version|"

Additionally to jump-start using it you can also use the shortcut via ``uvx`` command and directly run it without installing it first:

.. code-block:: bash

uvx apache-airflow-ctl --help


Installation in your environment
''''''''''''''''''''''''''''''''

Only ``pip`` and ``uv`` installation is currently officially supported.

Expand Down
2 changes: 2 additions & 0 deletions airflow-ctl/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ YouTube = "https://www.youtube.com/channel/UCSXwxpWZQ7XZ1WL3wqevChA/"

[project.scripts]
airflowctl = "airflowctl.__main__:main"
# Redundant definition allowing direct execution via `uvx apache-airflow-ctl ...`
apache-airflow-ctl = "airflowctl.__main__:main"

[build-system]
requires = [
Expand Down
31 changes: 25 additions & 6 deletions airflow-ctl/src/airflowctl/api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ def input_cli_config_file(self) -> str:
"""Generate path for the CLI config file."""
return f"{self.api_environment}.json"

@staticmethod
def token_key_for_environment(api_environment: str) -> str:
"""Build the keyring/debug token key for a given environment name."""
return f"api_token_{api_environment}"

def save(self, skip_keyring: bool = False):
"""
Save the credentials to keyring and URL to disk as a file.
Expand All @@ -186,7 +191,7 @@ def save(self, skip_keyring: bool = False):
with open(
os.path.join(default_config_dir, f"debug_creds_{self.input_cli_config_file}"), "w"
) as f:
json.dump({f"api_token_{self.api_environment}": self.api_token}, f)
json.dump({self.token_key_for_environment(self.api_environment): self.api_token}, f)
else:
if skip_keyring:
return
Expand All @@ -199,7 +204,11 @@ def save(self, skip_keyring: bool = False):
for candidate in candidates:
if hasattr(candidate, "_get_new_password"):
candidate._get_new_password = _bounded_get_new_password
keyring.set_password("airflowctl", f"api_token_{self.api_environment}", self.api_token) # type: ignore[arg-type]
keyring.set_password(
"airflowctl",
self.token_key_for_environment(self.api_environment),
self.api_token, # type: ignore[arg-type]
)
except (NoKeyringError, NotImplementedError) as e:
log.error(e)
raise AirflowCtlKeyringException(
Expand Down Expand Up @@ -228,13 +237,23 @@ def load(self) -> Credentials:
debug_creds_path = os.path.join(
default_config_dir, f"debug_creds_{self.input_cli_config_file}"
)
with open(debug_creds_path) as df:
debug_credentials = json.load(df)
self.api_token = debug_credentials.get(f"api_token_{self.api_environment}")
try:
with open(debug_creds_path) as df:
debug_credentials = json.load(df)
self.api_token = debug_credentials.get(
self.token_key_for_environment(self.api_environment)
)
except FileNotFoundError as e:
if self.client_kind == ClientKind.CLI:
raise AirflowCtlCredentialNotFoundException(
f"Debug credentials file not found: {debug_creds_path}. "
"Set AIRFLOW_CLI_DEBUG_MODE=false or log in with debug mode enabled first."
) from e
self.api_token = None
else:
try:
self.api_token = keyring.get_password(
"airflowctl", f"api_token_{self.api_environment}"
"airflowctl", self.token_key_for_environment(self.api_environment)
)
except ValueError as e:
# Incorrect keyring password
Expand Down
24 changes: 14 additions & 10 deletions airflow-ctl/src/airflowctl/api/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ def __init_subclass__(cls, **kwargs):
setattr(cls, attr, _check_flag_and_exit_if_server_response_error(value))

def execute_list(self, *, path, data_model, offset=0, limit=50, params=None):
if limit <= 0:
raise ValueError(f"limit must be a positive integer, got {limit}")

shared_params = {"limit": limit, **(params or {})}

def safe_validate(content: bytes) -> BaseModel:
Expand Down Expand Up @@ -610,32 +613,33 @@ def list(
dag_id: str | None = None,
) -> DAGRunCollectionResponse | ServerResponseError:
"""
List all dag runs.
List dag runs (at most `limit` results).

Args:
state: Filter dag runs by state
start_date: Filter dag runs by start date (optional)
end_date: Filter dag runs by end date (optional)
state: Filter dag runs by state
limit: Limit the number of results
limit: Limit the number of results returned
dag_id: The DAG ID to filter by. If None, retrieves dag runs for all DAGs (using "~").
"""
# Use "~" for all DAGs if dag_id is not specified
if not dag_id:
dag_id = "~"

params: dict[str, object] = {
"state": state,
params: dict[str, Any] = {
"state": str(state),
"limit": limit,
}
if start_date is not None:
params["start_date"] = start_date
params["start_date"] = start_date.isoformat()
if end_date is not None:
params["end_date"] = end_date
params["end_date"] = end_date.isoformat()

return super().execute_list(
path=f"/dags/{dag_id}/dagRuns", data_model=DAGRunCollectionResponse, params=params
)
try:
self.response = self.client.get(f"/dags/{dag_id}/dagRuns", params=params)
return DAGRunCollectionResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e


class JobsOperations(BaseOperations):
Expand Down
6 changes: 3 additions & 3 deletions airflow-ctl/src/airflowctl/ctl/commands/auth_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def list_envs(args) -> None:
if filename.startswith("debug_creds_") or filename.endswith("_generated.json"):
continue

env_name = filename.replace(".json", "")
env_name, _ = os.path.splitext(filename)

# Try to read config file
api_url = None
Expand All @@ -168,11 +168,11 @@ def list_envs(args) -> None:
if os.path.exists(debug_path):
with open(debug_path) as f:
debug_creds = json.load(f)
if f"api_token_{env_name}" in debug_creds:
if Credentials.token_key_for_environment(env_name) in debug_creds:
token_status = "authenticated"
else:
# Check keyring
token = keyring.get_password("airflowctl", f"api_token_{env_name}")
token = keyring.get_password("airflowctl", Credentials.token_key_for_environment(env_name))
if token:
token_status = "authenticated"
except NoKeyringError:
Expand Down
18 changes: 17 additions & 1 deletion airflow-ctl/tests/airflow_ctl/api/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@

from airflowctl.api.client import Client, ClientKind, Credentials, _bounded_get_new_password
from airflowctl.api.operations import ServerResponseError
from airflowctl.exceptions import AirflowCtlCredentialNotFoundException, AirflowCtlKeyringException
from airflowctl.exceptions import (
AirflowCtlCredentialNotFoundException,
AirflowCtlKeyringException,
)


def make_client_w_responses(responses: list[httpx.Response]) -> Client:
Expand Down Expand Up @@ -376,3 +379,16 @@ def test_retry_handling_ok(self):
response = client.get("http://error")
assert response.status_code == 200
assert len(responses) == 1

def test_debug_mode_missing_debug_creds_reports_correct_error(self, monkeypatch, tmp_path):
monkeypatch.setenv("AIRFLOW_HOME", str(tmp_path))
monkeypatch.setenv("AIRFLOW_CLI_DEBUG_MODE", "true")
monkeypatch.setenv("AIRFLOW_CLI_ENVIRONMENT", "TEST_DEBUG")

config_path = tmp_path / "TEST_DEBUG.json"
config_path.write_text(json.dumps({"api_url": "http://localhost:8080"}), encoding="utf-8")
# Intentionally do not create debug_creds_TEST_DEBUG.json to simulate a missing file

creds = Credentials(client_kind=ClientKind.CLI, api_environment="TEST_DEBUG")
with pytest.raises(AirflowCtlCredentialNotFoundException, match="Debug credentials file not found"):
creds.load()
91 changes: 60 additions & 31 deletions airflow-ctl/tests/airflow_ctl/api/test_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,16 @@ def test_execute_list_sends_limit_on_subsequent_pages(self):
for call in mock_client.get.call_args_list:
assert call.kwargs["params"]["limit"] == 2

@pytest.mark.parametrize("limit", [0, -1])
def test_execute_list_rejects_non_positive_limit(self, limit):
mock_client = Mock()
base_operation = BaseOperations(client=mock_client)

with pytest.raises(ValueError, match="limit must be a positive integer"):
base_operation.execute_list(path="hello", data_model=HelloCollectionResponse, limit=limit)

mock_client.get.assert_not_called()


class TestAssetsOperations:
asset_id: int = 1
Expand Down Expand Up @@ -1068,44 +1078,63 @@ def handle_request(request: httpx.Request) -> httpx.Response:
)
assert response == self.dag_run_collection_response

def test_list_all_dags(self):
"""Test listing dag runs for all DAGs using default dag_id='~'."""

def handle_request(request: httpx.Request) -> httpx.Response:
# When dag_id is "~", it should query all DAGs
assert request.url.path == "/api/v2/dags/~/dagRuns"
@pytest.mark.parametrize(
(
"dag_id_input",
"state",
"limit",
"start_date",
"end_date",
"expected_path_suffix",
"expected_params_subset",
),
[
# Test --limit with various values and configurations (covers CLI --limit flag)
("dag1", "queued", 5, None, None, "dag1", {"state": "queued", "limit": "5"}),
(None, "running", 1, None, None, "~", {"state": "running", "limit": "1"}),
(
"example_dag",
"success",
10,
None,
None,
"example_dag",
{"state": "success", "limit": "10"},
),
("dag2", "failed", 0, None, None, "dag2", {"state": "failed", "limit": "0"}),
],
ids=["limit-5", "all-dags-limit-1", "string-state-limit-10", "limit-zero"],
)
def test_list_with_various_limits(
self,
dag_id_input: str | None,
state: str | DagRunState,
limit: int,
start_date: datetime.datetime | None,
end_date: datetime.datetime | None,
expected_path_suffix: str,
expected_params_subset: dict,
) -> None:
"""Test listing dag runs with various limit values (especially --limit flag)."""

def handle_request(request: httpx.Request) -> httpx.Response:
assert request.url.path.endswith(f"/dags/{expected_path_suffix}/dagRuns")
params = dict(request.url.params)
for key, value in expected_params_subset.items():
assert key in params
assert str(params[key]) == str(value)
return httpx.Response(200, json=json.loads(self.dag_run_collection_response.model_dump_json()))

client = make_api_client(transport=httpx.MockTransport(handle_request))
# Call without specifying dag_id - should use default "~"
response = client.dag_runs.list(
start_date=datetime.datetime(2025, 1, 1, 0, 0, 0),
end_date=datetime.datetime(2025, 1, 1, 0, 0, 0),
state="running",
limit=1,
state=state,
limit=limit,
start_date=start_date,
end_date=end_date,
dag_id=dag_id_input,
)
assert response == self.dag_run_collection_response

def test_list_with_optional_parameters(self):
"""Test listing dag runs with only some optional parameters."""

def handle_request(request: httpx.Request) -> httpx.Response:
assert request.url.path == "/api/v2/dags/dag1/dagRuns"
# Verify that only state and limit are in query params
params = dict(request.url.params)
assert "state" in params
assert params["state"] == "queued"
assert "limit" in params
assert params["limit"] == "5"
# start_date and end_date should not be present
assert "start_date" not in params
assert "end_date" not in params
return httpx.Response(200, json=json.loads(self.dag_run_collection_response.model_dump_json()))

client = make_api_client(transport=httpx.MockTransport(handle_request))
response = client.dag_runs.list(state="queued", limit=5, dag_id="dag1")
assert response == self.dag_run_collection_response


class TestJobsOperations:
job_response = JobResponse(
Expand Down
18 changes: 18 additions & 0 deletions airflow-ctl/tests/airflow_ctl/ctl/commands/test_auth_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,3 +477,21 @@ def test_list_envs_filters_special_files(self, monkeypatch):

# Only production environment should be checked, not the special files
mock_get_password.assert_called_once_with("airflowctl", "api_token_production")

def test_list_envs_environment_name_with_json_substring(self, monkeypatch):
"""Test list-envs keeps '.json' substrings in environment name for key lookup."""
with (
tempfile.TemporaryDirectory() as temp_airflow_home,
patch("keyring.get_password") as mock_get_password,
):
monkeypatch.setenv("AIRFLOW_HOME", temp_airflow_home)

with open(os.path.join(temp_airflow_home, "prod.json.region.json"), "w") as f:
json.dump({"api_url": "http://localhost:8080"}, f)

mock_get_password.return_value = "test_token"

args = self.parser.parse_args(["auth", "list-envs"])
auth_command.list_envs(args)

mock_get_password.assert_called_once_with("airflowctl", "api_token_prod.json.region")
Loading