From e6a79ed7fcbc68cabdd73197f1c08985ccb7aa86 Mon Sep 17 00:00:00 2001 From: Jose Manuel Perez Date: Mon, 29 Jun 2026 14:59:46 +0200 Subject: [PATCH] add try_produce and rustdoc guidance for Producer Implements Producer::try_produce, which returns the value back via TryProduceError::Full or TryProduceError::Closed instead of silently overwriting, for callers that need a meaningful response to backpressure. Extends WriteHandle with a default try_push method so existing buffer implementations require no changes. Updates rustdoc on both produce and try_produce with the suggested phrasing and no_run examples so callers can pick the right method without reading the implementation. --- aimdb-core/src/buffer/mod.rs | 2 +- aimdb-core/src/buffer/traits.rs | 42 +++++++++++++++++++++++++++++ aimdb-core/src/lib.rs | 1 + aimdb-core/src/typed_api.rs | 48 +++++++++++++++++++++++++++++---- 4 files changed, 87 insertions(+), 6 deletions(-) diff --git a/aimdb-core/src/buffer/mod.rs b/aimdb-core/src/buffer/mod.rs index 74591b9..4231789 100644 --- a/aimdb-core/src/buffer/mod.rs +++ b/aimdb-core/src/buffer/mod.rs @@ -64,7 +64,7 @@ mod writer; // Public API exports pub use cfg::BufferCfg; -pub use traits::{Buffer, BufferReader, DynBuffer}; +pub use traits::{Buffer, BufferReader, DynBuffer, TryProduceError}; // Crate-private — used by Producer to push without per-call lookup pub(crate) use traits::WriteHandle; diff --git a/aimdb-core/src/buffer/traits.rs b/aimdb-core/src/buffer/traits.rs index e49b5e1..173b643 100644 --- a/aimdb-core/src/buffer/traits.rs +++ b/aimdb-core/src/buffer/traits.rs @@ -102,6 +102,18 @@ pub trait DynBuffer: Send + Sync { fn reset_metrics(&self) {} } +/// Non-blocking push error — carries the rejected value back to the caller. +/// +/// Returned by [`Producer::try_produce`](crate::typed_api::Producer::try_produce). +/// Both variants own the value so the caller can retry, escalate, or drop it. +#[derive(Debug)] +pub enum TryProduceError { + /// Buffer is at capacity and configured not to overwrite. Transient. + Full(T), + /// Buffer / record has been torn down (e.g. shutdown). Terminal. + Closed(T), +} + /// Write-side handle for a single record (design 029, M14). /// /// `Producer` holds an `Arc>` so it can be parameterised @@ -120,6 +132,14 @@ pub(crate) trait WriteHandle: Send + Sync { /// Infallible — all three operations are synchronous and lock-free or /// spin-locked. fn push(&self, value: T); + + /// Default: delegate to `push` and return `Ok(())`. Overwriting buffers + /// cannot fail, so the value is always accepted. Bounded / non-overwriting + /// buffers override this to return `Full(value)` or `Closed(value)`. + fn try_push(&self, value: T) -> Result<(), TryProduceError> { + self.push(value); + Ok(()) + } } /// Reader trait for consuming values from a buffer @@ -322,4 +342,26 @@ mod tests { // Should be able to use as DynBuffer let _: &dyn DynBuffer = &buffer; } + + // WriteHandle impl that always rejects with the buffer Full. used to verify that when + // try_push fails, it returns the value that it failed to push. + struct FullWriteHandle; + + impl WriteHandle for FullWriteHandle { + fn push(&self, _value: i32) {} + fn try_push(&self, value: i32) -> Result<(), TryProduceError> { + Err(TryProduceError::Full(value)) + } + } + + #[test] + fn try_push_full_round_trips_value_through_producer() { + use alloc::sync::Arc; + let producer = crate::typed_api::Producer::new(Arc::new(FullWriteHandle)); + let result = producer.try_produce(42_i32); + assert!( + matches!(result, Err(TryProduceError::Full(42))), + "expected Full(42), got {result:?}", + ); + } } diff --git a/aimdb-core/src/lib.rs b/aimdb-core/src/lib.rs index cc48d1e..2746900 100644 --- a/aimdb-core/src/lib.rs +++ b/aimdb-core/src/lib.rs @@ -58,6 +58,7 @@ pub use extensions::Extensions; pub use aimdb_executor::{ExecutorError, ExecutorResult}; // Producer-Consumer Pattern exports +pub use buffer::TryProduceError; pub use builder::OutboundRoute; pub use builder::{AimDb, AimDbBuilder}; pub use connector::ConnectorBuilder; diff --git a/aimdb-core/src/typed_api.rs b/aimdb-core/src/typed_api.rs index dda6e3f..988a9bb 100644 --- a/aimdb-core/src/typed_api.rs +++ b/aimdb-core/src/typed_api.rs @@ -67,7 +67,7 @@ use alloc::{ vec::Vec, }; -use crate::buffer::{DynBuffer, WriteHandle}; +use crate::buffer::{DynBuffer, TryProduceError, WriteHandle}; use crate::typed_record::TypedRecord; use crate::AimDb; @@ -133,12 +133,25 @@ where ))); } - /// Produce a value of type T + /// Push a value. Infallible — overwrite-on-overflow buffers cannot reject. + /// Use this for fire-and-forget telemetry. /// - /// Push to the record's buffer; consumer tasks and outbound link connectors - /// observe it from there. Synchronous and infallible — the underlying - /// `WriteHandle::push` cannot fail. + /// Forwards the value to the record's buffer, the latest-snapshot slot, + /// and the metadata tracker in a single synchronous call. /// + /// # Example + /// + /// ```no_run + /// # use aimdb_core::{Producer, RuntimeContext}; + /// # #[derive(Clone, Debug)] struct Telemetry { celsius: f32 } + /// # async fn read_sensor() -> Telemetry { Telemetry { celsius: 21.0 } } + /// async fn sensor_loop(ctx: RuntimeContext, producer: Producer) { + /// loop { + /// producer.produce(read_sensor().await); + /// ctx.time().sleep_secs(1).await; + /// } + /// } + /// ``` pub fn produce(&self, value: T) { #[cfg(feature = "profiling")] if let Some(state) = &self.profiling { @@ -146,6 +159,31 @@ where } self.write.push(value); } + + /// Non-blocking push. Returns the value back via [`TryProduceError::Full`] + /// if a bounded buffer is at capacity, or [`TryProduceError::Closed`] if + /// the record is shutting down. Use when the caller has a meaningful + /// response to backpressure. + /// + /// Overwriting buffers (`SpmcRing`, `SingleLatest`, `Mailbox`) always + /// return `Ok(())`. Use [`produce`](Self::produce) for those. + /// + /// # Example + /// + /// ```no_run + /// # use aimdb_core::{Producer, TryProduceError}; + /// # #[derive(Clone, Debug)] struct Command { id: u32 } + /// fn send_command(producer: &Producer, cmd: Command) { + /// match producer.try_produce(cmd) { + /// Ok(()) => {} + /// Err(TryProduceError::Full(_)) => { /* backpressure: retry later */ } + /// Err(TryProduceError::Closed(_)) => { /* record shut down */ } + /// } + /// } + /// ``` + pub fn try_produce(&self, value: T) -> Result<(), TryProduceError> { + self.write.try_push(value) + } } impl Clone for Producer {