Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ default = ["atof-streaming"]
atof-streaming = ["nemo-relay/atof-streaming"]

[dependencies]
nemo-relay = { workspace = true, features = ["guardrails-remote", "object-store", "openinference"] }
nemo-relay = { workspace = true, features = ["guardrails-remote", "object-store", "openinference", "worker-grpc"] }
nemo-relay-adaptive = { workspace = true, features = ["redis-backend"] }
nemo-relay-pii-redaction.workspace = true
async-stream = "0.3"
Expand Down
32 changes: 31 additions & 1 deletion crates/cli/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use axum::http::HeaderMap;
use axum::routing::{get, post};
use axum::{Json, Router};
use nemo_relay::plugin::dynamic::{
DynamicPluginKind, NativePluginActivation, NativePluginLoadSpec, load_native_plugins,
DynamicPluginKind, NativePluginActivation, NativePluginLoadSpec, WorkerPluginActivation,
WorkerPluginLoadSpec, load_native_plugins, load_worker_plugins,
};
use nemo_relay::plugin::{
PluginComponentSpec, PluginConfig, clear_plugin_configuration, initialize_plugins_exact,
Expand Down Expand Up @@ -216,6 +217,7 @@ async fn idle_shutdown_future(
struct PluginActivation {
active: bool,
native: Option<NativePluginActivation>,
worker: Option<WorkerPluginActivation>,
}

impl PluginActivation {
Expand All @@ -227,6 +229,7 @@ impl PluginActivation {
return Ok(Self {
active: false,
native: None,
worker: None,
});
};
register_adaptive_component().map_err(|error| {
Expand All @@ -251,6 +254,23 @@ impl PluginActivation {
})
})
.collect::<Result<Vec<_>, CliError>>()?;
let worker_specs = dynamic_plugins
.iter()
.filter(|plugin| plugin.kind == DynamicPluginKind::Worker)
.map(|plugin| {
let manifest_ref = plugin.manifest_ref.clone().ok_or_else(|| {
CliError::Config(format!(
"worker dynamic plugin '{}' has no manifest_ref in lifecycle state",
plugin.plugin_id
))
})?;
Ok(WorkerPluginLoadSpec {
plugin_id: plugin.plugin_id.clone(),
manifest_ref,
config: plugin.config.clone(),
})
})
.collect::<Result<Vec<_>, CliError>>()?;
let native =
if native_specs.is_empty() {
None
Expand All @@ -259,6 +279,14 @@ impl PluginActivation {
CliError::Config(format!("native plugin load failed: {error}"))
})?)
};
let worker =
if worker_specs.is_empty() {
None
} else {
Some(load_worker_plugins(worker_specs).map_err(|error| {
CliError::Config(format!("worker plugin load failed: {error}"))
})?)
};
// Gateway already resolved its config; activate exactly (no re-discovery).
let mut plugin_config: PluginConfig = match config {
Some(config) => serde_json::from_value(config)
Expand All @@ -282,6 +310,7 @@ impl PluginActivation {
Ok(Self {
active: true,
native,
worker,
})
}

Expand All @@ -295,6 +324,7 @@ impl PluginActivation {
Ok(())
};
self.native.take();
self.worker.take();
result
}
}
Expand Down
22 changes: 22 additions & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,24 @@ openinference = [
"dep:wasm-bindgen",
"dep:wasm-bindgen-futures",
]
worker-grpc = [
"dep:nemo-relay-worker-proto",
"dep:hyper-util",
"dep:tower",
"dep:tonic",
"tokio-stream/net",
"tonic/codegen",
"tonic/router",
"tonic/transport",
"tokio/io-util",
"tokio/net",
"tokio/process",
"tokio/rt-multi-thread",
]

[dependencies]
nemo-relay-types.workspace = true
nemo-relay-worker-proto = { workspace = true, optional = true }
uuid = { workspace = true, features = ["v7", "serde"] }
serde = { version = "1", features = ["derive", "rc"] }
serde_json = "1"
Expand Down Expand Up @@ -120,6 +135,11 @@ path = "tests/integration/context_isolation_tests.rs"
name = "native_plugin_integration"
path = "tests/integration/native_plugin_tests.rs"

[[test]]
name = "worker_plugin_integration"
path = "tests/integration/worker_plugin_tests.rs"
required-features = ["worker-grpc"]

[[test]]
name = "middleware_integration"
path = "tests/integration/middleware_tests.rs"
Expand Down Expand Up @@ -163,3 +183,5 @@ rustls = { version = "0.23", default-features = false, features = ["ring", "std"
tonic = { version = "0.14.1", default-features = false, optional = true }
object_store = { version = "0.13", default-features = false, features = ["aws"], optional = true }
tokio-tungstenite = { version = "0.27", default-features = false, features = ["connect", "rustls-tls-native-roots"], optional = true }
tower = { version = "0.5", features = ["util"], optional = true }
hyper-util = { version = "0.1", features = ["tokio"], optional = true }
4 changes: 4 additions & 0 deletions crates/core/src/plugin/dynamic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@ mod manifest;
#[cfg(not(target_arch = "wasm32"))]
mod native;
mod registry;
#[cfg(all(feature = "worker-grpc", not(target_arch = "wasm32")))]
mod worker;

pub use manifest::*;
#[cfg(not(target_arch = "wasm32"))]
pub use native::*;
pub use registry::*;
#[cfg(all(feature = "worker-grpc", not(target_arch = "wasm32")))]
pub use worker::*;

/// Plugin execution lane.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash, Display)]
Expand Down
Loading
Loading