Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gitops/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import sys
from pathlib import Path

from gitops.utils.apps import App, get_app_details, get_apps
from gitops.utils.apps import App, get_app_details, get_apps # type: ignore[attr-defined]

from .utils.cli import success, warning

Expand Down
42 changes: 28 additions & 14 deletions gitops/common/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ def __init__(
self,
name: str,
path: str | None = None,
deployments: dict | None = None,
secrets: dict | None = None,
deployments: dict[str, Any] | None = None,
secrets: dict[str, str] | None = None,
load_secrets: bool = True,
encode_secrets: bool = True,
# deprecated
Expand Down Expand Up @@ -75,7 +75,7 @@ def set_value(self, path: str, value: Any) -> None:
current_dict = current_dict.setdefault(key, {})
current_dict[keys[-1]] = value

def _make_values(self, deployments: dict, secrets: dict[str, str]) -> dict:
def _make_values(self, deployments: dict[str, Any], secrets: dict[str, str]) -> dict[str, Any]:
def encode(value: str) -> str:
return b64encode(str(value).encode()).decode() if self.encode_secrets else value

Expand All @@ -93,14 +93,17 @@ def encode(value: str) -> str:
values.pop("images", None)
return values

def _make_image(self, deployment_config: dict) -> str:
def _make_image(self, deployment_config: dict[str, Any]) -> str:
if "image-tag" in deployment_config:
return deployment_config["images"]["template"].format(
account_id=self.account_id,
tag=deployment_config["image-tag"],
return str(
deployment_config["images"]["template"].format(
account_id=self.account_id,
tag=deployment_config["image-tag"],
)
)
else:
return deployment_config.get("image", "")
image = deployment_config.get("image", "")
return str(image) if image else ""

@property
def image(self) -> str:
Expand All @@ -109,7 +112,7 @@ def image(self) -> str:
if isinstance(image, dict):
return f"{image['repository']}:{image.get('tag', 'latest')}"
else:
return image
return str(image) if image else ""

@property
def image_repository_name(self) -> str:
Expand Down Expand Up @@ -139,20 +142,31 @@ def image_prefix(self) -> str:

@property
def cluster(self) -> str:
return self.values.get("cluster", "")
cluster = self.values.get("cluster", "")
return str(cluster)

@property
def tags(self) -> list[str]:
return self.values.get("tags", [])
tags = self.values.get("tags", [])
return list(tags)

@property
def service_account_name(self) -> str:
return self.values.get("serviceAccount", {}).get("name") or self.values.get("serviceAccountName") or "default"
service_account = self.values.get("serviceAccount", {})
if isinstance(service_account, dict):
name = service_account.get("name")
if name:
return str(name)
service_account_name = self.values.get("serviceAccountName")
if service_account_name:
return str(service_account_name)
return "default"

@property
def secrets(self) -> dict[str, str]:
# TODO: This should be a first class property
return self.values.get("secrets", {})
secrets = self.values.get("secrets", {})
return dict(secrets)


class Chart:
Expand All @@ -177,7 +191,7 @@ class Chart:
chart: https://github.com/uptick/workforce
"""

def __init__(self, definition: dict | str):
def __init__(self, definition: dict[str, Any] | str):
if isinstance(definition, str):
# for backwards compat, any chart definition which is a string, is a git repo
self.type = "git"
Expand Down
7 changes: 4 additions & 3 deletions gitops/common/utils.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
import os
from typing import Any

import yaml


def load_yaml(path: str) -> dict:
def load_yaml(path: str) -> dict[str, Any]:
with open(path) as file:
return resolve_values(yaml.safe_load(file), path)


def resolve_values(values: dict, path: str) -> dict:
def resolve_values(values: dict[str, Any], path: str) -> dict[str, Any]:
if "extends" not in values:
return values
parent_values = load_yaml(os.path.join(os.path.dirname(path), values["extends"]))
return deep_merge(parent_values, values)


def deep_merge(parent: dict, child: dict) -> dict:
def deep_merge(parent: dict[str, Any], child: dict[str, Any]) -> dict[str, Any]:
"""Deeply merge two dictionaries.

Dictionary entries will be followed and merged, anything else will be
Expand Down
81 changes: 45 additions & 36 deletions gitops/core.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio
import os
import uuid
from typing import Any

from colorama import Fore
from invoke import run, task
Expand All @@ -16,7 +18,7 @@


@task
def summary(ctx, filter="", exclude=""):
def summary(ctx: Any, filter: str = "", exclude: str = "") -> None:
"""Produce a summary of apps, their tags, and their expected images & replicas.
May not necessarily reflect actual app statuses if recent changes haven't yet been pushed to
the remote, or the deployment has failed.
Expand All @@ -32,18 +34,18 @@ def summary(ctx, filter="", exclude=""):

@task
def bump( # noqa: C901
ctx,
filter,
exclude="",
image_tag=None,
prefix=None,
autoexclude_inactive=True,
interactive=True,
push=False,
redeploy=False,
skip_migrations=False,
skip_deploy=False,
):
ctx: Any,
filter: str,
exclude: str = "",
image_tag: str | None = None,
prefix: str | None = None,
autoexclude_inactive: bool = True,
interactive: bool = True,
push: bool = False,
redeploy: bool = False,
skip_migrations: bool = False,
skip_deploy: bool = False,
) -> None:
"""Bump image tag on selected app(s).
Provide `image_tag` to set to a specific image tag, or provide `prefix` to use latest image
with the given prefix.
Expand Down Expand Up @@ -134,16 +136,16 @@ def bump( # noqa: C901

@task
def command(
ctx,
filter,
command,
exclude="",
cleanup=True,
sequential=True,
interactive=True,
cpu=0,
memory=0,
):
ctx: Any,
filter: str,
command: str,
exclude: str = "",
cleanup: bool = True,
sequential: bool = True,
interactive: bool = True,
cpu: int = 0,
memory: int = 0,
) -> None:
"""Run command on selected app(s).

eg. inv command customer,sandbox -e aesg "python manage.py migrate"
Expand Down Expand Up @@ -183,7 +185,7 @@ def command(


@task
def tag(ctx, filter, tag, exclude=""):
def tag(ctx: Any, filter: str, tag: str, exclude: str = "") -> None:
"""Set a tag on selected app(s)."""
try:
apps = get_apps(
Expand All @@ -209,7 +211,7 @@ def tag(ctx, filter, tag, exclude=""):


@task
def untag(ctx, filter, tag, exclude=""):
def untag(ctx: Any, filter: str, tag: str, exclude: str = "") -> None:
"""Unset a tag from selected app(s)."""
try:
apps = get_apps(
Expand All @@ -235,32 +237,34 @@ def untag(ctx, filter, tag, exclude=""):


@task # TODO: want `keys` to be optional-positional: https://github.com/pyinvoke/invoke/issues/159
def getenv(ctx, filter, keys="", exclude=""):
def getenv(ctx: Any, filter: str, keys: str = "", exclude: str = "") -> None:
"""Get one or more env vars on selected app(s)."""
_getenv("environment", filter, exclude, keys)


@task # TODO: want `keys` to be optional-positional: https://github.com/pyinvoke/invoke/issues/159
def getsecrets(ctx, filter, keys="", exclude=""):
def getsecrets(ctx: Any, filter: str, keys: str = "", exclude: str = "") -> None:
"""Get one or more secrets on selected app(s)."""
_getenv("secrets", filter, exclude, keys)


def _getenv(env_or_secrets, filter, exclude, filter_values):
filter_values = filter_values.split(",") if filter_values else ""
def _getenv(env_or_secrets: str, filter: str, exclude: str, filter_values: str) -> None:
filter_values_list = filter_values.split(",") if filter_values else []
apps = get_apps(filter=filter, exclude=exclude, mode="SILENT")
for app in apps:
print("-" * 20, progress(app.name), sep="\n")
values = app.values.get(env_or_secrets)
if isinstance(values, dict):
filtered_values = {k: v for k, v in values.items() if k in filter_values} if filter_values else values
filtered_values = (
{k: v for k, v in values.items() if k in filter_values_list} if filter_values_list else values
)
for k, v in filtered_values.items():
print(f"{k}={v}")
else:
print(warning(f"No {env_or_secrets} set."))


def _sort_envs(envs):
def _sort_envs(envs: dict[str, Any]) -> dict[str, Any]:
sorted_envs = {}
for e in config.getlist("env_order", fallback=""):
if e in envs:
Expand All @@ -271,7 +275,7 @@ def _sort_envs(envs):


@task
def setenv(ctx, filter, values, exclude=""):
def setenv(ctx: Any, filter: str, values: str, exclude: str = "") -> None:
"""Set one or more env vars on selected app(s).

eg. inv setenv customer,sandbox BG_RUNNER=DRAMATIQ,BUMP=2
Expand All @@ -292,11 +296,16 @@ def setenv(ctx, filter, values, exclude=""):
print(success_negative("Aborted."))
return
for app in apps:
# Split on first = only, in case values contain =
new_envs: dict[str, Any] = {}
for e in splitenvs:
key, value = e.split("=", 1)
new_envs[key] = value
update_app(
app.name,
environment=_sort_envs(
{
**dict(tuple(e.split("=")) for e in splitenvs),
**new_envs,
**app.values.get("environment", {}),
}
),
Expand All @@ -309,7 +318,7 @@ def setenv(ctx, filter, values, exclude=""):


@task
def unsetenv(ctx, filter, values, exclude=""):
def unsetenv(ctx: Any, filter: str, values: str, exclude: str = "") -> None:
"""Unset one or more env vars on selected app(s).

eg. inv unsetenv customer,sandbox BG_RUNNER,BUMP
Expand Down Expand Up @@ -343,7 +352,7 @@ def unsetenv(ctx, filter, values, exclude=""):


@task
def setcluster(ctx, filter, cluster, exclude=""):
def setcluster(ctx: Any, filter: str, cluster: str, exclude: str = "") -> None:
"""Move selected app(s) to given cluster.

eg. inv setcluster customer,sandbox eks-prod
Expand All @@ -370,7 +379,7 @@ def setcluster(ctx, filter, cluster, exclude=""):
print(success("Done!"))


def git_push(cluster_path: str, retry: int = 3):
def git_push(cluster_path: str | os.PathLike[str], retry: int = 3) -> None:
"""Git pushes in a directory and retries if commits already exist"""
print(progress(f"Pushing changes to {cluster_path}"))
attempts = 0
Expand Down
Loading
Loading