Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,29 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

No changes yet.
### Added

- **Context-Aware Deserializers (Design 026)**: Inbound connector deserializers can now receive a `RuntimeContext<R>` for platform-independent timestamps and logging
- New `.with_deserializer(|ctx, bytes| ...)` API on `InboundConnectorBuilder` provides `RuntimeContext<R>` to deserialization closures
- New `.with_deserializer_raw(|bytes| ...)` for plain bytes-only deserialization when context is unnecessary
- `DeserializerKind` enum enforces mutual exclusivity between raw and context-aware deserializers
- `Router::route()` propagates optional runtime context to context-aware routes
- **Context-Aware Serializers**: Outbound connector serializers can now receive a `RuntimeContext<R>`, symmetric with deserializers
- New `.with_serializer(|ctx, value| ...)` API on `OutboundConnectorBuilder` provides `RuntimeContext<R>` to serialization closures
- New `.with_serializer_raw(|value| ...)` for plain value-only serialization when context is unnecessary
- `SerializerKind` enum (`Raw` / `Context`) enforces mutual exclusivity
- All outbound connector publishers updated to propagate runtime context via `db.runtime_any()`
- Design document: 026 (Context-Aware Deserializers)

### Changed

- **aimdb-core**: Breaking API changes to `InboundConnectorLink`, `Router`, and `RouterBuilder` to support `DeserializerKind` (see [aimdb-core/CHANGELOG.md](aimdb-core/CHANGELOG.md))
- **aimdb-core**: Breaking API change — `ConnectorLink.serializer` now stores `SerializerKind` instead of `SerializerFn`
- **aimdb-core**: `.with_serializer()` renamed to `.with_serializer_raw()` for the old single-argument pattern
- **aimdb-mqtt-connector**: Updated router dispatch for new `route()` signature; outbound publishers dispatch via `SerializerKind`
- **aimdb-knx-connector**: Updated router dispatch for new `route()` signature; outbound publishers dispatch via `SerializerKind`
- **aimdb-websocket-connector**: Updated router dispatch for new `route()` signature; outbound publishers dispatch via `SerializerKind`
- All connector examples updated to use new `.with_deserializer(|_ctx, bytes| ...)` and `.with_serializer_raw(|value| ...)` signatures

## [1.0.0] - 2026-03-16

Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion _external/embassy
Submodule embassy updated 112 files
4 changes: 2 additions & 2 deletions aimdb-codegen/src/rust.rs
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,7 @@ fn emit_connector_chain(
chain = quote! {
#chain
.link_to(#addr_var)
.with_serializer(|v: &#value_type| {
.with_serializer_raw(|v: &#value_type| {
v.to_bytes()
.map_err(|_| aimdb_core::connector::SerializeError::InvalidData)
})
Expand Down Expand Up @@ -1515,7 +1515,7 @@ fn emit_transform_configure_block(rec: &RecordDef, task: &TaskDef) -> TokenStrea
let outbound_chain = if has_outbound {
quote! {
.link_to(addr)
.with_serializer(|v: &#value_type| {
.with_serializer_raw(|v: &#value_type| {
v.to_bytes()
.map_err(|_| aimdb_core::connector::SerializeError::InvalidData)
})
Expand Down
27 changes: 26 additions & 1 deletion aimdb-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,32 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

No changes yet.
### Added

- **Context-Aware Deserializers (Design 026)**: Inbound connector deserializers can now receive a `RuntimeContext<R>` for platform-independent timestamps and logging during deserialization
- New `ContextDeserializerFn` type alias for context-aware type-erased deserializer callbacks
- New `DeserializerKind` enum (`Raw` / `Context`) to enforce mutual exclusivity between plain and context-aware deserializers
- `.with_deserializer(|ctx, bytes| ...)` now accepts a context-aware closure receiving `RuntimeContext<R>`
- `.with_deserializer_raw(|bytes| ...)` added for plain bytes-only deserialization (no context needed)
- `Router::route()` now accepts an optional type-erased runtime context (`Option<&Arc<dyn Any + Send + Sync>>`)
- Context deserializer routes are gracefully skipped when no context is provided
- **Context-Aware Serializers**: Outbound connector serializers can now receive a `RuntimeContext<R>`, symmetric with deserializers
- New `ContextSerializerFn` type alias for context-aware type-erased serializer callbacks
- New `SerializerKind` enum (`Raw` / `Context`) to enforce mutual exclusivity between plain and context-aware serializers
- `.with_serializer(|ctx, value| ...)` now accepts a context-aware closure receiving `RuntimeContext<R>`
- `.with_serializer_raw(|value| ...)` added for plain value-only serialization (no context needed)

### Changed

- **Breaking**: `InboundConnectorLink::deserializer` field type changed from `DeserializerFn` to `DeserializerKind`
- **Breaking**: `InboundConnectorLink::new()` now takes `DeserializerKind` instead of `DeserializerFn`
- **Breaking**: `Router::route()` signature changed to accept an additional `ctx` parameter
- **Breaking**: `RouterBuilder::from_routes()` and `RouterBuilder::add_route()` now take `DeserializerKind` instead of `DeserializerFn`
- **Breaking**: `ConnectorLink::serializer` field type changed from `Option<SerializerFn>` to `Option<SerializerKind>`
- **Breaking**: `.with_serializer()` renamed to `.with_serializer_raw()` — old single-argument pattern
- **Breaking**: `OutboundRoute` type alias updated to use `SerializerKind`
- **Breaking**: `.with_deserializer()` on `InboundConnectorBuilder` now expects `Fn(RuntimeContext<R>, &[u8]) -> Result<T, String>` instead of `Fn(&[u8]) -> Result<T, String>` — use `.with_deserializer_raw()` for the previous bytes-only signature
- `AimDb::collect_inbound_routes()` return type updated to use `DeserializerKind`

## [1.0.0] - 2026-03-11

Expand Down
14 changes: 11 additions & 3 deletions aimdb-core/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ use crate::{DbError, DbResult};
/// Each tuple contains:
/// - `String` - Default topic/destination from the URL path
/// - `Box<dyn ConsumerTrait>` - Consumer for subscribing to record values
/// - `SerializerFn` - User-provided serializer for the record type
/// - `SerializerKind` - User-provided serializer for the record type (raw or context-aware)
/// - `Vec<(String, String)>` - Configuration options from the URL query
/// - `Option<TopicProviderFn>` - Optional dynamic topic provider
#[cfg(feature = "alloc")]
pub type OutboundRoute = (
String,
Box<dyn crate::connector::ConsumerTrait>,
crate::connector::SerializerFn,
crate::connector::SerializerKind,
Vec<(String, String)>,
Option<crate::connector::TopicProviderFn>,
);
Expand Down Expand Up @@ -1213,6 +1213,14 @@ impl<R: aimdb_executor::Spawn + 'static> AimDb<R> {
&self.runtime
}

/// Returns the runtime as a type-erased `Arc<dyn Any + Send + Sync>`
///
/// Used by connectors to provide `RuntimeContext` to context-aware
/// deserializers during inbound message routing.
pub fn runtime_any(&self) -> Arc<dyn core::any::Any + Send + Sync> {
self.runtime.clone()
}

/// Lists all registered records (std only)
///
/// Returns metadata for all registered records, useful for remote access introspection.
Expand Down Expand Up @@ -1450,7 +1458,7 @@ impl<R: aimdb_executor::Spawn + 'static> AimDb<R> {
) -> Vec<(
String,
Box<dyn crate::connector::ProducerTrait>,
crate::connector::DeserializerFn,
crate::connector::DeserializerKind,
)> {
let mut routes = Vec::new();

Expand Down
90 changes: 75 additions & 15 deletions aimdb-core/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,34 @@ impl std::error::Error for SerializeError {}
pub type SerializerFn =
Arc<dyn Fn(&dyn core::any::Any) -> Result<Vec<u8>, SerializeError> + Send + Sync>;

/// Type alias for context-aware type-erased serializer callbacks
///
/// Like `SerializerFn`, but receives a type-erased runtime context
/// for platform-independent timestamps and logging during serialization.
///
/// The first argument is the type-erased runtime (as `Arc<dyn Any + Send + Sync>`),
/// which is downcast to the concrete runtime type via `RuntimeContext::extract_from_any`.
pub type ContextSerializerFn = Arc<
dyn Fn(
Arc<dyn core::any::Any + Send + Sync>,
&dyn core::any::Any,
) -> Result<Vec<u8>, SerializeError>
+ Send
+ Sync,
>;

/// Which serializer variant is registered for an outbound link
///
/// Enforces mutual exclusivity between raw value-only serializers
/// and context-aware serializers.
#[derive(Clone)]
pub enum SerializerKind {
/// Plain value-only serializer (from `.with_serializer_raw()`)
Raw(SerializerFn),
/// Context-aware serializer (from `.with_serializer()`)
Context(ContextSerializerFn),
}

// ============================================================================
// TopicProvider - Dynamic topic/destination routing
// ============================================================================
Expand Down Expand Up @@ -434,13 +462,14 @@ pub struct ConnectorLink {

/// Serialization callback that converts record values to bytes for publishing
///
/// This is a type-erased function that takes `&dyn Any` and returns `Result<Vec<u8>, String>`.
/// The connector implementation will downcast to the concrete type and call the serializer.
/// Either a plain value-only serializer (`Raw`) or a context-aware
/// serializer (`Context`) that receives `RuntimeContext` for timestamps
/// and logging.
///
/// If `None`, the connector must provide a default serialization mechanism or fail.
///
/// Available in both `std` and `no_std` (with `alloc` feature) environments.
pub serializer: Option<SerializerFn>,
pub serializer: Option<SerializerKind>,

/// Consumer factory callback (alloc feature)
///
Expand Down Expand Up @@ -471,7 +500,10 @@ impl Debug for ConnectorLink {
.field("config", &self.config)
.field(
"serializer",
&self.serializer.as_ref().map(|_| "<function>"),
&self.serializer.as_ref().map(|s| match s {
SerializerKind::Raw(_) => "<raw>",
SerializerKind::Context(_) => "<context>",
}),
)
.field(
"consumer_factory",
Expand Down Expand Up @@ -531,6 +563,34 @@ impl ConnectorLink {
pub type DeserializerFn =
Arc<dyn Fn(&[u8]) -> Result<Box<dyn core::any::Any + Send>, String> + Send + Sync>;

/// Type alias for context-aware type-erased deserializer callbacks
///
/// Like `DeserializerFn`, but receives a type-erased runtime context
/// for platform-independent timestamps and logging during deserialization.
///
/// The first argument is the type-erased runtime (as `Arc<dyn Any + Send + Sync>`),
/// which is downcast to the concrete runtime type via `RuntimeContext::extract_from_any`.
pub type ContextDeserializerFn = Arc<
dyn Fn(
Arc<dyn core::any::Any + Send + Sync>,
&[u8],
) -> Result<Box<dyn core::any::Any + Send>, String>
+ Send
+ Sync,
>;

/// Which deserializer variant is registered for an inbound link
///
/// Enforces mutual exclusivity between raw bytes-only deserializers
/// and context-aware deserializers.
#[derive(Clone)]
pub enum DeserializerKind {
/// Plain bytes-only deserializer (from `.with_deserializer_raw()`)
Raw(DeserializerFn),
/// Context-aware deserializer (from `.with_deserializer()`)
Context(ContextDeserializerFn),
}

/// Type alias for producer factory callback (alloc feature)
///
/// Takes Arc<dyn Any> (which contains AimDb<R>) and returns a boxed ProducerTrait.
Expand Down Expand Up @@ -646,12 +706,12 @@ pub struct InboundConnectorLink {

/// Deserialization callback that converts bytes to typed values
///
/// This is a type-erased function that takes `&[u8]` and returns
/// `Result<Box<dyn Any + Send>, String>`. The spawned task will
/// downcast to the concrete type before producing.
/// Either a plain bytes-only deserializer (`Raw`) or a context-aware
/// deserializer (`Context`) that receives `RuntimeContext` for timestamps
/// and logging.
///
/// Available in both `std` and `no_std` (with `alloc` feature) environments.
pub deserializer: DeserializerFn,
pub deserializer: DeserializerKind,

/// Producer creation callback (alloc feature)
///
Expand Down Expand Up @@ -700,7 +760,7 @@ impl Debug for InboundConnectorLink {

impl InboundConnectorLink {
/// Creates a new inbound connector link from a URL and deserializer
pub fn new(url: ConnectorUrl, deserializer: DeserializerFn) -> Self {
pub fn new(url: ConnectorUrl, deserializer: DeserializerKind) -> Self {
Self {
url,
config: Vec::new(),
Expand Down Expand Up @@ -1153,25 +1213,25 @@ mod tests {

#[test]
fn test_inbound_connector_link_resolve_topic_default() {
use super::{ConnectorUrl, DeserializerFn, InboundConnectorLink};
use super::{ConnectorUrl, DeserializerFn, DeserializerKind, InboundConnectorLink};

let url = ConnectorUrl::parse("mqtt://sensors/temperature").unwrap();
let deserializer: DeserializerFn =
Arc::new(|_| Ok(Box::new(()) as Box<dyn core::any::Any + Send>));
let link = InboundConnectorLink::new(url, deserializer);
let link = InboundConnectorLink::new(url, DeserializerKind::Raw(deserializer));

// No resolver configured, should return static topic from URL
assert_eq!(link.resolve_topic(), "sensors/temperature");
}

#[test]
fn test_inbound_connector_link_resolve_topic_dynamic() {
use super::{ConnectorUrl, DeserializerFn, InboundConnectorLink};
use super::{ConnectorUrl, DeserializerFn, DeserializerKind, InboundConnectorLink};

let url = ConnectorUrl::parse("mqtt://sensors/default").unwrap();
let deserializer: DeserializerFn =
Arc::new(|_| Ok(Box::new(()) as Box<dyn core::any::Any + Send>));
let mut link = InboundConnectorLink::new(url, deserializer);
let mut link = InboundConnectorLink::new(url, DeserializerKind::Raw(deserializer));

// Configure dynamic resolver
link.topic_resolver = Some(Arc::new(|| Some("sensors/dynamic/kitchen".into())));
Expand All @@ -1182,12 +1242,12 @@ mod tests {

#[test]
fn test_inbound_connector_link_resolve_topic_fallback() {
use super::{ConnectorUrl, DeserializerFn, InboundConnectorLink};
use super::{ConnectorUrl, DeserializerFn, DeserializerKind, InboundConnectorLink};

let url = ConnectorUrl::parse("mqtt://sensors/fallback").unwrap();
let deserializer: DeserializerFn =
Arc::new(|_| Ok(Box::new(()) as Box<dyn core::any::Any + Send>));
let mut link = InboundConnectorLink::new(url, deserializer);
let mut link = InboundConnectorLink::new(url, DeserializerKind::Raw(deserializer));

// Configure resolver that returns None
link.topic_resolver = Some(Arc::new(|| None));
Expand Down
27 changes: 13 additions & 14 deletions aimdb-core/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ where
#[cfg(feature = "std")]
runtime: std::sync::Arc<R>,
#[cfg(not(feature = "std"))]
runtime: &'static R,
runtime: alloc::sync::Arc<R>,
}

#[cfg(feature = "std")]
Expand Down Expand Up @@ -64,8 +64,15 @@ impl<R> RuntimeContext<R>
where
R: Runtime,
{
/// Create a new RuntimeContext with static reference (no_std version)
pub fn new(runtime: &'static R) -> Self {
/// Create a new RuntimeContext (no_std version uses Arc internally)
pub fn new(runtime: R) -> Self {
Self {
runtime: alloc::sync::Arc::new(runtime),
}
}

/// Create from an existing Arc to avoid double-wrapping
pub fn from_arc(runtime: alloc::sync::Arc<R>) -> Self {
Self { runtime }
}

Expand All @@ -74,21 +81,13 @@ where
/// This is a helper for runtime adapters to convert the raw `Arc<dyn Any>`
/// context passed to `.source_raw()` and `.tap_raw()` into a typed `RuntimeContext`.
///
/// For no_std, this leaks the Arc to obtain a `&'static` reference, which is safe
/// because the runtime lives for the entire program lifetime in embedded contexts.
///
/// # Panics
/// Panics if the runtime type doesn't match `R`.
pub fn extract_from_any(ctx_any: alloc::sync::Arc<dyn core::any::Any + Send + Sync>) -> Self {
let runtime = ctx_any
.downcast::<R>()
.expect("Runtime type mismatch - expected matching runtime adapter");

// Convert Arc<R> to &'static R by leaking it
// This is safe because in embedded contexts, the runtime lives for the entire program
let runtime_ref: &'static R = &*alloc::boxed::Box::leak(runtime.into());

Self::new(runtime_ref)
Self::from_arc(runtime)
}
}

Expand Down Expand Up @@ -117,8 +116,8 @@ where
}

#[cfg(not(feature = "std"))]
pub fn runtime(&self) -> &'static R {
self.runtime
pub fn runtime(&self) -> &R {
&self.runtime
}
}

Expand Down
16 changes: 3 additions & 13 deletions aimdb-core/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ use core::fmt::Debug;
extern crate alloc;

#[cfg(not(feature = "std"))]
use alloc::boxed::Box;
use alloc::{boxed::Box, sync::Arc};

#[cfg(feature = "std")]
use std::boxed::Box;
use std::{boxed::Box, sync::Arc};

/// AimDB Database implementation
///
Expand Down Expand Up @@ -136,17 +136,7 @@ impl<A: RuntimeAdapter + aimdb_executor::Spawn + 'static> Database<A> {
where
A: aimdb_executor::Runtime + Clone,
{
#[cfg(feature = "std")]
{
RuntimeContext::from_arc(std::sync::Arc::new(self.adapter.clone()))
}
#[cfg(not(feature = "std"))]
{
// For no_std, we need a static reference - this would typically be handled
// by the caller storing the adapter in a static cell first
// For now, we'll document this limitation
panic!("context() not supported in no_std without a static reference. To use context(), store your adapter in a static cell (e.g., StaticCell from portable-atomic or embassy-sync), or use adapter() directly.")
}
RuntimeContext::from_arc(Arc::new(self.adapter.clone()))
}
}

Expand Down
Loading
Loading