7
7
package webrtc
8
8
9
9
import (
10
+ "encoding/binary"
10
11
"fmt"
11
12
"io"
12
13
"sync"
13
14
"time"
14
15
15
16
"github.com/pion/interceptor"
16
17
"github.com/pion/rtcp"
18
+ "github.com/pion/rtp"
17
19
"github.com/pion/srtp/v2"
18
20
"github.com/pion/webrtc/v3/internal/util"
19
21
)
@@ -31,13 +33,19 @@ type trackStreams struct {
31
33
rtcpReadStream * srtp.ReadStreamSRTCP
32
34
rtcpInterceptor interceptor.RTCPReader
33
35
34
- repairReadStream * srtp.ReadStreamSRTP
35
- repairInterceptor interceptor.RTPReader
36
+ repairReadStream * srtp.ReadStreamSRTP
37
+ repairInterceptor interceptor.RTPReader
38
+ repairStreamChannel chan rtxPacketWithAttributes
36
39
37
40
repairRtcpReadStream * srtp.ReadStreamSRTCP
38
41
repairRtcpInterceptor interceptor.RTCPReader
39
42
}
40
43
44
+ type rtxPacketWithAttributes struct {
45
+ rtxPacket rtp.Packet
46
+ attributes interceptor.Attributes
47
+ }
48
+
41
49
// RTPReceiver allows an application to inspect the receipt of a TrackRemote
42
50
type RTPReceiver struct {
43
51
kind RTPCodecType
@@ -145,6 +153,7 @@ func (r *RTPReceiver) configureReceive(parameters RTPReceiveParameters) {
145
153
track : newTrackRemote (
146
154
r .kind ,
147
155
parameters .Encodings [i ].SSRC ,
156
+ parameters .Encodings [i ].RTX .SSRC ,
148
157
parameters .Encodings [i ].RID ,
149
158
r ,
150
159
),
@@ -379,8 +388,6 @@ func (r *RTPReceiver) receiveForRid(rid string, params RTPParameters, streamInfo
379
388
}
380
389
381
390
// receiveForRtx starts a routine that processes the repair stream
382
- // These packets aren't exposed to the user yet, but we need to process them for
383
- // TWCC
384
391
func (r * RTPReceiver ) receiveForRtx (ssrc SSRC , rsid string , streamInfo * interceptor.StreamInfo , rtpReadStream * srtp.ReadStreamSRTP , rtpInterceptor interceptor.RTPReader , rtcpReadStream * srtp.ReadStreamSRTCP , rtcpInterceptor interceptor.RTCPReader ) error {
385
392
var track * trackStreams
386
393
if ssrc != 0 && len (r .tracks ) == 1 {
@@ -402,13 +409,42 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep
402
409
track .repairInterceptor = rtpInterceptor
403
410
track .repairRtcpReadStream = rtcpReadStream
404
411
track .repairRtcpInterceptor = rtcpInterceptor
412
+ track .repairStreamChannel = make (chan rtxPacketWithAttributes )
405
413
406
414
go func () {
407
415
b := make ([]byte , r .api .settingEngine .getReceiveMTU ())
408
416
for {
409
- if _ , _ , readErr := track .repairInterceptor .Read (b , nil ); readErr != nil {
417
+ i , attributes , err := track .repairInterceptor .Read (b , nil )
418
+ if err != nil {
419
+ return
420
+ }
421
+
422
+ pkt := & rtp.Packet {}
423
+ if err := pkt .Unmarshal (b [:i ]); err != nil {
410
424
return
411
425
}
426
+
427
+ if len (pkt .Payload ) < 2 {
428
+ // BWE probe packet, ignore
429
+ continue
430
+ }
431
+
432
+ // RTX packets have a different payload format. Move the OSN in the payload to the RTP header and rewrite the
433
+ // payload type and SSRC, so that we can return RTX packets to the caller 'transparently' i.e. in the same format
434
+ // as non-RTX RTP packets
435
+ attributes .Set (attributeRtxPayloadType , pkt .Header .PayloadType )
436
+ attributes .Set (attributeRtxSsrc , pkt .Header .SSRC )
437
+ attributes .Set (attributeRtxSequenceNumber , pkt .Header .SequenceNumber )
438
+ pkt .Header .PayloadType = uint8 (track .track .PayloadType ())
439
+ pkt .Header .SSRC = uint32 (track .track .SSRC ())
440
+ pkt .Header .SequenceNumber = binary .BigEndian .Uint16 (pkt .Payload [:2 ])
441
+ pkt .Payload = pkt .Payload [2 :]
442
+
443
+ select {
444
+ case <- r .closed :
445
+ return
446
+ case track .repairStreamChannel <- rtxPacketWithAttributes {rtxPacket : * pkt , attributes : attributes }:
447
+ }
412
448
}
413
449
}()
414
450
return nil
@@ -446,3 +482,25 @@ func (r *RTPReceiver) setRTPReadDeadline(deadline time.Time, reader *TrackRemote
446
482
}
447
483
return fmt .Errorf ("%w: %d" , errRTPReceiverWithSSRCTrackStreamNotFound , reader .SSRC ())
448
484
}
485
+
486
+ // readRTX returns an RTX packet if one is available on the RTX track, otherwise returns nil
487
+ func (r * RTPReceiver ) readRTX (reader * TrackRemote ) (* rtp.Packet , interceptor.Attributes ) {
488
+ if ! reader .HasRTX () {
489
+ return nil , interceptor.Attributes {}
490
+ }
491
+
492
+ select {
493
+ case <- r .received :
494
+ default :
495
+ return nil , interceptor.Attributes {}
496
+ }
497
+
498
+ if t := r .streamsForTrack (reader ); t != nil {
499
+ select {
500
+ case rtxPacketReceived := <- t .repairStreamChannel :
501
+ return & rtxPacketReceived .rtxPacket , rtxPacketReceived .attributes
502
+ default :
503
+ }
504
+ }
505
+ return nil , interceptor.Attributes {}
506
+ }
0 commit comments