@@ -5,25 +5,21 @@ extern crate tokio_core;
5
5
extern crate tokio_service;
6
6
7
7
use std:: borrow:: Cow ;
8
- use std:: cell:: RefCell ;
9
8
use std:: collections:: HashSet ;
10
9
use std:: env;
11
10
use std:: fs:: { self , File } ;
12
11
use std:: io;
13
12
use std:: io:: prelude:: * ;
14
13
use std:: net;
15
14
use std:: path:: PathBuf ;
16
- use std:: rc:: Rc ;
17
15
use std:: str;
18
16
use std:: sync:: { Arc , Mutex , Once } ;
19
17
use std:: thread;
20
18
21
19
use self :: futures:: sync:: oneshot;
22
- use self :: futures:: { Future , Stream } ;
23
- use self :: hyper:: server:: Http ;
20
+ use self :: futures:: { future, Future , Stream } ;
24
21
use self :: tokio_core:: net:: TcpListener ;
25
22
use self :: tokio_core:: reactor:: Core ;
26
- use self :: tokio_service:: Service ;
27
23
use curl:: easy:: { Easy , List } ;
28
24
use serde_json;
29
25
@@ -117,29 +113,20 @@ pub fn proxy() -> (String, Bomb) {
117
113
let handle = core. handle ( ) ;
118
114
let addr = t ! ( a. local_addr( ) ) ;
119
115
let listener = t ! ( TcpListener :: from_listener( a, & addr, & handle) ) ;
120
- let client = hyper:: Client :: configure ( )
121
- . connector ( hyper_tls:: HttpsConnector :: new ( 4 , & handle) . unwrap ( ) )
122
- . build ( & handle) ;
123
-
124
- let record = Rc :: new ( RefCell :: new ( record) ) ;
125
- let srv = listener. incoming ( ) . for_each ( |( socket, _addr) | {
126
- let conn = Http :: < hyper:: Chunk > :: new ( ) . serve_connection (
127
- socket,
128
- Proxy {
129
- sink : sink2. clone ( ) ,
130
- record : Rc :: clone ( & record) ,
131
- client : client. clone ( ) ,
132
- } ,
133
- ) ;
116
+ let client = hyper:: Client :: builder ( ) . build ( hyper_tls:: HttpsConnector :: new ( 4 ) . unwrap ( ) ) ;
117
+
118
+ let record = Arc :: new ( Mutex :: new ( record) ) ;
119
+ let srv = hyper:: Server :: builder ( listener. incoming ( ) . map ( |( l, _) | l) )
120
+ . serve ( Proxy {
121
+ sink : sink2,
122
+ record : Arc :: clone ( & record) ,
123
+ client,
124
+ } )
125
+ . map_err ( |e| eprintln ! ( "server connection error: {}" , e) ) ;
134
126
135
- let fut = conn. map ( |_| ( ) )
136
- . map_err ( |e| eprintln ! ( "server connection error: {}" , e) ) ;
137
- handle. spawn ( fut) ;
138
- Ok ( ( ) )
139
- } ) ;
140
127
drop ( core. run ( srv. select2 ( quitrx) ) ) ;
141
128
142
- let record = record. borrow ( ) ;
129
+ let record = record. lock ( ) . unwrap ( ) ;
143
130
match * record {
144
131
Record :: Capture ( ref data, ref path) => {
145
132
let data = t ! ( serde_json:: to_string( data) ) ;
@@ -159,38 +146,50 @@ pub fn proxy() -> (String, Bomb) {
159
146
)
160
147
}
161
148
149
+ #[ derive( Clone ) ]
162
150
struct Proxy {
163
151
sink : Sink ,
164
- record : Rc < RefCell < Record > > ,
152
+ record : Arc < Mutex < Record > > ,
165
153
client : Client ,
166
154
}
167
155
168
- impl Service for Proxy {
169
- type Request = hyper:: Request ;
170
- type Response = hyper:: Response ;
156
+ impl hyper :: service :: Service for Proxy {
157
+ type ReqBody = hyper:: Body ;
158
+ type ResBody = hyper:: Body ;
171
159
type Error = hyper:: Error ;
172
- type Future = Box < Future < Item = hyper:: Response , Error = hyper:: Error > > ;
173
-
174
- fn call ( & self , req : hyper:: Request ) -> Self :: Future {
175
- match * self . record . borrow_mut ( ) {
176
- Record :: Capture ( _, _) => {
177
- let record = Rc :: clone ( & self . record ) ;
178
- Box :: new (
179
- record_http ( req, & self . client ) . map ( move |( response, exchange) | {
180
- if let Record :: Capture ( ref mut d, _) = * record. borrow_mut ( ) {
181
- d. push ( exchange) ;
182
- }
183
- response
184
- } ) ,
185
- )
186
- }
160
+ type Future = Box < Future < Item = hyper:: Response < Self :: ResBody > , Error = Self :: Error > + Send > ;
161
+
162
+ fn call ( & mut self , req : hyper:: Request < Self :: ReqBody > ) -> Self :: Future {
163
+ let record2 = self . record . clone ( ) ;
164
+ match * self . record . lock ( ) . unwrap ( ) {
165
+ Record :: Capture ( _, _) => Box :: new ( record_http ( req, & self . client ) . map (
166
+ move |( response, exchange) | {
167
+ if let Record :: Capture ( ref mut d, _) = * record2. lock ( ) . unwrap ( ) {
168
+ d. push ( exchange) ;
169
+ }
170
+ response
171
+ } ,
172
+ ) ) ,
187
173
Record :: Replay ( ref mut exchanges) => {
188
174
replay_http ( req, exchanges. remove ( 0 ) , & mut & self . sink )
189
175
}
190
176
}
191
177
}
192
178
}
193
179
180
+ impl hyper:: service:: NewService for Proxy {
181
+ type ReqBody = hyper:: Body ;
182
+ type ResBody = hyper:: Body ;
183
+ type Error = hyper:: Error ;
184
+ type Service = Proxy ;
185
+ type Future = Box < Future < Item = Self :: Service , Error = Self :: InitError > + Send > ;
186
+ type InitError = hyper:: Error ;
187
+
188
+ fn new_service ( & self ) -> Self :: Future {
189
+ Box :: new ( future:: ok ( self . clone ( ) ) )
190
+ }
191
+ }
192
+
194
193
#[ derive( Serialize , Deserialize ) ]
195
194
struct Exchange {
196
195
request : Request ,
@@ -215,17 +214,20 @@ struct Response {
215
214
type Client = hyper:: Client < hyper_tls:: HttpsConnector < hyper:: client:: HttpConnector > > ;
216
215
217
216
fn record_http (
218
- req : hyper:: Request ,
217
+ req : hyper:: Request < hyper :: Body > ,
219
218
client : & Client ,
220
- ) -> Box < Future < Item = ( hyper:: Response , Exchange ) , Error = hyper:: Error > > {
221
- let ( method, uri, _version, headers, body) = req. deconstruct ( ) ;
219
+ ) -> Box < Future < Item = ( hyper:: Response < hyper:: Body > , Exchange ) , Error = hyper:: Error > + Send > {
220
+ let ( header_parts, body) = req. into_parts ( ) ;
221
+ let method = header_parts. method ;
222
+ let uri = header_parts. uri ;
223
+ let headers = header_parts. headers ;
222
224
223
225
let mut request = Request {
224
226
uri : uri. to_string ( ) ,
225
227
method : method. to_string ( ) ,
226
228
headers : headers
227
229
. iter ( )
228
- . map ( |h| ( h. name ( ) . to_string ( ) , h. value_string ( ) ) )
230
+ . map ( |h| ( h. 0 . as_str ( ) . to_string ( ) , h. 1 . to_str ( ) . unwrap ( ) . to_string ( ) ) )
229
231
. collect ( ) ,
230
232
body : Vec :: new ( ) ,
231
233
} ;
@@ -235,9 +237,13 @@ fn record_http(
235
237
let response = body. and_then ( move |body| {
236
238
request. body = body. to_vec ( ) ;
237
239
let uri = uri. to_string ( ) . replace ( "http://" , "https://" ) ;
238
- let mut req = hyper:: Request :: new ( method, uri. parse ( ) . unwrap ( ) ) ;
239
- * req. headers_mut ( ) = headers;
240
- req. set_body ( body) ;
240
+ let uri = uri. parse :: < hyper:: Uri > ( ) . unwrap ( ) ;
241
+ let mut req = hyper:: Request :: builder ( )
242
+ . method ( method. clone ( ) )
243
+ . uri ( uri)
244
+ . body ( body. into ( ) )
245
+ . unwrap ( ) ;
246
+ * req. headers_mut ( ) = headers. clone ( ) ;
241
247
client. request ( req) . map ( |r| ( r, request) )
242
248
} ) ;
243
249
@@ -248,16 +254,16 @@ fn record_http(
248
254
status : status. as_u16 ( ) ,
249
255
headers : headers
250
256
. iter ( )
251
- . map ( |h| ( h. name ( ) . to_string ( ) , h. value_string ( ) ) )
257
+ . map ( |h| ( h. 0 . as_str ( ) . to_string ( ) , h. 1 . to_str ( ) . unwrap ( ) . to_string ( ) ) )
252
258
. collect ( ) ,
253
259
body : Vec :: new ( ) ,
254
260
} ;
255
261
256
- hyper_response. body ( ) . concat2 ( ) . map ( move |body| {
262
+ hyper_response. into_body ( ) . concat2 ( ) . map ( move |body| {
257
263
response. body = body. to_vec ( ) ;
258
- let mut hyper_response = hyper:: Response :: new ( ) ;
259
- hyper_response. set_body ( body ) ;
260
- hyper_response. set_status ( status ) ;
264
+ let mut hyper_response = hyper:: Response :: builder ( ) ;
265
+ hyper_response. status ( status ) ;
266
+ let mut hyper_response = hyper_response . body ( body . into ( ) ) . unwrap ( ) ;
261
267
* hyper_response. headers_mut ( ) = headers;
262
268
(
263
269
hyper_response,
@@ -271,50 +277,53 @@ fn record_http(
271
277
}
272
278
273
279
fn replay_http (
274
- req : hyper:: Request ,
280
+ req : hyper:: Request < hyper :: Body > ,
275
281
mut exchange : Exchange ,
276
282
stdout : & mut Write ,
277
- ) -> Box < Future < Item = hyper:: Response , Error = hyper:: Error > > {
283
+ ) -> Box < Future < Item = hyper:: Response < hyper :: Body > , Error = hyper:: Error > + Send > {
278
284
assert_eq ! ( req. uri( ) . to_string( ) , exchange. request. uri) ;
279
285
assert_eq ! ( req. method( ) . to_string( ) , exchange. request. method) ;
280
286
t ! ( writeln!(
281
287
stdout,
282
288
"expecting: {:?}" ,
283
289
exchange. request. headers
284
290
) ) ;
285
- for header in req. headers ( ) . iter ( ) {
286
- let pair = ( header. name ( ) . to_string ( ) , header. value_string ( ) ) ;
291
+ for ( name, value) in req. headers ( ) . iter ( ) {
292
+ let pair = (
293
+ name. as_str ( ) . to_string ( ) ,
294
+ value. to_str ( ) . unwrap ( ) . to_string ( ) ,
295
+ ) ;
287
296
t ! ( writeln!( stdout, "received: {:?}" , pair) ) ;
288
- if header . name ( ) . starts_with ( "Date " ) {
297
+ if name. as_str ( ) . starts_with ( "date " ) {
289
298
continue ;
290
299
}
291
- if header . name ( ) . starts_with ( "Authorization " ) {
300
+ if name. as_str ( ) . starts_with ( "authorization " ) {
292
301
continue ;
293
302
}
294
303
if !exchange. request . headers . remove ( & pair) {
295
304
panic ! ( "found {:?} but didn't expect it" , pair) ;
296
305
}
297
306
}
298
307
for ( name, value) in exchange. request . headers . drain ( ) {
299
- if name. starts_with ( "Date " ) {
308
+ if name. starts_with ( "date " ) {
300
309
continue ;
301
310
}
302
- if name. starts_with ( "Authorization " ) {
311
+ if name. starts_with ( "authorization " ) {
303
312
continue ;
304
313
}
305
314
panic ! ( "didn't find header {:?}" , ( name, value) ) ;
306
315
}
307
316
let req_body = exchange. request . body ;
308
- let verify_body = req. body ( ) . concat2 ( ) . map ( move |body| {
317
+ let verify_body = req. into_body ( ) . concat2 ( ) . map ( move |body| {
309
318
assert_eq ! ( & body[ ..] , & req_body[ ..] ) ;
310
319
} ) ;
311
320
312
- let mut response = hyper:: Response :: new ( ) ;
313
- response. set_status ( hyper:: StatusCode :: try_from ( exchange. response . status ) . unwrap ( ) ) ;
321
+ let mut response = hyper:: Response :: builder ( ) ;
322
+ response. status ( hyper:: StatusCode :: from_u16 ( exchange. response . status ) . unwrap ( ) ) ;
314
323
for ( key, value) in exchange. response . headers {
315
- response. headers_mut ( ) . append_raw ( key , value) ;
324
+ response. header ( key . as_str ( ) , value. as_str ( ) ) ;
316
325
}
317
- response. set_body ( exchange. response . body ) ;
326
+ let response = response . body ( exchange. response . body . into ( ) ) . unwrap ( ) ;
318
327
319
328
Box :: new ( verify_body. map ( |( ) | response) )
320
329
}
0 commit comments