Skip to content
This repository was archived by the owner on Dec 6, 2024. It is now read-only.

Commit ce21663

Browse files
committed
Ensure that remote streams are shutdown appropriately (#29)
Previously, when one end of the proxy connection stopped writing (so that read() returns 0), the shutdown was not propagated to the other connection. This is because the AsyncWrite::shutdown() api does nothing to actually shutdown the connection. We introduce Socket::tcp_shutdown to provide a consistent api for triggering a TcpStream::shutdown. ProxyStream uses both the AsyncWrite::shutdown and Socket::tcp_shutdown APIs to signal when a write is complete. This has been confirmed to fix the issue with manual testing.
1 parent 4699c4a commit ce21663

File tree

3 files changed

+107
-91
lines changed

3 files changed

+107
-91
lines changed

src/lb/duplex.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ impl Duplex {
3434
let dst = Rc::new(RefCell::new(dst));
3535
let tx_bytes_stat = tx_metrics.scope().stat("bytes".into());
3636
let rx_byte_stat = rx_metrics.scope().stat("bytes".into());
37-
let metrics = rx_metrics.clone(); // doesn't matter which one,
37+
let metrics = rx_metrics.clone(); // doesn't matter which one.
3838
let tx = ProxyStream::new(src.clone(), dst.clone(), buf.clone(), tx_metrics);
3939
let rx = ProxyStream::new(dst, src, buf, rx_metrics);
4040
Duplex {
@@ -70,6 +70,7 @@ impl Future for Duplex {
7070
self.tx_bytes += sz;
7171
}
7272
Async::NotReady => {
73+
trace!("dstward not ready");
7374
self.tx = Some(tx);
7475
}
7576
}
@@ -87,17 +88,20 @@ impl Future for Duplex {
8788
self.rx_bytes += sz;
8889
}
8990
Async::NotReady => {
91+
trace!("srcward not ready");
9092
self.rx = Some(rx);
9193
}
9294
}
9395
}
9496

9597
if self.tx.is_none() && self.rx.is_none() {
98+
trace!("complete");
9699
let mut rec = self.metrics.recorder();
97100
rec.add(&self.tx_bytes_stat, self.tx_bytes);
98101
rec.add(&self.rx_bytes_stat, self.rx_bytes);
99102
Ok(Async::Ready(()))
100103
} else {
104+
trace!("not ready");
101105
Ok(Async::NotReady)
102106
}
103107
}

src/lb/proxy_stream.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use futures::{Async, Future, Poll};
55
use lb::Socket;
66
use std::cell::RefCell;
77
use std::io::{self, Read, Write};
8+
use std::net::Shutdown;
89
use std::rc::Rc;
910
use tacho;
1011
use tokio_io::AsyncWrite;
@@ -69,14 +70,17 @@ impl Future for ProxyStream {
6970
/// all pending data before reading any more.
7071
fn poll(&mut self) -> Poll<u64, io::Error> {
7172
trace!("poll");
72-
if self.completed {
73-
return Ok(self.bytes_total.into());
74-
}
75-
7673
let mut rec = self.metrics.recorder();
7774
let mut writer = self.writer.borrow_mut();
7875
let mut reader = self.reader.borrow_mut();
7976
loop {
77+
if self.completed {
78+
try_nb!(writer.shutdown());
79+
writer.tcp_shutdown(Shutdown::Write)?;
80+
trace!("completed");
81+
return Ok(self.bytes_total.into());
82+
}
83+
8084
// Try to flush pending bytes to the writer.
8185
if let Some(mut pending) = self.pending.take() {
8286
let psz = pending.len();
@@ -106,16 +110,12 @@ impl Future for ProxyStream {
106110
let rsz = try_nb!(reader.read(&mut buf));
107111
if rsz == 0 {
108112
// Nothing left to read, return the total number of bytes transferred.
109-
trace!("completed: {}B", self.bytes_total);
113+
trace!("completing: {}B", self.bytes_total);
110114
self.completed = true;
111-
match writer.shutdown()? {
112-
Async::NotReady => {
113-
return Ok(Async::NotReady);
114-
}
115-
Async::Ready(_) => {
116-
return Ok(self.bytes_total.into());
117-
}
118-
}
115+
try_nb!(writer.shutdown());
116+
writer.tcp_shutdown(Shutdown::Write)?;
117+
trace!("completed: {}B", self.bytes_total);
118+
return Ok(self.bytes_total.into());
119119
}
120120
trace!("read {} bytes", rsz);
121121

0 commit comments

Comments
 (0)