From b25571905e76b0b352bd7a355864f3f050e7b6d4 Mon Sep 17 00:00:00 2001 From: Steve Jenson Date: Mon, 10 Apr 2017 13:49:42 -0700 Subject: [PATCH] Adds failfast mode for unroutable requests. * Fixes Issue #26 * Adds a failfast mode for currently unroutable requests. * Adds an integration test failfast. * Adds a mock namerd. * Adds a mock static webserver. * Improves some comments. --- Cargo.lock | 1 + Cargo.toml | 7 ++ circle.yml | 2 +- src/app/mod.rs | 80 ++++++++++++--------- src/lb/balancer.rs | 18 ++++- src/lb/mod.rs | 29 +++++--- src/lb/proxy_stream.rs | 4 +- src/namerd.rs | 7 +- tests/lib.rs | 13 ++++ tests/mocks/mock_namerd.rs | 38 ++++++++++ tests/mocks/mock_webserver.rs | 38 ++++++++++ tests/mocks/mod.rs | 5 ++ tests/test_bad_requests.rs | 131 ++++++++++++++++++++++++++++++++++ 13 files changed, 321 insertions(+), 52 deletions(-) create mode 100644 tests/lib.rs create mode 100644 tests/mocks/mock_namerd.rs create mode 100644 tests/mocks/mock_webserver.rs create mode 100644 tests/mocks/mod.rs create mode 100644 tests/test_bad_requests.rs diff --git a/Cargo.lock b/Cargo.lock index 62f1cce..fbd8a83 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4,6 +4,7 @@ version = "0.0.2" dependencies = [ "bytes 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.22.1 (registry+https://github.com/rust-lang/crates.io-index)", + "env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.11.0-a.0 (git+https://github.com/hyperium/hyper?rev=5a3743c1)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index bab71da..36c8ec6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,3 +34,10 @@ tokio-io = "0.1" tokio-service = "0.1" tokio-timer = "0.1" url = "1.4" + +[dev-dependencies] +env_logger = { version = "0.3", default-features = false } +futures = "0.1" +# We use not-yet-released tokio integration on master: +hyper = { git = "https://github.com/hyperium/hyper", rev = "5a3743c1" } +tokio-core = "0.1" diff --git a/circle.yml b/circle.yml index e89ea1b..f9e9024 100644 --- a/circle.yml +++ b/circle.yml @@ -17,4 +17,4 @@ dependencies: test: override: #- ~/.cargo/bin/cargo clippy - - ~/.cargo/bin/cargo test + - RUST_BACKTRACE=full ~/.cargo/bin/cargo test diff --git a/src/app/mod.rs b/src/app/mod.rs index f3e00d1..283bf16 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -9,7 +9,7 @@ use std::cell::RefCell; use std::collections::{VecDeque, HashMap}; use std::fs::File; use std::io::{self, BufReader}; -use std::net; +use std::net::{self, SocketAddr}; use std::rc::Rc; use std::time::Duration; use tacho::{self, Tacho}; @@ -21,12 +21,12 @@ mod admin_http; mod sni; pub mod config; -use self::config::*; -use self::sni::Sni; use WeightedAddr; use lb::{Balancer, Acceptor, Connector, PlainAcceptor, PlainConnector, SecureAcceptor, SecureConnector}; use namerd; +use self::config::*; +use self::sni::Sni; const DEFAULT_BUFFER_SIZE: usize = 8 * 1024; const DEFAULT_MAX_WAITERS: usize = 8; @@ -95,7 +95,7 @@ pub fn configure(app: AppConfig) -> (Admin, Proxies) { pub trait Loader: Sized { type Run: Future; - fn load(self, handle: Handle) -> io::Result; + fn load(self, handle: Handle) -> io::Result<(SocketAddr, Self::Run)>; } pub trait Runner: Sized { fn run(self) -> io::Result<()>; @@ -104,7 +104,7 @@ pub trait Runner: Sized { impl Runner for L { fn run(self) -> io::Result<()> { let mut core = Core::new()?; - let fut = self.load(core.handle())?; + let (_, fut) = self.load(core.handle())?; core.run(fut) } } @@ -118,12 +118,12 @@ pub struct Admin { } impl Loader for Admin { type Run = Running; - fn load(self, handle: Handle) -> io::Result { + fn load(self, handle: Handle) -> io::Result<(SocketAddr, Running)> { let mut running = Running::new(); { let mut namerds = self.namerds; for _ in 0..namerds.len() { - let f = namerds.pop_front().unwrap().load(handle.clone())?; + let (_, f) = namerds.pop_front().unwrap().load(handle.clone())?; running.register(f.map_err(|_| io::ErrorKind::Other.into())); } } @@ -163,19 +163,19 @@ impl Loader for Admin { }); running.register(srv); } - Ok(running) + Ok((self.addr, running)) } } -struct Namerd { - config: NamerdConfig, - sender: mpsc::Sender>, - metrics: tacho::Metrics, +pub struct Namerd { + pub config: NamerdConfig, + pub sender: mpsc::Sender>, + pub metrics: tacho::Metrics, } impl Loader for Namerd { type Run = Box>; - fn load(self, handle: Handle) -> io::Result { + fn load(self, handle: Handle) -> io::Result<(SocketAddr, Self::Run)> { let path = self.config.path; let url = self.config.url; let interval_secs = self.config.interval_secs.unwrap_or(DEFAULT_NAMERD_SECONDS); @@ -194,7 +194,8 @@ impl Loader for Namerd { let sink = self.sender.sink_map_err(|_| error!("sink error")); addrs.forward(sink).map_err(|_| io::ErrorKind::Other.into()).map(|_| {}) }; - Ok(Box::new(driver)) + // FIXME: 127.0.0.1:0 is a hideous hack but I lost the ability to get a SocketAddr. + Ok(("127.0.0.1:0".parse().unwrap(), Box::new(driver))) } } @@ -203,29 +204,32 @@ pub struct Proxies { } impl Loader for Proxies { type Run = Running; - fn load(self, handle: Handle) -> io::Result { + fn load(self, handle: Handle) -> io::Result<(SocketAddr, Running)> { let mut running = Running::new(); let mut proxies = self.proxies; + let mut addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); for _ in 0..proxies.len() { let p = proxies.pop_front().unwrap(); - let f = p.load(handle.clone())?; + let (_addr, f) = p.load(handle.clone())?; + addr = _addr; running.register(f); } - Ok(running) + Ok((addr, running)) } } -struct Proxy { - client: Option, - server: ProxyServer, +pub struct Proxy { + pub client: Option, + pub server: ProxyServer, } impl Loader for Proxy { type Run = Running; - fn load(self, handle: Handle) -> io::Result { + fn load(self, handle: Handle) -> io::Result<(SocketAddr, Running)> { match self.client.and_then(|c| c.tls) { None => { let conn = PlainConnector::new(handle.clone()); - self.server.load(&handle, conn) + let f = self.server.load(&handle, conn).expect("b"); + Ok(f) } Some(ref c) => { let mut tls = rustls::ClientConfig::new(); @@ -238,22 +242,23 @@ impl Loader for Proxy { } }; let conn = SecureConnector::new(c.dns_name.clone(), tls, handle.clone()); - self.server.load(&handle, conn) + let f = self.server.load(&handle, conn).expect("a"); + Ok(f) } } } } -struct ProxyServer { - label: String, - servers: Vec, - addrs: Box, Error = ()>>, - buf: Rc>>, - max_waiters: usize, - metrics: tacho::Metrics, +pub struct ProxyServer { + pub label: String, + pub servers: Vec, + pub addrs: Box, Error = ()>>, + pub buf: Rc>>, + pub max_waiters: usize, + pub metrics: tacho::Metrics, } impl ProxyServer { - fn load(self, handle: &Handle, conn: C) -> io::Result + fn load(self, handle: &Handle, conn: C) -> io::Result<(SocketAddr, Running)> where C: Connector + 'static { let addrs = self.addrs.map_err(|_| io::ErrorKind::Other.into()); @@ -261,6 +266,9 @@ impl ProxyServer { let bal = Balancer::new(addrs, conn, self.buf.clone(), metrics.clone()) .into_shared(self.max_waiters, handle.clone()); + // Placeholder for our local listening SocketAddr. + let mut local_addr: SocketAddr = "127.0.0.1:0".parse().expect("unable to parse addr"); + // TODO scope/tag stats for servers. let mut running = Running::new(); @@ -271,7 +279,9 @@ impl ProxyServer { ServerConfig::Tcp { ref addr } => { let metrics = metrics.clone().labeled("srv".into(), format!("{}", addr)); let acceptor = PlainAcceptor::new(handle, metrics); - let f = acceptor.accept(addr).forward(bal).map(|_| {}); + let (bound_addr, forwarder) = acceptor.accept(addr); + local_addr = bound_addr; + let f = forwarder.forward(bal).map(|_| {}); running.register(f); } ServerConfig::Tls { ref addr, @@ -287,12 +297,14 @@ impl ProxyServer { let metrics = metrics.clone().labeled("srv".into(), format!("{}", addr)); let acceptor = SecureAcceptor::new(handle, tls, metrics); - let f = acceptor.accept(addr).forward(bal).map(|_| {}); + let (bound_addr, forwarder) = acceptor.accept(addr); + local_addr = bound_addr; + let f = forwarder.forward(bal).map(|_| {}); running.register(f); } } } - Ok(running) + Ok((local_addr, running)) } } diff --git a/src/lb/balancer.rs b/src/lb/balancer.rs index e5abc4a..d771cc0 100644 --- a/src/lb/balancer.rs +++ b/src/lb/balancer.rs @@ -44,6 +44,8 @@ pub struct Balancer { retired: VecDeque, stats: Stats, + + fail_fast_mode: bool, } impl Balancer @@ -64,6 +66,7 @@ impl Balancer ready: VecDeque::new(), retired: VecDeque::new(), stats: Stats::new(metrics), + fail_fast_mode: false, } } @@ -140,6 +143,9 @@ impl Balancer if let Async::Ready(addrs) = self.addrs.poll()? { trace!("addr update"); let addrs = addrs.expect("addr stream must be infinite"); + // If there are no addrs to route to, drop requests quickly. + // TODO: validate that fail_fast_mode is being disabled once addrs exist. + self.fail_fast_mode = addrs.is_empty(); let new = addr_weight_map(&addrs); self.update_endpoints(&new); } @@ -357,12 +363,18 @@ impl Sink for Balancer self.evict_retirees(&mut rec)?; self.promote_unready(&mut rec)?; self.discover_and_retire()?; - trace!("retrying {} unready={} ready={} retired={}", + trace!("retrying {} unready={} ready={} retired={} failfast={}", src_addr, self.unready.len(), self.ready.len(), - self.retired.len()); - self.dispatch(src, &mut rec) + self.retired.len(), + self.fail_fast_mode); + if self.fail_fast_mode { + trace!("in fail fast mode, dropping traffic"); + Err(io::ErrorKind::Other.into()) + } else { + self.dispatch(src, &mut rec) + } } }; diff --git a/src/lb/mod.rs b/src/lb/mod.rs index 2bc14e4..95a1421 100644 --- a/src/lb/mod.rs +++ b/src/lb/mod.rs @@ -45,9 +45,11 @@ impl WithAddr for Src { } } -/// Binds on `addr` and produces `U`-typed src connections. +/// Binds on `addr` and produces the bound `SocketAddr` and a` Stream` of `Src` connections. pub trait Acceptor { - fn accept(&self, addr: &SocketAddr) -> Box>; + fn accept(&self, + addr: &SocketAddr) + -> (SocketAddr, Box>); } /// Establishes a `D`-typed connection to `addr`. @@ -71,17 +73,20 @@ impl PlainAcceptor { } } impl Acceptor for PlainAcceptor { - fn accept(&self, addr: &SocketAddr) -> Box> { + fn accept(&self, + addr: &SocketAddr) + -> (SocketAddr, Box>) { let metrics = self.metrics.clone(); let connects_key = self.connects_key.clone(); - TcpListener::bind(addr, &self.handle) - .unwrap() - .incoming() + let listener = TcpListener::bind(addr, &self.handle).expect("could not bind to address"); + let local_addr = listener.local_addr().expect("could not get local_addr from listener"); + let worker = listener.incoming() .map(move |(s, a)| { metrics.recorder().incr(&connects_key, 1); Src(Socket::plain(a, s)) }) - .boxed() + .boxed(); + (local_addr, worker) } } @@ -119,9 +124,13 @@ impl SecureAcceptor { } } impl Acceptor for SecureAcceptor { - fn accept(&self, addr: &SocketAddr) -> Box> { + fn accept(&self, + addr: &SocketAddr) + -> (SocketAddr, Box>) { let tls = self.config.clone(); - let l = TcpListener::bind(addr, &self.handle).unwrap(); + let l = TcpListener::bind(addr, &self.handle) + .expect("could not bind listener for SecureAcceptor"); + let local_addr = l.local_addr().expect("could not get local_addr from listener"); let metrics = self.metrics.clone(); let connects_key = self.connects_key.clone(); @@ -143,7 +152,7 @@ impl Acceptor for SecureAcceptor { } } }); - Box::new(srcs) + (local_addr, Box::new(srcs)) } } diff --git a/src/lb/proxy_stream.rs b/src/lb/proxy_stream.rs index 46997df..b29741b 100644 --- a/src/lb/proxy_stream.rs +++ b/src/lb/proxy_stream.rs @@ -13,8 +13,8 @@ use tokio_io::AsyncWrite; /// A future representing reading all data from one side of a proxy connection and writing /// it to another. /// -/// In the typical case, nothing allocations are required. If the write side exhibits -/// backpressure, however, a buffer is allocated to +/// In the typical case, no allocations are required. If the write side exhibits +/// backpressure, however, a buffer is allocated. pub struct ProxyStream { reader: Rc>, writer: Rc>, diff --git a/src/namerd.rs b/src/namerd.rs index 67f0969..d6f0949 100644 --- a/src/namerd.rs +++ b/src/namerd.rs @@ -81,7 +81,7 @@ fn request(client: Rc>, url: Url, stats: Stats) -> AddrsFu } } Err(e) => { - error!("failed to read response: {}", e); + error!("failed to read response from remote namerd: {}", e); future::ok(None).boxed() } }) @@ -134,9 +134,10 @@ fn parse_chunks(chunks: &[Chunk]) -> Option> { let result: json::Result = json::from_reader(r); match result { Ok(ref nrsp) if nrsp.kind == "bound" => Some(to_weighted_addrs(&nrsp.addrs)), + Ok(ref nrsp) if nrsp.kind == "neg" => Some(vec![]), Ok(_) => Some(vec![]), Err(e) => { - info!("error parsing response: {}", e); + error!("error parsing response: {}", e); None } } @@ -157,7 +158,9 @@ fn to_weighted_addrs(namerd_addrs: &[NamerdAddr]) -> Vec<::WeightedAddr> { struct NamerdResponse { #[serde(rename = "type")] kind: String, + #[serde(default)] addrs: Vec, + #[serde(default)] meta: HashMap, } diff --git a/tests/lib.rs b/tests/lib.rs new file mode 100644 index 0000000..e515942 --- /dev/null +++ b/tests/lib.rs @@ -0,0 +1,13 @@ +#[cfg(tests)] +mod tests; +extern crate log; + +extern crate env_logger; +extern crate futures; +extern crate hyper; +extern crate tokio_core; +extern crate tokio_io; +extern crate linkerd_tcp; + +mod mocks; +pub use mocks::MockNamerd; diff --git a/tests/mocks/mock_namerd.rs b/tests/mocks/mock_namerd.rs new file mode 100644 index 0000000..306c245 --- /dev/null +++ b/tests/mocks/mock_namerd.rs @@ -0,0 +1,38 @@ +extern crate futures; +extern crate hyper; + +use futures::{Future, future}; +use hyper::{Get, StatusCode}; +use hyper::header::{ContentLength, ContentType}; +use hyper::server::{Service, Request, Response}; +use std::boxed::Box; + +pub struct MockNamerd {} + +impl Default for MockNamerd { + fn default() -> MockNamerd { + MockNamerd {} + } +} + +impl Service for MockNamerd { + type Request = Request; + type Response = Response; + type Error = hyper::Error; + type Future = Box>; + fn call(&self, req: Request) -> Self::Future { + match (req.method(), req.path()) { + (&Get, "/api/1/resolve/default") => { + // TODO: Because there are no paths in this mock namerd, we always return type: neg + let body = "{\"type\":\"neg\"}".to_owned(); + let rsp = Response::new() + .with_status(StatusCode::Ok) + .with_header(ContentType::json()) + .with_header(ContentLength(body.len() as u64)) + .with_body(body); + future::ok(rsp).boxed() + } + _ => future::ok(Response::new().with_status(StatusCode::NotFound)).boxed(), + } + } +} diff --git a/tests/mocks/mock_webserver.rs b/tests/mocks/mock_webserver.rs new file mode 100644 index 0000000..3aa8a18 --- /dev/null +++ b/tests/mocks/mock_webserver.rs @@ -0,0 +1,38 @@ +extern crate futures; +extern crate hyper; + +use futures::{Future, future}; +use hyper::{Get, StatusCode}; +use hyper::header::{ContentLength, ContentType}; +use hyper::server::{Service, Request, Response}; +use std::boxed::Box; + +pub struct MockWebServer {} + +impl Default for MockWebServer { + fn default() -> MockWebServer { + MockWebServer {} + } +} + +impl Service for MockWebServer { + type Request = Request; + type Response = Response; + type Error = hyper::Error; + type Future = Box>; + fn call(&self, req: Request) -> Self::Future { + match (req.method(), req.path()) { + (&Get, "/") => { + println!("mock webserver received request"); + let body = "{\"hello\":\"world\"}".to_owned(); + let rsp = Response::new() + .with_status(StatusCode::Ok) + .with_header(ContentType::json()) + .with_header(ContentLength(body.len() as u64)) + .with_body(body); + future::ok(rsp).boxed() + } + _ => future::ok(Response::new().with_status(StatusCode::NotFound)).boxed(), + } + } +} diff --git a/tests/mocks/mod.rs b/tests/mocks/mod.rs new file mode 100644 index 0000000..e2069ce --- /dev/null +++ b/tests/mocks/mod.rs @@ -0,0 +1,5 @@ +mod mock_namerd; +mod mock_webserver; + +pub use self::mock_namerd::*; +pub use self::mock_webserver::*; diff --git a/tests/test_bad_requests.rs b/tests/test_bad_requests.rs new file mode 100644 index 0000000..6b16119 --- /dev/null +++ b/tests/test_bad_requests.rs @@ -0,0 +1,131 @@ +extern crate env_logger; +extern crate futures; +extern crate hyper; +extern crate tokio_core; +extern crate tokio_io; +extern crate linkerd_tcp; +extern crate tacho; + +mod mocks; + +use futures::Future; +use futures::stream::Stream; +use futures::sync::{mpsc, oneshot}; +use hyper::server::Http; +use linkerd_tcp::app::{Loader, Namerd, Proxy, ProxyServer}; +use linkerd_tcp::app::config::NamerdConfig; +use linkerd_tcp::app::config::ServerConfig::Tcp; +use mocks::MockNamerd; +use std::cell::RefCell; +use std::io::{Read, Write}; +use std::net::{SocketAddr, TcpStream}; +use std::rc::Rc; +use std::thread; +use tacho::Tacho; +use tokio_core::net::TcpListener; +use tokio_core::reactor::Core; + +#[test] +/// If namerd has no resolvable path then requests should fail fast. +fn empty_path() { + drop(env_logger::init()); + + let Tacho { metrics, aggregator, report } = Tacho::default(); + drop(aggregator); + drop(report); + + let mut core = Core::new().unwrap(); + let handle = core.handle(); + + let mut namerd_listener_addr: SocketAddr = "127.0.0.1:0" + .parse() + .expect("could not parse namerd_listener_addr"); + + let namerd_listener = TcpListener::bind(&namerd_listener_addr, &handle.clone()) + .expect(format!("unable to bind mock namerd on address {:?}", + namerd_listener_addr) + .as_str()); + namerd_listener_addr = namerd_listener.local_addr() + .expect("could not get bound addr for namerd_listener"); + + let mock_namerd_httpd = Http::new(); + let handle2 = handle.clone(); + let namerd_srv = namerd_listener.incoming() + .map(move |(socket, addr)| { + let server: MockNamerd = Default::default(); + mock_namerd_httpd.bind_connection(&handle2, socket, addr, server); + namerd_listener_addr = addr; + Ok(()) as Result<(), ()> + }) + .collect(); + + core.handle().spawn(namerd_srv.map(|_| {}) + .map_err(|_| {})); + + let namerd_config = NamerdConfig { + url: format!("http://{}", namerd_listener_addr), + path: "/svc/default".to_owned(), + namespace: None, + interval_secs: Some(5 as u64), + }; + + // Channel for connecting Namerd updates to the Proxy + let (addrs_tx, addrs_rx) = mpsc::channel(1); + + let namerd = Namerd { + config: namerd_config, + sender: addrs_tx, + metrics: metrics.clone(), + }; + + let handle2 = handle.clone(); + let (_, namerd) = namerd.load(handle2).expect("couldn't load() namerd."); + + let proxy_listener_addr: SocketAddr = "127.0.0.1:0" + .parse() + .unwrap(); + + let buffer = Rc::new(RefCell::new(vec![0;1024])); + + let proxy = Proxy { + client: None, + server: ProxyServer { + label: "proxy_server".to_owned(), + servers: vec![Tcp { addr: proxy_listener_addr }], + addrs: Box::new(addrs_rx.fuse()), + buf: buffer, + max_waiters: 1 as usize, + metrics: metrics, + }, + }; + + let handle2 = handle.clone(); + let (proxy_local_addr, proxy) = proxy.load(handle2).expect("couldn't load() proxy."); + + core.handle().spawn(proxy.map(|_| {}) + .map_err(|_| {})); + + core.handle().spawn(namerd.map(|_| {}) + .map_err(|_| {})); + + let (c, p) = oneshot::channel::<()>(); + + let _ = thread::spawn(move || { + let mut client = TcpStream::connect(&proxy_local_addr) + .expect(format!("unable to connect to proxy on {:?}", proxy_listener_addr).as_str()); + client.set_nodelay(true).expect("unable to set NODELAY"); + let request_with_no_path = "GET / HTTP/1.0\r\n\r\n".to_owned(); + let request_written = client.write(request_with_no_path.as_bytes()); + assert_eq!(request_written.expect("bytes not written"), + request_with_no_path.as_str().len()); + let mut reader_buf = vec![0 as u8; 16 * 1024]; + let response_read = client.read_to_end(&mut reader_buf); + match response_read { + Ok(bytes) => assert_eq!(bytes, 0), // Either we read 0 bytes + Err(_) => assert!(true), // Or we got an error. + }; + c.send(()).expect("could not send"); + }); + + core.run(p).expect("our oneshot cannot be completed."); +} \ No newline at end of file