Skip to content

Commit 8a24609

Browse files
author
Paulo Morgado
committed
Enhance memory management and reliability in RTSP server
- Refactored audio and video track handling to utilize `ReadOnlySequence<byte>` for better performance and memory efficiency. - Introduced `AdjustedSizeMemoryOwner` for effective memory allocation. - Updated method signatures for RTP packet creation and sample feeding to accept `ReadOnlySequence<byte>`, ensuring consistency and optimizing memory usage. - Modified `RTPPacketUtil` to read timestamps from `ReadOnlySpan<byte>`, enhancing performance. - Overall code structure refined for improved readability and memory management.
1 parent 75ff42b commit 8a24609

File tree

13 files changed

+179
-117
lines changed

13 files changed

+179
-117
lines changed

src/RTSPServerApp/Program.cs

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using SharpMp4;
33
using SharpRTSPServer;
44
using System;
5+
using System.Buffers;
56
using System.Collections.Generic;
67
using System.IO;
78
using System.Linq;
@@ -51,7 +52,7 @@
5152
else
5253
{
5354
var h265VisualSample = videoTrackBox.GetMdia().GetMinf().GetStbl().GetStsd().Children.FirstOrDefault(x => x.Type == VisualSampleEntryBox.TYPE6 || x.Type == VisualSampleEntryBox.TYPE7) as VisualSampleEntryBox;
54-
if(h265VisualSample != null)
55+
if (h265VisualSample != null)
5556
{
5657
rtspVideoTrack = new SharpRTSPServer.H265Track();
5758
server.AddVideoTrack(rtspVideoTrack);
@@ -60,9 +61,9 @@
6061
{
6162
throw new NotSupportedException("No supported video found!");
6263
}
63-
}
64+
}
6465
}
65-
66+
6667
if (audioTrackBox != null)
6768
{
6869
audioTrackId = fmp4.FindAudioTrackID().First();
@@ -76,7 +77,7 @@
7677
server.AddAudioTrack(rtspAudioTrack);
7778
}
7879
else
79-
{
80+
{
8081
// unsupported audio
8182
}
8283
}
@@ -92,7 +93,7 @@
9293
{
9394
var videoSamplingRate = SharpRTSPServer.H264Track.DEFAULT_CLOCK;
9495
var videoSampleDuration = videoSamplingRate / videoFrameRate;
95-
var videoTrack = parsedMDAT[videoTrackId];
96+
var videoTrack = parsedMDAT[videoTrackId];
9697
videoTimer = new Timer(videoSampleDuration * 1000 / videoSamplingRate);
9798
videoTimer.Elapsed += (s, e) =>
9899
{
@@ -109,7 +110,7 @@
109110
videoIndex++;
110111
}
111112

112-
rtspVideoTrack.FeedInRawSamples((uint)(videoIndex * videoSampleDuration), (List<byte[]>)videoTrack[videoIndex++ % videoTrack.Count]);
113+
rtspVideoTrack.FeedInRawSamples((uint)(videoIndex * videoSampleDuration), CreateReadOnlySequence(videoTrack[videoIndex++ % videoTrack.Count]));
113114

114115
if (videoIndex % videoTrack.Count == 0)
115116
{
@@ -125,7 +126,7 @@
125126
audioTimer = new Timer(audioSampleDuration * 1000 / (rtspAudioTrack as SharpRTSPServer.AACTrack).SamplingRate);
126127
audioTimer.Elapsed += (s, e) =>
127128
{
128-
rtspAudioTrack.FeedInRawSamples((uint)(audioIndex * audioSampleDuration), new List<byte[]>() { audioTrack[0][audioIndex++ % audioTrack[0].Count] });
129+
rtspAudioTrack.FeedInRawSamples((uint)(audioIndex * audioSampleDuration), new ReadOnlySequence<byte>(audioTrack[0][audioIndex++ % audioTrack[0].Count]));
129130

130131
if (audioIndex % audioTrack[0].Count == 0)
131132
{
@@ -151,7 +152,7 @@
151152
Console.WriteLine("Press any key to exit");
152153
while (!Console.KeyAvailable)
153154
{
154-
System.Threading.Thread.Sleep(250);
155+
System.Threading.Thread.Sleep(250);
155156
}
156157
}
157158

@@ -164,3 +165,44 @@ static void Reset(ref int videoIndex, Timer videoTimer, ref int audioIndex, Time
164165
videoTimer?.Start();
165166
audioTimer?.Start();
166167
}
168+
169+
170+
static ReadOnlySequence<byte> CreateReadOnlySequence(IList<byte[]> byteArrayList)
171+
{
172+
if (byteArrayList is not { Count: > 0 })
173+
{
174+
return ReadOnlySequence<byte>.Empty;
175+
}
176+
177+
if (byteArrayList.Count == 1)
178+
{
179+
return new ReadOnlySequence<byte>(byteArrayList[0]);
180+
}
181+
182+
var firstSegment = new SequenceSegment(new ReadOnlyMemory<byte>(byteArrayList[0]));
183+
var lastSegment = firstSegment;
184+
185+
for (var i = 1; i < byteArrayList.Count; i++)
186+
{
187+
var nextSegment = new SequenceSegment(new ReadOnlyMemory<byte>(byteArrayList[i]));
188+
lastSegment.Append(nextSegment);
189+
lastSegment = nextSegment;
190+
}
191+
192+
return new ReadOnlySequence<byte>(firstSegment, 0, lastSegment, lastSegment.Memory.Length);
193+
}
194+
195+
internal class SequenceSegment : ReadOnlySequenceSegment<byte>
196+
{
197+
public SequenceSegment(ReadOnlyMemory<byte> memory)
198+
{
199+
Memory = memory;
200+
RunningIndex = 0;
201+
}
202+
203+
public void Append(SequenceSegment next)
204+
{
205+
Next = next;
206+
next.RunningIndex = RunningIndex + Memory.Length;
207+
}
208+
}

src/RTSPServerFFmpeg/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ Task RunUdpClient(ProxyTrack track, Uri uri, CancellationToken cancellationToken
130130
{
131131
byte[] rtp = udpClient.Receive(ref remoteEndPoint);
132132
uint rtpTimestamp = RTPPacketUtil.ReadTS(rtp);
133-
track.FeedInRawSamples(rtpTimestamp, new List<byte[]>() { rtp });
133+
track.FeedInRawSamples(rtpTimestamp, new(rtp));
134134
}
135135
catch (Exception e)
136136
{

src/RTSPServerPcap/Program.cs

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using Microsoft.Extensions.Configuration;
55
using SharpRTSPServer;
66
using System;
7+
using System.Buffers;
78
using System.Collections.Generic;
89
using System.Diagnostics;
910
using System.IO;
@@ -87,7 +88,7 @@ void Reader_OnReadPacketEvent(object context, IPacket packet)
8788
{
8889
var ipHeader = ParseIPHeader(packet);
8990

90-
if(ipHeader.Protocol == 6) // TCP - RTSP
91+
if (ipHeader.Protocol == 6) // TCP - RTSP
9192
{
9293
var tcpHeader = ParseTCPHeader(packet, 4 + ipHeader.HeaderLength);
9394
Debug.WriteLine($"Source: {ipHeader.SourceIP}:{tcpHeader.SourcePort}, Dest: {ipHeader.DestintationIP}:{tcpHeader.DestinationPort}, Ver: {ipHeader.Version}");
@@ -113,7 +114,7 @@ void ParseData(byte[] data, object header, uint seconds, uint microseconds)
113114

114115
if (udp != null && data.Length > 1) // TODO
115116
{
116-
if(data[0] == 0x80 && data[1] != 0xc8) // 0xc8 sender report -> ignore rtcp
117+
if (data[0] == 0x80 && data[1] != 0xc8) // 0xc8 sender report -> ignore rtcp
117118
{
118119
long messageTime = seconds * 1000 + (microseconds / 1000);
119120
long realTime = (uint)_stopwatch.ElapsedMilliseconds;
@@ -132,9 +133,9 @@ void ParseData(byte[] data, object header, uint seconds, uint microseconds)
132133
{
133134
Thread.Sleep(sleep);
134135
}
135-
videoTrack?.FeedInRawSamples(RTPPacketUtil.ReadTS(data), new List<byte[]> { data });
136+
videoTrack?.FeedInRawSamples(RTPPacketUtil.ReadTS(data), new ReadOnlySequence<byte>(data));
136137
}
137-
else if(rtspProtocolParser.Ports.Count > 1 && rtspProtocolParser.Ports[1].Contains(udp.SourcePort) && rtspProtocolParser.Ports[1].Contains(udp.DestinationPort))
138+
else if (rtspProtocolParser.Ports.Count > 1 && rtspProtocolParser.Ports[1].Contains(udp.SourcePort) && rtspProtocolParser.Ports[1].Contains(udp.DestinationPort))
138139
{
139140
if (lastAudioMessageTime == -1)
140141
lastAudioMessageTime = messageTime;
@@ -149,7 +150,7 @@ void ParseData(byte[] data, object header, uint seconds, uint microseconds)
149150
Thread.Sleep(sleep);
150151
}
151152

152-
audioTrack?.FeedInRawSamples(RTPPacketUtil.ReadTS(data), new List<byte[]> { data });
153+
audioTrack?.FeedInRawSamples(RTPPacketUtil.ReadTS(data), new ReadOnlySequence<byte>(data));
153154
}
154155
}
155156
}
@@ -305,19 +306,19 @@ public bool Parse(string rtsp)
305306
{
306307
int.TryParse(line.Substring(CONTENT_LENGTH.Length).Trim(), out contentLength);
307308
}
308-
else if(line.StartsWith(TRANSPORT)) // SETUP response
309+
else if (line.StartsWith(TRANSPORT)) // SETUP response
309310
{
310311
int[] clientPorts = null;
311312
int[] serverPorts = null;
312313
string[] split = line.Substring(TRANSPORT.Length).Trim().Split(';');
313-
foreach(var s in split)
314+
foreach (var s in split)
314315
{
315316
string str = s.Trim();
316-
if(str.StartsWith(CLIENT_PORT))
317+
if (str.StartsWith(CLIENT_PORT))
317318
{
318319
clientPorts = str.Substring(CLIENT_PORT.Length).Split('-').Select(int.Parse).ToArray();
319320
}
320-
else if(str.StartsWith(SERVER_PORT))
321+
else if (str.StartsWith(SERVER_PORT))
321322
{
322323
serverPorts = str.Substring(SERVER_PORT.Length).Split('-').Select(int.Parse).ToArray();
323324
}
@@ -337,7 +338,7 @@ public bool Parse(string rtsp)
337338
if (ms.Position == ms.Length && contentLength > (ms.Length - ms.Position))
338339
{
339340
return true;
340-
}
341+
}
341342
}
342343
}
343344
}
@@ -364,7 +365,7 @@ public UDPHeader(ushort sourcePort, ushort destinationPort, ushort length, ushor
364365

365366
public class TCPHeader
366367
{
367-
public TCPHeader(ushort sourcePort, ushort destinationPort, uint sequenceNumber, uint acknowledgementNumber, int tcpHeaderLength, int flags, ushort window, ushort checksum, ushort urgentPointer)
368+
public TCPHeader(ushort sourcePort, ushort destinationPort, uint sequenceNumber, uint acknowledgementNumber, int tcpHeaderLength, int flags, ushort window, ushort checksum, ushort urgentPointer)
368369
{
369370
SourcePort = sourcePort;
370371
DestinationPort = destinationPort;
@@ -391,18 +392,18 @@ public TCPHeader(ushort sourcePort, ushort destinationPort, uint sequenceNumber,
391392
public class IPHeader
392393
{
393394
public IPHeader(
394-
int family,
395-
int version,
395+
int family,
396+
int version,
396397
int headerLength,
397-
byte differentiatedServicesField,
398-
ushort totalLength,
399-
ushort identification,
400-
byte flags,
401-
ushort fragmentOffset,
402-
byte ttl,
403-
byte protocol,
404-
ushort headerCheckSum,
405-
IPAddress sourceIP,
398+
byte differentiatedServicesField,
399+
ushort totalLength,
400+
ushort identification,
401+
byte flags,
402+
ushort fragmentOffset,
403+
byte ttl,
404+
byte protocol,
405+
ushort headerCheckSum,
406+
IPAddress sourceIP,
406407
IPAddress destintationIP)
407408
{
408409
Family = family;

src/SharpRTSPServer/AACTrack.cs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -117,20 +117,19 @@ public override StringBuilder BuildSDP(StringBuilder sdp)
117117
/// <param name="samples">An array of AAC fragments. By default single fragment is expected.</param>
118118
/// <param name="rtpTimestamp">RTP timestamp in the timescale of the track.</param>
119119
/// <returns>RTP packets.</returns>
120-
public override (List<Memory<byte>>, List<IMemoryOwner<byte>>) CreateRtpPackets(List<byte[]> samples, uint rtpTimestamp)
120+
public override List<IMemoryOwner<byte>> CreateRtpPackets(ReadOnlySequence<byte> samples, uint rtpTimestamp)
121121
{
122-
List<Memory<byte>> rtpPackets = new List<Memory<byte>>();
123122
List<IMemoryOwner<byte>> memoryOwners = new List<IMemoryOwner<byte>>();
124123

125-
for (int i = 0; i < samples.Count; i++)
124+
foreach (var sample in samples)
126125
{
127126
// append AU header (required for AAC)
128-
var audioPacket = AppendAUHeader(samples[i]);
127+
var audioPacket = AppendAUHeader(sample.Span);
129128

130129
// Put the whole Audio Packet into one RTP packet.
131130
// 12 is header size when there are no CSRCs or extensions
132131
var size = 12 + audioPacket.Length;
133-
var owner = MemoryPool<byte>.Shared.Rent(size);
132+
var owner = AdjustedSizeMemoryOwner.Rent(size);
134133
memoryOwners.Add(owner);
135134

136135
var rtpPacket = owner.Memory.Slice(0, size);
@@ -148,22 +147,21 @@ public override (List<Memory<byte>>, List<IMemoryOwner<byte>>) CreateRtpPackets(
148147

149148
// Now append the audio packet
150149
audioPacket.CopyTo(rtpPacket.Slice(12));
151-
152-
rtpPackets.Add(rtpPacket);
153150
}
154151

155-
return (rtpPackets, memoryOwners);
152+
return memoryOwners;
156153
}
157154

158-
private static byte[] AppendAUHeader(byte[] frame)
155+
private static byte[] AppendAUHeader(ReadOnlySpan<byte> frame)
159156
{
160157
short frameLen = (short)(frame.Length << 3);
161-
byte[] header = new byte[4];
158+
byte[] header = new byte[4 + frame.Length];
162159
header[0] = 0x00;
163160
header[1] = 0x10; // 16 bits size of the header
164161
header[2] = (byte)((frameLen >> 8) & 0xFF);
165162
header[3] = (byte)(frameLen & 0xFF);
166-
return header.Concat(frame).ToArray();
163+
frame.CopyTo(header.AsSpan(4));
164+
return header;
167165
}
168166

169167
private static int GetAACLevel(int samplingFrequency, int channelConfiguration)
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
using System;
2+
using System.Buffers;
3+
4+
namespace SharpRTSPServer
5+
{
6+
internal class AdjustedSizeMemoryOwner : IMemoryOwner<byte>
7+
{
8+
private readonly IMemoryOwner<byte> _wrapped;
9+
10+
private AdjustedSizeMemoryOwner(IMemoryOwner<byte> wrapped, int size)
11+
{
12+
_wrapped = wrapped;
13+
Memory = _wrapped.Memory.Slice(0, size);
14+
}
15+
16+
public Memory<byte> Memory { get; }
17+
18+
public void Dispose() => _wrapped.Dispose();
19+
20+
public static IMemoryOwner<byte> Rent(int size)
21+
{
22+
var memoryOwner = MemoryPool<byte>.Shared.Rent(size);
23+
24+
return memoryOwner.Memory.Length == size
25+
? memoryOwner
26+
: new AdjustedSizeMemoryOwner(memoryOwner, size);
27+
}
28+
}
29+
}

src/SharpRTSPServer/G711Track.cs

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -59,16 +59,15 @@ public override StringBuilder BuildSDP(StringBuilder sdp)
5959
/// <param name="samples">An array of PCMU fragments. By default single fragment is expected.</param>
6060
/// <param name="rtpTimestamp">RTP timestamp in the timescale of the track.</param>
6161
/// <returns>RTP packets.</returns>
62-
public override (List<Memory<byte>>, List<IMemoryOwner<byte>>) CreateRtpPackets(List<byte[]> samples, uint rtpTimestamp)
62+
public override List<IMemoryOwner<byte>> CreateRtpPackets(ReadOnlySequence<byte> samples, uint rtpTimestamp)
6363
{
6464
List<Memory<byte>> rtpPackets = new List<Memory<byte>>();
6565
List<IMemoryOwner<byte>> memoryOwners = new List<IMemoryOwner<byte>>();
6666

67-
for (int i = 0; i < samples.Count; i++)
67+
foreach (var audioPacket in samples)
6868
{
69-
var audioPacket = samples[i];
7069
var size = 12 + audioPacket.Length;
71-
var owner = MemoryPool<byte>.Shared.Rent(size);
70+
var owner = AdjustedSizeMemoryOwner.Rent(size);
7271
memoryOwners.Add(owner);
7372

7473
var rtpPacket = owner.Memory.Slice(0, size);
@@ -86,7 +85,7 @@ public override (List<Memory<byte>>, List<IMemoryOwner<byte>>) CreateRtpPackets(
8685
rtpPackets.Add(rtpPacket);
8786
}
8887

89-
return (rtpPackets, memoryOwners);
88+
return memoryOwners;
9089
}
9190
}
9291

@@ -144,19 +143,17 @@ public override StringBuilder BuildSDP(StringBuilder sdp)
144143
/// <param name="samples">An array of PCMA fragments. By default single fragment is expected.</param>
145144
/// <param name="rtpTimestamp">RTP timestamp in the timescale of the track.</param>
146145
/// <returns>RTP packets.</returns>
147-
public override (List<Memory<byte>>, List<IMemoryOwner<byte>>) CreateRtpPackets(List<byte[]> samples, uint rtpTimestamp)
146+
public override List<IMemoryOwner<byte>> CreateRtpPackets(ReadOnlySequence<byte> samples, uint rtpTimestamp)
148147
{
149-
List<Memory<byte>> rtpPackets = new List<Memory<byte>>();
150148
List<IMemoryOwner<byte>> memoryOwners = new List<IMemoryOwner<byte>>();
151149

152-
for (int i = 0; i < samples.Count; i++)
150+
foreach (var audioPacket in samples)
153151
{
154-
var audioPacket = samples[i];
155152
var size = 12 + audioPacket.Length;
156-
var owner = MemoryPool<byte>.Shared.Rent(size);
153+
var owner = AdjustedSizeMemoryOwner.Rent(size);
157154
memoryOwners.Add(owner);
158155

159-
var rtpPacket = owner.Memory.Slice(0, size);
156+
var rtpPacket = owner.Memory;
160157

161158
const bool rtpPadding = false;
162159
const bool rtpHasExtension = false;
@@ -168,10 +165,9 @@ public override (List<Memory<byte>>, List<IMemoryOwner<byte>>) CreateRtpPackets(
168165

169166
RTPPacketUtil.WriteTS(rtpPacket.Span, rtpTimestamp);
170167
audioPacket.CopyTo(rtpPacket.Slice(12));
171-
rtpPackets.Add(rtpPacket);
172168
}
173169

174-
return (rtpPackets, memoryOwners);
170+
return memoryOwners;
175171
}
176172
}
177173
}

0 commit comments

Comments
 (0)