Skip to content

Commit

Permalink
JitterBuffer: Add a JitterBuffer-based Interceptor
Browse files Browse the repository at this point in the history
The JitterBufferInterceptor is designed to fit in a RemoteStream
pipeline and buffer incoming packets for a short period (currently
defaulting to 50 packets) before emitting packets to be consumed by the
next step in the pipeline.

The caller must ensure they are prepared to handle an
ErrPopWhileBuffering in the case that insufficient packets have been
received by the jitter buffer. The caller should retry the operation
at some point later as the buffer may have been filled in the interim.

The caller should also be aware that an ErrBufferUnderrun may be
returned in the case that the initial buffering was sufficient and
playback began but the caller is consuming packets (or they are not
arriving) quickly enough.
  • Loading branch information
thatsnotright committed Apr 19, 2024
1 parent 1449b4f commit 2668bc2
Show file tree
Hide file tree
Showing 9 changed files with 281 additions and 5 deletions.
4 changes: 2 additions & 2 deletions attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (a Attributes) Set(key interface{}, val interface{}) {
}

// GetRTPHeader gets the RTP header if present. If it is not present, it will be
// unmarshalled from the raw byte slice and stored in the attribtues.
// unmarshalled from the raw byte slice and stored in the attributes.
func (a Attributes) GetRTPHeader(raw []byte) (*rtp.Header, error) {
if val, ok := a[rtpHeaderKey]; ok {
if header, ok := val.(*rtp.Header); ok {
Expand All @@ -50,7 +50,7 @@ func (a Attributes) GetRTPHeader(raw []byte) (*rtp.Header, error) {
}

// GetRTCPPackets gets the RTCP packets if present. If the packet slice is not
// present, it will be unmarshaled from the raw byte slice and stored in the
// present, it will be unmarshalled from the raw byte slice and stored in the
// attributes.
func (a Attributes) GetRTCPPackets(raw []byte) ([]rtcp.Packet, error) {
if val, ok := a[rtcpPacketsKey]; ok {
Expand Down
7 changes: 5 additions & 2 deletions internal/test/mock_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ func NewMockStream(info *interceptor.StreamInfo, i interceptor.Interceptor) *Moc
for {
i, _, err := s.rtpReader.Read(buf, interceptor.Attributes{})
if err != nil {
if err.Error() == "attempt to pop while buffering" {
continue

Check warning on line 133 in internal/test/mock_stream.go

View check run for this annotation

Codecov / codecov/patch

internal/test/mock_stream.go#L133

Added line #L133 was not covered by tests
}
if errors.Is(err, io.EOF) {
s.rtpInModified <- RTPWithError{Err: err}
}
Expand Down Expand Up @@ -160,12 +163,12 @@ func (s *MockStream) WriteRTP(p *rtp.Packet) error {
return err
}

// ReceiveRTCP schedules a new rtcp batch, so it can be read be the stream
// ReceiveRTCP schedules a new rtcp batch, so it can be read by the stream
func (s *MockStream) ReceiveRTCP(pkts []rtcp.Packet) {
s.rtcpIn <- pkts
}

// ReceiveRTP schedules a rtp packet, so it can be read be the stream
// ReceiveRTP schedules a rtp packet, so it can be read by the stream
func (s *MockStream) ReceiveRTP(packet *rtp.Packet) {
s.rtpIn <- packet
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/jitterbuffer/jitter_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,3 +267,15 @@ func (jb *JitterBuffer) PopAtTimestamp(ts uint32) (*rtp.Packet, error) {
jb.updateState()
return packet, nil
}

func (jb *JitterBuffer) Clear(resetState bool) {

Check warning on line 271 in pkg/jitterbuffer/jitter_buffer.go

View workflow job for this annotation

GitHub Actions / lint / Go

exported: exported method JitterBuffer.Clear should have comment or be unexported (revive)
jb.mutex.Lock()
defer jb.mutex.Unlock()
jb.packets.Clear()
if resetState {
jb.lastSequence = 0
jb.state = Buffering
jb.stats = Stats{0, 0, 0}
jb.minStartCount = 50
}
}
17 changes: 16 additions & 1 deletion pkg/jitterbuffer/jitter_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ func TestJitterBuffer(t *testing.T) {

jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5012, Timestamp: 512}, Payload: []byte{0x02}})

assert.Equal(jb.lastSequence, uint16(5012))
assert.Equal(jb.stats.outOfOrderCount, uint32(1))
assert.Equal(jb.packets.Length(), uint16(4))
assert.Equal(jb.lastSequence, uint16(5012))
Expand Down Expand Up @@ -214,4 +213,20 @@ func TestJitterBuffer(t *testing.T) {
assert.NotNil(pkt)
}
})

t.Run("Allows clearing the buffer", func(*testing.T) {
jb := New()
jb.Clear(false)

assert.Equal(jb.lastSequence, uint16(0))
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}})
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5001, Timestamp: 501}, Payload: []byte{0x02}})
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5002, Timestamp: 502}, Payload: []byte{0x02}})

assert.Equal(jb.lastSequence, uint16(5002))
jb.Clear(true)
assert.Equal(jb.lastSequence, uint16(0))
assert.Equal(jb.stats.outOfOrderCount, uint32(0))
assert.Equal(jb.packets.Length(), uint16(0))
})
}
19 changes: 19 additions & 0 deletions pkg/jitterbuffer/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package jitterbuffer

import (
"github.com/pion/logging"
)

// JitterBufferOption can be used to configure SenderInterceptor
type JitterBufferOption func(d *JitterBufferInterceptor) error

Check warning on line 11 in pkg/jitterbuffer/option.go

View workflow job for this annotation

GitHub Actions / lint / Go

exported: type name will be used as jitterbuffer.JitterBufferOption by other packages, and that stutters; consider calling this Option (revive)

// Log sets a logger for the interceptor
func Log(log logging.LeveledLogger) JitterBufferOption {
return func(d *JitterBufferInterceptor) error {
d.log = log
return nil
}
}
10 changes: 10 additions & 0 deletions pkg/jitterbuffer/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,13 @@ func (q *PriorityQueue) PopAtTimestamp(timestamp uint32) (*rtp.Packet, error) {
}
return nil, ErrNotFound
}

// Clear will empty a PriorityQueue
func (q *PriorityQueue) Clear() {
next := q.next
q.length = 0
for next != nil {
next.prev = nil
next = next.next
}
}
15 changes: 15 additions & 0 deletions pkg/jitterbuffer/priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,18 @@ func TestPriorityQueue_Find(t *testing.T) {
_, err = packets.Find(1001)
assert.Error(t, err)
}

func TestPriorityQueue_Clean(t *testing.T) {
packets := NewQueue()
packets.Clear()
packets.Push(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 1000,
Timestamp: 5,
SSRC: 5,
},
Payload: []uint8{0xA},
}, 1000)
assert.EqualValues(t, 1, packets.Length())
packets.Clear()
}
108 changes: 108 additions & 0 deletions pkg/jitterbuffer/receiver_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package jitterbuffer

import (
"sync"

"github.com/pion/interceptor"
"github.com/pion/logging"
"github.com/pion/rtp"
)

// JitterBufferInterceptorFactory is a interceptor.Factory for a GeneratorInterceptor
type JitterBufferInterceptorFactory struct {

Check warning on line 15 in pkg/jitterbuffer/receiver_interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

exported: type name will be used as jitterbuffer.JitterBufferInterceptorFactory by other packages, and that stutters; consider calling this InterceptorFactory (revive)
opts []JitterBufferOption
}

// NewInterceptor constructs a new ReceiverInterceptor
func (g *JitterBufferInterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) {
i := &JitterBufferInterceptor{
close: make(chan struct{}),
log: logging.NewDefaultLoggerFactory().NewLogger("jitterbuffer"),
buffer: New(),
}

for _, opt := range g.opts {
if err := opt(i); err != nil {
return nil, err

Check warning on line 29 in pkg/jitterbuffer/receiver_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/receiver_interceptor.go#L29

Added line #L29 was not covered by tests
}
}

return i, nil
}

// JitterBufferInterceptor interceptor places a JitterBuffer in the chain to smooth packet arrival
// and allow for network jitter
//
// The JitterBufferInterceptor is designed to fit in a RemoteStream
// pipeline and buffer incoming packets for a short period (currently
// defaulting to 50 packets) before emitting packets to be consumed by the
// next step in the pipeline.
//
// The caller must ensure they are prepared to handle an
// ErrPopWhileBuffering in the case that insufficient packets have been
// received by the jitter buffer. The caller should retry the operation
// at some point later as the buffer may have been filled in the interim.
//
// The caller should also be aware that an ErrBufferUnderrun may be
// returned in the case that the initial buffering was sufficient and
// playback began but the caller is consuming packets (or they are not
// arriving) quickly enough.
type JitterBufferInterceptor struct {

Check warning on line 53 in pkg/jitterbuffer/receiver_interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

exported: type name will be used as jitterbuffer.JitterBufferInterceptor by other packages, and that stutters; consider calling this Interceptor (revive)
interceptor.NoOp
buffer *JitterBuffer
m sync.Mutex
wg sync.WaitGroup
close chan struct{}
log logging.LeveledLogger
}

// NewInterceptor returns a new JitterBufferInterceptorFactory
func NewInterceptor(opts ...JitterBufferOption) (*JitterBufferInterceptorFactory, error) {
return &JitterBufferInterceptorFactory{opts}, nil
}

// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
// will be called once per rtp packet.
func (i *JitterBufferInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {

Check warning on line 69 in pkg/jitterbuffer/receiver_interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

unused-parameter: parameter 'info' seems to be unused, consider removing or renaming it as _ (revive)
return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
buf := make([]byte, len(b))
n, attr, err := reader.Read(buf, a)
if err != nil {
return n, attr, err
}
packet := &rtp.Packet{}
if err := packet.Unmarshal(buf); err != nil {
return 0, nil, err

Check warning on line 78 in pkg/jitterbuffer/receiver_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/receiver_interceptor.go#L78

Added line #L78 was not covered by tests
}
i.buffer.Push(packet)
if i.buffer.state == Emitting {
newPkt, err := i.buffer.Pop()
if err != nil {
return 0, nil, err

Check warning on line 84 in pkg/jitterbuffer/receiver_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/receiver_interceptor.go#L84

Added line #L84 was not covered by tests
}
nlen, err := newPkt.MarshalTo(b)
return nlen, attr, err
}
return n, attr, ErrPopWhileBuffering
})
}

// UnbindRemoteStream is called when the Stream is removed. It can be used to clean up any data related to that track.
func (jb *JitterBufferInterceptor) UnbindRemoteStream(info *interceptor.StreamInfo) {

Check warning on line 94 in pkg/jitterbuffer/receiver_interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

receiver-naming: receiver name jb should be consistent with previous receiver name i for JitterBufferInterceptor (revive)
defer jb.wg.Wait()
jb.m.Lock()
defer jb.m.Unlock()
jb.buffer.Clear(true)

Check warning on line 98 in pkg/jitterbuffer/receiver_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/receiver_interceptor.go#L94-L98

Added lines #L94 - L98 were not covered by tests
}

// Close closes the interceptor
func (jb *JitterBufferInterceptor) Close() error {

Check warning on line 102 in pkg/jitterbuffer/receiver_interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

receiver-naming: receiver name jb should be consistent with previous receiver name i for JitterBufferInterceptor (revive)
defer jb.wg.Wait()
jb.m.Lock()
defer jb.m.Unlock()
jb.buffer.Clear(true)
return nil
}
94 changes: 94 additions & 0 deletions pkg/jitterbuffer/receiver_interceptor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package jitterbuffer

import (
"bytes"
"testing"
"time"

"github.com/pion/interceptor"
"github.com/pion/interceptor/internal/test"
"github.com/pion/logging"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/stretchr/testify/assert"
)

func TestReceiverFilterEverythingOut(t *testing.T) {
buf := bytes.Buffer{}

factory, err := NewInterceptor(
Log(logging.NewDefaultLoggerFactory().NewLogger("test")),
)
assert.NoError(t, err)

i, err := factory.NewInterceptor("")
assert.NoError(t, err)

assert.Zero(t, buf.Len())

stream := test.NewMockStream(&interceptor.StreamInfo{
SSRC: 123456,
ClockRate: 90000,
}, i)
defer func() {
assert.NoError(t, stream.Close())
}()

stream.ReceiveRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{
SenderSSRC: 123,
MediaSSRC: 456,
}})
stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{
SequenceNumber: uint16(0),
}})

// Give time for packets to be handled and stream written to.
time.Sleep(50 * time.Millisecond)

err = i.Close()
assert.NoError(t, err)

// Every packet should have been filtered out – nothing should be written.
assert.Zero(t, buf.Len())
}

func TestReceiverFilterNothing(t *testing.T) {
buf := bytes.Buffer{}

factory, err := NewInterceptor(
Log(logging.NewDefaultLoggerFactory().NewLogger("test")),
)
assert.NoError(t, err)

i, err := factory.NewInterceptor("")
assert.NoError(t, err)

assert.EqualValues(t, 0, buf.Len())

stream := test.NewMockStream(&interceptor.StreamInfo{
SSRC: 123456,
ClockRate: 90000,
}, i)

stream.ReceiveRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{
SenderSSRC: 123,
MediaSSRC: 456,
}})
for s := 0; s < 61; s++ {
stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{
SequenceNumber: uint16(s),
}})
}
assert.NoError(t, stream.Close())
// Give time for packets to be handled and stream written to.
time.Sleep(50 * time.Millisecond)
for s := 0; s < 10; s++ {
read := <-stream.ReadRTP()
assert.EqualValues(t, uint16(s), read.Packet.Header.SequenceNumber)
}
err = i.Close()
assert.NoError(t, err)
}

0 comments on commit 2668bc2

Please sign in to comment.