Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aimdb-core/src/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ mod writer;

// Public API exports
pub use cfg::BufferCfg;
pub use traits::{Buffer, BufferReader, DynBuffer};
pub use traits::{Buffer, BufferReader, DynBuffer, TryProduceError};

// Crate-private — used by Producer<T> to push without per-call lookup
pub(crate) use traits::WriteHandle;
Expand Down
42 changes: 42 additions & 0 deletions aimdb-core/src/buffer/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,18 @@ pub trait DynBuffer<T: Clone + Send>: Send + Sync {
fn reset_metrics(&self) {}
}

/// Non-blocking push error — carries the rejected value back to the caller.
///
/// Returned by [`Producer::try_produce`](crate::typed_api::Producer::try_produce).
/// Both variants own the value so the caller can retry, escalate, or drop it.
#[derive(Debug)]
pub enum TryProduceError<T> {
/// Buffer is at capacity and configured not to overwrite. Transient.
Full(T),
/// Buffer / record has been torn down (e.g. shutdown). Terminal.
Closed(T),
}

/// Write-side handle for a single record (design 029, M14).
///
/// `Producer<T>` holds an `Arc<dyn WriteHandle<T>>` so it can be parameterised
Expand All @@ -120,6 +132,14 @@ pub(crate) trait WriteHandle<T: Clone + Send + 'static>: Send + Sync {
/// Infallible — all three operations are synchronous and lock-free or
/// spin-locked.
fn push(&self, value: T);

/// Default: delegate to `push` and return `Ok(())`. Overwriting buffers
/// cannot fail, so the value is always accepted. Bounded / non-overwriting
/// buffers override this to return `Full(value)` or `Closed(value)`.
fn try_push(&self, value: T) -> Result<(), TryProduceError<T>> {
self.push(value);
Ok(())
}
}

/// Reader trait for consuming values from a buffer
Expand Down Expand Up @@ -322,4 +342,26 @@ mod tests {
// Should be able to use as DynBuffer
let _: &dyn DynBuffer<i32> = &buffer;
}

// WriteHandle impl that always rejects with the buffer Full. used to verify that when
// try_push fails, it returns the value that it failed to push.
struct FullWriteHandle;

impl WriteHandle<i32> for FullWriteHandle {
fn push(&self, _value: i32) {}
fn try_push(&self, value: i32) -> Result<(), TryProduceError<i32>> {
Err(TryProduceError::Full(value))
}
}

#[test]
fn try_push_full_round_trips_value_through_producer() {
use alloc::sync::Arc;
let producer = crate::typed_api::Producer::new(Arc::new(FullWriteHandle));
let result = producer.try_produce(42_i32);
assert!(
matches!(result, Err(TryProduceError::Full(42))),
"expected Full(42), got {result:?}",
);
}
}
1 change: 1 addition & 0 deletions aimdb-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub use extensions::Extensions;
pub use aimdb_executor::{ExecutorError, ExecutorResult};

// Producer-Consumer Pattern exports
pub use buffer::TryProduceError;
pub use builder::OutboundRoute;
pub use builder::{AimDb, AimDbBuilder};
pub use connector::ConnectorBuilder;
Expand Down
48 changes: 43 additions & 5 deletions aimdb-core/src/typed_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ use alloc::{
vec::Vec,
};

use crate::buffer::{DynBuffer, WriteHandle};
use crate::buffer::{DynBuffer, TryProduceError, WriteHandle};
use crate::typed_record::TypedRecord;
use crate::AimDb;

Expand Down Expand Up @@ -133,19 +133,57 @@ where
)));
}

/// Produce a value of type T
/// Push a value. Infallible — overwrite-on-overflow buffers cannot reject.
/// Use this for fire-and-forget telemetry.
///
/// Push to the record's buffer; consumer tasks and outbound link connectors
/// observe it from there. Synchronous and infallible — the underlying
/// `WriteHandle::push` cannot fail.
/// Forwards the value to the record's buffer, the latest-snapshot slot,
/// and the metadata tracker in a single synchronous call.
///
/// # Example
///
/// ```no_run
/// # use aimdb_core::{Producer, RuntimeContext};
/// # #[derive(Clone, Debug)] struct Telemetry { celsius: f32 }
/// # async fn read_sensor() -> Telemetry { Telemetry { celsius: 21.0 } }
/// async fn sensor_loop(ctx: RuntimeContext, producer: Producer<Telemetry>) {
/// loop {
/// producer.produce(read_sensor().await);
/// ctx.time().sleep_secs(1).await;
/// }
/// }
/// ```
pub fn produce(&self, value: T) {
#[cfg(feature = "profiling")]
if let Some(state) = &self.profiling {
state.record_produce();
}
self.write.push(value);
}

/// Non-blocking push. Returns the value back via [`TryProduceError::Full`]
/// if a bounded buffer is at capacity, or [`TryProduceError::Closed`] if
/// the record is shutting down. Use when the caller has a meaningful
/// response to backpressure.
///
/// Overwriting buffers (`SpmcRing`, `SingleLatest`, `Mailbox`) always
/// return `Ok(())`. Use [`produce`](Self::produce) for those.
///
/// # Example
///
/// ```no_run
/// # use aimdb_core::{Producer, TryProduceError};
/// # #[derive(Clone, Debug)] struct Command { id: u32 }
/// fn send_command(producer: &Producer<Command>, cmd: Command) {
/// match producer.try_produce(cmd) {
/// Ok(()) => {}
/// Err(TryProduceError::Full(_)) => { /* backpressure: retry later */ }
/// Err(TryProduceError::Closed(_)) => { /* record shut down */ }
/// }
/// }
/// ```
pub fn try_produce(&self, value: T) -> Result<(), TryProduceError<T>> {
self.write.try_push(value)
}
}

impl<T> Clone for Producer<T> {
Expand Down