Skip to content

Commit

Permalink
Implement transport wide congestion control
Browse files Browse the repository at this point in the history
Add one interceptor to add the header extension to outgoing packets and
one interceptor to generate transport wide congestion control reports as
described in https://datatracker.ietf.org/doc/html/draft-holmer-rmcat-
transport-wide-cc-extensions-01
  • Loading branch information
mengelbart committed Jun 27, 2021
1 parent e23b945 commit c8a26a2
Show file tree
Hide file tree
Showing 7 changed files with 1,147 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Expand Up @@ -76,6 +76,7 @@ Check out the **[contributing wiki](https://github.com/pion/webrtc/wiki/Contribu
* [Sean DuBois](https://github.com/sean-der) - *Original Author*
* [Atsushi Watanabe](https://github.com/at-wat)
* [Alessandro Ros](https://github.com/aler9)
* [Mathis Engelbart](https://github.com/mengelbart)

### License
MIT License - see [LICENSE](LICENSE) for full text
49 changes: 49 additions & 0 deletions pkg/twcc/header_extension.go
@@ -0,0 +1,49 @@
package twcc

import (
"sync/atomic"

"github.com/pion/interceptor"
"github.com/pion/rtp"
)

// HeaderExtensionInterceptor adds transport wide sequence numbers as header extension to each RTP packet
type HeaderExtensionInterceptor struct {
interceptor.NoOp
nextSequenceNr uint32
}

// NewHeaderExtensionInterceptor returns a HeaderExtensionInterceptor
func NewHeaderExtensionInterceptor() (*HeaderExtensionInterceptor, error) {
return &HeaderExtensionInterceptor{}, nil
}

const transportCCURI = "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01"

// BindLocalStream returns a writer that adds a rtp.TransportCCExtension
// header with increasing sequence numbers to each outgoing packet.
func (h *HeaderExtensionInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
var hdrExtID uint8
for _, e := range info.RTPHeaderExtensions {
if e.URI == transportCCURI {
hdrExtID = uint8(e.ID)
break
}
}
if hdrExtID == 0 { // Don't add header extension if ID is 0, because 0 is an invalid extension ID
return writer
}
return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
seqNr := atomic.AddUint32(&h.nextSequenceNr, 1) - 1

tcc, err := (&rtp.TransportCCExtension{TransportSequence: uint16(seqNr)}).Marshal()
if err != nil {
return 0, err
}
err = header.SetExtension(hdrExtID, tcc)
if err != nil {
return 0, err
}
return writer.Write(header, payload, attributes)
})
}
63 changes: 63 additions & 0 deletions pkg/twcc/header_extension_test.go
@@ -0,0 +1,63 @@
package twcc

import (
"sync"
"testing"
"time"

"github.com/pion/interceptor"
"github.com/pion/interceptor/internal/test"
"github.com/pion/rtp"
"github.com/stretchr/testify/assert"
)

func TestHeaderExtensionInterceptor(t *testing.T) {
t.Run("add transport wide cc to each packet", func(t *testing.T) {
inter, err := NewHeaderExtensionInterceptor()
assert.NoError(t, err)

pChan := make(chan *rtp.Packet, 10*5)
go func() {
// start some parallel streams using the same interceptor to test for race conditions
var wg sync.WaitGroup
num := 10
wg.Add(num)
for i := 0; i < num; i++ {
go func(ch chan *rtp.Packet, id uint16) {
stream := test.NewMockStream(&interceptor.StreamInfo{RTPHeaderExtensions: []interceptor.RTPHeaderExtension{
{
URI: transportCCURI,
ID: 1,
},
}}, inter)
defer func() {
wg.Done()
assert.NoError(t, stream.Close())
}()

for _, seqNum := range []uint16{id * 1, id * 2, id * 3, id * 4, id * 5} {
assert.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum}}))
select {
case p := <-stream.WrittenRTP():
assert.Equal(t, seqNum, p.SequenceNumber)
ch <- p
case <-time.After(10 * time.Millisecond):
panic("written rtp packet not found")
}
}
}(pChan, uint16(i+1))
}
wg.Wait()
close(pChan)
}()

for p := range pChan {
// Can't check for increasing transport cc sequence number, since we can't ensure ordering between the streams
// on pChan is same as in the interceptor, but at least make sure each packet has a seq nr.
extensionHeader := p.GetExtension(1)
twcc := &rtp.TransportCCExtension{}
err = twcc.Unmarshal(extensionHeader)
assert.NoError(t, err)
}
})
}
174 changes: 174 additions & 0 deletions pkg/twcc/sender_interceptor.go
@@ -0,0 +1,174 @@
package twcc

import (
"math/rand"
"sync"
"time"

"github.com/pion/interceptor"
"github.com/pion/logging"
"github.com/pion/rtp"
)

// SenderInterceptor sends transport wide congestion control reports as specified in:
// https://datatracker.ietf.org/doc/html/draft-holmer-rmcat-transport-wide-cc-extensions-01
type SenderInterceptor struct {
interceptor.NoOp

log logging.LeveledLogger

m sync.Mutex
wg sync.WaitGroup
close chan struct{}

interval time.Duration

recorder *Recorder
packetChan chan packet
}

// An Option is a function that can be used to configure a SenderInterceptor
type Option func(*SenderInterceptor) error

// SendInterval sets the interval at which the interceptor
// will send new feedback reports.
func SendInterval(interval time.Duration) Option {
return func(s *SenderInterceptor) error {
s.interval = interval
return nil
}
}

// NewSenderInterceptor returns a new SenderInterceptor configured with the given options.
func NewSenderInterceptor(opts ...Option) (*SenderInterceptor, error) {
i := &SenderInterceptor{
log: logging.NewDefaultLoggerFactory().NewLogger("twcc_sender_interceptor"),
packetChan: make(chan packet),
close: make(chan struct{}),
interval: 100 * time.Millisecond,
}

for _, opt := range opts {
err := opt(i)
if err != nil {
return nil, err
}
}

return i, nil
}

// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
// will be called once per packet batch.
func (s *SenderInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
s.m.Lock()
defer s.m.Unlock()

s.recorder = NewRecorder(rand.Uint32()) // #nosec

if s.isClosed() {
return writer
}

s.wg.Add(1)

go s.loop(writer)

return writer
}

type packet struct {
hdr *rtp.Header
seqNr uint16
ts int64
ssrc uint32
}

// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
// will be called once per rtp packet.
func (s *SenderInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
var hdrExtID uint8
for _, e := range info.RTPHeaderExtensions {
if e.URI == transportCCURI {
hdrExtID = uint8(e.ID)
break
}
}
if hdrExtID == 0 { // Don't try to read header extension if ID is 0, because 0 is an invalid extension ID
return reader
}
return interceptor.RTPReaderFunc(func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
i, attr, err := reader.Read(buf, attributes)
if err != nil {
return 0, nil, err
}
p := rtp.Packet{}
err = p.Unmarshal(buf[:i])
if err != nil {
return 0, nil, err
}
var tccExt rtp.TransportCCExtension
if ext := p.GetExtension(hdrExtID); ext != nil {
err = tccExt.Unmarshal(ext)
if err != nil {
return 0, nil, err
}

s.packetChan <- packet{
hdr: &p.Header,
seqNr: tccExt.TransportSequence,
ts: time.Now().UnixNano(),
ssrc: info.SSRC,
}
}

return i, attr, nil
})
}

// Close closes the interceptor.
func (s *SenderInterceptor) Close() error {
defer s.wg.Wait()
s.m.Lock()
defer s.m.Unlock()

if !s.isClosed() {
close(s.close)
}

return nil
}

func (s *SenderInterceptor) isClosed() bool {
select {
case <-s.close:
return true
default:
return false
}
}

func (s *SenderInterceptor) loop(w interceptor.RTCPWriter) {
defer s.wg.Done()

ticker := time.NewTicker(s.interval)

for {
select {
case <-s.close:
return
case p := <-s.packetChan:
s.recorder.Record(p.ssrc, p.seqNr, p.ts/1e6) // ns -> ms: divide by 1e6

case <-ticker.C:
// build and send twcc
if s.recorder != nil {
pkts := s.recorder.BuildFeedbackPacket()
_, err := w.Write(pkts, nil)
if err != nil {
s.log.Error(err.Error())
}
}
}
}
}

0 comments on commit c8a26a2

Please sign in to comment.