Skip to content

Commit

Permalink
[QBOX] Support TrackLocal FlexFEC (pion#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
aggresss committed Feb 29, 2024
1 parent 62e4e71 commit 663fc83
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 2 deletions.
3 changes: 3 additions & 0 deletions mediaengine.go
Expand Up @@ -47,6 +47,9 @@ const (
// MimeTypePCMA PCMA MIME type
// Note: Matching should be case insensitive.
MimeTypePCMA = "audio/PCMA"
// MimeTypeFlexFEC03 FlexFEC03 MIME type
// Note: Matching should be case insensitive.
MimeTypeFlexFEC03 = "video/flexfec-03"
)

type mediaEngineHeaderExtension struct {
Expand Down
10 changes: 10 additions & 0 deletions rtpcodec.go
Expand Up @@ -145,3 +145,13 @@ func codecParametersAssociatedSearch(needle RTPCodecParameters, haystack []RTPCo

return RTPCodecParameters{}, codecMatchNone
}

// Do a search by mime type in the list of codecs
func codecParametersSearchByMimeType(mimeType string, haystack []RTPCodecParameters) (codecs []RTPCodecParameters) {
for i, c := range haystack {
if c.MimeType == mimeType {
codecs = append(codecs, haystack[i])
}
}
return codecs
}
6 changes: 6 additions & 0 deletions rtpcodingparameters.go
Expand Up @@ -9,6 +9,11 @@ type RTPRtxParameters struct {
SSRC SSRC `json:"ssrc"`
}

// RTPFecParameters dictionary contains information relating to FEC settings.
type RTPFecParameters struct {
SSRC SSRC `json:"ssrc"`
}

// RTPCodingParameters provides information relating to both encoding and decoding.
// This is a subset of the RFC since Pion WebRTC doesn't implement encoding/decoding itself
// http://draft.ortc.org/#dom-rtcrtpcodingparameters
Expand All @@ -17,4 +22,5 @@ type RTPCodingParameters struct {
SSRC SSRC `json:"ssrc"`
PayloadType PayloadType `json:"payloadType"`
RTX RTPRtxParameters `json:"rtx"`
FEC RTPFecParameters `json:"fec,omitempty"`
}
99 changes: 97 additions & 2 deletions rtpsender.go
Expand Up @@ -32,6 +32,11 @@ type trackEncoding struct {
rtxSrtpStream *srtpWriterFuture
rtxRtcpInterceptor interceptor.RTCPReader
rtxStreamInfo interceptor.StreamInfo

fecSsrc SSRC
fecSrtpStream *srtpWriterFuture
fecRtcpInterceptor interceptor.RTCPReader
fecStreamInfo interceptor.StreamInfo
}

// RTPSender allows an application to control how a given Track is encoded and transmitted to a remote peer
Expand Down Expand Up @@ -125,6 +130,7 @@ func (r *RTPSender) getParameters() RTPSendParameters {
SSRC: trackEncoding.ssrc,
PayloadType: r.payloadType,
RTX: RTPRtxParameters{SSRC: trackEncoding.rtxSsrc},
FEC: RTPFecParameters{SSRC: trackEncoding.fecSsrc},
},
})
}
Expand Down Expand Up @@ -223,6 +229,13 @@ func (r *RTPSender) addEncoding(track TrackLocal) {
}
}

if r.api.settingEngine.trackLocalFlexfec {
codecs := r.api.mediaEngine.getCodecsByKind(track.Kind())
if len(codecParametersSearchByMimeType(MimeTypeFlexFEC03, codecs)) > 0 {
trackEncoding.fecSsrc = SSRC(randutil.NewMathRandomGenerator().Uint32())
}
}

r.trackEncodings = append(r.trackEncodings, trackEncoding)
}

Expand Down Expand Up @@ -314,8 +327,14 @@ func (r *RTPSender) Send(parameters RTPSendParameters) error {
return errRTPSenderTrackRemoved
}

for idx, trackEncoding := range r.trackEncodings {
for idx := range r.trackEncodings {
trackEncoding := r.trackEncodings[idx]
srtpStream := &srtpWriterFuture{ssrc: parameters.Encodings[idx].SSRC, rtpSender: r}
writeStream := &interceptorToTrackLocalWriter{}
fecCodecs := codecParametersSearchByMimeType(MimeTypeFlexFEC03, r.api.mediaEngine.getCodecsByKind(r.kind))

trackEncoding.srtpStream = srtpStream
trackEncoding.ssrc = parameters.Encodings[idx].SSRC
trackEncoding.context = &baseTrackLocalContext{
id: r.id,
params: r.api.mediaEngine.getRTPParametersByKind(trackEncoding.track.Kind(), []RTPTransceiverDirection{RTPTransceiverDirectionSendonly}),
Expand All @@ -337,7 +356,18 @@ func (r *RTPSender) Send(parameters RTPSendParameters) error {
codec.RTPCodecCapability,
parameters.HeaderExtensions,
)
srtpStream := trackEncoding.srtpStream

if len(fecCodecs) > 0 {
trackEncoding.streamInfo.Attributes.Set("flexfec-03", struct{}{})
}

trackEncoding.rtcpInterceptor = r.api.interceptor.BindRTCPReader(
interceptor.RTCPReaderFunc(func(in []byte, a interceptor.Attributes) (n int, _ interceptor.Attributes, err error) {
n, err = trackEncoding.srtpStream.Read(in)
return n, a, err
}),
)

rtpInterceptor := r.api.interceptor.BindLocalStream(
&trackEncoding.streamInfo,
interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
Expand Down Expand Up @@ -376,6 +406,37 @@ func (r *RTPSender) Send(parameters RTPSendParameters) error {
}),
)
}

if len(fecCodecs) > 0 &&
parameters.Encodings[idx].FEC.SSRC != 0 {
fecSrtpStream := &srtpWriterFuture{ssrc: parameters.Encodings[idx].FEC.SSRC, rtpSender: r}

trackEncoding.fecSrtpStream = fecSrtpStream
trackEncoding.fecSsrc = parameters.Encodings[idx].FEC.SSRC

trackEncoding.fecStreamInfo = *createStreamInfo(
r.id+"_fec",
parameters.Encodings[idx].FEC.SSRC,
fecCodecs[0].PayloadType,
fecCodecs[0].RTPCodecCapability,
parameters.HeaderExtensions,
)
trackEncoding.fecStreamInfo.Attributes.Set("apt_ssrc", uint32(parameters.Encodings[idx].SSRC))

trackEncoding.fecRtcpInterceptor = r.api.interceptor.BindRTCPReader(
interceptor.RTCPReaderFunc(func(in []byte, a interceptor.Attributes) (n int, attributes interceptor.Attributes, err error) {
n, err = trackEncoding.fecSrtpStream.Read(in)
return n, a, err
}),
)

r.api.interceptor.BindLocalStream(
&trackEncoding.fecStreamInfo,
interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, _ interceptor.Attributes) (int, error) {
return fecSrtpStream.WriteRTP(header, payload)
}),
)
}
}

close(r.sendCalled)
Expand Down Expand Up @@ -415,6 +476,10 @@ func (r *RTPSender) Stop() error {
r.api.interceptor.UnbindLocalStream(&trackEncoding.rtxStreamInfo)
errs = append(errs, trackEncoding.rtxSrtpStream.Close())
}
if trackEncoding.fecSrtpStream != nil {
r.api.interceptor.UnbindLocalStream(&trackEncoding.fecStreamInfo)
errs = append(errs, trackEncoding.fecSrtpStream.Close())
}
}

return util.FlattenErrs(errs)
Expand Down Expand Up @@ -476,6 +541,36 @@ func (r *RTPSender) ReadRtxRTCP() ([]rtcp.Packet, interceptor.Attributes, error)
return pkts, attributes, nil
}

// ReadFec reads incoming FEC Stream RTCP for this RTPSender
func (r *RTPSender) ReadFec(b []byte) (n int, a interceptor.Attributes, err error) {
if r.trackEncodings[0].fecRtcpInterceptor == nil {
return 0, nil, io.ErrNoProgress
}

select {
case <-r.sendCalled:
return r.trackEncodings[0].fecRtcpInterceptor.Read(b, a)
case <-r.stopCalled:
return 0, nil, io.ErrClosedPipe
}
}

// ReadFecRTCP is a convenience method that wraps ReadFec and unmarshals for you.
func (r *RTPSender) ReadFecRTCP() ([]rtcp.Packet, interceptor.Attributes, error) {
b := make([]byte, r.api.settingEngine.getReceiveMTU())
i, attributes, err := r.ReadFec(b)
if err != nil {
return nil, nil, err
}

pkts, err := rtcp.Unmarshal(b[:i])
if err != nil {
return nil, nil, err
}

return pkts, attributes, nil
}

// ReadSimulcast reads incoming RTCP for this RTPSender for given rid
func (r *RTPSender) ReadSimulcast(b []byte, rid string) (n int, a interceptor.Attributes, err error) {
select {
Expand Down
6 changes: 6 additions & 0 deletions sdp.go
Expand Up @@ -392,10 +392,16 @@ func addSenderSDP(
if encoding.RTX.SSRC != 0 {
media = media.WithValueAttribute(sdp.AttrKeySSRCGroup, fmt.Sprintf("FID %d %d", encoding.SSRC, encoding.RTX.SSRC))
}
if encoding.FEC.SSRC != 0 {
media = media.WithValueAttribute(sdp.AttrKeySSRCGroup, fmt.Sprintf("FEC-FR %d %d", encoding.SSRC, encoding.FEC.SSRC))
}
media = media.WithMediaSource(uint32(encoding.SSRC), track.StreamID() /* cname */, track.StreamID() /* streamLabel */, track.ID())
if encoding.RTX.SSRC != 0 {
media = media.WithMediaSource(uint32(encoding.RTX.SSRC), track.StreamID() /* cname */, track.StreamID() /* streamLabel */, track.ID())
}
if encoding.FEC.SSRC != 0 {
media = media.WithMediaSource(uint32(encoding.FEC.SSRC), track.StreamID() /* cname */, track.StreamID() /* streamLabel */, track.ID())
}
if !isPlanB {
media = media.WithPropertyAttribute("msid:" + track.StreamID() + " " + track.ID())
}
Expand Down
6 changes: 6 additions & 0 deletions settingengine.go
Expand Up @@ -92,6 +92,7 @@ type SettingEngine struct {
srtpProtectionProfiles []dtls.SRTPProtectionProfile
receiveMTU uint
trackLocalRtx bool
trackLocalFlexfec bool
}

// getReceiveMTU returns the configured MTU. If SettingEngine's MTU is configured to 0 it returns the default
Expand Down Expand Up @@ -440,3 +441,8 @@ func (e *SettingEngine) SetSCTPMaxReceiveBufferSize(maxReceiveBufferSize uint32)
func (e *SettingEngine) SetTrackLocalRtx(enable bool) {
e.trackLocalRtx = enable
}

// SetTrackLocalFlexfec allows track local use FlexFEC.
func (e *SettingEngine) SetTrackLocalFlexfec(enable bool) {
e.trackLocalFlexfec = enable
}

0 comments on commit 663fc83

Please sign in to comment.