Skip to content

Commit 18e4762

Browse files
committed
improved dropper in control module; now dropper support dropping all clients with the same address; dropping all subscribers is supported as well
1 parent 02dd440 commit 18e4762

File tree

1 file changed

+212
-40
lines changed

1 file changed

+212
-40
lines changed

ngx_rtmp_control_module.c

Lines changed: 212 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,20 @@ typedef struct {
3535
#define NGX_RTMP_CONTROL_DROP 0x02
3636

3737

38+
enum {
39+
NGX_RTMP_CONTROL_DROP_PUBLISHER,
40+
NGX_RTMP_CONTROL_DROP_SUBSCRIBER,
41+
NGX_RTMP_CONTROL_DROP_CLIENT,
42+
};
43+
44+
45+
typedef struct {
46+
ngx_uint_t method;
47+
ngx_str_t addr;
48+
ngx_uint_t ndropped;
49+
} ngx_rtmp_control_drop_t;
50+
51+
3852
typedef struct {
3953
ngx_uint_t control;
4054
} ngx_rtmp_control_loc_conf_t;
@@ -193,6 +207,10 @@ ngx_rtmp_control_parse_live(ngx_http_request_t *r,
193207
ngx_memzero(&name, sizeof(name));
194208
ngx_http_arg(r, (u_char *) "name", sizeof("name") - 1, &name);
195209

210+
if (name.len == 0) {
211+
return NGX_CONF_OK;
212+
}
213+
196214
live->lacf = core->cacf->app_conf[ngx_rtmp_live_module.ctx_index];
197215

198216
/* find live stream by name */
@@ -245,6 +263,7 @@ ngx_rtmp_control_record(ngx_http_request_t *r, ngx_str_t *method)
245263
goto error;
246264
}
247265

266+
ngx_memzero(&live, sizeof(live));
248267
msg = ngx_rtmp_control_parse_live(r, &core, &live);
249268
if (msg != NGX_CONF_OK) {
250269
goto error;
@@ -328,68 +347,221 @@ ngx_rtmp_control_record(ngx_http_request_t *r, ngx_str_t *method)
328347
}
329348

330349

350+
static const char *
351+
ngx_rtmp_control_drop_session(ngx_http_request_t *r,
352+
ngx_rtmp_control_drop_t *drop,
353+
ngx_rtmp_live_ctx_t *lctx)
354+
{
355+
ngx_rtmp_session_t *s;
356+
ngx_str_t *paddr;
357+
358+
s = lctx->session;
359+
360+
if (s == NULL || s->connection == NULL)
361+
{
362+
return NGX_CONF_OK;
363+
}
364+
365+
if (drop->addr.len) {
366+
paddr = &s->connection->addr_text;
367+
if (paddr->len != drop->addr.len ||
368+
ngx_strncmp(paddr->data, drop->addr.data, drop->addr.len))
369+
{
370+
return NGX_CONF_OK;
371+
}
372+
}
373+
374+
switch (drop->method) {
375+
case NGX_RTMP_CONTROL_DROP_PUBLISHER:
376+
if (!lctx->publishing) {
377+
return NGX_CONF_OK;
378+
}
379+
380+
case NGX_RTMP_CONTROL_DROP_SUBSCRIBER:
381+
if (lctx->publishing) {
382+
return NGX_CONF_OK;
383+
}
384+
385+
case NGX_RTMP_CONTROL_DROP_CLIENT:
386+
break;
387+
}
388+
389+
ngx_rtmp_finalize_session(s);
390+
++drop->ndropped;
391+
392+
return NGX_CONF_OK;
393+
}
394+
395+
396+
static const char *
397+
ngx_rtmp_control_drop_stream(ngx_http_request_t *r,
398+
ngx_rtmp_control_drop_t *drop,
399+
ngx_rtmp_live_stream_t *ls)
400+
{
401+
ngx_rtmp_live_ctx_t *lctx;
402+
const char *s;
403+
404+
for (lctx = ls->ctx; lctx; lctx = lctx->next) {
405+
s = ngx_rtmp_control_drop_session(r, drop, lctx);
406+
if (s != NGX_CONF_OK) {
407+
return s;
408+
}
409+
}
410+
411+
return NGX_CONF_OK;
412+
}
413+
414+
415+
static const char *
416+
ngx_rtmp_control_drop_app(ngx_http_request_t *r,
417+
ngx_rtmp_control_drop_t *drop,
418+
ngx_rtmp_core_app_conf_t *cacf)
419+
{
420+
ngx_rtmp_live_app_conf_t *lacf;
421+
ngx_rtmp_live_stream_t *ls;
422+
ngx_str_t name;
423+
const char *s;
424+
size_t len;
425+
ngx_uint_t n;
426+
427+
ngx_memzero(&name, sizeof(name));
428+
ngx_http_arg(r, (u_char *) "name", sizeof("name") - 1, &name);
429+
430+
lacf = cacf->app_conf[ngx_rtmp_live_module.ctx_index];
431+
432+
if (name.len == 0) {
433+
for (n = 0; n < (ngx_uint_t) lacf->nbuckets; ++n) {
434+
for (ls = lacf->streams[n]; ls; ls = ls->next)
435+
{
436+
s = ngx_rtmp_control_drop_stream(r, drop, ls);
437+
if (s != NGX_CONF_OK) {
438+
return s;
439+
}
440+
}
441+
}
442+
443+
return NGX_CONF_OK;
444+
}
445+
446+
for (ls = lacf->streams[ngx_hash_key(name.data, name.len) % lacf->nbuckets];
447+
ls; ls = ls->next)
448+
{
449+
len = ngx_strlen(ls->name);
450+
if (name.len != len || ngx_strncmp(name.data, ls->name, name.len)) {
451+
continue;
452+
}
453+
454+
s = ngx_rtmp_control_drop_stream(r, drop, ls);
455+
if (s != NGX_CONF_OK) {
456+
return s;
457+
}
458+
}
459+
460+
return NGX_CONF_OK;
461+
}
462+
463+
464+
static const char *
465+
ngx_rtmp_control_drop_srv(ngx_http_request_t *r,
466+
ngx_rtmp_control_drop_t *drop,
467+
ngx_rtmp_core_srv_conf_t *cscf)
468+
{
469+
ngx_rtmp_core_app_conf_t **pcacf;
470+
ngx_str_t app;
471+
ngx_uint_t n;
472+
const char *s;
473+
474+
ngx_memzero(&app, sizeof(app));
475+
ngx_http_arg(r, (u_char *) "app", sizeof("app") - 1, &app);
476+
477+
pcacf = cscf->applications.elts;
478+
479+
for (n = 0; n < cscf->applications.nelts; ++n, ++pcacf) {
480+
if (app.len && ((*pcacf)->name.len != app.len ||
481+
ngx_strncmp((*pcacf)->name.data, app.data, app.len)))
482+
{
483+
continue;
484+
}
485+
486+
s = ngx_rtmp_control_drop_app(r, drop, *pcacf);
487+
if (s != NGX_CONF_OK) {
488+
return s;
489+
}
490+
}
491+
492+
return NGX_CONF_OK;
493+
}
494+
495+
496+
static const char *
497+
ngx_rtmp_control_drop_main(ngx_http_request_t *r,
498+
ngx_rtmp_control_drop_t *drop,
499+
ngx_rtmp_core_main_conf_t *cmcf)
500+
{
501+
ngx_rtmp_core_srv_conf_t **pcscf;
502+
ngx_str_t srv;
503+
ngx_uint_t sn;
504+
505+
sn = 0;
506+
if (ngx_http_arg(r, (u_char *) "srv", sizeof("srv") - 1, &srv) == NGX_OK) {
507+
sn = ngx_atoi(srv.data, srv.len);
508+
}
509+
510+
if (sn >= cmcf->servers.nelts) {
511+
return "Server index out of range";
512+
}
513+
514+
pcscf = cmcf->servers.elts;
515+
pcscf += sn;
516+
517+
return ngx_rtmp_control_drop_srv(r, drop, *pcscf);
518+
}
519+
520+
331521
static ngx_int_t
332522
ngx_rtmp_control_drop(ngx_http_request_t *r, ngx_str_t *method)
333523
{
334-
ngx_rtmp_control_core_t core;
335-
ngx_rtmp_control_live_t live;
336-
ngx_rtmp_live_ctx_t *lctx;
337-
ngx_str_t addr, *paddr;
338-
const char *msg;
339-
ngx_uint_t ndropped;
524+
ngx_rtmp_control_drop_t drop;
340525
size_t len;
341526
u_char *p;
342527
ngx_buf_t *b;
343528
ngx_chain_t cl;
529+
const char *msg;
344530

345-
msg = ngx_rtmp_control_parse_core(r, &core);
346-
if (msg != NGX_CONF_OK) {
347-
goto error;
348-
}
349-
350-
msg = ngx_rtmp_control_parse_live(r, &core, &live);
351-
if (msg != NGX_CONF_OK) {
531+
if (ngx_rtmp_core_main_conf == NULL) {
532+
msg = "Empty main conf";
352533
goto error;
353534
}
354535

355-
ndropped = 0;
536+
ngx_memzero(&drop, sizeof(drop));
356537

357538
if (method->len == sizeof("publisher") - 1 &&
358-
ngx_strncmp(method->data, "publisher", method->len) == 0)
539+
ngx_memcmp(method->data, "publisher", method->len) == 0)
359540
{
360-
for (lctx = live.ls->ctx; lctx; lctx = lctx->next) {
361-
if (lctx->publishing) {
362-
ngx_rtmp_finalize_session(lctx->session);
363-
++ndropped;
364-
break;
365-
}
366-
}
541+
drop.method = NGX_RTMP_CONTROL_DROP_PUBLISHER;
367542

368-
} else if (method->len == sizeof("client") - 1 &&
369-
ngx_strncmp(method->data, "client", method->len) == 0)
543+
} else if (method->len == sizeof("subscriber") - 1 &&
544+
ngx_memcmp(method->data, "subscriber", method->len) == 0)
370545
{
371-
ngx_memzero(&addr, sizeof(addr));
372-
ngx_http_arg(r, (u_char *) "addr", sizeof("addr") - 1, &addr);
373-
374-
for (lctx = live.ls->ctx; lctx; lctx = lctx->next) {
375-
if (addr.len && lctx->session && lctx->session->connection) {
376-
paddr = &lctx->session->connection->addr_text;
377-
if (paddr->len != addr.len ||
378-
ngx_strncmp(paddr->data, addr.data, addr.len))
379-
{
380-
continue;
381-
}
382-
}
546+
drop.method = NGX_RTMP_CONTROL_DROP_SUBSCRIBER;
383547

384-
ngx_rtmp_finalize_session(lctx->session);
385-
++ndropped;
386-
}
548+
} else if (method->len == sizeof("client") - 1 &&
549+
ngx_memcmp(method->data, "client", method->len) == 0)
550+
{
551+
drop.method = NGX_RTMP_CONTROL_DROP_CLIENT;
387552

388553
} else {
389554
msg = "Undefined method";
390555
goto error;
391556
}
392557

558+
ngx_http_arg(r, (u_char *) "addr", sizeof("addr") - 1, &drop.addr);
559+
560+
msg = ngx_rtmp_control_drop_main(r, &drop, ngx_rtmp_core_main_conf);
561+
if (msg != NGX_CONF_OK) {
562+
goto error;
563+
}
564+
393565
/* output ndropped */
394566

395567
len = NGX_OFF_T_LEN;
@@ -399,7 +571,7 @@ ngx_rtmp_control_drop(ngx_http_request_t *r, ngx_str_t *method)
399571
return NGX_ERROR;
400572
}
401573

402-
len = (size_t) (ngx_snprintf(p, len, "%ui", ndropped) - p);
574+
len = (size_t) (ngx_snprintf(p, len, "%ui", drop.ndropped) - p);
403575

404576
r->headers_out.status = NGX_HTTP_OK;
405577
r->headers_out.content_length_n = len;

0 commit comments

Comments
 (0)