From d756d497af42988abef9ddd5b6193b395ab6fb29 Mon Sep 17 00:00:00 2001 From: Megrez Lu Date: Wed, 31 May 2023 19:31:58 +0800 Subject: [PATCH 01/11] first commit --- robusta_krr/core/integrations/metrics.py | 132 ++++++++++++++++++ .../core/integrations/prometheus/loader.py | 8 +- .../metrics_service/base_metric_service.py | 4 + .../prometheus_metrics_service.py | 2 +- robusta_krr/core/runner.py | 15 +- 5 files changed, 151 insertions(+), 10 deletions(-) create mode 100644 robusta_krr/core/integrations/metrics.py diff --git a/robusta_krr/core/integrations/metrics.py b/robusta_krr/core/integrations/metrics.py new file mode 100644 index 00000000..9cfcbee1 --- /dev/null +++ b/robusta_krr/core/integrations/metrics.py @@ -0,0 +1,132 @@ +import asyncio +import itertools +from typing import Optional, List, Dict +from collections import defaultdict + +from robusta_krr.core.models.config import Config +from robusta_krr.core.models.objects import K8sObjectData, PodData +from robusta_krr.core.models.result import ResourceAllocations, ResourceType, RecommendationValue +from robusta_krr.utils.configurable import Configurable +from .prometheus.loader import MetricsLoader + +class PrometheusLoader(Configurable): + def __init__(self, config: Config): + super().__init__(config) + self.metrics_loader = MetricsLoader(config) + + async def list_clusters(self) -> Optional[list[str]]: + self.debug("Working in Prometheus-based workload discovery mode. Only support a single cluster") + return None + + async def list_scannable_objects(self, clusters: Optional[list[str]]) -> list[K8sObjectData]: + """List all scannable objects from Prometheus + In this workload discovery mode, clusters are not supported. + + Returns: + A list of scannable objects. + """ + self.info(f"Listing scannable objects from Prometheus") + self.debug(f"Namespaces: {self.config.namespaces}") + try: + objects_tuple = await asyncio.gather( + self._list_deployments(), + ) + except Exception as e: + self.error(f"Error trying to list pods from Prometheus: {e}") + self.debug_exception() + return [] + + objects = itertools.chain(*objects_tuple) + if self.config.namespaces == "*": + # NOTE: We are not scanning kube-system namespace by default + result = [obj for obj in objects if obj.namespace != "kube-system"] + else: + result = [obj for obj in objects if obj.namespace in self.config.namespaces] + + namespaces = {obj.namespace for obj in result} + self.info(f"Found {len(result)} objects across {len(namespaces)} namespaces from Prometheus({self.config.prometheus_url})") + + return result + + async def __parse_allocation(self, namespace: str, pod_selector: str, container_name: str) -> ResourceAllocations: + limits = await self.metrics_loader.loader.query("avg by(resource) (kube_pod_container_resource_limits{" + f'namespace="{namespace}", ' + f'pod=~"{pod_selector}", ' + f'container="{container_name}"' + "})") + requests = await self.metrics_loader.loader.query("avg by(resource) (kube_pod_container_resource_requests{" + f'namespace="{namespace}", ' + f'pod=~"{pod_selector}", ' + f'container="{container_name}"' + "})") + requests_values: Dict[ResourceType, RecommendationValue] = {ResourceType.CPU: None, ResourceType.Memory: None} + limits_values: Dict[ResourceType, RecommendationValue] = {ResourceType.CPU: None, ResourceType.Memory: None} + for limit in limits: + if limit['metric']['resource'] == ResourceType.CPU: + limits_values[ResourceType.CPU] = float(limit['value'][1]) + elif limit['metric']['resource'] == ResourceType.Memory: + limits_values[ResourceType.Memory] = float(limit['value'][1]) + + for request in requests: + if request['metric']['resource'] == ResourceType.CPU: + requests_values[ResourceType.CPU] = float(request['value'][1]) + elif request['metric']['resource'] == ResourceType.Memory: + requests_values[ResourceType.Memory] = float(request['value'][1]) + return ResourceAllocations(requests=requests_values, limits=limits_values) + + + async def __build_from_owner(self, namespace: str, app_name: str, containers: List[str], pod_names: List[str]) -> List[K8sObjectData]: + return [ + K8sObjectData( + cluster="default", + namespace=namespace, + name=app_name, + kind="Deployment", + container=container_name, + allocations=await self.__parse_allocation(namespace, "|".join(pod_names), container_name), # find + pods=[PodData(name=pod_name, deleted=False) for pod_name in pod_names], # list pods + ) + for container_name in containers + ] + + async def _list_containers(self, namespace: str, pod_selector: str) -> List[str]: + containers = await self.metrics_loader.loader.query("count by (container) (kube_pod_container_info{" + f'namespace="{namespace}", ' + f'pod=~"{pod_selector}"' + "})") + return [container['metric']['container'] for container in containers] + + async def _list_containers_in_pods(self, app_name: str, pod_owner_kind: str, namespace: str, owner_name: str) -> list[K8sObjectData]: + if pod_owner_kind == "ReplicaSet": + # owner_name is ReplicaSet names + pods = await self.metrics_loader.loader.query("count by (owner_name, replicaset, pod) (kube_pod_owner{" + f'namespace="{namespace}", ' + f'owner_name=~"{owner_name}", ' + 'owner_kind="ReplicaSet"})') + if pods is None or len(pods) == 0: + return [] # no container + # [{'metric': {'owner_name': 'wbjs-algorithm-base-565b645489', 'pod': 'wbjs-algorithm-base-565b645489-jqt4x'}, 'value': [1685529217, '1']}, + # {'metric': {'owner_name': 'wbjs-algorithm-base-565b645489', 'pod': 'wbjs-algorithm-base-565b645489-lj9qg'}, 'value': [1685529217, '1']}] + pod_names = [pod['metric']['pod'] for pod in pods] + container_names = await self._list_containers(namespace, "|".join(pod_names)) + return await self.__build_from_owner(namespace, app_name, container_names, pod_names) + return [] + + async def _list_deployments(self) -> list[K8sObjectData]: + self.debug(f"Listing deployments in namespace({self.config.namespaces}) from Prometheus({self.config.prometheus_url})") + ns = "|".join(self.config.namespaces) + replicasets = await self.metrics_loader.loader.query("count by (namespace, owner_name, replicaset) (kube_replicaset_owner{" + f'namespace=~"{ns}", ' + 'owner_kind="Deployment"})') + # groupBy: 'ns/owner_name' => [{metadata}...] + pod_owner_kind = "ReplicaSet" + replicaset_dict = defaultdict(list) + for replicaset in replicasets: + replicaset_dict[replicaset['metric']['namespace'] + "/" + replicaset['metric']['owner_name']].append(replicaset['metric']) + objects = await asyncio.gather( + *[ + self._list_containers_in_pods(deployment[0]['owner_name'], pod_owner_kind, deployment[0]['namespace'], "|".join(list(map(lambda metric: metric['replicaset'], deployment)))) + for deployment in replicaset_dict.values() + ] + ) + return list(itertools.chain(*objects)) diff --git a/robusta_krr/core/integrations/prometheus/loader.py b/robusta_krr/core/integrations/prometheus/loader.py index 143fb638..f1b75ab1 100644 --- a/robusta_krr/core/integrations/prometheus/loader.py +++ b/robusta_krr/core/integrations/prometheus/loader.py @@ -26,6 +26,8 @@ class MetricsLoader(Configurable): + loader: MetricsService + def __init__( self, config: Config, @@ -49,10 +51,10 @@ def __init__( if cluster is not None else None ) - self.loader = self.get_metrics_service(config, api_client=self.api_client, cluster=cluster) - if not self.loader: + loader = self.get_metrics_service(config, api_client=self.api_client, cluster=cluster) + if not loader: raise PrometheusNotFound("No Prometheus or metrics service found") - + self.loader = loader self.info(f"{self.loader.name()} connected successfully for {cluster or 'default'} cluster") def get_metrics_service( diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py index 4337e5e0..57d35a56 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py @@ -36,6 +36,10 @@ def __init__( @abc.abstractmethod def check_connection(self): ... + + @abc.abstractmethod + async def query(self, query: str) -> dict: + ... def name(self) -> str: classname = self.__class__.__name__ diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py index 896387f9..174d1644 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py @@ -116,7 +116,7 @@ def __init__( if self.auth_header: headers = {"Authorization": self.auth_header} - elif not self.config.inside_cluster: + elif not self.config.inside_cluster and self.api_client is not None: self.api_client.update_params_for_auth(headers, {}, ["BearerToken"]) self.prometheus = CustomPrometheusConnect(url=self.url, disable_ssl=not self.ssl_enabled, headers=headers) diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py index 890e19d4..b8ed1414 100644 --- a/robusta_krr/core/runner.py +++ b/robusta_krr/core/runner.py @@ -7,6 +7,7 @@ from robusta_krr.core.abstract.strategies import ResourceRecommendation, RunResult from robusta_krr.core.integrations.kubernetes import KubernetesLoader from robusta_krr.core.integrations.prometheus import ClusterNotSpecifiedException, MetricsLoader, PrometheusNotFound +from robusta_krr.core.integrations.metrics import PrometheusLoader from robusta_krr.core.models.config import Config from robusta_krr.core.models.objects import K8sObjectData from robusta_krr.core.models.result import ( @@ -28,7 +29,7 @@ class Runner(Configurable): def __init__(self, config: Config) -> None: super().__init__(config) - self._k8s_loader = KubernetesLoader(self.config) + self._obj_loader = PrometheusLoader(self.config) self._metrics_service_loaders: dict[Optional[str], Union[MetricsLoader, Exception]] = {} self._metrics_service_loaders_error_logged: set[Exception] = set() self._strategy = self.config.create_strategy() @@ -153,7 +154,7 @@ async def _gather_objects_recommendations( ] async def _collect_result(self) -> Result: - clusters = await self._k8s_loader.list_clusters() + clusters = await self._obj_loader.list_clusters() if len(clusters) > 1 and self.config.prometheus_url: # this can only happen for multi-cluster querying a single centeralized prometheus # In this scenario we dont yet support determining which metrics belong to which cluster so the reccomendation can be incorrect @@ -162,7 +163,7 @@ async def _collect_result(self) -> Result: ) self.info(f'Using clusters: {clusters if clusters is not None else "inner cluster"}') - objects = await self._k8s_loader.list_scannable_objects(clusters) + objects = await self._obj_loader.list_scannable_objects(clusters) if len(objects) == 0: self.warning("Current filters resulted in no objects available to scan.") @@ -195,9 +196,11 @@ async def run(self) -> None: try: self.config.load_kubeconfig() except Exception as e: - self.error(f"Could not load kubernetes configuration: {e}") - self.error("Try to explicitly set --context and/or --kubeconfig flags.") - return + if self.config.prometheus_url is None: + self.error(f"Could not load kubernetes configuration: {e}") + self.error("Try to explicitly set --context and/or --kubeconfig flags.") + return + self.warning("Could not load kubernetes configuration, use Prometheus-based worload instead.") try: result = await self._collect_result() From 67f264cf0dfdcc889047b66d354eb959174d65b0 Mon Sep 17 00:00:00 2001 From: Megrez Lu Date: Thu, 1 Jun 2023 12:06:12 +0800 Subject: [PATCH 02/11] fix all bugs --- robusta_krr/core/integrations/metrics.py | 2 +- .../core/integrations/prometheus/metrics/cpu_metric.py | 5 ++++- .../integrations/prometheus/metrics/memory_metric.py | 5 ++++- robusta_krr/core/models/objects.py | 1 + robusta_krr/core/runner.py | 10 +++++++++- 5 files changed, 19 insertions(+), 4 deletions(-) diff --git a/robusta_krr/core/integrations/metrics.py b/robusta_krr/core/integrations/metrics.py index 9cfcbee1..e7fd417c 100644 --- a/robusta_krr/core/integrations/metrics.py +++ b/robusta_krr/core/integrations/metrics.py @@ -78,7 +78,7 @@ async def __parse_allocation(self, namespace: str, pod_selector: str, container_ async def __build_from_owner(self, namespace: str, app_name: str, containers: List[str], pod_names: List[str]) -> List[K8sObjectData]: return [ K8sObjectData( - cluster="default", + cluster=None, namespace=namespace, name=app_name, kind="Deployment", diff --git a/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py b/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py index e2d364b4..6f9a17c1 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py +++ b/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py @@ -8,7 +8,10 @@ @bind_metric(ResourceType.CPU) class CPUMetricLoader(BaseFilteredMetricLoader): def get_query(self, object: K8sObjectData) -> str: - pods_selector = "|".join(pod.name for pod in object.pods) + if len(object.pods) < 300: + pods_selector = "|".join(pod.name for pod in object.pods) + else: + pods_selector = "|".join(set([pod.name[:pod.name.rfind('-')] + '-[0-9a-z]{5}' for pod in object.pods])) cluster_label = self.get_prometheus_cluster_label() return ( "sum(irate(container_cpu_usage_seconds_total{" diff --git a/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py index 5942ec14..a5ff91c0 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py +++ b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py @@ -8,7 +8,10 @@ @bind_metric(ResourceType.Memory) class MemoryMetricLoader(BaseFilteredMetricLoader): def get_query(self, object: K8sObjectData) -> str: - pods_selector = "|".join(pod.name for pod in object.pods) + if len(object.pods) < 300: + pods_selector = "|".join(pod.name for pod in object.pods) + else: + pods_selector = "|".join(set([pod.name[:pod.name.rfind('-')] + '-[0-9a-z]{5}' for pod in object.pods])) cluster_label = self.get_prometheus_cluster_label() return ( "sum(container_memory_working_set_bytes{" diff --git a/robusta_krr/core/models/objects.py b/robusta_krr/core/models/objects.py index 52c4fbd4..90af54a6 100644 --- a/robusta_krr/core/models/objects.py +++ b/robusta_krr/core/models/objects.py @@ -1,6 +1,7 @@ from typing import Optional import pydantic as pd +from typing import Optional from robusta_krr.core.models.allocations import ResourceAllocations diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py index b8ed1414..b43d0d21 100644 --- a/robusta_krr/core/runner.py +++ b/robusta_krr/core/runner.py @@ -23,6 +23,13 @@ from robusta_krr.utils.progress_bar import ProgressBar from robusta_krr.utils.version import get_version +async def gather_with_concurrency(n: int, *coros): + semaphore = asyncio.Semaphore(n) + + async def sem_coro(coro): + async with semaphore: + return await coro + return await asyncio.gather(*(sem_coro(c) for c in coros)) class Runner(Configurable): EXPECTED_EXCEPTIONS = (KeyboardInterrupt, PrometheusNotFound) @@ -137,7 +144,8 @@ async def _calculate_object_recommendations(self, object: K8sObjectData) -> tupl async def _gather_objects_recommendations( self, objects: list[K8sObjectData] ) -> list[tuple[ResourceAllocations, MetricsData]]: - recommendations: list[tuple[RunResult, MetricsData]] = await asyncio.gather( + recommendations: list[tuple[RunResult, MetricsData]] = await gather_with_concurrency( + 2, *[self._calculate_object_recommendations(object) for object in objects] ) From 180b19a960fd004cad359d29e592c7a059c9c66f Mon Sep 17 00:00:00 2001 From: Megrez Lu Date: Thu, 15 Jun 2023 19:06:23 +0800 Subject: [PATCH 03/11] support labels in the table --- robusta_krr/core/integrations/metrics.py | 27 ++++++++++++++++++++---- robusta_krr/core/models/objects.py | 3 ++- robusta_krr/core/runner.py | 2 +- robusta_krr/formatters/table.py | 2 ++ 4 files changed, 28 insertions(+), 6 deletions(-) diff --git a/robusta_krr/core/integrations/metrics.py b/robusta_krr/core/integrations/metrics.py index e7fd417c..ae59a22c 100644 --- a/robusta_krr/core/integrations/metrics.py +++ b/robusta_krr/core/integrations/metrics.py @@ -75,11 +75,12 @@ async def __parse_allocation(self, namespace: str, pod_selector: str, container_ return ResourceAllocations(requests=requests_values, limits=limits_values) - async def __build_from_owner(self, namespace: str, app_name: str, containers: List[str], pod_names: List[str]) -> List[K8sObjectData]: + async def __build_from_owner(self, namespace: str, app_name: str, containers: List[str], pod_names: List[str], labels: Dict[str, str]) -> List[K8sObjectData]: return [ K8sObjectData( cluster=None, namespace=namespace, + labels=labels, name=app_name, kind="Deployment", container=container_name, @@ -96,7 +97,7 @@ async def _list_containers(self, namespace: str, pod_selector: str) -> List[str] "})") return [container['metric']['container'] for container in containers] - async def _list_containers_in_pods(self, app_name: str, pod_owner_kind: str, namespace: str, owner_name: str) -> list[K8sObjectData]: + async def _list_containers_in_pods(self, app_name: str, pod_owner_kind: str, namespace: str, owner_name: str, labels: Dict[str, str]) -> list[K8sObjectData]: if pod_owner_kind == "ReplicaSet": # owner_name is ReplicaSet names pods = await self.metrics_loader.loader.query("count by (owner_name, replicaset, pod) (kube_pod_owner{" @@ -109,9 +110,24 @@ async def _list_containers_in_pods(self, app_name: str, pod_owner_kind: str, nam # {'metric': {'owner_name': 'wbjs-algorithm-base-565b645489', 'pod': 'wbjs-algorithm-base-565b645489-lj9qg'}, 'value': [1685529217, '1']}] pod_names = [pod['metric']['pod'] for pod in pods] container_names = await self._list_containers(namespace, "|".join(pod_names)) - return await self.__build_from_owner(namespace, app_name, container_names, pod_names) + return await self.__build_from_owner(namespace, app_name, container_names, pod_names, labels) return [] + async def _list_labels(self, owner_kind: str, namespace: str, owner_name: str) -> Dict[str, str]: + if owner_kind == "Deployment": + self.debug(f"{owner_kind} in {namespace}: {owner_name}") + labels_metric = await self.metrics_loader.loader.query("kube_deployment_labels{" + f'namespace="{namespace}", ' + f'deployment="{owner_name}"' + "}") + if len(labels_metric) == 0: + return {} + labels = {} + for key in labels_metric[0]['metric'].keys(): + if key.startswith('label_'): + labels[key[6:]] = labels_metric[0]['metric'][key] + return labels + async def _list_deployments(self) -> list[K8sObjectData]: self.debug(f"Listing deployments in namespace({self.config.namespaces}) from Prometheus({self.config.prometheus_url})") ns = "|".join(self.config.namespaces) @@ -125,7 +141,10 @@ async def _list_deployments(self) -> list[K8sObjectData]: replicaset_dict[replicaset['metric']['namespace'] + "/" + replicaset['metric']['owner_name']].append(replicaset['metric']) objects = await asyncio.gather( *[ - self._list_containers_in_pods(deployment[0]['owner_name'], pod_owner_kind, deployment[0]['namespace'], "|".join(list(map(lambda metric: metric['replicaset'], deployment)))) + self._list_containers_in_pods(deployment[0]['owner_name'], pod_owner_kind, deployment[0]['namespace'], + "|".join(list(map(lambda metric: metric['replicaset'], deployment))), + await self._list_labels("Deployment", deployment[0]['namespace'], deployment[0]['owner_name']) + ) for deployment in replicaset_dict.values() ] ) diff --git a/robusta_krr/core/models/objects.py b/robusta_krr/core/models/objects.py index 90af54a6..b72ed0cd 100644 --- a/robusta_krr/core/models/objects.py +++ b/robusta_krr/core/models/objects.py @@ -1,7 +1,7 @@ from typing import Optional import pydantic as pd -from typing import Optional +from typing import Optional, Dict from robusta_krr.core.models.allocations import ResourceAllocations @@ -32,6 +32,7 @@ class K8sObjectData(pd.BaseModel): hpa: Optional[HPAData] namespace: str kind: str + labels: Dict[str, str] allocations: ResourceAllocations def __str__(self) -> str: diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py index b43d0d21..d1264f1b 100644 --- a/robusta_krr/core/runner.py +++ b/robusta_krr/core/runner.py @@ -145,7 +145,7 @@ async def _gather_objects_recommendations( self, objects: list[K8sObjectData] ) -> list[tuple[ResourceAllocations, MetricsData]]: recommendations: list[tuple[RunResult, MetricsData]] = await gather_with_concurrency( - 2, + 3, *[self._calculate_object_recommendations(object) for object in objects] ) diff --git a/robusta_krr/formatters/table.py b/robusta_krr/formatters/table.py index 846732a4..561b8d85 100644 --- a/robusta_krr/formatters/table.py +++ b/robusta_krr/formatters/table.py @@ -89,6 +89,7 @@ def table(result: Result) -> Table: if cluster_count > 1: table.add_column("Cluster", style="cyan") table.add_column("Namespace", style="cyan") + table.add_column("Group", style="cyan") table.add_column("Name", style="cyan") table.add_column("Pods", style="cyan") table.add_column("Old Pods", style="cyan") @@ -113,6 +114,7 @@ def table(result: Result) -> Table: cells.append(item.object.cluster if full_info_row else "") cells += [ item.object.namespace if full_info_row else "", + item.object.labels['group'] if "group" in item.object.labels else "" if full_info_row else "", item.object.name if full_info_row else "", f"{item.object.current_pods_count}" if full_info_row else "", f"{item.object.deleted_pods_count}" if full_info_row else "", From 9279ce8732ebee17d0c5a51fd193046332802c8d Mon Sep 17 00:00:00 2001 From: Megrez Lu Date: Mon, 19 Jun 2023 14:26:45 +0800 Subject: [PATCH 04/11] fix None cluster issue --- robusta_krr/core/runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py index d1264f1b..4c56afbc 100644 --- a/robusta_krr/core/runner.py +++ b/robusta_krr/core/runner.py @@ -163,7 +163,7 @@ async def _gather_objects_recommendations( async def _collect_result(self) -> Result: clusters = await self._obj_loader.list_clusters() - if len(clusters) > 1 and self.config.prometheus_url: + if clusters is not None and len(clusters) > 1 and self.config.prometheus_url: # this can only happen for multi-cluster querying a single centeralized prometheus # In this scenario we dont yet support determining which metrics belong to which cluster so the reccomendation can be incorrect raise ClusterNotSpecifiedException( From 28bf5a9c6864e56b22c5dc9d32dd902a898fdbfd Mon Sep 17 00:00:00 2001 From: Megrez Lu Date: Mon, 19 Jun 2023 15:58:37 +0800 Subject: [PATCH 05/11] fix name --- robusta_krr/core/integrations/metrics.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/robusta_krr/core/integrations/metrics.py b/robusta_krr/core/integrations/metrics.py index ae59a22c..0236e981 100644 --- a/robusta_krr/core/integrations/metrics.py +++ b/robusta_krr/core/integrations/metrics.py @@ -141,11 +141,11 @@ async def _list_deployments(self) -> list[K8sObjectData]: replicaset_dict[replicaset['metric']['namespace'] + "/" + replicaset['metric']['owner_name']].append(replicaset['metric']) objects = await asyncio.gather( *[ - self._list_containers_in_pods(deployment[0]['owner_name'], pod_owner_kind, deployment[0]['namespace'], - "|".join(list(map(lambda metric: metric['replicaset'], deployment))), - await self._list_labels("Deployment", deployment[0]['namespace'], deployment[0]['owner_name']) + self._list_containers_in_pods(replicas[0]['owner_name'], pod_owner_kind, replicas[0]['namespace'], + "|".join(list(map(lambda metric: metric['replicaset'], replicas))), + await self._list_labels("Deployment", replicas[0]['namespace'], replicas[0]['owner_name']) ) - for deployment in replicaset_dict.values() + for replicas in replicaset_dict.values() ] ) return list(itertools.chain(*objects)) From 8bb28573a24cafadfc6b3771989647c388dca916 Mon Sep 17 00:00:00 2001 From: Megrez Lu Date: Mon, 3 Jul 2023 14:22:50 +0800 Subject: [PATCH 06/11] abstract workload discovery --- .../core/integrations/base_workload_loader.py | 15 +++++++++++++++ robusta_krr/core/integrations/kubernetes.py | 3 ++- robusta_krr/core/integrations/metrics.py | 4 ++-- robusta_krr/core/models/config.py | 1 + robusta_krr/core/runner.py | 11 +++++++---- robusta_krr/main.py | 7 +++++++ 6 files changed, 34 insertions(+), 7 deletions(-) create mode 100644 robusta_krr/core/integrations/base_workload_loader.py diff --git a/robusta_krr/core/integrations/base_workload_loader.py b/robusta_krr/core/integrations/base_workload_loader.py new file mode 100644 index 00000000..e6771a65 --- /dev/null +++ b/robusta_krr/core/integrations/base_workload_loader.py @@ -0,0 +1,15 @@ +import abc +from typing import Optional + +from robusta_krr.core.models.objects import K8sObjectData +from robusta_krr.utils.configurable import Configurable + +class WorkloadLoader(Configurable, abc.ABC): + + @abc.abstractmethod + async def list_scannable_objects(self, clusters: Optional[list[str]]) -> list[K8sObjectData]: + ... + + @abc.abstractmethod + async def list_clusters(self) -> Optional[list[str]]: + ... \ No newline at end of file diff --git a/robusta_krr/core/integrations/kubernetes.py b/robusta_krr/core/integrations/kubernetes.py index 49401213..e8a9b9aa 100644 --- a/robusta_krr/core/integrations/kubernetes.py +++ b/robusta_krr/core/integrations/kubernetes.py @@ -24,6 +24,7 @@ from robusta_krr.core.models.objects import K8sObjectData, PodData, HPAData from robusta_krr.core.models.result import ResourceAllocations from robusta_krr.utils.configurable import Configurable +from .base_workload_loader import WorkloadLoader AnyKubernetesAPIObject = Union[V1Deployment, V1DaemonSet, V1StatefulSet, V1Pod, V1Job] @@ -243,7 +244,7 @@ def __get_metric(hpa: V2HorizontalPodAutoscaler, metric_name: str) -> Optional[f } -class KubernetesLoader(Configurable): +class KubernetesLoader(WorkloadLoader): async def list_clusters(self) -> Optional[list[str]]: """List all clusters. diff --git a/robusta_krr/core/integrations/metrics.py b/robusta_krr/core/integrations/metrics.py index 0236e981..437a32fc 100644 --- a/robusta_krr/core/integrations/metrics.py +++ b/robusta_krr/core/integrations/metrics.py @@ -6,10 +6,10 @@ from robusta_krr.core.models.config import Config from robusta_krr.core.models.objects import K8sObjectData, PodData from robusta_krr.core.models.result import ResourceAllocations, ResourceType, RecommendationValue -from robusta_krr.utils.configurable import Configurable from .prometheus.loader import MetricsLoader +from .base_workload_loader import WorkloadLoader -class PrometheusLoader(Configurable): +class PrometheusLoader(WorkloadLoader): def __init__(self, config: Config): super().__init__(config) self.metrics_loader = MetricsLoader(config) diff --git a/robusta_krr/core/models/config.py b/robusta_krr/core/models/config.py index 38e3f4fb..829ffeb4 100644 --- a/robusta_krr/core/models/config.py +++ b/robusta_krr/core/models/config.py @@ -16,6 +16,7 @@ class Config(pd.BaseSettings): clusters: Union[list[str], Literal["*"], None] = None kubeconfig: Optional[str] = None namespaces: Union[list[str], Literal["*"]] = pd.Field("*") + discovery_method: Union[Literal["api-server"], Literal["prometheus"]] = pd.Field("api-server") # Value settings cpu_min_value: int = pd.Field(5, ge=0) # in millicores diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py index 4c56afbc..a28dcf46 100644 --- a/robusta_krr/core/runner.py +++ b/robusta_krr/core/runner.py @@ -36,7 +36,10 @@ class Runner(Configurable): def __init__(self, config: Config) -> None: super().__init__(config) - self._obj_loader = PrometheusLoader(self.config) + if config.discovery_method == "api-server": + self._workload_loader = KubernetesLoader(self.config) + else: + self._workload_loader = PrometheusLoader(self.config) self._metrics_service_loaders: dict[Optional[str], Union[MetricsLoader, Exception]] = {} self._metrics_service_loaders_error_logged: set[Exception] = set() self._strategy = self.config.create_strategy() @@ -145,7 +148,7 @@ async def _gather_objects_recommendations( self, objects: list[K8sObjectData] ) -> list[tuple[ResourceAllocations, MetricsData]]: recommendations: list[tuple[RunResult, MetricsData]] = await gather_with_concurrency( - 3, + self.config.max_workers, *[self._calculate_object_recommendations(object) for object in objects] ) @@ -162,7 +165,7 @@ async def _gather_objects_recommendations( ] async def _collect_result(self) -> Result: - clusters = await self._obj_loader.list_clusters() + clusters = await self._workload_loader.list_clusters() if clusters is not None and len(clusters) > 1 and self.config.prometheus_url: # this can only happen for multi-cluster querying a single centeralized prometheus # In this scenario we dont yet support determining which metrics belong to which cluster so the reccomendation can be incorrect @@ -171,7 +174,7 @@ async def _collect_result(self) -> Result: ) self.info(f'Using clusters: {clusters if clusters is not None else "inner cluster"}') - objects = await self._obj_loader.list_scannable_objects(clusters) + objects = await self._workload_loader.list_scannable_objects(clusters) if len(objects) == 0: self.warning("Current filters resulted in no objects available to scan.") diff --git a/robusta_krr/main.py b/robusta_krr/main.py index 03b4f21b..28f1ef0c 100644 --- a/robusta_krr/main.py +++ b/robusta_krr/main.py @@ -73,6 +73,12 @@ def {func_name}( help="List of namespaces to run on. By default, will run on all namespaces.", rich_help_panel="Kubernetes Settings" ), + discovery_method: Optional[str] = typer.Option( + "api-server", + "--discovery-method", + help="Method to discover workload in the cluster.", + rich_help_panel="Kubernetes Settings" + ), prometheus_url: Optional[str] = typer.Option( None, "--prometheus-url", @@ -124,6 +130,7 @@ def {func_name}( kubeconfig=kubeconfig, clusters="*" if all_clusters else clusters, namespaces="*" if "*" in namespaces else namespaces, + discovery_method=discovery_method, prometheus_url=prometheus_url, prometheus_auth_header=prometheus_auth_header, prometheus_ssl_enabled=prometheus_ssl_enabled, From 88b98f837d474a9a8fdbca5c7dba88f086c29b29 Mon Sep 17 00:00:00 2001 From: Megrez Lu Date: Mon, 3 Jul 2023 18:35:54 +0800 Subject: [PATCH 07/11] revert BaseFilteredMetricLoader --- .../core/integrations/prometheus/metrics/cpu_metric.py | 5 +---- .../core/integrations/prometheus/metrics/memory_metric.py | 5 +---- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py b/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py index 6f9a17c1..e2d364b4 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py +++ b/robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py @@ -8,10 +8,7 @@ @bind_metric(ResourceType.CPU) class CPUMetricLoader(BaseFilteredMetricLoader): def get_query(self, object: K8sObjectData) -> str: - if len(object.pods) < 300: - pods_selector = "|".join(pod.name for pod in object.pods) - else: - pods_selector = "|".join(set([pod.name[:pod.name.rfind('-')] + '-[0-9a-z]{5}' for pod in object.pods])) + pods_selector = "|".join(pod.name for pod in object.pods) cluster_label = self.get_prometheus_cluster_label() return ( "sum(irate(container_cpu_usage_seconds_total{" diff --git a/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py index a5ff91c0..5942ec14 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py +++ b/robusta_krr/core/integrations/prometheus/metrics/memory_metric.py @@ -8,10 +8,7 @@ @bind_metric(ResourceType.Memory) class MemoryMetricLoader(BaseFilteredMetricLoader): def get_query(self, object: K8sObjectData) -> str: - if len(object.pods) < 300: - pods_selector = "|".join(pod.name for pod in object.pods) - else: - pods_selector = "|".join(set([pod.name[:pod.name.rfind('-')] + '-[0-9a-z]{5}' for pod in object.pods])) + pods_selector = "|".join(pod.name for pod in object.pods) cluster_label = self.get_prometheus_cluster_label() return ( "sum(container_memory_working_set_bytes{" From 85c62d58a61c851045c77946f3fa5f541a12aa51 Mon Sep 17 00:00:00 2001 From: Megrez Lu Date: Mon, 3 Jul 2023 18:37:54 +0800 Subject: [PATCH 08/11] revert table format --- robusta_krr/formatters/table.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/robusta_krr/formatters/table.py b/robusta_krr/formatters/table.py index 561b8d85..846732a4 100644 --- a/robusta_krr/formatters/table.py +++ b/robusta_krr/formatters/table.py @@ -89,7 +89,6 @@ def table(result: Result) -> Table: if cluster_count > 1: table.add_column("Cluster", style="cyan") table.add_column("Namespace", style="cyan") - table.add_column("Group", style="cyan") table.add_column("Name", style="cyan") table.add_column("Pods", style="cyan") table.add_column("Old Pods", style="cyan") @@ -114,7 +113,6 @@ def table(result: Result) -> Table: cells.append(item.object.cluster if full_info_row else "") cells += [ item.object.namespace if full_info_row else "", - item.object.labels['group'] if "group" in item.object.labels else "" if full_info_row else "", item.object.name if full_info_row else "", f"{item.object.current_pods_count}" if full_info_row else "", f"{item.object.deleted_pods_count}" if full_info_row else "", From 325274bb8fa30da7bdeb9b40c8bb4efeb81a3862 Mon Sep 17 00:00:00 2001 From: Megrez Lu Date: Sat, 8 Jul 2023 10:07:34 +0800 Subject: [PATCH 09/11] fix Literal --- robusta_krr/core/models/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/robusta_krr/core/models/config.py b/robusta_krr/core/models/config.py index 829ffeb4..67ddf766 100644 --- a/robusta_krr/core/models/config.py +++ b/robusta_krr/core/models/config.py @@ -16,7 +16,7 @@ class Config(pd.BaseSettings): clusters: Union[list[str], Literal["*"], None] = None kubeconfig: Optional[str] = None namespaces: Union[list[str], Literal["*"]] = pd.Field("*") - discovery_method: Union[Literal["api-server"], Literal["prometheus"]] = pd.Field("api-server") + discovery_method: Literal["api-server", "prometheus"] = pd.Field("api-server") # Value settings cpu_min_value: int = pd.Field(5, ge=0) # in millicores From e4b443b437911c824cbe4c75b259620e746c7432 Mon Sep 17 00:00:00 2001 From: Megrez Lu Date: Thu, 13 Jul 2023 09:27:12 +0800 Subject: [PATCH 10/11] remove coroutine limit --- robusta_krr/core/runner.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py index 98baa33a..26233a9c 100644 --- a/robusta_krr/core/runner.py +++ b/robusta_krr/core/runner.py @@ -22,14 +22,6 @@ from robusta_krr.utils.progress_bar import ProgressBar from robusta_krr.utils.version import get_version -async def gather_with_concurrency(n: int, *coros): - semaphore = asyncio.Semaphore(n) - - async def sem_coro(coro): - async with semaphore: - return await coro - return await asyncio.gather(*(sem_coro(c) for c in coros)) - class Runner(Configurable): EXPECTED_EXCEPTIONS = (KeyboardInterrupt, PrometheusNotFound) @@ -146,8 +138,7 @@ async def _calculate_object_recommendations(self, object: K8sObjectData) -> tupl async def _gather_objects_recommendations( self, objects: list[K8sObjectData] ) -> list[tuple[ResourceAllocations, MetricsData]]: - recommendations: list[tuple[RunResult, MetricsData]] = await gather_with_concurrency( - self.config.max_workers, + recommendations: list[tuple[RunResult, MetricsData]] = await asyncio.gather( *[self._calculate_object_recommendations(object) for object in objects] ) From f4267f5406aeed62ae58e5a04fc2ebc887d181cd Mon Sep 17 00:00:00 2001 From: Megrez Lu Date: Thu, 13 Jul 2023 09:31:07 +0800 Subject: [PATCH 11/11] remove labels --- robusta_krr/core/integrations/metrics.py | 26 ++++-------------------- robusta_krr/core/models/objects.py | 3 +-- 2 files changed, 5 insertions(+), 24 deletions(-) diff --git a/robusta_krr/core/integrations/metrics.py b/robusta_krr/core/integrations/metrics.py index 437a32fc..a56e3583 100644 --- a/robusta_krr/core/integrations/metrics.py +++ b/robusta_krr/core/integrations/metrics.py @@ -75,12 +75,11 @@ async def __parse_allocation(self, namespace: str, pod_selector: str, container_ return ResourceAllocations(requests=requests_values, limits=limits_values) - async def __build_from_owner(self, namespace: str, app_name: str, containers: List[str], pod_names: List[str], labels: Dict[str, str]) -> List[K8sObjectData]: + async def __build_from_owner(self, namespace: str, app_name: str, containers: List[str], pod_names: List[str]) -> List[K8sObjectData]: return [ K8sObjectData( cluster=None, namespace=namespace, - labels=labels, name=app_name, kind="Deployment", container=container_name, @@ -97,7 +96,7 @@ async def _list_containers(self, namespace: str, pod_selector: str) -> List[str] "})") return [container['metric']['container'] for container in containers] - async def _list_containers_in_pods(self, app_name: str, pod_owner_kind: str, namespace: str, owner_name: str, labels: Dict[str, str]) -> list[K8sObjectData]: + async def _list_containers_in_pods(self, app_name: str, pod_owner_kind: str, namespace: str, owner_name: str) -> list[K8sObjectData]: if pod_owner_kind == "ReplicaSet": # owner_name is ReplicaSet names pods = await self.metrics_loader.loader.query("count by (owner_name, replicaset, pod) (kube_pod_owner{" @@ -110,24 +109,9 @@ async def _list_containers_in_pods(self, app_name: str, pod_owner_kind: str, nam # {'metric': {'owner_name': 'wbjs-algorithm-base-565b645489', 'pod': 'wbjs-algorithm-base-565b645489-lj9qg'}, 'value': [1685529217, '1']}] pod_names = [pod['metric']['pod'] for pod in pods] container_names = await self._list_containers(namespace, "|".join(pod_names)) - return await self.__build_from_owner(namespace, app_name, container_names, pod_names, labels) + return await self.__build_from_owner(namespace, app_name, container_names, pod_names) return [] - async def _list_labels(self, owner_kind: str, namespace: str, owner_name: str) -> Dict[str, str]: - if owner_kind == "Deployment": - self.debug(f"{owner_kind} in {namespace}: {owner_name}") - labels_metric = await self.metrics_loader.loader.query("kube_deployment_labels{" - f'namespace="{namespace}", ' - f'deployment="{owner_name}"' - "}") - if len(labels_metric) == 0: - return {} - labels = {} - for key in labels_metric[0]['metric'].keys(): - if key.startswith('label_'): - labels[key[6:]] = labels_metric[0]['metric'][key] - return labels - async def _list_deployments(self) -> list[K8sObjectData]: self.debug(f"Listing deployments in namespace({self.config.namespaces}) from Prometheus({self.config.prometheus_url})") ns = "|".join(self.config.namespaces) @@ -142,9 +126,7 @@ async def _list_deployments(self) -> list[K8sObjectData]: objects = await asyncio.gather( *[ self._list_containers_in_pods(replicas[0]['owner_name'], pod_owner_kind, replicas[0]['namespace'], - "|".join(list(map(lambda metric: metric['replicaset'], replicas))), - await self._list_labels("Deployment", replicas[0]['namespace'], replicas[0]['owner_name']) - ) + "|".join(list(map(lambda metric: metric['replicaset'], replicas)))) for replicas in replicaset_dict.values() ] ) diff --git a/robusta_krr/core/models/objects.py b/robusta_krr/core/models/objects.py index b72ed0cd..90af54a6 100644 --- a/robusta_krr/core/models/objects.py +++ b/robusta_krr/core/models/objects.py @@ -1,7 +1,7 @@ from typing import Optional import pydantic as pd -from typing import Optional, Dict +from typing import Optional from robusta_krr.core.models.allocations import ResourceAllocations @@ -32,7 +32,6 @@ class K8sObjectData(pd.BaseModel): hpa: Optional[HPAData] namespace: str kind: str - labels: Dict[str, str] allocations: ResourceAllocations def __str__(self) -> str: