Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: ionorg/ion-sfu
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v1.8.2
Choose a base ref
...
head repository: ionorg/ion-sfu
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: v1.8.3
Choose a head ref
  • 3 commits
  • 8 files changed
  • 2 contributors

Commits on Feb 3, 2021

  1. Small tweaks for ion-cluster (#403)

    * Use WebRTCTransportConfig for NewSession (cfg.router is private)
    
    * Make datachannel label public
    
    * Export turn server, make packetfactory/bufferfactory init automatically
    billylindeman authored Feb 3, 2021
    Copy the full SHA
    7925192 View commit details

Commits on Feb 5, 2021

  1. Copy the full SHA
    2d25b01 View commit details
  2. Copy the full SHA
    6dca09e View commit details
Showing with 52 additions and 79 deletions.
  1. +3 −3 pkg/buffer/buffer.go
  2. +1 −1 pkg/sfu/datachannel.go
  3. +9 −22 pkg/sfu/peer.go
  4. +21 −28 pkg/sfu/session.go
  5. +12 −9 pkg/sfu/sfu.go
  6. +2 −12 pkg/sfu/sfu_test.go
  7. +3 −3 pkg/sfu/subscriber.go
  8. +1 −1 pkg/sfu/turn.go
6 changes: 3 additions & 3 deletions pkg/buffer/buffer.go
Original file line number Diff line number Diff line change
@@ -391,9 +391,9 @@ func (b *Buffer) buildREMBPacket() *rtcp.ReceiverEstimatedMaximumBitrate {
func (b *Buffer) buildReceptionReport() rtcp.ReceptionReport {
extMaxSeq := b.cycles | uint32(b.maxSeqNo)
expected := extMaxSeq - uint32(b.baseSN) + 1
lost := expected - b.stats.PacketCount
if b.stats.PacketCount == 0 {
lost = 0
lost := uint32(0)
if b.stats.PacketCount < expected && b.stats.PacketCount != 0 {
lost = expected - b.stats.PacketCount
}
expectedInterval := expected - b.stats.LastExpected
b.stats.LastExpected = expected
2 changes: 1 addition & 1 deletion pkg/sfu/datachannel.go
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@ type (
// The datachannels created will be negotiated on join to all peers that joins
// the SFU.
Datachannel struct {
label string
Label string
middlewares []func(MessageProcessor) MessageProcessor
onMessage func(ctx context.Context, args ProcessArgs, out []*webrtc.DataChannel)
}
31 changes: 9 additions & 22 deletions pkg/sfu/peer.go
Original file line number Diff line number Diff line change
@@ -35,6 +35,7 @@ type SessionProvider interface {
type Peer struct {
sync.Mutex
id string
closed atomicBool
session *Session
provider SessionProvider

@@ -104,7 +105,7 @@ func (p *Peer) Join(sid string, sdp webrtc.SessionDescription) (*webrtc.SessionD
}

p.remoteAnswerPending = true
if p.OnOffer != nil {
if p.OnOffer != nil && !p.closed.get() {
log.Infof("peer %s send offer", p.id)
p.OnOffer(&offer)
}
@@ -116,13 +117,9 @@ func (p *Peer) Join(sid string, sdp webrtc.SessionDescription) (*webrtc.SessionD
return
}

p.Lock()
handler := p.OnIceCandidate
p.Unlock()

if handler != nil {
if p.OnIceCandidate != nil && !p.closed.get() {
json := c.ToJSON()
handler(&json, subscriber)
p.OnIceCandidate(&json, subscriber)
}
})

@@ -132,23 +129,15 @@ func (p *Peer) Join(sid string, sdp webrtc.SessionDescription) (*webrtc.SessionD
return
}

p.Lock()
handler := p.OnIceCandidate
p.Unlock()

if handler != nil {
if p.OnIceCandidate != nil && !p.closed.get() {
json := c.ToJSON()
handler(&json, publisher)
p.OnIceCandidate(&json, publisher)
}
})

p.publisher.OnICEConnectionStateChange(func(s webrtc.ICEConnectionState) {
p.Lock()
handler := p.OnICEConnectionStateChange
p.Unlock()

if handler != nil {
handler(s)
if p.OnICEConnectionStateChange != nil && !p.closed.get() {
p.OnICEConnectionStateChange(s)
}
})

@@ -237,9 +226,7 @@ func (p *Peer) Close() error {
p.Lock()
defer p.Unlock()

p.OnOffer = nil
p.OnIceCandidate = nil
p.OnICEConnectionStateChange = nil
p.closed.set(true)

if p.session != nil {
p.session.RemovePeer(p.id)
49 changes: 21 additions & 28 deletions pkg/sfu/session.go
Original file line number Diff line number Diff line change
@@ -23,14 +23,14 @@ type Session struct {
}

// NewSession creates a new session
func NewSession(id string, dcs []*Datachannel, cfg RouterConfig) *Session {
func NewSession(id string, dcs []*Datachannel, cfg WebRTCTransportConfig) *Session {
s := &Session{
id: id,
peers: make(map[string]*Peer),
datachannels: dcs,
audioObserver: newAudioLevel(cfg.AudioLevelThreshold, cfg.AudioLevelInterval, cfg.AudioLevelFilter),
audioObserver: newAudioLevel(cfg.router.AudioLevelThreshold, cfg.router.AudioLevelInterval, cfg.router.AudioLevelFilter),
}
go s.audioLevelObserver(cfg.AudioLevelInterval)
go s.audioLevelObserver(cfg.router.AudioLevelInterval)
return s

}
@@ -57,23 +57,15 @@ func (s *Session) RemovePeer(pid string) {
}

func (s *Session) onMessage(origin, label string, msg webrtc.DataChannelMessage) {
s.mu.RLock()
defer s.mu.RUnlock()
for pid, p := range s.peers {
if origin == pid {
continue
}

dc := p.subscriber.channels[label]
if dc != nil && dc.ReadyState() == webrtc.DataChannelStateOpen {
if msg.IsString {
if err := dc.SendText(string(msg.Data)); err != nil {
log.Errorf("Sending dc message err: %v", err)
}
} else {
if err := dc.Send(msg.Data); err != nil {
log.Errorf("Sending dc message err: %v", err)
}
dcs := s.getDataChannels(origin, label)
for _, dc := range dcs {
if msg.IsString {
if err := dc.SendText(string(msg.Data)); err != nil {
log.Errorf("Sending dc message err: %v", err)
}
} else {
if err := dc.Send(msg.Data); err != nil {
log.Errorf("Sending dc message err: %v", err)
}
}
}
@@ -87,7 +79,7 @@ func (s *Session) getDataChannels(origin, label string) (dcs []*webrtc.DataChann
continue
}

if dc, ok := p.subscriber.channels[label]; ok {
if dc, ok := p.subscriber.channels[label]; ok && dc.ReadyState() == webrtc.DataChannelStateOpen {
dcs = append(dcs, dc)
}
}
@@ -200,6 +192,9 @@ func (s *Session) audioLevelObserver(audioLevelInterval int) {
if audioLevelInterval <= 50 {
log.Warnf("Values near/under 20ms may return unexpected values")
}
if audioLevelInterval == 0 {
audioLevelInterval = 1000
}
for {
time.Sleep(time.Duration(audioLevelInterval) * time.Millisecond)
if s.closed.get() {
@@ -218,14 +213,12 @@ func (s *Session) audioLevelObserver(audioLevelInterval int) {
}

sl := string(l)
s.mu.RLock()
for _, peer := range s.peers {
if ch, ok := peer.subscriber.channels[APIChannelLabel]; ok && ch.ReadyState() == webrtc.DataChannelStateOpen {
if err = ch.SendText(sl); err != nil {
log.Errorf("Sending audio levels to peer: %s, err: %v", peer.id, err)
}
dcs := s.getDataChannels("", APIChannelLabel)

for _, ch := range dcs {
if err = ch.SendText(sl); err != nil {
log.Errorf("Sending audio levels err: %v", err)
}
}
s.mu.RUnlock()
}
}
21 changes: 12 additions & 9 deletions pkg/sfu/sfu.go
Original file line number Diff line number Diff line change
@@ -138,12 +138,7 @@ func NewWebRTCTransportConfig(c Config) WebRTCTransportConfig {
return w
}

// NewSFU creates a new sfu instance
func NewSFU(c Config) *SFU {
// Init random seed
rand.Seed(time.Now().UnixNano())
// Init ballast
ballast := make([]byte, c.SFU.Ballast*1024*1024)
func init() {
// Init buffer factory
bufferFactory = buffer.NewBufferFactory()
// Init packet factory
@@ -152,6 +147,14 @@ func NewSFU(c Config) *SFU {
return make([]byte, 1460)
},
}
}

// NewSFU creates a new sfu instance
func NewSFU(c Config) *SFU {
// Init random seed
rand.Seed(time.Now().UnixNano())
// Init ballast
ballast := make([]byte, c.SFU.Ballast*1024*1024)

w := NewWebRTCTransportConfig(c)

@@ -162,7 +165,7 @@ func NewSFU(c Config) *SFU {
}

if c.Turn.Enabled {
ts, err := initTurnServer(c.Turn, nil)
ts, err := InitTurnServer(c.Turn, nil)
if err != nil {
log.Panicf("Could not init turn server err: %v", err)
}
@@ -175,7 +178,7 @@ func NewSFU(c Config) *SFU {

// NewSession creates a new session instance
func (s *SFU) newSession(id string) *Session {
session := NewSession(id, s.datachannels, s.webrtc.router)
session := NewSession(id, s.datachannels, s.webrtc)

session.OnClose(func() {
s.Lock()
@@ -214,7 +217,7 @@ func (s *SFU) GetSession(sid string) (*Session, WebRTCTransportConfig) {
}

func (s *SFU) NewDatachannel(label string) *Datachannel {
dc := &Datachannel{label: label}
dc := &Datachannel{Label: label}
s.datachannels = append(s.datachannels, dc)
return dc
}
14 changes: 2 additions & 12 deletions pkg/sfu/sfu_test.go
Original file line number Diff line number Diff line change
@@ -251,7 +251,7 @@ func TestSFU_SessionScenarios(t *testing.T) {
{
actions: []*action{{
id: "remote1",
kind: "publish",
kind: "unpublish",
media: []media{
{kind: "audio", id: "stream3", tid: "audio3"},
{kind: "video", id: "stream3", tid: "video3"},
@@ -260,24 +260,14 @@ func TestSFU_SessionScenarios(t *testing.T) {
},
{
actions: []*action{{
id: "remote1",
id: "remote2",
kind: "unpublish",
media: []media{
{kind: "audio", id: "stream1", tid: "audio1"},
{kind: "video", id: "stream1", tid: "video1"},
},
}},
},
{
actions: []*action{{
id: "remote2",
kind: "publish",
media: []media{
{kind: "audio", id: "stream4", tid: "audio4"},
{kind: "video", id: "stream4", tid: "video4"},
},
}},
},
},
},
}
6 changes: 3 additions & 3 deletions pkg/sfu/subscriber.go
Original file line number Diff line number Diff line change
@@ -75,15 +75,15 @@ func NewSubscriber(id string, cfg WebRTCTransportConfig) (*Subscriber, error) {
}

func (s *Subscriber) AddDatachannel(peer *Peer, dc *Datachannel) error {
ndc, err := s.pc.CreateDataChannel(dc.label, &webrtc.DataChannelInit{})
ndc, err := s.pc.CreateDataChannel(dc.Label, &webrtc.DataChannelInit{})
if err != nil {
return err
}

mws := newDCChain(dc.middlewares)
p := mws.Process(ProcessFunc(func(ctx context.Context, args ProcessArgs) {
if dc.onMessage != nil {
dc.onMessage(ctx, args, peer.session.getDataChannels(peer.id, dc.label))
dc.onMessage(ctx, args, peer.session.getDataChannels(peer.id, dc.Label))
}
}))
ndc.OnMessage(func(msg webrtc.DataChannelMessage) {
@@ -94,7 +94,7 @@ func (s *Subscriber) AddDatachannel(peer *Peer, dc *Datachannel) error {
})
})

s.channels[dc.label] = ndc
s.channels[dc.Label] = ndc

return nil
}
2 changes: 1 addition & 1 deletion pkg/sfu/turn.go
Original file line number Diff line number Diff line change
@@ -39,7 +39,7 @@ type TurnConfig struct {
Auth TurnAuth `mapstructure:"auth"`
}

func initTurnServer(conf TurnConfig, auth func(username, realm string, srcAddr net.Addr) ([]byte, bool)) (*turn.Server, error) {
func InitTurnServer(conf TurnConfig, auth func(username, realm string, srcAddr net.Addr) ([]byte, bool)) (*turn.Server, error) {
var listeners []turn.ListenerConfig

// Create a UDP listener to pass into pion/turn