From e3ced781d0e5631ff8c54c38d4ff4d79aee7fb39 Mon Sep 17 00:00:00 2001 From: Sean DuBois Date: Thu, 26 Aug 2021 16:47:11 -0400 Subject: [PATCH] Add E2E RID Simulcast test Relates to #1345 --- peerconnection_media_test.go | 132 +++++++++++++++++++++++++++++++---- 1 file changed, 118 insertions(+), 14 deletions(-) diff --git a/peerconnection_media_test.go b/peerconnection_media_test.go index 3e4004442e..030c26e037 100644 --- a/peerconnection_media_test.go +++ b/peerconnection_media_test.go @@ -386,6 +386,20 @@ func (u *undeclaredSsrcLoggerFactory) NewLogger(subsystem string) logging.Levele return &undeclaredSsrcLogger{u.unhandledSimulcastError} } +// Filter SSRC lines +func filterSsrc(offer *SessionDescription) (filteredSDP string) { + scanner := bufio.NewScanner(strings.NewReader(offer.SDP)) + for scanner.Scan() { + l := scanner.Text() + if strings.HasPrefix(l, "a=ssrc") { + continue + } + + filteredSDP += l + "\n" + } + return +} + // If a SessionDescription has a single media section and no SSRC // assume that it is meant to handle all RTP packets func TestUndeclaredSSRC(t *testing.T) { @@ -395,20 +409,6 @@ func TestUndeclaredSSRC(t *testing.T) { report := test.CheckRoutines(t) defer report() - // Filter SSRC lines - filterSsrc := func(offer *SessionDescription) (filteredSDP string) { - scanner := bufio.NewScanner(strings.NewReader(offer.SDP)) - for scanner.Scan() { - l := scanner.Text() - if strings.HasPrefix(l, "a=ssrc") { - continue - } - - filteredSDP += l + "\n" - } - return - } - t.Run("No SSRC", func(t *testing.T) { pcOffer, pcAnswer, err := newPair() assert.NoError(t, err) @@ -1091,3 +1091,107 @@ func TestPeerConnection_RaceReplaceTrack(t *testing.T) { assert.NoError(t, pc.Close()) } + +func TestPeerConnection_Simulcast(t *testing.T) { + lim := test.TimeOut(time.Second * 30) + defer lim.Stop() + + report := test.CheckRoutines(t) + defer report() + + // Enable Extension Headers needed for Simulcast + m := &MediaEngine{} + if err := m.RegisterDefaultCodecs(); err != nil { + panic(err) + } + for _, extension := range []string{ + "urn:ietf:params:rtp-hdrext:sdes:mid", + "urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id", + "urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id", + } { + if err := m.RegisterHeaderExtension(RTPHeaderExtensionCapability{URI: extension}, RTPCodecTypeVideo); err != nil { + panic(err) + } + } + + t.Run("RID Based", func(t *testing.T) { + rids := []string{"a", "b", "c"} + + pcOffer, pcAnswer, err := NewAPI(WithMediaEngine(m)).newPair(Configuration{}) + assert.NoError(t, err) + + vp8Writer, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2") + assert.NoError(t, err) + + _, err = pcOffer.AddTrack(vp8Writer) + assert.NoError(t, err) + + var ridMapLock sync.RWMutex + ridMap := map[string]int{} + pcAnswer.OnTrack(func(trackRemote *TrackRemote, r *RTPReceiver) { + ridMapLock.Lock() + defer ridMapLock.Unlock() + ridMap[trackRemote.RID()] = ridMap[trackRemote.RID()] + 1 + }) + + offer, err := pcOffer.CreateOffer(nil) + assert.NoError(t, err) + + offerGatheringComplete := GatheringCompletePromise(pcOffer) + assert.NoError(t, pcOffer.SetLocalDescription(offer)) + <-offerGatheringComplete + + offer.SDP = filterSsrc(pcOffer.LocalDescription()) + for _, rid := range rids { + offer.SDP += "a=" + sdpAttributeRid + ":" + rid + " send\r\n" + } + offer.SDP += "a=simulcast:send " + strings.Join(rids, ";") + "\r\n" + + assert.NoError(t, pcAnswer.SetRemoteDescription(offer)) + + answer, err := pcAnswer.CreateAnswer(nil) + assert.NoError(t, err) + + answerGatheringComplete := GatheringCompletePromise(pcAnswer) + assert.NoError(t, pcAnswer.SetLocalDescription(answer)) + <-answerGatheringComplete + + assert.NoError(t, pcOffer.SetRemoteDescription(*pcAnswer.LocalDescription())) + + func() { + for sequenceNumber := uint16(0); ; sequenceNumber++ { + time.Sleep(20 * time.Millisecond) + + for ssrc, rid := range rids { + header := &rtp.Header{ + Version: 2, + SSRC: uint32(ssrc), + SequenceNumber: sequenceNumber, + PayloadType: 96, + } + assert.NoError(t, header.SetExtension(1, []byte("0"))) + assert.NoError(t, header.SetExtension(2, []byte(rid))) + + _, err = vp8Writer.bindings[0].writeStream.WriteRTP(header, []byte{0x00}) + assert.NoError(t, err) + } + + ridMapLock.Lock() + ridCount := len(ridMap) + ridMapLock.Unlock() + if ridCount == 3 { + return + } + } + }() + + ridMapLock.Lock() + defer ridMapLock.Unlock() + + for _, rid := range rids { + assert.Equal(t, ridMap[rid], 1) + } + assert.Equal(t, len(ridMap), 3) + closePairNow(t, pcOffer, pcAnswer) + }) +}