Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rs/hang-cli/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub async fn run_client(client: moq_native::Client, url: Url, name: String, publ
tracing::info!(%url, %name, "connecting");

// Establish the connection, not providing a subscriber.
let session = client.connect_with_fallback(url, origin.consumer, None).await?;
let session = client.with_publish(origin.consumer).connect(url).await?;

#[cfg(unix)]
// Notify systemd that we're ready.
Expand Down
10 changes: 4 additions & 6 deletions rs/hang-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,9 @@ async fn main() -> anyhow::Result<()> {
Command::Serve { config, dir, name, .. } => {
let web_bind = config.bind.unwrap_or("[::]:443".parse().unwrap());

#[allow(unused_mut)]
let mut server = config.init()?;
let server = config.init()?;
#[cfg(feature = "iroh")]
server.with_iroh(iroh);
let server = server.with_iroh(iroh);

let web_tls = server.tls_info();

Expand All @@ -112,11 +111,10 @@ async fn main() -> anyhow::Result<()> {
}
}
Command::Publish { config, url, name, .. } => {
#[allow(unused_mut)]
let mut client = config.init()?;
let client = config.init()?;

#[cfg(feature = "iroh")]
client.with_iroh(iroh);
let client = client.with_iroh(iroh);

run_client(client, url, name, publish).await
}
Expand Down
2 changes: 1 addition & 1 deletion rs/hang-cli/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async fn run_session(
origin.producer.publish_broadcast(&name, consumer);

// Blindly accept the session (WebTransport or QUIC), regardless of the URL.
let session = session.accept(origin.consumer, None).await?;
let session = session.with_publish(origin.consumer).accept().await?;

tracing::info!(id, "accepted session");

Expand Down
8 changes: 4 additions & 4 deletions rs/hang/examples/video.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ async fn run_session(origin: moq_lite::OriginConsumer) -> anyhow::Result<()> {
// The "anon" path is usually configured to bypass authentication; be careful!
let url = url::Url::parse("https://cdn.moq.dev/anon/video-example").unwrap();

// Establish a WebTransport/QUIC connection and MoQ handshake.
// None means we're not consuming anything from the session, otherwise we would provide an OriginProducer.
// Optional: Use connect_with_fallback if you also want to support WebSocket.
let session = client.connect(url, origin, None).await?;
// Establish a WebTransport/QUIC connection and MoQ handshake for publishing.
// with_publish() registers an OriginConsumer for outgoing data.
// Use with_consume() if you also want to subscribe/consume from the session.
let session = client.with_publish(origin).connect(url).await?;

// Wait until the session is closed.
session.closed().await.map_err(Into::into)
Expand Down
6 changes: 5 additions & 1 deletion rs/libmoq/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,14 @@ impl Session {
let client = moq_native::ClientConfig::default()
.init()
.map_err(|err| Error::Connect(Arc::new(err)))?;

let session = client
.connect(url, publish, consume)
.with_publish(publish)
.with_consume(consume)
.connect(url)
.await
.map_err(|err| Error::Connect(Arc::new(err)))?;

callback.call(());

session.closed().await?;
Expand Down
6 changes: 2 additions & 4 deletions rs/moq-clock/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,15 @@ async fn main() -> anyhow::Result<()> {

origin.producer.publish_broadcast(&config.broadcast, broadcast.consumer);

let session = client
.connect_with_fallback(config.url, Some(origin.consumer), None)
.await?;
let session = client.with_publish(origin.consumer).connect(config.url).await?;

tokio::select! {
res = session.closed() => res.context("session closed"),
_ = clock.run() => Ok(()),
}
}
Command::Subscribe => {
let session = client.connect_with_fallback(config.url, None, origin.producer).await?;
let session = client.with_consume(origin.producer).connect(config.url).await?;

// NOTE: We could just call `session.consume_broadcast(&config.broadcast)` instead,
// However that won't work with IETF MoQ and the current OriginConsumer API the moment.
Expand Down
104 changes: 104 additions & 0 deletions rs/moq-lite/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// TODO: Uncomment when observability feature is merged
// use std::sync::Arc;

use crate::{
Error, OriginConsumer, OriginProducer, Session, VERSIONS,
coding::{Decode, Encode, Stream},
ietf, lite, setup,
};

/// A MoQ client session builder.
#[derive(Default, Clone)]
pub struct Client {
publish: Option<OriginConsumer>,
consume: Option<OriginProducer>,
// TODO: Uncomment when observability feature is merged
// stats: Option<Arc<dyn crate::Stats>>,
}

impl Client {
pub fn new() -> Self {
Default::default()
}

pub fn with_publish(mut self, publish: impl Into<Option<OriginConsumer>>) -> Self {
self.publish = publish.into();
self
}

pub fn with_consume(mut self, consume: impl Into<Option<OriginProducer>>) -> Self {
self.consume = consume.into();
self
}

// TODO: Uncomment when observability feature is merged
// pub fn with_stats(mut self, stats: impl Into<Option<Arc<dyn crate::Stats>>>) -> Self {
// self.stats = stats.into();
// self
// }

/// Perform the MoQ handshake as a client negotiating the version.
pub async fn connect<S: web_transport_trait::Session>(&self, session: S) -> Result<Session, Error> {
if self.publish.is_none() && self.consume.is_none() {
tracing::warn!("not publishing or consuming anything");
}

let mut stream = Stream::open(&session, setup::ServerKind::Ietf14).await?;

let mut parameters = ietf::Parameters::default();
parameters.set_varint(ietf::ParameterVarInt::MaxRequestId, u32::MAX as u64);
parameters.set_bytes(ietf::ParameterBytes::Implementation, b"moq-lite-rs".to_vec());
let parameters = parameters.encode_bytes(());

let client = setup::Client {
// Unfortunately, we have to pick a single draft range to support.
// moq-lite can support this handshake.
kind: setup::ClientKind::Ietf14,
versions: VERSIONS.into(),
parameters,
};

// TODO pretty print the parameters.
tracing::trace!(?client, "sending client setup");
stream.writer.encode(&client).await?;

let mut server: setup::Server = stream.reader.decode().await?;
tracing::trace!(?server, "received server setup");

if let Ok(version) = lite::Version::try_from(server.version) {
let stream = stream.with_version(version);
lite::start(
session.clone(),
stream,
self.publish.clone(),
self.consume.clone(),
version,
)
.await?;
} else if let Ok(version) = ietf::Version::try_from(server.version) {
// Decode the parameters to get the initial request ID.
let parameters = ietf::Parameters::decode(&mut server.parameters, version)?;
let request_id_max =
ietf::RequestId(parameters.get_varint(ietf::ParameterVarInt::MaxRequestId).unwrap_or(0));

let stream = stream.with_version(version);
ietf::start(
session.clone(),
stream,
request_id_max,
true,
self.publish.clone(),
self.consume.clone(),
version,
)
.await?;
} else {
// unreachable, but just in case
return Err(Error::Version(client.versions, [server.version].into()));
}

tracing::debug!(version = ?server.version, "connected");

Ok(Session::new(session))
}
}
4 changes: 4 additions & 0 deletions rs/moq-lite/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,21 @@
//! - Use [FrameProducer] and [FrameConsumer] for chunked frame writes/reads without allocating entire frames (useful for relaying).
//! - Use [TrackProducer::create_group] instead of [TrackProducer::append_group] to produce groups out-of-order.

mod client;
mod error;
mod model;
mod path;
mod server;
mod session;
mod setup;

pub mod coding;
pub mod ietf;
pub mod lite;

pub use client::*;
pub use error::*;
pub use model::*;
pub use path::*;
pub use server::*;
pub use session::*;
111 changes: 111 additions & 0 deletions rs/moq-lite/src/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// TODO: Uncomment when observability feature is merged
// use std::sync::Arc;

use crate::{
Error, OriginConsumer, OriginProducer, Session, VERSIONS,
coding::{Decode, Encode, Stream},
ietf, lite, setup,
};

/// A MoQ server session builder.
#[derive(Default, Clone)]
pub struct Server {
publish: Option<OriginConsumer>,
consume: Option<OriginProducer>,
// TODO: Uncomment when observability feature is merged
// stats: Option<Arc<dyn crate::Stats>>,
}

impl Server {
pub fn new() -> Self {
Default::default()
}

pub fn with_publish(mut self, publish: impl Into<Option<OriginConsumer>>) -> Self {
self.publish = publish.into();
self
}

pub fn with_consume(mut self, consume: impl Into<Option<OriginProducer>>) -> Self {
self.consume = consume.into();
self
}

// TODO: Uncomment when observability feature is merged
// pub fn with_stats(mut self, stats: impl Into<Option<Arc<dyn crate::Stats>>>) -> Self {
// self.stats = stats.into();
// self
// }

/// Perform the MoQ handshake as a server for the given session.
pub async fn accept<S: web_transport_trait::Session>(&self, session: S) -> Result<Session, Error> {
if self.publish.is_none() && self.consume.is_none() {
tracing::warn!("not publishing or consuming anything");
}

// Accept with an initial version; we'll switch to the negotiated version later
let mut stream = Stream::accept(&session, ()).await?;
let mut client: setup::Client = stream.reader.decode().await?;
tracing::trace!(?client, "received client setup");

// Choose the version to use
let version = client
.versions
.iter()
.find(|v| VERSIONS.contains(v))
.copied()
.ok_or_else(|| Error::Version(client.versions.clone(), VERSIONS.into()))?;

// Only encode parameters if we're using the IETF draft because it has max_request_id
let parameters = if ietf::Version::try_from(version).is_ok() && client.kind == setup::ClientKind::Ietf14 {
let mut parameters = ietf::Parameters::default();
parameters.set_varint(ietf::ParameterVarInt::MaxRequestId, u32::MAX as u64);
parameters.set_bytes(ietf::ParameterBytes::Implementation, b"moq-lite-rs".to_vec());
parameters.encode_bytes(())
} else {
lite::Parameters::default().encode_bytes(())
};

let server = setup::Server { version, parameters };
tracing::trace!(?server, "sending server setup");

let mut stream = stream.with_version(client.kind.reply());
stream.writer.encode(&server).await?;

if let Ok(version) = lite::Version::try_from(version) {
let stream = stream.with_version(version);
lite::start(
session.clone(),
stream,
self.publish.clone(),
self.consume.clone(),
version,
)
.await?;
} else if let Ok(version) = ietf::Version::try_from(version) {
// Decode the client's parameters to get their max request ID.
let parameters = ietf::Parameters::decode(&mut client.parameters, version)?;
let request_id_max =
ietf::RequestId(parameters.get_varint(ietf::ParameterVarInt::MaxRequestId).unwrap_or(0));

let stream = stream.with_version(version);
ietf::start(
session.clone(),
stream,
request_id_max,
false,
self.publish.clone(),
self.consume.clone(),
version,
)
.await?;
} else {
// unreachable, but just in case
return Err(Error::Version(client.versions, VERSIONS.into()));
}

tracing::debug!(?version, "connected");

Ok(Session::new(session))
}
}
Loading
Loading