diff --git a/src/libs/device.c b/src/libs/device.c index baa4cbe42..513f903f5 100644 --- a/src/libs/device.c +++ b/src/libs/device.c @@ -366,7 +366,7 @@ int us_device_grab_buffer(us_device_s *dev, us_hw_buffer_s **hw) { } while (true); *hw = &run->hw_bufs[buf.index]; - atomic_store(&(*hw)->busy, 0); + atomic_store(&(*hw)->refs, 0); (*hw)->raw.dma_fd = (*hw)->dma_fd; (*hw)->raw.used = buf.bytesused; (*hw)->raw.width = run->width; @@ -383,6 +383,7 @@ int us_device_grab_buffer(us_device_s *dev, us_hw_buffer_s **hw) { } int us_device_release_buffer(us_device_s *dev, us_hw_buffer_s *hw) { + assert(atomic_load(&hw->refs) == 0); const uint index = hw->buf.index; _D_LOG_DEBUG("Releasing device buffer=%u ...", index); if (us_xioctl(dev->run->fd, VIDIOC_QBUF, &hw->buf) < 0) { @@ -390,10 +391,17 @@ int us_device_release_buffer(us_device_s *dev, us_hw_buffer_s *hw) { return -1; } hw->grabbed = false; - atomic_store(&hw->busy, 0); return 0; } +void us_device_buffer_incref(us_hw_buffer_s *hw) { + atomic_fetch_add(&hw->refs, 1); +} + +void us_device_buffer_decref(us_hw_buffer_s *hw) { + atomic_fetch_sub(&hw->refs, 1); +} + int _device_wait_buffer(us_device_s *dev) { us_device_runtime_s *const run = dev->run; @@ -834,7 +842,7 @@ static int _device_open_io_method_mmap(us_device_s *dev) { } us_hw_buffer_s *hw = &run->hw_bufs[run->n_bufs]; - atomic_init(&hw->busy, false); + atomic_init(&hw->refs, 0); const uz buf_size = (run->capture_mplane ? buf.m.planes[0].length : buf.length); const off_t buf_offset = (run->capture_mplane ? buf.m.planes[0].m.mem_offset : buf.m.offset); diff --git a/src/libs/device.h b/src/libs/device.h index 3bdbe530b..7bfb8dce7 100644 --- a/src/libs/device.h +++ b/src/libs/device.h @@ -48,7 +48,7 @@ typedef struct { struct v4l2_buffer buf; int dma_fd; bool grabbed; - atomic_int busy; + atomic_int refs; } us_hw_buffer_s; typedef struct { @@ -132,3 +132,6 @@ void us_device_close(us_device_s *dev); int us_device_grab_buffer(us_device_s *dev, us_hw_buffer_s **hw); int us_device_release_buffer(us_device_s *dev, us_hw_buffer_s *hw); + +void us_device_buffer_incref(us_hw_buffer_s *hw); +void us_device_buffer_decref(us_hw_buffer_s *hw); diff --git a/src/ustreamer/stream.c b/src/ustreamer/stream.c index 5120b99ad..0bc34de9d 100644 --- a/src/ustreamer/stream.c +++ b/src/ustreamer/stream.c @@ -26,7 +26,6 @@ #include #include #include -#include #include @@ -49,6 +48,13 @@ #endif +typedef struct { + pthread_t tid; + us_queue_s *queue; + us_stream_s *stream; + atomic_bool *stop; +} _jpeg_context_s; + typedef struct { pthread_t tid; us_device_s *dev; @@ -66,11 +72,11 @@ typedef struct { } _releaser_context_s; +static void *_jpeg_thread(void *v_ctx); static void *_h264_thread(void *v_ctx); static void *_releaser_thread(void *v_ctx); static bool _stream_has_any_clients(us_stream_s *stream); -static bool _stream_slowdown(us_stream_s *stream); static int _stream_init_loop(us_stream_s *stream); static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame); static void _stream_check_suicide(us_stream_s *stream); @@ -142,6 +148,13 @@ void us_stream_loop(us_stream_s *stream) { US_THREAD_CREATE(releasers[index].tid, _releaser_thread, &releasers[index]); } + _jpeg_context_s jpeg_ctx = { + .queue = us_queue_init(stream->dev->run->n_bufs), + .stream = stream, + .stop = &threads_stop, + }; + US_THREAD_CREATE(jpeg_ctx.tid, _jpeg_thread, &jpeg_ctx); + _h264_context_s h264_ctx; if (run->h264 != NULL) { h264_ctx.dev = stream->dev; @@ -151,40 +164,18 @@ void us_stream_loop(us_stream_s *stream) { US_THREAD_CREATE(h264_ctx.tid, _h264_thread, &h264_ctx); } - ldf grab_after = 0; - uint fluency_passed = 0; uint captured_fps_accum = 0; sll captured_fps_ts = 0; US_LOG_INFO("Capturing ..."); + uint slowdown_count = 0; while (!atomic_load(&run->stop) && !atomic_load(&threads_stop)) { _stream_check_suicide(stream); - - US_SEP_DEBUG('-'); - US_LOG_DEBUG("Waiting for worker ..."); - - us_worker_s *const ready_wr = us_workers_pool_wait(stream->enc->run->pool); - us_encoder_job_s *const ready_job = ready_wr->job; - - if (ready_job->hw != NULL) { - assert(!us_queue_put(releasers[ready_job->hw->buf.index].queue, ready_job->hw, 0)); - atomic_fetch_sub(&ready_job->hw->busy, 1); - ready_job->hw = NULL; - if (ready_wr->job_failed) { - // pass - } else if (ready_wr->job_timely) { - _stream_expose_frame(stream, ready_job->dest); - US_LOG_PERF("##### Encoded JPEG exposed; worker=%s, latency=%.3Lf", - ready_wr->name, us_get_now_monotonic() - ready_job->dest->grab_ts); - } else { - US_LOG_PERF("----- Encoded JPEG dropped; worker=%s", ready_wr->name); - } - } - - const bool h264_force_key = _stream_slowdown(stream); - if (atomic_load(&run->stop) || atomic_load(&threads_stop)) { - goto close; + if (stream->slowdown && slowdown_count > 0 && !_stream_has_any_clients(stream)) { + usleep(100 * 1000); + slowdown_count = (slowdown_count + 1) % 10; + continue; } us_hw_buffer_s *hw; @@ -192,53 +183,32 @@ void us_stream_loop(us_stream_s *stream) { switch (buf_index) { case -3: continue; // Broken frame case -2: // Persistent timeout - case -1: goto close; // Any error + case -1: goto close; // Error } assert(buf_index >= 0); + const sll now_sec_ts = us_floor_ms(us_get_now_monotonic()); + if (now_sec_ts != captured_fps_ts) { + US_LOG_PERF_FPS("A new second has come; captured_fps=%u", captured_fps_accum); + atomic_store(&run->http_captured_fps, captured_fps_accum); + captured_fps_accum = 0; + captured_fps_ts = now_sec_ts; + } + captured_fps_accum += 1; + # ifdef WITH_GPIO us_gpio_set_stream_online(true); # endif - const ldf now_ts = us_get_now_monotonic(); - - if (now_ts < grab_after) { - fluency_passed += 1; - US_LOG_VERBOSE("Passed %u frames for fluency: now=%.03Lf, grab_after=%.03Lf", - fluency_passed, now_ts, grab_after); - assert(!us_queue_put(releasers[hw->buf.index].queue, hw, 0)); - } else { - int hw_busy = 1; - if (run->h264 != NULL) { - hw_busy += 1; - } - atomic_store(&hw->busy, hw_busy); - - fluency_passed = 0; - - const sll now_sec_ts = us_floor_ms(now_ts); - if (now_sec_ts != captured_fps_ts) { - US_LOG_PERF_FPS("A new second has come; captured_fps=%u", captured_fps_accum); - atomic_store(&run->http_captured_fps, captured_fps_accum); - captured_fps_accum = 0; - captured_fps_ts = now_sec_ts; - } - captured_fps_accum += 1; - - const ldf fluency_delay = us_workers_pool_get_fluency_delay(stream->enc->run->pool, ready_wr); - grab_after = now_ts + fluency_delay; - US_LOG_VERBOSE("Fluency: delay=%.03Lf, grab_after=%.03Lf", fluency_delay, grab_after); - - ready_job->hw = hw; - us_workers_pool_assign(stream->enc->run->pool, ready_wr); - US_LOG_DEBUG("Assigned new frame in buffer=%d to worker=%s", buf_index, ready_wr->name); - - _SINK_PUT(raw_sink, &hw->raw); - - if (run->h264 != NULL) { - us_queue_put(h264_ctx.queue, hw, h264_force_key); - } + us_device_buffer_incref(hw); // JPEG + us_queue_put(jpeg_ctx.queue, hw, 0); + if (run->h264 != NULL) { + us_device_buffer_incref(hw); // H264 + us_queue_put(h264_ctx.queue, hw, 0); } + us_queue_put(releasers[hw->buf.index].queue, hw, 0); // Plan to release + + _SINK_PUT(raw_sink, &hw->raw); } close: @@ -249,6 +219,9 @@ void us_stream_loop(us_stream_s *stream) { us_queue_destroy(h264_ctx.queue); } + US_THREAD_JOIN(jpeg_ctx.tid); + us_queue_destroy(jpeg_ctx.queue); + for (uint index = 0; index < n_releasers; ++index) { US_THREAD_JOIN(releasers[index].tid); us_queue_destroy(releasers[index].queue); @@ -273,14 +246,66 @@ void us_stream_loop_break(us_stream_s *stream) { atomic_store(&stream->run->stop, true); } +static void *_jpeg_thread(void *v_ctx) { + _jpeg_context_s *ctx = v_ctx; + us_stream_s *stream = ctx->stream; + + ldf grab_after = 0; + uint fluency_passed = 0; + + while (!atomic_load(ctx->stop)) { + us_worker_s *const ready_wr = us_workers_pool_wait(stream->enc->run->pool); + us_encoder_job_s *const ready_job = ready_wr->job; + + if (ready_job->hw != NULL) { + us_device_buffer_decref(ready_job->hw); + ready_job->hw = NULL; + if (ready_wr->job_failed) { + // pass + } else if (ready_wr->job_timely) { + _stream_expose_frame(stream, ready_job->dest); + US_LOG_PERF("##### Encoded JPEG exposed; worker=%s, latency=%.3Lf", + ready_wr->name, us_get_now_monotonic() - ready_job->dest->grab_ts); + } else { + US_LOG_PERF("----- Encoded JPEG dropped; worker=%s", ready_wr->name); + } + } + + us_hw_buffer_s *hw; + if (us_queue_get(ctx->queue, (void**)&hw, 0.1) < 0) { + continue; + } + + const ldf now_ts = us_get_now_monotonic(); + if (now_ts < grab_after) { + fluency_passed += 1; + US_LOG_VERBOSE("Passed %u frames for fluency: now=%.03Lf, grab_after=%.03Lf", + fluency_passed, now_ts, grab_after); + us_device_buffer_decref(hw); + continue; + } + fluency_passed = 0; + + const ldf fluency_delay = us_workers_pool_get_fluency_delay(stream->enc->run->pool, ready_wr); + grab_after = now_ts + fluency_delay; + US_LOG_VERBOSE("Fluency: delay=%.03Lf, grab_after=%.03Lf", fluency_delay, grab_after); + + ready_job->hw = hw; + us_workers_pool_assign(stream->enc->run->pool, ready_wr); + US_LOG_DEBUG("Assigned new frame in buffer=%d to worker=%s", hw->buf.index, ready_wr->name); + } + return NULL; +} + static void *_h264_thread(void *v_ctx) { _h264_context_s *ctx = v_ctx; while (!atomic_load(ctx->stop)) { us_hw_buffer_s *hw; - if (!us_queue_get(ctx->queue, (void**)&hw, 0.1)) { - us_h264_stream_process(ctx->h264, &hw->raw, false); - atomic_fetch_sub(&hw->busy, 1); + if (us_queue_get(ctx->queue, (void**)&hw, 0.1) < 0) { + continue; } + us_h264_stream_process(ctx->h264, &hw->raw, false); + us_device_buffer_decref(hw); } return NULL; } @@ -289,21 +314,23 @@ static void *_releaser_thread(void *v_ctx) { _releaser_context_s *ctx = v_ctx; while (!atomic_load(ctx->stop)) { us_hw_buffer_s *hw; - if (!us_queue_get(ctx->queue, (void**)&hw, 0.1)) { - while (atomic_load(&hw->busy) > 0) { - if (atomic_load(ctx->stop)) { - break; - } - usleep(5 * 1000); - } - US_MUTEX_LOCK(*ctx->mutex); - const int released = us_device_release_buffer(ctx->dev, hw); - US_MUTEX_UNLOCK(*ctx->mutex); - if (released < 0) { - break; + if (us_queue_get(ctx->queue, (void**)&hw, 0.1) < 0) { + continue; + } + while (atomic_load(&hw->refs) > 0) { + if (atomic_load(ctx->stop)) { + goto done; } + usleep(5 * 1000); + } + US_MUTEX_LOCK(*ctx->mutex); + const int released = us_device_release_buffer(ctx->dev, hw); + US_MUTEX_UNLOCK(*ctx->mutex); + if (released < 0) { + goto done; } } +done: atomic_store(ctx->stop, true); // Stop all other guys on error return NULL; } @@ -318,18 +345,6 @@ static bool _stream_has_any_clients(us_stream_s *stream) { ); } -static bool _stream_slowdown(us_stream_s *stream) { - if (stream->slowdown) { - unsigned count = 0; - while (count < 10 && !atomic_load(&stream->run->stop) && !_stream_has_any_clients(stream)) { - usleep(100000); - ++count; - } - return (count >= 10); - } - return false; -} - static int _stream_init_loop(us_stream_s *stream) { us_stream_runtime_s *const run = stream->run; @@ -448,7 +463,7 @@ static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame) { static void _stream_check_suicide(us_stream_s *stream) { us_stream_runtime_s *const run = stream->run; - if (stream->exit_on_no_clients <= 0) { + if (stream->exit_on_no_clients > 0) { return; } const ldf now_ts = us_get_now_monotonic();