Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: ionorg/ion-sfu
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v1.2.0
Choose a base ref
...
head repository: ionorg/ion-sfu
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: v1.3.0
Choose a head ref
  • 2 commits
  • 14 files changed
  • 3 contributors

Commits on Nov 11, 2020

  1. fix(sfu): Add locks in Peer (#302)

    * fix(sfu): Add locks in Peer
    
    * fix(sfu): fix deadlock
    
    * fix(sfu): fix data races
    
    * fix(sfu): remove debounce
    
    * fix(sfu): fix test
    
    * Fix ext map header negotiation
    
    * Fix lint issue
    
    * Add debounce and fix sdes mid ext for senders
    
    * fix pubsub
    
    * Fix tests
    
    Co-authored-by: tarrencev <tarrence13@gmail.com>
    OrlandoCo and tarrencev authored Nov 11, 2020
    Copy the full SHA
    03fab2f View commit details
  2. WebRTCTransportConfig Breakout, only call session.onClosed once (#283)

    * Split out function to create a WebRTCTransportConfig so external projects can replicate sfu.SFU{} functionality easily
    
    * Use interface transportProvider for sfu.Peer{}
    
    * Only call sesson.OnClosed once
    billylindeman authored Nov 11, 2020
    Copy the full SHA
    b0d44f5 View commit details
Showing with 195 additions and 100 deletions.
  1. +3 −3 examples/pubsubtest/index.html
  2. +3 −3 go.mod
  3. +6 −11 go.sum
  4. +45 −23 pkg/peer.go
  5. +80 −17 pkg/router.go
  6. +0 −1 pkg/sender.go
  7. +4 −1 pkg/session.go
  8. +14 −6 pkg/sfu.go
  9. +12 −2 pkg/sfu_test.go
  10. +2 −7 pkg/simplesender.go
  11. +13 −7 pkg/simplesender_test.go
  12. +2 −7 pkg/simulcastsender.go
  13. +6 −0 pkg/simulcastsender_test.go
  14. +5 −12 pkg/webrtctransport.go
6 changes: 3 additions & 3 deletions examples/pubsubtest/index.html
Original file line number Diff line number Diff line change
@@ -134,7 +134,7 @@ <h3>Pion</h3>
integrity="sha384-B4gt1jrGC7Jh4AgTPSdUtOBvfO8shuf57BaghqFfPlYxofvL8/KUEfYiJOMMV+rV"
crossorigin="anonymous"
></script>
<script src="https://unpkg.com/ion-sdk-js@1.0.20/dist/ion-sdk.min.js"></script>
<script src="https://unpkg.com/ion-sdk-js@1.1.1/dist/ion-sdk.min.js"></script>
<script>
const localVideo = document.getElementById("local-video");
const remotesDiv = document.getElementById("remotes");
@@ -158,7 +158,7 @@ <h3>Pion</h3>

let localStream;
const start = () => {
IonSDK.LocalStream.getUserMedia({
clientLocal.getUserMedia({
resolution: "vga",
audio: true,
})
@@ -169,7 +169,7 @@ <h3>Pion</h3>
localVideo.controls = true;
localVideo.muted = true;
joinBtns.style.display = "none";
clientLocal.publish(media);
media.publish(media);
})
.catch(console.error);
};
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -10,18 +10,18 @@ require (
github.com/lucas-clemente/quic-go v0.18.1 // indirect
github.com/lucsky/cuid v1.0.2
github.com/marten-seemann/qtls-go1-15 v0.1.1 // indirect
github.com/matryer/moq v0.1.3 // indirect
github.com/pion/ion-log v1.0.0
github.com/pion/rtcp v1.2.4
github.com/pion/rtp v1.6.1
github.com/pion/sdp/v3 v3.0.2
github.com/pion/turn/v2 v2.0.5 // indirect
github.com/pion/webrtc/v3 v3.0.0-beta.11
github.com/pion/webrtc/v3 v3.0.0-beta.12.0.20201110054931-970a59f423f6
github.com/sourcegraph/jsonrpc2 v0.0.0-20200429184054-15c2290dcb37
github.com/spf13/viper v1.7.1
github.com/stretchr/testify v1.6.1
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 // indirect
golang.org/x/sys v0.0.0-20201024232916-9f70ab9862d5 // indirect
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b // indirect
golang.org/x/sys v0.0.0-20201109165425-215b40eba54c // indirect
google.golang.org/grpc v1.33.1
google.golang.org/protobuf v1.25.0
gopkg.in/ini.v1 v1.51.1 // indirect
17 changes: 6 additions & 11 deletions go.sum
Original file line number Diff line number Diff line change
@@ -184,8 +184,6 @@ github.com/marten-seemann/qtls v0.10.0/go.mod h1:UvMd1oaYDACI99/oZUYLzMCkBXQVT0a
github.com/marten-seemann/qtls-go1-15 v0.1.0/go.mod h1:GyFwywLKkRt+6mfU99csTEY1joMZz5vmB1WNZH3P81I=
github.com/marten-seemann/qtls-go1-15 v0.1.1 h1:LIH6K34bPVttyXnUWixk0bzH6/N07VxbSabxn5A5gZQ=
github.com/marten-seemann/qtls-go1-15 v0.1.1/go.mod h1:GyFwywLKkRt+6mfU99csTEY1joMZz5vmB1WNZH3P81I=
github.com/matryer/moq v0.1.3 h1:+fW3u2jmlPw59a3V6spZKOLCcvrDKzPjMsRvUhnZ/c0=
github.com/matryer/moq v0.1.3/go.mod h1:9RtPYjTnH1bSBIkpvtHkFN7nbWAnO7oRpdJkEIn6UtE=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
@@ -258,6 +256,8 @@ github.com/pion/udp v0.1.0 h1:uGxQsNyrqG3GLINv36Ff60covYmfrLoxzwnCsIYspXI=
github.com/pion/udp v0.1.0/go.mod h1:BPELIjbwE9PRbd/zxI/KYBnbo7B6+oA6YuEaNE8lths=
github.com/pion/webrtc/v3 v3.0.0-beta.11 h1:xTle6fm767JmUoTy5LXpEFkSBRY+F1EPBy6MBek8QHc=
github.com/pion/webrtc/v3 v3.0.0-beta.11/go.mod h1:UbmDN5G82nXLXAiSIo0HYU68GN6z09jeKSNEaDUzFvY=
github.com/pion/webrtc/v3 v3.0.0-beta.12.0.20201110054931-970a59f423f6 h1:waOLb5ltWazfY3Rm6H8B9pQvCBJ31JkMyRj58rH5s4o=
github.com/pion/webrtc/v3 v3.0.0-beta.12.0.20201110054931-970a59f423f6/go.mod h1:UbmDN5G82nXLXAiSIo0HYU68GN6z09jeKSNEaDUzFvY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@@ -347,7 +347,6 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1
github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
@@ -365,7 +364,6 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20190313024323-a1f597ede03a/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
@@ -390,8 +388,6 @@ golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -419,6 +415,8 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20201002202402-0a1ea396d57c/go.mod h1:iQL9McJNjoIa5mjH6nYTCTZXUN6RP+XW3eib7Ya3XcI=
golang.org/x/net v0.0.0-20201031054903-ff519b6c9102 h1:42cLlJJdEh+ySyeUUbEQ5bsTiq8voBeTuweGVkY6Puw=
golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b h1:uwuIcX0g4Yl1NC5XAz37xsr2lTtcqevgzYNVt49waME=
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -431,7 +429,6 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -457,6 +454,8 @@ golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201024232916-9f70ab9862d5 h1:iCaAy5bMeEvwANu3YnJfWwI0kWAGkEa2RXPdweI/ysk=
golang.org/x/sys v0.0.0-20201024232916-9f70ab9862d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201109165425-215b40eba54c h1:+B+zPA6081G5cEb2triOIJpcvSW4AYzmIyWAqMn2JAc=
golang.org/x/sys v0.0.0-20201109165425-215b40eba54c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -487,11 +486,7 @@ golang.org/x/tools v0.0.0-20190828213141-aed303cbaa74/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200815165600-90abf76919f3 h1:0aScV/0rLmANzEYIhjCOi2pTvDyhZNduBUMD2q3iqs4=
golang.org/x/tools v0.0.0-20200815165600-90abf76919f3/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
68 changes: 45 additions & 23 deletions pkg/peer.go
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ package sfu
import (
"errors"
"fmt"
"sync"

log "github.com/pion/ion-log"
"github.com/pion/webrtc/v3"
@@ -17,23 +18,29 @@ var (
ErrOfferIgnored = errors.New("offered ignored")
)

// TransportProvider provides the peerConnection to the sfu.Peer{}
// This allows the sfu.SFU{} implementation to be customized / wrapped by another package
type TransportProvider interface {
NewWebRTCTransport(sid string, me MediaEngine) (*WebRTCTransport, error)
}

// Peer represents a single peer signal session
type Peer struct {
sfu *SFU
pc *WebRTCTransport
sync.Mutex
provider TransportProvider
pc *WebRTCTransport

OnIceCandidate func(*webrtc.ICECandidateInit)
OnOffer func(*webrtc.SessionDescription)

makingOffer atomicBool
remoteAnswerPending atomicBool
negotiationPending atomicBool
remoteAnswerPending bool
negotiationPending bool
}

// NewPeer creates a new Peer for signaling with the given SFU
func NewPeer(sfu *SFU) Peer {
func NewPeer(provider TransportProvider) Peer {
return Peer{
sfu: sfu,
provider: provider,
}
}

@@ -43,34 +50,46 @@ func (p *Peer) Join(sid string, sdp webrtc.SessionDescription) (*webrtc.SessionD
log.Debugf("peer already exists")
return nil, ErrTransportExists
}
p.Lock()
defer p.Unlock()

me := MediaEngine{}
err := me.PopulateFromSDP(sdp)
if err != nil {
return nil, fmt.Errorf("error parsing sdp: %v", err)
}

pc, err := p.sfu.NewWebRTCTransport(sid, me)
pc, err := p.provider.NewWebRTCTransport(sid, me)
if err != nil {
return nil, fmt.Errorf("error creating transport: %v", err)
}
log.Infof("peer %s join session %s", pc.ID(), sid)
p.pc = pc

answer, err := p.Answer(sdp)
if err := p.pc.SetRemoteDescription(sdp); err != nil {
return nil, fmt.Errorf("error setting remote description: %v", err)
}

answer, err := p.pc.CreateAnswer()
if err != nil {
return nil, err
return nil, fmt.Errorf("error creating answer: %v", err)
}

err = p.pc.SetLocalDescription(answer)
if err != nil {
return nil, fmt.Errorf("error setting local description: %v", err)
}
log.Infof("peer %s send answer", p.pc.ID())

pc.OnNegotiationNeeded(func() {
if p.makingOffer.get() || p.remoteAnswerPending.get() {
p.negotiationPending.set(true)
p.Lock()
defer p.Unlock()

if p.remoteAnswerPending {
p.negotiationPending = true
return
}

p.makingOffer.set(true)
defer p.makingOffer.set(false)

log.Debugf("peer %s negotiation needed", p.pc.ID())
offer, err := pc.CreateOffer()
if err != nil {
@@ -84,7 +103,7 @@ func (p *Peer) Join(sid string, sdp webrtc.SessionDescription) (*webrtc.SessionD
return
}

p.remoteAnswerPending.set(true)
p.remoteAnswerPending = true
if p.OnOffer != nil {
log.Infof("peer %s send offer", p.pc.ID())
p.OnOffer(&offer)
@@ -103,18 +122,19 @@ func (p *Peer) Join(sid string, sdp webrtc.SessionDescription) (*webrtc.SessionD
}
})

return answer, nil
return &answer, nil
}

// Answer an offer from remote
func (p *Peer) Answer(sdp webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
if p.pc == nil {
return nil, ErrNoTransportEstablished
}
p.Lock()
defer p.Unlock()
log.Infof("peer %s got offer", p.pc.ID())

readyForOffer := !p.makingOffer.get() &&
(p.pc.SignalingState() == webrtc.SignalingStateStable && !p.remoteAnswerPending.get())
readyForOffer := p.pc.SignalingState() == webrtc.SignalingStateStable && !p.remoteAnswerPending

if !readyForOffer {
return nil, ErrOfferIgnored
@@ -143,17 +163,19 @@ func (p *Peer) SetRemoteDescription(sdp webrtc.SessionDescription) error {
if p.pc == nil {
return ErrNoTransportEstablished
}
p.Lock()
defer p.Unlock()

log.Infof("peer %s got answer", p.pc.ID())
if err := p.pc.SetRemoteDescription(sdp); err != nil {
return fmt.Errorf("error setting remote description: %v", err)
}

p.remoteAnswerPending.set(false)
p.remoteAnswerPending = false

if p.negotiationPending.get() {
p.negotiationPending.set(false)
p.pc.negotiate()
if p.negotiationPending {
p.negotiationPending = false
go p.pc.negotiate()
}

return nil
97 changes: 80 additions & 17 deletions pkg/router.go
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@ package sfu

import (
"math/rand"
"strconv"
"net/url"
"strings"
"sync"
"time"
@@ -30,7 +30,8 @@ type Router interface {
AddReceiver(track *webrtc.Track, receiver *webrtc.RTPReceiver) *receiverRouter
AddSender(p *WebRTCTransport, rr *receiverRouter) error
SetExtMap(pd *sdp.SessionDescription)
GetExtMap(mid string, ext int) uint8
OfferExtMap() map[webrtc.SDPSectionType][]sdp.ExtMap
GetExtMap(mid, ext string) uint8
SendRTCP(pkts []rtcp.Packet)
Stop()
}
@@ -56,7 +57,7 @@ type router struct {
rtcpCh chan []rtcp.Packet
config RouterConfig
receivers map[string]*receiverRouter
extensions map[string]map[int]uint8
extensions map[webrtc.SDPSectionType][]sdp.ExtMap
}

// newRouter for routing rtp/rtcp packets
@@ -69,7 +70,7 @@ func newRouter(peer *webrtc.PeerConnection, id string, config RouterConfig) Rout
config: config,
rtcpCh: ch,
receivers: make(map[string]*receiverRouter),
extensions: make(map[string]map[int]uint8),
extensions: make(map[webrtc.SDPSectionType][]sdp.ExtMap),
}
go r.sendRTCP()
return r
@@ -94,10 +95,20 @@ func (r *router) AddReceiver(track *webrtc.Track, receiver *webrtc.RTPReceiver)
}
}
trackID := track.ID()

var twccExt uint8
if e, ok := r.extensions[webrtc.SDPSectionType(mid)]; ok {
for _, ex := range e {
if ex.URI.String() == sdp.TransportCCURI {
twccExt = uint8(ex.Value)
}
}
}

recv := NewWebRTCReceiver(receiver, track, BufferOptions{
BufferTime: r.config.MaxBufferTime,
MaxBitRate: r.config.MaxBandwidth * 1000,
TWCCExt: r.extensions[mid][twccExt],
TWCCExt: twccExt,
})
recv.OnTransportWideCC(func(sn uint16, timeNS int64, marker bool) {
r.twcc.push(sn, timeNS, marker)
@@ -251,16 +262,20 @@ func (r *router) sendRTCP() {
}
}

func (r *router) GetExtMap(mid string, ext int) uint8 {
func (r *router) GetExtMap(mid, ext string) uint8 {
r.mu.RLock()
defer r.mu.RUnlock()

if e, ok := r.extensions[mid]; ok {
return e[ext]
if e, ok := r.extensions[webrtc.SDPSectionType(mid)]; ok {
for _, ex := range e {
if ex.URI.String() == ext {
return uint8(ex.Value)
}
}
}
return 0
}

// nolint:scopelint
func (r *router) SetExtMap(pd *sdp.SessionDescription) {
r.mu.Lock()
defer r.mu.Unlock()
@@ -273,8 +288,18 @@ func (r *router) SetExtMap(pd *sdp.SessionDescription) {
if !ok {
continue
}
if _, ok := r.extensions[mid]; !ok {
r.extensions[mid] = make(map[int]uint8)
if _, ok := r.extensions[webrtc.SDPSectionType(mid)]; ok {
continue
}

addExt := func(att sdp.Attribute) error {
em := sdp.ExtMap{}
if err := em.Unmarshal("extmap:" + att.Value); err != nil {
log.Errorf("Parsing ext map err: %v", err)
return err
}
r.extensions[webrtc.SDPSectionType(mid)] = append(r.extensions[webrtc.SDPSectionType(mid)], em)
return nil
}

var enterExtMap bool
@@ -284,15 +309,18 @@ func (r *router) SetExtMap(pd *sdp.SessionDescription) {
enterExtMap = true
isExtMap = true
if strings.HasSuffix(att.Value, sdp.TransportCCURI) {
extID := strings.Split(att.Value, " ")
if ext, err := strconv.Atoi(extID[0]); err == nil {
r.extensions[mid][twccExt] = uint8(ext)
if err := addExt(att); err != nil {
continue
}
}
if strings.HasSuffix(att.Value, sdp.SDESMidURI) {
extID := strings.Split(att.Value, " ")
if ext, err := strconv.Atoi(extID[0]); err == nil {
r.extensions[mid][sdesMidExt] = uint8(ext)
if err := addExt(att); err != nil {
continue
}
}
if strings.HasSuffix(att.Value, sdp.SDESRTPStreamIDURI) {
if err := addExt(att); err != nil {
continue
}
}
}
@@ -303,3 +331,38 @@ func (r *router) SetExtMap(pd *sdp.SessionDescription) {

}
}

// nolint:scopelint
func (r *router) OfferExtMap() map[webrtc.SDPSectionType][]sdp.ExtMap {
r.mu.Lock()
defer r.mu.Unlock()

sdesMid, _ := url.Parse(sdp.SDESMidURI)

for _, t := range r.peer.GetTransceivers() {
if _, ok := r.extensions[webrtc.SDPSectionType(t.Mid())]; !ok {
switch t.Kind() {
case webrtc.RTPCodecTypeAudio:
if t.Direction() == webrtc.RTPTransceiverDirectionSendonly {
r.extensions[webrtc.SDPSectionType(t.Mid())] = []sdp.ExtMap{
{
Value: 1,
URI: sdesMid,
},
}
}
case webrtc.RTPCodecTypeVideo:
if t.Direction() == webrtc.RTPTransceiverDirectionSendonly {
r.extensions[webrtc.SDPSectionType(t.Mid())] = []sdp.ExtMap{
{
Value: 1,
URI: sdesMid,
},
}
}
}
}
}

return r.extensions
}
1 change: 0 additions & 1 deletion pkg/sender.go
Original file line number Diff line number Diff line change
@@ -21,7 +21,6 @@ type Sender interface {
ID() string
Start()
Close()
SetMidExt(id uint8)
Kind() webrtc.RTPCodecType
Type() SenderType
Track() *webrtc.Track
5 changes: 4 additions & 1 deletion pkg/session.go
Original file line number Diff line number Diff line change
@@ -13,13 +13,15 @@ type Session struct {
mu sync.RWMutex
transports map[string]Transport
onCloseHandler func()
closed bool
}

// NewSession creates a new session
func NewSession(id string) *Session {
return &Session{
id: id,
transports: make(map[string]Transport),
closed: false,
}
}

@@ -39,8 +41,9 @@ func (s *Session) RemoveTransport(tid string) {
delete(s.transports, tid)

// Close session if no transports
if len(s.transports) == 0 && s.onCloseHandler != nil {
if len(s.transports) == 0 && s.onCloseHandler != nil && !s.closed {
s.onCloseHandler()
s.closed = true
}
}

20 changes: 14 additions & 6 deletions pkg/sfu.go
Original file line number Diff line number Diff line change
@@ -51,12 +51,8 @@ type SFU struct {
sessions map[string]*Session
}

// NewSFU creates a new sfu instance
func NewSFU(c Config) *SFU {
// Init random seed
rand.Seed(time.Now().UnixNano())
// Init ballast
ballast := make([]byte, c.SFU.Ballast*1024*1024)
// NewWebRTCTransportConfig parses our settings and returns a usable WebRTCTransportConfig for creating PeerConnections
func NewWebRTCTransportConfig(c Config) WebRTCTransportConfig {
se := webrtc.SettingEngine{}

// Configure required extensions
@@ -138,6 +134,18 @@ func NewSFU(c Config) *SFU {
w.setting.SetNAT1To1IPs(c.WebRTC.Candidates.NAT1To1IPs, webrtc.ICECandidateTypeHost)
}

return w
}

// NewSFU creates a new sfu instance
func NewSFU(c Config) *SFU {
// Init random seed
rand.Seed(time.Now().UnixNano())
// Init ballast
ballast := make([]byte, c.SFU.Ballast*1024*1024)

w := NewWebRTCTransportConfig(c)

s := &SFU{
webrtc: w,
sessions: make(map[string]*Session),
14 changes: 12 additions & 2 deletions pkg/sfu_test.go
Original file line number Diff line number Diff line change
@@ -297,6 +297,7 @@ func TestSFU_SessionScenarios(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
testDone := atomicBool{}
var mu sync.RWMutex
done := make(chan struct{})
peers := make(map[string]*peer)
@@ -352,6 +353,9 @@ func TestSFU_SessionScenarios(t *testing.T) {
})

p.local.OnOffer = func(o *webrtc.SessionDescription) {
if testDone.get() {
return
}
p.mu.Lock()
defer p.mu.Unlock()
err := p.remote.SetRemoteDescription(*o)
@@ -360,8 +364,13 @@ func TestSFU_SessionScenarios(t *testing.T) {
assert.NoError(t, err)
err = p.remote.SetLocalDescription(a)
assert.NoError(t, err)
err = p.local.SetRemoteDescription(a)
assert.NoError(t, err)
go func() {
if testDone.get() {
return
}
err = p.local.SetRemoteDescription(a)
assert.NoError(t, err)
}()
}

offer, err := p.remote.CreateOffer(nil)
@@ -413,6 +422,7 @@ func TestSFU_SessionScenarios(t *testing.T) {
for _, p := range peers {
p.subs.Wait()
}
testDone.set(true)
close(done)

for _, p := range peers {
9 changes: 2 additions & 7 deletions pkg/simplesender.go
Original file line number Diff line number Diff line change
@@ -24,7 +24,6 @@ type SimpleSender struct {
payload uint8
maxBitrate uint64
target uint64
sdesMidExt uint8
sdesMidHdrCtr uint8
onCloseHandler func()
// Muting helpers
@@ -116,8 +115,8 @@ func (s *SimpleSender) WriteRTP(pkt *rtp.Packet) {
h.PayloadType = s.payload
h.Timestamp = s.lastTS
h.SequenceNumber = s.lastSN
if s.sdesMidHdrCtr < 50 && s.sdesMidExt != 0 {
if err := h.SetExtension(s.sdesMidExt, []byte(s.transceiver.Mid())); err != nil {
if s.sdesMidHdrCtr < 50 {
if err := h.SetExtension(1, []byte(s.transceiver.Mid())); err != nil {
log.Errorf("Setting sdes mid header err: %v", err)
}
s.sdesMidHdrCtr++
@@ -145,10 +144,6 @@ func (s *SimpleSender) Mute(val bool) {
}
}

func (s *SimpleSender) SetMidExt(id uint8) {
s.sdesMidExt = id
}

func (s *SimpleSender) Kind() webrtc.RTPCodecType {
return s.track.Kind()
}
20 changes: 13 additions & 7 deletions pkg/simplesender_test.go
Original file line number Diff line number Diff line change
@@ -72,6 +72,8 @@ func TestSimpleSender_WriteRTP(t *testing.T) {
_, err = sfu.AddTrack(senderTrack)
assert.NoError(t, err)

tr := sfu.GetTransceivers()[0]

err = signalPair(sfu, remote)
assert.NoError(t, err)

@@ -101,9 +103,10 @@ forLoop:
tt := tt
t.Run(tt.name, func(t *testing.T) {
s := &SimpleSender{
enabled: atomicBool{1},
payload: senderTrack.PayloadType(),
track: senderTrack,
enabled: atomicBool{1},
payload: senderTrack.PayloadType(),
track: senderTrack,
transceiver: tr,
}
tmr := time.NewTimer(1000 * time.Millisecond)
s.WriteRTP(fakePkt)
@@ -416,6 +419,8 @@ func TestSimpleSender_Mute(t *testing.T) {
_, err = sfu.AddTrack(senderTrack)
assert.NoError(t, err)

tr := sfu.GetTransceivers()[0]

err = signalPair(sfu, remote)
assert.NoError(t, err)

@@ -448,10 +453,11 @@ forLoop:
}

simpleSdr := SimpleSender{
enabled: atomicBool{1},
router: r,
track: senderTrack,
payload: senderTrack.PayloadType(),
enabled: atomicBool{1},
router: r,
track: senderTrack,
payload: senderTrack.PayloadType(),
transceiver: tr,
}
// Simple sender must forward packets while the sender is not muted
fakePkt := senderTrack.Packetizer().Packetize([]byte{0x05, 0x06, 0x07, 0x08}, 1)[0]
9 changes: 2 additions & 7 deletions pkg/simulcastsender.go
Original file line number Diff line number Diff line change
@@ -25,7 +25,6 @@ type SimulcastSender struct {
enabled atomicBool
target uint64
payload uint8
sdesMidExt uint8
sdesMidHdrCtr uint8
maxBitrate uint64
onCloseHandler func()
@@ -181,8 +180,8 @@ func (s *SimulcastSender) WriteRTP(pkt *rtp.Packet) {
h.SequenceNumber = lSN
h.Timestamp = s.lTS
h.PayloadType = s.payload
if s.sdesMidHdrCtr < 50 && s.sdesMidExt != 0 {
if err := h.SetExtension(s.sdesMidExt, []byte(s.transceiver.Mid())); err != nil {
if s.sdesMidHdrCtr < 50 {
if err := h.SetExtension(1, []byte(s.transceiver.Mid())); err != nil {
log.Errorf("Setting sdes mid header err: %v", err)
}
s.sdesMidHdrCtr++
@@ -219,10 +218,6 @@ func (s *SimulcastSender) Transceiver() *webrtc.RTPTransceiver {
return s.transceiver
}

func (s *SimulcastSender) SetMidExt(id uint8) {
s.sdesMidExt = id
}

func (s *SimulcastSender) Type() SenderType {
return SimulcastSenderType
}
6 changes: 6 additions & 0 deletions pkg/simulcastsender_test.go
Original file line number Diff line number Diff line change
@@ -83,6 +83,8 @@ func TestSimulcastSender_WriteRTP(t *testing.T) {
_, err = sfu.AddTrack(senderTrack)
assert.NoError(t, err)

tr := sfu.GetTransceivers()[0]

gotPli := make(chan struct{}, 1)
fakeReceiver := &ReceiverMock{
TrackFunc: func() *webrtc.Track {
@@ -149,6 +151,7 @@ forLoop:
track: senderTrack,
simulcastSSRC: simulcastSSRC,
lSSRC: fakeRecvTrack.SSRC(),
transceiver: tr,
}
s.WriteRTP(tt.fields.packet)
pkt, err := remoteTrack.ReadRTP()
@@ -474,6 +477,8 @@ func TestSimulcastSender_Mute(t *testing.T) {
_, err = sfu.AddTrack(senderTrack)
assert.NoError(t, err)

tr := sfu.GetTransceivers()[0]

err = signalPair(sfu, remote)
assert.NoError(t, err)

@@ -515,6 +520,7 @@ forLoop:
track: senderTrack,
payload: senderTrack.PayloadType(),
lSSRC: 1234,
transceiver: tr,
}
// Simple sender must forward packets while the sender is not muted
fakePkt := senderTrack.Packetizer().Packetize([]byte{0x05, 0x06, 0x07, 0x08}, 1)[0]
17 changes: 5 additions & 12 deletions pkg/webrtctransport.go
Original file line number Diff line number Diff line change
@@ -4,21 +4,15 @@ import (
"sync"
"time"

"github.com/bep/debounce"
"github.com/gammazero/deque"

"github.com/bep/debounce"
"github.com/lucsky/cuid"
log "github.com/pion/ion-log"
"github.com/pion/sdp/v3"
"github.com/pion/webrtc/v3"
)

const (
sdesMidExt = iota
audioLevelExt
twccExt
)

// WebRTCTransportConfig represents configuration options
type WebRTCTransportConfig struct {
configuration webrtc.Configuration
@@ -112,6 +106,8 @@ func NewWebRTCTransport(session *Session, me MediaEngine, cfg WebRTCTransportCon
}
})

pc.GetMapExtension(p.router.OfferExtMap)

return p, nil
}

@@ -148,8 +144,8 @@ func (p *WebRTCTransport) SetRemoteDescription(desc webrtc.SessionDescription) e
switch desc.Type {
case webrtc.SDPTypeAnswer:
p.router.SetExtMap(pd)
p.mu.Lock()
if p.pendingSenders.Len() > 0 {
p.mu.Lock()
pendingStart := make([]pendingSender, 0, p.pendingSenders.Len())
for _, md := range pd.MediaDescriptions {
if p.pendingSenders.Len() == 0 {
@@ -162,15 +158,12 @@ func (p *WebRTCTransport) SetRemoteDescription(desc webrtc.SessionDescription) e
for i := 0; i < p.pendingSenders.Len(); i++ {
pd := p.pendingSenders.PopFront().(pendingSender)
if pd.transceiver.Mid() == mid {
ext := p.router.GetExtMap(mid, sdesMidExt)
pd.sender.SetMidExt(ext)
pendingStart = append(pendingStart, pd)
} else {
p.pendingSenders.PushBack(pd)
}
}
}
p.mu.Unlock()
if len(pendingStart) > 0 {
defer func() {
if err == nil {
@@ -187,7 +180,7 @@ func (p *WebRTCTransport) SetRemoteDescription(desc webrtc.SessionDescription) e
}()
}
}

p.mu.Unlock()
case webrtc.SDPTypeOffer:
p.router.SetExtMap(pd)
}