Skip to content

Commit

Permalink
[WIP] Working on code to negotiate FlexFEC, it worked on a previous a…
Browse files Browse the repository at this point in the history
…pplication, I need to debug it
  • Loading branch information
ypothoma committed May 6, 2024
1 parent 480be18 commit 7ebbf6a
Show file tree
Hide file tree
Showing 10 changed files with 161 additions and 19 deletions.
4 changes: 4 additions & 0 deletions go.mod
Expand Up @@ -34,3 +34,7 @@ require (
golang.org/x/sys v0.19.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/pion/interceptor => ../interceptor

replace github.com/pion/ice/v2 => ../ice
2 changes: 0 additions & 2 deletions go.sum
Expand Up @@ -45,8 +45,6 @@ github.com/pion/dtls/v2 v2.2.10 h1:u2Axk+FyIR1VFTPurktB+1zoEPGIW3bmyj3LEFrXjAA=
github.com/pion/dtls/v2 v2.2.10/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE=
github.com/pion/ice/v3 v3.0.7 h1:dfMViRKblENqzorR2cQiiRKWqQfqKZ9+nT/sREX3ra8=
github.com/pion/ice/v3 v3.0.7/go.mod h1:pBRcCoJRC0vwvFsemfRIqRLYukV4bPboGb0B4b8AhrQ=
github.com/pion/interceptor v0.1.29 h1:39fsnlP1U8gw2JzOFWdfCU82vHvhW9o0rZnZF56wF+M=
github.com/pion/interceptor v0.1.29/go.mod h1:ri+LGNjRUc5xUNtDEPzfdkmSqISixVTBF/z/Zms/6T4=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
github.com/pion/mdns/v2 v2.0.7 h1:c9kM8ewCgjslaAmicYMFQIde2H9/lrZpjBkN8VwoVtM=
Expand Down
16 changes: 16 additions & 0 deletions interceptor.go
Expand Up @@ -10,6 +10,7 @@ import (
"sync/atomic"

"github.com/pion/interceptor"
"github.com/pion/interceptor/pkg/flexfec"
"github.com/pion/interceptor/pkg/nack"
"github.com/pion/interceptor/pkg/report"
"github.com/pion/interceptor/pkg/rfc8888"
Expand All @@ -29,6 +30,10 @@ func RegisterDefaultInterceptors(mediaEngine *MediaEngine, interceptorRegistry *
if err := ConfigureRTCPReports(interceptorRegistry); err != nil {
return err
}
specialLog("[INTERCEPTOR] CONFIGURING THE FEC SENDER")
if err := ConfigureFECSender(mediaEngine, interceptorRegistry); err != nil {
return err
}

if err := ConfigureSimulcastExtensionHeaders(mediaEngine); err != nil {
return err
Expand Down Expand Up @@ -140,6 +145,17 @@ func ConfigureSimulcastExtensionHeaders(mediaEngine *MediaEngine) error {
return mediaEngine.RegisterHeaderExtension(RTPHeaderExtensionCapability{URI: sdesRepairRTPStreamIDURI}, RTPCodecTypeVideo)
}

func ConfigureFECSender(mediaEngine *MediaEngine, interceptorRegistry *interceptor.Registry) error {
fecInterceptor, err := flexfec.NewFecInterceptor()

if err != nil {
return err
}

interceptorRegistry.Add(fecInterceptor)
return nil
}

type interceptorToTrackLocalWriter struct{ interceptor atomic.Value } // interceptor.RTPWriter }

func (i *interceptorToTrackLocalWriter) WriteRTP(header *rtp.Header, payload []byte) (int, error) {
Expand Down
6 changes: 6 additions & 0 deletions mediaengine.go
Expand Up @@ -199,6 +199,12 @@ func (m *MediaEngine) RegisterDefaultCodecs() error {
RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=112", nil},
PayloadType: 113,
},
/*
{
RTPCodecCapability: RTPCodecCapability{"video/flexfec-03", 90000, 0, "repair-window=200ms", nil},
PayloadType: 122,
},
*/
} {
if err := m.RegisterCodec(codec, RTPCodecTypeVideo); err != nil {
return err
Expand Down
53 changes: 50 additions & 3 deletions peerconnection.go
Expand Up @@ -809,6 +809,7 @@ func (pc *PeerConnection) createICETransport() *ICETransport {

// CreateAnswer starts the PeerConnection and generates the localDescription
func (pc *PeerConnection) CreateAnswer(*AnswerOptions) (SessionDescription, error) {
specialLog("[PEERCONNECTION] createAnswer()")
useIdentity := pc.idpLoginURL != nil
remoteDesc := pc.RemoteDescription()
switch {
Expand Down Expand Up @@ -852,6 +853,8 @@ func (pc *PeerConnection) CreateAnswer(*AnswerOptions) (SessionDescription, erro
SDP: string(sdpBytes),
parsed: d,
}
specialLog("[PEERCONNECTION] SDP answer:")
specialLog(desc.SDP)
pc.lastAnswer = desc.SDP
return desc, nil
}
Expand Down Expand Up @@ -1035,6 +1038,7 @@ func (pc *PeerConnection) LocalDescription() *SessionDescription {

// SetRemoteDescription sets the SessionDescription of the remote peer
func (pc *PeerConnection) SetRemoteDescription(desc SessionDescription) error { //nolint:gocognit,gocyclo
specialLog("[PEERCONNECTION] SetRemoteDescription()")
if pc.isClosed.get() {
return &rtcerr.InvalidStateError{Err: ErrConnectionClosed}
}
Expand Down Expand Up @@ -1218,6 +1222,9 @@ func (pc *PeerConnection) configureReceiver(incoming trackDetails, receiver *RTP
// set track id and label early so they can be set as new track information
// is received from the SDP.
for i := range receiver.tracks {
specialLog("----------------------")
specialLog("[PEERCONNECTION] configureReceiver: track.id ", incoming.id)
specialLog("[PEERCONNECTION] configureReceiver: track.streamID ", incoming.streamID)
receiver.tracks[i].track.mu.Lock()
receiver.tracks[i].track.id = incoming.id
receiver.tracks[i].track.streamID = incoming.streamID
Expand All @@ -1226,6 +1233,7 @@ func (pc *PeerConnection) configureReceiver(incoming trackDetails, receiver *RTP
}

func (pc *PeerConnection) startReceiver(incoming trackDetails, receiver *RTPReceiver) {
specialLog("[PEERCONNECTION] startReceiver()")
if err := receiver.startReceive(trackDetailsToRTPReceiveParameters(&incoming)); err != nil {
pc.log.Warnf("RTPReceiver Receive failed %s", err)
return
Expand All @@ -1237,6 +1245,13 @@ func (pc *PeerConnection) startReceiver(incoming trackDetails, receiver *RTPRece
}

go func(track *TrackRemote) {
specialLog("------------------------------")
specialLog("[receiver func] track.ssrc ", track.ssrc)
specialLog("[receiver func] track.id ", track.id)
specialLog("[receiver func] track.kind ", track.kind)
specialLog("[receiver func] track.codec ", track.codec)
specialLog("[receiver func] track.payloadType ", track.payloadType)
specialLog("[receiver func] trac.params", track.params)
b := make([]byte, pc.api.settingEngine.getReceiveMTU())
n, _, err := track.peek(b)
if err != nil {
Expand Down Expand Up @@ -1328,6 +1343,8 @@ func runIfNewReceiver(

// configureRTPReceivers opens knows inbound SRTP streams from the RemoteDescription
func (pc *PeerConnection) configureRTPReceivers(isRenegotiation bool, remoteDesc *SessionDescription, currentTransceivers []*RTPTransceiver) { //nolint:gocognit
specialLog("[configureRTPReceivers()] =============================")

incomingTracks := trackDetailsFromSDP(pc.log, remoteDesc.parsed)

if isRenegotiation {
Expand Down Expand Up @@ -1405,6 +1422,12 @@ func (pc *PeerConnection) configureRTPReceivers(isRenegotiation bool, remoteDesc
}

for _, incomingTrack := range filteredTracks {
specialLog("[PEERCONNECTION] configureRTPReceivers(): incomingTrack.id = ", incomingTrack.id)
specialLog("[PEERCONNECTION] configureRTPReceivers(): incomingTrack.kind = ", incomingTrack.kind)
specialLog("[PEERCONNECTION] configureRTPReceivers(): incomingTrack.mid = ", incomingTrack.mid)
specialLog("[PEERCONNECTION] configureRTPReceivers(): incomingTrack.ssrcs = ", incomingTrack.ssrcs)
specialLog("[PEERCONNECTION] configureRTPReceivers(): incomingTrack.repairSsrc = ", incomingTrack.repairSsrc)

_ = runIfNewReceiver(incomingTrack, localTransceivers, pc.configureReceiver)
}
}
Expand Down Expand Up @@ -1453,7 +1476,11 @@ func (pc *PeerConnection) startRTPReceivers(remoteDesc *SessionDescription, curr

// startRTPSenders starts all outbound RTP streams
func (pc *PeerConnection) startRTPSenders(currentTransceivers []*RTPTransceiver) error {
specialLog("[PEERCONNECTION] startRTPSenders()")
for _, transceiver := range currentTransceivers {
specialLog("[PEERCONNECTION] startRTPSender() -> transceiver.kind = ", transceiver.kind)
specialLog("[PEERCONNECTION] startRTPSender() -> transceiver.kind = ", transceiver.mid)
specialLog("[PEERCONNECTION] startRTPSender() -> transceiver.codecs = ", transceiver.codecs)
if sender := transceiver.Sender(); sender != nil && sender.isNegotiated() && !sender.hasSent() {
err := sender.Send(sender.GetParameters())
if err != nil {
Expand Down Expand Up @@ -2387,6 +2414,7 @@ func (pc *PeerConnection) generateUnmatchedSDP(transceivers []*RTPTransceiver, u
// this is used everytime we have a RemoteDescription
// nolint: gocyclo
func (pc *PeerConnection) generateMatchedSDP(transceivers []*RTPTransceiver, useIdentity bool, includeUnmatched bool, connectionRole sdp.ConnectionRole) (*sdp.SessionDescription, error) { //nolint:gocognit
specialLog("[PEERCONNECTION] generateMatchedSDP()")
d, err := sdp.NewJSEPSessionDescription(useIdentity)
if err != nil {
return nil, err
Expand All @@ -2412,14 +2440,23 @@ func (pc *PeerConnection) generateMatchedSDP(transceivers []*RTPTransceiver, use
localTransceivers := append([]*RTPTransceiver{}, transceivers...)

detectedPlanB := descriptionIsPlanB(remoteDescription, pc.log)
if pc.configuration.SDPSemantics != SDPSemanticsUnifiedPlan {
detectedPlanB = descriptionPossiblyPlanB(remoteDescription)
}
specialLog("[PEERCONNECTION] descriptionIsPlanB = ", strconv.FormatBool(detectedPlanB))

/*
if pc.configuration.SDPSemantics != SDPSemanticsUnifiedPlan {
detectedPlanB = descriptionPossiblyPlanB(remoteDescription)
specialLog("[PEERCONNECTION] descriptionPossiblyPlanB:")
specialLog(detectedPlanB)
}
*/

mediaSections := []mediaSection{}
alreadyHaveApplicationMediaSection := false
for _, media := range remoteDescription.parsed.MediaDescriptions {
midValue := getMidValue(media)
specialLog("[PEERCONNECTION] remoteDesc.mediaDesc -> midValue = ", midValue)
specialLog("[PEERCONNECTION] remoteDesc.mediaDesc -> mediaName = ", media.MediaName.Media)
specialLog("[PEERCONNECTION] remoteDesc.mediaDesc -> mediaName = ", media.MediaName.String())
if midValue == "" {
return nil, errPeerConnRemoteDescriptionWithoutMidValue
}
Expand All @@ -2431,7 +2468,9 @@ func (pc *PeerConnection) generateMatchedSDP(transceivers []*RTPTransceiver, use
}

kind := NewRTPCodecType(media.MediaName.Media)
specialLog("[PEERCONNECTION] mediaDesc -> kind = ", kind.String())
direction := getPeerDirection(media)
specialLog("[PEERCONNECTION] mediaDesc -> direction = ", direction.String())
if kind == 0 || direction == RTPTransceiverDirectionUnknown {
continue
}
Expand All @@ -2447,6 +2486,7 @@ func (pc *PeerConnection) generateMatchedSDP(transceivers []*RTPTransceiver, use
// media entry with all matching local transceivers
mediaTransceivers := []*RTPTransceiver{}
for {
specialLog("[PEERCONNECTION] Finding a batch of sender transceivers")
// keep going until we can't get any more
t, localTransceivers = satisfyTypeAndDirection(kind, direction, localTransceivers)
if t == nil {
Expand Down Expand Up @@ -2475,7 +2515,14 @@ func (pc *PeerConnection) generateMatchedSDP(transceivers []*RTPTransceiver, use
sender.setNegotiated()
}
mediaTransceivers := []*RTPTransceiver{t}
for _, mediaTransceiver := range mediaTransceivers {
specialLog("[PEERCONNECTION] mediaTransceiver:")
specialLog(mediaTransceiver)
}

mediaSections = append(mediaSections, mediaSection{id: midValue, transceivers: mediaTransceivers, ridMap: getRids(media)})
specialLog("[PEERCONNECTION] mediaSections:")
specialLog(mediaSections)
}
}

Expand Down
11 changes: 11 additions & 0 deletions rtpreceiver.go
Expand Up @@ -244,6 +244,17 @@ func (r *RTPReceiver) Receive(parameters RTPReceiveParameters) error {
func (r *RTPReceiver) Read(b []byte) (n int, a interceptor.Attributes, err error) {
select {
case <-r.received:
specialLog("[rtpreceiver] r.tracks[0] = ", r.tracks[0])
specialLog("[rtpreceiver] r.tracks[0].track = ", r.tracks[0].track)
specialLog("[rtpreceiver] r.tracks[0].streamInfo = ", r.tracks[0].streamInfo)
specialLog("[rtpreceiver] r.tracks[0] interceptor? => ", r.tracks[0].rtcpInterceptor)
specialLog("[rtpreceiver] interceptor.attributes param = ", a)

if r.tracks[0].rtcpInterceptor == nil {
// I don't know what this phantom track is
specialLog("PHANTOM TRACK!!!!")
return
}
return r.tracks[0].rtcpInterceptor.Read(b, a)
case <-r.closed:
return 0, nil, io.ErrClosedPipe
Expand Down
8 changes: 8 additions & 0 deletions rtptransceiver.go
Expand Up @@ -238,6 +238,10 @@ func findByMid(mid string, localTransceivers []*RTPTransceiver) (*RTPTransceiver
// Given a direction+type pluck a transceiver from the passed list
// if no entry satisfies the requested type+direction return a inactive Transceiver
func satisfyTypeAndDirection(remoteKind RTPCodecType, remoteDirection RTPTransceiverDirection, localTransceivers []*RTPTransceiver) (*RTPTransceiver, []*RTPTransceiver) {

fmt.Printf("[satisfy] remoteDirection: %v\n", remoteDirection)
fmt.Printf("[satisfy] remoteKind: %v\n", remoteKind)

// Get direction order from most preferred to least
getPreferredDirections := func() []RTPTransceiverDirection {
switch remoteDirection {
Expand All @@ -255,12 +259,16 @@ func satisfyTypeAndDirection(remoteKind RTPCodecType, remoteDirection RTPTransce
for _, possibleDirection := range getPreferredDirections() {
for i := range localTransceivers {
t := localTransceivers[i]
specialLog("[satisfy] localTrancsceiver: %v\n", t)
specialLog("[satisfy] localTransceiver.Mid: " + t.Mid())
specialLog("[satisfy] localTransceiver.kind: " + t.kind.String())
if t.Mid() == "" && t.kind == remoteKind && possibleDirection == t.Direction() {
return t, append(localTransceivers[:i], localTransceivers[i+1:]...)
}
}
}

specialLog("[satisfyTypeAndDir] didn't find anything sadly")
return nil, localTransceivers
}

Expand Down
67 changes: 53 additions & 14 deletions sdp.go
Expand Up @@ -84,6 +84,7 @@ func trackDetailsFromSDP(log logging.LeveledLogger, s *sdp.SessionDescription) (
for _, media := range s.MediaDescriptions {
tracksInMediaSection := []trackDetails{}
rtxRepairFlows := map[uint64]uint64{}
fecRepairFlows := map[uint64]uint64{}

// Plan B can have multiple tracks in a single media section
streamID := ""
Expand Down Expand Up @@ -136,6 +137,28 @@ func trackDetailsFromSDP(log logging.LeveledLogger, s *sdp.SessionDescription) (
}
}
}
specialLog("[trackDetails...] split: " + split[0])

if split[0] == "FEC-FR" {
// Add fec ssrcs to blacklist, to avoid adding them as tracks
// Essentially lines like `a=ssrc-group:FEC-FR 2231627014 632943048` are processed by this section
// as this declares that the second SSRC (632943048) is a fec repair flow (RFC4588) for the first
// (2231627014) as specified in RFC5576
if len(split) == 3 {
baseSsrc, err := strconv.ParseUint(split[1], 10, 32)
if err != nil {
log.Warnf("Failed to parse SSRC: %v", err)
continue
}
fecRepairFlow, err := strconv.ParseUint(split[2], 10, 32)
if err != nil {
log.Warnf("Failed to parse SSRC: %v", err)
continue
}
fecRepairFlows[fecRepairFlow] = baseSsrc
tracksInMediaSection = filterTrackWithSSRC(tracksInMediaSection, SSRC(fecRepairFlow))
}
}

// Handle `a=msid:<stream_id> <track_label>` for Unified plan. The first value is the same as MediaStream.id
// in the browser and can be used to figure out which tracks belong to the same stream. The browser should
Expand Down Expand Up @@ -649,21 +672,37 @@ func getMidValue(media *sdp.MediaDescription) string {

// SessionDescription contains a MediaSection with Multiple SSRCs, it is Plan-B
func descriptionIsPlanB(desc *SessionDescription, log logging.LeveledLogger) bool {
if desc == nil || desc.parsed == nil {
return false
}

// Store all MIDs that already contain a track
midWithTrack := map[string]bool{}

for _, trackDetail := range trackDetailsFromSDP(log, desc.parsed) {
if _, ok := midWithTrack[trackDetail.mid]; ok {
return true
}
midWithTrack[trackDetail.mid] = true
}

// To get a working FlexFEC implementation, we are focusing on unified plan for now.
// From what I remember, the spec is not very clear on how flex fec works with plan B.
return false
/*
if desc == nil || desc.parsed == nil {
return false
}
// Store all MIDs that already contain a track
midWithTrack := map[string]bool{}
for _, trackDetail := range trackDetailsFromSDP(log, desc.parsed) {
specialLog("--------trackDetails--------")
specialLog("trackDetail.streamID: " + trackDetail.streamID)
specialLog("trackDetail.mid: " + trackDetail.mid)
specialLog("trackDetail.id: " + trackDetail.id)
specialLog("trackDetail.kind: " + trackDetail.kind.String())
if trackDetail.repairSsrc != nil {
specialLog("trackDetail.repairSsrc: ")
specialLog(*trackDetail.repairSsrc)
}
if _, ok := midWithTrack[trackDetail.mid]; ok {
return true
}
midWithTrack[trackDetail.mid] = true
}
return false
*/
}

// SessionDescription contains a MediaSection with name `audio`, `video` or `data`
Expand Down
11 changes: 11 additions & 0 deletions special_logger.go
@@ -0,0 +1,11 @@
package webrtc

import "fmt"

const specialLogEnabled = false

func specialLog(toLog ...interface{}) {
if specialLogEnabled {
fmt.Println(toLog...)
}
}
2 changes: 2 additions & 0 deletions track_local_static.go
Expand Up @@ -165,6 +165,8 @@ func (s *TrackLocalStaticRTP) writeRTP(p *rtp.Packet) error {
for _, b := range s.bindings {
p.Header.SSRC = uint32(b.ssrc)
p.Header.PayloadType = uint8(b.payloadType)
//specialLog("[WEBRTC] writeRTP() -> SSRC -> ", p.Header.SSRC)
//specialLog("[WEBRTC] writeRTP() -> PayloadType -> ", p.Header.PayloadType)
if _, err := b.writeStream.WriteRTP(&p.Header, p.Payload); err != nil {
writeErrs = append(writeErrs, err)
}
Expand Down

0 comments on commit 7ebbf6a

Please sign in to comment.