Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 28 additions & 20 deletions datadog_sync/utils/resources_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def __init__(self, config: Configuration) -> None:
self.sorter: Optional[TopologicalSorter] = None
self.cleanup_sorter: Optional[TopologicalSorter] = None
self.worker: Optional[Workers] = None
self._dependency_graph = Optional[Dict[Tuple[str, str], List[Tuple[str, str]]]]
self._dependency_graph: Optional[Dict[Tuple[str, str], Set[Tuple[str, str]]]] = None

@staticmethod
def _sanitize_reason(err: Exception) -> str:
Expand Down Expand Up @@ -109,7 +109,7 @@ async def reset(self) -> None:

async def apply_resources(self) -> Tuple[int, int]:
# Build dependency graph and missing resources
self._dependency_graph, missing = self.get_dependency_graph()
self._dependency_graph, missing, filtered_count = self.get_dependency_graph()

# Import resources that are missing but needed for resource connections
if self.config.force_missing_dependencies and missing:
Expand Down Expand Up @@ -206,6 +206,7 @@ async def apply_resources(self) -> Tuple[int, int]:
)
else:
await self.worker.schedule_workers(additional_coros=[self.run_sorter()])
self.worker.counter.filtered = filtered_count
self.config.logger.info(f"finished syncing resource items: {self.worker.counter}.")

self.config.state.dump_state()
Expand All @@ -216,14 +217,6 @@ async def _apply_resource_cb(self, q_item: List) -> None:

try:
r_class = self.config.resources[resource_type]

# Filter BEFORE deepcopy to avoid unnecessary memory allocation.
# Safe because filter() only reads the resource dict, never mutates it.
if not r_class.filter(self.config.state.source[resource_type][_id]):
self.worker.counter.increment_filtered()
self._emit(resource_type, _id, "sync", "filtered")
return

resource = deepcopy(self.config.state.source[resource_type][_id])

if not r_class.resource_config.concurrent:
Expand Down Expand Up @@ -283,7 +276,7 @@ async def _apply_resource_cb(self, q_item: List) -> None:
r_class.resource_config.async_lock.release()

async def diffs(self) -> None:
self._dependency_graph, _ = self.get_dependency_graph()
self._dependency_graph, _, _ = self.get_dependency_graph()

# Run pre-apply hooks
resource_types = set(i[0] for i in self._dependency_graph.keys())
Expand Down Expand Up @@ -318,10 +311,6 @@ async def _diffs_worker_cb(self, q_item: List) -> None:
else:
resource = self.config.state.source[resource_type][_id]

if not r_class.filter(resource):
self._emit(resource_type, _id, "sync", "filtered")
return

try:
await r_class._pre_resource_action_hook(_id, resource)
except SkipResource as e:
Expand Down Expand Up @@ -542,22 +531,41 @@ async def run_cleanup_sorter(self):

await asyncio.sleep(0)

def get_dependency_graph(
    self,
) -> Tuple[Dict[Tuple[str, str], Set[Tuple[str, str]]], Set[Tuple[str, str]], int]:
    """Build the dependency graph for all resources.

    Resources rejected by their resource class ``filter()`` are excluded
    up front (one pass here, instead of per-worker later), and a
    "filtered" outcome is emitted for each so downstream consumers still
    see which resources were excluded.

    Returns:
        Tuple of (dependency_graph, missing_resources, filtered_count) where
        dependency_graph maps ``(resource_type, id)`` -> set of
        ``(resource_type, id)`` dependencies, missing_resources is the set of
        referenced-but-absent resources, and filtered_count is the number of
        resources excluded by ``filter()``.
    """
    dependency_graph = {}
    missing_resources = set()
    filtered_out = set()

    for (resource_type, _id), resource in self.config.state.get_all_resources(self.config.resources_arg).items():
        r_class = self.config.resources[resource_type]
        # Exclude filtered resources before building their connections.
        if not r_class.filter(resource):
            filtered_out.add((resource_type, _id))
            continue

        deps, missing = self._resource_connections(resource_type, _id)
        dependency_graph[(resource_type, _id)] = deps
        missing_resources.update(missing)

    # Emit filtered outcomes so --json consumers see which resources were excluded.
    for resource_type, _id in filtered_out:
        self._emit(resource_type, _id, "sync", "filtered")

    # Remove dependency references to filtered-out resources only.
    # Cross-type deps on resource types outside resources_arg must be
    # preserved as phantom nodes — TopologicalSorter yields them first,
    # ensuring dependencies are synced before dependents.
    if filtered_out:
        for key in dependency_graph:
            dependency_graph[key] = dependency_graph[key] - filtered_out

    return dependency_graph, missing_resources, len(filtered_out)

def _resource_connections(self, resource_type: str, _id: str) -> Tuple[Set[Tuple[str, str]], Set[Tuple[str, str]]]:
"""Returns the failed connections and missing resources for a given resource.
Expand Down
2 changes: 1 addition & 1 deletion datadog_sync/utils/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def __str__(self):
)

def reset_counter(self) -> None:
    """Reset every outcome counter — successes, failures, skipped, and
    filtered — back to zero for a fresh run."""
    self.successes = self.failure = self.skipped = self.filtered = 0

def increment_success(self) -> None:
self.successes += 1
Expand Down
Loading