Skip to content

Commit 613c8fb

Browse files
committed
Initial support for SRTP.
1 parent ba2922c commit 613c8fb

16 files changed

+889
-169
lines changed

pkg/media/dtmf/dtmf.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -161,11 +161,11 @@ func Decode(data []byte) (Event, error) {
161161
}, nil
162162
}
163163

164-
func DecodeRTP(p *rtp.Packet) (Event, bool) {
165-
if !p.Marker {
164+
func DecodeRTP(h *rtp.Header, payload []byte) (Event, bool) {
165+
if !h.Marker {
166166
return Event{}, false
167167
}
168-
ev, err := Decode(p.Payload)
168+
ev, err := Decode(payload)
169169
if err != nil {
170170
return Event{}, false
171171
}

pkg/media/rtp/conn.go

+12-10
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package rtp
1616

1717
import (
1818
"net"
19+
"net/netip"
1920
"sync"
2021
"sync/atomic"
2122
"time"
@@ -131,9 +132,11 @@ func (c *Conn) Listen(portMin, portMax int, listenAddr string) error {
131132
if listenAddr == "" {
132133
listenAddr = "0.0.0.0"
133134
}
134-
135-
var err error
136-
c.conn, err = ListenUDPPortRange(portMin, portMax, net.ParseIP(listenAddr))
135+
ip, err := netip.ParseAddr(listenAddr)
136+
if err != nil {
137+
return err
138+
}
139+
c.conn, err = ListenUDPPortRange(portMin, portMax, ip)
137140
if err != nil {
138141
return err
139142
}
@@ -167,24 +170,23 @@ func (c *Conn) readLoop() {
167170
close(c.received)
168171
}
169172
if h := c.onRTP.Load(); h != nil {
170-
_ = (*h).HandleRTP(&p)
173+
_ = (*h).HandleRTP(&p.Header, p.Payload)
171174
}
172175
}
173176
}
174177

175-
func (c *Conn) WriteRTP(p *rtp.Packet) error {
178+
func (c *Conn) WriteRTP(h *rtp.Header, payload []byte) (int, error) {
176179
addr := c.dest.Load()
177180
if addr == nil {
178-
return nil
181+
return 0, nil
179182
}
180-
data, err := p.Marshal()
183+
data, err := (&rtp.Packet{Header: *h, Payload: payload}).Marshal()
181184
if err != nil {
182-
return err
185+
return 0, err
183186
}
184187
c.wmu.Lock()
185188
defer c.wmu.Unlock()
186-
_, err = c.conn.WriteToUDP(data, addr)
187-
return err
189+
return c.conn.WriteToUDP(data, addr)
188190
}
189191

190192
func (c *Conn) ReadRTP() (*rtp.Packet, *net.UDPAddr, error) {

pkg/media/rtp/jitter.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@ package rtp
1717
import (
1818
"time"
1919

20-
"github.com/livekit/server-sdk-go/v2/pkg/jitter"
2120
"github.com/pion/rtp"
21+
22+
"github.com/livekit/server-sdk-go/v2/pkg/jitter"
2223
)
2324

2425
const (
@@ -41,11 +42,11 @@ type jitterHandler struct {
4142
buf *jitter.Buffer
4243
}
4344

44-
func (h *jitterHandler) HandleRTP(p *rtp.Packet) error {
45-
h.buf.Push(p)
45+
func (r *jitterHandler) HandleRTP(h *rtp.Header, payload []byte) error {
46+
r.buf.Push(&rtp.Packet{Header: *h, Payload: payload})
4647
var last error
47-
for _, p := range h.buf.Pop(false) {
48-
if err := h.h.HandleRTP(p); err != nil {
48+
for _, p := range r.buf.Pop(false) {
49+
if err := r.h.HandleRTP(&p.Header, p.Payload); err != nil {
4950
last = err
5051
}
5152
}

pkg/media/rtp/listen.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,15 @@ import (
1818
"errors"
1919
"math/rand"
2020
"net"
21+
"net/netip"
2122
)
2223

2324
var ListenErr = errors.New("failed to listen on udp port")
2425

25-
func ListenUDPPortRange(portMin, portMax int, IP net.IP) (*net.UDPConn, error) {
26+
func ListenUDPPortRange(portMin, portMax int, ip netip.Addr) (*net.UDPConn, error) {
2627
if portMin == 0 && portMax == 0 {
2728
return net.ListenUDP("udp", &net.UDPAddr{
28-
IP: IP,
29+
IP: ip.AsSlice(),
2930
Port: 0,
3031
})
3132
}
@@ -48,7 +49,7 @@ func ListenUDPPortRange(portMin, portMax int, IP net.IP) (*net.UDPConn, error) {
4849
portCurrent := portStart
4950

5051
for {
51-
c, e := net.ListenUDP("udp", &net.UDPAddr{IP: IP, Port: portCurrent})
52+
c, e := net.ListenUDP("udp", &net.UDPAddr{IP: ip.AsSlice(), Port: portCurrent})
5253
if e == nil {
5354
return c, nil
5455
}

pkg/media/rtp/mux.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -35,25 +35,25 @@ type Mux struct {
3535

3636
// HandleRTP selects a Handler based on payload type.
3737
// Types can be registered with Register. If no handler is set, a default one will be used.
38-
func (m *Mux) HandleRTP(p *rtp.Packet) error {
38+
func (m *Mux) HandleRTP(h *rtp.Header, payload []byte) error {
3939
if m == nil {
4040
return nil
4141
}
42-
var h Handler
42+
var r Handler
4343
m.mu.RLock()
44-
if p.PayloadType < byte(len(m.static)) {
45-
h = m.static[p.PayloadType]
44+
if h.PayloadType < byte(len(m.static)) {
45+
r = m.static[h.PayloadType]
4646
} else {
47-
h = m.dynamic[p.PayloadType]
47+
r = m.dynamic[h.PayloadType]
4848
}
49-
if h == nil {
50-
h = m.def
49+
if r == nil {
50+
r = m.def
5151
}
5252
m.mu.RUnlock()
53-
if h == nil {
53+
if r == nil {
5454
return nil
5555
}
56-
return h.HandleRTP(p)
56+
return r.HandleRTP(h, payload)
5757
}
5858

5959
// SetDefault sets a default RTP handler.

pkg/media/rtp/rtp.go

+25-24
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package rtp
1717
import (
1818
"fmt"
1919
"math/rand/v2"
20+
"slices"
2021
"sync"
2122

2223
"github.com/pion/interceptor"
@@ -31,21 +32,21 @@ type BytesFrame interface {
3132
}
3233

3334
type Writer interface {
34-
WriteRTP(p *rtp.Packet) error
35+
WriteRTP(h *rtp.Header, payload []byte) (int, error)
3536
}
3637

3738
type Reader interface {
3839
ReadRTP() (*rtp.Packet, interceptor.Attributes, error)
3940
}
4041

4142
type Handler interface {
42-
HandleRTP(p *rtp.Packet) error
43+
HandleRTP(h *rtp.Header, payload []byte) error
4344
}
4445

45-
type HandlerFunc func(p *rtp.Packet) error
46+
type HandlerFunc func(h *rtp.Header, payload []byte) error
4647

47-
func (fnc HandlerFunc) HandleRTP(p *rtp.Packet) error {
48-
return fnc(p)
48+
func (fnc HandlerFunc) HandleRTP(h *rtp.Header, payload []byte) error {
49+
return fnc(h, payload)
4950
}
5051

5152
func HandleLoop(r Reader, h Handler) error {
@@ -54,7 +55,7 @@ func HandleLoop(r Reader, h Handler) error {
5455
if err != nil {
5556
return err
5657
}
57-
err = h.HandleRTP(p)
58+
err = h.HandleRTP(&p.Header, p.Payload)
5859
if err != nil {
5960
return err
6061
}
@@ -64,26 +65,27 @@ func HandleLoop(r Reader, h Handler) error {
6465
// Buffer is a Writer that clones and appends RTP packets into a slice.
6566
type Buffer []*Packet
6667

67-
func (b *Buffer) WriteRTP(p *Packet) error {
68-
p2 := p.Clone()
69-
*b = append(*b, p2)
68+
func (b *Buffer) WriteRTP(h *rtp.Header, payload []byte) error {
69+
*b = append(*b, &rtp.Packet{
70+
Header: *h,
71+
Payload: slices.Clone(payload),
72+
})
7073
return nil
7174
}
7275

7376
// NewSeqWriter creates an RTP writer that automatically increments the sequence number.
7477
func NewSeqWriter(w Writer) *SeqWriter {
7578
s := &SeqWriter{w: w}
76-
s.p = rtp.Packet{
77-
Header: rtp.Header{
78-
Version: 2,
79-
SSRC: rand.Uint32(),
80-
SequenceNumber: 0,
81-
},
79+
s.h = rtp.Header{
80+
Version: 2,
81+
SSRC: rand.Uint32(),
82+
SequenceNumber: 0,
8283
}
8384
return s
8485
}
8586

8687
type Packet = rtp.Packet
88+
type Header = rtp.Header
8789

8890
type Event struct {
8991
Type byte
@@ -95,20 +97,19 @@ type Event struct {
9597
type SeqWriter struct {
9698
mu sync.Mutex
9799
w Writer
98-
p Packet
100+
h Header
99101
}
100102

101103
func (s *SeqWriter) WriteEvent(ev *Event) error {
102104
s.mu.Lock()
103105
defer s.mu.Unlock()
104-
s.p.PayloadType = ev.Type
105-
s.p.Payload = ev.Payload
106-
s.p.Marker = ev.Marker
107-
s.p.Timestamp = ev.Timestamp
108-
if err := s.w.WriteRTP(&s.p); err != nil {
106+
s.h.PayloadType = ev.Type
107+
s.h.Marker = ev.Marker
108+
s.h.Timestamp = ev.Timestamp
109+
if _, err := s.w.WriteRTP(&s.h, ev.Payload); err != nil {
109110
return err
110111
}
111-
s.p.Header.SequenceNumber++
112+
s.h.SequenceNumber++
112113
return nil
113114
}
114115

@@ -211,6 +212,6 @@ func (s *MediaStreamIn[T]) String() string {
211212
return fmt.Sprintf("RTP(%d) -> %s", s.Writer.SampleRate(), s.Writer)
212213
}
213214

214-
func (s *MediaStreamIn[T]) HandleRTP(p *rtp.Packet) error {
215-
return s.Writer.WriteSample(T(p.Payload))
215+
func (s *MediaStreamIn[T]) HandleRTP(_ *rtp.Header, payload []byte) error {
216+
return s.Writer.WriteSample(T(payload))
216217
}

0 commit comments

Comments
 (0)