Skip to content
This repository was archived by the owner on Dec 6, 2024. It is now read-only.

Commit c23d894

Browse files
author
Steve Jenson
committed
Adds failfast mode for unroutable requests.
* Fixes Issue #26 * Adds a failfast mode for currently unroutable requests. * Adds an integration test failfast. * Adds a mock namerd. * Adds a mock static webserver. * Improves some comments.
1 parent bf0e2f6 commit c23d894

13 files changed

+318
-52
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+7
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,10 @@ tokio-io = "0.1"
3434
tokio-service = "0.1"
3535
tokio-timer = "0.1"
3636
url = "1.4"
37+
38+
[dev-dependencies]
39+
env_logger = { version = "0.3", default-features = false }
40+
futures = "0.1"
41+
# We use not-yet-released tokio integration on master:
42+
hyper = { git = "https://github.com/hyperium/hyper", rev = "5a3743c1" }
43+
tokio-core = "0.1"

circle.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,4 @@ dependencies:
1717
test:
1818
override:
1919
#- ~/.cargo/bin/cargo clippy
20-
- ~/.cargo/bin/cargo test
20+
- RUST_BACKTRACE=full ~/.cargo/bin/cargo test

src/app/mod.rs

+45-34
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use std::cell::RefCell;
99
use std::collections::{VecDeque, HashMap};
1010
use std::fs::File;
1111
use std::io::{self, BufReader};
12-
use std::net;
12+
use std::net::{self, SocketAddr};
1313
use std::rc::Rc;
1414
use std::time::Duration;
1515
use tacho::{self, Tacho};
@@ -21,12 +21,12 @@ mod admin_http;
2121
mod sni;
2222
pub mod config;
2323

24-
use self::config::*;
25-
use self::sni::Sni;
2624
use WeightedAddr;
2725
use lb::{Balancer, Acceptor, Connector, PlainAcceptor, PlainConnector, SecureAcceptor,
2826
SecureConnector};
2927
use namerd;
28+
use self::config::*;
29+
use self::sni::Sni;
3030

3131
const DEFAULT_BUFFER_SIZE: usize = 8 * 1024;
3232
const DEFAULT_MAX_WAITERS: usize = 8;
@@ -95,7 +95,7 @@ pub fn configure(app: AppConfig) -> (Admin, Proxies) {
9595

9696
pub trait Loader: Sized {
9797
type Run: Future<Item = (), Error = io::Error>;
98-
fn load(self, handle: Handle) -> io::Result<Self::Run>;
98+
fn load(self, handle: Handle) -> io::Result<(SocketAddr, Self::Run)>;
9999
}
100100
pub trait Runner: Sized {
101101
fn run(self) -> io::Result<()>;
@@ -104,7 +104,7 @@ pub trait Runner: Sized {
104104
impl<L: Loader> Runner for L {
105105
fn run(self) -> io::Result<()> {
106106
let mut core = Core::new()?;
107-
let fut = self.load(core.handle())?;
107+
let (_, fut) = self.load(core.handle())?;
108108
core.run(fut)
109109
}
110110
}
@@ -118,12 +118,12 @@ pub struct Admin {
118118
}
119119
impl Loader for Admin {
120120
type Run = Running;
121-
fn load(self, handle: Handle) -> io::Result<Running> {
121+
fn load(self, handle: Handle) -> io::Result<(SocketAddr, Running)> {
122122
let mut running = Running::new();
123123
{
124124
let mut namerds = self.namerds;
125125
for _ in 0..namerds.len() {
126-
let f = namerds.pop_front().unwrap().load(handle.clone())?;
126+
let (_, f) = namerds.pop_front().unwrap().load(handle.clone())?;
127127
running.register(f.map_err(|_| io::ErrorKind::Other.into()));
128128
}
129129
}
@@ -163,19 +163,19 @@ impl Loader for Admin {
163163
});
164164
running.register(srv);
165165
}
166-
Ok(running)
166+
Ok((self.addr, running))
167167
}
168168
}
169169

170170

171-
struct Namerd {
172-
config: NamerdConfig,
173-
sender: mpsc::Sender<Vec<WeightedAddr>>,
174-
metrics: tacho::Metrics,
171+
pub struct Namerd {
172+
pub config: NamerdConfig,
173+
pub sender: mpsc::Sender<Vec<WeightedAddr>>,
174+
pub metrics: tacho::Metrics,
175175
}
176176
impl Loader for Namerd {
177177
type Run = Box<Future<Item = (), Error = io::Error>>;
178-
fn load(self, handle: Handle) -> io::Result<Self::Run> {
178+
fn load(self, handle: Handle) -> io::Result<(SocketAddr, Self::Run)> {
179179
let path = self.config.path;
180180
let addr = self.config.addr;
181181
let interval_secs = self.config.interval_secs.unwrap_or(DEFAULT_NAMERD_SECONDS);
@@ -194,7 +194,7 @@ impl Loader for Namerd {
194194
let sink = self.sender.sink_map_err(|_| error!("sink error"));
195195
addrs.forward(sink).map_err(|_| io::ErrorKind::Other.into()).map(|_| {})
196196
};
197-
Ok(Box::new(driver))
197+
Ok((addr, Box::new(driver)))
198198
}
199199
}
200200

@@ -203,29 +203,32 @@ pub struct Proxies {
203203
}
204204
impl Loader for Proxies {
205205
type Run = Running;
206-
fn load(self, handle: Handle) -> io::Result<Running> {
206+
fn load(self, handle: Handle) -> io::Result<(SocketAddr, Running)> {
207207
let mut running = Running::new();
208208
let mut proxies = self.proxies;
209+
let mut addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
209210
for _ in 0..proxies.len() {
210211
let p = proxies.pop_front().unwrap();
211-
let f = p.load(handle.clone())?;
212+
let (_addr, f) = p.load(handle.clone())?;
213+
addr = _addr;
212214
running.register(f);
213215
}
214-
Ok(running)
216+
Ok((addr, running))
215217
}
216218
}
217219

218-
struct Proxy {
219-
client: Option<ClientConfig>,
220-
server: ProxyServer,
220+
pub struct Proxy {
221+
pub client: Option<ClientConfig>,
222+
pub server: ProxyServer,
221223
}
222224
impl Loader for Proxy {
223225
type Run = Running;
224-
fn load(self, handle: Handle) -> io::Result<Running> {
226+
fn load(self, handle: Handle) -> io::Result<(SocketAddr, Running)> {
225227
match self.client.and_then(|c| c.tls) {
226228
None => {
227229
let conn = PlainConnector::new(handle.clone());
228-
self.server.load(&handle, conn)
230+
let f = self.server.load(&handle, conn).expect("b");
231+
Ok(f)
229232
}
230233
Some(ref c) => {
231234
let mut tls = rustls::ClientConfig::new();
@@ -238,29 +241,33 @@ impl Loader for Proxy {
238241
}
239242
};
240243
let conn = SecureConnector::new(c.dns_name.clone(), tls, handle.clone());
241-
self.server.load(&handle, conn)
244+
let f = self.server.load(&handle, conn).expect("a");
245+
Ok(f)
242246
}
243247
}
244248
}
245249
}
246250

247-
struct ProxyServer {
248-
label: String,
249-
servers: Vec<ServerConfig>,
250-
addrs: Box<Stream<Item = Vec<WeightedAddr>, Error = ()>>,
251-
buf: Rc<RefCell<Vec<u8>>>,
252-
max_waiters: usize,
253-
metrics: tacho::Metrics,
251+
pub struct ProxyServer {
252+
pub label: String,
253+
pub servers: Vec<ServerConfig>,
254+
pub addrs: Box<Stream<Item = Vec<WeightedAddr>, Error = ()>>,
255+
pub buf: Rc<RefCell<Vec<u8>>>,
256+
pub max_waiters: usize,
257+
pub metrics: tacho::Metrics,
254258
}
255259
impl ProxyServer {
256-
fn load<C>(self, handle: &Handle, conn: C) -> io::Result<Running>
260+
fn load<C>(self, handle: &Handle, conn: C) -> io::Result<(SocketAddr, Running)>
257261
where C: Connector + 'static
258262
{
259263
let addrs = self.addrs.map_err(|_| io::ErrorKind::Other.into());
260264
let metrics = self.metrics.clone().labeled("proxy".into(), self.label.into());
261265
let bal = Balancer::new(addrs, conn, self.buf.clone(), metrics.clone())
262266
.into_shared(self.max_waiters, handle.clone());
263267

268+
// Placeholder for our local listening SocketAddr.
269+
let mut local_addr: SocketAddr = "127.0.0.1:0".parse().expect("unable to parse addr");
270+
264271
// TODO scope/tag stats for servers.
265272

266273
let mut running = Running::new();
@@ -271,7 +278,9 @@ impl ProxyServer {
271278
ServerConfig::Tcp { ref addr } => {
272279
let metrics = metrics.clone().labeled("srv".into(), format!("{}", addr));
273280
let acceptor = PlainAcceptor::new(handle, metrics);
274-
let f = acceptor.accept(addr).forward(bal).map(|_| {});
281+
let (bound_addr, forwarder) = acceptor.accept(addr);
282+
local_addr = bound_addr;
283+
let f = forwarder.forward(bal).map(|_| {});
275284
running.register(f);
276285
}
277286
ServerConfig::Tls { ref addr,
@@ -287,12 +296,14 @@ impl ProxyServer {
287296

288297
let metrics = metrics.clone().labeled("srv".into(), format!("{}", addr));
289298
let acceptor = SecureAcceptor::new(handle, tls, metrics);
290-
let f = acceptor.accept(addr).forward(bal).map(|_| {});
299+
let (bound_addr, forwarder) = acceptor.accept(addr);
300+
local_addr = bound_addr;
301+
let f = forwarder.forward(bal).map(|_| {});
291302
running.register(f);
292303
}
293304
}
294305
}
295-
Ok(running)
306+
Ok((local_addr, running))
296307
}
297308
}
298309

src/lb/balancer.rs

+15-3
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ pub struct Balancer<A, C> {
4444
retired: VecDeque<Endpoint>,
4545

4646
stats: Stats,
47+
48+
fail_fast_mode: bool,
4749
}
4850

4951
impl<A, C> Balancer<A, C>
@@ -64,6 +66,7 @@ impl<A, C> Balancer<A, C>
6466
ready: VecDeque::new(),
6567
retired: VecDeque::new(),
6668
stats: Stats::new(metrics),
69+
fail_fast_mode: false,
6770
}
6871
}
6972

@@ -140,6 +143,9 @@ impl<A, C> Balancer<A, C>
140143
if let Async::Ready(addrs) = self.addrs.poll()? {
141144
trace!("addr update");
142145
let addrs = addrs.expect("addr stream must be infinite");
146+
// If there are no addrs to route to, drop requests quickly.
147+
// TODO: validate that fail_fast_mode is being disabled once addrs exist.
148+
self.fail_fast_mode = addrs.is_empty();
143149
let new = addr_weight_map(&addrs);
144150
self.update_endpoints(&new);
145151
}
@@ -357,12 +363,18 @@ impl<A, C> Sink for Balancer<A, C>
357363
self.evict_retirees(&mut rec)?;
358364
self.promote_unready(&mut rec)?;
359365
self.discover_and_retire()?;
360-
trace!("retrying {} unready={} ready={} retired={}",
366+
trace!("retrying {} unready={} ready={} retired={} failfast={}",
361367
src_addr,
362368
self.unready.len(),
363369
self.ready.len(),
364-
self.retired.len());
365-
self.dispatch(src, &mut rec)
370+
self.retired.len(),
371+
self.fail_fast_mode);
372+
if self.fail_fast_mode {
373+
trace!("in fail fast mode, dropping traffic");
374+
Err(io::ErrorKind::Other.into())
375+
} else {
376+
self.dispatch(src, &mut rec)
377+
}
366378
}
367379
};
368380

src/lb/mod.rs

+19-10
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,11 @@ impl WithAddr for Src {
4545
}
4646
}
4747

48-
/// Binds on `addr` and produces `U`-typed src connections.
48+
/// Binds on `addr` and produces the bound `SocketAddr` and a` Stream` of `Src` connections.
4949
pub trait Acceptor {
50-
fn accept(&self, addr: &SocketAddr) -> Box<Stream<Item = Src, Error = io::Error>>;
50+
fn accept(&self,
51+
addr: &SocketAddr)
52+
-> (SocketAddr, Box<Stream<Item = Src, Error = io::Error>>);
5153
}
5254

5355
/// Establishes a `D`-typed connection to `addr`.
@@ -71,17 +73,20 @@ impl PlainAcceptor {
7173
}
7274
}
7375
impl Acceptor for PlainAcceptor {
74-
fn accept(&self, addr: &SocketAddr) -> Box<Stream<Item = Src, Error = io::Error>> {
76+
fn accept(&self,
77+
addr: &SocketAddr)
78+
-> (SocketAddr, Box<Stream<Item = Src, Error = io::Error>>) {
7579
let metrics = self.metrics.clone();
7680
let connects_key = self.connects_key.clone();
77-
TcpListener::bind(addr, &self.handle)
78-
.unwrap()
79-
.incoming()
81+
let listener = TcpListener::bind(addr, &self.handle).expect("could not bind to address");
82+
let local_addr = listener.local_addr().expect("could not get local_addr from listener");
83+
let worker = listener.incoming()
8084
.map(move |(s, a)| {
8185
metrics.recorder().incr(&connects_key, 1);
8286
Src(Socket::plain(a, s))
8387
})
84-
.boxed()
88+
.boxed();
89+
(local_addr, worker)
8590
}
8691
}
8792

@@ -119,9 +124,13 @@ impl SecureAcceptor {
119124
}
120125
}
121126
impl Acceptor for SecureAcceptor {
122-
fn accept(&self, addr: &SocketAddr) -> Box<Stream<Item = Src, Error = io::Error>> {
127+
fn accept(&self,
128+
addr: &SocketAddr)
129+
-> (SocketAddr, Box<Stream<Item = Src, Error = io::Error>>) {
123130
let tls = self.config.clone();
124-
let l = TcpListener::bind(addr, &self.handle).unwrap();
131+
let l = TcpListener::bind(addr, &self.handle)
132+
.expect("could not bind listener for SecureAcceptor");
133+
let local_addr = l.local_addr().expect("could not get local_addr from listener");
125134

126135
let metrics = self.metrics.clone();
127136
let connects_key = self.connects_key.clone();
@@ -143,7 +152,7 @@ impl Acceptor for SecureAcceptor {
143152
}
144153
}
145154
});
146-
Box::new(srcs)
155+
(local_addr, Box::new(srcs))
147156
}
148157
}
149158

src/lb/proxy_stream.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ use tokio_io::AsyncWrite;
1212
/// A future representing reading all data from one side of a proxy connection and writing
1313
/// it to another.
1414
///
15-
/// In the typical case, nothing allocations are required. If the write side exhibits
16-
/// backpressure, however, a buffer is allocated to
15+
/// In the typical case, no allocations are required. If the write side exhibits
16+
/// backpressure, however, a buffer is allocated.
1717
pub struct ProxyStream {
1818
reader: Rc<RefCell<Socket>>,
1919
writer: Rc<RefCell<Socket>>,

src/namerd.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ fn request<C: Connect>(client: Rc<Client<C>>, url: Url, stats: Stats) -> AddrsFu
8383
}
8484
}
8585
Err(e) => {
86-
error!("failed to read response: {}", e);
86+
error!("failed to read response from remote namerd: {}", e);
8787
future::ok(None).boxed()
8888
}
8989
})
@@ -136,9 +136,10 @@ fn parse_chunks(chunks: &[Chunk]) -> Option<Vec<::WeightedAddr>> {
136136
let result: json::Result<NamerdResponse> = json::from_reader(r);
137137
match result {
138138
Ok(ref nrsp) if nrsp.kind == "bound" => Some(to_weighted_addrs(&nrsp.addrs)),
139+
Ok(ref nrsp) if nrsp.kind == "neg" => Some(vec![]),
139140
Ok(_) => Some(vec![]),
140141
Err(e) => {
141-
info!("error parsing response: {}", e);
142+
error!("error parsing response: {}", e);
142143
None
143144
}
144145
}
@@ -159,7 +160,9 @@ fn to_weighted_addrs(namerd_addrs: &[NamerdAddr]) -> Vec<::WeightedAddr> {
159160
struct NamerdResponse {
160161
#[serde(rename = "type")]
161162
kind: String,
163+
#[serde(default)]
162164
addrs: Vec<NamerdAddr>,
165+
#[serde(default)]
163166
meta: HashMap<String, String>,
164167
}
165168

tests/lib.rs

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
#[cfg(tests)]
2+
mod tests;
3+
extern crate log;
4+
5+
extern crate env_logger;
6+
extern crate futures;
7+
extern crate hyper;
8+
extern crate tokio_core;
9+
extern crate tokio_io;
10+
extern crate linkerd_tcp;
11+
12+
mod mocks;
13+
pub use mocks::MockNamerd;

0 commit comments

Comments
 (0)