Skip to content

Commit 2d67477

Browse files
authored
Limit max idle connections. (fortanix#81)
Adds limit of 100 connections to ConnectionPool. This is implemented with a VecDeque acting as an LRU. When the pool becomes full, the oldest stream is popped off the back from the VecDeque and also removed from the map of PoolKeys to Streams. Fixes fortanix#77
1 parent a6e99c8 commit 2d67477

File tree

1 file changed

+110
-4
lines changed

1 file changed

+110
-4
lines changed

src/pool.rs

Lines changed: 110 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::collections::HashMap;
1+
use std::collections::{HashMap, VecDeque};
22
use std::io::{Read, Result as IoResult};
33

44
use crate::stream::Stream;
@@ -7,14 +7,24 @@ use crate::unit::Unit;
77
use url::Url;
88

99
pub const DEFAULT_HOST: &str = "localhost";
10+
const MAX_IDLE_CONNECTIONS: usize = 100;
1011

1112
/// Holder of recycled connections.
1213
///
14+
/// Invariant: The length of recycle and lru are the same.
15+
/// Invariant: Each PoolKey exists as a key in recycle, and vice versa.
16+
/// Invariant: Each PoolKey exists in recycle at most once and lru at most once.
17+
///
1318
/// *Internal API*
1419
#[derive(Default, Debug)]
1520
pub(crate) struct ConnectionPool {
1621
// the actual pooled connection. however only one per hostname:port.
1722
recycle: HashMap<PoolKey, Stream>,
23+
// This is used to keep track of which streams to expire when the
24+
// pool reaches MAX_IDLE_CONNECTIONS. The corresponding PoolKeys for
25+
// recently used Streams are added to the back of the queue;
26+
// old streams are removed from the front.
27+
lru: VecDeque<PoolKey>,
1828
}
1929

2030
impl ConnectionPool {
@@ -26,7 +36,44 @@ impl ConnectionPool {
2636

2737
/// How the unit::connect tries to get a pooled connection.
2838
pub fn try_get_connection(&mut self, url: &Url) -> Option<Stream> {
29-
self.recycle.remove(&PoolKey::new(url))
39+
let key = PoolKey::new(url);
40+
self.remove(&key)
41+
}
42+
43+
fn remove(&mut self, key: &PoolKey) -> Option<Stream> {
44+
if !self.recycle.contains_key(&key) {
45+
return None;
46+
}
47+
let index = self.lru.iter().position(|k| k == key);
48+
assert!(
49+
index.is_some(),
50+
"invariant failed: key existed in recycle but not lru"
51+
);
52+
self.lru.remove(index.unwrap());
53+
self.recycle.remove(&key)
54+
}
55+
56+
fn add(&mut self, key: PoolKey, stream: Stream) {
57+
// If an entry with the same key already exists, remove it.
58+
// The more recently used stream is likely to live longer.
59+
self.remove(&key);
60+
if self.recycle.len() + 1 > MAX_IDLE_CONNECTIONS {
61+
self.remove_oldest();
62+
}
63+
self.lru.push_back(key.clone());
64+
self.recycle.insert(key, stream);
65+
}
66+
67+
fn remove_oldest(&mut self) {
68+
if let Some(key) = self.lru.pop_front() {
69+
let removed = self.recycle.remove(&key);
70+
assert!(
71+
removed.is_some(),
72+
"invariant failed: key existed in lru but not in recycle"
73+
);
74+
} else {
75+
panic!("tried to remove oldest but no entries found!");
76+
}
3077
}
3178

3279
#[cfg(test)]
@@ -35,13 +82,26 @@ impl ConnectionPool {
3582
}
3683
}
3784

38-
#[derive(Debug, PartialEq, Clone, Eq, Hash)]
85+
#[derive(PartialEq, Clone, Eq, Hash)]
3986
struct PoolKey {
4087
scheme: String,
4188
hostname: String,
4289
port: Option<u16>,
4390
}
4491

92+
use std::fmt;
93+
94+
impl fmt::Debug for PoolKey {
95+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96+
f.write_fmt(format_args!(
97+
"{}|{}|{}",
98+
self.scheme,
99+
self.hostname,
100+
self.port.unwrap_or(0)
101+
))
102+
}
103+
}
104+
45105
impl PoolKey {
46106
fn new(url: &Url) -> Self {
47107
let port = url.port_or_known_default();
@@ -59,6 +119,52 @@ fn poolkey_new() {
59119
PoolKey::new(&Url::parse("zzz:///example.com").unwrap());
60120
}
61121

122+
#[test]
123+
fn pool_size_limit() {
124+
assert_eq!(MAX_IDLE_CONNECTIONS, 100);
125+
let mut pool = ConnectionPool::new();
126+
let hostnames = (0..200).map(|i| format!("{}.example", i));
127+
let poolkeys = hostnames.map(|hostname| PoolKey {
128+
scheme: "https".to_string(),
129+
hostname,
130+
port: Some(999),
131+
});
132+
for key in poolkeys.clone() {
133+
pool.add(key, Stream::Cursor(std::io::Cursor::new(vec![])));
134+
}
135+
assert_eq!(pool.len(), 100);
136+
137+
for key in poolkeys.skip(100) {
138+
let result = pool.remove(&key);
139+
assert!(result.is_some(), "expected key was not in pool");
140+
}
141+
}
142+
143+
#[test]
144+
fn pool_duplicates_limit() {
145+
// Test inserting duplicates into the pool, and subsequently
146+
// filling and draining it. The duplicates should evict earlier
147+
// entries with the same key.
148+
assert_eq!(MAX_IDLE_CONNECTIONS, 100);
149+
let mut pool = ConnectionPool::new();
150+
let hostnames = (0..100).map(|i| format!("{}.example", i));
151+
let poolkeys = hostnames.map(|hostname| PoolKey {
152+
scheme: "https".to_string(),
153+
hostname,
154+
port: Some(999),
155+
});
156+
for key in poolkeys.clone() {
157+
pool.add(key.clone(), Stream::Cursor(std::io::Cursor::new(vec![])));
158+
pool.add(key, Stream::Cursor(std::io::Cursor::new(vec![])));
159+
}
160+
assert_eq!(pool.len(), 100);
161+
162+
for key in poolkeys {
163+
let result = pool.remove(&key);
164+
assert!(result.is_some(), "expected key was not in pool");
165+
}
166+
}
167+
62168
/// Read wrapper that returns the stream to the pool once the
63169
/// read is exhausted (reached a 0).
64170
///
@@ -91,7 +197,7 @@ impl<R: Read + Sized + Into<Stream>> PoolReturnRead<R> {
91197
}
92198
// insert back into pool
93199
let key = PoolKey::new(&unit.url);
94-
agent.pool().recycle.insert(key, stream);
200+
agent.pool().add(key, stream);
95201
}
96202
}
97203
}

0 commit comments

Comments
 (0)