Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions src/consume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList};
use std::time::Duration;
use uuid::Uuid;

#[allow(clippy::too_many_arguments)]
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could pull this into a ConsumeConfig struct as we do for howl?

pub fn consume(
topic: &BrokerAndTopic,
partition: Option<i32>,
Expand All @@ -15,6 +16,8 @@ pub fn consume(
offset: Option<i64>,
last: Option<i64>,
timestamp: Option<i64>,
key: bool,
terse: bool,
) {
debug!(
"Listening to topic: {} partition {:?} on broker {}:{}, filtering {}",
Expand Down Expand Up @@ -100,9 +103,22 @@ pub fn consume(
}
debug!("Message has no schema id, ignoring filter")
}

print!("[partition={}", message.partition());
if key {
print!(" key={:?}", message.key().unwrap_or(b""));
}
print!("] ");
match deserialize_message(p) {
Ok(d) => println!("{d:?}"),
Ok(d) => {
if terse {
let schema = get_schema_id(p)
.and_then(|s| str::from_utf8(s).ok())
.unwrap_or("invalid");
println!("{schema} ({} bytes)", p.len())
} else {
println!("{d:?}")
}
}
Err(e) => error!("Failed to deserialize message: {e:?}"),
}
}
Expand Down
42 changes: 39 additions & 3 deletions src/howl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ use flatbuffers::FlatBufferBuilder;
use isis_streaming_data_types::flatbuffers_generated::events_ev44::{
Event44Message, Event44MessageArgs, finish_event_44_message_buffer,
};
use isis_streaming_data_types::flatbuffers_generated::pulse_metadata_pu00::{
Pu00Message, Pu00MessageArgs, finish_pu_00_message_buffer,
};
use isis_streaming_data_types::flatbuffers_generated::run_start_pl72::{
RunStart, RunStartArgs, SpectraDetectorMapping, SpectraDetectorMappingArgs,
finish_run_start_buffer,
Expand Down Expand Up @@ -126,6 +129,18 @@ fn produce_messages(
.try_into()
.expect("This will fail after April 11th, 2262");

match producer.send(
BaseRecord::to(conf.event_topic)
.key("")
.payload(generate_fake_metadata(fbb, now_nanos))
.timestamp(now_nanos / 1_000_000),
) {
Ok(_) => {}
Err(err) => {
error!("Failed to send messages: {}", err.0);
}
}

for _ in 0..conf.messages_per_frame {
match producer.send(
BaseRecord::to(conf.event_topic)
Expand Down Expand Up @@ -222,6 +237,21 @@ fn generate_fake_events<'a>(
fbb.finished_data()
}

/// Builds a fake pu00 (pulse metadata) FlatBuffer message and returns the
/// finished byte slice, borrowed from the supplied builder.
///
/// `timestamp_ns` is used as the pulse reference time (nanoseconds since the
/// Unix epoch). All other fields are fixed placeholder values suitable for
/// load-testing only.
fn generate_fake_metadata<'a>(fbb: &'a mut FlatBufferBuilder<'_>, timestamp_ns: i64) -> &'a [u8] {
    // Clear any state left over from a previously built message.
    fbb.reset();

    // Strings must be serialized into the builder before the table that
    // references them is created.
    let source = fbb.create_string("saluki");

    let message = Pu00Message::create(
        fbb,
        &Pu00MessageArgs {
            reference_time: timestamp_ns,
            message_id: 0,
            source_name: Some(source),
            period_number: Some(1),
            vetos: Some(0),
            proton_charge: Some(0.1),
        },
    );
    finish_pu_00_message_buffer(fbb, message);
    fbb.finished_data()
}

pub struct HowlConfig<'a> {
pub broker: &'a str,
pub event_topic: &'a str,
Expand All @@ -243,14 +273,18 @@ pub fn howl(conf: &HowlConfig) {
.as_nanos()
.try_into()
.expect("This will fail after April 11th, 2262");

let ev44_size =
generate_fake_events(&mut fbb, &mut rng, 0, conf.event_message_config, now_nanos).len()
as u32;

debug!("ev44 size is {ev44_size} bytes");

// calculate rate
let rate_bytes_per_sec = ev44_size * conf.messages_per_frame * conf.frames_per_second;
let pu00_size = generate_fake_metadata(&mut fbb, now_nanos).len() as u32;
debug!("pu00 size is {pu00_size} bytes");

// calculate overall rate (with both ev44 and pu00)
let rate_bytes_per_sec = ev44_size * conf.messages_per_frame * conf.frames_per_second
+ pu00_size * conf.frames_per_second;
debug!("bytes per second: {rate_bytes_per_sec}");

let rate_mbit_per_sec = (rate_bytes_per_sec as f64 / (1024. * 1024.)) * 8.0;
Expand All @@ -259,6 +293,7 @@ pub fn howl(conf: &HowlConfig) {
println!(
"Attempting to simulate data rate: {rate_mbit_per_sec:.3} Mbit/s ({rate_mebibits_per_sec:.3} MiB/s)"
);
println!("Each pu00 is {pu00_size} bytes");
println!("Each ev44 is {ev44_size} bytes");

let producer: ThreadedProducer<DefaultProducerContext> = ClientConfig::new()
Expand Down Expand Up @@ -291,6 +326,7 @@ pub fn howl(conf: &HowlConfig) {
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Failed to get system time");
debug!("Target time: {target_time:?}");

loop {
target_time += target_frame_time;
debug!("New target: {target_time:?}");
Expand Down
10 changes: 9 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ enum Commands {
/// Print last x messages on topic
#[arg(short, long, conflicts_with_all = ["offset","timestamp","messages","filter"])]
last: Option<i64>,
/// Show message key
#[arg(long, action=clap::ArgAction::SetTrue)]
key: bool,
/// Print using terse format (just schema ID and length)
#[arg(long, action=clap::ArgAction::SetTrue)]
terse: bool,
},
/// Print broker metadata.
Sniff {
Expand Down Expand Up @@ -97,8 +103,10 @@ fn main() {
offset,
last,
timestamp,
key,
terse,
} => consume::consume(
&topic, partition, &filter, messages, offset, last, timestamp,
&topic, partition, &filter, messages, offset, last, timestamp, key, terse,
),
Commands::Sniff { broker } => sniff(&broker),
Commands::Howl {
Expand Down