88use React \EventLoop \LoopInterface ;
99use React \Http \Message \Response ;
1010use React \Http \Message \ServerRequest ;
11+ use React \Http \Middleware \InactiveConnectionTimeoutMiddleware ;
1112use React \Promise ;
1213use React \Promise \CancellablePromiseInterface ;
1314use React \Promise \PromiseInterface ;
@@ -85,6 +86,7 @@ final class StreamingServer extends EventEmitter
8586 private $ callback ;
8687 private $ parser ;
8788 private $ loop ;
89+ private $ idleConnectionTimeout ;
8890
8991 /**
9092 * Creates an HTTP server that invokes the given callback for each incoming HTTP request
@@ -96,15 +98,17 @@ final class StreamingServer extends EventEmitter
9698 *
9799 * @param LoopInterface $loop
98100 * @param callable $requestHandler
101+ * @param float $idleConnectTimeout
99102 * @see self::listen()
100103 */
101- public function __construct (LoopInterface $ loop , $ requestHandler )
104+ public function __construct (LoopInterface $ loop , $ requestHandler, $ idleConnectTimeout = InactiveConnectionTimeoutMiddleware:: DEFAULT_TIMEOUT )
102105 {
103106 if (!\is_callable ($ requestHandler )) {
104107 throw new \InvalidArgumentException ('Invalid request handler given ' );
105108 }
106109
107110 $ this ->loop = $ loop ;
111+ $ this ->idleConnectionTimeout = $ idleConnectTimeout ;
108112
109113 $ this ->callback = $ requestHandler ;
110114 $ this ->parser = new RequestHeaderParser ();
@@ -134,7 +138,27 @@ public function __construct(LoopInterface $loop, $requestHandler)
134138 */
135139 public function listen (ServerInterface $ socket )
136140 {
137- $ socket ->on ('connection ' , array ($ this ->parser , 'handle ' ));
141+ $ socket ->on ('connection ' , array ($ this , 'handle ' ));
142+ }
143+
144+ /** @internal */
145+ public function handle (ConnectionInterface $ conn )
146+ {
147+ $ timer = $ this ->loop ->addTimer ($ this ->idleConnectionTimeout , function () use ($ conn ) {
148+ $ conn ->close ();
149+ });
150+ $ loop = $ this ->loop ;
151+ $ conn ->once ('data ' , function () use ($ loop , $ timer ) {
152+ $ loop ->cancelTimer ($ timer );
153+ });
154+ $ conn ->on ('end ' , function () use ($ loop , $ timer ) {
155+ $ loop ->cancelTimer ($ timer );
156+ });
157+ $ conn ->on ('close ' , function () use ($ loop , $ timer ) {
158+ $ loop ->cancelTimer ($ timer );
159+ });
160+
161+ $ this ->parser ->handle ($ conn );
138162 }
139163
140164 /** @internal */
@@ -350,7 +374,7 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
350374
351375 // either wait for next request over persistent connection or end connection
352376 if ($ persist ) {
353- $ this ->parser -> handle ($ connection );
377+ $ this ->handle ($ connection );
354378 } else {
355379 $ connection ->end ();
356380 }
@@ -371,10 +395,10 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
371395 // write streaming body and then wait for next request over persistent connection
372396 if ($ persist ) {
373397 $ body ->pipe ($ connection , array ('end ' => false ));
374- $ parser = $ this -> parser ;
375- $ body ->on ('end ' , function () use ($ connection , $ parser , $ body ) {
398+ $ that = $ this ;
399+ $ body ->on ('end ' , function () use ($ connection , $ that , $ body ) {
376400 $ connection ->removeListener ('close ' , array ($ body , 'close ' ));
377- $ parser ->handle ($ connection );
401+ $ that ->handle ($ connection );
378402 });
379403 } else {
380404 $ body ->pipe ($ connection );
0 commit comments