Skip to content

refactor: decompose source-file monoliths into focused submodules#191

Open
unclesp1d3r wants to merge 9 commits into
mainfrom
refactor/decompose-monolith-modules
Open

refactor: decompose source-file monoliths into focused submodules#191
unclesp1d3r wants to merge 9 commits into
mainfrom
refactor/decompose-monolith-modules

Conversation

@unclesp1d3r

Copy link
Copy Markdown
Member

Summary

Decomposes the largest source-file monoliths into focused submodule directories, addressing the file-size finding from an architecture review (AGENTS.md rule 09: keep files under 500–600 lines). Pure mechanical module extraction — zero logic, signature, or public-API changes. Every split preserves the exact public API via pub use re-exports, so all external use ... paths resolve unchanged.

This is intentionally separate from #190 (the T1 integrity-hashing PR): file decomposition is unrelated to that feature and would have ballooned an otherwise-focused, green PR. Branched from main.

What was split (9 files → submodule directories)

Crate File (orig lines)
daemoneye-eventbus rpc.rs (2789) rpc/{messages,providers,client,service,tests,mod}
daemoneye-eventbus process_manager.rs (1703) process_manager/{types,lifecycle,termination,control,mod}
collector-core event_bus.rs (2630) event_bus/{types,local_bus,mod}
collector-core trigger.rs (2793) trigger/{types,queue,sql_evaluator,manager,mod}
collector-core rpc_services.rs (883) rpc_services/{config,providers,manager,mod}
daemoneye-agent broker_manager.rs (2194) broker_manager/{health,state,lifecycle,rpc,state_machine,tests,mod}
procmond rpc_service.rs (2515) rpc_service/{error,config,stats,tests,mod}
procmond registration.rs (2058) registration/{state,error,heartbeat,config,tests,mod}
procmond event_bus_connector.rs (2751) event_bus_connector/{error,signal,buffered_event,tests,mod}

Methodology & scope boundary

Following "split by responsibility, not by line count": each file's separable concerns (error enums, config types, data types, provider trait impls) were extracted into small focused files. Two categories were deliberately left whole rather than artificially sharded:

  • Single-type impls (e.g. event_bus/local_bus.rs, trigger/manager.rs, each */mod.rs holding one type's large impl) — sharding one impl across files creates arbitrary boundaries and re-export indirection without improving cohesion.
  • Inline test modules (tests.rs) — splitting them by category is a judgment call, not mechanical extraction, and they often access private members of their sibling.

As a result the raw ">600 lines" file count does not drop dramatically (a few single-impl/test files remain large), but production-concern navigability is substantially improved — the small, distinct pieces are now their own files instead of buried in a 2000–2800 line monolith. Further reducing the single-impl and test-module sizes is a separate, judgment-driven follow-up, not mechanical movement.

Test plan

  • cargo build --workspace
  • cargo clippy --workspace --all-targets -- -D warnings — zero warnings
  • cargo test --workspace — full suite passing; identical test counts before/after every split (no test deleted, weakened, or duplicated)
  • Public API preserved — verified by the cross-crate integration tests and external consumers (main.rs, collector.rs) compiling unchanged
  • Private items kept private via pub(super) (matching original module-private semantics; avoids the workspace redundant_pub_crate lint)

AI usage disclosure

Per AI_POLICY.md: performed with Claude Code (Claude Opus 4.8 (1M Context)). The per-crate extractions were executed by parallel worktree-isolated Rust sub-agents, each constrained to pure mechanical movement and required to self-verify (build + test + clippy -D warnings + fmt) before committing. All splits were re-verified together at the workspace level. No behavior changes; human maintainer review required before merge.

rpc.rs was 2789 lines — ~4.5x the project's 500-600 line target (AGENTS.md
rule 09). It contained four distinct responsibilities with clean seams, now
extracted into a module directory:

- rpc/messages.rs  (687) — request/response & data types + their builders
- rpc/providers.rs (413) — Health/Config/Registration provider traits + defaults
- rpc/client.rs    (231) — CollectorRpcClient
- rpc/service.rs  (1336) — CollectorRpcService (one cohesive impl, left intact)
- rpc/tests.rs     (118) — the inline unit tests, verbatim
- rpc/mod.rs        (59) — module doc + re-exports preserving the public API

Pure mechanical extraction: zero logic/signature/API changes. Private items
kept private via pub(super) (matching the original module-private semantics)
so nothing leaks into the public surface. Verified: build, 88 lib tests, 10
doctests, and clippy -D warnings all green.

Also fixes a pre-existing broken doctest in lib.rs (missing include_control
field on EventSubscription) surfaced while verifying.

Signed-off-by: UncleSp1d3r <unclesp1d3r@evilbitlabs.io>
Pure mechanical module extraction with no logic or public-API changes:
- rpc_service/error.rs: RpcServiceError + RpcServiceResult
- rpc_service/config.rs: RpcServiceConfig + Default + timeout const
- rpc_service/stats.rs: RpcServiceStats
- rpc_service/tests.rs: full unit test module
- rpc_service/mod.rs: RpcServiceHandler struct + impl, re-exports preserving API

All 51 rpc_service unit tests pass; build/clippy/fmt clean.

Signed-off-by: UncleSp1d3r <unclesp1d3r@evilbitlabs.io>
Pure mechanical module extraction with no logic or public-API changes:
- registration/state.rs: RegistrationState enum + Display
- registration/error.rs: RegistrationError + RegistrationResult
- registration/heartbeat.rs: ConnectionStatus, HeartbeatMetrics, HeartbeatMessage
- registration/config.rs: RegistrationConfig + Default
- registration/tests.rs: full unit test module
- registration/mod.rs: RegistrationManager struct + impl, consts, re-exports

All 52 registration unit tests pass; build/clippy/fmt clean.

Signed-off-by: UncleSp1d3r <unclesp1d3r@evilbitlabs.io>
…ctor/ submodules

Pure mechanical module extraction with no logic or public-API changes:
- event_bus_connector/error.rs: EventBusConnectorError + EventBusConnectorResult
- event_bus_connector/signal.rs: BackpressureSignal + ProcessEventType (methods pub(super))
- event_bus_connector/buffered_event.rs: BufferedEvent struct + impl (pub(super))
- event_bus_connector/tests.rs: full unit test module
- event_bus_connector/mod.rs: EventBusConnector struct + impl, consts, re-exports

All 58 event_bus_connector unit tests pass; build/clippy/fmt clean.

Signed-off-by: UncleSp1d3r <unclesp1d3r@evilbitlabs.io>
Signed-off-by: UncleSp1d3r <unclesp1d3r@evilbitlabs.io>
Pure mechanical module extraction with no logic, behavior, or public-API
changes. The monolithic event_bus.rs (2630 lines) is split into:

- event_bus/mod.rs (65 lines): module doc, EventBus trait, glob re-exports
  preserving the full public API surface
- event_bus/types.rs (462 lines): config, subscription, filter, correlation,
  bus event, and statistics types plus CorrelationMetadata impl
- event_bus/local_bus.rs (2137 lines): LocalEventBus (single cohesive impl
  of one type) plus co-located tests that exercise its private associated
  functions (topic_matches_pattern, validate_single_pattern)

local_bus.rs remains large because it is one type's single impl (~975 lines
non-test) plus tests that must stay co-located to reach private items via
super::*; sharding one impl or moving the tests would change visibility.

Test count unchanged (147 lib tests, 12 in event_bus).

Signed-off-by: UncleSp1d3r <unclesp1d3r@evilbitlabs.io>
Pure mechanical module extraction with no logic, behavior, or public-API
changes. The monolithic trigger.rs (2793 lines) is split into:

- trigger/mod.rs (22 lines): module doc + re-exports preserving the full
  public API surface at crate::trigger::*
- trigger/types.rs (331 lines): config, condition, deduplication-key,
  metadata, process-data, capabilities, aggregate-statistics, and the
  TriggerError enum
- trigger/queue.rs (161 lines): PriorityTriggerQueue + QueueStatistics
- trigger/sql_evaluator.rs (395 lines): SqlTriggerEvaluator and its
  compiled-condition / evaluation-stats types
- trigger/manager.rs (1966 lines): the lock_or_err! macro, RateLimitState,
  the single TriggerManager impl, TriggerTimeout, plus co-located tests

manager.rs remains large because it is one type's single impl (~1095 lines
non-test) plus tests that access TriggerManager's private timeout_tracker
field and the private TriggerTimeout struct; those tests must stay
co-located, and sharding one impl would create arbitrary boundaries.

Test count unchanged (147 lib tests, 23 in trigger; doctests pass).

Signed-off-by: UncleSp1d3r <unclesp1d3r@evilbitlabs.io>
…bmodules

Signed-off-by: UncleSp1d3r <unclesp1d3r@evilbitlabs.io>
…bmodules

rpc_services.rs (883 lines) split into a module directory:
- rpc_services/config.rs    (24)  — RpcServiceConfig + Default
- rpc_services/providers.rs (277) — the three Collector*Provider impls
- rpc_services/manager.rs   (590) — CollectorRpcServiceManager + impl + test
- rpc_services/mod.rs        (24) — module doc + re-exports (public API preserved)

Pure mechanical extraction; zero logic/API changes. 146 lib tests + 21 doctests
pass, clippy -D warnings clean.

Signed-off-by: UncleSp1d3r <unclesp1d3r@evilbitlabs.io>
Copilot AI review requested due to automatic review settings June 11, 2026 02:46
@dosubot dosubot Bot added size:XXL This PR changes 1000+ lines, ignoring generated files. architecture System architecture and design decisions labels Jun 11, 2026

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot wasn't able to review this pull request because it exceeds the maximum number of lines (20,000). Try reducing the number of changed lines and requesting a review from Copilot again.

@coderabbitai

coderabbitai Bot commented Jun 11, 2026

Copy link
Copy Markdown

Review Change Stack

Summary by CodeRabbit

  • Refactor
    • Reorganized event bus module structure, separating type definitions and implementations into dedicated modules for improved maintainability.
    • Restructured RPC service configuration and provider implementations into separate modules.
    • Modularized trigger system components for cleaner architecture.
    • Refactored broker manager lifecycle and state machine handling into dedicated modules.
    • Reorganized event bus connector into separate modules for better code separation.

Walkthrough

This PR refactors collector-core and daemoneye-agent from monolithic modules into modular subsystems: centralizing event-bus traits and types in dedicated modules, extracting RPC service providers, implementing a priority-queue trigger system with SQL evaluation, and decomposing the broker manager into lifecycle, health, state-machine, and RPC layers with comprehensive test coverage.

Changes

Core Modularization and Type Consolidation

Layer / File(s) Summary
Event Bus trait and type model
collector-core/src/event_bus/mod.rs, collector-core/src/event_bus/types.rs
EventBus trait defines async lifecycle (publish, subscribe, unsubscribe, shutdown) and statistics; EventBusConfig, EventSubscription, EventFilter, CorrelationMetadata with builder methods, and EventBusStatistics centralize the shared type contract, supporting distributed tracing and per-subscriber delivery tracking.
Local event bus implementation refactor
collector-core/src/event_bus/local_bus.rs
LocalEventBus and supporting structs now import EventBus trait and all type definitions from parent module; imports reorganized to reference crate::event_bus::types.
RPC service configuration and providers
collector-core/src/rpc_services/config.rs, collector-core/src/rpc_services/mod.rs, collector-core/src/rpc_services/providers.rs, collector-core/src/rpc_services/manager.rs
RpcServiceConfig extracted to dedicated module; CollectorHealthProvider, CollectorConfigProvider, and CollectorRegistrationProvider implement RPC provider traits for health checks, config management (with optional hot-reload or restart), and collector registration/heartbeat.

Trigger System with Priority Queueing and SQL Evaluation

Layer / File(s) Summary
Trigger types, configuration, and queue
collector-core/src/trigger/types.rs, collector-core/src/trigger/mod.rs, collector-core/src/trigger/queue.rs, collector-core/src/trigger/manager.rs
TriggerConfig, TriggerCondition, ProcessTriggerData, TriggerCapabilities, and TriggerError define trigger system contract; PriorityTriggerQueue enforces backpressure (rejects low-priority triggers when active) and dual-lane capacity limits (high vs. normal priority), incrementing drop counters on rejection.
SQL trigger evaluation
collector-core/src/trigger/sql_evaluator.rs
SqlTriggerEvaluator registers and evaluates trigger conditions per collector; compiles custom SQL predicates into AST (currently disabled in evaluation); generates TriggerRequest objects with condition_id, source_pid, and evaluation metadata for matching conditions; tracks per-condition and global evaluation statistics.

Broker Manager Refactoring into Modular Layers

Layer / File(s) Summary
Broker health, lifecycle, and agent state
daemoneye-agent/src/broker_manager/mod.rs, daemoneye-agent/src/broker_manager/health.rs, daemoneye-agent/src/broker_manager/lifecycle.rs, daemoneye-agent/src/broker_manager/state.rs
Original monolithic broker_manager.rs (2194 lines) split into focused modules: BrokerHealth enum and HealthState trait implementation; BrokerManager::start() and shutdown() orchestrate broker startup/graceful shutdown with ordered collector shutdown and RPC draining; AgentState enum models agent progression (Loading → Ready → SteadyState) with startup-failure and shutdown states; CollectorReadinessTracker tracks collector registration readiness via HashSet.
Broker manager RPC and provider implementations
daemoneye-agent/src/broker_manager/rpc.rs
BrokerManager implements HealthProvider, ConfigProvider, and RegistrationProvider traits; manages per-collector CollectorRpcClient caching; orchestrates collector start/stop/restart/health-check RPCs with timeout/status validation; config updates either restart collectors or hot-reload via broker event-bus publish (with rollback on failure).
Broker manager state-machine transitions
daemoneye-agent/src/broker_manager/state_machine.rs
State-machine methods drive agent transitions (Loading → Ready when all collectors ready; Ready → SteadyState when broadcast succeeds); wait_for_collectors_ready() polls with configurable timeout, recording StartupFailed state if timeout; broadcasts "BeginMonitoring" control message via event bus (graceful fallback if broker absent); stub drop_privileges() and get_startup_timeout() support lifecycle setup.
Broker manager tests
daemoneye-agent/src/broker_manager/tests/lifecycle.rs, daemoneye-agent/src/broker_manager/tests/state_machine.rs, daemoneye-agent/src/broker_manager/tests/mod.rs
Comprehensive test coverage: lifecycle creation/disabled mode/socket path/statistics; health checking and timeout; registration delegation and RPC client lifecycle; state-machine transitions (Loading → Ready → SteadyState, startup failure, shutdown) with collector readiness tracking; wait_for_collectors_ready() success/timeout paths; startup timeout calculation.

Event Bus Connector Refactoring

Layer / File(s) Summary
Event bus connector error and buffered event
procmond/src/event_bus_connector/error.rs, procmond/src/event_bus_connector/buffered_event.rs
Original monolithic event_bus_connector.rs (2751 lines) removed; EventBusConnectorError enum covers WAL, serialization, buffer overflow, and connection failures; BufferedEvent wraps ProcessEvent with sequence/topic/size_bytes accounting for in-memory buffering during broker disconnects; estimate_size() computes rough serialized-size via fixed overhead plus field/argument/topic lengths using saturating arithmetic.

Estimated Code Review Effort

🎯 4 (Complex) | ⏱️ ~60 minutes


Possibly Related PRs

  • EvilBit-Labs/DaemonEye#168: Aligned event-bus type changes; both PRs refactor EventBus::subscribe payload and BusEvent/Arc<BusEvent> typing.

Suggested Labels

rust, core-feature, process-monitoring, data-models, ipc, async, testing, daemoneye-agent, type:feature


Poem

🚀 From monoliths to modules bright,
Event buses now typed tight.
Triggers queue with priority's grace,
Brokers state-machine find their place.
Type safety wins, ten thousand streams,
RPC providers live the dreams! ⚡

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed Title follows Conventional Commits format with 'refactor' type and accurate scope 'decompose source-file monoliths into focused submodules' describing the mechanical module extraction performed across the codebase.
Description check ✅ Passed Description is comprehensive and directly related to the changeset: explains the mechanical decomposition of nine monolithic files into submodule directories, methodology, test verification, and AI usage disclosure.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
✨ Simplify code
  • Create PR with simplified code
  • Commit simplified code in branch refactor/decompose-monolith-modules

Warning

Review ran into problems

🔥 Problems

These MCP integrations need to be re-authenticated in the Integrations settings: Linear, Notion


Comment @coderabbitai help to get the list of available commands and usage tips.

@mergify

mergify Bot commented Jun 11, 2026

Copy link
Copy Markdown
Contributor

Merge Protections

Your pull request matches the following merge protections and will not be merged until they are valid.

🔴 Full CI must pass

Waiting for

  • check-success = coverage
  • check-success = test-cross-platform (macos-15, macOS)
  • check-success = test-cross-platform (ubuntu-22.04, Linux)
  • check-success = test-cross-platform (windows-2022, Windows)
This rule is failing.

All CI checks must pass. Activates for non-bot authors, or dependabot when files exist outside .github/workflows/.

  • check-success = coverage
  • check-success = test-cross-platform (macos-15, macOS)
  • check-success = test-cross-platform (ubuntu-22.04, Linux)
  • check-success = test-cross-platform (windows-2022, Windows)
  • check-success = DCO
  • check-success = quality
  • check-success = test

🟢 Enforce conventional commit

Wonderful, this rule succeeded.

Require conventional commit format per https://www.conventionalcommits.org/en/v1.0.0/. Skipped for dependabot and dosubot.

  • title ~= ^(fix|feat|docs|style|refactor|perf|test|build|ci|chore|revert)(?:\(.+\))?!?:

🟢 Do not merge outdated PRs

Wonderful, this rule succeeded.

Make sure PRs are within 3 commits of the base branch before merging

  • #commits-behind <= 3

@coderabbitai coderabbitai Bot added rust Pull requests that update rust code data-models Data structure and model related ipc Inter-Process Communication process-monitoring Process monitoring and enumeration features core-feature Core system functionality async Related to asynchronous programming and async/await patterns testing Related to test development and test infrastructure type:feature daemoneye-agent labels Jun 11, 2026

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 19

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
collector-core/src/rpc_services/manager.rs (2)

537-537: 🧹 Nitpick | 🔵 Trivial | 💤 Low value

Simplify drop(ready_rx.await) to ready_rx.await.ok().

The explicit drop() is unnecessary since the result is unused and would be dropped implicitly.

-        drop(ready_rx.await);
+        ready_rx.await.ok();
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@collector-core/src/rpc_services/manager.rs` at line 537, Replace the explicit
drop call by invoking ok() on the awaited receiver result: change the statement
using ready_rx.await (currently wrapped in drop(...)) to simply call
ready_rx.await.ok() so the result is consumed and dropped implicitly; locate the
occurrence of drop(ready_rx.await) in manager.rs and make this substitution.

214-224: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Remove debug eprintln!() statements or convert to tracing macros.

The message processing loop contains numerous eprintln!() calls (lines 214-216, 220-224, 233-234, 238-249, 434-442, 448-449, 508-520, 523-524) that bypass structured logging. Your coding guidelines require "Use structured logging with tracing ecosystem," but eprintln!() writes unstructured text to stderr, invisible to your observability stack.

These appear to be debug traces left in production code. For a 10k+ process monitoring system, unstructured stderr output is not actionable and pollutes logs.

♻️ Convert to structured tracing
-            eprintln!(
-                "RPC_SERVICE_LOOP: Received message on topic: {} (payload size: {} bytes)",
-                message.topic,
-                message.payload.len()
-            );
+            tracing::debug!(
+                topic = %message.topic,
+                payload_size = message.payload.len(),
+                "RPC service received message"
+            );

Repeat for all eprintln!() calls, or remove entirely if they're only needed for local development.

Also applies to: 233-234, 238-249, 434-442, 448-449, 508-520, 523-524

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@collector-core/src/rpc_services/manager.rs` around lines 214 - 224, Replace
all debug eprintln!() calls in the RPC service message loop and related code
with structured tracing macros (e.g., tracing::debug!, tracing::info!,
tracing::warn!, or tracing::error!) so logs are emitted into the tracing
observability stack; specifically update the ready_tx send/log block and every
eprintln! surrounding the async message_receiver.recv() loop and handlers
(references: ready_tx, message_receiver, RPC_SERVICE_LOOP) to use context-rich
tracing calls that include the same variables (config_collector_id,
message.topic, message.payload.len(), etc.), or remove them if they were only
temporary debug prints. Ensure the tracing level chosen reflects the message
importance (use trace/debug for verbose debugging) and that the crate has
tracing imported and initialized where this module is used.

Source: Coding guidelines

collector-core/src/trigger/manager.rs (2)

78-79: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

pending_triggers is permanently stale.

Lines 511-513 say the queue is now the source of truth, but get_statistics() still reports pending_count, and that field is never mutated after new(). Once anything is enqueued, TriggerStatistics.pending_triggers stays at 0 while queue_stats shows pending work.

Suggested fix
-        let pending_count = self.pending_count.lock().map_or(0, |count| *count);
-
         // Batch lock acquisitions to minimize lock contention
         let (
             dedup_cache_size,
             rate_limit_states,
             registered_capabilities,
@@
         Ok(TriggerStatistics {
             registered_conditions: conditions_len,
-            pending_triggers: pending_count,
+            pending_triggers: queue_stats.high_priority_depth + queue_stats.normal_priority_depth,
             deduplication_cache_size: dedup_cache_size,
             rate_limit_states,
             registered_capabilities,
             queue_stats,
             sql_evaluation_stats,

Also applies to: 511-513, 1027-1086

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@collector-core/src/trigger/manager.rs` around lines 78 - 79, The
pending_count field is never updated and so TriggerStatistics.pending_triggers
is stale; update get_statistics() (and any other places that reference
pending_count such as where queue_stats is mentioned) to derive pending_triggers
from the queue's actual state (e.g., use queue_stats.pending or call the
queue.len()/queue.size() accessor) instead of reading pending_count, or remove
pending_count entirely and replace its usages with the queue-derived value;
update Manager::new(), get_statistics(), and any enqueue/dequeue logic (symbols:
pending_count, get_statistics(), TriggerStatistics.pending_triggers,
queue_stats) accordingly so reported pending_triggers matches the queue.

751-809: ⚠️ Potential issue | 🟠 Major

Fix lock-held-while-await in emit_trigger_request_internal (collector-core/src/trigger/manager.rs)

emit_trigger_request_internal() acquires event_bus_arc.write().await and then awaits event_bus.publish(...).await while that write guard is still held, unnecessarily serializing emissions and violating “Never await while holding locks.”

Refactor sketch
-    event_bus: Arc<RwLock<Option<Box<dyn EventBus + Send + Sync>>>>>,
+    event_bus: Arc<RwLock<Option<Arc<dyn EventBus + Send + Sync>>>>>,
 
-    pub async fn set_event_bus(&self, event_bus: Box<dyn EventBus + Send + Sync>) {
+    pub async fn set_event_bus(&self, event_bus: Arc<dyn EventBus + Send + Sync>) {
         let mut bus_guard = self.event_bus.write().await;
         *bus_guard = Some(event_bus);
     }
 
-        let publish_result = {
-            let mut bus_guard = event_bus_arc.write().await;
-            let event_bus = bus_guard.as_mut().ok_or_else(|| {
-                TriggerError::EventBusError("Event bus not configured".to_owned())
-            })?;
-            event_bus.publish(collection_event, correlation).await
-        };
+        let event_bus = {
+            let bus_guard = event_bus_arc.read().await;
+            bus_guard.as_ref().cloned().ok_or_else(|| {
+                TriggerError::EventBusError("Event bus not configured".to_owned())
+            })?
+        };
+        let publish_result = event_bus.publish(collection_event, correlation).await;
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@collector-core/src/trigger/manager.rs` around lines 751 - 809, The code
currently awaits event_bus.publish(...) while holding the write lock on
event_bus_arc in emit_trigger_request_internal, so change the logic to move the
boxed EventBus out of the RwLock before awaiting: inside the write lock
(event_bus_arc.write().await) call Option::take() on the inner Option<Box<dyn
EventBus + Send + Sync>> to obtain the Box, drop the lock, call publish(...) on
the taken Box (handling correlation metadata as before), then re-acquire the
write lock and put the Box back with Some(boxed). Ensure you handle the None
case and reinsert the Box even on publish errors so the event bus state is
preserved; reference emit_trigger_request_internal, event_bus_arc, publish, and
Option::take() to locate the changes.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@collector-core/src/event_bus/local_bus.rs`:
- Around line 1016-1019: The tests import types directly from the private module
(crate::event_bus::types) instead of the public re-export surface; update the
import statement in local_bus.rs to pull CollectorCoordination,
CrossCollectorCorrelation, EventBusCorrelation, EventBusFilters,
EventBusMetadata, SequenceCorrelation, TemporalCorrelation, TopicChain, and
TopicCorrelation from the public API (crate::event_bus::{...}) so the tests
exercise the pub use re-exports (replace the use crate::event_bus::types::{...}
with use crate::event_bus::{CollectorCoordination, CrossCollectorCorrelation,
EventBusCorrelation, EventBusFilters, EventBusMetadata, SequenceCorrelation,
TemporalCorrelation, TopicChain, TopicCorrelation};).

In `@collector-core/src/event_bus/mod.rs`:
- Around line 40-44: Change the public API and implementations to use bounded
channels sized by EventBusConfig.buffer_size instead of exposing
UnboundedReceiver: update the trait method signature subscribe in EventBus
(collector-core/src/event_bus/mod.rs) to return a
tokio::sync::mpsc::Receiver<Arc<BusEvent>> (or an abstraction that hides channel
type) and update LocalEventBus and DaemoneyeEventBus to create bounded channels
using EventBusConfig.buffer_size (or move capacity into EventSubscription)
instead of unbounded_channel/UnboundedSender; additionally implement explicit
backpressure/drop behavior when sending (e.g., use try_send and drop
oldest/newest or await send with timeout and return/send a backpressure error)
so slow subscribers cannot grow unbounded queues.

In `@collector-core/src/event_bus/types.rs`:
- Around line 410-448: The track_delivery method currently updates
delivery_tracking but only sets EventBusMetadata.delivery_timestamp when
creating new metadata; update track_delivery (function track_delivery) so that
whenever you insert/update delivery_tracking on self.eventbus_metadata you also
refresh eventbus_metadata.delivery_timestamp to the current unix seconds
timestamp; ensure both the Some(eventbus_metadata) branch and the else branch
set delivery_timestamp (EventBusMetadata.delivery_timestamp) and preserve other
fields (delivery_tracking, topic_chains, etc.) so the temporal-correlation gate
sees the latest delivery time.

In `@collector-core/src/rpc_services/config.rs`:
- Around line 7-14: Make RpcServiceConfig enforce consistency between
collector_id and rpc_topic: either make the struct fields private and add a
constructor/builder (e.g., RpcServiceConfig::new or RpcServiceConfigBuilder)
that takes collector_id and derives rpc_topic =
format!("control.collector.{collector_id}") (while preserving default_timeout),
or add a public validate method (e.g., RpcServiceConfig::validate) that checks
rpc_topic == format!("control.collector.{collector_id}") and returns a
Result/Err or triggers a debug_assert; update call sites to construct via the
new API or invoke validate() at startup. Ensure references to RpcServiceConfig,
collector_id, rpc_topic, and default_timeout are updated accordingly.

In `@collector-core/src/rpc_services/mod.rs`:
- Around line 7-14: The module-wide clippy allows (especially
#[allow(clippy::as_conversions)] and #[allow(clippy::expect_used)]) are too
broad; narrow them and document safety: move #[allow(clippy::as_conversions)]
from mod.rs into providers.rs at each conversion site (or at top of providers.rs
with a short safety comment) and add a one-line rationale that u64/usize -> f64
casts (e.g. telemetry_metrics.operation_count and other metric counters) are
telemetry counters guaranteed by design to stay below f64's 2^53 safe integer
threshold; remove the blanket #[allow(clippy::expect_used)] from mod.rs and
either replace .expect() with safe alternatives (e.g. .unwrap_or_default()) in
manager.rs/providers.rs or keep #[allow(clippy::expect_used)] only on the
specific lines that truly require it with a safety comment explaining why
failure is fatal and cannot be recovered.

In `@collector-core/src/rpc_services/providers.rs`:
- Line 33: Replace the eprintln! debug statements in the health provider with
structured tracing macros (or remove them if unnecessary); specifically update
the eprintln! in get_collector_health and the other eprintln! occurrences in
providers.rs to use tracing::debug! or tracing::info! and include structured
fields (e.g., collector_id = %collector_id, reason = %... ) instead of
interpolated text so logs integrate with the observability stack; search for the
eprintln! calls in providers.rs (including inside get_collector_health and
related health functions) and convert each to an appropriate tracing macro with
clear contextual fields.
- Line 77: Several u64→f64 casts (e.g., telemetry_metrics.operation_count and
other metric counters) perform unchecked as f64 conversions and lack the
required safety comment or a checked/saturating conversion; update each cast
site to either (1) add #[allow(clippy::as_conversions)] immediately above the
expression and a one-line safety comment stating that values above 2^53 will be
rounded and why that's acceptable, or (2) replace the cast with a saturating
conversion like let safe = (value.min((1u64<<53) - 1)) as f64 to clamp to f64’s
exact integer range; apply this change to telemetry_metrics.operation_count and
the other metric fields mentioned so each conversion has an explicit safety
comment or uses the saturating clamp.
- Line 140: The hardcoded uptime_seconds: 0 is a stub; update CollectorRuntime
to record a start_time (e.g., a SystemTime or Instant field set when
CollectorRuntime is created) and replace uptime_seconds: 0 with a computed value
by taking now.duration_since(start_time).as_secs() (or .as_secs_f64() if
fractional seconds preferred), handling any potential duration errors by
clamping to 0; if you intentionally want this as a known limitation instead, add
a comment and open/link an issue instead of leaving 0. Ensure references:
CollectorRuntime (add start_time), code that builds the health response (where
uptime_seconds: 0 is set) use the computed value.

In `@collector-core/src/trigger/queue.rs`:
- Around line 67-73: The constructor currently pre-splits capacity
(high_priority and normal_priority with max_queue_size/2) which drops urgent
work; change the initializer in Queue::new so lanes are not hard-reserved (e.g.,
use VecDeque::with_capacity(max_queue_size) or no per-lane halving) and move
enforcement of the total limit into the enqueue logic: in the enqueue method(s)
check combined_len = high_priority.len() + normal_priority.len() and return
QueueFull only when combined_len >= max_queue_size so one lane can borrow unused
slots; update any checks in methods referenced around lines 97-108 and 138-141
to use the combined_len check rather than per-lane capacity.

In `@collector-core/src/trigger/sql_evaluator.rs`:
- Around line 103-118: register_collector_conditions currently accepts
conditions whose custom SQL parse failed because compile_condition downgrades
parse errors to a warning and returns Ok with compiled_predicate: None; instead,
make compile_condition return Err(TriggerError::SqlParsingError) on parse
failure (include sql_parsing_errors), and update register_collector_conditions
to propagate that error so malformed Custom(...) predicates are rejected rather
than stored with compiled_predicate: None; reference functions compile_condition
and register_collector_conditions and the TriggerError::SqlParsingError variant
when making the change.

In `@daemoneye-agent/src/broker_manager/lifecycle.rs`:
- Around line 244-305: The health_check() match currently groups
BrokerHealth::Unhealthy(_) with passthrough states so once you write an
Unhealthy it is returned forever; change the match so Unhealthy is not lumped
into the early-return arm — treat Unhealthy similarly to BrokerHealth::Healthy
(i.e., allow re-running the statistics() and collector health checks) by
removing BrokerHealth::Unhealthy(_) from the grouped arm and ensuring the logic
that sets self.health_status.write().await still updates the cached state when
probes pass or fail; refer to the health_check() function, self.health_status,
self.statistics(), and self.process_manager.check_collector_health() when making
this change.
- Around line 119-130: You are holding async lock guards across .await calls;
instead, clone/take the underlying values while holding the guard, drop the
guard, then await. Concretely: for rpc_clients in lifecycle.rs, inside the block
around self.rpc_clients.write().await, extract the entries (e.g., by draining
into a Vec or swapping with an empty map) and drop the write guard before
calling client.shutdown().await on each client; for event_bus use the mutex
guard from self.event_bus.lock().await to take or clone the inner Arc/Option and
drop the guard before calling event_bus.shutdown().await; for
broker.read().await calls (broker.shutdown(), broker.statistics(), and in rpc.rs
broker.publish(...).await) clone or take the Arc/owned broker reference from the
read guard, drop the read guard, then call the async method
(broker.shutdown().await, broker.statistics().await, broker.publish(...).await)
on the cloned/taken value. Ensure all occurrences reference the symbols
rpc_clients, event_bus, broker and the async methods client.shutdown,
event_bus.shutdown, broker.shutdown, broker.statistics, and broker.publish.

In `@daemoneye-agent/src/broker_manager/rpc.rs`:
- Around line 35-57: The create_rpc_client race can produce duplicate
CollectorRpcClient instances; after constructing the client but before inserting
into self.rpc_clients, acquire the write lock and re-check the map for
collector_id (same fix for get_rpc_client region around lines 217-225) — if an
entry now exists, return/clone that existing Arc and drop the freshly created
client, otherwise insert the new Arc into rpc_clients; refer to
create_rpc_client, get_rpc_client, self.rpc_clients, and CollectorRpcClient::new
to locate the code to modify.

In `@daemoneye-agent/src/broker_manager/state_machine.rs`:
- Around line 360-365: broadcast_begin_monitoring currently logs a warning and
returns Ok(()) when the broker is None, which prevents
transition_to_steady_state from detecting the failure and rolling back; change
the behavior so that when broker_guard.as_ref() is None (the Some(broker)
pattern fails) you still log the warning and then return an Err with a
descriptive error (e.g., using anyhow::anyhow or the crate's common error
constructor) so callers like transition_to_steady_state receive a failure and
can trigger rollback; keep the existing broker guard and message but replace the
Ok(()) return with an Err that includes context about "broker not available" and
the broadcast failing.
- Around line 154-162: The rollback currently unconditionally sets
*self.agent_state.write().await = AgentState::Ready after
broadcast_begin_monitoring() fails, which can race with other tasks; fix by
re-checking the current state under the write lock and only set to
AgentState::Ready if it is still AgentState::SteadyState (or whatever concrete
SteadyState variant) — acquire the write lock (self.agent_state.write().await),
match on the inner AgentState, and perform the rollback assignment only when it
equals AgentState::SteadyState; leave the state unchanged for other variants
like AgentState::ShuttingDown or AgentState::StartupFailed and still return the
original error from broadcast_begin_monitoring().
- Around line 372-375: Replace the blind `.unwrap_or_default()` on
`SystemTime::now().duration_since(UNIX_EPOCH)` so that if it returns an `Err`
you log a warning before falling back to `0`; specifically, call
`SystemTime::now()` and `duration_since(UNIX_EPOCH)` and handle the `Result`
(from `duration_since`) with a match or `map_or_else` that logs a warning
(including the error) when the system time is before `UNIX_EPOCH`, then use
`.as_millis()` on the valid duration or `0` as the fallback; look for the
`timestamp` construction using `SystemTime`, `UNIX_EPOCH`, `duration_since`, and
`as_millis` in `state_machine.rs` to modify.
- Around line 300-332: BrokerManager::drop_privileges is currently a no-op stub;
replace it with real platform-specific privilege-dropping and verification (and
make failures non-optional). Implement Unix behavior in
BrokerManager::drop_privileges using setgroups/setgid/setuid (e.g., via the nix
crate) to switch to an unprivileged user, verify with geteuid/getegid that
privileges were lowered, and return an Err if any step fails; implement Windows
equivalent (AdjustTokenPrivileges/SetTokenInformation) behind cfg(windows) and
also verify the effective token privileges changed; remove the test-locking stub
test test_drop_privileges_stub and update startup code that calls
drop_privileges (caller in main.rs) to propagate and hard-fail on Err so startup
cannot continue with elevated privileges until drop succeeds. Ensure the
function name BrokerManager::drop_privileges and the test
test_drop_privileges_stub are the points of change and add a post-drop
verification step that returns Err on mismatch.

In `@daemoneye-agent/src/broker_manager/state.rs`:
- Around line 97-99: mark_ready currently inserts any collector_id into
ready_collectors, which lets unexpected IDs pollute ready_count; change
mark_ready to only insert when the id exists in expected_collectors (i.e., check
self.expected_collectors.contains(collector_id) before inserting). Update the
method mark_ready in state.rs to guard readiness tracking accordingly; note that
full protection against impersonation must be handled in
CollectorRegistry::register (validate identity/ownership) rather than here.

In `@daemoneye-agent/src/broker_manager/tests/mod.rs`:
- Around line 3-13: Remove the module-wide #![allow(...)] in tests/mod.rs that
suppresses clippy::unwrap_used and clippy::panic; instead audit the specific
call sites in lifecycle.rs and state_machine.rs that call .unwrap()/.expect()
and either handle the Result/Error (propagate Result from the test, use
assert_matches/assert_eq on Err/Ok, or replace with expect including a useful
message), or if absolutely necessary apply a narrow
#[allow(clippy::unwrap_used)] or #[allow(clippy::panic)] directly on the
individual test function or smallest scope that legitimately needs it; ensure
tests compile as Result-returning tests where feasible to avoid panics.

---

Outside diff comments:
In `@collector-core/src/rpc_services/manager.rs`:
- Line 537: Replace the explicit drop call by invoking ok() on the awaited
receiver result: change the statement using ready_rx.await (currently wrapped in
drop(...)) to simply call ready_rx.await.ok() so the result is consumed and
dropped implicitly; locate the occurrence of drop(ready_rx.await) in manager.rs
and make this substitution.
- Around line 214-224: Replace all debug eprintln!() calls in the RPC service
message loop and related code with structured tracing macros (e.g.,
tracing::debug!, tracing::info!, tracing::warn!, or tracing::error!) so logs are
emitted into the tracing observability stack; specifically update the ready_tx
send/log block and every eprintln! surrounding the async message_receiver.recv()
loop and handlers (references: ready_tx, message_receiver, RPC_SERVICE_LOOP) to
use context-rich tracing calls that include the same variables
(config_collector_id, message.topic, message.payload.len(), etc.), or remove
them if they were only temporary debug prints. Ensure the tracing level chosen
reflects the message importance (use trace/debug for verbose debugging) and that
the crate has tracing imported and initialized where this module is used.

In `@collector-core/src/trigger/manager.rs`:
- Around line 78-79: The pending_count field is never updated and so
TriggerStatistics.pending_triggers is stale; update get_statistics() (and any
other places that reference pending_count such as where queue_stats is
mentioned) to derive pending_triggers from the queue's actual state (e.g., use
queue_stats.pending or call the queue.len()/queue.size() accessor) instead of
reading pending_count, or remove pending_count entirely and replace its usages
with the queue-derived value; update Manager::new(), get_statistics(), and any
enqueue/dequeue logic (symbols: pending_count, get_statistics(),
TriggerStatistics.pending_triggers, queue_stats) accordingly so reported
pending_triggers matches the queue.
- Around line 751-809: The code currently awaits event_bus.publish(...) while
holding the write lock on event_bus_arc in emit_trigger_request_internal, so
change the logic to move the boxed EventBus out of the RwLock before awaiting:
inside the write lock (event_bus_arc.write().await) call Option::take() on the
inner Option<Box<dyn EventBus + Send + Sync>> to obtain the Box, drop the lock,
call publish(...) on the taken Box (handling correlation metadata as before),
then re-acquire the write lock and put the Box back with Some(boxed). Ensure you
handle the None case and reinsert the Box even on publish errors so the event
bus state is preserved; reference emit_trigger_request_internal, event_bus_arc,
publish, and Option::take() to locate the changes.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository YAML (base), Repository UI (inherited), Organization UI (inherited)

Review profile: ASSERTIVE

Plan: Pro

Run ID: 0440caa3-64c7-47b2-b6ec-8ae8363bc13b

📥 Commits

Reviewing files that changed from the base of the PR and between 40e70a6 and 8725ced.

⛔ Files ignored due to path filters (14)
  • daemoneye-eventbus/src/lib.rs is excluded by none and included by none
  • daemoneye-eventbus/src/process_manager.rs is excluded by none and included by none
  • daemoneye-eventbus/src/process_manager/control.rs is excluded by none and included by none
  • daemoneye-eventbus/src/process_manager/lifecycle.rs is excluded by none and included by none
  • daemoneye-eventbus/src/process_manager/mod.rs is excluded by none and included by none
  • daemoneye-eventbus/src/process_manager/termination.rs is excluded by none and included by none
  • daemoneye-eventbus/src/process_manager/types.rs is excluded by none and included by none
  • daemoneye-eventbus/src/rpc.rs is excluded by none and included by none
  • daemoneye-eventbus/src/rpc/client.rs is excluded by none and included by none
  • daemoneye-eventbus/src/rpc/messages.rs is excluded by none and included by none
  • daemoneye-eventbus/src/rpc/mod.rs is excluded by none and included by none
  • daemoneye-eventbus/src/rpc/providers.rs is excluded by none and included by none
  • daemoneye-eventbus/src/rpc/service.rs is excluded by none and included by none
  • daemoneye-eventbus/src/rpc/tests.rs is excluded by none and included by none
📒 Files selected for processing (41)
  • collector-core/src/event_bus/local_bus.rs
  • collector-core/src/event_bus/mod.rs
  • collector-core/src/event_bus/types.rs
  • collector-core/src/rpc_services/config.rs
  • collector-core/src/rpc_services/manager.rs
  • collector-core/src/rpc_services/mod.rs
  • collector-core/src/rpc_services/providers.rs
  • collector-core/src/trigger/manager.rs
  • collector-core/src/trigger/mod.rs
  • collector-core/src/trigger/queue.rs
  • collector-core/src/trigger/sql_evaluator.rs
  • collector-core/src/trigger/types.rs
  • daemoneye-agent/src/broker_manager.rs
  • daemoneye-agent/src/broker_manager/health.rs
  • daemoneye-agent/src/broker_manager/lifecycle.rs
  • daemoneye-agent/src/broker_manager/mod.rs
  • daemoneye-agent/src/broker_manager/rpc.rs
  • daemoneye-agent/src/broker_manager/state.rs
  • daemoneye-agent/src/broker_manager/state_machine.rs
  • daemoneye-agent/src/broker_manager/tests/lifecycle.rs
  • daemoneye-agent/src/broker_manager/tests/mod.rs
  • daemoneye-agent/src/broker_manager/tests/state_machine.rs
  • procmond/src/event_bus_connector.rs
  • procmond/src/event_bus_connector/buffered_event.rs
  • procmond/src/event_bus_connector/error.rs
  • procmond/src/event_bus_connector/mod.rs
  • procmond/src/event_bus_connector/signal.rs
  • procmond/src/event_bus_connector/tests.rs
  • procmond/src/registration.rs
  • procmond/src/registration/config.rs
  • procmond/src/registration/error.rs
  • procmond/src/registration/heartbeat.rs
  • procmond/src/registration/mod.rs
  • procmond/src/registration/state.rs
  • procmond/src/registration/tests.rs
  • procmond/src/rpc_service.rs
  • procmond/src/rpc_service/config.rs
  • procmond/src/rpc_service/error.rs
  • procmond/src/rpc_service/mod.rs
  • procmond/src/rpc_service/stats.rs
  • procmond/src/rpc_service/tests.rs
💤 Files with no reviewable changes (2)
  • procmond/src/event_bus_connector.rs
  • daemoneye-agent/src/broker_manager.rs

Comment on lines +1016 to +1019
use crate::event_bus::types::{
CollectorCoordination, CrossCollectorCorrelation, EventBusCorrelation, EventBusFilters,
EventBusMetadata, SequenceCorrelation, TemporalCorrelation, TopicChain, TopicCorrelation,
};

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

Exercise the re-exported API surface in these tests.

Lines 1016-1019 import from crate::event_bus::types, which is the private module layout, not the stabilized crate::event_bus::{...} surface. That means these tests keep passing even if a future split drops one of the pub use types::* re-exports in collector-core/src/event_bus/mod.rs.

Based on PR objectives, this refactor is supposed to preserve the public API via pub use re-exports.

Proposed fix
-    use crate::event_bus::types::{
+    use crate::event_bus::{
         CollectorCoordination, CrossCollectorCorrelation, EventBusCorrelation, EventBusFilters,
         EventBusMetadata, SequenceCorrelation, TemporalCorrelation, TopicChain, TopicCorrelation,
     };
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
use crate::event_bus::types::{
CollectorCoordination, CrossCollectorCorrelation, EventBusCorrelation, EventBusFilters,
EventBusMetadata, SequenceCorrelation, TemporalCorrelation, TopicChain, TopicCorrelation,
};
use crate::event_bus::{
CollectorCoordination, CrossCollectorCorrelation, EventBusCorrelation, EventBusFilters,
EventBusMetadata, SequenceCorrelation, TemporalCorrelation, TopicChain, TopicCorrelation,
};
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@collector-core/src/event_bus/local_bus.rs` around lines 1016 - 1019, The
tests import types directly from the private module (crate::event_bus::types)
instead of the public re-export surface; update the import statement in
local_bus.rs to pull CollectorCoordination, CrossCollectorCorrelation,
EventBusCorrelation, EventBusFilters, EventBusMetadata, SequenceCorrelation,
TemporalCorrelation, TopicChain, and TopicCorrelation from the public API
(crate::event_bus::{...}) so the tests exercise the pub use re-exports (replace
the use crate::event_bus::types::{...} with use
crate::event_bus::{CollectorCoordination, CrossCollectorCorrelation,
EventBusCorrelation, EventBusFilters, EventBusMetadata, SequenceCorrelation,
TemporalCorrelation, TopicChain, TopicCorrelation};).

Comment on lines +40 to +44
/// Subscribe to events matching a pattern
async fn subscribe(
&mut self,
subscription: EventSubscription,
) -> Result<tokio::sync::mpsc::UnboundedReceiver<Arc<BusEvent>>>;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail
rg -n --type rust 'UnboundedReceiver<Arc<BusEvent>>|unbounded_channel\(' collector-core/src/event_bus
rg -n --type rust '\bbuffer_size\b' collector-core/src/event_bus

Repository: EvilBit-Labs/DaemonEye

Length of output: 543


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== buffer_size usages =="
rg -n --type rust '\bbuffer_size\b' collector-core/src/event_bus

echo "== bounded/unbounded channel usage in event_bus =="
rg -n --type rust 'bounded_channel|unbounded_channel' collector-core/src/event_bus

echo "== mod.rs: subscribe signature context =="
sed -n '1,120p' collector-core/src/event_bus/mod.rs | nl -ba | sed -n '20,80p'

echo "== types.rs: EventBusConfig =="
sed -n '1,120p' collector-core/src/event_bus/types.rs | nl -ba | sed -n '1,120p'

echo "== local_bus.rs: subscribe implementation =="
sed -n '300,480p' collector-core/src/event_bus/local_bus.rs | nl -ba | sed -n '300,480p'

Repository: EvilBit-Labs/DaemonEye

Length of output: 497


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== bounded_channel occurrences in event_bus (and nearby) =="
rg -n --type rust 'bounded_channel' collector-core/src/event_bus || true

echo "== unbounded_channel occurrences in event_bus =="
rg -n --type rust 'unbounded_channel' collector-core/src/event_bus || true

echo "== local_bus.rs: subscribe impl (around the unbounded_channel) =="
sed -n '330,460p' collector-core/src/event_bus/local_bus.rs

echo "== types.rs: EventBusConfig buffer_size =="
sed -n '1,120p' collector-core/src/event_bus/types.rs

echo "== mod.rs: subscribe signature =="
sed -n '1,120p' collector-core/src/event_bus/mod.rs

Repository: EvilBit-Labs/DaemonEye

Length of output: 12052


🏁 Script executed:

#!/bin/bash
set -euo pipefail
rg -n --type rust 'subscribe\(' collector-core/src | head -n 50
rg -n --type rust 'UnboundedReceiver<Arc<BusEvent>>' -S . || true
rg -n --type rust 'EventBus' collector-core/src/event_bus | head -n 50

Repository: EvilBit-Labs/DaemonEye

Length of output: 10260


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== high_performance_event_bus subscribe signature/impl =="
sed -n '60,140p' collector-core/src/high_performance_event_bus.rs
sed -n '450,560p' collector-core/src/high_performance_event_bus.rs

echo "== daemoneye_event_bus subscribe signature/impl =="
sed -n '900,1020p' collector-core/src/daemoneye_event_bus.rs
sed -n '1040,1100p' collector-core/src/daemoneye_event_bus.rs

echo "== broker subscribe signature/impl =="
sed -n '980,1060p' daemoneye-eventbus/src/broker.rs

echo "== more evidence: UnboundedReceiver usage across collector-core =="
rg -n --type rust 'UnboundedReceiver<Arc<BusEvent>>' collector-core/src | head -n 50

echo "== is buffer_size referenced anywhere else? =="
rg -n --type rust '\bbuffer_size\b' .

Repository: EvilBit-Labs/DaemonEye

Length of output: 21827


🏁 Script executed:

#!/bin/bash
set -euo pipefail
rg -n --type rust 'config\.buffer_size|self\.config\.buffer_size|buffer_size' collector-core/src/event_bus/local_bus.rs

Repository: EvilBit-Labs/DaemonEye

Length of output: 48


🏁 Script executed:

#!/bin/bash
set -euo pipefail
sed -n '1,120p' collector-core/src/event_bus/local_bus.rs
sed -n '200,360p' collector-core/src/event_bus/local_bus.rs
sed -n '360,460p' collector-core/src/event_bus/local_bus.rs

Repository: EvilBit-Labs/DaemonEye

Length of output: 14978


Fix event-bus backpressure: remove unbounded subscriber queues from the public API

EventBus::subscribe exposes tokio::sync::mpsc::UnboundedReceiver (collector-core/src/event_bus/mod.rs), and both LocalEventBus and DaemoneyeEventBus create unbounded Tokio channels (unbounded_channel() / UnboundedSender), so EventBusConfig.buffer_size (collector-core/src/event_bus/types.rs) is effectively unenforceable in the LocalEventBus/collector-core path. This permits a slow/stalled subscriber to grow an unbounded queue (memory-DoS risk) with no backpressure/drop policy.

Switch the trait + implementations to bounded channels sized by EventBusConfig.buffer_size (or move capacity/backpressure strategy into EventSubscription) and handle send backpressure/drop explicitly.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@collector-core/src/event_bus/mod.rs` around lines 40 - 44, Change the public
API and implementations to use bounded channels sized by
EventBusConfig.buffer_size instead of exposing UnboundedReceiver: update the
trait method signature subscribe in EventBus
(collector-core/src/event_bus/mod.rs) to return a
tokio::sync::mpsc::Receiver<Arc<BusEvent>> (or an abstraction that hides channel
type) and update LocalEventBus and DaemoneyeEventBus to create bounded channels
using EventBusConfig.buffer_size (or move capacity into EventSubscription)
instead of unbounded_channel/UnboundedSender; additionally implement explicit
backpressure/drop behavior when sending (e.g., use try_send and drop
oldest/newest or await send with timeout and return/send a backpressure error)
so slow subscribers cannot grow unbounded queues.

Source: Coding guidelines

Comment on lines +410 to +448
/// Track message delivery to a subscriber
pub fn track_delivery(
&mut self,
subscriber_id: String,
success: bool,
error_message: Option<String>,
) {
let delivery_status = DeliveryStatus {
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
success,
error_message,
retry_count: 0,
};

if let Some(ref mut eventbus_metadata) = self.eventbus_metadata {
eventbus_metadata
.delivery_tracking
.insert(subscriber_id, delivery_status);
} else {
let mut delivery_tracking = HashMap::new();
delivery_tracking.insert(subscriber_id, delivery_status);

let metadata = EventBusMetadata {
broker_id: None,
routing_path: Vec::new(),
delivery_timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
delivery_tracking,
topic_chains: Vec::new(),
collector_coordination: None,
};
self.eventbus_metadata = Some(metadata);
}
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Refresh delivery_timestamp on every tracked delivery.

Lines 417-446 only update delivery_tracking when metadata already exists. collector-core/src/event_bus/local_bus.rs uses eventbus_metadata.delivery_timestamp as the temporal-correlation gate, so pre-existing metadata can carry an old timestamp into later delivery-window checks.

Proposed fix
     pub fn track_delivery(
         &mut self,
         subscriber_id: String,
         success: bool,
         error_message: Option<String>,
     ) {
+        let now = std::time::SystemTime::now()
+            .duration_since(std::time::UNIX_EPOCH)
+            .unwrap_or_default()
+            .as_secs();
+
         let delivery_status = DeliveryStatus {
-            timestamp: std::time::SystemTime::now()
-                .duration_since(std::time::UNIX_EPOCH)
-                .unwrap_or_default()
-                .as_secs(),
+            timestamp: now,
             success,
             error_message,
             retry_count: 0,
         };
 
         if let Some(ref mut eventbus_metadata) = self.eventbus_metadata {
+            eventbus_metadata.delivery_timestamp = now;
             eventbus_metadata
                 .delivery_tracking
                 .insert(subscriber_id, delivery_status);
         } else {
             let mut delivery_tracking = HashMap::new();
             delivery_tracking.insert(subscriber_id, delivery_status);
 
             let metadata = EventBusMetadata {
                 broker_id: None,
                 routing_path: Vec::new(),
-                delivery_timestamp: std::time::SystemTime::now()
-                    .duration_since(std::time::UNIX_EPOCH)
-                    .unwrap_or_default()
-                    .as_secs(),
+                delivery_timestamp: now,
                 delivery_tracking,
                 topic_chains: Vec::new(),
                 collector_coordination: None,
             };
             self.eventbus_metadata = Some(metadata);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/// Track message delivery to a subscriber
pub fn track_delivery(
&mut self,
subscriber_id: String,
success: bool,
error_message: Option<String>,
) {
let delivery_status = DeliveryStatus {
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
success,
error_message,
retry_count: 0,
};
if let Some(ref mut eventbus_metadata) = self.eventbus_metadata {
eventbus_metadata
.delivery_tracking
.insert(subscriber_id, delivery_status);
} else {
let mut delivery_tracking = HashMap::new();
delivery_tracking.insert(subscriber_id, delivery_status);
let metadata = EventBusMetadata {
broker_id: None,
routing_path: Vec::new(),
delivery_timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
delivery_tracking,
topic_chains: Vec::new(),
collector_coordination: None,
};
self.eventbus_metadata = Some(metadata);
}
}
/// Track message delivery to a subscriber
pub fn track_delivery(
&mut self,
subscriber_id: String,
success: bool,
error_message: Option<String>,
) {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let delivery_status = DeliveryStatus {
timestamp: now,
success,
error_message,
retry_count: 0,
};
if let Some(ref mut eventbus_metadata) = self.eventbus_metadata {
eventbus_metadata.delivery_timestamp = now;
eventbus_metadata
.delivery_tracking
.insert(subscriber_id, delivery_status);
} else {
let mut delivery_tracking = HashMap::new();
delivery_tracking.insert(subscriber_id, delivery_status);
let metadata = EventBusMetadata {
broker_id: None,
routing_path: Vec::new(),
delivery_timestamp: now,
delivery_tracking,
topic_chains: Vec::new(),
collector_coordination: None,
};
self.eventbus_metadata = Some(metadata);
}
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@collector-core/src/event_bus/types.rs` around lines 410 - 448, The
track_delivery method currently updates delivery_tracking but only sets
EventBusMetadata.delivery_timestamp when creating new metadata; update
track_delivery (function track_delivery) so that whenever you insert/update
delivery_tracking on self.eventbus_metadata you also refresh
eventbus_metadata.delivery_timestamp to the current unix seconds timestamp;
ensure both the Some(eventbus_metadata) branch and the else branch set
delivery_timestamp (EventBusMetadata.delivery_timestamp) and preserve other
fields (delivery_tracking, topic_chains, etc.) so the temporal-correlation gate
sees the latest delivery time.

Comment on lines +7 to +14
pub struct RpcServiceConfig {
/// Collector identifier
pub collector_id: String,
/// RPC topic to subscribe to
pub rpc_topic: String,
/// Default timeout for RPC operations
pub default_timeout: Duration,
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial | ⚡ Quick win

Consider adding validation to enforce collector_id/rpc_topic consistency.

The public fields allow callers to set collector_id and rpc_topic independently, which can create mismatches. The agent-side RPC client (from context snippet) constructs the target topic as format!("control.collector.{collector_id}"), expecting the collector's rpc_topic to match. If a caller sets collector_id = "foo" but forgets to update rpc_topic, the collector will subscribe to the wrong topic and miss RPC requests.

Consider one of:

  • Make fields private and expose a builder that derives rpc_topic from collector_id
  • Add a validation method that checks consistency
  • Document the invariant and add a debug assertion in runtime code
🔧 Option 1: Derive topic from ID in builder
 #[derive(Debug, Clone)]
 pub struct RpcServiceConfig {
     /// Collector identifier
-    pub collector_id: String,
+    collector_id: String,
     /// RPC topic to subscribe to
-    pub rpc_topic: String,
+    rpc_topic: String,
     /// Default timeout for RPC operations
     pub default_timeout: Duration,
 }
+
+impl RpcServiceConfig {
+    pub fn new(collector_id: impl Into<String>) -> Self {
+        let id = collector_id.into();
+        let topic = format!("control.collector.{id}");
+        Self {
+            collector_id: id,
+            rpc_topic: topic,
+            default_timeout: Duration::from_secs(30),
+        }
+    }
+
+    pub fn with_timeout(mut self, timeout: Duration) -> Self {
+        self.default_timeout = timeout;
+        self
+    }
+
+    pub fn collector_id(&self) -> &str {
+        &self.collector_id
+    }
+
+    pub fn rpc_topic(&self) -> &str {
+        &self.rpc_topic
+    }
+}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@collector-core/src/rpc_services/config.rs` around lines 7 - 14, Make
RpcServiceConfig enforce consistency between collector_id and rpc_topic: either
make the struct fields private and add a constructor/builder (e.g.,
RpcServiceConfig::new or RpcServiceConfigBuilder) that takes collector_id and
derives rpc_topic = format!("control.collector.{collector_id}") (while
preserving default_timeout), or add a public validate method (e.g.,
RpcServiceConfig::validate) that checks rpc_topic ==
format!("control.collector.{collector_id}") and returns a Result/Err or triggers
a debug_assert; update call sites to construct via the new API or invoke
validate() at startup. Ensure references to RpcServiceConfig, collector_id,
rpc_topic, and default_timeout are updated accordingly.

Comment on lines +7 to +14
#![allow(clippy::significant_drop_tightening)]
#![allow(clippy::as_conversions)]
#![allow(clippy::pattern_type_mismatch)]
#![allow(clippy::indexing_slicing)]
#![allow(clippy::unwrap_used)]
#![allow(clippy::expect_used)]
#![allow(clippy::use_debug)]
#![allow(clippy::unreachable)]

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

Document safety rationale for each clippy allow, especially as_conversions.

Your coding guidelines require: "Add #[allow(clippy::as_conversions)] with safety comment for intentional type casts". The module-level allows apply broadly to manager.rs and providers.rs without explaining WHY each pattern is safe. For example, providers.rs uses multiple as f64 casts (lines 77, 102, 117-122) on metric counters, but there's no comment explaining the range assumptions or overflow behavior.

Additionally, the retrieved learning states .expect() is denied at workspace level, yet clippy::expect_used is allowed here. While manager.rs uses .unwrap_or_default() in most places (good), the blanket allow could mask new .expect() calls added later.

📋 Recommended approach

Move allows to specific sites with safety comments:

// In providers.rs, at each conversion site:
#[allow(clippy::as_conversions)]
// Safe: operation_count is a monotonic counter, always fits in f64 (2^53 max safe integer)
metrics.insert("operation_count".to_owned(), telemetry_metrics.operation_count as f64);

Or if the conversions are truly pervasive and all follow the same pattern, document once at the top of providers.rs:

// Safety note for as_conversions in this module:
// All u64/usize -> f64 conversions are for telemetry metrics, which are small
// counters expected to stay well below f64's 2^53 safe integer limit during
// typical collector runtime (10k processes × 100 sources = ~1M events/sec = 
// ~3.15e13 events/year, safely representable in f64).
#![allow(clippy::as_conversions)]
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@collector-core/src/rpc_services/mod.rs` around lines 7 - 14, The module-wide
clippy allows (especially #[allow(clippy::as_conversions)] and
#[allow(clippy::expect_used)]) are too broad; narrow them and document safety:
move #[allow(clippy::as_conversions)] from mod.rs into providers.rs at each
conversion site (or at top of providers.rs with a short safety comment) and add
a one-line rationale that u64/usize -> f64 casts (e.g.
telemetry_metrics.operation_count and other metric counters) are telemetry
counters guaranteed by design to stay below f64's 2^53 safe integer threshold;
remove the blanket #[allow(clippy::expect_used)] from mod.rs and either replace
.expect() with safe alternatives (e.g. .unwrap_or_default()) in
manager.rs/providers.rs or keep #[allow(clippy::expect_used)] only on the
specific lines that truly require it with a safety comment explaining why
failure is fatal and cannot be recovered.

Sources: Coding guidelines, Learnings

Comment on lines +300 to +332
/// Drop privileges after all collectors have registered and reported ready.
///
/// This method is a stub that will be implemented with platform-specific
/// privilege dropping logic in the future. Currently it just logs that
/// privilege dropping would occur.
///
/// # Platform-specific behavior (future)
///
/// - **Unix**: Drop to a non-root user (e.g., `daemoneye`)
/// - **Windows**: Reduce process token privileges
/// - **macOS**: Drop supplementary groups and effective UID
///
/// # Safety
///
/// This should only be called after all collectors have been spawned and
/// have completed their privileged initialization (e.g., binding to
/// privileged ports, accessing protected resources).
#[allow(dead_code)]
#[allow(clippy::unused_async)] // Future implementation will use async for platform-specific privilege dropping
pub async fn drop_privileges(&self) -> Result<()> {
info!("Privilege dropping requested (stub - not yet implemented)");

// Future implementation will:
// 1. Check if running as root/elevated
// 2. Drop supplementary groups
// 3. Set effective UID/GID to unprivileged user
// 4. Verify privileges were dropped successfully

// For now, just log and succeed
debug!("Privilege dropping is not implemented - running with current privileges");

Ok(())
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for privilege dropping implementation and related TODOs

rg -nP --type=rust -C5 'drop_privileges|privilege.*drop|setuid|setgid' daemoneye-agent/ procmond/

Repository: EvilBit-Labs/DaemonEye

Length of output: 16240


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# 1) Confirm drop_privileges implementations/usages
rg -n "fn drop_privileges|drop_privileges\\(" daemoneye-agent/ procmond/

# 2) Look for any real privilege-dropping primitives (capabilities, syscalls, prctl, etc.)
rg -n --type=rust '(setuid|setgid|seteuid|setegid|setresuid|setresgid|setgroups|capset|cap_drop|prctl|seteuid|setuid\\(|setgid\\()' daemoneye-agent/ procmond/

# 3) Check for the stub-specific log strings
rg -n "Privilege dropping requested|Privilege dropping is not implemented|stub - not yet implemented" daemoneye-agent/ procmond/

# 4) Inspect the state_machine test that asserts the stub behavior
rg -n "test_drop_privileges_stub" daemoneye-agent/src/broker_manager/tests/state_machine.rs

Repository: EvilBit-Labs/DaemonEye

Length of output: 764


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Confirm drop_privileges definition/body
sed -n '280,360p' daemoneye-agent/src/broker_manager/state_machine.rs

# Confirm how it's called in main + integration test
sed -n '160,220p' daemoneye-agent/src/main.rs
sed -n '250,320p' daemoneye-agent/tests/loading_state_integration.rs

# Confirm the test that asserts stub behavior
sed -n '500,560p' daemoneye-agent/src/broker_manager/tests/state_machine.rs

# Search for any actual privilege-dropping primitives elsewhere using simple literal matches
rg -n --type=rust 'setuid|setgid|seteuid|setegid|setresuid|setresgid|setgroups|capset|cap_drop|prctl|capset\\(' daemoneye-agent/ procmond/ || true

# Search for capability-related crate usage (more likely than syscalls, if stub isn't implemented yet)
rg -n --type=rust 'capabilities?|capng' daemoneye-agent/ procmond/ || true

Repository: EvilBit-Labs/DaemonEye

Length of output: 35692


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# 1) Ensure there is only this stub implementation of drop_privileges
rg -n --type=rust "drop_privileges\\s*\\(" daemoneye-agent/ procmond/ || true

# 2) Look for any real privilege-dropping primitives/syscalls without regex parentheses (avoid prior parse issues)
rg -n --type=rust 'setuid|setgid|seteuid|setegid|setresuid|setresgid|setgroups|prctl|capset' daemoneye-agent/ procmond/ || true

# 3) Check the stub behavior callsite in main (what happens if drop_privileges fails)
rg -n --type=rust 'drop_privileges\\(\\)\\.await|Continuing with elevated privileges|Failed to drop privileges' daemoneye-agent/src/main.rs daemoneye-agent/src/broker_manager/state_machine.rs daemoneye-agent/tests/loading_state_integration.rs || true

# 4) Inspect tests asserting stub semantics
sed -n '500,560p' daemoneye-agent/src/broker_manager/tests/state_machine.rs

Repository: EvilBit-Labs/DaemonEye

Length of output: 1922


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Search for any privilege-dropping implementations/primitives across agent + procmond.
rg -n --type=rust 'libc::(setuid|seteuid|setgid|setegid|setresuid|setresgid|setgroups)|\b(setuid|seteuid|setgid|setegid|setresuid|setresgid|setgroups)\s*\(|\bprctl\s*\(|\bcapset\b|capng|cap_drop' daemoneye-agent/ procmond/ || true

# Search for common "effective uid/gid" strings/logs
rg -n --type=rust 'effective uid|effective gid|supplementary groups|drop supplementary|set effective|euid|egid|UID|GID' daemoneye-agent/ procmond/ || true

Repository: EvilBit-Labs/DaemonEye

Length of output: 496


Critical security TODO: BrokerManager::drop_privileges() is a stub (always returns Ok(())).

  • The method only logs “stub - not yet implemented” and returns success without actually dropping privileges (no UID/GID/capability boundary is enforced).
  • Startup calls it after collectors are ready, but main.rs explicitly continues with elevated privileges on failure—so privilege separation is effectively disabled.
  • There’s a test (test_drop_privileges_stub) that locks in the stub behavior.

Implement real platform-specific privilege dropping with a post-drop verification step, and make failure non-optional (fail startup / hard-gate monitoring) until the dropping is correctly enforced.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@daemoneye-agent/src/broker_manager/state_machine.rs` around lines 300 - 332,
BrokerManager::drop_privileges is currently a no-op stub; replace it with real
platform-specific privilege-dropping and verification (and make failures
non-optional). Implement Unix behavior in BrokerManager::drop_privileges using
setgroups/setgid/setuid (e.g., via the nix crate) to switch to an unprivileged
user, verify with geteuid/getegid that privileges were lowered, and return an
Err if any step fails; implement Windows equivalent
(AdjustTokenPrivileges/SetTokenInformation) behind cfg(windows) and also verify
the effective token privileges changed; remove the test-locking stub test
test_drop_privileges_stub and update startup code that calls drop_privileges
(caller in main.rs) to propagate and hard-fail on Err so startup cannot continue
with elevated privileges until drop succeeds. Ensure the function name
BrokerManager::drop_privileges and the test test_drop_privileges_stub are the
points of change and add a post-drop verification step that returns Err on
mismatch.

Comment on lines +360 to +365
pub async fn broadcast_begin_monitoring(&self) -> Result<()> {
let broker_guard = self.broker.read().await;
let Some(broker) = broker_guard.as_ref() else {
warn!("Cannot broadcast 'begin monitoring': broker not available");
return Ok(());
};

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Critical: broadcast_begin_monitoring() silently succeeds when broker unavailable.

Lines 362–365: When the broker is None, the method logs a warning and returns Ok(()). This breaks the contract expected by transition_to_steady_state() (lines 154–162), which relies on an Err result to trigger state rollback.

If the broker is not available, collectors never receive the "begin monitoring" signal, but the agent proceeds to SteadyState anyway.

Return an error instead:

         let broker_guard = self.broker.read().await;
         let Some(broker) = broker_guard.as_ref() else {
-            warn!("Cannot broadcast 'begin monitoring': broker not available");
-            return Ok(());
+            return Err(anyhow::anyhow!(
+                "Cannot broadcast 'begin monitoring': broker not available"
+            ));
         };

This ensures transition_to_steady_state() correctly rolls back to Ready when the broadcast fails.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@daemoneye-agent/src/broker_manager/state_machine.rs` around lines 360 - 365,
broadcast_begin_monitoring currently logs a warning and returns Ok(()) when the
broker is None, which prevents transition_to_steady_state from detecting the
failure and rolling back; change the behavior so that when broker_guard.as_ref()
is None (the Some(broker) pattern fails) you still log the warning and then
return an Err with a descriptive error (e.g., using anyhow::anyhow or the
crate's common error constructor) so callers like transition_to_steady_state
receive a failure and can trigger rollback; keep the existing broker guard and
message but replace the Ok(()) return with an Err that includes context about
"broker not available" and the broadcast failing.

Comment on lines +372 to +375
"timestamp": std::time::SystemTime::now()
.duration_since(std::time::SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_millis(),

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial | 💤 Low value

Consider logging a warning when system time is before UNIX_EPOCH.

Lines 372–375: Using .unwrap_or_default() on duration_since(UNIX_EPOCH) returns 0 if the system clock is set before 1970-01-01. This won't panic, but could confuse log correlation if the timestamp is 0.

For operator reliability in production, add a warning:

             "timestamp": std::time::SystemTime::now()
                 .duration_since(std::time::SystemTime::UNIX_EPOCH)
-                .unwrap_or_default()
+                .unwrap_or_else(|e| {
+                    warn!(error = %e, "System time is before UNIX_EPOCH, using timestamp 0");
+                    Duration::default()
+                })
                 .as_millis(),

This makes clock issues visible in logs without breaking functionality.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@daemoneye-agent/src/broker_manager/state_machine.rs` around lines 372 - 375,
Replace the blind `.unwrap_or_default()` on
`SystemTime::now().duration_since(UNIX_EPOCH)` so that if it returns an `Err`
you log a warning before falling back to `0`; specifically, call
`SystemTime::now()` and `duration_since(UNIX_EPOCH)` and handle the `Result`
(from `duration_since`) with a match or `map_or_else` that logs a warning
(including the error) when the system time is before `UNIX_EPOCH`, then use
`.as_millis()` on the valid duration or `0` as the fallback; look for the
`timestamp` construction using `SystemTime`, `UNIX_EPOCH`, `duration_since`, and
`as_millis` in `state_machine.rs` to modify.

Comment on lines +97 to +99
pub(super) fn mark_ready(&mut self, collector_id: &str) {
self.ready_collectors.insert(collector_id.to_owned());
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for mark_ready usage to verify whether unexpected IDs are allowed

rg -nP --type=rust -C3 '\bmark_ready\s*\(' daemoneye-agent/

Repository: EvilBit-Labs/DaemonEye

Length of output: 1224


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# 1) Show the full implementation region for CollectorReadinessTracker
sed -n '1,220p' daemoneye-agent/src/broker_manager/state.rs | nl -ba | sed -n '1,220p'

# 2) Inspect mark_collector_ready call site and any logic around expected collectors
sed -n '1,140p' daemoneye-agent/src/broker_manager/state_machine.rs | nl -ba | sed -n '1,140p'

# 3) Find where expected collectors are set / reset, and how collector IDs are derived
rg -n --type=rust "set_expected|reset\(|expected_collectors|ready_collectors|all_ready\(|pending_collectors\(|mark_collector_ready|mark_ready" daemoneye-agent/src/broker_manager/

# 4) Find any other calls to mark_ready (direct or via wrappers)
rg -n --type=rust "\.mark_ready\s*\(|mark_collector_ready\s*\(" daemoneye-agent/src/broker_manager/

Repository: EvilBit-Labs/DaemonEye

Length of output: 109


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# 1) Show the implementation of CollectorReadinessTracker
sed -n '1,220p' daemoneye-agent/src/broker_manager/state.rs | cat -n | sed -n '1,220p'

# 2) Inspect mark_collector_ready and surrounding logic
sed -n '1,200p' daemoneye-agent/src/broker_manager/state_machine.rs | cat -n | sed -n '1,200p'

# 3) Find where expected collectors are set/reset and how readiness is used
rg -n --type=rust "set_expected|reset\(|expected_collectors|ready_collectors|all_ready\(|pending_collectors\(|mark_collector_ready|mark_ready" daemoneye-agent/src/broker_manager/

# 4) Find any other calls to mark_ready/mark_collector_ready
rg -n --type=rust "\.mark_ready\s*\(|mark_collector_ready\s*\(" daemoneye-agent/src/broker_manager/

Repository: EvilBit-Labs/DaemonEye

Length of output: 20756


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Show relevant parts of state_machine.rs tests around mark_ready/all_ready behavior
sed -n '1,220p' daemoneye-agent/src/broker_manager/tests/state_machine.rs | cat -n
sed -n '220,420p' daemoneye-agent/src/broker_manager/tests/state_machine.rs | cat -n
sed -n '420,520p' daemoneye-agent/src/broker_manager/tests/state_machine.rs | cat -n

# Inspect RPC handler that marks collector ready
sed -n '430,560p' daemoneye-agent/src/broker_manager/rpc.rs | cat -n

Repository: EvilBit-Labs/DaemonEye

Length of output: 26097


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Inspect the exact RPC handler code around the mark_collector_ready call
sed -n '440,540p' daemoneye-agent/src/broker_manager/rpc.rs | cat -n

# Find and inspect collector registration acceptance logic
rg -n --type=rust "fn register_collector|register\(|accepted|RegistrationResponse|RegistrationError|collector_id" daemoneye-agent/src/collector_registry/ daemoneye-agent/src/broker_manager/

# If collector registry file is found, show it around register method
# (Use fd-like search by listing rust files containing "struct CollectorRegistry" and "register(" )
rg -n --type=rust "struct CollectorRegistry|impl .*CollectorRegistry|async fn register" daemoneye-agent/src/collector_registry/ daemoneye-agent/src/collector_registry -S || true

Repository: EvilBit-Labs/DaemonEye

Length of output: 13957


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Locate CollectorRegistry implementation
rg -n --type=rust "struct CollectorRegistry|impl .*CollectorRegistry|async fn register\(|fn register\(" daemoneye-agent/src -S

# Narrow to the file(s) that define CollectorRegistry
fd -t f -e rs "CollectorRegistry" daemoneye-agent/src || true
fd -t f -e rs "collector_registry" daemoneye-agent/src || true

# If we find candidates, print the relevant sections around register()
# (Use rg output to infer file paths; this is a best-effort two-pass)
echo "---- candidate register() implementations ----"
rg -n --type=rust "CollectorRegistry" daemoneye-agent/src -S | head -n 50

Repository: EvilBit-Labs/DaemonEye

Length of output: 2237


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Show collector_registry.rs around register()
sed -n '1,260p' daemoneye-agent/src/collector_registry.rs | cat -n

# Also show any tests for register() that might cover unexpected IDs
sed -n '260,480p' daemoneye-agent/src/collector_registry.rs | cat -n

Repository: EvilBit-Labs/DaemonEye

Length of output: 16436


mark_ready should maintain the readiness invariant (unknown IDs shouldn’t be counted as ready)

  • mark_ready currently inserts any collector_id into ready_collectors; however all_ready() / pending_collectors() only iterate over expected_collectors, so unexpected IDs cannot make startup transition succeed early.
  • Still, ready_collectors (and ready_count in logs) gets polluted because CollectorRegistry::register() accepts any non-empty, non-duplicate collector_id (no check against the configured expected set). Add a guard so only expected IDs are tracked:
    pub(super) fn mark_ready(&mut self, collector_id: &str) {
        if self.expected_collectors.contains(collector_id) {
            self.ready_collectors.insert(collector_id.to_owned());
        }
    }
  • Security note: even with the guard, a collector that claims an expected collector_id will be treated as ready—fixing impersonation requires identity/ownership validation at registration time, not just readiness bookkeeping.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@daemoneye-agent/src/broker_manager/state.rs` around lines 97 - 99, mark_ready
currently inserts any collector_id into ready_collectors, which lets unexpected
IDs pollute ready_count; change mark_ready to only insert when the id exists in
expected_collectors (i.e., check self.expected_collectors.contains(collector_id)
before inserting). Update the method mark_ready in state.rs to guard readiness
tracking accordingly; note that full protection against impersonation must be
handled in CollectorRegistry::register (validate identity/ownership) rather than
here.

Comment on lines +3 to +13
#![allow(
clippy::expect_used,
clippy::unwrap_used,
clippy::str_to_string,
clippy::semicolon_outside_block,
clippy::semicolon_inside_block,
clippy::semicolon_if_nothing_returned,
clippy::shadow_unrelated,
clippy::wildcard_enum_match_arm,
clippy::panic
)]

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# 1) Inspect the reviewed file (lines 1-60)
echo "=== daemoneye-agent/src/broker_manager/tests/mod.rs (head) ==="
sed -n '1,60p' daemoneye-agent/src/broker_manager/tests/mod.rs

# 2) List test files under that module
echo "=== Broker manager tests files ==="
fd -t f -e rs "daemoneye-agent/src/broker_manager/tests" -a 2>/dev/null || true
# (fallback listing if fd -a doesn't exist in sandbox)
ls -la daemoneye-agent/src/broker_manager/tests || true
find daemoneye-agent/src/broker_manager/tests -maxdepth 2 -type f -name '*.rs' -print

# 3) Check for expect()/unwrap() usages inside broker_manager/tests
echo "=== rg expect()/unwrap() under daemoneye-agent/src/broker_manager/tests ==="
rg -n --hidden --no-ignore-vcs "(\.expect\(|\.unwrap\()" daemoneye-agent/src/broker_manager/tests || true

# 4) Confirm whether clippy::expect_used is denied/allowed at workspace level
echo "=== rg clippy::expect_used / unwrap_used in workspace configs ==="
rg -n --hidden --no-ignore-vcs "clippy::(expect_used|unwrap_used)" -S . || true

# 5) Also search for workspace lints configuration blocks in Cargo.toml(s)
echo "=== Search for workspace lints config ==="
rg -n --hidden --no-ignore-vcs "\\[workspace\\.lints\\.|\\[lints\\.|clippy::expect_used|expect_used" -S Cargo.toml daemoneye-agent/Cargo.toml **/Cargo.toml 2>/dev/null || true

# 6) Print top-level Cargo.toml if present (first ~220 lines)
echo "=== Top-level Cargo.toml (head) ==="
ls -la Cargo.toml && sed -n '1,220p' Cargo.toml || true

# 7) If there is a clippy.toml or rust-toolchain.toml, show relevant bits
echo "=== clippy.toml / rust-toolchain.toml / .cargo/config* if present ==="
ls -la .cargo 2>/dev/null || true
find . -maxdepth 3 -type f \( -name "clippy.toml" -o -name "rust-toolchain*.toml" -o -path "*/.cargo/config*" \) -print -exec sed -n '1,200p' {} \; || true

Repository: EvilBit-Labs/DaemonEye

Length of output: 31106


Don’t module-wide disable denied Clippy lints in broker_manager tests

daemoneye-agent/src/broker_manager/tests/mod.rs uses #![allow(...)] to suppress clippy::unwrap_used (workspace deny) and clippy::panic (workspace deny) across the whole test module, masking lint enforcement in lifecycle.rs/state_machine.rs where .unwrap()/.expect() are used. Remove the module-wide allow and instead fix the specific call sites (or use narrow #[allow(...)] only where it’s justified).

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@daemoneye-agent/src/broker_manager/tests/mod.rs` around lines 3 - 13, Remove
the module-wide #![allow(...)] in tests/mod.rs that suppresses
clippy::unwrap_used and clippy::panic; instead audit the specific call sites in
lifecycle.rs and state_machine.rs that call .unwrap()/.expect() and either
handle the Result/Error (propagate Result from the test, use
assert_matches/assert_eq on Err/Ok, or replace with expect including a useful
message), or if absolutely necessary apply a narrow
#[allow(clippy::unwrap_used)] or #[allow(clippy::panic)] directly on the
individual test function or smallest scope that legitimately needs it; ensure
tests compile as Result-returning tests where feasible to avoid panics.

Sources: Coding guidelines, Learnings

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

architecture System architecture and design decisions async Related to asynchronous programming and async/await patterns core-feature Core system functionality daemoneye-agent data-models Data structure and model related ipc Inter-Process Communication process-monitoring Process monitoring and enumeration features rust Pull requests that update rust code size:XXL This PR changes 1000+ lines, ignoring generated files. testing Related to test development and test infrastructure type:feature

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants