@@ -5,25 +5,21 @@ extern crate tokio_core;
55extern crate tokio_service;
66
77use std:: borrow:: Cow ;
8- use std:: cell:: RefCell ;
98use std:: collections:: HashSet ;
109use std:: env;
1110use std:: fs:: { self , File } ;
1211use std:: io;
1312use std:: io:: prelude:: * ;
1413use std:: net;
1514use std:: path:: PathBuf ;
16- use std:: rc:: Rc ;
1715use std:: str;
1816use std:: sync:: { Arc , Mutex , Once } ;
1917use std:: thread;
2018
2119use self :: futures:: sync:: oneshot;
22- use self :: futures:: { Future , Stream } ;
23- use self :: hyper:: server:: Http ;
20+ use self :: futures:: { future, Future , Stream } ;
2421use self :: tokio_core:: net:: TcpListener ;
2522use self :: tokio_core:: reactor:: Core ;
26- use self :: tokio_service:: Service ;
2723use curl:: easy:: { Easy , List } ;
2824use serde_json;
2925
@@ -117,29 +113,20 @@ pub fn proxy() -> (String, Bomb) {
117113 let handle = core. handle ( ) ;
118114 let addr = t ! ( a. local_addr( ) ) ;
119115 let listener = t ! ( TcpListener :: from_listener( a, & addr, & handle) ) ;
120- let client = hyper:: Client :: configure ( )
121- . connector ( hyper_tls:: HttpsConnector :: new ( 4 , & handle) . unwrap ( ) )
122- . build ( & handle) ;
123-
124- let record = Rc :: new ( RefCell :: new ( record) ) ;
125- let srv = listener. incoming ( ) . for_each ( |( socket, _addr) | {
126- let conn = Http :: < hyper:: Chunk > :: new ( ) . serve_connection (
127- socket,
128- Proxy {
129- sink : sink2. clone ( ) ,
130- record : Rc :: clone ( & record) ,
131- client : client. clone ( ) ,
132- } ,
133- ) ;
116+ let client = hyper:: Client :: builder ( ) . build ( hyper_tls:: HttpsConnector :: new ( 4 ) . unwrap ( ) ) ;
117+
118+ let record = Arc :: new ( Mutex :: new ( record) ) ;
119+ let srv = hyper:: Server :: builder ( listener. incoming ( ) . map ( |( l, _) | l) )
120+ . serve ( Proxy {
121+ sink : sink2,
122+ record : Arc :: clone ( & record) ,
123+ client,
124+ } )
125+ . map_err ( |e| eprintln ! ( "server connection error: {}" , e) ) ;
134126
135- let fut = conn. map ( |_| ( ) )
136- . map_err ( |e| eprintln ! ( "server connection error: {}" , e) ) ;
137- handle. spawn ( fut) ;
138- Ok ( ( ) )
139- } ) ;
140127 drop ( core. run ( srv. select2 ( quitrx) ) ) ;
141128
142- let record = record. borrow ( ) ;
129+ let record = record. lock ( ) . unwrap ( ) ;
143130 match * record {
144131 Record :: Capture ( ref data, ref path) => {
145132 let data = t ! ( serde_json:: to_string( data) ) ;
@@ -159,38 +146,50 @@ pub fn proxy() -> (String, Bomb) {
159146 )
160147}
161148
149+ #[ derive( Clone ) ]
162150struct Proxy {
163151 sink : Sink ,
164- record : Rc < RefCell < Record > > ,
152+ record : Arc < Mutex < Record > > ,
165153 client : Client ,
166154}
167155
168- impl Service for Proxy {
169- type Request = hyper:: Request ;
170- type Response = hyper:: Response ;
156+ impl hyper :: service :: Service for Proxy {
157+ type ReqBody = hyper:: Body ;
158+ type ResBody = hyper:: Body ;
171159 type Error = hyper:: Error ;
172- type Future = Box < Future < Item = hyper:: Response , Error = hyper:: Error > > ;
173-
174- fn call ( & self , req : hyper:: Request ) -> Self :: Future {
175- match * self . record . borrow_mut ( ) {
176- Record :: Capture ( _, _) => {
177- let record = Rc :: clone ( & self . record ) ;
178- Box :: new (
179- record_http ( req, & self . client ) . map ( move |( response, exchange) | {
180- if let Record :: Capture ( ref mut d, _) = * record. borrow_mut ( ) {
181- d. push ( exchange) ;
182- }
183- response
184- } ) ,
185- )
186- }
160+ type Future = Box < Future < Item = hyper:: Response < Self :: ResBody > , Error = Self :: Error > + Send > ;
161+
162+ fn call ( & mut self , req : hyper:: Request < Self :: ReqBody > ) -> Self :: Future {
163+ let record2 = self . record . clone ( ) ;
164+ match * self . record . lock ( ) . unwrap ( ) {
165+ Record :: Capture ( _, _) => Box :: new ( record_http ( req, & self . client ) . map (
166+ move |( response, exchange) | {
167+ if let Record :: Capture ( ref mut d, _) = * record2. lock ( ) . unwrap ( ) {
168+ d. push ( exchange) ;
169+ }
170+ response
171+ } ,
172+ ) ) ,
187173 Record :: Replay ( ref mut exchanges) => {
188174 replay_http ( req, exchanges. remove ( 0 ) , & mut & self . sink )
189175 }
190176 }
191177 }
192178}
193179
180+ impl hyper:: service:: NewService for Proxy {
181+ type ReqBody = hyper:: Body ;
182+ type ResBody = hyper:: Body ;
183+ type Error = hyper:: Error ;
184+ type Service = Proxy ;
185+ type Future = Box < Future < Item = Self :: Service , Error = Self :: InitError > + Send > ;
186+ type InitError = hyper:: Error ;
187+
188+ fn new_service ( & self ) -> Self :: Future {
189+ Box :: new ( future:: ok ( self . clone ( ) ) )
190+ }
191+ }
192+
194193#[ derive( Serialize , Deserialize ) ]
195194struct Exchange {
196195 request : Request ,
@@ -215,17 +214,20 @@ struct Response {
215214type Client = hyper:: Client < hyper_tls:: HttpsConnector < hyper:: client:: HttpConnector > > ;
216215
217216fn record_http (
218- req : hyper:: Request ,
217+ req : hyper:: Request < hyper :: Body > ,
219218 client : & Client ,
220- ) -> Box < Future < Item = ( hyper:: Response , Exchange ) , Error = hyper:: Error > > {
221- let ( method, uri, _version, headers, body) = req. deconstruct ( ) ;
219+ ) -> Box < Future < Item = ( hyper:: Response < hyper:: Body > , Exchange ) , Error = hyper:: Error > + Send > {
220+ let ( header_parts, body) = req. into_parts ( ) ;
221+ let method = header_parts. method ;
222+ let uri = header_parts. uri ;
223+ let headers = header_parts. headers ;
222224
223225 let mut request = Request {
224226 uri : uri. to_string ( ) ,
225227 method : method. to_string ( ) ,
226228 headers : headers
227229 . iter ( )
228- . map ( |h| ( h. name ( ) . to_string ( ) , h. value_string ( ) ) )
230+ . map ( |h| ( h. 0 . as_str ( ) . to_string ( ) , h. 1 . to_str ( ) . unwrap ( ) . to_string ( ) ) )
229231 . collect ( ) ,
230232 body : Vec :: new ( ) ,
231233 } ;
@@ -235,9 +237,13 @@ fn record_http(
235237 let response = body. and_then ( move |body| {
236238 request. body = body. to_vec ( ) ;
237239 let uri = uri. to_string ( ) . replace ( "http://" , "https://" ) ;
238- let mut req = hyper:: Request :: new ( method, uri. parse ( ) . unwrap ( ) ) ;
239- * req. headers_mut ( ) = headers;
240- req. set_body ( body) ;
240+ let uri = uri. parse :: < hyper:: Uri > ( ) . unwrap ( ) ;
241+ let mut req = hyper:: Request :: builder ( )
242+ . method ( method. clone ( ) )
243+ . uri ( uri)
244+ . body ( body. into ( ) )
245+ . unwrap ( ) ;
246+ * req. headers_mut ( ) = headers. clone ( ) ;
241247 client. request ( req) . map ( |r| ( r, request) )
242248 } ) ;
243249
@@ -248,16 +254,16 @@ fn record_http(
248254 status : status. as_u16 ( ) ,
249255 headers : headers
250256 . iter ( )
251- . map ( |h| ( h. name ( ) . to_string ( ) , h. value_string ( ) ) )
257+ . map ( |h| ( h. 0 . as_str ( ) . to_string ( ) , h. 1 . to_str ( ) . unwrap ( ) . to_string ( ) ) )
252258 . collect ( ) ,
253259 body : Vec :: new ( ) ,
254260 } ;
255261
256- hyper_response. body ( ) . concat2 ( ) . map ( move |body| {
262+ hyper_response. into_body ( ) . concat2 ( ) . map ( move |body| {
257263 response. body = body. to_vec ( ) ;
258- let mut hyper_response = hyper:: Response :: new ( ) ;
259- hyper_response. set_body ( body ) ;
260- hyper_response. set_status ( status ) ;
264+ let mut hyper_response = hyper:: Response :: builder ( ) ;
265+ hyper_response. status ( status ) ;
266+ let mut hyper_response = hyper_response . body ( body . into ( ) ) . unwrap ( ) ;
261267 * hyper_response. headers_mut ( ) = headers;
262268 (
263269 hyper_response,
@@ -271,50 +277,53 @@ fn record_http(
271277}
272278
273279fn replay_http (
274- req : hyper:: Request ,
280+ req : hyper:: Request < hyper :: Body > ,
275281 mut exchange : Exchange ,
276282 stdout : & mut Write ,
277- ) -> Box < Future < Item = hyper:: Response , Error = hyper:: Error > > {
283+ ) -> Box < Future < Item = hyper:: Response < hyper :: Body > , Error = hyper:: Error > + Send > {
278284 assert_eq ! ( req. uri( ) . to_string( ) , exchange. request. uri) ;
279285 assert_eq ! ( req. method( ) . to_string( ) , exchange. request. method) ;
280286 t ! ( writeln!(
281287 stdout,
282288 "expecting: {:?}" ,
283289 exchange. request. headers
284290 ) ) ;
285- for header in req. headers ( ) . iter ( ) {
286- let pair = ( header. name ( ) . to_string ( ) , header. value_string ( ) ) ;
291+ for ( name, value) in req. headers ( ) . iter ( ) {
292+ let pair = (
293+ name. as_str ( ) . to_string ( ) ,
294+ value. to_str ( ) . unwrap ( ) . to_string ( ) ,
295+ ) ;
287296 t ! ( writeln!( stdout, "received: {:?}" , pair) ) ;
288- if header . name ( ) . starts_with ( "Date " ) {
297+ if name. as_str ( ) . starts_with ( "date " ) {
289298 continue ;
290299 }
291- if header . name ( ) . starts_with ( "Authorization " ) {
300+ if name. as_str ( ) . starts_with ( "authorization " ) {
292301 continue ;
293302 }
294303 if !exchange. request . headers . remove ( & pair) {
295304 panic ! ( "found {:?} but didn't expect it" , pair) ;
296305 }
297306 }
298307 for ( name, value) in exchange. request . headers . drain ( ) {
299- if name. starts_with ( "Date " ) {
308+ if name. starts_with ( "date " ) {
300309 continue ;
301310 }
302- if name. starts_with ( "Authorization " ) {
311+ if name. starts_with ( "authorization " ) {
303312 continue ;
304313 }
305314 panic ! ( "didn't find header {:?}" , ( name, value) ) ;
306315 }
307316 let req_body = exchange. request . body ;
308- let verify_body = req. body ( ) . concat2 ( ) . map ( move |body| {
317+ let verify_body = req. into_body ( ) . concat2 ( ) . map ( move |body| {
309318 assert_eq ! ( & body[ ..] , & req_body[ ..] ) ;
310319 } ) ;
311320
312- let mut response = hyper:: Response :: new ( ) ;
313- response. set_status ( hyper:: StatusCode :: try_from ( exchange. response . status ) . unwrap ( ) ) ;
321+ let mut response = hyper:: Response :: builder ( ) ;
322+ response. status ( hyper:: StatusCode :: from_u16 ( exchange. response . status ) . unwrap ( ) ) ;
314323 for ( key, value) in exchange. response . headers {
315- response. headers_mut ( ) . append_raw ( key , value) ;
324+ response. header ( key . as_str ( ) , value. as_str ( ) ) ;
316325 }
317- response. set_body ( exchange. response . body ) ;
326+ let response = response . body ( exchange. response . body . into ( ) ) . unwrap ( ) ;
318327
319328 Box :: new ( verify_body. map ( |( ) | response) )
320329}
0 commit comments