Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions src/RTSPServerApp/RTSPServerWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using SharpMp4;
using SharpRTSPServer;
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Linq;
Expand Down Expand Up @@ -156,9 +157,17 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)

try
{
rtspVideoTrack.FeedInRawSamples((uint)(videoIndex * videoSampleDuration), (List<byte[]>)videoTrack[videoIndex++ % videoTrack.Count]);
using (var buffer = new PooledByteBuffer(initialBufferSize: 0))
{
foreach (var trackBytes in videoTrack[videoIndex++ % videoTrack.Count])
{
buffer.Write(trackBytes);
}

rtspVideoTrack.FeedInRawSamples((uint)(videoIndex * videoSampleDuration), buffer.GetReadOnlySequence());
}
}
catch(Exception ex)
catch (Exception ex)
{
_logger.LogError(ex, $"FeedInRawSamples failed: {ex.Message}");
}
Expand All @@ -167,7 +176,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
Reset(ref videoIndex, videoTimer, ref audioIndex, audioTimer);
}
};
}
;
}

if (rtspAudioTrack != null)
Expand All @@ -177,7 +187,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
audioTimer = new Timer(audioSampleDuration * 1000 / (rtspAudioTrack as SharpRTSPServer.AACTrack).SamplingRate);
audioTimer.Elapsed += (s, e) =>
{
rtspAudioTrack.FeedInRawSamples((uint)(audioIndex * audioSampleDuration), new List<byte[]>() { audioTrack[0][audioIndex++ % audioTrack[0].Count] });
rtspAudioTrack.FeedInRawSamples((uint)(audioIndex * audioSampleDuration), new ReadOnlySequence<byte>(audioTrack[0][audioIndex++ % audioTrack[0].Count]));

if (audioIndex % audioTrack[0].Count == 0)
{
Expand Down
2 changes: 1 addition & 1 deletion src/RTSPServerFFmpeg/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ Task RunUdpClient(ProxyTrack track, Uri uri, CancellationToken cancellationToken
{
byte[] rtp = udpClient.Receive(ref remoteEndPoint);
uint rtpTimestamp = RTPPacketUtil.ReadTS(rtp);
track.FeedInRawSamples(rtpTimestamp, new List<byte[]>() { rtp });
track.FeedInRawSamples(rtpTimestamp, new(rtp));
}
catch (Exception e)
{
Expand Down
45 changes: 23 additions & 22 deletions src/RTSPServerPcap/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Microsoft.Extensions.Configuration;
using SharpRTSPServer;
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
Expand Down Expand Up @@ -87,7 +88,7 @@ void Reader_OnReadPacketEvent(object context, IPacket packet)
{
var ipHeader = ParseIPHeader(packet);

if(ipHeader.Protocol == 6) // TCP - RTSP
if (ipHeader.Protocol == 6) // TCP - RTSP
{
var tcpHeader = ParseTCPHeader(packet, 4 + ipHeader.HeaderLength);
Debug.WriteLine($"Source: {ipHeader.SourceIP}:{tcpHeader.SourcePort}, Dest: {ipHeader.DestintationIP}:{tcpHeader.DestinationPort}, Ver: {ipHeader.Version}");
Expand All @@ -113,7 +114,7 @@ void ParseData(byte[] data, object header, uint seconds, uint microseconds)

if (udp != null && data.Length > 1) // TODO
{
if(data[0] == 0x80 && data[1] != 0xc8) // 0xc8 sender report -> ignore rtcp
if (data[0] == 0x80 && data[1] != 0xc8) // 0xc8 sender report -> ignore rtcp
{
long messageTime = seconds * 1000 + (microseconds / 1000);
long realTime = (uint)_stopwatch.ElapsedMilliseconds;
Expand All @@ -132,9 +133,9 @@ void ParseData(byte[] data, object header, uint seconds, uint microseconds)
{
Thread.Sleep(sleep);
}
videoTrack?.FeedInRawSamples(RTPPacketUtil.ReadTS(data), new List<byte[]> { data });
videoTrack?.FeedInRawSamples(RTPPacketUtil.ReadTS(data), new ReadOnlySequence<byte>(data));
}
else if(rtspProtocolParser.Ports.Count > 1 && rtspProtocolParser.Ports[1].Contains(udp.SourcePort) && rtspProtocolParser.Ports[1].Contains(udp.DestinationPort))
else if (rtspProtocolParser.Ports.Count > 1 && rtspProtocolParser.Ports[1].Contains(udp.SourcePort) && rtspProtocolParser.Ports[1].Contains(udp.DestinationPort))
{
if (lastAudioMessageTime == -1)
lastAudioMessageTime = messageTime;
Expand All @@ -149,7 +150,7 @@ void ParseData(byte[] data, object header, uint seconds, uint microseconds)
Thread.Sleep(sleep);
}

audioTrack?.FeedInRawSamples(RTPPacketUtil.ReadTS(data), new List<byte[]> { data });
audioTrack?.FeedInRawSamples(RTPPacketUtil.ReadTS(data), new ReadOnlySequence<byte>(data));
}
}
}
Expand Down Expand Up @@ -305,19 +306,19 @@ public bool Parse(string rtsp)
{
int.TryParse(line.Substring(CONTENT_LENGTH.Length).Trim(), out contentLength);
}
else if(line.StartsWith(TRANSPORT)) // SETUP response
else if (line.StartsWith(TRANSPORT)) // SETUP response
{
int[] clientPorts = null;
int[] serverPorts = null;
string[] split = line.Substring(TRANSPORT.Length).Trim().Split(';');
foreach(var s in split)
foreach (var s in split)
{
string str = s.Trim();
if(str.StartsWith(CLIENT_PORT))
if (str.StartsWith(CLIENT_PORT))
{
clientPorts = str.Substring(CLIENT_PORT.Length).Split('-').Select(int.Parse).ToArray();
}
else if(str.StartsWith(SERVER_PORT))
else if (str.StartsWith(SERVER_PORT))
{
serverPorts = str.Substring(SERVER_PORT.Length).Split('-').Select(int.Parse).ToArray();
}
Expand All @@ -337,7 +338,7 @@ public bool Parse(string rtsp)
if (ms.Position == ms.Length && contentLength > (ms.Length - ms.Position))
{
return true;
}
}
}
}
}
Expand All @@ -364,7 +365,7 @@ public UDPHeader(ushort sourcePort, ushort destinationPort, ushort length, ushor

public class TCPHeader
{
public TCPHeader(ushort sourcePort, ushort destinationPort, uint sequenceNumber, uint acknowledgementNumber, int tcpHeaderLength, int flags, ushort window, ushort checksum, ushort urgentPointer)
public TCPHeader(ushort sourcePort, ushort destinationPort, uint sequenceNumber, uint acknowledgementNumber, int tcpHeaderLength, int flags, ushort window, ushort checksum, ushort urgentPointer)
{
SourcePort = sourcePort;
DestinationPort = destinationPort;
Expand All @@ -391,18 +392,18 @@ public TCPHeader(ushort sourcePort, ushort destinationPort, uint sequenceNumber,
public class IPHeader
{
public IPHeader(
int family,
int version,
int family,
int version,
int headerLength,
byte differentiatedServicesField,
ushort totalLength,
ushort identification,
byte flags,
ushort fragmentOffset,
byte ttl,
byte protocol,
ushort headerCheckSum,
IPAddress sourceIP,
byte differentiatedServicesField,
ushort totalLength,
ushort identification,
byte flags,
ushort fragmentOffset,
byte ttl,
byte protocol,
ushort headerCheckSum,
IPAddress sourceIP,
IPAddress destintationIP)
{
Family = family;
Expand Down
161 changes: 161 additions & 0 deletions src/SharpRTSPClient/PooledByteBuffer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
using System;
using System.Buffers;
using System.Collections.Generic;

namespace SharpRTSPClient
{
/// <summary>
/// A pooled buffer writer that implements <see cref="IBufferWriter{Byte}"/> using <see cref="ArrayPool{Byte}.Shared"/> for efficient writing of byte data and
/// allows reading the written content through a <see cref="ReadOnlySequence{Byte}"/> using the <see cref="GetReadOnlySequence"/> method.
/// </summary>
public sealed class PooledByteBuffer : IBufferWriter<byte>, IDisposable
{
private const int DefaultBufferSize = 4096;

private readonly List<byte[]> _buffers = new List<byte[]>();
private readonly ArrayPool<byte> _pool;
private int _currentIndex;
private int _currentOffset;
private bool _disposed;

/// <summary>
/// Initializes a new instance of the <see cref="PooledByteBuffer"/> class with an optional initial buffer size and array pool.
/// </summary>
/// <param name="initialBufferSize">The initial size of the buffer to rent from the pool. Defaults to 4096 bytes.</param>
/// <param name="pool">The array pool to use. If null, <see cref="ArrayPool{Byte}.Shared"/> is used.</param>
public PooledByteBuffer(int initialBufferSize = DefaultBufferSize, ArrayPool<byte> pool = null)
{
_pool = pool ?? ArrayPool<byte>.Shared;
AddNewBuffer(initialBufferSize);
}

/// <summary>
/// Notifies the buffer writer that <paramref name="count"/> bytes were written.
/// </summary>
/// <param name="count">The number of bytes written.</param>
/// <exception cref="ArgumentOutOfRangeException">Thrown if count is negative or exceeds the current buffer capacity.</exception>
public void Advance(int count)
{
if (count < 0 || _currentOffset + count > _buffers[_currentIndex].Length)
{
throw new ArgumentOutOfRangeException(nameof(count));
}

_currentOffset += count;
}

/// <summary>
/// Returns a <see cref="Memory{Byte}"/> buffer to write to, ensuring at least <paramref name="sizeHint"/> bytes are available.
/// </summary>
/// <param name="sizeHint">The minimum number of bytes required. May be 0.</param>
/// <returns>A writable memory buffer.</returns>
public Memory<byte> GetMemory(int sizeHint = 0)
{
EnsureCapacity(sizeHint);
return _buffers[_currentIndex].AsMemory(_currentOffset);
}

/// <summary>
/// Returns a <see cref="Span{Byte}"/> buffer to write to, ensuring at least <paramref name="sizeHint"/> bytes are available.
/// </summary>
/// <param name="sizeHint">The minimum number of bytes required. May be 0.</param>
/// <returns>A writable span buffer.</returns>
public Span<byte> GetSpan(int sizeHint = 0)
{
EnsureCapacity(sizeHint);
return _buffers[_currentIndex].AsSpan(_currentOffset);
}

/// <summary>
/// Returns a <see cref="ReadOnlySequence{Byte}"/> representing the written data across all buffers.
/// </summary>
/// <returns>A read-only sequence of bytes.</returns>
public ReadOnlySequence<byte> GetReadOnlySequence()
{
SequenceSegment first = null;
SequenceSegment last = null;

for (var i = 0; i < _buffers.Count; i++)
{
var buffer = _buffers[i];
var length = (i == _currentIndex) ? _currentOffset : buffer.Length;

if (length == 0)
{
continue;
}

var segment = new SequenceSegment(buffer.AsMemory(0, length));

if (first == null)
{
first = segment;
}

if (last != null)
{
last.SetNext(segment);
}

last = segment;
}

if (first == null || last == null)
{
return ReadOnlySequence<byte>.Empty;
}

return new ReadOnlySequence<byte>(first, 0, last, last.Memory.Length);
}

/// <summary>
/// Releases all buffers back to the pool and clears internal state.
/// </summary>
public void Dispose()
{
if (_disposed)
{
return;
}

foreach (var buffer in _buffers)
{
_pool.Return(buffer);
}

_buffers.Clear();
_disposed = true;
}

private void EnsureCapacity(int sizeHint)
{
if (_currentOffset + sizeHint > _buffers[_currentIndex].Length)
{
var newSize = Math.Max(sizeHint, DefaultBufferSize);
AddNewBuffer(newSize);
}
}

private void AddNewBuffer(int size)
{
var buffer = _pool.Rent(size);
_buffers.Add(buffer);
_currentIndex = _buffers.Count - 1;
_currentOffset = 0;
}

private class SequenceSegment : ReadOnlySequenceSegment<byte>
{
public SequenceSegment(ReadOnlyMemory<byte> memory)
{
Memory = memory;
}

public void SetNext(SequenceSegment next)
{
Next = next;
next.RunningIndex = RunningIndex + Memory.Length;
}
}
}
}
Loading