
Wire .transform() and .transform_join() into stage profiling #109

Description

@lxsaah

Issue #58 added automatic stage profiling for .source(), .tap(), and .link() (RFC docs/design/014-M6-stage-profiling.md). Transforms were intentionally scoped out and left as a stub. This issue finishes the job: make .transform() and .transform_join() first-class stage-profiling participants.

Why this matters (user-visible symptom)

The get_stage_profiling MCP tool picks one stage as the bottleneck. That stage is the one with the highest average wall-clock time among whatever entries the snapshot returns (tools/aimdb-mcp/src/tools/profiling.rs:63). Transforms never appear in the snapshot, so a transform that is the real bottleneck is invisible. The tool then silently flags the slowest of the remaining source/tap/link stages instead. This is a correctness gap in a shipped tool's output, not just a missing metric.
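The sketch below makes the failure mode concrete by mimicking max-average bottleneck selection over snapshot entries. StageEntry and bottleneck() are hypothetical simplifications written for this illustration. They are not the code in tools/aimdb-mcp/src/tools/profiling.rs, and the timings are made-up example values. The point is only that a stage missing from the input can never be chosen.

```rust
/// Hypothetical, simplified snapshot entry (not the real aimdb type).
#[derive(Debug, Clone)]
struct StageEntry {
    stage_type: &'static str,
    name: &'static str,
    avg_time_ns: u64,
}

/// Return the stage with the highest average wall-clock time.
/// Anything absent from `stages` can never be reported.
fn bottleneck(stages: &[StageEntry]) -> Option<&StageEntry> {
    stages.iter().max_by_key(|s| s.avg_time_ns)
}

fn main() {
    // What the snapshot contains today: no transform entries at all.
    let reported_today = vec![
        StageEntry { stage_type: "source", name: "sensor", avg_time_ns: 2_000 },
        StageEntry { stage_type: "tap", name: "logger", avg_time_ns: 5_000 },
    ];

    // The same record once the transform stage is included in the snapshot.
    let mut with_transform = reported_today.clone();
    with_transform.push(StageEntry { stage_type: "transform", name: "smooth", avg_time_ns: 40_000 });

    // Today the tap is blamed even though the transform is 8x slower.
    let today = bottleneck(&reported_today).unwrap();
    println!("today: {} ({})", today.name, today.stage_type);

    let fixed = bottleneck(&with_transform).unwrap();
    println!("fixed: {} ({})", fixed.name, fixed.stage_type);
}
```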

Current state — verified 2026-06-29

Already wired (transform-aware, do not redo):

  • RecordProfilingMetrics::set_stage_name() already routes StageKind::Transform → the transforms vec (aimdb-core/src/profiling/record_profiling.rs:113-123). Naming will work as soon as a real index is stored (see below).
  • reset_all() already chains transforms (record_profiling.rs:126-136).
  • The MCP tool already iterates the snapshot generically and computes the bottleneck by max-avg (tools/aimdb-mcp/src/tools/profiling.rs:63-87), so once the snapshot emits transform entries it picks them up for free — no MCP change needed.

Still missing (the actual work):

  • transforms: Vec<StageEntry> is a dead stub: #[allow(dead_code)] at record_profiling.rs:36.
  • No push_transform() and no transforms() accessor (only push_source/tap/link and sources()/taps()/links() exist).
  • snapshot() iterates only source/tap/link (aimdb-core/src/profiling/info.rs:53-65) — transforms are never reported.
  • .transform() / .transform_join() store last_stage = Some((StageKind::Transform, 0)) with a hardcoded 0 (aimdb-core/src/typed_api.rs:562 and :579), so .with_name(...) silently no-ops past the first transform.
  • run_single_transform calls transform_fn(&input_value, &mut state) untimed (aimdb-core/src/transform/single.rs:186) — no Clock/StageMetrics.
  • The join task output producer is not instrumented.

Goal

  • get_stage_profiling returns a stage_type: "transform" entry per registered transform, with the same call_count / avg / min / max shape as the other stages.
  • .with_name("...") after a .transform* registration is honored and surfaces in the MCP output.
  • Existing source / tap / link behavior is unchanged.
  • Off by default, zero-cost when the profiling feature is disabled, still builds on no_std + alloc + Embassy.
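The zero-cost goal usually comes down to one rule: every profiling field and call site sits under #[cfg(feature = "profiling")]. A default build then carries no extra fields, branches, or clock reads. Below is a minimal sketch of that pattern. Producer<T> and StageMetrics are simplified, hypothetical types, not aimdb-core's. The sketch uses std for brevity; a no_std + alloc build would take Arc from alloc::sync and the atomics from core::sync::atomic.

```rust
// Hypothetical types that illustrate cfg gating only; not aimdb-core's API.
#[cfg(feature = "profiling")]
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};

/// Per-stage counters; only compiled when the `profiling` feature is on.
#[cfg(feature = "profiling")]
#[derive(Debug, Default)]
pub struct StageMetrics {
    pub call_count: AtomicU64,
}

pub struct Producer<T> {
    buffer: Vec<T>,
    // With the feature off this field does not exist, so the struct is
    // laid out exactly as it was before profiling was added.
    #[cfg(feature = "profiling")]
    metrics: Option<Arc<StageMetrics>>,
}

impl<T> Producer<T> {
    pub fn new() -> Self {
        Producer {
            buffer: Vec::new(),
            #[cfg(feature = "profiling")]
            metrics: None,
        }
    }

    /// Only exists with the feature on; call sites must be gated as well.
    #[cfg(feature = "profiling")]
    pub fn set_profiling(&mut self, metrics: Arc<StageMetrics>) {
        self.metrics = Some(metrics);
    }

    pub fn produce(&mut self, value: T) {
        // Compiled out entirely when the feature is off: no branch, no load.
        #[cfg(feature = "profiling")]
        {
            if let Some(m) = &self.metrics {
                m.call_count.fetch_add(1, Ordering::Relaxed);
            }
        }
        self.buffer.push(value);
    }

    pub fn len(&self) -> usize {
        self.buffer.len()
    }
}

fn main() {
    let mut p = Producer::new();
    p.produce(1u8);
    p.produce(2u8);
    println!("produced {} values", p.len());
    println!("size_of::<Producer<u8>>() = {}", std::mem::size_of::<Producer<u8>>());
}
```

With the feature off, Producer<u8> should be the same size as its Vec<u8> buffer. Comparing the two sizes is a cheap unit-test check for the "no overhead" criterion.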

Where to implement — mirror existing code

This is a "follow the .tap() / .source() pattern" task. The four moving parts and their existing analogues:

| Part | Mirror this | Add for transform |
|---|---|---|
| Registration | push_tap() (record_profiling.rs:60) | push_transform(); remove #[allow(dead_code)]; add a transforms() accessor mirroring taps() (record_profiling.rs:103) |
| Real index in last_stage | .tap() stores (StageKind::Tap, idx) (typed_api.rs:478) | In transform / transform_raw / transform_join / transform_join_raw, call push_transform() and store the returned idx instead of 0 |
| Spawn-time metric attach | collect_producer_future does producer.set_profiling(entry.metrics.clone(), clock) (typed_record.rs:1056); collect_consumer_futures does the consumer equivalent (:1020) | In collect_transform_futures (typed_record.rs:763), the output Producer<T> is created at :782 and passed to build_fn at :784; attach set_profiling there, gated on #[cfg(feature = "profiling")] |
| Reporting | snapshot() tuple list (info.rs:55-59) | Add ("transform", self.transforms()) to the iteration (order: sources → taps → links → transforms) |
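Together, the registration and reporting rows amount to a fourth per-kind vec that behaves like the other three. Below is a minimal sketch under that reading. RecordProfilingMetrics, StageEntry, and StageMetrics here are simplified, hypothetical stand-ins with no timing fields and no clock. They are not the real types in record_profiling.rs and info.rs.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical per-stage counters, shared with the running task via Arc.
#[derive(Debug, Default)]
pub struct StageMetrics {
    pub call_count: AtomicU64,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StageKind { Source, Tap, Link, Transform }

#[derive(Debug, Default)]
pub struct StageEntry {
    pub name: Option<String>,
    pub metrics: Arc<StageMetrics>,
}

#[derive(Debug, Default)]
pub struct RecordProfilingMetrics {
    sources: Vec<StageEntry>,
    taps: Vec<StageEntry>,
    links: Vec<StageEntry>,
    transforms: Vec<StageEntry>, // no longer a dead stub
}

impl RecordProfilingMetrics {
    /// Append a fresh entry and return its index (sequential per kind).
    fn push_into(stages: &mut Vec<StageEntry>) -> usize {
        stages.push(StageEntry::default());
        stages.len() - 1
    }

    pub fn push_source(&mut self) -> usize { Self::push_into(&mut self.sources) }
    pub fn push_tap(&mut self) -> usize { Self::push_into(&mut self.taps) }
    pub fn push_link(&mut self) -> usize { Self::push_into(&mut self.links) }

    /// New: mirrors push_tap(); the caller stores the index in last_stage.
    pub fn push_transform(&mut self) -> usize { Self::push_into(&mut self.transforms) }

    /// New: mirrors taps().
    pub fn transforms(&self) -> &[StageEntry] { &self.transforms }

    pub fn set_stage_name(&mut self, kind: StageKind, idx: usize, name: &str) {
        let stages = match kind {
            StageKind::Source => &mut self.sources,
            StageKind::Tap => &mut self.taps,
            StageKind::Link => &mut self.links,
            StageKind::Transform => &mut self.transforms,
        };
        // An index with no registered entry is silently ignored.
        if let Some(entry) = stages.get_mut(idx) {
            entry.name = Some(name.to_string());
        }
    }

    /// Flatten all stages in sources -> taps -> links -> transforms order
    /// as (stage_type, name, call_count) tuples.
    pub fn snapshot(&self) -> Vec<(&'static str, Option<&str>, u64)> {
        let groups: [(&'static str, &[StageEntry]); 4] = [
            ("source", &self.sources),
            ("tap", &self.taps),
            ("link", &self.links),
            ("transform", self.transforms()),
        ];
        groups
            .iter()
            .flat_map(|&(stage_type, stages)| {
                stages.iter().map(move |e| {
                    (stage_type, e.name.as_deref(), e.metrics.call_count.load(Ordering::Relaxed))
                })
            })
            .collect()
    }
}

fn main() {
    let mut m = RecordProfilingMetrics::default();
    m.push_source();
    m.push_link();
    let t0 = m.push_transform();
    let t1 = m.push_transform();
    m.push_tap();
    m.set_stage_name(StageKind::Transform, t1, "smooth");
    m.transforms()[t0].metrics.call_count.fetch_add(3, Ordering::Relaxed);
    for (stage_type, name, calls) in m.snapshot() {
        println!("{:<9} {:<8} calls={}", stage_type, name.unwrap_or("-"), calls);
    }
}
```

Each kind keeps its own vec, so push_transform() hands out 0, 1, 2, ... independently of taps and links. A push_transform() counterpart of the push_assigns_sequential_indices test would pin that property down.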

Timing semantics (decided — no open question for the contributor)

Single-input transforms (.transform() / .transform_raw()): time the closure itself — "time per input value processed."

  • Thread an Option<Arc<StageMetrics>> through create_single_transform_descriptor → build_fn → run_single_transform (transform/single.rs:112 / :146).
  • Wrap the transform_fn(&input_value, &mut state) call at single.rs:186 with Clock::now() before/after; record on both Some(output) and None (a None still consumed an input).
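Below is a minimal, synchronous sketch of the per-call timing. It assumes std::time::Instant as a stand-in for the crate's Clock abstraction and a hypothetical StageMetrics with count/total/min/max counters. The real run_single_transform is an async loop over subscribed input values, so this signature is illustrative only.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;

/// Hypothetical wall-clock stats for one stage (count / total / min / max).
#[derive(Debug)]
pub struct StageMetrics {
    call_count: AtomicU64,
    total_ns: AtomicU64,
    min_ns: AtomicU64,
    max_ns: AtomicU64,
}

impl Default for StageMetrics {
    fn default() -> Self {
        StageMetrics {
            call_count: AtomicU64::new(0),
            total_ns: AtomicU64::new(0),
            min_ns: AtomicU64::new(u64::MAX), // so the first sample always wins
            max_ns: AtomicU64::new(0),
        }
    }
}

impl StageMetrics {
    pub fn record(&self, elapsed_ns: u64) {
        self.call_count.fetch_add(1, Ordering::Relaxed);
        self.total_ns.fetch_add(elapsed_ns, Ordering::Relaxed);
        self.min_ns.fetch_min(elapsed_ns, Ordering::Relaxed);
        self.max_ns.fetch_max(elapsed_ns, Ordering::Relaxed);
    }
    pub fn call_count(&self) -> u64 { self.call_count.load(Ordering::Relaxed) }
    pub fn avg_ns(&self) -> u64 {
        match self.call_count() {
            0 => 0,
            n => self.total_ns.load(Ordering::Relaxed) / n,
        }
    }
    pub fn min_ns(&self) -> u64 {
        if self.call_count() == 0 { 0 } else { self.min_ns.load(Ordering::Relaxed) }
    }
    pub fn max_ns(&self) -> u64 { self.max_ns.load(Ordering::Relaxed) }
}

/// Hypothetical synchronous stand-in for run_single_transform.
pub fn run_single_transform<I, O, S, It, F>(
    inputs: It,
    state: &mut S,
    mut transform_fn: F,
    metrics: Option<Arc<StageMetrics>>,
) -> Vec<O>
where
    It: IntoIterator<Item = I>,
    F: FnMut(&I, &mut S) -> Option<O>,
{
    let mut outputs = Vec::new();
    for input_value in inputs {
        // Time only the closure, not the wait for the next input; skip the
        // clock entirely when no metrics are attached.
        let start = metrics.as_ref().map(|_| Instant::now());
        let result = transform_fn(&input_value, &mut *state);

        // Record on Some and None alike: a None still consumed an input.
        if let (Some(m), Some(start)) = (&metrics, start) {
            m.record(start.elapsed().as_nanos() as u64); // u128 -> u64 overflows only after ~584 years
        }
        if let Some(output) = result {
            outputs.push(output);
        }
    }
    outputs
}

fn main() {
    let metrics = Arc::new(StageMetrics::default());
    let mut seen = 0u32;
    // Keep even readings, drop odd ones; every input is still timed.
    let out = run_single_transform(
        vec![1, 2, 3, 4],
        &mut seen,
        |x: &i32, seen: &mut u32| {
            *seen += 1;
            if x % 2 == 0 { Some(x * 10) } else { None }
        },
        Some(Arc::clone(&metrics)),
    );
    println!(
        "outputs={:?} calls={} min/avg/max={}/{}/{} ns",
        out, metrics.call_count(), metrics.min_ns(), metrics.avg_ns(), metrics.max_ns()
    );
}
```

avg is an integer mean of samples that each lie between min and max. So min ≤ avg ≤ max holds by construction, and that is the relation the planned integration test asserts on the transform stage.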

Multi-input joins (.transform_join() / .transform_join_raw()): the Design 027 task-model handler owns its own event loop, so we can't time per-trigger work without intruding into user code. Instead, time the interval between successive Producer::produce() calls on the output producer — the same trick ProducerProfilingState already uses for .source(). Interpretation: "average time between successive join outputs" (a throughput proxy). This falls out of the collect_transform_futures attach point above for free, since the join also produces through that Producer<T>.
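Below is a minimal sketch of the output-cadence idea. It assumes a simplified, hypothetical Producer<T> and a stand-in for ProducerProfilingState that uses std::time::Instant to record the gap between successive produce() calls. The set_profiling call in the table above also passes a clock; that argument is omitted here. Under this sketch's assumptions, N outputs yield N-1 intervals.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;

/// Hypothetical minimal stats: number of intervals and their total length.
#[derive(Debug, Default)]
pub struct StageMetrics {
    call_count: AtomicU64,
    total_ns: AtomicU64,
}

impl StageMetrics {
    fn record(&self, elapsed_ns: u64) {
        self.call_count.fetch_add(1, Ordering::Relaxed);
        self.total_ns.fetch_add(elapsed_ns, Ordering::Relaxed);
    }
    pub fn call_count(&self) -> u64 { self.call_count.load(Ordering::Relaxed) }
}

/// Stand-in for ProducerProfilingState: remembers when the previous value
/// was produced and records the gap to the current one.
struct ProducerProfilingState {
    metrics: Arc<StageMetrics>,
    last_produce: Option<Instant>,
}

impl ProducerProfilingState {
    fn on_produce(&mut self) {
        let now = Instant::now();
        // The first output has no predecessor, so it records nothing.
        if let Some(prev) = self.last_produce {
            self.metrics.record(now.duration_since(prev).as_nanos() as u64);
        }
        self.last_produce = Some(now);
    }
}

/// Simplified, hypothetical output producer handed to the join's build_fn.
pub struct Producer<T> {
    sent: Vec<T>,
    profiling: Option<ProducerProfilingState>,
}

impl<T> Producer<T> {
    pub fn new() -> Self { Producer { sent: Vec::new(), profiling: None } }

    /// Attach point corresponding to the collect_transform_futures row above.
    pub fn set_profiling(&mut self, metrics: Arc<StageMetrics>) {
        self.profiling = Some(ProducerProfilingState { metrics, last_produce: None });
    }

    pub fn produce(&mut self, value: T) {
        if let Some(state) = self.profiling.as_mut() {
            state.on_produce();
        }
        self.sent.push(value);
    }
}

fn main() {
    let metrics = Arc::new(StageMetrics::default());
    let mut producer = Producer::new();
    producer.set_profiling(Arc::clone(&metrics));
    // A join handler emitting three outputs, with a short stall before the last.
    producer.produce("a");
    producer.produce("b");
    std::thread::sleep(std::time::Duration::from_millis(5));
    producer.produce("c");
    println!(
        "outputs={} intervals={} total_ns={}",
        producer.sent.len(),
        metrics.call_count(),
        metrics.total_ns.load(Ordering::Relaxed)
    );
}
```

For a join, the recorded interval includes any time the handler spent waiting on its inputs. That is why the issue calls it a throughput proxy rather than processing time.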

Rejected alternative: using producer-cadence for single transforms too (simpler/unified). It conflates closure time with input-wait time, so single transforms get explicit per-call timing instead.

Suggested implementation order

  1. push_transform() + transforms() accessor + drop #[allow(dead_code)] (record_profiling.rs). Unit-test it like the existing push_assigns_sequential_indices test.
  2. Real idx in last_stage for all four .transform* entry points (typed_api.rs). Unit-test that .with_name() lands on the right transform index (naming already works via set_stage_name).
  3. Add transforms to snapshot() (info.rs).
  4. Instrument run_single_transform (per-call Clock timing) + thread metrics through the descriptor.
  5. Attach ProducerProfilingState in collect_transform_futures for the join output cadence.
  6. Tests + example + RFC update.
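In step 2, the key change is that every .transform* call registers a stage first and then records the index it got back. Below is a minimal sketch that assumes a hypothetical by-value RecordBuilder tracking only stage names. The real builder in typed_api.rs also carries closures, buffers, and types. The main() here exercises the with_name and no-cross-talk cases the unit tests should cover.

```rust
/// Hypothetical subset of stage kinds; enough to show indices don't cross-talk.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StageKind {
    Tap,
    Transform,
}

/// Only stage names are tracked in this sketch.
#[derive(Debug, Default)]
struct Profiling {
    taps: Vec<Option<String>>,
    transforms: Vec<Option<String>>,
}

impl Profiling {
    fn push_tap(&mut self) -> usize {
        self.taps.push(None);
        self.taps.len() - 1
    }
    fn push_transform(&mut self) -> usize {
        self.transforms.push(None);
        self.transforms.len() - 1
    }
    fn set_stage_name(&mut self, kind: StageKind, idx: usize, name: &str) {
        let names = match kind {
            StageKind::Tap => &mut self.taps,
            StageKind::Transform => &mut self.transforms,
        };
        if let Some(slot) = names.get_mut(idx) {
            *slot = Some(name.to_string());
        }
    }
}

/// Hypothetical by-value builder; the real .transform() also takes closures.
#[derive(Debug, Default)]
struct RecordBuilder {
    profiling: Profiling,
    last_stage: Option<(StageKind, usize)>,
}

impl RecordBuilder {
    fn tap(mut self) -> Self {
        let idx = self.profiling.push_tap();
        self.last_stage = Some((StageKind::Tap, idx));
        self
    }
    fn transform(mut self) -> Self {
        // Per the issue, the old code stored a hardcoded 0 here and pushed nothing.
        let idx = self.profiling.push_transform();
        self.last_stage = Some((StageKind::Transform, idx));
        self
    }
    /// Names whichever stage was registered last.
    fn with_name(mut self, name: &str) -> Self {
        if let Some((kind, idx)) = self.last_stage {
            self.profiling.set_stage_name(kind, idx, name);
        }
        self
    }
}

fn main() {
    let b = RecordBuilder::default()
        .transform().with_name("parse")
        .tap().with_name("logger")
        .transform().with_name("smooth");
    println!("transforms={:?} taps={:?}", b.profiling.transforms, b.profiling.taps);
}
```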

Tasks

  • RecordProfilingMetrics::push_transform() + transforms() accessor + remove #[allow(dead_code)]
  • snapshot() includes transforms in sources → taps → links → transforms order
  • All four .transform* entry points register a stage and store the real index in last_stage
  • Thread Option<Arc<StageMetrics>> through run_single_transform; wrap the transform_fn call with Clock measurement (record on Some and None)
  • Attach ProducerProfilingState to the producer in collect_transform_futures so join output cadence is timed
  • Unit tests: metrics record correctly; with_name lands on the right index; adjacent records don't cross-talk
  • Integration test in aimdb-tokio-adapter/tests/: source → transform → tap pipeline; assert min ≤ avg ≤ max on the transform stage and that the slowest-by-design stage is reported as bottleneck
  • examples/remote-access-demo: add one transform-driven record so get_stage_profiling returns a stage_type: "transform" entry; update the README stage table
  • RFC update: amend docs/design/014-M6-stage-profiling.md "Implementation notes" to describe transform timing semantics (per-call for single, output-interval for join)

How to verify

```sh
# Unit + integration (feature is off by default — turn it on explicitly)
cargo test -p aimdb-core --features profiling
cargo test -p aimdb-tokio-adapter --features profiling

# Zero-cost-when-off + full workspace gate
make check

# Embassy cross-compile (thumbv7em-none-eabihf); already gates embassy-runtime,profiling
make test-embedded

# Eyeball the MCP output end-to-end: run examples/remote-access-demo with the
# profiling feature and call the get_stage_profiling tool; confirm a
# stage_type:"transform" entry appears and is selected as bottleneck when slowest.
```

Acceptance criteria

  1. With --features profiling, get_stage_profiling for a record with a transform returns a stage_type: "transform" entry containing call_count, avg_time_ns, min_time_ns, max_time_ns, plus the name set via .with_name(...).
  2. When the transform is the slowest stage, bottleneck in the MCP response points at it with the human-readable recommendation string.
  3. Without the profiling feature: no overhead, no API change (verify make check and a default-features build).
  4. Embassy thumbv7em-none-eabihf build still passes with the feature on.
  5. All existing tests continue to pass.

Out of scope

  • CPU-time tracking (still wall-clock only).
  • Histogram / percentile metrics.
  • Per-input-port timing for joins (single aggregate "output cadence" only — per-port timing would mean instrumenting the fan-in forwarders, a separate piece of work if it turns out to be needed).

Prerequisites / difficulty

Mid-sized. Comfortable Rust + async, plus care with #[cfg(feature = "profiling")] gating and no_std + alloc constraints (no std-only types on the core path). Every piece has a working source/tap/link analogue to copy, and there are no unresolved design decisions — timing semantics and the stage ordering are settled above.

Related
