Skip to content

Commit 6825987

Browse files
Paulo Morgadopaulomorgado
authored andcommitted
Refactor RTP handling to use ReadOnlySequence<byte> for improved memory management and performance. Introduced BufferSegment and PooledByteBuffer classes for efficient memory management. Updated methods across various classes, including RTSPServerWorker, ProxyTrack, and track classes, to accept ReadOnlySequence<byte> and return IByteBuffer. Enhanced the FeedInRawRTP method in RTSPServer and IRtpSender for better raw RTP data handling. Added MemoryExtensions for utility functions. Overall improvements to code structure for readability and maintainability.
1 parent 0a651fe commit 6825987

File tree

16 files changed

+526
-305
lines changed

16 files changed

+526
-305
lines changed

src/RTSPServerApp/RTSPServerWorker.cs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using SharpMp4;
55
using SharpRTSPServer;
66
using System;
7+
using System.Buffers;
78
using System.Collections.Generic;
89
using System.IO;
910
using System.Linq;
@@ -156,9 +157,18 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
156157

157158
try
158159
{
159-
rtspVideoTrack.FeedInRawSamples((uint)(videoIndex * videoSampleDuration), (List<byte[]>)videoTrack[videoIndex++ % videoTrack.Count]);
160+
using (var buffer = new PooledByteBuffer(initialBufferSize: 0))
161+
{
162+
foreach (var trackBytes in videoTrack[videoIndex++ % videoTrack.Count])
163+
{
164+
buffer.Write(trackBytes);
165+
buffer.Advance(trackBytes.Length);
166+
}
167+
168+
rtspVideoTrack.FeedInRawSamples((uint)(videoIndex * videoSampleDuration), buffer.GetReadOnlySequence());
169+
}
160170
}
161-
catch(Exception ex)
171+
catch (Exception ex)
162172
{
163173
_logger.LogError(ex, $"FeedInRawSamples failed: {ex.Message}");
164174
}
@@ -167,7 +177,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
167177
{
168178
Reset(ref videoIndex, videoTimer, ref audioIndex, audioTimer);
169179
}
170-
};
180+
}
181+
;
171182
}
172183

173184
if (rtspAudioTrack != null)
@@ -177,7 +188,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
177188
audioTimer = new Timer(audioSampleDuration * 1000 / (rtspAudioTrack as SharpRTSPServer.AACTrack).SamplingRate);
178189
audioTimer.Elapsed += (s, e) =>
179190
{
180-
rtspAudioTrack.FeedInRawSamples((uint)(audioIndex * audioSampleDuration), new List<byte[]>() { audioTrack[0][audioIndex++ % audioTrack[0].Count] });
191+
rtspAudioTrack.FeedInRawSamples((uint)(audioIndex * audioSampleDuration), new ReadOnlySequence<byte>(audioTrack[0][audioIndex++ % audioTrack[0].Count]));
181192

182193
if (audioIndex % audioTrack[0].Count == 0)
183194
{

src/RTSPServerFFmpeg/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ Task RunUdpClient(ProxyTrack track, Uri uri, CancellationToken cancellationToken
130130
{
131131
byte[] rtp = udpClient.Receive(ref remoteEndPoint);
132132
uint rtpTimestamp = RTPPacketUtil.ReadTS(rtp);
133-
track.FeedInRawSamples(rtpTimestamp, new List<byte[]>() { rtp });
133+
track.FeedInRawSamples(rtpTimestamp, new(rtp));
134134
}
135135
catch (Exception e)
136136
{

src/RTSPServerPcap/Program.cs

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using Microsoft.Extensions.Configuration;
55
using SharpRTSPServer;
66
using System;
7+
using System.Buffers;
78
using System.Collections.Generic;
89
using System.Diagnostics;
910
using System.IO;
@@ -87,7 +88,7 @@ void Reader_OnReadPacketEvent(object context, IPacket packet)
8788
{
8889
var ipHeader = ParseIPHeader(packet);
8990

90-
if(ipHeader.Protocol == 6) // TCP - RTSP
91+
if (ipHeader.Protocol == 6) // TCP - RTSP
9192
{
9293
var tcpHeader = ParseTCPHeader(packet, 4 + ipHeader.HeaderLength);
9394
Debug.WriteLine($"Source: {ipHeader.SourceIP}:{tcpHeader.SourcePort}, Dest: {ipHeader.DestintationIP}:{tcpHeader.DestinationPort}, Ver: {ipHeader.Version}");
@@ -113,7 +114,7 @@ void ParseData(byte[] data, object header, uint seconds, uint microseconds)
113114

114115
if (udp != null && data.Length > 1) // TODO
115116
{
116-
if(data[0] == 0x80 && data[1] != 0xc8) // 0xc8 sender report -> ignore rtcp
117+
if (data[0] == 0x80 && data[1] != 0xc8) // 0xc8 sender report -> ignore rtcp
117118
{
118119
long messageTime = seconds * 1000 + (microseconds / 1000);
119120
long realTime = (uint)_stopwatch.ElapsedMilliseconds;
@@ -132,9 +133,9 @@ void ParseData(byte[] data, object header, uint seconds, uint microseconds)
132133
{
133134
Thread.Sleep(sleep);
134135
}
135-
videoTrack?.FeedInRawSamples(RTPPacketUtil.ReadTS(data), new List<byte[]> { data });
136+
videoTrack?.FeedInRawSamples(RTPPacketUtil.ReadTS(data), new ReadOnlySequence<byte>(data));
136137
}
137-
else if(rtspProtocolParser.Ports.Count > 1 && rtspProtocolParser.Ports[1].Contains(udp.SourcePort) && rtspProtocolParser.Ports[1].Contains(udp.DestinationPort))
138+
else if (rtspProtocolParser.Ports.Count > 1 && rtspProtocolParser.Ports[1].Contains(udp.SourcePort) && rtspProtocolParser.Ports[1].Contains(udp.DestinationPort))
138139
{
139140
if (lastAudioMessageTime == -1)
140141
lastAudioMessageTime = messageTime;
@@ -149,7 +150,7 @@ void ParseData(byte[] data, object header, uint seconds, uint microseconds)
149150
Thread.Sleep(sleep);
150151
}
151152

152-
audioTrack?.FeedInRawSamples(RTPPacketUtil.ReadTS(data), new List<byte[]> { data });
153+
audioTrack?.FeedInRawSamples(RTPPacketUtil.ReadTS(data), new ReadOnlySequence<byte>(data));
153154
}
154155
}
155156
}
@@ -305,19 +306,19 @@ public bool Parse(string rtsp)
305306
{
306307
int.TryParse(line.Substring(CONTENT_LENGTH.Length).Trim(), out contentLength);
307308
}
308-
else if(line.StartsWith(TRANSPORT)) // SETUP response
309+
else if (line.StartsWith(TRANSPORT)) // SETUP response
309310
{
310311
int[] clientPorts = null;
311312
int[] serverPorts = null;
312313
string[] split = line.Substring(TRANSPORT.Length).Trim().Split(';');
313-
foreach(var s in split)
314+
foreach (var s in split)
314315
{
315316
string str = s.Trim();
316-
if(str.StartsWith(CLIENT_PORT))
317+
if (str.StartsWith(CLIENT_PORT))
317318
{
318319
clientPorts = str.Substring(CLIENT_PORT.Length).Split('-').Select(int.Parse).ToArray();
319320
}
320-
else if(str.StartsWith(SERVER_PORT))
321+
else if (str.StartsWith(SERVER_PORT))
321322
{
322323
serverPorts = str.Substring(SERVER_PORT.Length).Split('-').Select(int.Parse).ToArray();
323324
}
@@ -337,7 +338,7 @@ public bool Parse(string rtsp)
337338
if (ms.Position == ms.Length && contentLength > (ms.Length - ms.Position))
338339
{
339340
return true;
340-
}
341+
}
341342
}
342343
}
343344
}
@@ -364,7 +365,7 @@ public UDPHeader(ushort sourcePort, ushort destinationPort, ushort length, ushor
364365

365366
public class TCPHeader
366367
{
367-
public TCPHeader(ushort sourcePort, ushort destinationPort, uint sequenceNumber, uint acknowledgementNumber, int tcpHeaderLength, int flags, ushort window, ushort checksum, ushort urgentPointer)
368+
public TCPHeader(ushort sourcePort, ushort destinationPort, uint sequenceNumber, uint acknowledgementNumber, int tcpHeaderLength, int flags, ushort window, ushort checksum, ushort urgentPointer)
368369
{
369370
SourcePort = sourcePort;
370371
DestinationPort = destinationPort;
@@ -391,18 +392,18 @@ public TCPHeader(ushort sourcePort, ushort destinationPort, uint sequenceNumber,
391392
public class IPHeader
392393
{
393394
public IPHeader(
394-
int family,
395-
int version,
395+
int family,
396+
int version,
396397
int headerLength,
397-
byte differentiatedServicesField,
398-
ushort totalLength,
399-
ushort identification,
400-
byte flags,
401-
ushort fragmentOffset,
402-
byte ttl,
403-
byte protocol,
404-
ushort headerCheckSum,
405-
IPAddress sourceIP,
398+
byte differentiatedServicesField,
399+
ushort totalLength,
400+
ushort identification,
401+
byte flags,
402+
ushort fragmentOffset,
403+
byte ttl,
404+
byte protocol,
405+
ushort headerCheckSum,
406+
IPAddress sourceIP,
406407
IPAddress destintationIP)
407408
{
408409
Family = family;

src/SharpRTSPServer/AACTrack.cs

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -117,53 +117,49 @@ public override StringBuilder BuildSDP(StringBuilder sdp)
117117
/// <param name="samples">An array of AAC fragments. By default single fragment is expected.</param>
118118
/// <param name="rtpTimestamp">RTP timestamp in the timescale of the track.</param>
119119
/// <returns>RTP packets.</returns>
120-
public override (List<Memory<byte>>, List<IMemoryOwner<byte>>) CreateRtpPackets(List<byte[]> samples, uint rtpTimestamp)
120+
public override IByteBuffer CreateRtpPackets(ReadOnlySequence<byte> samples, uint rtpTimestamp)
121121
{
122-
List<Memory<byte>> rtpPackets = new List<Memory<byte>>();
123-
List<IMemoryOwner<byte>> memoryOwners = new List<IMemoryOwner<byte>>();
122+
var byteBuffer = new PooledByteBuffer(initialBufferSize: 0);
124123

125-
for (int i = 0; i < samples.Count; i++)
124+
foreach (var sample in samples)
126125
{
127126
// append AU header (required for AAC)
128-
var audioPacket = AppendAUHeader(samples[i]);
127+
var audioPacket = AppendAUHeader(sample.Span);
129128

130129
// Put the whole Audio Packet into one RTP packet.
131130
// 12 is header size when there are no CSRCs or extensions
132131
var size = 12 + audioPacket.Length;
133-
var owner = MemoryPool<byte>.Shared.Rent(size);
134-
memoryOwners.Add(owner);
135-
136-
var rtpPacket = owner.Memory.Slice(0, size);
132+
var rtpPacket = byteBuffer.GetSpan(size).Slice(0, size);
133+
byteBuffer.Advance(size);
137134

138135
const bool rtpPadding = false;
139136
const bool rtpHasExtension = false;
140137
int rtpCsrcCount = 0;
141138
const bool rtpMarker = true; // always 1 as this is the last (and only) RTP packet for this audio timestamp
142139

143-
RTPPacketUtil.WriteHeader(rtpPacket.Span,
140+
RTPPacketUtil.WriteHeader(rtpPacket,
144141
RTPPacketUtil.RTP_VERSION, rtpPadding, rtpHasExtension, rtpCsrcCount, rtpMarker, PayloadType);
145142

146143
// sequence number is set just before send
147-
RTPPacketUtil.WriteTS(rtpPacket.Span, rtpTimestamp);
144+
RTPPacketUtil.WriteTS(rtpPacket, rtpTimestamp);
148145

149146
// Now append the audio packet
150147
audioPacket.CopyTo(rtpPacket.Slice(12));
151-
152-
rtpPackets.Add(rtpPacket);
153148
}
154149

155-
return (rtpPackets, memoryOwners);
150+
return byteBuffer;
156151
}
157152

158-
private static byte[] AppendAUHeader(byte[] frame)
153+
private static byte[] AppendAUHeader(ReadOnlySpan<byte> frame)
159154
{
160155
short frameLen = (short)(frame.Length << 3);
161-
byte[] header = new byte[4];
156+
byte[] header = new byte[4 + frame.Length];
162157
header[0] = 0x00;
163158
header[1] = 0x10; // 16 bits size of the header
164159
header[2] = (byte)((frameLen >> 8) & 0xFF);
165160
header[3] = (byte)(frameLen & 0xFF);
166-
return header.Concat(frame).ToArray();
161+
frame.CopyTo(header.AsSpan(4));
162+
return header;
167163
}
168164

169165
private static int GetAACLevel(int samplingFrequency, int channelConfiguration)
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
using System;
2+
using System.Buffers;
3+
using System.Collections.Generic;
4+
using System.Text;
5+
using static System.Net.Mime.MediaTypeNames;
6+
7+
namespace SharpRTSPServer
8+
{
9+
/// <summary>
10+
/// Represents a segment of a <see cref="ReadOnlySequenceSegment{testc}"/> of <see langword="byte"/> backed by a <see cref="IMemoryOwner{testc}"/> of <see langword="byte"/>.
11+
/// </summary>
12+
public sealed class BufferSegment : ReadOnlySequenceSegment<byte>, IDisposable
13+
{
14+
public static readonly BufferSegment Empty = new BufferSegment();
15+
private readonly IMemoryOwner<byte> _owner;
16+
17+
private BufferSegment()
18+
{
19+
}
20+
21+
/// <summary>
22+
/// Initializes a new instance of the <see cref="BufferSegment"/> class.
23+
/// </summary>
24+
/// <param name="owner">The memory owner that provides the memory for the segment.</param>
25+
/// <param name="size">The size of the memory slice to use for the segment.</param>
26+
public BufferSegment(IMemoryOwner<byte> owner, int size)
27+
{
28+
Memory = owner.Memory.Slice(0, size);
29+
_owner = owner;
30+
}
31+
32+
/// <summary>
33+
/// Appends a new segment to the current segment.
34+
/// </summary>
35+
/// <param name="owner">The memory owner that provides the memory for the new segment.</param>
36+
/// <param name="size">The size of the memory slice to use for the new segment.</param>
37+
/// <returns>The newly created <see cref="BufferSegment"/>.</returns>
38+
public BufferSegment Append(IMemoryOwner<byte> owner, int size)
39+
{
40+
var segment = new BufferSegment(owner, size);
41+
segment.RunningIndex = RunningIndex + Memory.Length;
42+
Next = segment;
43+
return segment;
44+
}
45+
46+
/// <summary>
47+
/// Releases the resources used by the <see cref="BufferSegment"/>.
48+
/// </summary>
49+
public void Dispose()
50+
{
51+
_owner?.Dispose();
52+
}
53+
}
54+
}

src/SharpRTSPServer/G711Track.cs

Lines changed: 18 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -59,34 +59,29 @@ public override StringBuilder BuildSDP(StringBuilder sdp)
5959
/// <param name="samples">An array of PCMU fragments. By default single fragment is expected.</param>
6060
/// <param name="rtpTimestamp">RTP timestamp in the timescale of the track.</param>
6161
/// <returns>RTP packets.</returns>
62-
public override (List<Memory<byte>>, List<IMemoryOwner<byte>>) CreateRtpPackets(List<byte[]> samples, uint rtpTimestamp)
62+
public override IByteBuffer CreateRtpPackets(ReadOnlySequence<byte> samples, uint rtpTimestamp)
6363
{
64-
List<Memory<byte>> rtpPackets = new List<Memory<byte>>();
65-
List<IMemoryOwner<byte>> memoryOwners = new List<IMemoryOwner<byte>>();
64+
var byteBuffer = new PooledByteBuffer(initialBufferSize: 0);
6665

67-
for (int i = 0; i < samples.Count; i++)
66+
foreach (var audioPacket in samples)
6867
{
69-
var audioPacket = samples[i];
7068
var size = 12 + audioPacket.Length;
71-
var owner = MemoryPool<byte>.Shared.Rent(size);
72-
memoryOwners.Add(owner);
73-
74-
var rtpPacket = owner.Memory.Slice(0, size);
69+
var rtpPacket = byteBuffer.GetSpan(size).Slice(0, size);
70+
byteBuffer.Advance(size);
7571

7672
const bool rtpPadding = false;
7773
const bool rtpHasExtension = false;
7874
int rtpCsrcCount = 0;
7975
const bool rtpMarker = true;
8076

81-
RTPPacketUtil.WriteHeader(rtpPacket.Span,
77+
RTPPacketUtil.WriteHeader(rtpPacket,
8278
RTPPacketUtil.RTP_VERSION, rtpPadding, rtpHasExtension, rtpCsrcCount, rtpMarker, PayloadType);
8379

84-
RTPPacketUtil.WriteTS(rtpPacket.Span, rtpTimestamp);
85-
audioPacket.CopyTo(rtpPacket.Slice(12));
86-
rtpPackets.Add(rtpPacket);
80+
RTPPacketUtil.WriteTS(rtpPacket, rtpTimestamp);
81+
audioPacket.Span.CopyTo(rtpPacket.Slice(12));
8782
}
8883

89-
return (rtpPackets, memoryOwners);
84+
return byteBuffer;
9085
}
9186
}
9287

@@ -144,34 +139,29 @@ public override StringBuilder BuildSDP(StringBuilder sdp)
144139
/// <param name="samples">An array of PCMA fragments. By default single fragment is expected.</param>
145140
/// <param name="rtpTimestamp">RTP timestamp in the timescale of the track.</param>
146141
/// <returns>RTP packets.</returns>
147-
public override (List<Memory<byte>>, List<IMemoryOwner<byte>>) CreateRtpPackets(List<byte[]> samples, uint rtpTimestamp)
142+
public override IByteBuffer CreateRtpPackets(ReadOnlySequence<byte> samples, uint rtpTimestamp)
148143
{
149-
List<Memory<byte>> rtpPackets = new List<Memory<byte>>();
150-
List<IMemoryOwner<byte>> memoryOwners = new List<IMemoryOwner<byte>>();
144+
var byteBuffer = new PooledByteBuffer(initialBufferSize: 0);
151145

152-
for (int i = 0; i < samples.Count; i++)
146+
foreach (var audioPacket in samples)
153147
{
154-
var audioPacket = samples[i];
155148
var size = 12 + audioPacket.Length;
156-
var owner = MemoryPool<byte>.Shared.Rent(size);
157-
memoryOwners.Add(owner);
158-
159-
var rtpPacket = owner.Memory.Slice(0, size);
149+
var rtpPacket = byteBuffer.GetSpan(size).Slice(0, size);
150+
byteBuffer.Advance(size);
160151

161152
const bool rtpPadding = false;
162153
const bool rtpHasExtension = false;
163154
int rtpCsrcCount = 0;
164155
const bool rtpMarker = true;
165156

166-
RTPPacketUtil.WriteHeader(rtpPacket.Span,
157+
RTPPacketUtil.WriteHeader(rtpPacket,
167158
RTPPacketUtil.RTP_VERSION, rtpPadding, rtpHasExtension, rtpCsrcCount, rtpMarker, PayloadType);
168159

169-
RTPPacketUtil.WriteTS(rtpPacket.Span, rtpTimestamp);
170-
audioPacket.CopyTo(rtpPacket.Slice(12));
171-
rtpPackets.Add(rtpPacket);
160+
RTPPacketUtil.WriteTS(rtpPacket, rtpTimestamp);
161+
audioPacket.Span.CopyTo(rtpPacket.Slice(12));
172162
}
173163

174-
return (rtpPackets, memoryOwners);
164+
return byteBuffer;
175165
}
176166
}
177167
}

0 commit comments

Comments
 (0)