From ab9cd134304b1917ff0450d805155eff430d2756 Mon Sep 17 00:00:00 2001 From: Ivica Kukic Date: Fri, 28 Nov 2025 22:42:23 +0100 Subject: [PATCH] HAProxy protocol receiver trait and data types --- pingora-core/Cargo.toml | 1 + pingora-core/src/listeners/mod.rs | 46 ++++ pingora-core/src/protocols/digest.rs | 15 ++ pingora-core/src/protocols/mod.rs | 2 + pingora-core/src/protocols/proxy_protocol.rs | 223 +++++++++++++++++++ pingora-core/src/services/listening.rs | 15 ++ 6 files changed, 302 insertions(+) create mode 100644 pingora-core/src/protocols/proxy_protocol.rs diff --git a/pingora-core/Cargo.toml b/pingora-core/Cargo.toml index d7eaa7d7..1261aad1 100644 --- a/pingora-core/Cargo.toml +++ b/pingora-core/Cargo.toml @@ -106,3 +106,4 @@ openssl_derived = ["any_tls"] any_tls = [] sentry = ["dep:sentry"] connection_filter = [] +proxy_protocol = [] diff --git a/pingora-core/src/listeners/mod.rs b/pingora-core/src/listeners/mod.rs index 49137d4d..82fd631c 100644 --- a/pingora-core/src/listeners/mod.rs +++ b/pingora-core/src/listeners/mod.rs @@ -90,6 +90,9 @@ pub use crate::protocols::tls::ALPN; use crate::protocols::GetSocketDigest; pub use l4::{ServerAddress, TcpSocketOptions}; +#[cfg(feature = "proxy_protocol")] +use crate::protocols::proxy_protocol::ProxyProtocolReceiver; + /// The APIs to customize things like certificate during TLS server side handshake #[async_trait] pub trait TlsAccept { @@ -110,6 +113,8 @@ struct TransportStackBuilder { tls: Option, #[cfg(feature = "connection_filter")] connection_filter: Option>, + #[cfg(feature = "proxy_protocol")] + pp_receiver: Option>, } impl TransportStackBuilder { @@ -135,6 +140,8 @@ impl TransportStackBuilder { Ok(TransportStack { l4, tls: self.tls.take().map(|tls| Arc::new(tls.build())), + #[cfg(feature = "proxy_protocol")] + pp_receiver: self.pp_receiver.take(), }) } } @@ -143,6 +150,8 @@ impl TransportStackBuilder { pub(crate) struct TransportStack { l4: ListenerEndpoint, tls: Option>, + #[cfg(feature = "proxy_protocol")] + pp_receiver: Option>, } impl TransportStack { @@ -155,6 +164,8 @@ impl TransportStack { Ok(UninitializedStream { l4: stream, tls: self.tls.clone(), + #[cfg(feature = "proxy_protocol")] + pp_receiver: self.pp_receiver.clone(), }) } @@ -166,11 +177,26 @@ impl TransportStack { pub(crate) struct UninitializedStream { l4: L4Stream, tls: Option>, + #[cfg(feature = "proxy_protocol")] + pp_receiver: Option>, } impl UninitializedStream { pub async fn handshake(mut self) -> Result { self.l4.set_buffer(); + + #[cfg(feature = "proxy_protocol")] + if let Some(receiver) = self.pp_receiver { + let (header, unused) = receiver.accept(&mut self.l4).await?; + + self.l4.get_socket_digest().map(|sd| { + sd.proxy_protocol + .set(Some(header)) + .expect("Newly created OnceCell must be empty"); + }); + self.l4.rewind(&unused); + } + if let Some(tls) = self.tls { let tls_stream = tls.tls_handshake(self.l4).await?; Ok(Box::new(tls_stream)) @@ -288,6 +314,26 @@ impl Listeners { tls, #[cfg(feature = "connection_filter")] connection_filter: self.connection_filter.clone(), + #[cfg(feature = "proxy_protocol")] + pp_receiver: None, + }) + } + + #[cfg(feature = "proxy_protocol")] + /// Add TCP endpoint to self with optional [`TcpSocketOptions`] and [`TlsSettings`], and a [`ProxyProtocolReceiver`]. + pub fn add_proxy_protocol_endpoint( + &mut self, + addr: &str, + sock_opt: Option, + tls: Option, + pp_receiver: R, + ) { + self.stacks.push(TransportStackBuilder { + l4: ServerAddress::Tcp(addr.into(), sock_opt), + tls, + #[cfg(feature = "connection_filter")] + connection_filter: self.connection_filter.clone(), + pp_receiver: Some(Arc::new(pp_receiver)), }) } diff --git a/pingora-core/src/protocols/digest.rs b/pingora-core/src/protocols/digest.rs index f939bb1f..2c4e50b9 100644 --- a/pingora-core/src/protocols/digest.rs +++ b/pingora-core/src/protocols/digest.rs @@ -24,6 +24,9 @@ use super::l4::socket::SocketAddr; use super::raw_connect::ProxyDigest; use super::tls::digest::SslDigest; +#[cfg(feature = "proxy_protocol")] +use super::proxy_protocol::ProxyProtocolHeader; + /// The information can be extracted from a connection #[derive(Clone, Debug, Default)] pub struct Digest { @@ -72,6 +75,9 @@ pub struct SocketDigest { pub local_addr: OnceCell>, /// Original destination address pub original_dst: OnceCell>, + /// Proxy Protocol header + #[cfg(feature = "proxy_protocol")] + pub proxy_protocol: OnceCell>, } impl SocketDigest { @@ -82,6 +88,8 @@ impl SocketDigest { peer_addr: OnceCell::new(), local_addr: OnceCell::new(), original_dst: OnceCell::new(), + #[cfg(feature = "proxy_protocol")] + proxy_protocol: OnceCell::new(), } } @@ -92,6 +100,8 @@ impl SocketDigest { peer_addr: OnceCell::new(), local_addr: OnceCell::new(), original_dst: OnceCell::new(), + #[cfg(feature = "proxy_protocol")] + proxy_protocol: OnceCell::new(), } } @@ -204,6 +214,11 @@ impl SocketDigest { }) .as_ref() } + + #[cfg(feature = "proxy_protocol")] + pub fn proxy_protocol(&self) -> Option<&ProxyProtocolHeader> { + self.proxy_protocol.get_or_init(|| None).as_ref() + } } /// The interface to return timing information diff --git a/pingora-core/src/protocols/mod.rs b/pingora-core/src/protocols/mod.rs index d3bd99b8..72798dd3 100644 --- a/pingora-core/src/protocols/mod.rs +++ b/pingora-core/src/protocols/mod.rs @@ -17,6 +17,8 @@ mod digest; pub mod http; pub mod l4; +#[cfg(feature = "proxy_protocol")] +pub mod proxy_protocol; pub mod raw_connect; pub mod tls; #[cfg(windows)] diff --git a/pingora-core/src/protocols/proxy_protocol.rs b/pingora-core/src/protocols/proxy_protocol.rs new file mode 100644 index 00000000..66701ce0 --- /dev/null +++ b/pingora-core/src/protocols/proxy_protocol.rs @@ -0,0 +1,223 @@ +// Copyright 2025 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Proxy Protocol support for preserving client connection information +//! +//! This module provides the [`ProxyProtocolReceiver`] trait and related types for implementing +//! [HAProxy's Proxy Protocol](https://www.haproxy.org/download/2.8/doc/proxy-protocol.txt). +//! The protocol allows intermediaries (like load balancers) to pass original client connection +//! information to backend servers. +//! +//! # Feature Flag +//! +//! This functionality requires the `proxy_protocol` feature to be enabled: +//! ```toml +//! [dependencies] +//! pingora-core = { version = "0.6", features = ["proxy_protocol"] } +//! ``` +//! +//! # Protocol Versions +//! +//! Both Proxy Protocol v1 (human-readable text format) and v2 (binary format with TLV support) +//! are supported through the [`ProxyProtocolHeader`] enum. +//! +//! # Example +//! +//! ```rust,no_run +//! use async_trait::async_trait; +//! use pingora_core::protocols::proxy_protocol::{ +//! ProxyProtocolReceiver, ProxyProtocolHeader, HeaderV2, +//! Command, Transport, Addresses +//! }; +//! use pingora_core::protocols::l4::stream::Stream; +//! use pingora_error::Result; +//! +//! struct MyProxyProtocolParser; +//! +//! #[async_trait] +//! impl ProxyProtocolReceiver for MyProxyProtocolParser { +//! async fn accept(&self, stream: &mut Stream) -> Result<(ProxyProtocolHeader, Vec)> { +//! // Parse the Proxy Protocol header from the stream +//! // Return the parsed header and any remaining bytes +//! todo!("Implement parsing logic") +//! } +//! } +//! ``` + +use async_trait::async_trait; +use std::borrow::Cow; +use std::net::SocketAddr; + +use super::l4::stream::Stream; +use pingora_error::Result; + +/// A trait for parsing Proxy Protocol headers from incoming connections. +/// +/// Implementations of this trait handle reading and parsing Proxy Protocol headers +/// (v1 or v2) from a stream. The trait is designed to be flexible, allowing different +/// parsing strategies or third-party parser libraries to be used. +/// +/// # Example +/// +/// ```rust,no_run +/// use async_trait::async_trait; +/// use pingora_core::protocols::proxy_protocol::{ +/// ProxyProtocolReceiver, ProxyProtocolHeader, HeaderV1, +/// Transport, Addresses +/// }; +/// use pingora_core::protocols::l4::stream::Stream; +/// use pingora_error::Result; +/// use tokio::io::AsyncReadExt; +/// +/// struct SimpleV1Parser; +/// +/// #[async_trait] +/// impl ProxyProtocolReceiver for SimpleV1Parser { +/// async fn accept(&self, stream: &mut Stream) -> Result<(ProxyProtocolHeader, Vec)> { +/// let mut buffer = Vec::new(); +/// // Read and parse v1 header +/// stream.read_buf(&mut buffer).await?; +/// // Parse logic here... +/// todo!("Parse v1 header and return result") +/// } +/// } +/// ``` +/// +/// # Performance Considerations +/// +/// This method is called once per connection that uses Proxy Protocol. Implementations +/// should efficiently read only the necessary bytes from the stream to parse the header, +/// returning any excess bytes for subsequent processing. +#[async_trait] +pub trait ProxyProtocolReceiver: Send + Sync { + /// Parses the Proxy Protocol header from an accepted connection stream. + /// + /// This method is called after a TCP connection is accepted on a Proxy Protocol endpoint. + /// Implementors should read from the stream to parse the header according to either + /// v1 (text) or v2 (binary) format specifications. + /// + /// # Arguments + /// + /// * `stream` - A mutable reference to the accepted connection stream + /// + /// # Returns + /// + /// A tuple containing: + /// * The parsed [`ProxyProtocolHeader`] (v1 or v2) + /// * Any remaining bytes read from the stream after the header (to be processed by the application) + /// + /// # Errors + /// + /// Returns an error if: + /// * The stream cannot be read + /// * The header format is invalid + /// * The connection is closed unexpectedly + /// + /// # Example + /// + /// ```rust,no_run + /// async fn accept(&self, stream: &mut Stream) -> Result<(ProxyProtocolHeader, Vec)> { + /// // Read bytes from stream + /// let mut buffer = Vec::new(); + /// stream.read_buf(&mut buffer).await?; + /// + /// // Parse header and determine remaining bytes + /// let (header, remaining) = parse_proxy_header(&buffer)?; + /// Ok((header, remaining)) + /// } + /// ``` + async fn accept(&self, stream: &mut Stream) -> Result<(ProxyProtocolHeader, Vec)>; +} + +/// Parsed Proxy Protocol header containing connection information. +/// +/// This enum represents either a v1 (text) or v2 (binary) Proxy Protocol header. +/// The version is determined by the parser implementation. +#[derive(Debug)] +pub enum ProxyProtocolHeader { + /// Proxy Protocol version 1 (human-readable text format) + V1(HeaderV1), + /// Proxy Protocol version 2 (binary format with TLV extension support) + V2(HeaderV2), +} + +/// Proxy Protocol version 1 header information. +/// +/// Version 1 uses a human-readable text format. It contains basic transport +/// and address information but does not support the command field or TLV extensions. +#[derive(Debug)] +pub struct HeaderV1 { + /// The transport protocol used for the proxied connection + pub transport: Transport, + /// Source and destination addresses, if available. + /// `None` indicates an unknown or local connection. + pub addresses: Option, +} + +/// Proxy Protocol version 2 header information. +/// +/// Version 2 uses a binary format and supports additional features including +/// the command field (LOCAL vs PROXY) and optional TLV (Type-Length-Value) extensions +/// for passing custom metadata. +#[derive(Debug)] +pub struct HeaderV2 { + /// Indicates whether this is a proxied connection or a local health check + pub command: Command, + /// The transport protocol used for the proxied connection + pub transport: Transport, + /// Source and destination addresses, if available. + /// `None` for LOCAL command or unknown connections. + pub addresses: Option, + /// Optional TLV (Type-Length-Value) data for protocol extensions. + /// May contain additional metadata such as SSL information, unique IDs, etc. + pub tlvs: Option>, +} + +/// Transport protocol family for the proxied connection. +/// +/// Indicates the network protocol (IPv4 or IPv6 over TCP) or unknown/unspecified transport. +#[derive(Debug)] +pub enum Transport { + /// TCP over IPv4 + Tcp4, + /// TCP over IPv6 + Tcp6, + /// Unknown or unspecified transport protocol + Unknown, +} + +/// Source and destination socket addresses for a proxied connection. +/// +/// Contains the original client address and the destination address +/// as seen by the proxy/load balancer. +#[derive(Debug)] +pub struct Addresses { + /// The original source address (client) + pub source: SocketAddr, + /// The destination address as seen by the proxy + pub destination: SocketAddr, +} + +/// Proxy Protocol v2 command type. +/// +/// Distinguishes between actual proxied connections and local connections +/// (typically used for health checks). +#[derive(Debug)] +pub enum Command { + /// LOCAL command: indicates a health check or non-proxied connection. + /// Receivers should not use any address information from LOCAL connections. + Local, + /// PROXY command: indicates a proxied connection with valid address information. + Proxy, +} diff --git a/pingora-core/src/services/listening.rs b/pingora-core/src/services/listening.rs index b5c04dd1..8e694ed7 100644 --- a/pingora-core/src/services/listening.rs +++ b/pingora-core/src/services/listening.rs @@ -25,6 +25,8 @@ use crate::listeners::AcceptAllFilter; use crate::listeners::{ ConnectionFilter, Listeners, ServerAddress, TcpSocketOptions, TransportStack, }; +#[cfg(feature = "proxy_protocol")] +use crate::protocols::proxy_protocol::ProxyProtocolReceiver; use crate::protocols::Stream; #[cfg(unix)] use crate::server::ListenFds; @@ -153,6 +155,19 @@ impl Service { .add_tls_with_settings(addr, sock_opt, settings) } + /// Add TCP listening endpoint with optional [`TcpSocketOptions`] and [`TlsSettings`], and a [`ProxyProtocolReceiver`]. + #[cfg(feature = "proxy_protocol")] + pub fn add_proxy_protocol_endpoint( + &mut self, + addr: &str, + sock_opt: Option, + tls: Option, + pp_receiver: R, + ) { + self.listeners + .add_proxy_protocol_endpoint(addr, sock_opt, tls, pp_receiver); + } + /// Add an endpoint according to the given [`ServerAddress`] pub fn add_address(&mut self, addr: ServerAddress) { self.listeners.add_address(addr);