Skip to content

Commit 6e6ab72

Browse files
dmorndaniel-jodlos
authored andcommitted
Fix memory leak in the sink (#31)
1 parent 772cdb7 commit 6e6ab72

File tree

7 files changed

+153
-54
lines changed

7 files changed

+153
-54
lines changed

c_src/membrane_rtmp_plugin/sink/rtmp_sink.c

+11-11
Original file line numberDiff line numberDiff line change
@@ -134,24 +134,24 @@ UNIFEX_TERM write_video_frame(UnifexEnv *env, State *state,
134134
state->output_ctx->streams[state->video_stream_index]->time_base;
135135
AVPacket *packet = av_packet_alloc();
136136

137+
uint8_t* data = (uint8_t *)av_malloc(frame->size);
138+
memcpy(data, frame->data, frame->size);
139+
av_packet_from_data(packet, data, frame->size);
140+
137141
UNIFEX_TERM write_frame_result;
138142

139143
if (is_key_frame) {
140144
packet->flags |= AV_PKT_FLAG_KEY;
141145
}
142146

143147
packet->stream_index = state->video_stream_index;
144-
packet->size = frame->size;
145-
packet->data = (uint8_t *)av_malloc(frame->size);
146148

147149
if (!packet->data) {
148150
write_frame_result =
149151
unifex_raise(env, "Failed allocating video frame data");
150152
goto end;
151153
}
152154

153-
memcpy(packet->data, frame->data, frame->size);
154-
155155
int64_t dts_scaled =
156156
av_rescale_q(dts, MEMBRANE_TIME_BASE, video_stream_time_base);
157157
int64_t pts_scaled =
@@ -162,15 +162,14 @@ UNIFEX_TERM write_video_frame(UnifexEnv *env, State *state,
162162
packet->duration = dts_scaled - state->current_video_dts;
163163
state->current_video_dts = dts_scaled;
164164

165-
if (av_interleaved_write_frame(state->output_ctx, packet)) {
165+
if (av_write_frame(state->output_ctx, packet)) {
166166
write_frame_result =
167167
write_video_frame_result_error(env, "Failed writing video frame");
168168
goto end;
169169
}
170170
write_frame_result = write_video_frame_result_ok(env, state);
171171

172172
end:
173-
av_packet_unref(packet);
174173
av_packet_free(&packet);
175174
return write_frame_result;
176175
}
@@ -187,18 +186,20 @@ UNIFEX_TERM write_audio_frame(UnifexEnv *env, State *state,
187186
state->output_ctx->streams[state->audio_stream_index]->time_base;
188187
AVPacket *packet = av_packet_alloc();
189188

189+
uint8_t* data = (uint8_t *)av_malloc(frame->size);
190+
memcpy(data, frame->data, frame->size);
191+
av_packet_from_data(packet, data, frame->size);
192+
190193
UNIFEX_TERM write_frame_result;
191194

192195
packet->stream_index = state->audio_stream_index;
193-
packet->size = frame->size;
194196

195-
packet->data = (uint8_t *)av_malloc(frame->size);
196197
if (!packet->data) {
197198
write_frame_result =
198199
unifex_raise(env, "Failed allocating audio frame data.");
199200
goto end;
200201
}
201-
memcpy(packet->data, frame->data, frame->size);
202+
202203
int64_t pts_scaled =
203204
av_rescale_q(pts, MEMBRANE_TIME_BASE, audio_stream_time_base);
204205
// Packet DTS is set to PTS since AAC buffers do not contain DTS
@@ -208,15 +209,14 @@ UNIFEX_TERM write_audio_frame(UnifexEnv *env, State *state,
208209
packet->duration = pts_scaled - state->current_audio_pts;
209210
state->current_audio_pts = pts_scaled;
210211

211-
if (av_interleaved_write_frame(state->output_ctx, packet)) {
212+
if (av_write_frame(state->output_ctx, packet)) {
212213
write_frame_result =
213214
write_audio_frame_result_error(env, "Failed writing audio frame");
214215
goto end;
215216
}
216217
write_frame_result = write_audio_frame_result_ok(env, state);
217218

218219
end:
219-
av_packet_unref(packet);
220220
av_packet_free(&packet);
221221
return write_frame_result;
222222
}

lib/membrane_rtmp_plugin/rtmp/sink/sink.ex

+84-42
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,20 @@ defmodule Membrane.RTMP.Sink do
1212
require Membrane.Logger
1313

1414
alias __MODULE__.Native
15-
alias Membrane.{AAC, MP4}
15+
alias Membrane.{AAC, Buffer, MP4}
1616

1717
@supported_protocols ["rtmp://", "rtmps://"]
1818
@connection_attempt_interval 500
1919
@default_state %{
2020
attempts: 0,
2121
native: nil,
22-
buffered_frame: nil,
23-
ready: false,
24-
current_timestamps: %{}
22+
# Keys here are the pad names.
23+
frame_buffer: %{audio: nil, video: nil},
24+
ready?: false,
25+
# Activated when one of the source inputs gets closed. Interleaving is
26+
# disabled, frame buffer is flushed and from that point buffers on the
27+
# remaining pad are simply forwarded to the output.
28+
forward_mode?: false
2529
}
2630

2731
def_input_pad :audio,
@@ -130,52 +134,52 @@ defmodule Membrane.RTMP.Sink do
130134

131135
@impl true
132136
def handle_write(pad, buffer, _ctx, %{ready: false} = state) do
133-
state = Map.put(state, :buffered_frame, {pad, buffer})
134-
{:ok, state}
137+
{:ok, fill_frame_buffer(state, pad, buffer)}
135138
end
136139

137-
@impl true
138-
def handle_write(
139-
pad,
140-
buffer,
141-
_ctx,
142-
%{ready: true, buffered_frame: {frame_pad, frame}} = state
143-
) do
144-
state = write_frame(state, frame_pad, frame) |> write_frame(pad, buffer)
145-
{{:ok, demand: get_demand(state)}, Map.put(state, :buffered_frame, nil)}
140+
def handle_write(pad, buffer, _ctx, %{forward_mode?: true} = state) do
141+
{{:ok, demand: pad}, write_frame(state, pad, buffer)}
146142
end
147143

148-
@impl true
149144
def handle_write(pad, buffer, _ctx, state) do
150-
state = write_frame(state, pad, buffer)
151-
{{:ok, demand: get_demand(state)}, state}
145+
state
146+
|> fill_frame_buffer(pad, buffer)
147+
|> write_frame_interleaved()
152148
end
153149

154150
@impl true
155-
def handle_end_of_stream(pad, ctx, state) do
156-
if ctx.pads |> Map.values() |> Enum.all?(& &1.end_of_stream?) do
151+
def handle_end_of_stream(pad, _ctx, state) do
152+
if state.forward_mode? do
157153
Native.finalize_stream(state.native)
158154
{:ok, state}
159155
else
160-
state = Map.update!(state, :current_timestamps, &Map.delete(&1, pad))
161-
{{:ok, demand: get_demand(state)}, state}
156+
# The interleave logic does not work if either one of the inputs does not
157+
# produce buffers. From this point on we act as a "forward" filter.
158+
other_pad =
159+
case pad do
160+
:audio -> :video
161+
:video -> :audio
162+
end
163+
164+
state = flush_frame_buffer(state)
165+
{{:ok, demand: other_pad}, %{state | forward_mode?: true}}
162166
end
163167
end
164168

165169
@impl true
166170
def handle_other(:try_connect, _ctx, %{attempts: attempts, max_attempts: max_attempts} = state)
167171
when max_attempts != :infinity and attempts >= max_attempts do
168-
raise "Failed to connect to '#{state.rtmp_url}' #{attempts} times, aborting"
172+
raise "failed to connect to '#{state.rtmp_url}' #{attempts} times, aborting"
169173
end
170174

171-
def handle_other(:try_connect, ctx, state) do
175+
def handle_other(:try_connect, _ctx, state) do
172176
state = %{state | attempts: state.attempts + 1}
173177

174178
case Native.try_connect(state.native) do
175179
:ok ->
176180
Membrane.Logger.debug("Correctly initialized connection with: #{state.rtmp_url}")
177-
demands = ctx.pads |> Map.keys() |> Enum.map(&{:demand, &1})
178-
{{:ok, [{:playback_change, :resume} | demands]}, state}
181+
182+
{{:ok, [{:playback_change, :resume} | build_demand(state)]}, state}
179183

180184
{:error, error} when error in [:econnrefused, :etimedout] ->
181185
Process.send_after(self(), :try_connect, @connection_attempt_interval)
@@ -187,22 +191,70 @@ defmodule Membrane.RTMP.Sink do
187191
{:ok, state}
188192

189193
{:error, reason} ->
190-
raise "Failed to connect to '#{state.rtmp_url}': #{reason}"
194+
raise "failed to connect to '#{state.rtmp_url}': #{reason}"
195+
end
196+
end
197+
198+
defp build_demand(%{frame_buffer: frame_buffer}) do
199+
frame_buffer
200+
|> Enum.filter(fn {_pad, buffer} -> buffer == nil end)
201+
|> Enum.map(fn {pad, _} -> {:demand, pad} end)
202+
end
203+
204+
defp fill_frame_buffer(state, pad, buffer) do
205+
if get_in(state, [:frame_buffer, pad]) == nil do
206+
put_in(state, [:frame_buffer, pad], buffer)
207+
else
208+
raise "attempted to overwrite frame buffer on pad #{inspect(pad)}"
191209
end
192210
end
193211

212+
defp write_frame_interleaved(state = %{frame_buffer: %{audio: audio, video: video}})
213+
when audio == nil or video == nil do
214+
# We still have to wait for the other frame.
215+
{:ok, state}
216+
end
217+
218+
defp write_frame_interleaved(%{frame_buffer: frame_buffer} = state) do
219+
{pad, buffer} =
220+
Enum.min_by(frame_buffer, fn {_, buffer} ->
221+
buffer
222+
|> Buffer.get_dts_or_pts()
223+
|> Ratio.ceil()
224+
end)
225+
226+
state =
227+
state
228+
|> write_frame(pad, buffer)
229+
|> put_in([:frame_buffer, pad], nil)
230+
231+
{{:ok, build_demand(state)}, state}
232+
end
233+
234+
defp flush_frame_buffer(%{frame_buffer: frame_buffer} = state) do
235+
pads_with_buffer =
236+
frame_buffer
237+
|> Enum.filter(fn {_pad, buffer} -> buffer != nil end)
238+
|> Enum.sort(fn {_, left}, {_, right} ->
239+
Buffer.get_dts_or_pts(left) <= Buffer.get_dts_or_pts(right)
240+
end)
241+
242+
Enum.reduce(pads_with_buffer, state, fn {pad, buffer}, state ->
243+
state
244+
|> write_frame(pad, buffer)
245+
|> put_in([:frame_buffer, pad], nil)
246+
end)
247+
end
248+
194249
defp write_frame(state, :audio, buffer) do
195250
buffer_pts = buffer.pts |> Ratio.ceil()
196251

197252
case Native.write_audio_frame(state.native, buffer.payload, buffer_pts) do
198253
{:ok, native} ->
199254
Map.put(state, :native, native)
200-
|> Map.update!(:current_timestamps, fn curr_tmps ->
201-
Map.put(curr_tmps, :audio, buffer_pts)
202-
end)
203255

204256
{:error, reason} ->
205-
raise("Writing audio frame failed with reason: #{reason}")
257+
raise "writing audio frame failed with reason: #{inspect(reason)}"
206258
end
207259
end
208260

@@ -216,19 +268,9 @@ defmodule Membrane.RTMP.Sink do
216268
) do
217269
{:ok, native} ->
218270
Map.put(state, :native, native)
219-
|> Map.update!(:current_timestamps, fn curr_tmps ->
220-
Map.put(curr_tmps, :video, buffer.dts)
221-
end)
222271

223272
{:error, reason} ->
224-
raise("Writing video frame failed with reason: #{reason}")
273+
raise "writing video frame failed with reason: #{inspect(reason)}"
225274
end
226275
end
227-
228-
defp get_demand(state) do
229-
{pad, _timestamp} =
230-
state.current_timestamps |> Enum.min_by(fn {_pad, timestamp} -> timestamp end)
231-
232-
{pad, 1}
233-
end
234276
end

mix.exs

+2
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,11 @@ defmodule Membrane.RTMP.Mixfile do
4343
{:membrane_h264_ffmpeg_plugin, "~> 0.22.0"},
4444
{:membrane_aac_plugin, "~> 0.12.1"},
4545
{:membrane_mp4_plugin, "~> 0.16.0"},
46+
{:membrane_file_plugin, "~> 0.12.0"},
4647
# testing
4748
{:membrane_hackney_plugin, "~> 0.8.0", only: :test},
4849
{:ffmpex, "~> 0.7", only: :test},
50+
{:membrane_stream_plugin, "~> 0.1", only: :test},
4951
# development
5052
{:ex_doc, "~> 0.28", only: :dev, runtime: false},
5153
{:dialyxir, "~> 1.1", only: :dev, runtime: false},

mix.lock

+2-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
"makeup_elixir": {:hex, :makeup_elixir, "0.16.0", "f8c570a0d33f8039513fbccaf7108c5d750f47d8defd44088371191b76492b0b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "28b2cbdc13960a46ae9a8858c4bebdec3c9a6d7b4b9e7f4ed1502f8159f338e7"},
2323
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
2424
"membrane_aac_format": {:hex, :membrane_aac_format, "0.7.0", "93ea75b57d5cbee5db59f0250baba65a8949d32ee4cfa38224e93ea306c30048", [:mix], [{:bimap, "~> 1.1", [hex: :bimap, repo: "hexpm", optional: false]}], "hexpm", "3233d2a5a1b264039a198abcef3b7f1c64f5bde9704ec01afbf1bab43a56d415"},
25-
"membrane_aac_plugin": {:hex, :membrane_aac_plugin, "0.12.1", "3cca913e084e331015513a6bf5fc83e40c92fbc97843d2f28ac4c4ac406b2cdb", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:crc, "~> 0.10.2", [hex: :crc, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.7.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 0.10.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "f6e097402e60e450b970a6ef4f8161161e0e3ba16de371ed15166237d4bb5226"},
25+
"membrane_aac_plugin": {:hex, :membrane_aac_plugin, "0.12.2", "f688b8001e010377f33d4fd9ca9983f77a31b193f742c116923b51ffc07f1c0c", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:crc, "~> 0.10.2", [hex: :crc, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.7.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 0.10.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "cde8f7c5afdca51b25fa656778f57631074a4195e27ee1940ebf336e3740bede"},
2626
"membrane_cmaf_format": {:hex, :membrane_cmaf_format, "0.6.1", "89d130b5f8786d4285d395697b0f7763a2c82a02de1658cdeb4f8e37e6a6c85c", [:mix], [], "hexpm", "e916e3c8216f3bf999b069ffda94da48c9bdbe3181ce7155a458d1ccf1a97b3d"},
2727
"membrane_common_c": {:hex, :membrane_common_c, "0.13.0", "c314623f93209eb2fa092379954c686f6e50ac89baa48360f836d24f4d53f5ee", [:mix], [{:membrane_core, "~> 0.10.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:shmex, "~> 0.5.0", [hex: :shmex, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "90181fbbe481ccd0a4a76daf0300f8ad1b5b0bf0ebd8b42c133904f8839663ca"},
2828
"membrane_core": {:hex, :membrane_core, "0.10.2", "d2d17039f6df746e4a3c47da32f51867fbafe528272cdd9b226d16b1032bc337", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 2.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6a4f290f919ada66c772807d64d5830be2962b7c13a2f2bc9ace416a1cd19ee1"},
@@ -34,6 +34,7 @@
3434
"membrane_mp4_plugin": {:hex, :membrane_mp4_plugin, "0.16.2", "e893c0c107b0ec9f00918e8c8927d0cebb15f6cb6f91d08bef5c9a7c333f701d", [:mix], [{:membrane_aac_format, "~> 0.7.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_cmaf_format, "~> 0.6.0", [hex: :membrane_cmaf_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 0.10.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_file_plugin, "~> 0.12.0", [hex: :membrane_file_plugin, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.3", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_mp4_format, "~> 0.7.0", [hex: :membrane_mp4_format, repo: "hexpm", optional: false]}, {:membrane_opus_format, "~> 0.3.0", [hex: :membrane_opus_format, repo: "hexpm", optional: false]}], "hexpm", "3c3726e551eb14f8368d93849b79a765d7a08c66d29c33750cbc3e1696ea7b90"},
3535
"membrane_opus_format": {:hex, :membrane_opus_format, "0.3.0", "3804d9916058b7cfa2baa0131a644d8186198d64f52d592ae09e0942513cb4c2", [:mix], [], "hexpm", "8fc89c97be50de23ded15f2050fe603dcce732566fe6fdd15a2de01cb6b81afe"},
3636
"membrane_raw_video_format": {:hex, :membrane_raw_video_format, "0.2.0", "cda8eb207cf65c93690a19001aba3edbb2ba5d22abc8068a1f6a785ba871e8cf", [:mix], [], "hexpm", "6b716fc24f60834323637c95aaaa0f99be23fcc6a84a21af70195ef50185b634"},
37+
"membrane_stream_plugin": {:hex, :membrane_stream_plugin, "0.1.0", "0a21e36c4523fae2c514d2661e49a0630f548dcdfe5d18bbf07a8c15e6d1b036", [:mix], [{:membrane_core, "~> 0.10.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "303658ac9e7915b77ed0a127df2813bdc3d57a5d287bbc0328506c21021138c4"},
3738
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"},
3839
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"},
3940
"mockery": {:hex, :mockery, "2.3.1", "a02fd60b10ac9ed37a7a2ecf6786c1f1dd5c75d2b079a60594b089fba32dc087", [:mix], [], "hexpm", "1d0971d88ebf084e962da3f2cfee16f0ea8e04ff73a7710428500d4500b947fa"},

test/fixtures/audio.msr

89.9 KB
Binary file not shown.

test/fixtures/video.msr

1.33 MB
Binary file not shown.

test/membrane_rtmp_plugin/rtmp_sink_test.exs

+54
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,30 @@ defmodule Membrane.RTMP.Sink.Test do
1717
%{flv_output_file: flv_output_file}
1818
end
1919

20+
@tag :tmp_dir
21+
test "Checks if audio and video are interleaved correctly", %{tmp_dir: tmp_dir} do
22+
output_file = Path.join(tmp_dir, "rtmp_sink_interleave_test.flv")
23+
24+
rtmp_server = Task.async(fn -> start_rtmp_server(output_file) end)
25+
{:ok, sink_pipeline_pid} = start_interleaving_sink_pipeline(@rtmp_server_url)
26+
27+
# There's an RC - there's no way to ensure RTMP server starts to listen before pipeline is started
28+
# so it may retry a few times before succeeding
29+
assert_pipeline_playback_changed(sink_pipeline_pid, :prepared, :playing, 5000)
30+
31+
assert_start_of_stream(sink_pipeline_pid, :rtmp_sink, :video, 5_000)
32+
assert_start_of_stream(sink_pipeline_pid, :rtmp_sink, :audio, 5_000)
33+
34+
assert_end_of_stream(sink_pipeline_pid, :rtmp_sink, :video, 5_000)
35+
assert_end_of_stream(sink_pipeline_pid, :rtmp_sink, :audio, 5_000)
36+
37+
Membrane.Testing.Pipeline.terminate(sink_pipeline_pid, blocking?: true)
38+
# RTMP server should terminate when the connection is closed
39+
assert :ok = Task.await(rtmp_server)
40+
41+
assert File.exists?(output_file)
42+
end
43+
2044
@tag :tmp_dir
2145
test "Check if audio and video streams are correctly received by RTMP server instance", %{
2246
flv_output_file: flv_output_file
@@ -43,6 +67,36 @@ defmodule Membrane.RTMP.Sink.Test do
4367
assert File.stat!(flv_output_file).size == File.stat!(@reference_flv_path).size
4468
end
4569

70+
defp start_interleaving_sink_pipeline(rtmp_url) do
71+
import Membrane.ParentSpec
72+
73+
options = [
74+
children: [
75+
rtmp_sink: %Membrane.RTMP.Sink{rtmp_url: rtmp_url, max_attempts: 5}
76+
],
77+
links: [
78+
link(:video_source, %Membrane.File.Source{location: "test/fixtures/video.msr"})
79+
|> to(:video_deserializer, Membrane.Stream.Deserializer)
80+
|> to(:video_parser, %Membrane.H264.FFmpeg.Parser{
81+
alignment: :au,
82+
attach_nalus?: true,
83+
skip_until_parameters?: true
84+
})
85+
|> to(:video_payloader, Membrane.MP4.Payloader.H264)
86+
|> via_in(:video)
87+
|> to(:rtmp_sink),
88+
link(:audio_source, %Membrane.File.Source{location: "test/fixtures/audio.msr"})
89+
|> to(:audio_deserializer, Membrane.Stream.Deserializer)
90+
|> to(:audio_parser, Membrane.AAC.Parser)
91+
|> via_in(:audio)
92+
|> to(:rtmp_sink)
93+
],
94+
test_process: self()
95+
]
96+
97+
Pipeline.start_link(options)
98+
end
99+
46100
defp start_sink_pipeline(rtmp_url) do
47101
import Membrane.ParentSpec
48102

0 commit comments

Comments
 (0)