Skip to content

Commit b77337e

Browse files
committed
Merge branch 'smart-drop'
2 parents bcf7df1 + 18e4762 commit b77337e

File tree

1 file changed

+212
-40
lines changed

1 file changed

+212
-40
lines changed

ngx_rtmp_control_module.c

Lines changed: 212 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,20 @@ typedef struct {
3636
#define NGX_RTMP_CONTROL_DROP 0x02
3737

3838

39+
enum {
40+
NGX_RTMP_CONTROL_DROP_PUBLISHER,
41+
NGX_RTMP_CONTROL_DROP_SUBSCRIBER,
42+
NGX_RTMP_CONTROL_DROP_CLIENT,
43+
};
44+
45+
46+
typedef struct {
47+
ngx_uint_t method;
48+
ngx_str_t addr;
49+
ngx_uint_t ndropped;
50+
} ngx_rtmp_control_drop_t;
51+
52+
3953
typedef struct {
4054
ngx_uint_t control;
4155
} ngx_rtmp_control_loc_conf_t;
@@ -194,6 +208,10 @@ ngx_rtmp_control_parse_live(ngx_http_request_t *r,
194208
ngx_memzero(&name, sizeof(name));
195209
ngx_http_arg(r, (u_char *) "name", sizeof("name") - 1, &name);
196210

211+
if (name.len == 0) {
212+
return NGX_CONF_OK;
213+
}
214+
197215
live->lacf = core->cacf->app_conf[ngx_rtmp_live_module.ctx_index];
198216

199217
/* find live stream by name */
@@ -246,6 +264,7 @@ ngx_rtmp_control_record(ngx_http_request_t *r, ngx_str_t *method)
246264
goto error;
247265
}
248266

267+
ngx_memzero(&live, sizeof(live));
249268
msg = ngx_rtmp_control_parse_live(r, &core, &live);
250269
if (msg != NGX_CONF_OK) {
251270
goto error;
@@ -329,68 +348,221 @@ ngx_rtmp_control_record(ngx_http_request_t *r, ngx_str_t *method)
329348
}
330349

331350

351+
static const char *
352+
ngx_rtmp_control_drop_session(ngx_http_request_t *r,
353+
ngx_rtmp_control_drop_t *drop,
354+
ngx_rtmp_live_ctx_t *lctx)
355+
{
356+
ngx_rtmp_session_t *s;
357+
ngx_str_t *paddr;
358+
359+
s = lctx->session;
360+
361+
if (s == NULL || s->connection == NULL)
362+
{
363+
return NGX_CONF_OK;
364+
}
365+
366+
if (drop->addr.len) {
367+
paddr = &s->connection->addr_text;
368+
if (paddr->len != drop->addr.len ||
369+
ngx_strncmp(paddr->data, drop->addr.data, drop->addr.len))
370+
{
371+
return NGX_CONF_OK;
372+
}
373+
}
374+
375+
switch (drop->method) {
376+
case NGX_RTMP_CONTROL_DROP_PUBLISHER:
377+
if (!lctx->publishing) {
378+
return NGX_CONF_OK;
379+
}
380+
381+
case NGX_RTMP_CONTROL_DROP_SUBSCRIBER:
382+
if (lctx->publishing) {
383+
return NGX_CONF_OK;
384+
}
385+
386+
case NGX_RTMP_CONTROL_DROP_CLIENT:
387+
break;
388+
}
389+
390+
ngx_rtmp_finalize_session(s);
391+
++drop->ndropped;
392+
393+
return NGX_CONF_OK;
394+
}
395+
396+
397+
static const char *
398+
ngx_rtmp_control_drop_stream(ngx_http_request_t *r,
399+
ngx_rtmp_control_drop_t *drop,
400+
ngx_rtmp_live_stream_t *ls)
401+
{
402+
ngx_rtmp_live_ctx_t *lctx;
403+
const char *s;
404+
405+
for (lctx = ls->ctx; lctx; lctx = lctx->next) {
406+
s = ngx_rtmp_control_drop_session(r, drop, lctx);
407+
if (s != NGX_CONF_OK) {
408+
return s;
409+
}
410+
}
411+
412+
return NGX_CONF_OK;
413+
}
414+
415+
416+
static const char *
417+
ngx_rtmp_control_drop_app(ngx_http_request_t *r,
418+
ngx_rtmp_control_drop_t *drop,
419+
ngx_rtmp_core_app_conf_t *cacf)
420+
{
421+
ngx_rtmp_live_app_conf_t *lacf;
422+
ngx_rtmp_live_stream_t *ls;
423+
ngx_str_t name;
424+
const char *s;
425+
size_t len;
426+
ngx_uint_t n;
427+
428+
ngx_memzero(&name, sizeof(name));
429+
ngx_http_arg(r, (u_char *) "name", sizeof("name") - 1, &name);
430+
431+
lacf = cacf->app_conf[ngx_rtmp_live_module.ctx_index];
432+
433+
if (name.len == 0) {
434+
for (n = 0; n < (ngx_uint_t) lacf->nbuckets; ++n) {
435+
for (ls = lacf->streams[n]; ls; ls = ls->next)
436+
{
437+
s = ngx_rtmp_control_drop_stream(r, drop, ls);
438+
if (s != NGX_CONF_OK) {
439+
return s;
440+
}
441+
}
442+
}
443+
444+
return NGX_CONF_OK;
445+
}
446+
447+
for (ls = lacf->streams[ngx_hash_key(name.data, name.len) % lacf->nbuckets];
448+
ls; ls = ls->next)
449+
{
450+
len = ngx_strlen(ls->name);
451+
if (name.len != len || ngx_strncmp(name.data, ls->name, name.len)) {
452+
continue;
453+
}
454+
455+
s = ngx_rtmp_control_drop_stream(r, drop, ls);
456+
if (s != NGX_CONF_OK) {
457+
return s;
458+
}
459+
}
460+
461+
return NGX_CONF_OK;
462+
}
463+
464+
465+
static const char *
466+
ngx_rtmp_control_drop_srv(ngx_http_request_t *r,
467+
ngx_rtmp_control_drop_t *drop,
468+
ngx_rtmp_core_srv_conf_t *cscf)
469+
{
470+
ngx_rtmp_core_app_conf_t **pcacf;
471+
ngx_str_t app;
472+
ngx_uint_t n;
473+
const char *s;
474+
475+
ngx_memzero(&app, sizeof(app));
476+
ngx_http_arg(r, (u_char *) "app", sizeof("app") - 1, &app);
477+
478+
pcacf = cscf->applications.elts;
479+
480+
for (n = 0; n < cscf->applications.nelts; ++n, ++pcacf) {
481+
if (app.len && ((*pcacf)->name.len != app.len ||
482+
ngx_strncmp((*pcacf)->name.data, app.data, app.len)))
483+
{
484+
continue;
485+
}
486+
487+
s = ngx_rtmp_control_drop_app(r, drop, *pcacf);
488+
if (s != NGX_CONF_OK) {
489+
return s;
490+
}
491+
}
492+
493+
return NGX_CONF_OK;
494+
}
495+
496+
497+
static const char *
498+
ngx_rtmp_control_drop_main(ngx_http_request_t *r,
499+
ngx_rtmp_control_drop_t *drop,
500+
ngx_rtmp_core_main_conf_t *cmcf)
501+
{
502+
ngx_rtmp_core_srv_conf_t **pcscf;
503+
ngx_str_t srv;
504+
ngx_uint_t sn;
505+
506+
sn = 0;
507+
if (ngx_http_arg(r, (u_char *) "srv", sizeof("srv") - 1, &srv) == NGX_OK) {
508+
sn = ngx_atoi(srv.data, srv.len);
509+
}
510+
511+
if (sn >= cmcf->servers.nelts) {
512+
return "Server index out of range";
513+
}
514+
515+
pcscf = cmcf->servers.elts;
516+
pcscf += sn;
517+
518+
return ngx_rtmp_control_drop_srv(r, drop, *pcscf);
519+
}
520+
521+
332522
static ngx_int_t
333523
ngx_rtmp_control_drop(ngx_http_request_t *r, ngx_str_t *method)
334524
{
335-
ngx_rtmp_control_core_t core;
336-
ngx_rtmp_control_live_t live;
337-
ngx_rtmp_live_ctx_t *lctx;
338-
ngx_str_t addr, *paddr;
339-
const char *msg;
340-
ngx_uint_t ndropped;
525+
ngx_rtmp_control_drop_t drop;
341526
size_t len;
342527
u_char *p;
343528
ngx_buf_t *b;
344529
ngx_chain_t cl;
530+
const char *msg;
345531

346-
msg = ngx_rtmp_control_parse_core(r, &core);
347-
if (msg != NGX_CONF_OK) {
348-
goto error;
349-
}
350-
351-
msg = ngx_rtmp_control_parse_live(r, &core, &live);
352-
if (msg != NGX_CONF_OK) {
532+
if (ngx_rtmp_core_main_conf == NULL) {
533+
msg = "Empty main conf";
353534
goto error;
354535
}
355536

356-
ndropped = 0;
537+
ngx_memzero(&drop, sizeof(drop));
357538

358539
if (method->len == sizeof("publisher") - 1 &&
359-
ngx_strncmp(method->data, "publisher", method->len) == 0)
540+
ngx_memcmp(method->data, "publisher", method->len) == 0)
360541
{
361-
for (lctx = live.ls->ctx; lctx; lctx = lctx->next) {
362-
if (lctx->publishing) {
363-
ngx_rtmp_finalize_session(lctx->session);
364-
++ndropped;
365-
break;
366-
}
367-
}
542+
drop.method = NGX_RTMP_CONTROL_DROP_PUBLISHER;
368543

369-
} else if (method->len == sizeof("client") - 1 &&
370-
ngx_strncmp(method->data, "client", method->len) == 0)
544+
} else if (method->len == sizeof("subscriber") - 1 &&
545+
ngx_memcmp(method->data, "subscriber", method->len) == 0)
371546
{
372-
ngx_memzero(&addr, sizeof(addr));
373-
ngx_http_arg(r, (u_char *) "addr", sizeof("addr") - 1, &addr);
374-
375-
for (lctx = live.ls->ctx; lctx; lctx = lctx->next) {
376-
if (addr.len && lctx->session && lctx->session->connection) {
377-
paddr = &lctx->session->connection->addr_text;
378-
if (paddr->len != addr.len ||
379-
ngx_strncmp(paddr->data, addr.data, addr.len))
380-
{
381-
continue;
382-
}
383-
}
547+
drop.method = NGX_RTMP_CONTROL_DROP_SUBSCRIBER;
384548

385-
ngx_rtmp_finalize_session(lctx->session);
386-
++ndropped;
387-
}
549+
} else if (method->len == sizeof("client") - 1 &&
550+
ngx_memcmp(method->data, "client", method->len) == 0)
551+
{
552+
drop.method = NGX_RTMP_CONTROL_DROP_CLIENT;
388553

389554
} else {
390555
msg = "Undefined method";
391556
goto error;
392557
}
393558

559+
ngx_http_arg(r, (u_char *) "addr", sizeof("addr") - 1, &drop.addr);
560+
561+
msg = ngx_rtmp_control_drop_main(r, &drop, ngx_rtmp_core_main_conf);
562+
if (msg != NGX_CONF_OK) {
563+
goto error;
564+
}
565+
394566
/* output ndropped */
395567

396568
len = NGX_INT_T_LEN;
@@ -400,7 +572,7 @@ ngx_rtmp_control_drop(ngx_http_request_t *r, ngx_str_t *method)
400572
return NGX_ERROR;
401573
}
402574

403-
len = (size_t) (ngx_snprintf(p, len, "%ui", ndropped) - p);
575+
len = (size_t) (ngx_snprintf(p, len, "%ui", drop.ndropped) - p);
404576

405577
r->headers_out.status = NGX_HTTP_OK;
406578
r->headers_out.content_length_n = len;

0 commit comments

Comments
 (0)