Skip to content

Commit 5da7278

Browse files
adriancablecnderrauber
authored andcommittedFeb 5, 2024
Read() handles distinct-SSRC RTX packets
1 parent c7ca890 commit 5da7278

File tree

3 files changed

+101
-11
lines changed

3 files changed

+101
-11
lines changed
 

‎constants.go

+5
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ const (
3838
generatedCertificateOrigin = "WebRTC"
3939

4040
sdesRepairRTPStreamIDURI = "urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id"
41+
42+
// Attributes returned when Read() returns an RTX packet from a separate RTX stream (distinct SSRC)
43+
attributeRtxPayloadType = "rtx_payload_type"
44+
attributeRtxSsrc = "rtx_ssrc"
45+
attributeRtxSequenceNumber = "rtx_sequence_number"
4146
)
4247

4348
func defaultSrtpProtectionProfiles() []dtls.SRTPProtectionProfile {

‎rtpreceiver.go

+63-5
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@
77
package webrtc
88

99
import (
10+
"encoding/binary"
1011
"fmt"
1112
"io"
1213
"sync"
1314
"time"
1415

1516
"github.com/pion/interceptor"
1617
"github.com/pion/rtcp"
18+
"github.com/pion/rtp"
1719
"github.com/pion/srtp/v2"
1820
"github.com/pion/webrtc/v3/internal/util"
1921
)
@@ -31,13 +33,19 @@ type trackStreams struct {
3133
rtcpReadStream *srtp.ReadStreamSRTCP
3234
rtcpInterceptor interceptor.RTCPReader
3335

34-
repairReadStream *srtp.ReadStreamSRTP
35-
repairInterceptor interceptor.RTPReader
36+
repairReadStream *srtp.ReadStreamSRTP
37+
repairInterceptor interceptor.RTPReader
38+
repairStreamChannel chan rtxPacketWithAttributes
3639

3740
repairRtcpReadStream *srtp.ReadStreamSRTCP
3841
repairRtcpInterceptor interceptor.RTCPReader
3942
}
4043

44+
type rtxPacketWithAttributes struct {
45+
rtxPacket rtp.Packet
46+
attributes interceptor.Attributes
47+
}
48+
4149
// RTPReceiver allows an application to inspect the receipt of a TrackRemote
4250
type RTPReceiver struct {
4351
kind RTPCodecType
@@ -145,6 +153,7 @@ func (r *RTPReceiver) configureReceive(parameters RTPReceiveParameters) {
145153
track: newTrackRemote(
146154
r.kind,
147155
parameters.Encodings[i].SSRC,
156+
parameters.Encodings[i].RTX.SSRC,
148157
parameters.Encodings[i].RID,
149158
r,
150159
),
@@ -379,8 +388,6 @@ func (r *RTPReceiver) receiveForRid(rid string, params RTPParameters, streamInfo
379388
}
380389

381390
// receiveForRtx starts a routine that processes the repair stream
382-
// These packets aren't exposed to the user yet, but we need to process them for
383-
// TWCC
384391
func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *interceptor.StreamInfo, rtpReadStream *srtp.ReadStreamSRTP, rtpInterceptor interceptor.RTPReader, rtcpReadStream *srtp.ReadStreamSRTCP, rtcpInterceptor interceptor.RTCPReader) error {
385392
var track *trackStreams
386393
if ssrc != 0 && len(r.tracks) == 1 {
@@ -402,13 +409,42 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep
402409
track.repairInterceptor = rtpInterceptor
403410
track.repairRtcpReadStream = rtcpReadStream
404411
track.repairRtcpInterceptor = rtcpInterceptor
412+
track.repairStreamChannel = make(chan rtxPacketWithAttributes)
405413

406414
go func() {
407415
b := make([]byte, r.api.settingEngine.getReceiveMTU())
408416
for {
409-
if _, _, readErr := track.repairInterceptor.Read(b, nil); readErr != nil {
417+
i, attributes, err := track.repairInterceptor.Read(b, nil)
418+
if err != nil {
419+
return
420+
}
421+
422+
pkt := &rtp.Packet{}
423+
if err := pkt.Unmarshal(b[:i]); err != nil {
410424
return
411425
}
426+
427+
if len(pkt.Payload) < 2 {
428+
// BWE probe packet, ignore
429+
continue
430+
}
431+
432+
// RTX packets have a different payload format. Move the OSN in the payload to the RTP header and rewrite the
433+
// payload type and SSRC, so that we can return RTX packets to the caller 'transparently' i.e. in the same format
434+
// as non-RTX RTP packets
435+
attributes.Set(attributeRtxPayloadType, pkt.Header.PayloadType)
436+
attributes.Set(attributeRtxSsrc, pkt.Header.SSRC)
437+
attributes.Set(attributeRtxSequenceNumber, pkt.Header.SequenceNumber)
438+
pkt.Header.PayloadType = uint8(track.track.PayloadType())
439+
pkt.Header.SSRC = uint32(track.track.SSRC())
440+
pkt.Header.SequenceNumber = binary.BigEndian.Uint16(pkt.Payload[:2])
441+
pkt.Payload = pkt.Payload[2:]
442+
443+
select {
444+
case <-r.closed:
445+
return
446+
case track.repairStreamChannel <- rtxPacketWithAttributes{rtxPacket: *pkt, attributes: attributes}:
447+
}
412448
}
413449
}()
414450
return nil
@@ -446,3 +482,25 @@ func (r *RTPReceiver) setRTPReadDeadline(deadline time.Time, reader *TrackRemote
446482
}
447483
return fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, reader.SSRC())
448484
}
485+
486+
// readRTX returns an RTX packet if one is available on the RTX track, otherwise returns nil
487+
func (r *RTPReceiver) readRTX(reader *TrackRemote) (*rtp.Packet, interceptor.Attributes) {
488+
if !reader.HasRTX() {
489+
return nil, interceptor.Attributes{}
490+
}
491+
492+
select {
493+
case <-r.received:
494+
default:
495+
return nil, interceptor.Attributes{}
496+
}
497+
498+
if t := r.streamsForTrack(reader); t != nil {
499+
select {
500+
case rtxPacketReceived := <-t.repairStreamChannel:
501+
return &rtxPacketReceived.rtxPacket, rtxPacketReceived.attributes
502+
default:
503+
}
504+
}
505+
return nil, interceptor.Attributes{}
506+
}

‎track_remote.go

+33-6
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type TrackRemote struct {
2424
payloadType PayloadType
2525
kind RTPCodecType
2626
ssrc SSRC
27+
rtxSsrc SSRC
2728
codec RTPCodecParameters
2829
params RTPParameters
2930
rid string
@@ -33,10 +34,11 @@ type TrackRemote struct {
3334
peekedAttributes interceptor.Attributes
3435
}
3536

36-
func newTrackRemote(kind RTPCodecType, ssrc SSRC, rid string, receiver *RTPReceiver) *TrackRemote {
37+
func newTrackRemote(kind RTPCodecType, ssrc, rtxSsrc SSRC, rid string, receiver *RTPReceiver) *TrackRemote {
3738
return &TrackRemote{
3839
kind: kind,
3940
ssrc: ssrc,
41+
rtxSsrc: rtxSsrc,
4042
rid: rid,
4143
receiver: receiver,
4244
}
@@ -125,13 +127,24 @@ func (t *TrackRemote) Read(b []byte) (n int, attributes interceptor.Attributes,
125127
}
126128
}
127129

128-
n, attributes, err = r.readRTP(b, t)
129-
if err != nil {
130-
return
130+
// If there's a separate RTX track and an RTX packet is available, return that
131+
if rtxPacket, rtxAttributes := r.readRTX(t); rtxPacket != nil {
132+
n, err = rtxPacket.MarshalTo(b)
133+
attributes = rtxAttributes
134+
if err != nil {
135+
return 0, nil, err
136+
}
137+
} else {
138+
// If there's no separate RTX track (or there's a separate RTX track but no RTX packet waiting), wait for and return
139+
// a packet from the main track
140+
n, attributes, err = r.readRTP(b, t)
141+
if err != nil {
142+
return
143+
}
144+
err = t.checkAndUpdateTrack(b)
131145
}
132146

133-
err = t.checkAndUpdateTrack(b)
134-
return
147+
return n, attributes, err
135148
}
136149

137150
// checkAndUpdateTrack checks payloadType for every incoming packet
@@ -197,3 +210,17 @@ func (t *TrackRemote) peek(b []byte) (n int, a interceptor.Attributes, err error
197210
func (t *TrackRemote) SetReadDeadline(deadline time.Time) error {
198211
return t.receiver.setRTPReadDeadline(deadline, t)
199212
}
213+
214+
// RtxSSRC returns the RTX SSRC for a track, or 0 if track does not have a separate RTX stream
215+
func (t *TrackRemote) RtxSSRC() SSRC {
216+
t.mu.RLock()
217+
defer t.mu.RUnlock()
218+
return t.rtxSsrc
219+
}
220+
221+
// HasRTX returns true if the track has a separate RTX stream
222+
func (t *TrackRemote) HasRTX() bool {
223+
t.mu.RLock()
224+
defer t.mu.RUnlock()
225+
return t.rtxSsrc != 0
226+
}

0 commit comments

Comments
 (0)
Please sign in to comment.