From 3f46d6794e7337fe3c2afaf61f869894572610d5 Mon Sep 17 00:00:00 2001 From: Marc Julien Date: Tue, 30 Dec 2025 00:10:34 -0800 Subject: [PATCH 1/9] rust(feat): Add test server to CLI --- rust/crates/sift_cli/Cargo.toml | 13 +- rust/crates/sift_cli/build.rs | 21 ++ rust/crates/sift_cli/src/cli/mod.rs | 25 ++ rust/crates/sift_cli/src/cmd/mod.rs | 1 + .../test_server/metrics_streaming_client.rs | 127 ++++++++ .../sift_cli/src/cmd/test_server/mod.rs | 101 ++++++ .../sift_cli/src/cmd/test_server/server.rs | 306 ++++++++++++++++++ rust/crates/sift_cli/src/main.rs | 3 + 8 files changed, 596 insertions(+), 1 deletion(-) create mode 100644 rust/crates/sift_cli/build.rs create mode 100644 rust/crates/sift_cli/src/cmd/test_server/metrics_streaming_client.rs create mode 100644 rust/crates/sift_cli/src/cmd/test_server/mod.rs create mode 100644 rust/crates/sift_cli/src/cmd/test_server/server.rs diff --git a/rust/crates/sift_cli/Cargo.toml b/rust/crates/sift_cli/Cargo.toml index 285563341..9e3483cdc 100644 --- a/rust/crates/sift_cli/Cargo.toml +++ b/rust/crates/sift_cli/Cargo.toml @@ -23,13 +23,24 @@ flate2 = "1.1.2" indicatif = "0.18.0" parquet = "56.2.0" pbjson-types = { workspace = true } +prost = "0.13.5" reqwest = "0.12.23" sift_pbfs = { workspace = true } sift_rs = { workspace = true } -tokio = { version = "1.47.1", features = ["full", "net", "time"] } +sift_stream = { workspace = true } +tokio = { version = "1.47.1", features = ["full", "net", "time", "macros", "rt-multi-thread", "signal"] } tokio-stream = "0.1.17" +tonic = { workspace = true } +tonic-reflection = "0.12" toml = "0.8.23" zip = "6.0.0" [dev-dependencies] indoc = "2.0.6" + +[dependencies.uuid] +version = "1.19.0" +features = ["v4"] + +[build-dependencies] +tonic-build = "0.12" diff --git a/rust/crates/sift_cli/build.rs b/rust/crates/sift_cli/build.rs new file mode 100644 index 000000000..5f94005c9 --- /dev/null +++ b/rust/crates/sift_cli/build.rs @@ -0,0 +1,21 @@ +/// Build descriptor's so that the Black Hole gRPC server can +/// stand up the reflection service. +fn main() -> Result<(), Box> { + tonic_build::configure() + .file_descriptor_set_path("descriptor.bin") + .build_client(false) + .build_server(false) + .compile_protos( + &[ + "/tmp/exported-protos/sift/assets/v1/assets.proto", + "/tmp/exported-protos/sift/ping/v1/ping.proto", + "/tmp/exported-protos/sift/ingest/v1/ingest.proto", + "/tmp/exported-protos/sift/ingestion_configs/v2/ingestion_configs.proto", + ], + // Run the following command to generate exported-protos: + // buf export ../../../protos --output /tmp/exported-protos + &["/tmp/exported-protos"], + )?; + + Ok(()) +} diff --git a/rust/crates/sift_cli/src/cli/mod.rs b/rust/crates/sift_cli/src/cli/mod.rs index 0a879fb1d..2200e39ea 100644 --- a/rust/crates/sift_cli/src/cli/mod.rs +++ b/rust/crates/sift_cli/src/cli/mod.rs @@ -48,6 +48,10 @@ pub enum Cmd { /// Import time series files into Sift #[command(subcommand)] Import(ImportCmd), + + /// Start a test gRPC server for streaming. + #[command(subcommand)] + TestServer(TestServerCmd), } #[derive(Subcommand)] @@ -164,6 +168,27 @@ pub enum ConfigCmd { Update(ConfigUpdateArgs), } +#[derive(Subcommand)] +pub enum TestServerCmd { + /// Start a test ingestion server. + Run(TestServerArgs), +} + +#[derive(clap::Args)] +pub struct TestServerArgs { + /// The address to serve gRPC server. Default is 127.0.0.1:50051. + #[arg(short, long)] + pub local_address: Option, + + /// Whether to stream metrics to Sift. + #[arg(short, long)] + pub stream_metrics: Option, + + /// The asset name to use when streaming server ingestion metrics. + #[arg(short, long)] + pub metrics_asset_name: Option, +} + #[derive(clap::Args)] pub struct ConfigUpdateArgs { /// Edit or create a profile interactively (ignores other flags) diff --git a/rust/crates/sift_cli/src/cmd/mod.rs b/rust/crates/sift_cli/src/cmd/mod.rs index c508a4d6e..8b3a26e67 100644 --- a/rust/crates/sift_cli/src/cmd/mod.rs +++ b/rust/crates/sift_cli/src/cmd/mod.rs @@ -8,6 +8,7 @@ pub mod completions; pub mod config; pub mod export; pub mod import; +pub mod test_server; pub struct Context { pub grpc_uri: String, diff --git a/rust/crates/sift_cli/src/cmd/test_server/metrics_streaming_client.rs b/rust/crates/sift_cli/src/cmd/test_server/metrics_streaming_client.rs new file mode 100644 index 000000000..5f3fe07b7 --- /dev/null +++ b/rust/crates/sift_cli/src/cmd/test_server/metrics_streaming_client.rs @@ -0,0 +1,127 @@ +use super::Context; +use anyhow::{Ok, anyhow}; +use crossterm::style::Stylize; +use sift_stream::{ + ChannelConfig, ChannelDataType, ChannelValue, Credentials, Flow, FlowBuilder, FlowConfig, + IngestionConfigEncoder, IngestionConfigForm, RecoveryStrategy, RetryPolicy, RunForm, + SiftStream, SiftStreamBuilder, TimeValue, +}; + +/// Streams metrics to Sift. +pub struct MetricsStreamingClient { + ctx: Context, + asset_name: String, + sift_stream: Option>, +} + +impl MetricsStreamingClient { + pub fn build( + ctx: Context, + stream_metrics: &Option, + asset_name: &Option, + ) -> Result, anyhow::Error> { + if !stream_metrics.unwrap_or(false) { + return Ok(None); + } + + let Some(asset_name) = asset_name else { + return Err(anyhow!( + "must specify {} with streaming enabled", + "--metrics_asset_name".cyan() + )); + }; + + Ok(Some(MetricsStreamingClient { + ctx: ctx, + asset_name: asset_name.clone(), + sift_stream: None, + })) + } + + /// Initialize SiftStream and create ingestion config. + pub async fn initialize(&mut self) -> Result<(), anyhow::Error> { + let credentials = Credentials::Config { + apikey: self.ctx.api_key.clone(), + uri: self.ctx.grpc_uri.clone(), + }; + + let ingestion_config = IngestionConfigForm { + asset_name: self.asset_name.to_string(), + client_key: "stress-test-ingestion-config-test".into(), + flows: vec![FlowConfig { + name: "metrics".into(), + channels: vec![ + ChannelConfig { + name: "total_num_streams".into(), + description: "Total number of streams created".into(), + data_type: ChannelDataType::Uint32.into(), + ..Default::default() + }, + ChannelConfig { + name: "total_num_bytes_read".into(), + description: "Total number of bytes read".into(), + unit: "B".into(), + data_type: ChannelDataType::Uint64.into(), + ..Default::default() + }, + ChannelConfig { + name: "total_num_messages".into(), + description: "Total number of messages received".into(), + unit: "message".into(), + data_type: ChannelDataType::Uint64.into(), + ..Default::default() + }, + ChannelConfig { + name: "bytes_per_s".into(), + description: "Number of bytes received per second".into(), + data_type: ChannelDataType::Double.into(), + unit: "B/s".into(), + ..Default::default() + }, + ChannelConfig { + name: "messages_per_s".into(), + description: "Number of messages received per second".into(), + unit: "message/s".into(), + data_type: ChannelDataType::Double.into(), + ..Default::default() + }, + ], + }], + }; + + let sift_stream = SiftStreamBuilder::new(credentials) + .ingestion_config(ingestion_config) + .recovery_strategy(RecoveryStrategy::RetryOnly(RetryPolicy::default())) + .build() + .await?; + + self.sift_stream = Some(sift_stream); + + Ok(()) + } + + /// Send metrics to Sift. + pub async fn ingest(&mut self, metrics: Metrics) { + let flow = Flow::new( + "metrics", + TimeValue::now(), + &[ + ChannelValue::new("total_num_streams", metrics.total_num_streams), + ChannelValue::new("total_num_bytes_read", metrics.total_num_bytes_read), + ChannelValue::new("total_num_messages", metrics.total_num_messages), + ChannelValue::new("bytes_per_s", metrics.bytes_per_s), + ChannelValue::new("messages_per_s", metrics.messages_per_s), + ], + ); + + self.sift_stream.as_mut().unwrap().send(flow).await.unwrap(); + } +} + +pub struct Metrics { + pub total_num_streams: u32, + pub total_num_bytes_read: u64, + pub total_num_messages: u64, + pub bytes_per_s: f64, + pub messages_per_s: f64, +} diff --git a/rust/crates/sift_cli/src/cmd/test_server/mod.rs b/rust/crates/sift_cli/src/cmd/test_server/mod.rs new file mode 100644 index 000000000..b1c51ce32 --- /dev/null +++ b/rust/crates/sift_cli/src/cmd/test_server/mod.rs @@ -0,0 +1,101 @@ +use super::Context; +use crate::cmd::test_server::metrics_streaming_client::MetricsStreamingClient; +use crate::{cli::TestServerArgs, util::tty::Output}; +use anyhow::Result; +use server::TestServer; +use sift_rs::assets::v1::asset_service_server::AssetServiceServer; +use sift_rs::ingest::v1::ingest_service_server::IngestServiceServer; +use sift_rs::ingestion_configs::v2::ingestion_config_service_server::IngestionConfigServiceServer; +use sift_rs::ping::v1::ping_service_server::PingServiceServer; +use std::process::ExitCode; +use std::sync::Arc; +use tokio::sync::mpsc; +use tokio::sync::watch; +use tonic::transport::Server; +use tonic_reflection::server::Builder; + +pub const FILE_DESCRIPTOR_SET: &[u8] = include_bytes!("../../../descriptor.bin"); +pub mod metrics_streaming_client; +pub mod server; +use crate::cmd::test_server::metrics_streaming_client::Metrics; + +pub async fn run(ctx: Context, args: TestServerArgs) -> Result { + let local_address = args.local_address.unwrap_or("127.0.0.1:50051".into()); + let addr = local_address.parse()?; + + // Initialize streaming client. + let mut streaming_client = + MetricsStreamingClient::build(ctx, &args.stream_metrics, &args.metrics_asset_name)?; + if streaming_client.is_some() { + streaming_client.as_mut().unwrap().initialize().await?; + } + + // Channel to signal program exit. + let (shutdown_tx, mut shutdown_rx) = watch::channel(false); + let mut shutdown_rx2 = shutdown_rx.clone(); + + // Channel to send metrics. + let (metrics_tx, mut metrics_rx) = mpsc::channel::(1024); + + // Initialize gRPC server. + let server = Arc::new(TestServer::default()); + + // Start task to calculate ingestion metrics. + let server_arc = Arc::clone(&server); + let calc_stats_task = tokio::spawn(async move { + server_arc + .calculate_metrics( + &mut shutdown_rx, + metrics_tx, + args.stream_metrics.unwrap_or(false), + ) + .await; + }); + + // Start task to ingest metrics to Sift. + let ingest_metrics_task = tokio::spawn(async move { + if streaming_client.is_none() { + return; + } + + let mut client = streaming_client.unwrap(); + loop { + tokio::select! { + _ = shutdown_rx2.changed() => { + Output::new().line("Ingest task shutting down").print(); + break; + } + Some(metrics) = metrics_rx.recv() => { + client.ingest(metrics).await; + } + }; + } + }); + + let reflection_service = Builder::configure() + .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET) + .build_v1()?; + + Output::new() + .line(format!("Server listening on {addr}")) + .print(); + + Server::builder() + .add_service(reflection_service) + .add_service(PingServiceServer::from_arc(server.clone())) + .add_service(IngestServiceServer::from_arc(server.clone())) + .add_service(IngestionConfigServiceServer::from_arc(server.clone())) + .add_service(AssetServiceServer::from_arc(server.clone())) + .serve_with_shutdown(addr, async move { + tokio::signal::ctrl_c().await.unwrap(); + let _ = shutdown_tx.send(true); + }) + .await?; + + calc_stats_task.await?; + ingest_metrics_task.await?; + + Output::new().line("Exiting.").print(); + + Ok(ExitCode::SUCCESS) +} diff --git a/rust/crates/sift_cli/src/cmd/test_server/server.rs b/rust/crates/sift_cli/src/cmd/test_server/server.rs new file mode 100644 index 000000000..84f9a59f0 --- /dev/null +++ b/rust/crates/sift_cli/src/cmd/test_server/server.rs @@ -0,0 +1,306 @@ +use crate::cmd::test_server::metrics_streaming_client::Metrics; +use crate::util::tty::Output; +use anyhow::Result; +use crossterm::{ExecutableCommand, cursor, terminal}; +use prost::Message; +use sift_rs::assets::v1::{ + Asset, GetAssetRequest, GetAssetResponse, asset_service_server::AssetService, +}; +use sift_rs::ingest::v1::{ + IngestArbitraryProtobufDataStreamRequest, IngestArbitraryProtobufDataStreamResponse, + IngestWithConfigDataStreamRequest, IngestWithConfigDataStreamResponse, + ingest_service_server::IngestService, +}; +use sift_rs::ingestion_configs::v2::{ingestion_config_service_server::IngestionConfigService, *}; +use sift_rs::ping::v1::{PingRequest, PingResponse, ping_service_server::PingService}; +use std::io::stdout; +use std::time::Duration; +use std::{ + collections::HashMap, + sync::{ + Mutex, + atomic::{AtomicU32, AtomicU64, Ordering::Relaxed}, + }, +}; +use tokio::sync::mpsc::Sender; +use tokio::sync::watch; +use tokio_stream::StreamExt; +use tonic::{Request, Response, Status, Streaming}; +use uuid::Uuid; + +#[derive(Default)] +pub struct TestServer { + /// Total number of streams created. + total_num_streams: AtomicU32, + + /// Total number of messages received. + total_num_messages: AtomicU64, + + /// Total number of bytes received. + total_num_bytes_read: AtomicU64, + + // Total number of ingestion configs created. + total_num_ingestion_configs: AtomicU32, + + /// Ingestion configs by Ingestion Config ID. + ingestion_configs_by_id: Mutex>, + + /// Assets by Asset ID. + asset_ids_by_name: Mutex>, +} + +/// Ingested data and drops it. Calculates ingestion stats and optionally streams them to Sift. +impl TestServer { + /// Calculate ingestion metrics and optionally stream them to Sift. + pub async fn calculate_metrics( + &self, + shutdown: &mut watch::Receiver, + metrics_tx: Sender, + streaming_enabled: bool, + ) { + let mut last_total_num_bytes_read: u64 = 0; + let mut last_total_num_messages: u64 = 0; + + loop { + tokio::select! { + _ = shutdown.changed() => { + Output::new().line("Metrics task shutting down").print(); + break; + } + + _ = tokio::time::sleep(Duration::from_millis(100)) => { + let current_total_num_bytes_read = self.total_num_bytes_read.load(Relaxed); + let current_total_num_messages = self.total_num_messages.load(Relaxed); + let current_total_num_streams = self.total_num_streams.load(Relaxed); + let bytes_per_s = current_total_num_bytes_read - last_total_num_bytes_read; + let messages_per_s = current_total_num_messages - last_total_num_messages; + + last_total_num_bytes_read = current_total_num_bytes_read; + last_total_num_messages = current_total_num_messages; + + // Clear terminal and print metrics. + stdout() + .execute(terminal::Clear(terminal::ClearType::All)) + .expect(""); + stdout().execute(cursor::MoveTo(0, 0)).expect("msg"); + stdout().execute(cursor::MoveUp(5)).expect("terminal error"); + stdout().execute(terminal::Clear(terminal::ClearType::FromCursorDown)).expect("msg"); + + Output::new().line(format!("Total num streams: {current_total_num_streams}")).print(); + Output::new().line(format!("Total num bytes: {current_total_num_bytes_read}")).print(); + Output::new().line(format!("Total num messages: {current_total_num_messages}")).print(); + Output::new().line(format!("bytes/s: {bytes_per_s}")).print(); + Output::new().line(format!("messages/s: {messages_per_s}")).print(); + + // Stream to Sift. + if streaming_enabled { + let e = metrics_tx.try_send(Metrics{ + total_num_streams: current_total_num_streams, + total_num_bytes_read: current_total_num_bytes_read, + total_num_messages: current_total_num_messages, + bytes_per_s: (10 * bytes_per_s )as f64, + messages_per_s: (10 * messages_per_s) as f64, + }); + + if e.is_err() { + Output::new().line(format!("{e:?}")); + } + } + } + } + } + } +} + +#[tonic::async_trait] +impl PingService for TestServer { + async fn ping(&self, _request: Request) -> Result, Status> { + let resp = PingResponse { + response: "".into(), + }; + + Ok(Response::new(resp)) + } +} + +#[tonic::async_trait] +impl AssetService for TestServer { + /// Returns an asset ID. + async fn get_asset( + &self, + request: Request, + ) -> Result, Status> { + let asset_ids_by_name = self.asset_ids_by_name.lock().unwrap(); + let inner = request.into_inner(); + + for (asset_id, asset_name) in asset_ids_by_name.iter() { + if inner.asset_id == *asset_id { + return Ok(Response::new(GetAssetResponse { + asset: Some(Asset { + asset_id: asset_id.into(), + name: asset_name.into(), + ..Default::default() + }), + })); + } + } + + Err(Status::not_found("asset not found")) + } + + /// No-op. + async fn delete_asset( + &self, + _request: Request, + ) -> Result, Status> { + Ok(Response::new( + sift_rs::assets::v1::DeleteAssetResponse::default(), + )) + } + + /// No-op. + async fn list_assets( + &self, + _request: Request, + ) -> Result, Status> { + Ok(Response::new( + sift_rs::assets::v1::ListAssetsResponse::default(), + )) + } + + /// No-op. + async fn update_asset( + &self, + _request: Request, + ) -> Result, Status> { + Ok(Response::new( + sift_rs::assets::v1::UpdateAssetResponse::default(), + )) + } + + /// No-op. + async fn archive_asset( + &self, + _request: Request, + ) -> Result, Status> { + Ok(Response::new( + sift_rs::assets::v1::ArchiveAssetResponse::default(), + )) + } +} + +#[tonic::async_trait] +impl IngestionConfigService for TestServer { + /// Returns an arbitrary Ingestion Config with a new UUID. + async fn get_ingestion_config( + &self, + request: Request, + ) -> Result, Status> { + let inner = request.into_inner(); + let ingestion_configs = self.ingestion_configs_by_id.lock().unwrap(); + let ingestion_config = ingestion_configs + .get(&inner.ingestion_config_id) + .ok_or(Status::not_found("ingestion config not found"))?; + + Ok(Response::new(GetIngestionConfigResponse { + ingestion_config: Some(IngestionConfig { + ingestion_config_id: ingestion_config.ingestion_config_id.clone(), + asset_id: ingestion_config.asset_id.clone(), + client_key: ingestion_config.client_key.clone(), + }), + })) + } + + /// Returns an empty list of ingestion configs. + async fn list_ingestion_configs( + &self, + _request: Request, + ) -> Result, Status> { + let ingestion_configs = self.ingestion_configs_by_id.lock().unwrap(); + + let mut all_ingestion_configs: Vec = + Vec::with_capacity(ingestion_configs.len()); + + for ingestion_config in ingestion_configs.values() { + all_ingestion_configs.push(ingestion_config.clone()); + } + + Ok(Response::new(ListIngestionConfigsResponse { + ingestion_configs: all_ingestion_configs, + next_page_token: "".into(), + })) + } + + /// Returns an arbitrary Ingestion Config with a new UUID. + async fn create_ingestion_config( + &self, + request: Request, + ) -> Result, Status> { + self.total_num_ingestion_configs.fetch_add(1, Relaxed); + let inner = request.into_inner(); + + let mut assets = self.asset_ids_by_name.lock().unwrap(); + let default_asset_id = Uuid::new_v4().to_string(); + let asset_id = assets + .get(&inner.asset_name) + .unwrap_or(&default_asset_id) + .to_string(); + + let new_ingestion_config = CreateIngestionConfigResponse { + ingestion_config: Some(IngestionConfig { + ingestion_config_id: Uuid::new_v4().to_string(), + asset_id: asset_id.clone(), + client_key: inner.client_key, + }), + }; + + assets.insert(asset_id.clone(), inner.asset_name); + + Ok(Response::new(new_ingestion_config)) + } + + /// No-op. + async fn create_ingestion_config_flows( + &self, + _request: Request, + ) -> Result, Status> { + Ok(Response::new(CreateIngestionConfigFlowsResponse::default())) + } + + /// No-op. + async fn list_ingestion_config_flows( + &self, + _request: Request, + ) -> Result, Status> { + Ok(Response::new(ListIngestionConfigFlowsResponse::default())) + } +} + +#[tonic::async_trait] +impl IngestService for TestServer { + /// Store ingestion stats. + async fn ingest_with_config_data_stream( + &self, + request: Request>, + ) -> Result, Status> { + self.total_num_streams.fetch_add(1, Relaxed); + + let mut stream = request.into_inner(); + while let Some(msg) = stream.next().await { + self.total_num_messages.fetch_add(1, Relaxed); + let inner = msg?; + self.total_num_bytes_read + .fetch_add(inner.encoded_len() as u64, Relaxed); + } + + Ok(Response::new(IngestWithConfigDataStreamResponse::default())) + } + + /// No-op. + async fn ingest_arbitrary_protobuf_data_stream( + &self, + _: Request>, + ) -> Result, Status> { + unimplemented!() + } +} diff --git a/rust/crates/sift_cli/src/main.rs b/rust/crates/sift_cli/src/main.rs index 2b73f1cb7..fbef1f935 100644 --- a/rust/crates/sift_cli/src/main.rs +++ b/rust/crates/sift_cli/src/main.rs @@ -84,6 +84,9 @@ fn run(clargs: cli::Args) -> Result { cli::ExportCmd::Run(args) => run_future(cmd::export::run(ctx, args)), cli::ExportCmd::Asset(args) => run_future(cmd::export::asset(ctx, args)), }, + Cmd::TestServer(cmd) => match cmd { + cli::TestServerCmd::Run(args) => run_future(cmd::test_server::run(ctx, args)), + }, _ => Ok(ExitCode::SUCCESS), } } From b7125e7db26539aa8cbcef0eac55b5e08adf8757 Mon Sep 17 00:00:00 2001 From: Marc Julien Date: Wed, 31 Dec 2025 12:17:22 -0800 Subject: [PATCH 2/9] Implement PR feedback --- .../test_server/metrics_streaming_client.rs | 8 +-- .../sift_cli/src/cmd/test_server/mod.rs | 59 +++++++++++-------- .../sift_cli/src/cmd/test_server/server.rs | 16 +++-- 3 files changed, 46 insertions(+), 37 deletions(-) diff --git a/rust/crates/sift_cli/src/cmd/test_server/metrics_streaming_client.rs b/rust/crates/sift_cli/src/cmd/test_server/metrics_streaming_client.rs index 5f3fe07b7..b67ca1c34 100644 --- a/rust/crates/sift_cli/src/cmd/test_server/metrics_streaming_client.rs +++ b/rust/crates/sift_cli/src/cmd/test_server/metrics_streaming_client.rs @@ -2,9 +2,9 @@ use super::Context; use anyhow::{Ok, anyhow}; use crossterm::style::Stylize; use sift_stream::{ - ChannelConfig, ChannelDataType, ChannelValue, Credentials, Flow, FlowBuilder, FlowConfig, - IngestionConfigEncoder, IngestionConfigForm, RecoveryStrategy, RetryPolicy, RunForm, - SiftStream, SiftStreamBuilder, TimeValue, + ChannelConfig, ChannelDataType, ChannelValue, Credentials, Flow, FlowConfig, + IngestionConfigEncoder, IngestionConfigForm, RecoveryStrategy, RetryPolicy, SiftStream, + SiftStreamBuilder, TimeValue, }; /// Streams metrics to Sift. @@ -32,7 +32,7 @@ impl MetricsStreamingClient { }; Ok(Some(MetricsStreamingClient { - ctx: ctx, + ctx, asset_name: asset_name.clone(), sift_stream: None, })) diff --git a/rust/crates/sift_cli/src/cmd/test_server/mod.rs b/rust/crates/sift_cli/src/cmd/test_server/mod.rs index b1c51ce32..46d6e2249 100644 --- a/rust/crates/sift_cli/src/cmd/test_server/mod.rs +++ b/rust/crates/sift_cli/src/cmd/test_server/mod.rs @@ -1,7 +1,7 @@ use super::Context; use crate::cmd::test_server::metrics_streaming_client::MetricsStreamingClient; use crate::{cli::TestServerArgs, util::tty::Output}; -use anyhow::Result; +use anyhow::{Context as AnyhowContext, Result}; use server::TestServer; use sift_rs::assets::v1::asset_service_server::AssetServiceServer; use sift_rs::ingest::v1::ingest_service_server::IngestServiceServer; @@ -20,14 +20,23 @@ pub mod server; use crate::cmd::test_server::metrics_streaming_client::Metrics; pub async fn run(ctx: Context, args: TestServerArgs) -> Result { - let local_address = args.local_address.unwrap_or("127.0.0.1:50051".into()); - let addr = local_address.parse()?; + let local_address = args + .local_address + .unwrap_or_else(|| "0.0.0.0:50051".to_string()); + let addr = local_address + .parse() + .context(format!("failed to parse local_address: {}", local_address))?; // Initialize streaming client. let mut streaming_client = - MetricsStreamingClient::build(ctx, &args.stream_metrics, &args.metrics_asset_name)?; - if streaming_client.is_some() { - streaming_client.as_mut().unwrap().initialize().await?; + MetricsStreamingClient::build(ctx, &args.stream_metrics, &args.metrics_asset_name) + .context("failed to create metrics streaming client")?; + + if let Some(client) = streaming_client.as_mut() { + client + .initialize() + .await + .context("failed to initialize streaming client")?; } // Channel to signal program exit. @@ -54,27 +63,25 @@ pub async fn run(ctx: Context, args: TestServerArgs) -> Result { // Start task to ingest metrics to Sift. let ingest_metrics_task = tokio::spawn(async move { - if streaming_client.is_none() { - return; - } - - let mut client = streaming_client.unwrap(); - loop { - tokio::select! { - _ = shutdown_rx2.changed() => { - Output::new().line("Ingest task shutting down").print(); - break; - } - Some(metrics) = metrics_rx.recv() => { - client.ingest(metrics).await; - } - }; + if let Some(client) = streaming_client.as_mut() { + loop { + tokio::select! { + _ = shutdown_rx2.changed() => { + Output::new().line("Ingest task shutting down").print(); + break; + } + Some(metrics) = metrics_rx.recv() => { + client.ingest(metrics).await; + } + }; + } } }); let reflection_service = Builder::configure() .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET) - .build_v1()?; + .build_v1() + .context("failed to create gRPC reflection service")?; Output::new() .line(format!("Server listening on {addr}")) @@ -92,8 +99,12 @@ pub async fn run(ctx: Context, args: TestServerArgs) -> Result { }) .await?; - calc_stats_task.await?; - ingest_metrics_task.await?; + calc_stats_task + .await + .context("failed to await calculation task")?; + ingest_metrics_task + .await + .context("failed to await ingestion task")?; Output::new().line("Exiting.").print(); diff --git a/rust/crates/sift_cli/src/cmd/test_server/server.rs b/rust/crates/sift_cli/src/cmd/test_server/server.rs index 84f9a59f0..0469fc0f4 100644 --- a/rust/crates/sift_cli/src/cmd/test_server/server.rs +++ b/rust/crates/sift_cli/src/cmd/test_server/server.rs @@ -58,6 +58,8 @@ impl TestServer { metrics_tx: Sender, streaming_enabled: bool, ) { + let mut stdout = stdout(); + let mut last_total_num_bytes_read: u64 = 0; let mut last_total_num_messages: u64 = 0; @@ -79,12 +81,12 @@ impl TestServer { last_total_num_messages = current_total_num_messages; // Clear terminal and print metrics. - stdout() + stdout .execute(terminal::Clear(terminal::ClearType::All)) .expect(""); - stdout().execute(cursor::MoveTo(0, 0)).expect("msg"); - stdout().execute(cursor::MoveUp(5)).expect("terminal error"); - stdout().execute(terminal::Clear(terminal::ClearType::FromCursorDown)).expect("msg"); + stdout.execute(cursor::MoveTo(0, 0)).expect("msg"); + stdout.execute(cursor::MoveUp(5)).expect("terminal error"); + stdout.execute(terminal::Clear(terminal::ClearType::FromCursorDown)).expect("msg"); Output::new().line(format!("Total num streams: {current_total_num_streams}")).print(); Output::new().line(format!("Total num bytes: {current_total_num_bytes_read}")).print(); @@ -115,11 +117,7 @@ impl TestServer { #[tonic::async_trait] impl PingService for TestServer { async fn ping(&self, _request: Request) -> Result, Status> { - let resp = PingResponse { - response: "".into(), - }; - - Ok(Response::new(resp)) + Ok(Response::new(PingResponse::default())) } } From 497f73a03425a16a5ef04e4a09678b85ffb75d5b Mon Sep 17 00:00:00 2001 From: Marc Julien Date: Wed, 31 Dec 2025 14:02:27 -0800 Subject: [PATCH 3/9] Use existing descriptor set --- rust/crates/sift_cli/build.rs | 21 ------------------- .../sift_cli/src/cmd/test_server/mod.rs | 2 +- 2 files changed, 1 insertion(+), 22 deletions(-) delete mode 100644 rust/crates/sift_cli/build.rs diff --git a/rust/crates/sift_cli/build.rs b/rust/crates/sift_cli/build.rs deleted file mode 100644 index 5f94005c9..000000000 --- a/rust/crates/sift_cli/build.rs +++ /dev/null @@ -1,21 +0,0 @@ -/// Build descriptor's so that the Black Hole gRPC server can -/// stand up the reflection service. -fn main() -> Result<(), Box> { - tonic_build::configure() - .file_descriptor_set_path("descriptor.bin") - .build_client(false) - .build_server(false) - .compile_protos( - &[ - "/tmp/exported-protos/sift/assets/v1/assets.proto", - "/tmp/exported-protos/sift/ping/v1/ping.proto", - "/tmp/exported-protos/sift/ingest/v1/ingest.proto", - "/tmp/exported-protos/sift/ingestion_configs/v2/ingestion_configs.proto", - ], - // Run the following command to generate exported-protos: - // buf export ../../../protos --output /tmp/exported-protos - &["/tmp/exported-protos"], - )?; - - Ok(()) -} diff --git a/rust/crates/sift_cli/src/cmd/test_server/mod.rs b/rust/crates/sift_cli/src/cmd/test_server/mod.rs index 46d6e2249..d6f081a6c 100644 --- a/rust/crates/sift_cli/src/cmd/test_server/mod.rs +++ b/rust/crates/sift_cli/src/cmd/test_server/mod.rs @@ -14,7 +14,7 @@ use tokio::sync::watch; use tonic::transport::Server; use tonic_reflection::server::Builder; -pub const FILE_DESCRIPTOR_SET: &[u8] = include_bytes!("../../../descriptor.bin"); +pub const FILE_DESCRIPTOR_SET: &[u8] = include_bytes!("../../../../sift_rs/descriptor_set.bin"); pub mod metrics_streaming_client; pub mod server; use crate::cmd::test_server::metrics_streaming_client::Metrics; From 8e7220305dcc21557e256ba55358a5802f459cb9 Mon Sep 17 00:00:00 2001 From: Marc Julien Date: Wed, 31 Dec 2025 16:57:49 -0800 Subject: [PATCH 4/9] Add flag to end test server while streaming --- rust/crates/sift_cli/src/cmd/test_server/server.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/rust/crates/sift_cli/src/cmd/test_server/server.rs b/rust/crates/sift_cli/src/cmd/test_server/server.rs index 0469fc0f4..f99093f40 100644 --- a/rust/crates/sift_cli/src/cmd/test_server/server.rs +++ b/rust/crates/sift_cli/src/cmd/test_server/server.rs @@ -14,6 +14,7 @@ use sift_rs::ingest::v1::{ use sift_rs::ingestion_configs::v2::{ingestion_config_service_server::IngestionConfigService, *}; use sift_rs::ping::v1::{PingRequest, PingResponse, ping_service_server::PingService}; use std::io::stdout; +use std::sync::atomic::AtomicBool; use std::time::Duration; use std::{ collections::HashMap, @@ -30,6 +31,9 @@ use uuid::Uuid; #[derive(Default)] pub struct TestServer { + /// Whether the server is done processing streams. + done: AtomicBool, + /// Total number of streams created. total_num_streams: AtomicU32, @@ -66,6 +70,7 @@ impl TestServer { loop { tokio::select! { _ = shutdown.changed() => { + self.done.fetch_or(true, Relaxed); Output::new().line("Metrics task shutting down").print(); break; } @@ -289,6 +294,10 @@ impl IngestService for TestServer { let inner = msg?; self.total_num_bytes_read .fetch_add(inner.encoded_len() as u64, Relaxed); + + if self.done.load(Relaxed) { + break; + } } Ok(Response::new(IngestWithConfigDataStreamResponse::default())) From 95326cf0291e4c97d8563f402566e1c834d14840 Mon Sep 17 00:00:00 2001 From: Marc Julien Date: Wed, 31 Dec 2025 17:28:10 -0800 Subject: [PATCH 5/9] Improve error handling --- .../sift_cli/src/cmd/test_server/mod.rs | 4 +++- .../sift_cli/src/cmd/test_server/server.rs | 23 +++++++++++-------- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/rust/crates/sift_cli/src/cmd/test_server/mod.rs b/rust/crates/sift_cli/src/cmd/test_server/mod.rs index d6f081a6c..97345a452 100644 --- a/rust/crates/sift_cli/src/cmd/test_server/mod.rs +++ b/rust/crates/sift_cli/src/cmd/test_server/mod.rs @@ -58,7 +58,9 @@ pub async fn run(ctx: Context, args: TestServerArgs) -> Result { metrics_tx, args.stream_metrics.unwrap_or(false), ) - .await; + .await + .context("calculate metrics task failed") + .unwrap(); }); // Start task to ingest metrics to Sift. diff --git a/rust/crates/sift_cli/src/cmd/test_server/server.rs b/rust/crates/sift_cli/src/cmd/test_server/server.rs index f99093f40..622880af8 100644 --- a/rust/crates/sift_cli/src/cmd/test_server/server.rs +++ b/rust/crates/sift_cli/src/cmd/test_server/server.rs @@ -1,6 +1,6 @@ use crate::cmd::test_server::metrics_streaming_client::Metrics; use crate::util::tty::Output; -use anyhow::Result; +use anyhow::{Context, Ok as AnyhowOk, Result as AnyhowResult}; use crossterm::{ExecutableCommand, cursor, terminal}; use prost::Message; use sift_rs::assets::v1::{ @@ -61,7 +61,7 @@ impl TestServer { shutdown: &mut watch::Receiver, metrics_tx: Sender, streaming_enabled: bool, - ) { + ) -> AnyhowResult<()> { let mut stdout = stdout(); let mut last_total_num_bytes_read: u64 = 0; @@ -72,10 +72,10 @@ impl TestServer { _ = shutdown.changed() => { self.done.fetch_or(true, Relaxed); Output::new().line("Metrics task shutting down").print(); - break; + return AnyhowOk(()); } - _ = tokio::time::sleep(Duration::from_millis(100)) => { + _ = tokio::time::sleep(Duration::from_secs(1)) => { let current_total_num_bytes_read = self.total_num_bytes_read.load(Relaxed); let current_total_num_messages = self.total_num_messages.load(Relaxed); let current_total_num_streams = self.total_num_streams.load(Relaxed); @@ -88,10 +88,13 @@ impl TestServer { // Clear terminal and print metrics. stdout .execute(terminal::Clear(terminal::ClearType::All)) - .expect(""); - stdout.execute(cursor::MoveTo(0, 0)).expect("msg"); - stdout.execute(cursor::MoveUp(5)).expect("terminal error"); - stdout.execute(terminal::Clear(terminal::ClearType::FromCursorDown)).expect("msg"); + .context("failed to clear terminal")?; + stdout.execute(cursor::MoveTo(0, 0)) + .context("failed to move terminal cursor")?; + stdout.execute(cursor::MoveUp(5)) + .context("failed to move terminal cursor")?; + stdout.execute(terminal::Clear(terminal::ClearType::FromCursorDown)) + .context("failed to move terminal cursor")?; Output::new().line(format!("Total num streams: {current_total_num_streams}")).print(); Output::new().line(format!("Total num bytes: {current_total_num_bytes_read}")).print(); @@ -105,8 +108,8 @@ impl TestServer { total_num_streams: current_total_num_streams, total_num_bytes_read: current_total_num_bytes_read, total_num_messages: current_total_num_messages, - bytes_per_s: (10 * bytes_per_s )as f64, - messages_per_s: (10 * messages_per_s) as f64, + bytes_per_s: bytes_per_s as f64, + messages_per_s: messages_per_s as f64, }); if e.is_err() { From 075b85330610afdfe63bde13dfcf74dd01d7c6d5 Mon Sep 17 00:00:00 2001 From: Marc Julien Date: Thu, 8 Jan 2026 13:12:33 -0800 Subject: [PATCH 6/9] Implement PR feedback --- rust/crates/sift_cli/src/cli/mod.rs | 6 ++- .../test_server/metrics_streaming_client.rs | 4 +- .../sift_cli/src/cmd/test_server/mod.rs | 9 ++-- .../sift_cli/src/cmd/test_server/server.rs | 49 +++++++++---------- 4 files changed, 36 insertions(+), 32 deletions(-) diff --git a/rust/crates/sift_cli/src/cli/mod.rs b/rust/crates/sift_cli/src/cli/mod.rs index 2200e39ea..2e77f240e 100644 --- a/rust/crates/sift_cli/src/cli/mod.rs +++ b/rust/crates/sift_cli/src/cli/mod.rs @@ -182,11 +182,15 @@ pub struct TestServerArgs { /// Whether to stream metrics to Sift. #[arg(short, long)] - pub stream_metrics: Option, + pub stream_metrics: bool, /// The asset name to use when streaming server ingestion metrics. #[arg(short, long)] pub metrics_asset_name: Option, + + /// Include to use plain output. + #[arg(short, long)] + pub plain_output: bool, } #[derive(clap::Args)] diff --git a/rust/crates/sift_cli/src/cmd/test_server/metrics_streaming_client.rs b/rust/crates/sift_cli/src/cmd/test_server/metrics_streaming_client.rs index b67ca1c34..d3beef7c2 100644 --- a/rust/crates/sift_cli/src/cmd/test_server/metrics_streaming_client.rs +++ b/rust/crates/sift_cli/src/cmd/test_server/metrics_streaming_client.rs @@ -17,10 +17,10 @@ pub struct MetricsStreamingClient { impl MetricsStreamingClient { pub fn build( ctx: Context, - stream_metrics: &Option, + stream_metrics: &bool, asset_name: &Option, ) -> Result, anyhow::Error> { - if !stream_metrics.unwrap_or(false) { + if !stream_metrics { return Ok(None); } diff --git a/rust/crates/sift_cli/src/cmd/test_server/mod.rs b/rust/crates/sift_cli/src/cmd/test_server/mod.rs index 97345a452..998cad987 100644 --- a/rust/crates/sift_cli/src/cmd/test_server/mod.rs +++ b/rust/crates/sift_cli/src/cmd/test_server/mod.rs @@ -14,7 +14,6 @@ use tokio::sync::watch; use tonic::transport::Server; use tonic_reflection::server::Builder; -pub const FILE_DESCRIPTOR_SET: &[u8] = include_bytes!("../../../../sift_rs/descriptor_set.bin"); pub mod metrics_streaming_client; pub mod server; use crate::cmd::test_server::metrics_streaming_client::Metrics; @@ -56,7 +55,8 @@ pub async fn run(ctx: Context, args: TestServerArgs) -> Result { .calculate_metrics( &mut shutdown_rx, metrics_tx, - args.stream_metrics.unwrap_or(false), + args.stream_metrics, + args.plain_output, ) .await .context("calculate metrics task failed") @@ -81,7 +81,10 @@ pub async fn run(ctx: Context, args: TestServerArgs) -> Result { }); let reflection_service = Builder::configure() - .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET) + .register_encoded_file_descriptor_set(sift_rs::assets::v1::FILE_DESCRIPTOR_SET) + .register_encoded_file_descriptor_set(sift_rs::ingest::v1::FILE_DESCRIPTOR_SET) + .register_encoded_file_descriptor_set(sift_rs::ingestion_configs::v2::FILE_DESCRIPTOR_SET) + .register_encoded_file_descriptor_set(sift_rs::ping::v1::FILE_DESCRIPTOR_SET) .build_v1() .context("failed to create gRPC reflection service")?; diff --git a/rust/crates/sift_cli/src/cmd/test_server/server.rs b/rust/crates/sift_cli/src/cmd/test_server/server.rs index 622880af8..0b6b889e1 100644 --- a/rust/crates/sift_cli/src/cmd/test_server/server.rs +++ b/rust/crates/sift_cli/src/cmd/test_server/server.rs @@ -15,7 +15,6 @@ use sift_rs::ingestion_configs::v2::{ingestion_config_service_server::IngestionC use sift_rs::ping::v1::{PingRequest, PingResponse, ping_service_server::PingService}; use std::io::stdout; use std::sync::atomic::AtomicBool; -use std::time::Duration; use std::{ collections::HashMap, sync::{ @@ -25,6 +24,7 @@ use std::{ }; use tokio::sync::mpsc::Sender; use tokio::sync::watch; +use tokio::time::Duration; use tokio_stream::StreamExt; use tonic::{Request, Response, Status, Streaming}; use uuid::Uuid; @@ -61,7 +61,9 @@ impl TestServer { shutdown: &mut watch::Receiver, metrics_tx: Sender, streaming_enabled: bool, + plain_output: bool, ) -> AnyhowResult<()> { + let mut interval = tokio::time::interval(Duration::from_secs(1)); let mut stdout = stdout(); let mut last_total_num_bytes_read: u64 = 0; @@ -75,7 +77,7 @@ impl TestServer { return AnyhowOk(()); } - _ = tokio::time::sleep(Duration::from_secs(1)) => { + _ = interval.tick() => { let current_total_num_bytes_read = self.total_num_bytes_read.load(Relaxed); let current_total_num_messages = self.total_num_messages.load(Relaxed); let current_total_num_streams = self.total_num_streams.load(Relaxed); @@ -85,16 +87,21 @@ impl TestServer { last_total_num_bytes_read = current_total_num_bytes_read; last_total_num_messages = current_total_num_messages; - // Clear terminal and print metrics. - stdout - .execute(terminal::Clear(terminal::ClearType::All)) - .context("failed to clear terminal")?; - stdout.execute(cursor::MoveTo(0, 0)) - .context("failed to move terminal cursor")?; - stdout.execute(cursor::MoveUp(5)) - .context("failed to move terminal cursor")?; - stdout.execute(terminal::Clear(terminal::ClearType::FromCursorDown)) - .context("failed to move terminal cursor")?; + if !plain_output { + // Clear terminal and print metrics. + stdout + .execute(terminal::Clear(terminal::ClearType::All)) + .context("failed to clear terminal")?; + stdout.execute(cursor::MoveTo(0, 0)) + .context("failed to move terminal cursor")?; + stdout.execute(cursor::MoveUp(5)) + .context("failed to move terminal cursor")?; + stdout.execute(terminal::Clear(terminal::ClearType::FromCursorDown)) + .context("failed to move terminal cursor")?; + } else { + Output::new().line(format!("-----")).print(); + Output::new().line(format!("{}", chrono::Local::now().to_rfc3339())).print(); + } Output::new().line(format!("Total num streams: {current_total_num_streams}")).print(); Output::new().line(format!("Total num bytes: {current_total_num_bytes_read}")).print(); @@ -159,9 +166,7 @@ impl AssetService for TestServer { &self, _request: Request, ) -> Result, Status> { - Ok(Response::new( - sift_rs::assets::v1::DeleteAssetResponse::default(), - )) + unimplemented!() } /// No-op. @@ -179,9 +184,7 @@ impl AssetService for TestServer { &self, _request: Request, ) -> Result, Status> { - Ok(Response::new( - sift_rs::assets::v1::UpdateAssetResponse::default(), - )) + unimplemented!() } /// No-op. @@ -189,9 +192,7 @@ impl AssetService for TestServer { &self, _request: Request, ) -> Result, Status> { - Ok(Response::new( - sift_rs::assets::v1::ArchiveAssetResponse::default(), - )) + unimplemented!() } } @@ -209,11 +210,7 @@ impl IngestionConfigService for TestServer { .ok_or(Status::not_found("ingestion config not found"))?; Ok(Response::new(GetIngestionConfigResponse { - ingestion_config: Some(IngestionConfig { - ingestion_config_id: ingestion_config.ingestion_config_id.clone(), - asset_id: ingestion_config.asset_id.clone(), - client_key: ingestion_config.client_key.clone(), - }), + ingestion_config: Some(ingestion_config.clone()), })) } From dfa65d25a47667702987a4fe1502e3e2c303da1d Mon Sep 17 00:00:00 2001 From: Marc Julien Date: Thu, 8 Jan 2026 15:13:45 -0800 Subject: [PATCH 7/9] Fix cast issue with newer rust version --- rust/crates/sift_cli/src/cmd/import/csv.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/crates/sift_cli/src/cmd/import/csv.rs b/rust/crates/sift_cli/src/cmd/import/csv.rs index 627556efd..9e6161806 100644 --- a/rust/crates/sift_cli/src/cmd/import/csv.rs +++ b/rust/crates/sift_cli/src/cmd/import/csv.rs @@ -300,14 +300,14 @@ fn create_data_import_request( let mut enum_configs = Vec::new(); let mut bit_field_configs = Vec::new(); - if data_type == ChannelDataType::Enum.into() { + if data_type == ChannelDataType::Enum as i32 { let Some(configs) = enum_configs_iter.next() else { return Err(anyhow!( "'{name}' was declared as type enum but --enum-config was not specified" )); }; enum_configs = configs; - } else if data_type == ChannelDataType::BitField.into() { + } else if data_type == ChannelDataType::BitField as i32 { let Some(configs) = bit_field_configs_iter.next() else { return Err(anyhow!( "'{name}' was declared as type bit-field but --bit-field-config was not specified" From 89ac8d0359e767903df7263581afcf2fef300fbc Mon Sep 17 00:00:00 2001 From: Marc Julien Date: Wed, 14 Jan 2026 12:14:52 -0800 Subject: [PATCH 8/9] PR feedback --- rust/crates/sift_cli/src/cli/mod.rs | 8 ++++---- rust/crates/sift_cli/src/cmd/test_server/mod.rs | 10 ++++------ 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/rust/crates/sift_cli/src/cli/mod.rs b/rust/crates/sift_cli/src/cli/mod.rs index 2e77f240e..1c6d5436c 100644 --- a/rust/crates/sift_cli/src/cli/mod.rs +++ b/rust/crates/sift_cli/src/cli/mod.rs @@ -176,9 +176,9 @@ pub enum TestServerCmd { #[derive(clap::Args)] pub struct TestServerArgs { - /// The address to serve gRPC server. Default is 127.0.0.1:50051. - #[arg(short, long)] - pub local_address: Option, + /// The address to serve gRPC server. + #[arg(short, long, default_value_t = String::from("0.0.0.0:50051"))] + pub local_address: String, /// Whether to stream metrics to Sift. #[arg(short, long)] @@ -188,7 +188,7 @@ pub struct TestServerArgs { #[arg(short, long)] pub metrics_asset_name: Option, - /// Include to use plain output. + /// Include to use plain output. Use this option in scripts or when saving logs. #[arg(short, long)] pub plain_output: bool, } diff --git a/rust/crates/sift_cli/src/cmd/test_server/mod.rs b/rust/crates/sift_cli/src/cmd/test_server/mod.rs index 998cad987..84f67360f 100644 --- a/rust/crates/sift_cli/src/cmd/test_server/mod.rs +++ b/rust/crates/sift_cli/src/cmd/test_server/mod.rs @@ -19,12 +19,10 @@ pub mod server; use crate::cmd::test_server::metrics_streaming_client::Metrics; pub async fn run(ctx: Context, args: TestServerArgs) -> Result { - let local_address = args - .local_address - .unwrap_or_else(|| "0.0.0.0:50051".to_string()); - let addr = local_address - .parse() - .context(format!("failed to parse local_address: {}", local_address))?; + let addr = args.local_address.parse().context(format!( + "failed to parse local_address: {}", + args.local_address + ))?; // Initialize streaming client. let mut streaming_client = From c68c149ff12d94cb57ac11e44e47c9a016b801d2 Mon Sep 17 00:00:00 2001 From: Marc Julien Date: Wed, 14 Jan 2026 14:31:40 -0800 Subject: [PATCH 9/9] Use correct version of reflection service --- rust/crates/sift_cli/src/cmd/test_server/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/crates/sift_cli/src/cmd/test_server/mod.rs b/rust/crates/sift_cli/src/cmd/test_server/mod.rs index 84f67360f..810b5e024 100644 --- a/rust/crates/sift_cli/src/cmd/test_server/mod.rs +++ b/rust/crates/sift_cli/src/cmd/test_server/mod.rs @@ -83,7 +83,7 @@ pub async fn run(ctx: Context, args: TestServerArgs) -> Result { .register_encoded_file_descriptor_set(sift_rs::ingest::v1::FILE_DESCRIPTOR_SET) .register_encoded_file_descriptor_set(sift_rs::ingestion_configs::v2::FILE_DESCRIPTOR_SET) .register_encoded_file_descriptor_set(sift_rs::ping::v1::FILE_DESCRIPTOR_SET) - .build_v1() + .build_v1alpha() .context("failed to create gRPC reflection service")?; Output::new()