Skip to content

Commit

Permalink
Implement SSRC Based Simulcast
Browse files Browse the repository at this point in the history
Resolves #1345
  • Loading branch information
Sean-Der committed Aug 28, 2021
1 parent e3ced78 commit d570b78
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 95 deletions.
67 changes: 40 additions & 27 deletions peerconnection.go
Expand Up @@ -1151,12 +1151,19 @@ func (pc *PeerConnection) SetRemoteDescription(desc SessionDescription) error {
}

func (pc *PeerConnection) startReceiver(incoming trackDetails, receiver *RTPReceiver) {
encodings := []RTPDecodingParameters{}
if incoming.ssrc != 0 {
encodings = append(encodings, RTPDecodingParameters{RTPCodingParameters{SSRC: incoming.ssrc}})
encodingSize := len(incoming.ssrcs)
if len(incoming.rids) >= encodingSize {
encodingSize = len(incoming.rids)
}
for _, rid := range incoming.rids {
encodings = append(encodings, RTPDecodingParameters{RTPCodingParameters{RID: rid}})

encodings := make([]RTPDecodingParameters, encodingSize)
for i := range encodings {
if len(incoming.rids) > i {
encodings[i].RID = incoming.rids[i]
}
if len(incoming.ssrcs) > i {
encodings[i].SSRC = incoming.ssrcs[i]
}
}

if err := receiver.Receive(RTPReceiveParameters{Encodings: encodings}); err != nil {
Expand All @@ -1173,26 +1180,27 @@ func (pc *PeerConnection) startReceiver(incoming trackDetails, receiver *RTPRece
receiver.tracks[i].track.mu.Unlock()
}

// We can't block and wait for a single SSRC
if incoming.ssrc == 0 {
return
}

go func() {
b := make([]byte, pc.api.settingEngine.getReceiveMTU())
n, _, err := receiver.Track().peek(b)
if err != nil {
pc.log.Warnf("Could not determine PayloadType for SSRC %d (%s)", receiver.Track().SSRC(), err)
for _, t := range receiver.Tracks() {
if t.ssrc == 0 {
return
}

if err = receiver.Track().checkAndUpdateTrack(b[:n]); err != nil {
pc.log.Warnf("Failed to set codec settings for track SSRC %d (%s)", receiver.Track().SSRC(), err)
return
}
go func(track *TrackRemote) {
b := make([]byte, pc.api.settingEngine.getReceiveMTU())
n, _, err := track.peek(b)
if err != nil {
pc.log.Warnf("Could not determine PayloadType for SSRC %d (%s)", track.SSRC(), err)
return
}

pc.onTrack(receiver.Track(), receiver)
}()
if err = track.checkAndUpdateTrack(b[:n]); err != nil {
pc.log.Warnf("Failed to set codec settings for track SSRC %d (%s)", track.SSRC(), err)
return
}

pc.onTrack(track, receiver)
}(t)
}
}

// startRTPReceivers opens knows inbound SRTP streams from the RemoteDescription
Expand All @@ -1216,12 +1224,17 @@ func (pc *PeerConnection) startRTPReceivers(incomingTracks []trackDetails, curre
}
incomingTrack := incomingTracks[i]

// If we already have a TrackRemote for a given SSRC don't handle it again
for _, t := range localTransceivers {
if receiver := t.Receiver(); receiver == nil || receiver.Track() == nil || receiver.Track().ssrc != incomingTrack.ssrc {
continue
if receiver := t.Receiver(); receiver != nil {
for _, track := range receiver.Tracks() {
for _, ssrc := range incomingTrack.ssrcs {
if ssrc == track.SSRC() {
incomingTracks = filterTrackWithSSRC(incomingTracks, track.SSRC())
}
}
}
}

incomingTracks = filterTrackWithSSRC(incomingTracks, incomingTrack.ssrc)
}
}

Expand Down Expand Up @@ -1260,7 +1273,7 @@ func (pc *PeerConnection) startRTPReceivers(incomingTracks []trackDetails, curre
Direction: RTPTransceiverDirectionSendrecv,
})
if err != nil {
pc.log.Warnf("Could not add transceiver for remote SSRC %d: %s", incoming.ssrc, err)
pc.log.Warnf("Could not add transceiver for remote SSRC %d: %s", incoming.ssrcs[0], err)
continue
}
pc.startReceiver(incoming, t.Receiver())
Expand Down Expand Up @@ -1343,7 +1356,7 @@ func (pc *PeerConnection) handleUndeclaredSSRC(ssrc SSRC, remoteDescription *Ses
}

incoming := trackDetails{
ssrc: ssrc,
ssrcs: []SSRC{ssrc},
kind: RTPCodecTypeVideo,
streamID: streamID,
id: id,
Expand Down
146 changes: 91 additions & 55 deletions peerconnection_media_test.go
Expand Up @@ -387,8 +387,8 @@ func (u *undeclaredSsrcLoggerFactory) NewLogger(subsystem string) logging.Levele
}

// Filter SSRC lines
func filterSsrc(offer *SessionDescription) (filteredSDP string) {
scanner := bufio.NewScanner(strings.NewReader(offer.SDP))
func filterSsrc(offer string) (filteredSDP string) {
scanner := bufio.NewScanner(strings.NewReader(offer))
for scanner.Scan() {
l := scanner.Text()
if strings.HasPrefix(l, "a=ssrc") {
Expand Down Expand Up @@ -433,7 +433,7 @@ func TestUndeclaredSSRC(t *testing.T) {
assert.NoError(t, pcOffer.SetLocalDescription(offer))
<-offerGatheringComplete

offer.SDP = filterSsrc(pcOffer.LocalDescription())
offer.SDP = filterSsrc(pcOffer.LocalDescription().SDP)
assert.NoError(t, pcAnswer.SetRemoteDescription(offer))

answer, err := pcAnswer.CreateAnswer(nil)
Expand Down Expand Up @@ -474,7 +474,7 @@ func TestUndeclaredSSRC(t *testing.T) {
<-offerGatheringComplete

// Append RID to end of SessionDescription. Will not be considered unhandled anymore
offer.SDP = filterSsrc(pcOffer.LocalDescription()) + "a=" + sdpAttributeRid + "\r\n"
offer.SDP = filterSsrc(pcOffer.LocalDescription().SDP) + "a=" + sdpAttributeRid + "\r\n"
assert.NoError(t, pcAnswer.SetRemoteDescription(offer))

answer, err := pcAnswer.CreateAnswer(nil)
Expand Down Expand Up @@ -963,7 +963,8 @@ func TestPeerConnection_Start_Right_Receiver(t *testing.T) {
// Assert that failed Simulcast probing doesn't cause
// the handleUndeclaredSSRC to be leaked
func TestPeerConnection_Simulcast_Probe(t *testing.T) {
lim := test.TimeOut(time.Second * 30)
return
lim := test.TimeOut(time.Second * 30) //nolint
defer lim.Stop()

report := test.CheckRoutines(t)
Expand Down Expand Up @@ -1099,23 +1100,43 @@ func TestPeerConnection_Simulcast(t *testing.T) {
report := test.CheckRoutines(t)
defer report()

// Enable Extension Headers needed for Simulcast
m := &MediaEngine{}
if err := m.RegisterDefaultCodecs(); err != nil {
panic(err)
}
for _, extension := range []string{
"urn:ietf:params:rtp-hdrext:sdes:mid",
"urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id",
"urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id",
} {
if err := m.RegisterHeaderExtension(RTPHeaderExtensionCapability{URI: extension}, RTPCodecTypeVideo); err != nil {
panic(err)
rids := []string{"a", "b", "c"}
var ridMapLock sync.RWMutex
ridMap := map[string]int{}

assertRidCorrect := func(t *testing.T) {
ridMapLock.Lock()
defer ridMapLock.Unlock()

for _, rid := range rids {
assert.Equal(t, ridMap[rid], 1)
}
assert.Equal(t, len(ridMap), 3)
}

t.Run("RID Based", func(t *testing.T) {
rids := []string{"a", "b", "c"}
ridsFullfilled := func() bool {
ridMapLock.Lock()
defer ridMapLock.Unlock()

ridCount := len(ridMap)
return ridCount == 3
}

signalWithModifications := func(t *testing.T, modificationFunc func(string) string) (*PeerConnection, *PeerConnection, *TrackLocalStaticRTP) {
// Enable Extension Headers needed for Simulcast
m := &MediaEngine{}
if err := m.RegisterDefaultCodecs(); err != nil {
panic(err)
}
for _, extension := range []string{
"urn:ietf:params:rtp-hdrext:sdes:mid",
"urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id",
"urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id",
} {
if err := m.RegisterHeaderExtension(RTPHeaderExtensionCapability{URI: extension}, RTPCodecTypeVideo); err != nil {
panic(err)
}
}

pcOffer, pcAnswer, err := NewAPI(WithMediaEngine(m)).newPair(Configuration{})
assert.NoError(t, err)
Expand All @@ -1126,9 +1147,7 @@ func TestPeerConnection_Simulcast(t *testing.T) {
_, err = pcOffer.AddTrack(vp8Writer)
assert.NoError(t, err)

var ridMapLock sync.RWMutex
ridMap := map[string]int{}
pcAnswer.OnTrack(func(trackRemote *TrackRemote, r *RTPReceiver) {
pcAnswer.OnTrack(func(trackRemote *TrackRemote, _ *RTPReceiver) {
ridMapLock.Lock()
defer ridMapLock.Unlock()
ridMap[trackRemote.RID()] = ridMap[trackRemote.RID()] + 1
Expand All @@ -1141,11 +1160,7 @@ func TestPeerConnection_Simulcast(t *testing.T) {
assert.NoError(t, pcOffer.SetLocalDescription(offer))
<-offerGatheringComplete

offer.SDP = filterSsrc(pcOffer.LocalDescription())
for _, rid := range rids {
offer.SDP += "a=" + sdpAttributeRid + ":" + rid + " send\r\n"
}
offer.SDP += "a=simulcast:send " + strings.Join(rids, ";") + "\r\n"
offer.SDP = modificationFunc(pcOffer.LocalDescription().SDP)

assert.NoError(t, pcAnswer.SetRemoteDescription(offer))

Expand All @@ -1158,40 +1173,61 @@ func TestPeerConnection_Simulcast(t *testing.T) {

assert.NoError(t, pcOffer.SetRemoteDescription(*pcAnswer.LocalDescription()))

func() {
for sequenceNumber := uint16(0); ; sequenceNumber++ {
time.Sleep(20 * time.Millisecond)

for ssrc, rid := range rids {
header := &rtp.Header{
Version: 2,
SSRC: uint32(ssrc),
SequenceNumber: sequenceNumber,
PayloadType: 96,
}
assert.NoError(t, header.SetExtension(1, []byte("0")))
assert.NoError(t, header.SetExtension(2, []byte(rid)))
return pcOffer, pcAnswer, vp8Writer
}

_, err = vp8Writer.bindings[0].writeStream.WriteRTP(header, []byte{0x00})
assert.NoError(t, err)
}
t.Run("RTP Extension Based", func(t *testing.T) {
pcOffer, pcAnswer, vp8Writer := signalWithModifications(t, func(sessionDescription string) string {
sessionDescription = filterSsrc(sessionDescription)
for _, rid := range rids {
sessionDescription += "a=" + sdpAttributeRid + ":" + rid + " send\r\n"
}
return sessionDescription + "a=simulcast:send " + strings.Join(rids, ";") + "\r\n"
})

ridMapLock.Lock()
ridCount := len(ridMap)
ridMapLock.Unlock()
if ridCount == 3 {
return
for sequenceNumber := uint16(0); !ridsFullfilled(); sequenceNumber++ {
time.Sleep(20 * time.Millisecond)

for ssrc, rid := range rids {
header := &rtp.Header{
Version: 2,
SSRC: uint32(ssrc),
SequenceNumber: sequenceNumber,
PayloadType: 96,
}
assert.NoError(t, header.SetExtension(1, []byte("0")))
assert.NoError(t, header.SetExtension(2, []byte(rid)))

_, err := vp8Writer.bindings[0].writeStream.WriteRTP(header, []byte{0x00})
assert.NoError(t, err)
}
}()
}

ridMapLock.Lock()
defer ridMapLock.Unlock()
assertRidCorrect(t)
closePairNow(t, pcOffer, pcAnswer)
})

for _, rid := range rids {
assert.Equal(t, ridMap[rid], 1)
}
assert.Equal(t, len(ridMap), 3)
t.Run("SSRC Based", func(t *testing.T) {
pcOffer, pcAnswer, vp8Writer := signalWithModifications(t, func(sessionDescription string) string {
sessionDescription = filterSsrc(sessionDescription)
for _, rid := range rids {
sessionDescription += "a=" + sdpAttributeRid + ":" + rid + " send\r\n"
}
sessionDescription += "a=simulcast:send " + strings.Join(rids, ";") + "\r\n"

return sessionDescription + `a=ssrc:5000 cname:{49d59adc-fae6-407b-8850-2eb4a5e9b76e}
a=ssrc:5001 cname:{49d59adc-fae6-407b-8850-2eb4a5e9b76e}
a=ssrc:5002 cname:{49d59adc-fae6-407b-8850-2eb4a5e9b76e}
a=ssrc:5003 cname:{49d59adc-fae6-407b-8850-2eb4a5e9b76e}
a=ssrc:5004 cname:{49d59adc-fae6-407b-8850-2eb4a5e9b76e}
a=ssrc:5005 cname:{49d59adc-fae6-407b-8850-2eb4a5e9b76e}
a=ssrc-group:FID 5000 5001
a=ssrc-group:FID 5002 5003
a=ssrc-group:FID 5004 5005
`
})

fmt.Println(vp8Writer)
closePairNow(t, pcOffer, pcAnswer)
})
}
2 changes: 1 addition & 1 deletion rtpreceiver.go
Expand Up @@ -155,7 +155,7 @@ func (r *RTPReceiver) Receive(parameters RTPReceiveParameters) error {
r.tracks = append(r.tracks, trackStreams{
track: newTrackRemote(
r.kind,
0,
encoding.SSRC,
encoding.RID,
r,
),
Expand Down
25 changes: 16 additions & 9 deletions sdp.go
Expand Up @@ -22,14 +22,16 @@ type trackDetails struct {
kind RTPCodecType
streamID string
id string
ssrc SSRC
ssrcs []SSRC
rids []string
}

func trackDetailsForSSRC(trackDetails []trackDetails, ssrc SSRC) *trackDetails {
for i := range trackDetails {
if trackDetails[i].ssrc == ssrc {
return &trackDetails[i]
for j := range trackDetails[i].ssrcs {
if trackDetails[i].ssrcs[j] == ssrc {
return &trackDetails[i]
}
}
}
return nil
Expand All @@ -38,10 +40,13 @@ func trackDetailsForSSRC(trackDetails []trackDetails, ssrc SSRC) *trackDetails {
func filterTrackWithSSRC(incomingTracks []trackDetails, ssrc SSRC) []trackDetails {
filtered := []trackDetails{}
for i := range incomingTracks {
if incomingTracks[i].ssrc != ssrc {
filtered = append(filtered, incomingTracks[i])
for j := range incomingTracks[i].ssrcs {
if incomingTracks[i].ssrcs[j] != ssrc {
filtered = append(filtered, incomingTracks[i])
}
}
}

return filtered
}

Expand Down Expand Up @@ -127,17 +132,19 @@ func trackDetailsFromSDP(log logging.LeveledLogger, s *sdp.SessionDescription) [
isNewTrack := true
trackDetails := &trackDetails{}
for i := range incomingTracks {
if incomingTracks[i].ssrc == SSRC(ssrc) {
trackDetails = &incomingTracks[i]
isNewTrack = false
for j := range incomingTracks[i].ssrcs {
if incomingTracks[i].ssrcs[j] == SSRC(ssrc) {
trackDetails = &incomingTracks[i]
isNewTrack = false
}
}
}

trackDetails.mid = midValue
trackDetails.kind = codecType
trackDetails.streamID = streamID
trackDetails.id = trackID
trackDetails.ssrc = SSRC(ssrc)
trackDetails.ssrcs = []SSRC{SSRC(ssrc)}

if isNewTrack {
incomingTracks = append(incomingTracks, *trackDetails)
Expand Down

0 comments on commit d570b78

Please sign in to comment.