Skip to content

Commit b8b055d

Browse files
committed
implemented async control
1 parent 95075aa commit b8b055d

File tree

1 file changed

+55
-33
lines changed

1 file changed

+55
-33
lines changed

ngx_rtmp_control_module.c

Lines changed: 55 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ typedef struct {
4040
ngx_str_t path;
4141
ngx_uint_t filter;
4242
ngx_str_t method;
43-
ngx_rtmp_control_handler_t handler;
43+
ngx_array_t sessions; /* ngx_rtmp_session_t * */
4444
} ngx_rtmp_control_ctx_t;
4545

4646

@@ -146,8 +146,9 @@ ngx_rtmp_control_record_handler(ngx_http_request_t *r, ngx_rtmp_session_t *s)
146146
cacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_core_module);
147147
racf = cacf->app_conf[ngx_rtmp_record_module.ctx_index];
148148

149-
ngx_str_null(&rec);
150-
ngx_http_arg(r, (u_char *) "rec", sizeof("rec") - 1, &rec);
149+
if (ngx_http_arg(r, (u_char *) "rec", sizeof("rec") - 1, &rec) != NGX_OK) {
150+
rec.len = 0;
151+
}
151152

152153
rn = ngx_rtmp_record_find(racf, &rec);
153154
if (rn == NGX_CONF_UNSET_UINT) {
@@ -203,10 +204,9 @@ ngx_rtmp_control_redirect_handler(ngx_http_request_t *r, ngx_rtmp_session_t *s)
203204
ngx_rtmp_control_ctx_t *ctx;
204205
ngx_rtmp_close_stream_t vc;
205206

206-
ngx_str_null(&name);
207-
ngx_http_arg(r, (u_char *) "newname", sizeof("newname") - 1, &name);
208-
209-
if (name.len == 0) {
207+
if (ngx_http_arg(r, (u_char *) "newname", sizeof("newname") - 1, &name)
208+
!= NGX_OK)
209+
{
210210
return "newname not specified";
211211
}
212212

@@ -260,7 +260,7 @@ ngx_rtmp_control_walk_session(ngx_http_request_t *r,
260260
ngx_rtmp_live_ctx_t *lctx)
261261
{
262262
ngx_str_t addr, *paddr;
263-
ngx_rtmp_session_t *s;
263+
ngx_rtmp_session_t *s, **ss;
264264
ngx_rtmp_control_ctx_t *ctx;
265265

266266
s = lctx->session;
@@ -269,10 +269,9 @@ ngx_rtmp_control_walk_session(ngx_http_request_t *r,
269269
return NGX_CONF_OK;
270270
}
271271

272-
ngx_str_null(&addr);
273-
ngx_http_arg(r, (u_char *) "addr", sizeof("addr") - 1, &addr);
274-
275-
if (addr.len) {
272+
if (ngx_http_arg(r, (u_char *) "addr", sizeof("addr") - 1, &addr)
273+
== NGX_OK)
274+
{
276275
paddr = &s->connection->addr_text;
277276
if (paddr->len != addr.len ||
278277
ngx_strncmp(paddr->data, addr.data, addr.len))
@@ -300,7 +299,14 @@ ngx_rtmp_control_walk_session(ngx_http_request_t *r,
300299
break;
301300
}
302301

303-
return ctx->handler(r, s);
302+
ss = ngx_array_push(&ctx->sessions);
303+
if (ss == NULL) {
304+
return "allocation error";
305+
}
306+
307+
*ss = s;
308+
309+
return NGX_CONF_OK;
304310
}
305311

306312

@@ -333,12 +339,10 @@ ngx_rtmp_control_walk_app(ngx_http_request_t *r,
333339
ngx_rtmp_live_stream_t *ls;
334340
ngx_rtmp_live_app_conf_t *lacf;
335341

336-
ngx_memzero(&name, sizeof(name));
337-
ngx_http_arg(r, (u_char *) "name", sizeof("name") - 1, &name);
338-
339342
lacf = cacf->app_conf[ngx_rtmp_live_module.ctx_index];
340343

341-
if (name.len == 0) {
344+
if (ngx_http_arg(r, (u_char *) "name", sizeof("name") - 1, &name) != NGX_OK)
345+
{
342346
for (n = 0; n < (ngx_uint_t) lacf->nbuckets; ++n) {
343347
for (ls = lacf->streams[n]; ls; ls = ls->next) {
344348
s = ngx_rtmp_control_walk_stream(r, ls);
@@ -378,8 +382,9 @@ ngx_rtmp_control_walk_server(ngx_http_request_t *r,
378382
const char *s;
379383
ngx_rtmp_core_app_conf_t **pcacf;
380384

381-
ngx_memzero(&app, sizeof(app));
382-
ngx_http_arg(r, (u_char *) "app", sizeof("app") - 1, &app);
385+
if (ngx_http_arg(r, (u_char *) "app", sizeof("app") - 1, &app) != NGX_OK) {
386+
app.len = 0;
387+
}
383388

384389
pcacf = cscf->applications.elts;
385390

@@ -401,12 +406,15 @@ ngx_rtmp_control_walk_server(ngx_http_request_t *r,
401406

402407

403408
static const char *
404-
ngx_rtmp_control_walk(ngx_http_request_t *r)
409+
ngx_rtmp_control_walk(ngx_http_request_t *r, ngx_rtmp_control_handler_t h)
405410
{
406411
ngx_rtmp_core_main_conf_t *cmcf = ngx_rtmp_core_main_conf;
407412

408413
ngx_str_t srv;
409-
ngx_uint_t sn;
414+
ngx_uint_t sn, n;
415+
const char *msg;
416+
ngx_rtmp_session_t **s;
417+
ngx_rtmp_control_ctx_t *ctx;
410418
ngx_rtmp_core_srv_conf_t **pcscf;
411419

412420
sn = 0;
@@ -421,7 +429,22 @@ ngx_rtmp_control_walk(ngx_http_request_t *r)
421429
pcscf = cmcf->servers.elts;
422430
pcscf += sn;
423431

424-
return ngx_rtmp_control_walk_server(r, *pcscf);
432+
msg = ngx_rtmp_control_walk_server(r, *pcscf);
433+
if (msg != NGX_CONF_OK) {
434+
return msg;
435+
}
436+
437+
ctx = ngx_http_get_module_ctx(r, ngx_rtmp_control_module);
438+
439+
s = ctx->sessions.elts;
440+
for (n = 0; n < ctx->sessions.nelts; n++) {
441+
msg = h(r, s[n]);
442+
if (msg != NGX_CONF_OK) {
443+
return msg;
444+
}
445+
}
446+
447+
return NGX_CONF_OK;
425448
}
426449

427450

@@ -434,11 +457,9 @@ ngx_rtmp_control_record(ngx_http_request_t *r, ngx_str_t *method)
434457
ngx_rtmp_control_ctx_t *ctx;
435458

436459
ctx = ngx_http_get_module_ctx(r, ngx_rtmp_control_module);
437-
438460
ctx->filter = NGX_RTMP_CONTROL_FILTER_PUBLISHER;
439-
ctx->handler = ngx_rtmp_control_record_handler;
440461

441-
msg = ngx_rtmp_control_walk(r);
462+
msg = ngx_rtmp_control_walk(r, ngx_rtmp_control_record_handler);
442463
if (msg != NGX_CONF_OK) {
443464
goto error;
444465
}
@@ -501,9 +522,7 @@ ngx_rtmp_control_drop(ngx_http_request_t *r, ngx_str_t *method)
501522
goto error;
502523
}
503524

504-
ctx->handler = ngx_rtmp_control_drop_handler;
505-
506-
msg = ngx_rtmp_control_walk(r);
525+
msg = ngx_rtmp_control_walk(r, ngx_rtmp_control_drop_handler);
507526
if (msg != NGX_CONF_OK) {
508527
goto error;
509528
}
@@ -555,7 +574,6 @@ ngx_rtmp_control_redirect(ngx_http_request_t *r, ngx_str_t *method)
555574
ngx_rtmp_control_ctx_t *ctx;
556575

557576
ctx = ngx_http_get_module_ctx(r, ngx_rtmp_control_module);
558-
ctx->handler = ngx_rtmp_control_redirect_handler;
559577

560578
if (ctx->method.len == sizeof("publisher") - 1 &&
561579
ngx_memcmp(ctx->method.data, "publisher", ctx->method.len) == 0)
@@ -578,7 +596,7 @@ ngx_rtmp_control_redirect(ngx_http_request_t *r, ngx_str_t *method)
578596
goto error;
579597
}
580598

581-
msg = ngx_rtmp_control_walk(r);
599+
msg = ngx_rtmp_control_walk(r, ngx_rtmp_control_redirect_handler);
582600
if (msg != NGX_CONF_OK) {
583601
goto error;
584602
}
@@ -666,6 +684,10 @@ ngx_rtmp_control_handler(ngx_http_request_t *r)
666684

667685
ngx_http_set_ctx(r, ctx, ngx_rtmp_control_module);
668686

687+
if (ngx_array_init(&ctx->sessions, r->pool, 1, sizeof(void *)) != NGX_OK) {
688+
return NGX_ERROR;
689+
}
690+
669691
ctx->method = method;
670692

671693
#define NGX_RTMP_CONTROL_SECTION(flag, secname) \
@@ -689,7 +711,7 @@ ngx_rtmp_control_handler(ngx_http_request_t *r)
689711
static void *
690712
ngx_rtmp_control_create_loc_conf(ngx_conf_t *cf)
691713
{
692-
ngx_rtmp_control_loc_conf_t *conf;
714+
ngx_rtmp_control_loc_conf_t *conf;
693715

694716
conf = ngx_pcalloc(cf->pool, sizeof(ngx_rtmp_control_loc_conf_t));
695717
if (conf == NULL) {
@@ -705,8 +727,8 @@ ngx_rtmp_control_create_loc_conf(ngx_conf_t *cf)
705727
static char *
706728
ngx_rtmp_control_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
707729
{
708-
ngx_rtmp_control_loc_conf_t *prev = parent;
709-
ngx_rtmp_control_loc_conf_t *conf = child;
730+
ngx_rtmp_control_loc_conf_t *prev = parent;
731+
ngx_rtmp_control_loc_conf_t *conf = child;
710732

711733
ngx_conf_merge_bitmask_value(conf->control, prev->control, 0);
712734

0 commit comments

Comments
 (0)