From bc30165ae456af3e8a9ae25e1c3d25dc3a7ccc41 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sun, 20 Dec 2020 20:20:35 +0100 Subject: [PATCH] Implement Sender and Receiver reports Two interceptor are added to generate sender and receiver reports from incoming and outgoing RTP and RTCP packets. --- internal/test/mock_time.go | 26 ++ pkg/report/receiver_interceptor.go | 166 ++++++++++ pkg/report/receiver_interceptor_test.go | 419 ++++++++++++++++++++++++ pkg/report/receiver_option.go | 34 ++ pkg/report/receiver_stream.go | 159 +++++++++ pkg/report/report.go | 2 + pkg/report/sender_interceptor.go | 139 ++++++++ pkg/report/sender_interceptor_test.go | 84 +++++ pkg/report/sender_option.go | 34 ++ pkg/report/sender_stream.go | 37 +++ 10 files changed, 1100 insertions(+) create mode 100644 internal/test/mock_time.go create mode 100644 pkg/report/receiver_interceptor.go create mode 100644 pkg/report/receiver_interceptor_test.go create mode 100644 pkg/report/receiver_option.go create mode 100644 pkg/report/receiver_stream.go create mode 100644 pkg/report/report.go create mode 100644 pkg/report/sender_interceptor.go create mode 100644 pkg/report/sender_interceptor_test.go create mode 100644 pkg/report/sender_option.go create mode 100644 pkg/report/sender_stream.go diff --git a/internal/test/mock_time.go b/internal/test/mock_time.go new file mode 100644 index 00000000..39c15539 --- /dev/null +++ b/internal/test/mock_time.go @@ -0,0 +1,26 @@ +package test + +import ( + "sync" + "time" +) + +// MockTime is a helper to replace time.Now() for testing purposes. +type MockTime struct { + m sync.RWMutex + curNow time.Time +} + +// SetNow sets the current time. +func (t *MockTime) SetNow(n time.Time) { + t.m.Lock() + defer t.m.Unlock() + t.curNow = n +} + +// Now returns the current time. +func (t *MockTime) Now() time.Time { + t.m.RLock() + defer t.m.RUnlock() + return t.curNow +} diff --git a/pkg/report/receiver_interceptor.go b/pkg/report/receiver_interceptor.go new file mode 100644 index 00000000..5235b990 --- /dev/null +++ b/pkg/report/receiver_interceptor.go @@ -0,0 +1,166 @@ +package report + +import ( + "sync" + "time" + + "github.com/pion/interceptor" + "github.com/pion/logging" + "github.com/pion/rtcp" + "github.com/pion/rtp" +) + +// ReceiverInterceptor interceptor generates receiver reports. +type ReceiverInterceptor struct { + interceptor.NoOp + interval time.Duration + now func() time.Time + streams sync.Map + log logging.LeveledLogger + m sync.Mutex + wg sync.WaitGroup + close chan struct{} +} + +// NewReceiverInterceptor returns a new ReceiverInterceptor interceptor. +func NewReceiverInterceptor(opts ...ReceiverOption) (*ReceiverInterceptor, error) { + r := &ReceiverInterceptor{ + interval: 1 * time.Second, + now: time.Now, + log: logging.NewDefaultLoggerFactory().NewLogger("receiver_interceptor"), + close: make(chan struct{}), + } + + for _, opt := range opts { + if err := opt(r); err != nil { + return nil, err + } + } + + return r, nil +} + +func (r *ReceiverInterceptor) isClosed() bool { + select { + case <-r.close: + return true + default: + return false + } +} + +// Close closes the interceptor. +func (r *ReceiverInterceptor) Close() error { + defer r.wg.Wait() + r.m.Lock() + defer r.m.Unlock() + + if !r.isClosed() { + close(r.close) + } + + return nil +} + +// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method +// will be called once per packet batch. +func (r *ReceiverInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter { + r.m.Lock() + defer r.m.Unlock() + + if r.isClosed() { + return writer + } + + r.wg.Add(1) + + go r.loop(writer) + + return writer +} + +func (r *ReceiverInterceptor) loop(rtcpWriter interceptor.RTCPWriter) { + defer r.wg.Done() + + ticker := time.NewTicker(r.interval) + for { + select { + case <-ticker.C: + now := r.now() + r.streams.Range(func(key, value interface{}) bool { + stream := value.(*receiverStream) + + var pkts []rtcp.Packet + + pkts = append(pkts, stream.generateReport(now)) + + if _, err := rtcpWriter.Write(pkts, interceptor.Attributes{}); err != nil { + r.log.Warnf("failed sending: %+v", err) + } + + return true + }) + + case <-r.close: + return + } + } +} + +// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method +// will be called once per rtp packet. +func (r *ReceiverInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader { + stream := newReceiverStream(info.SSRC, info.ClockRate) + r.streams.Store(info.SSRC, stream) + + return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) { + i, attr, err := reader.Read(b, a) + if err != nil { + return 0, nil, err + } + + pkt := rtp.Packet{} + if err = pkt.Unmarshal(b[:i]); err != nil { + return 0, nil, err + } + + stream.processRTP(r.now(), &pkt) + + return i, attr, nil + }) +} + +// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track. +func (r *ReceiverInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) { + r.streams.Delete(info.SSRC) +} + +// BindRTCPReader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might +// change in the future. The returned method will be called once per packet batch. +func (r *ReceiverInterceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader { + return interceptor.RTCPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) { + i, attr, err := reader.Read(b, a) + if err != nil { + return 0, nil, err + } + + pkts, err := rtcp.Unmarshal(b[:i]) + if err != nil { + return 0, nil, err + } + + for _, pkt := range pkts { + if sr, ok := (pkt).(*rtcp.SenderReport); ok { + value, ok := r.streams.Load(sr.SSRC) + if !ok { + continue + } + + stream := value.(*receiverStream) + stream.processSenderReport(r.now(), sr) + } + } + + return i, attr, nil + }) +} diff --git a/pkg/report/receiver_interceptor_test.go b/pkg/report/receiver_interceptor_test.go new file mode 100644 index 00000000..412a0166 --- /dev/null +++ b/pkg/report/receiver_interceptor_test.go @@ -0,0 +1,419 @@ +package report + +import ( + "testing" + "time" + + "github.com/pion/interceptor" + "github.com/pion/interceptor/internal/test" + "github.com/pion/logging" + "github.com/pion/rtcp" + "github.com/pion/rtp" + "github.com/stretchr/testify/assert" +) + +func TestReceiverInterceptor(t *testing.T) { + t.Run("before any packet", func(t *testing.T) { + mt := test.MockTime{} + i, err := NewReceiverInterceptor( + ReceiverInterval(time.Millisecond*50), + ReceiverLog(logging.NewDefaultLoggerFactory().NewLogger("test")), + ReceiverNow(mt.Now), + ) + assert.NoError(t, err) + + stream := test.NewMockStream(&interceptor.StreamInfo{ + SSRC: 123456, + ClockRate: 90000, + }, i) + defer func() { + assert.NoError(t, stream.Close()) + }() + + pkts := <-stream.WrittenRTCP() + assert.Equal(t, len(pkts), 1) + rr, ok := pkts[0].(*rtcp.ReceiverReport) + assert.True(t, ok) + assert.Equal(t, 1, len(rr.Reports)) + assert.Equal(t, rtcp.ReceptionReport{ + SSRC: uint32(123456), + LastSequenceNumber: 0, + LastSenderReport: 0, + FractionLost: 0, + TotalLost: 0, + Delay: 0, + Jitter: 0, + }, rr.Reports[0]) + }) + + rtpTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) + + t.Run("after RTP packets", func(t *testing.T) { + mt := test.MockTime{} + i, err := NewReceiverInterceptor( + ReceiverInterval(time.Millisecond*50), + ReceiverLog(logging.NewDefaultLoggerFactory().NewLogger("test")), + ReceiverNow(mt.Now), + ) + assert.NoError(t, err) + + stream := test.NewMockStream(&interceptor.StreamInfo{ + SSRC: 123456, + ClockRate: 90000, + }, i) + defer func() { + assert.NoError(t, stream.Close()) + }() + + for i := 0; i < 10; i++ { + stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{ + SequenceNumber: uint16(i), + }}) + } + + pkts := <-stream.WrittenRTCP() + assert.Equal(t, len(pkts), 1) + rr, ok := pkts[0].(*rtcp.ReceiverReport) + assert.True(t, ok) + assert.Equal(t, 1, len(rr.Reports)) + assert.Equal(t, rtcp.ReceptionReport{ + SSRC: uint32(123456), + LastSequenceNumber: 9, + LastSenderReport: 0, + FractionLost: 0, + TotalLost: 0, + Delay: 0, + Jitter: 0, + }, rr.Reports[0]) + }) + + t.Run("after RTP and RTCP packets", func(t *testing.T) { + mt := test.MockTime{} + i, err := NewReceiverInterceptor( + ReceiverInterval(time.Millisecond*50), + ReceiverLog(logging.NewDefaultLoggerFactory().NewLogger("test")), + ReceiverNow(mt.Now), + ) + assert.NoError(t, err) + + stream := test.NewMockStream(&interceptor.StreamInfo{ + SSRC: 123456, + ClockRate: 90000, + }, i) + defer func() { + assert.NoError(t, stream.Close()) + }() + + for i := 0; i < 10; i++ { + stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{ + SequenceNumber: uint16(i), + }}) + } + + now := time.Date(2009, time.November, 10, 23, 0, 1, 0, time.UTC) + stream.ReceiveRTCP([]rtcp.Packet{ + &rtcp.SenderReport{ + SSRC: 123456, + NTPTime: ntpTime(now), + RTPTime: 987654321 + uint32(now.Sub(rtpTime).Seconds()*90000), + PacketCount: 10, + OctetCount: 0, + }, + }) + + pkts := <-stream.WrittenRTCP() + assert.Equal(t, len(pkts), 1) + rr, ok := pkts[0].(*rtcp.ReceiverReport) + assert.True(t, ok) + assert.Equal(t, 1, len(rr.Reports)) + assert.Equal(t, rtcp.ReceptionReport{ + SSRC: uint32(123456), + LastSequenceNumber: 9, + LastSenderReport: 1861287936, + FractionLost: 0, + TotalLost: 0, + Delay: rr.Reports[0].Delay, + Jitter: 0, + }, rr.Reports[0]) + }) + + t.Run("overflow", func(t *testing.T) { + mt := test.MockTime{} + i, err := NewReceiverInterceptor( + ReceiverInterval(time.Millisecond*50), + ReceiverLog(logging.NewDefaultLoggerFactory().NewLogger("test")), + ReceiverNow(mt.Now), + ) + assert.NoError(t, err) + + stream := test.NewMockStream(&interceptor.StreamInfo{ + SSRC: 123456, + ClockRate: 90000, + }, i) + defer func() { + assert.NoError(t, stream.Close()) + }() + + stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{ + SequenceNumber: 0xffff, + }}) + + stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{ + SequenceNumber: 0x00, + }}) + + pkts := <-stream.WrittenRTCP() + assert.Equal(t, len(pkts), 1) + rr, ok := pkts[0].(*rtcp.ReceiverReport) + assert.True(t, ok) + assert.Equal(t, 1, len(rr.Reports)) + assert.Equal(t, rtcp.ReceptionReport{ + SSRC: uint32(123456), + LastSequenceNumber: 1<<16 | 0x0000, + LastSenderReport: 0, + FractionLost: 0, + TotalLost: 0, + Delay: 0, + Jitter: 0, + }, rr.Reports[0]) + }) + + t.Run("packet loss", func(t *testing.T) { + mt := test.MockTime{} + i, err := NewReceiverInterceptor( + ReceiverInterval(time.Millisecond*50), + ReceiverLog(logging.NewDefaultLoggerFactory().NewLogger("test")), + ReceiverNow(mt.Now), + ) + assert.NoError(t, err) + + stream := test.NewMockStream(&interceptor.StreamInfo{ + SSRC: 123456, + ClockRate: 90000, + }, i) + defer func() { + assert.NoError(t, stream.Close()) + }() + + stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{ + SequenceNumber: 0x01, + }}) + + stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{ + SequenceNumber: 0x03, + }}) + + pkts := <-stream.WrittenRTCP() + assert.Equal(t, len(pkts), 1) + rr, ok := pkts[0].(*rtcp.ReceiverReport) + assert.True(t, ok) + assert.Equal(t, 1, len(rr.Reports)) + assert.Equal(t, rtcp.ReceptionReport{ + SSRC: uint32(123456), + LastSequenceNumber: 0x03, + LastSenderReport: 0, + FractionLost: 256 * 1 / 3, + TotalLost: 1, + Delay: 0, + Jitter: 0, + }, rr.Reports[0]) + + now := time.Date(2009, time.November, 10, 23, 0, 1, 0, time.UTC) + stream.ReceiveRTCP([]rtcp.Packet{ + &rtcp.SenderReport{ + SSRC: 123456, + NTPTime: ntpTime(now), + RTPTime: 987654321 + uint32(now.Sub(rtpTime).Seconds()*90000), + PacketCount: 10, + OctetCount: 0, + }, + }) + + pkts = <-stream.WrittenRTCP() + assert.Equal(t, len(pkts), 1) + rr, ok = pkts[0].(*rtcp.ReceiverReport) + assert.True(t, ok) + assert.Equal(t, 1, len(rr.Reports)) + assert.Equal(t, rtcp.ReceptionReport{ + SSRC: uint32(123456), + LastSequenceNumber: 0x03, + LastSenderReport: 1861287936, + FractionLost: 0, + TotalLost: 1, + Delay: rr.Reports[0].Delay, + Jitter: 0, + }, rr.Reports[0]) + }) + + t.Run("overflow and packet loss", func(t *testing.T) { + mt := test.MockTime{} + i, err := NewReceiverInterceptor( + ReceiverInterval(time.Millisecond*50), + ReceiverLog(logging.NewDefaultLoggerFactory().NewLogger("test")), + ReceiverNow(mt.Now), + ) + assert.NoError(t, err) + + stream := test.NewMockStream(&interceptor.StreamInfo{ + SSRC: 123456, + ClockRate: 90000, + }, i) + defer func() { + assert.NoError(t, stream.Close()) + }() + + stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{ + SequenceNumber: 0xffff, + }}) + + stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{ + SequenceNumber: 0x01, + }}) + + pkts := <-stream.WrittenRTCP() + assert.Equal(t, len(pkts), 1) + rr, ok := pkts[0].(*rtcp.ReceiverReport) + assert.True(t, ok) + assert.Equal(t, 1, len(rr.Reports)) + assert.Equal(t, rtcp.ReceptionReport{ + SSRC: uint32(123456), + LastSequenceNumber: 1<<16 | 0x01, + LastSenderReport: 0, + FractionLost: 256 * 1 / 3, + TotalLost: 1, + Delay: 0, + Jitter: 0, + }, rr.Reports[0]) + }) + + t.Run("reordered packets", func(t *testing.T) { + mt := test.MockTime{} + i, err := NewReceiverInterceptor( + ReceiverInterval(time.Millisecond*50), + ReceiverLog(logging.NewDefaultLoggerFactory().NewLogger("test")), + ReceiverNow(mt.Now), + ) + assert.NoError(t, err) + + stream := test.NewMockStream(&interceptor.StreamInfo{ + SSRC: 123456, + ClockRate: 90000, + }, i) + defer func() { + assert.NoError(t, stream.Close()) + }() + + for _, seqNum := range []uint16{0x01, 0x03, 0x02, 0x04} { + stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{ + SequenceNumber: seqNum, + }}) + } + + pkts := <-stream.WrittenRTCP() + assert.Equal(t, len(pkts), 1) + rr, ok := pkts[0].(*rtcp.ReceiverReport) + assert.True(t, ok) + assert.Equal(t, 1, len(rr.Reports)) + assert.Equal(t, rtcp.ReceptionReport{ + SSRC: uint32(123456), + LastSequenceNumber: 0x04, + LastSenderReport: 0, + FractionLost: 0, + TotalLost: 0, + Delay: 0, + Jitter: 0, + }, rr.Reports[0]) + }) + + t.Run("jitter", func(t *testing.T) { + mt := test.MockTime{} + i, err := NewReceiverInterceptor( + ReceiverInterval(time.Millisecond*50), + ReceiverLog(logging.NewDefaultLoggerFactory().NewLogger("test")), + ReceiverNow(mt.Now), + ) + assert.NoError(t, err) + + stream := test.NewMockStream(&interceptor.StreamInfo{ + SSRC: 123456, + ClockRate: 90000, + }, i) + defer func() { + assert.NoError(t, stream.Close()) + }() + + mt.SetNow(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)) + stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{ + SequenceNumber: 0x01, + Timestamp: 42378934, + }}) + <-stream.ReadRTP() + + mt.SetNow(time.Date(2009, time.November, 10, 23, 0, 1, 0, time.UTC)) + stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{ + SequenceNumber: 0x02, + Timestamp: 42378934 + 60000, + }}) + + pkts := <-stream.WrittenRTCP() + assert.Equal(t, len(pkts), 1) + rr, ok := pkts[0].(*rtcp.ReceiverReport) + assert.True(t, ok) + assert.Equal(t, 1, len(rr.Reports)) + assert.Equal(t, rtcp.ReceptionReport{ + SSRC: uint32(123456), + LastSequenceNumber: 0x02, + LastSenderReport: 0, + FractionLost: 0, + TotalLost: 0, + Delay: 0, + Jitter: 30000 / 16, + }, rr.Reports[0]) + }) + + t.Run("delay", func(t *testing.T) { + mt := test.MockTime{} + i, err := NewReceiverInterceptor( + ReceiverInterval(time.Millisecond*50), + ReceiverLog(logging.NewDefaultLoggerFactory().NewLogger("test")), + ReceiverNow(mt.Now), + ) + assert.NoError(t, err) + + stream := test.NewMockStream(&interceptor.StreamInfo{ + SSRC: 123456, + ClockRate: 90000, + }, i) + defer func() { + assert.NoError(t, stream.Close()) + }() + + mt.SetNow(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)) + stream.ReceiveRTCP([]rtcp.Packet{ + &rtcp.SenderReport{ + SSRC: 123456, + NTPTime: ntpTime(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)), + RTPTime: 987654321, + PacketCount: 0, + OctetCount: 0, + }, + }) + <-stream.ReadRTCP() + + mt.SetNow(time.Date(2009, time.November, 10, 23, 0, 1, 0, time.UTC)) + pkts := <-stream.WrittenRTCP() + assert.Equal(t, len(pkts), 1) + rr, ok := pkts[0].(*rtcp.ReceiverReport) + assert.True(t, ok) + assert.Equal(t, 1, len(rr.Reports)) + assert.Equal(t, rtcp.ReceptionReport{ + SSRC: uint32(123456), + LastSequenceNumber: 0, + LastSenderReport: 1861222400, + FractionLost: 0, + TotalLost: 0, + Delay: 65536, + Jitter: 0, + }, rr.Reports[0]) + }) +} diff --git a/pkg/report/receiver_option.go b/pkg/report/receiver_option.go new file mode 100644 index 00000000..0467dc5d --- /dev/null +++ b/pkg/report/receiver_option.go @@ -0,0 +1,34 @@ +package report + +import ( + "time" + + "github.com/pion/logging" +) + +// ReceiverOption can be used to configure ReceiverInterceptor. +type ReceiverOption func(r *ReceiverInterceptor) error + +// ReceiverLog sets a logger for the interceptor. +func ReceiverLog(log logging.LeveledLogger) ReceiverOption { + return func(r *ReceiverInterceptor) error { + r.log = log + return nil + } +} + +// ReceiverInterval sets send interval for the interceptor. +func ReceiverInterval(interval time.Duration) ReceiverOption { + return func(r *ReceiverInterceptor) error { + r.interval = interval + return nil + } +} + +// ReceiverNow sets an alternative for the time.Now function. +func ReceiverNow(f func() time.Time) ReceiverOption { + return func(r *ReceiverInterceptor) error { + r.now = f + return nil + } +} diff --git a/pkg/report/receiver_stream.go b/pkg/report/receiver_stream.go new file mode 100644 index 00000000..569715d0 --- /dev/null +++ b/pkg/report/receiver_stream.go @@ -0,0 +1,159 @@ +package report + +import ( + "math/rand" + "sync" + "time" + + "github.com/pion/rtcp" + "github.com/pion/rtp" +) + +type receiverStream struct { + ssrc uint32 + receiverSSRC uint32 + clockRate float64 + + m sync.Mutex + size uint16 + packets []uint64 + started bool + seqnumCycles uint16 + lastSeqnum uint16 + lastReportSeqnum uint16 + lastRTPTimeRTP uint32 + lastRTPTimeTime time.Time + jitter float64 + lastSenderReport uint32 + lastSenderReportTime time.Time + totalLost uint32 +} + +func newReceiverStream(ssrc uint32, clockRate uint32) *receiverStream { + receiverSSRC := rand.Uint32() // #nosec + return &receiverStream{ + ssrc: ssrc, + receiverSSRC: receiverSSRC, + clockRate: float64(clockRate), + size: 128, + packets: make([]uint64, 128), + } +} + +func (stream *receiverStream) processRTP(now time.Time, pkt *rtp.Packet) { + stream.m.Lock() + defer stream.m.Unlock() + + if !stream.started { // first frame + stream.started = true + stream.setReceived(pkt.SequenceNumber) + stream.lastSeqnum = pkt.SequenceNumber + stream.lastReportSeqnum = pkt.SequenceNumber - 1 + stream.lastRTPTimeRTP = pkt.Timestamp + stream.lastRTPTimeTime = now + } else { // following frames + stream.setReceived(pkt.SequenceNumber) + + diff := int32(pkt.SequenceNumber) - int32(stream.lastSeqnum) + if diff > 0 || diff < -0x0FFF { + // overflow + if diff < -0x0FFF { + stream.seqnumCycles++ + } + + // set missing packets as missing + for i := stream.lastSeqnum + 1; i != pkt.SequenceNumber; i++ { + stream.delReceived(i) + } + + stream.lastSeqnum = pkt.SequenceNumber + } + + // compute jitter + // https://tools.ietf.org/html/rfc3550#page-39 + D := now.Sub(stream.lastRTPTimeTime).Seconds()*stream.clockRate - + (float64(pkt.Timestamp) - float64(stream.lastRTPTimeRTP)) + if D < 0 { + D = -D + } + stream.jitter += (D - stream.jitter) / 16 + stream.lastRTPTimeRTP = pkt.Timestamp + stream.lastRTPTimeTime = now + } +} + +func (stream *receiverStream) setReceived(seq uint16) { + pos := seq % stream.size + stream.packets[pos/64] |= 1 << (pos % 64) +} + +func (stream *receiverStream) delReceived(seq uint16) { + pos := seq % stream.size + stream.packets[pos/64] &^= 1 << (pos % 64) +} + +func (stream *receiverStream) getReceived(seq uint16) bool { + pos := seq % stream.size + return (stream.packets[pos/64] & (1 << (pos % 64))) != 0 +} + +func (stream *receiverStream) processSenderReport(now time.Time, sr *rtcp.SenderReport) { + stream.m.Lock() + defer stream.m.Unlock() + + stream.lastSenderReport = uint32(sr.NTPTime >> 16) + stream.lastSenderReportTime = now +} + +func (stream *receiverStream) generateReport(now time.Time) *rtcp.ReceiverReport { + stream.m.Lock() + defer stream.m.Unlock() + + totalSinceReport := stream.lastSeqnum - stream.lastReportSeqnum + totalLostSinceReport := func() uint32 { + if stream.lastSeqnum == stream.lastReportSeqnum { + return 0 + } + + ret := uint32(0) + for i := stream.lastReportSeqnum + 1; i != stream.lastSeqnum; i++ { + if !stream.getReceived(i) { + ret++ + } + } + return ret + }() + stream.totalLost += totalLostSinceReport + + // allow up to 24 bits + if totalLostSinceReport > 0xFFFFFF { + totalLostSinceReport = 0xFFFFFF + } + if stream.totalLost > 0xFFFFFF { + stream.totalLost = 0xFFFFFF + } + + r := &rtcp.ReceiverReport{ + SSRC: stream.receiverSSRC, + Reports: []rtcp.ReceptionReport{ + { + SSRC: stream.ssrc, + LastSequenceNumber: uint32(stream.seqnumCycles)<<16 | uint32(stream.lastSeqnum), + LastSenderReport: stream.lastSenderReport, + FractionLost: uint8(float64(totalLostSinceReport*256) / float64(totalSinceReport)), + TotalLost: stream.totalLost, + Delay: func() uint32 { + if stream.lastSenderReportTime.IsZero() { + return 0 + } + return uint32(now.Sub(stream.lastSenderReportTime).Seconds() * 65536) + }(), + Jitter: uint32(stream.jitter), + }, + }, + } + + stream.lastReportSeqnum = stream.lastSeqnum + + return r +} diff --git a/pkg/report/report.go b/pkg/report/report.go new file mode 100644 index 00000000..0a3034ce --- /dev/null +++ b/pkg/report/report.go @@ -0,0 +1,2 @@ +// Package report provides interceptors to implement sending sender and receiver reports. +package report diff --git a/pkg/report/sender_interceptor.go b/pkg/report/sender_interceptor.go new file mode 100644 index 00000000..e56fd8c3 --- /dev/null +++ b/pkg/report/sender_interceptor.go @@ -0,0 +1,139 @@ +package report + +import ( + "sync" + "time" + + "github.com/pion/interceptor" + "github.com/pion/logging" + "github.com/pion/rtcp" + "github.com/pion/rtp" +) + +func ntpTime(t time.Time) uint64 { + // seconds since 1st January 1900 + s := (float64(t.UnixNano()) / 1000000000) + 2208988800 + + // higher 32 bits are the integer part, lower 32 bits are the fractional part + integerPart := uint32(s) + fractionalPart := uint32((s - float64(integerPart)) * 0xFFFFFFFF) + return uint64(integerPart)<<32 | uint64(fractionalPart) +} + +// SenderInterceptor interceptor generates sender reports. +type SenderInterceptor struct { + interceptor.NoOp + interval time.Duration + now func() time.Time + streams sync.Map + log logging.LeveledLogger + m sync.Mutex + wg sync.WaitGroup + close chan struct{} +} + +// NewSenderInterceptor returns a new SenderInterceptor interceptor. +func NewSenderInterceptor(opts ...SenderOption) (*SenderInterceptor, error) { + s := &SenderInterceptor{ + interval: 1 * time.Second, + now: time.Now, + log: logging.NewDefaultLoggerFactory().NewLogger("sender_interceptor"), + close: make(chan struct{}), + } + + for _, opt := range opts { + if err := opt(s); err != nil { + return nil, err + } + } + + return s, nil +} + +func (s *SenderInterceptor) isClosed() bool { + select { + case <-s.close: + return true + default: + return false + } +} + +// Close closes the interceptor. +func (s *SenderInterceptor) Close() error { + defer s.wg.Wait() + s.m.Lock() + defer s.m.Unlock() + + if !s.isClosed() { + close(s.close) + } + + return nil +} + +// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method +// will be called once per packet batch. +func (s *SenderInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter { + s.m.Lock() + defer s.m.Unlock() + + if s.isClosed() { + return writer + } + + s.wg.Add(1) + + go s.loop(writer) + + return writer +} + +func (s *SenderInterceptor) loop(rtcpWriter interceptor.RTCPWriter) { + defer s.wg.Done() + + ticker := time.NewTicker(s.interval) + for { + select { + case <-ticker.C: + now := s.now() + s.streams.Range(func(key, value interface{}) bool { + ssrc := key.(uint32) + stream := value.(*senderStream) + + stream.m.Lock() + defer stream.m.Unlock() + + sr := &rtcp.SenderReport{ + SSRC: ssrc, + NTPTime: ntpTime(now), + RTPTime: stream.lastRTPTimeRTP + uint32(now.Sub(stream.lastRTPTimeTime).Seconds()*stream.clockRate), + PacketCount: stream.packetCount, + OctetCount: stream.octetCount, + } + + if _, err := rtcpWriter.Write([]rtcp.Packet{sr}, interceptor.Attributes{}); err != nil { + s.log.Warnf("failed sending: %+v", err) + } + + return true + }) + + case <-s.close: + return + } + } +} + +// BindLocalStream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method +// will be called once per rtp packet. +func (s *SenderInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter { + stream := newSenderStream(info.ClockRate) + s.streams.Store(info.SSRC, stream) + + return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, a interceptor.Attributes) (int, error) { + stream.processRTP(s.now(), header, payload) + + return writer.Write(header, payload, a) + }) +} diff --git a/pkg/report/sender_interceptor_test.go b/pkg/report/sender_interceptor_test.go new file mode 100644 index 00000000..5a534919 --- /dev/null +++ b/pkg/report/sender_interceptor_test.go @@ -0,0 +1,84 @@ +package report + +import ( + "testing" + "time" + + "github.com/pion/interceptor" + "github.com/pion/interceptor/internal/test" + "github.com/pion/logging" + "github.com/pion/rtcp" + "github.com/pion/rtp" + "github.com/stretchr/testify/assert" +) + +func TestSenderInterceptor(t *testing.T) { + t.Run("before any packet", func(t *testing.T) { + mt := &test.MockTime{} + i, err := NewSenderInterceptor( + SenderInterval(time.Millisecond*50), + SenderLog(logging.NewDefaultLoggerFactory().NewLogger("test")), + SenderNow(mt.Now), + ) + assert.NoError(t, err) + + stream := test.NewMockStream(&interceptor.StreamInfo{ + SSRC: 123456, + ClockRate: 90000, + }, i) + defer func() { + assert.NoError(t, stream.Close()) + }() + + mt.SetNow(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)) + pkts := <-stream.WrittenRTCP() + assert.Equal(t, len(pkts), 1) + sr, ok := pkts[0].(*rtcp.SenderReport) + assert.True(t, ok) + assert.Equal(t, &rtcp.SenderReport{ + SSRC: 123456, + NTPTime: ntpTime(mt.Now()), + RTPTime: 2269117121, + PacketCount: 0, + OctetCount: 0, + }, sr) + }) + + t.Run("after RTP packets", func(t *testing.T) { + mt := &test.MockTime{} + i, err := NewSenderInterceptor( + SenderInterval(time.Millisecond*50), + SenderLog(logging.NewDefaultLoggerFactory().NewLogger("test")), + SenderNow(mt.Now), + ) + assert.NoError(t, err) + + stream := test.NewMockStream(&interceptor.StreamInfo{ + SSRC: 123456, + ClockRate: 90000, + }, i) + defer func() { + assert.NoError(t, stream.Close()) + }() + + for i := 0; i < 10; i++ { + assert.NoError(t, stream.WriteRTP(&rtp.Packet{ + Header: rtp.Header{SequenceNumber: uint16(i)}, + Payload: []byte("\x00\x00"), + })) + } + + mt.SetNow(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)) + pkts := <-stream.WrittenRTCP() + assert.Equal(t, len(pkts), 1) + sr, ok := pkts[0].(*rtcp.SenderReport) + assert.True(t, ok) + assert.Equal(t, &rtcp.SenderReport{ + SSRC: 123456, + NTPTime: ntpTime(mt.Now()), + RTPTime: 2269117121, + PacketCount: 10, + OctetCount: 20, + }, sr) + }) +} diff --git a/pkg/report/sender_option.go b/pkg/report/sender_option.go new file mode 100644 index 00000000..4cb161a3 --- /dev/null +++ b/pkg/report/sender_option.go @@ -0,0 +1,34 @@ +package report + +import ( + "time" + + "github.com/pion/logging" +) + +// SenderOption can be used to configure SenderInterceptor. +type SenderOption func(r *SenderInterceptor) error + +// SenderLog sets a logger for the interceptor. +func SenderLog(log logging.LeveledLogger) SenderOption { + return func(r *SenderInterceptor) error { + r.log = log + return nil + } +} + +// SenderInterval sets send interval for the interceptor. +func SenderInterval(interval time.Duration) SenderOption { + return func(r *SenderInterceptor) error { + r.interval = interval + return nil + } +} + +// SenderNow sets an alternative for the time.Now function. +func SenderNow(f func() time.Time) SenderOption { + return func(r *SenderInterceptor) error { + r.now = f + return nil + } +} diff --git a/pkg/report/sender_stream.go b/pkg/report/sender_stream.go new file mode 100644 index 00000000..851d70e5 --- /dev/null +++ b/pkg/report/sender_stream.go @@ -0,0 +1,37 @@ +package report + +import ( + "sync" + "time" + + "github.com/pion/rtp" +) + +type senderStream struct { + clockRate float64 + m sync.Mutex + + // data from rtp packets + lastRTPTimeRTP uint32 + lastRTPTimeTime time.Time + packetCount uint32 + octetCount uint32 +} + +func newSenderStream(clockRate uint32) *senderStream { + return &senderStream{ + clockRate: float64(clockRate), + } +} + +func (stream *senderStream) processRTP(now time.Time, header *rtp.Header, payload []byte) { + stream.m.Lock() + defer stream.m.Unlock() + + // always update time to minimize errors + stream.lastRTPTimeRTP = header.Timestamp + stream.lastRTPTimeTime = now + + stream.packetCount++ + stream.octetCount += uint32(len(payload)) +}