1
1
<?php
2
2
namespace Swango ;
3
3
use Swango \Environment ;
4
-
5
4
class HttpServer {
6
5
protected static $ worker , $ worker_id , $ terminal_server , $ http_request_counter , $ max_coroutine , $ is_stopping = false ;
7
6
public static function getWorkerId (): ?int {
@@ -21,17 +20,26 @@ public static function isStopping(): bool {
21
20
}
22
21
protected $ server , $ daemonize , $ callback ;
23
22
protected $ swoole_server_config = [
24
- 'reactor_num ' => 4 , // reactor thread num
25
- 'worker_num ' => 8 , // worker process num
23
+ 'reactor_num ' => 4 ,
24
+ // reactor thread num
25
+ 'worker_num ' => 8 ,
26
+ // worker process num
26
27
'task_worker_num ' => 8 ,
27
- 'task_ipc_mode ' => 1 , // 3为使用消息队列通信,争抢模式,无法使用定向投递
28
- 'task_max_request ' => 5000 , // task进程处理200个请求后自动退出,防止内存溢出
29
- 'backlog ' => 128 , // 最多同时有多少个等待accept的连接
30
- 'max_request ' => 0 , // worker永不退出
28
+ 'task_ipc_mode ' => 1 ,
29
+ // 3为使用消息队列通信,争抢模式,无法使用定向投递
30
+ 'task_max_request ' => 5000 ,
31
+ // task进程处理200个请求后自动退出,防止内存溢出
32
+ 'backlog ' => 128 ,
33
+ // 最多同时有多少个等待accept的连接
34
+ 'max_request ' => 0 ,
35
+ // worker永不退出
31
36
'reload_async ' => true ,
32
- 'http_parse_post ' => false , // 不自动解析POST包体
33
- 'http_compression ' => false , // 不自动压缩响应值
34
- 'max_wait_time ' => 30 // 重载后旧进程最大存活时间
37
+ 'http_parse_post ' => false ,
38
+ // 不自动解析POST包体
39
+ 'http_compression ' => false ,
40
+ // 不自动压缩响应值
41
+ 'max_wait_time ' => 30
42
+ // 重载后旧进程最大存活时间
35
43
];
36
44
public function __construct () {
37
45
$ this ->callback = [
@@ -88,32 +96,29 @@ protected function loadConfig(): void {
88
96
protected function createSwooleServer (): void {
89
97
$ daemon_config = Environment::getServiceConfig ();
90
98
$ this ->server = new \Swoole \Http \Server ($ daemon_config ->http_server_host , $ daemon_config ->http_server_port );
91
- self ::$ terminal_server = new HttpServer \TerminalServer (
92
- $ this ->server ->addListener ($ daemon_config ->terminal_server_host , $ daemon_config ->terminal_server_port ,
93
- SWOOLE_SOCK_TCP ));
99
+ self ::$ terminal_server = new HttpServer \TerminalServer ($ this ->server ->addListener ($ daemon_config ->terminal_server_host ,
100
+ $ daemon_config ->terminal_server_port , SWOOLE_SOCK_TCP ));
94
101
}
95
102
protected function bindCallBack (): void {
96
- foreach ($ this ->callback as $ event=> $ func )
103
+ foreach ($ this ->callback as $ event => $ func )
97
104
$ this ->server ->on ($ event , $ func );
98
105
}
99
106
protected function initBeforeStart (): void {
100
107
mt_srand ((int )(microtime (true ) * 10000 ) * 100 + ip2long (Environment::getServiceConfig ()->local_ip ));
101
108
define ('SERVER_TEMP_ID ' , mt_rand (0 , 4294967295 ));
102
-
103
109
$ this ->swoole_server_config ['dispatch_mode ' ] = 3 ;
104
-
105
110
\Swoole \Runtime::enableCoroutine (true );
106
-
107
111
\Swango \Db \Pool \master::init ();
108
112
\Swango \Db \Pool \slave::init ();
109
113
\Swango \Model \LocalCache::init ();
110
114
self ::$ http_request_counter = new \Swoole \Atomic \Long ();
111
- self ::$ max_coroutine = array_key_exists ('max_coroutine ' , $ this ->swoole_server_config ) ? $ this ->swoole_server_config ['max_coroutine ' ] : 3000 ;
115
+ self ::$ max_coroutine = array_key_exists ('max_coroutine ' ,
116
+ $ this ->swoole_server_config ) ? $ this ->swoole_server_config ['max_coroutine ' ] : 3000 ;
112
117
}
113
118
public function start ($ daemonize = false ): void {
114
- if ($ this ->getPid () !== null )
119
+ if ($ this ->getPid () !== null ) {
115
120
exit ("Already running \n" );
116
-
121
+ }
117
122
$ this ->swoole_server_config ['log_file ' ] = Environment::getDir ()->log . 'swoole.log ' ;
118
123
$ this ->daemonize = $ daemonize ;
119
124
$ this ->loadConfig ();
@@ -130,8 +135,9 @@ public function getPid(): ?int {
130
135
if (file_exists ($ pidfile )) {
131
136
$ pid = file_get_contents ($ pidfile );
132
137
return $ pid && @posix_kill ($ pid , 0 ) ? $ pid : null ;
133
- } else
138
+ } else {
134
139
return null ;
140
+ }
135
141
}
136
142
public function stop (): bool {
137
143
$ pid = $ this ->getPid ();
@@ -146,42 +152,44 @@ public function stop(): bool {
146
152
}
147
153
public function reload (): void {
148
154
$ pid = $ this ->getPid ();
149
- if ($ pid === null )
155
+ if ($ pid === null ) {
150
156
exit ("Not running \n" );
157
+ }
151
158
posix_kill ($ pid , SIGUSR1 );
152
159
exit ("Reloading \n" );
153
160
}
154
161
public function reloadTask (): void {
155
162
$ pid = $ this ->getPid ();
156
- if ($ pid === null )
163
+ if ($ pid === null ) {
157
164
exit ("Not running \n" );
165
+ }
158
166
posix_kill ($ pid , SIGUSR2 );
159
167
exit ("Reloading task \n" );
160
168
}
161
169
public function getStatus (): string {
162
170
$ pid = $ this ->getPid ();
163
- if ($ pid === null )
171
+ if ($ pid === null ) {
164
172
return "Not running \n" ;
173
+ }
165
174
return "Master pid: $ pid \n" ;
166
175
}
167
176
public function talk (array $ cmds , string $ host = '127.0.0.1 ' , ?int $ port = null ): void {
168
177
echo $ this ->getStatus ();
169
- go (
170
- function () use ($ cmds , $ host , $ port ) {
171
- $ client = new \Swoole \Coroutine \Client (SWOOLE_SOCK_TCP );
172
- $ client ->set (
173
- [
174
- 'open_eof_check ' => true ,
175
- 'package_eof ' => "\r\n" ,
176
- 'package_max_length ' => 1024 * 1024 * 2
177
- ]);
178
- if (! $ client ->connect ($ host , $ port ?? Environment::getServiceConfig ()->terminal_server_port , - 1 ))
179
- exit ("connect failed. Error: {$ client ->errCode }\n" );
180
- $ client ->send (implode ("\x1E" , $ cmds ) . "\r\n" );
181
- for ($ response = $ client ->recv (); $ response ; $ response = $ client ->recv ())
182
- echo $ response ;
183
- $ client ->close ();
184
- });
178
+ go (function () use ($ cmds , $ host , $ port ) {
179
+ $ client = new \Swoole \Coroutine \Client (SWOOLE_SOCK_TCP );
180
+ $ client ->set ([
181
+ 'open_eof_check ' => true ,
182
+ 'package_eof ' => "\r\n" ,
183
+ 'package_max_length ' => 1024 * 1024 * 2
184
+ ]);
185
+ if (! $ client ->connect ($ host , $ port ?? Environment::getServiceConfig ()->terminal_server_port , -1 )) {
186
+ exit ("connect failed. Error: {$ client ->errCode }\n" );
187
+ }
188
+ $ client ->send (implode ("\x1E" , $ cmds ) . "\r\n" );
189
+ for ($ response = $ client ->recv (); $ response ; $ response = $ client ->recv ())
190
+ echo $ response ;
191
+ $ client ->close ();
192
+ });
185
193
}
186
194
public function onStart (\Swoole \Server $ server ): void {
187
195
@cli_set_process_title (Environment::getName () . ' master ' );
@@ -201,17 +209,18 @@ private function onWorkerStart(\Swoole\Server $serv, $worker_id): void {
201
209
if ($ worker_id === 0 ) {
202
210
new \Swango \Cache \InternelCmd ();
203
211
// 每隔15分钟进行全服务DbPool计数校对,因为如果有worker非正常退出的情况,会引起该计数错误
204
- $ this ->add_worker_count_to_atomic_timer = \swoole_timer_tick (900000 ,
205
- function (int $ timer_id ) use ($ serv ) {
206
- \Swango \Db \Pool \master::addWorkerCountToAtomic (true );
207
- \Swango \Db \Pool \slave::addWorkerCountToAtomic (true );
208
- for ($ dst_worker_id = 1 ; $ dst_worker_id < Environment::getServiceConfig ()->worker_num ; ++ $ dst_worker_id )
209
- @$ serv ->sendMessage (pack ('n ' , 3 ), $ dst_worker_id );
210
- });
212
+ $ this ->add_worker_count_to_atomic_timer = \swoole_timer_tick (900000 , function (int $ timer_id ) use ($ serv ) {
213
+ \Swango \Db \Pool \master::addWorkerCountToAtomic (true );
214
+ \Swango \Db \Pool \slave::addWorkerCountToAtomic (true );
215
+ for ($ dst_worker_id = 1 ; $ dst_worker_id < Environment::getServiceConfig ()->worker_num ; ++$ dst_worker_id )
216
+ @$ serv ->sendMessage (pack ('n ' , 3 ), $ dst_worker_id );
217
+ });
211
218
}
212
219
}
213
220
public function onAllWorkerStart (\Swoole \Server $ serv , $ worker_id ): void {
214
- opcache_reset ();
221
+ if (function_exists ('opcache_reset ' )) {
222
+ opcache_reset ();
223
+ }
215
224
mt_srand ((int )(microtime (true ) * 10000 ) * 100 + $ worker_id );
216
225
self ::$ worker = $ serv ;
217
226
self ::$ worker_id = $ worker_id ;
@@ -225,14 +234,17 @@ public function onAllWorkerStart(\Swoole\Server $serv, $worker_id): void {
225
234
}
226
235
private function recycle (): void {
227
236
self ::$ is_stopping = true ;
228
- if (self ::getWorkerId () < $ this ->swoole_server_config ['worker_num ' ])
237
+ if (self ::getWorkerId () < $ this ->swoole_server_config ['worker_num ' ]) {
229
238
go ('Swango \\Cache \\InternelCmd::stopLoop ' );
239
+ }
230
240
$ pool = \Gateway::getDbPool (\Gateway::MASTER_DB );
231
- if (isset ($ pool ))
241
+ if (isset ($ pool )) {
232
242
$ pool ->clearQueueAndTimer ();
243
+ }
233
244
$ pool = \Gateway::getDbPool (\Gateway::SLAVE_DB );
234
- if (isset ($ pool ))
245
+ if (isset ($ pool )) {
235
246
$ pool ->clearQueueAndTimer ();
247
+ }
236
248
\Swango \Cache \RedisPool::clearQueue ();
237
249
if (isset ($ this ->add_worker_count_to_atomic_timer )) {
238
250
\swoole_timer_clear ($ this ->add_worker_count_to_atomic_timer );
@@ -258,9 +270,8 @@ public function onWorkerExit(\Swoole\Server $serv, $worker_id): void {
258
270
$ this ->recycle ();
259
271
}
260
272
public function onRequest (\Swoole \Http \Request $ request , \Swoole \Http \Response $ response ): void {
261
- ++ self ::$ worker ->worker_http_request_counter ;
273
+ ++self ::$ worker ->worker_http_request_counter ;
262
274
$ count = self ::$ http_request_counter ->add ();
263
-
264
275
$ request_time_float = $ request ->server ['request_time_float ' ];
265
276
$ request_time = (int )$ request_time_float ;
266
277
$ client_ip = $ request ->header ['x-forwarded-for ' ] ?? $ request ->server ['remote_addr ' ];
@@ -270,16 +281,14 @@ public function onRequest(\Swoole\Http\Request $request, \Swoole\Http\Response $
270
281
mt_rand (8 , 0xB ), mt_rand (0 , 0xFFF ), ((int )$ request_time ) >> 4 , $ count % 0x100000 );
271
282
\SysContext::set ('request_id ' , $ request_id );
272
283
$ response ->header ('X-Request-ID ' , $ request_id );
273
-
274
284
$ micro_second = substr (sprintf ('%.3f ' , $ request_time_float - $ request_time ), 2 );
275
285
$ request_string = date ("[H:i:s. $ micro_second] " , $ request_time ) . self ::$ worker_id . "- {$ count } " . $ client_ip .
276
- ' ' . $ request ->server ['request_method ' ] . ' ' . ($ request ->header ['host ' ] ?? '' ) .
277
- $ request ->server ['request_uri ' ] . (isset ($ request ->server ['query_string ' ]) ? '? ' .
278
- $ request ->server ['query_string ' ] : '' );
279
-
280
- if (self ::$ terminal_server ->getRequestLogSwitchStatus (1 ))
286
+ ' ' . $ request ->server ['request_method ' ] . ' ' . ($ request ->header ['host ' ] ?? '' ) .
287
+ $ request ->server ['request_uri ' ] .
288
+ (isset ($ request ->server ['query_string ' ]) ? '? ' . $ request ->server ['query_string ' ] : '' );
289
+ if (self ::$ terminal_server ->getRequestLogSwitchStatus (1 )) {
281
290
self ::$ terminal_server ->send ($ request_string , 1 );
282
-
291
+ }
283
292
$ user_id = null ;
284
293
try {
285
294
[
@@ -288,22 +297,22 @@ public function onRequest(\Swoole\Http\Request $request, \Swoole\Http\Response $
288
297
$ cnmsg
289
298
] = HttpServer \Handler::start ($ request , $ response );
290
299
$ user_id = HttpServer \Authorization::getUidWithRole ();
291
- } catch (\Swoole \ExitException $ e ) {
300
+ } catch (\Swoole \ExitException $ e ) {
292
301
trigger_error ("Unexpected exit: {$ e ->getCode ()} {$ e ->getMessage ()}" );
293
- } catch (\Throwable $ e ) {
302
+ } catch (\Throwable $ e ) {
294
303
trigger_error ("Unexpected throwable: {$ e ->getCode ()} {$ e ->getMessage ()} {$ e ->getTraceAsString ()}" );
295
304
}
296
305
HttpServer \Handler::end ();
297
- -- self ::$ worker ->worker_http_request_counter ;
298
-
306
+ --self ::$ worker ->worker_http_request_counter ;
299
307
$ end_time = microtime (true );
300
308
$ response_string = sprintf ("# $ user_id (%s) %.3fms [ $ code] $ enmsg " , \session::getId (),
301
309
($ end_time - $ request_time_float ) * 1000 );
302
- if ($ code !== 200 || $ enmsg !== 'ok ' )
310
+ if ($ code !== 200 || $ enmsg !== 'ok ' ) {
303
311
$ response_string .= ' ' . $ cnmsg ;
304
-
305
- if (self ::$ terminal_server ->getRequestLogSwitchStatus (2 ))
312
+ }
313
+ if (self ::$ terminal_server ->getRequestLogSwitchStatus (2 )) {
306
314
self ::$ terminal_server ->send ($ request_string . ' ==> ' . $ response_string , 2 );
315
+ }
307
316
}
308
317
public function onPipeMessage (\Swoole \Server $ server , int $ src_worker_id , $ message ) {
309
318
$ cmd = unpack ('n ' , substr ($ message , 0 , 2 ))[1 ];
@@ -316,15 +325,13 @@ public function onPipeMessage(\Swoole\Server $server, int $src_worker_id, $messa
316
325
$ fd = unpack ('N ' , substr ($ message , 2 , 4 ))[1 ];
317
326
self ::$ terminal_server ->sendPipMessageToTerminalWorker ($ server , $ fd , 3 ,
318
327
"$ server ->worker_http_request_counter - " . \Swango \Db \Pool \master::getWorkerCount () . '- ' .
319
- \Swango \Db \Pool \slave::getWorkerCount () . '- ' . memory_get_usage () . '- ' .
320
- memory_get_peak_usage () . "- {$ status ['coroutine_num ' ]}- {$ status ['coroutine_peak_num ' ]}- " .
321
- \SysContext::getSize ());
328
+ \Swango \Db \Pool \slave::getWorkerCount () . '- ' . memory_get_usage () . '- ' . memory_get_peak_usage () .
329
+ "- {$ status ['coroutine_num ' ]}- {$ status ['coroutine_peak_num ' ]}- " . \SysContext::getSize ());
322
330
break ;
323
331
case 3 : // 正在进行DbPool计数校对
324
332
\Swango \Db \Pool \master::addWorkerCountToAtomic ();
325
333
\Swango \Db \Pool \slave::addWorkerCountToAtomic ();
326
334
break ;
327
-
328
335
case 5 : // 执行 gc_collect_cycles() 并回复结果
329
336
$ result = gc_collect_cycles ();
330
337
$ fd = unpack ('N ' , substr ($ message , 2 , 4 ))[1 ];
@@ -341,17 +348,20 @@ public function onTask(\Swoole\Server $serv, int $task_id, int $src_worker_id, $
341
348
static $ certs = [];
342
349
if (! array_key_exists ($ index , $ certs )) {
343
350
$ certname = Environment::getDir ()->data . 'cert/rsa_private_key_ ' . $ index . '.pem ' ;
344
- if (! file_exists ($ certname ))
345
- return - 3 ;
351
+ if (! file_exists ($ certname )) {
352
+ return -3 ;
353
+ }
346
354
$ key = include $ certname ;
347
355
mangoParseRequest_SetPrivateKey ($ index , $ key );
348
356
$ certs [$ index ] = null ;
349
357
}
350
- if ($ cmd === 1 )
358
+ if ($ cmd === 1 ) {
351
359
return mangoParseRequest (substr ($ data , 2 ), $ index , false );
352
- else
360
+ } else {
353
361
return mangoParseRequestRaw (substr ($ data , 2 ), $ index , false );
362
+ }
354
363
}
355
364
}
356
- public function onFinish (\Swoole \Server $ serv , int $ task_id , string $ data ) {}
365
+ public function onFinish (\Swoole \Server $ serv , int $ task_id , string $ data ) {
366
+ }
357
367
}
0 commit comments