Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: ionorg/ion-sfu
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v1.0.42
Choose a base ref
...
head repository: ionorg/ion-sfu
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: v1.1.0
Choose a head ref
  • 6 commits
  • 17 files changed
  • 2 contributors

Commits on Oct 31, 2020

  1. fix: concurrent test (#277)

    * fix: concurrent test
    
    * sequential media test
    
    * fix track id
    
    * add unpublish
    tarrencev authored Oct 31, 2020
    Copy the full SHA
    0e9c063 View commit details
  2. fix: remove renegotiation callback (#278)

    * Remove renegotiation callback
    
    * Fix linting issues
    
    * Queue negotiations
    
    * Check negotiation on stable state
    
    * Defer negotiation status
    OrlandoCo authored Oct 31, 2020
    Copy the full SHA
    0fe3b4e View commit details
  3. Copy the full SHA
    83b0109 View commit details
  4. test: Add buffer test

    OrlandoCo committed Oct 31, 2020
    Copy the full SHA
    27360e6 View commit details

Commits on Nov 2, 2020

  1. Copy the full SHA
    fdbb193 View commit details

Commits on Nov 3, 2020

  1. feat: improve grpc interface (#285)

    * feat: improve grpc interface
    
    * fix allrpc
    tarrencev authored Nov 3, 2020
    Copy the full SHA
    cd3a3ec View commit details
9 changes: 1 addition & 8 deletions cmd/signal/allrpc/server/server.go
Original file line number Diff line number Diff line change
@@ -5,11 +5,9 @@ import (
"net/http"

log "github.com/pion/ion-log"
pb "github.com/pion/ion-sfu/cmd/signal/grpc/proto"
grpcServer "github.com/pion/ion-sfu/cmd/signal/grpc/server"
jsonrpcServer "github.com/pion/ion-sfu/cmd/signal/json-rpc/server"
sfu "github.com/pion/ion-sfu/pkg"
"google.golang.org/grpc"

// pprof
_ "net/http/pprof"
@@ -37,12 +35,7 @@ func (s *Server) ServeGRPC(gaddr string) error {
return err
}

gs := grpc.NewServer()
inst := grpcServer.GRPCSignal{SFU: s.sfu}
pb.RegisterSFUService(gs, &pb.SFUService{
Signal: inst.Signal,
})

gs := grpcServer.NewServer(s.sfu)
log.Infof("GRPC Listening at %s", gaddr)
if err := gs.Serve(l); err != nil {
log.Errorf("err=%v", err)
8 changes: 1 addition & 7 deletions cmd/signal/grpc/main.go
Original file line number Diff line number Diff line change
@@ -10,9 +10,7 @@ import (
log "github.com/pion/ion-log"
sfu "github.com/pion/ion-sfu/pkg"
"github.com/spf13/viper"
"google.golang.org/grpc"

pb "github.com/pion/ion-sfu/cmd/signal/grpc/proto"
"github.com/pion/ion-sfu/cmd/signal/grpc/server"
)

@@ -108,11 +106,7 @@ func main() {
log.Panicf("failed to listen: %v", err)
}
log.Infof("SFU Listening at %s", addr)
s := grpc.NewServer()
inst := server.GRPCSignal{SFU: sfu.NewSFU(conf.Config)}
pb.RegisterSFUService(s, &pb.SFUService{
Signal: inst.Signal,
})
s := server.NewServer(sfu.NewSFU(conf.Config))
if err := s.Serve(lis); err != nil {
log.Panicf("failed to serve: %v", err)
}
269 changes: 96 additions & 173 deletions cmd/signal/grpc/proto/sfu.pb.go

Large diffs are not rendered by default.

24 changes: 10 additions & 14 deletions cmd/signal/grpc/proto/sfu.proto
Original file line number Diff line number Diff line change
@@ -9,36 +9,32 @@ service SFU {
}

message SignalRequest {
string id = 1;
oneof payload {
JoinRequest join = 1;
SessionDescription negotiate = 2;
Trickle trickle = 3;
JoinRequest join = 2;
bytes description = 3;
Trickle trickle = 4;
}
}

message SignalReply {
string id = 1;
oneof payload {
JoinReply join = 1;
SessionDescription negotiate = 2;
Trickle trickle = 3;
JoinReply join = 2;
bytes description = 3;
Trickle trickle = 4;
}
}

message JoinRequest {
string sid = 1;
SessionDescription offer = 2;
bytes description = 2;
}

message JoinReply {
string pid = 1;
SessionDescription answer = 2;
bytes description = 1;
}

message Trickle {
string init = 1;
}

message SessionDescription {
string type = 1; // "answer" | "offer" | "pranswer" | "rollback"
bytes sdp = 2;
}
100 changes: 41 additions & 59 deletions cmd/signal/grpc/proto/sfu_grpc.pb.go
88 changes: 49 additions & 39 deletions cmd/signal/grpc/server/server.go
Original file line number Diff line number Diff line change
@@ -8,13 +8,21 @@ import (
log "github.com/pion/ion-log"
sfu "github.com/pion/ion-sfu/pkg"
"github.com/pion/webrtc/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

pb "github.com/pion/ion-sfu/cmd/signal/grpc/proto"
)

type GRPCSignal struct {
func NewServer(sfu *sfu.SFU) *grpc.Server {
s := grpc.NewServer()
pb.RegisterSFUServer(s, &SFUServer{SFU: sfu})
return s
}

type SFUServer struct {
pb.UnimplementedSFUServer
SFU *sfu.SFU
}

@@ -36,8 +44,7 @@ type GRPCSignal struct {
// 2. `Trickle` containing candidate information for Trickle ICE.
//
// If the client closes this stream, the webrtc stream will be closed.
func (s *GRPCSignal) Signal(stream pb.SFU_SignalServer) error {
var pid string
func (s *SFUServer) Signal(stream pb.SFU_SignalServer) error {
peer := sfu.NewPeer(s.SFU)
for {
in, err := stream.Recv()
@@ -60,11 +67,12 @@ func (s *GRPCSignal) Signal(stream pb.SFU_SignalServer) error {

switch payload := in.Payload.(type) {
case *pb.SignalRequest_Join:
log.Infof("signal->join called:\n%v", string(payload.Join.Offer.Sdp))
log.Debugf("signal->join called:\n%v", string(payload.Join.Description))

offer := webrtc.SessionDescription{
Type: webrtc.SDPTypeOffer,
SDP: string(payload.Join.Offer.Sdp),
var offer webrtc.SessionDescription
err := json.Unmarshal(payload.Join.Description, &offer)
if err != nil {
return status.Errorf(codes.Internal, fmt.Sprintf("sdp unmarshal error: %v", err))
}

answer, err := peer.Join(payload.Join.Sid, offer)
@@ -98,12 +106,15 @@ func (s *GRPCSignal) Signal(stream pb.SFU_SignalServer) error {

// Notify user of new offer
peer.OnOffer = func(o *webrtc.SessionDescription) {
err := stream.Send(&pb.SignalReply{
Payload: &pb.SignalReply_Negotiate{
Negotiate: &pb.SessionDescription{
Type: o.Type.String(),
Sdp: []byte(o.SDP),
},
marshalled, err := json.Marshal(answer)
if err != nil {
log.Errorf("sdp marshal error: %v", err)
return
}

err = stream.Send(&pb.SignalReply{
Payload: &pb.SignalReply_Description{
Description: marshalled,
},
})

@@ -112,15 +123,17 @@ func (s *GRPCSignal) Signal(stream pb.SFU_SignalServer) error {
}
}

marshalled, err := json.Marshal(answer)
if err != nil {
return status.Errorf(codes.Internal, fmt.Sprintf("sdp marshal error: %v", err))
}

// send answer
err = stream.Send(&pb.SignalReply{
Id: in.Id,
Payload: &pb.SignalReply_Join{
Join: &pb.JoinReply{
Pid: pid,
Answer: &pb.SessionDescription{
Type: answer.Type.String(),
Sdp: []byte(answer.SDP),
},
Description: marshalled,
},
},
})
@@ -130,48 +143,45 @@ func (s *GRPCSignal) Signal(stream pb.SFU_SignalServer) error {
return status.Errorf(codes.Internal, "join error %s", err)
}

case *pb.SignalRequest_Negotiate:
if payload.Negotiate.Type == webrtc.SDPTypeOffer.String() {
offer := webrtc.SessionDescription{
Type: webrtc.SDPTypeOffer,
SDP: string(payload.Negotiate.Sdp),
}
case *pb.SignalRequest_Description:
var sdp webrtc.SessionDescription
err := json.Unmarshal(payload.Description, &sdp)
if err != nil {
return status.Errorf(codes.Internal, fmt.Sprintf("sdp unmarshal error: %v", err))
}

answer, err := peer.Answer(offer)
if sdp.Type == webrtc.SDPTypeOffer {
answer, err := peer.Answer(sdp)
if err != nil {
switch err {
case sfu.ErrNoTransportEstablished:
log.Errorf("peer hasn't joined")
return status.Errorf(codes.FailedPrecondition, err.Error())
default:
return status.Errorf(codes.Internal, fmt.Sprintf("negotiate error: %v", err))
}
}

marshalled, err := json.Marshal(answer)
if err != nil {
return status.Errorf(codes.Internal, fmt.Sprintf("sdp marshal error: %v", err))
}

err = stream.Send(&pb.SignalReply{
Payload: &pb.SignalReply_Negotiate{
Negotiate: &pb.SessionDescription{
Type: answer.Type.String(),
Sdp: []byte(answer.SDP),
},
Id: in.Id,
Payload: &pb.SignalReply_Description{
Description: marshalled,
},
})

if err != nil {
return status.Errorf(codes.Internal, fmt.Sprintf("negotiate error: %v", err))
}

} else if payload.Negotiate.Type == webrtc.SDPTypeAnswer.String() {
answer := webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer,
SDP: string(payload.Negotiate.Sdp),
}

err := peer.SetRemoteDescription(answer)
} else if sdp.Type == webrtc.SDPTypeAnswer {
err := peer.SetRemoteDescription(sdp)
if err != nil {
switch err {
case sfu.ErrNoTransportEstablished:
log.Errorf("peer hasn't joined")
return status.Errorf(codes.FailedPrecondition, err.Error())
default:
return status.Errorf(codes.Internal, fmt.Sprintf("negotiate error: %v", err))
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -16,12 +16,11 @@ require (
github.com/pion/rtp v1.6.1
github.com/pion/sdp/v3 v3.0.2
github.com/pion/turn/v2 v2.0.5 // indirect
github.com/pion/webrtc/v3 v3.0.0-beta.10.0.20201030205602-60db5090fc93
github.com/pion/webrtc/v3 v3.0.0-beta.10.0.20201103052241-78f7c978fe51
github.com/sourcegraph/jsonrpc2 v0.0.0-20200429184054-15c2290dcb37
github.com/spf13/viper v1.7.1
github.com/stretchr/testify v1.6.1
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 // indirect
golang.org/x/net v0.0.0-20201024042810-be3efd7ff127 // indirect
golang.org/x/sys v0.0.0-20201024232916-9f70ab9862d5 // indirect
google.golang.org/grpc v1.33.1
google.golang.org/protobuf v1.25.0
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -260,6 +260,8 @@ github.com/pion/webrtc/v3 v3.0.0-beta.10.0.20201027193323-3e64df35adfa h1:eyg97N
github.com/pion/webrtc/v3 v3.0.0-beta.10.0.20201027193323-3e64df35adfa/go.mod h1:GlriYYHJ5KkNsCunm3oFDPql4TDTrrNoI9iSWWSnafA=
github.com/pion/webrtc/v3 v3.0.0-beta.10.0.20201030205602-60db5090fc93 h1:h6kVKATdkUm7sFZQ//Q5t/2VKd7BZpO0Ozc9q94D4RI=
github.com/pion/webrtc/v3 v3.0.0-beta.10.0.20201030205602-60db5090fc93/go.mod h1:GlriYYHJ5KkNsCunm3oFDPql4TDTrrNoI9iSWWSnafA=
github.com/pion/webrtc/v3 v3.0.0-beta.10.0.20201103052241-78f7c978fe51 h1:croACaiFnjNtvUG8SirC+wTkQ8Y1/QxP+5e9J9VXLQ8=
github.com/pion/webrtc/v3 v3.0.0-beta.10.0.20201103052241-78f7c978fe51/go.mod h1:UbmDN5G82nXLXAiSIo0HYU68GN6z09jeKSNEaDUzFvY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@@ -421,6 +423,8 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20201002202402-0a1ea396d57c/go.mod h1:iQL9McJNjoIa5mjH6nYTCTZXUN6RP+XW3eib7Ya3XcI=
golang.org/x/net v0.0.0-20201024042810-be3efd7ff127 h1:pZPp9+iYUqwYKLjht0SDBbRCRK/9gAXDy7pz5fRDpjo=
golang.org/x/net v0.0.0-20201024042810-be3efd7ff127/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201031054903-ff519b6c9102 h1:42cLlJJdEh+ySyeUUbEQ5bsTiq8voBeTuweGVkY6Puw=
golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
69 changes: 69 additions & 0 deletions pkg/buffer_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package sfu

import (
"testing"

"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
"github.com/stretchr/testify/assert"
)

func CreateTestPacket(pktStamp *SequenceNumberAndTimeStamp) *rtp.Packet {
@@ -36,3 +40,68 @@ func CreateTestListPackets(snsAndTSs []SequenceNumberAndTimeStamp) (packetList [

return packetList
}

func TestNewBuffer(t *testing.T) {
me := webrtc.MediaEngine{}
me.RegisterDefaultCodecs()
api := webrtc.NewAPI(webrtc.WithMediaEngine(me))
p, _ := api.NewPeerConnection(webrtc.Configuration{})
track, _ := p.NewTrack(webrtc.DefaultPayloadTypeVP8, 1234, "test", "pion")

type args struct {
track *webrtc.Track
o BufferOptions
}
tests := []struct {
name string
args args
}{
{
name: "Must not be nil and add packets in sequence",
args: args{
track: track,
o: BufferOptions{
TWCCExt: 0,
BufferTime: 1e3,
MaxBitRate: 1e3,
},
},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
var TestPackets = []*rtp.Packet{
{
Header: rtp.Header{
SequenceNumber: 65533,
},
},
{
Header: rtp.Header{
SequenceNumber: 65534,
},
},
{
Header: rtp.Header{
SequenceNumber: 2,
},
},
{
Header: rtp.Header{
SequenceNumber: 65535,
},
},
}
buff := NewBuffer(tt.args.track, tt.args.o)
assert.NotNil(t, buff)

for _, p := range TestPackets {
buff.push(p)
}
assert.Equal(t, 6, buff.pktQueue.size)
assert.Equal(t, uint32(1<<16), buff.cycles)
assert.Equal(t, uint16(2), buff.maxSeqNo)
})
}
}
19 changes: 13 additions & 6 deletions pkg/peer.go
Original file line number Diff line number Diff line change
@@ -27,6 +27,7 @@ type Peer struct {

makingOffer atomicBool
isSettingRemoteAnswerPending atomicBool
negotiationPending atomicBool
}

// NewPeer creates a new Peer for signaling with the given SFU
@@ -62,8 +63,19 @@ func (p *Peer) Join(sid string, sdp webrtc.SessionDescription) (*webrtc.SessionD
}

pc.OnNegotiationNeeded(func() {
if p.makingOffer.get() {
p.negotiationPending.set(true)
return
}

p.makingOffer.set(true)
defer p.makingOffer.set(false)
defer func() {
p.makingOffer.set(false)
if p.negotiationPending.get() {
p.negotiationPending.set(false)
pc.negotiate()
}
}()

log.Debugf("on negotiation needed called")
offer, err := pc.CreateOffer()
@@ -93,10 +105,8 @@ func (p *Peer) Join(sid string, sdp webrtc.SessionDescription) (*webrtc.SessionD
json := c.ToJSON()
p.OnIceCandidate(&json)
}

})

p.pc = pc
return answer, nil
}

@@ -109,11 +119,9 @@ func (p *Peer) Answer(sdp webrtc.SessionDescription) (*webrtc.SessionDescription

readyForOffer := !p.makingOffer.get() &&
(p.pc.SignalingState() == webrtc.SignalingStateStable || p.isSettingRemoteAnswerPending.get())

if !readyForOffer {
return nil, ErrOfferIgnored
}

if err := p.pc.SetRemoteDescription(sdp); err != nil {
return nil, fmt.Errorf("error setting remote description: %v", err)
}
@@ -144,7 +152,6 @@ func (p *Peer) SetRemoteDescription(sdp webrtc.SessionDescription) error {
if err := p.pc.SetRemoteDescription(sdp); err != nil {
return fmt.Errorf("error setting remote description: %v", err)
}

return nil
}

9 changes: 8 additions & 1 deletion pkg/router.go
Original file line number Diff line number Diff line change
@@ -128,14 +128,19 @@ func (r *router) AddSender(p *WebRTCTransport, rr *receiverRouter) error {
defer r.mu.Unlock()

if rr != nil {
return r.addSender(p, rr)
if err := r.addSender(p, rr); err != nil {
return err
}
p.negotiate()
return nil
}

for _, rr = range r.receivers {
if err := r.addSender(p, rr); err != nil {
return err
}
}
p.negotiate()
return nil
}

@@ -208,6 +213,8 @@ func (r *router) addSender(p *WebRTCTransport, rr *receiverRouter) error {
sender.OnCloseHandler(func() {
if err := p.pc.RemoveTrack(t.Sender()); err != nil { // nolint:scopelint
log.Errorf("Error closing sender: %s", err) // nolint:scopelint
} else {
p.negotiate() // nolint:scopelint
}
})

123 changes: 91 additions & 32 deletions pkg/sfu_test.go
Original file line number Diff line number Diff line change
@@ -49,14 +49,14 @@ func sendRTPWithSenderUntilDone(done <-chan struct{}, t *testing.T, track *webrt
}
}

func sendRTPUntilDone(done <-chan struct{}, t *testing.T, track *webrtc.Track) {
func sendRTPUntilDone(start, done <-chan struct{}, t *testing.T, track *webrtc.Track) {
<-start
for {
select {
case <-time.After(20 * time.Millisecond):
pkt := track.Packetizer().Packetize([]byte{0x05, 0x06, 0x07, 0x08}, 1)[0]
pkt.Payload = []byte{0xff, 0xff, 0xff, 0xfd, 0xb4, 0x9f, 0x94, 0x1}
err := track.WriteRTP(pkt)
assert.NoError(t, err)
_ = track.WriteRTP(pkt)
case <-done:
return
}
@@ -98,35 +98,46 @@ type peer struct {
local *Peer
remote *webrtc.PeerConnection
subs sync.WaitGroup
pubs []*webrtc.RTPSender
pubs []*sender
}

type step struct {
actions []*action
}

func addMedia(done <-chan struct{}, t *testing.T, pc *webrtc.PeerConnection, media []media) []*webrtc.RTPSender {
var senders []*webrtc.RTPSender
type sender struct {
transceiver *webrtc.RTPTransceiver
start chan struct{}
}

func addMedia(done <-chan struct{}, t *testing.T, pc *webrtc.PeerConnection, media []media) []*sender {
var senders []*sender
for _, media := range media {
var track *webrtc.Track
var err error

start := make(chan struct{})

switch media.kind {
case "audio":
track, err = pc.NewTrack(webrtc.DefaultPayloadTypeOpus, rand.Uint32(), media.tid, media.id)
assert.NoError(t, err)
sender, err := pc.AddTrack(track)
transceiver, err := pc.AddTransceiverFromTrack(track, webrtc.RtpTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionSendonly,
})
assert.NoError(t, err)
senders = append(senders, sender)
senders = append(senders, &sender{transceiver: transceiver, start: start})
case "video":
track, err = pc.NewTrack(webrtc.DefaultPayloadTypeVP8, rand.Uint32(), media.tid, media.id)
assert.NoError(t, err)
sender, err := pc.AddTrack(track)
transceiver, err := pc.AddTransceiverFromTrack(track, webrtc.RtpTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionSendonly,
})
assert.NoError(t, err)
senders = append(senders, sender)
senders = append(senders, &sender{transceiver: transceiver, start: start})
}

go sendRTPUntilDone(done, t, track)
go sendRTPUntilDone(start, done, t, track)
}
return senders
}
@@ -236,15 +247,17 @@ func TestSFU_SessionScenarios(t *testing.T) {
id: "remote1",
kind: "publish",
media: []media{
{kind: "audio", id: "stream1", tid: "audio"},
{kind: "video", id: "stream1", tid: "video"},
{kind: "audio", id: "stream1", tid: "audio1"},
{kind: "video", id: "stream1", tid: "video1"},
},
}, {
}},
}, {
actions: []*action{{
id: "remote2",
kind: "publish",
media: []media{
{kind: "audio", id: "stream2", tid: "audio"},
{kind: "video", id: "stream2", tid: "video"},
{kind: "audio", id: "stream2", tid: "audio2"},
{kind: "video", id: "stream2", tid: "video2"},
},
}},
},
@@ -253,8 +266,27 @@ func TestSFU_SessionScenarios(t *testing.T) {
id: "remote1",
kind: "publish",
media: []media{
{kind: "audio", id: "stream3", tid: "audio"},
{kind: "video", id: "stream3", tid: "video"},
{kind: "audio", id: "stream3", tid: "audio3"},
{kind: "video", id: "stream3", tid: "video3"},
},
}},
},
{
actions: []*action{{
id: "remote1",
kind: "unpublish",
media: []media{
{kind: "audio", id: "stream1", tid: "audio1"},
{kind: "video", id: "stream1", tid: "video1"},
},
}},
}, {
actions: []*action{{
id: "remote2",
kind: "publish",
media: []media{
{kind: "audio", id: "stream4", tid: "audio4"},
{kind: "video", id: "stream4", tid: "video4"},
},
}},
},
@@ -265,38 +297,39 @@ func TestSFU_SessionScenarios(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
var mu sync.RWMutex
done := make(chan struct{})

peers := make(map[string]*peer)

for _, step := range tt.steps {
for _, action := range step.actions {
func() {
p := peers[action.id]

switch action.kind {
case "join":
assert.Nil(t, p)

me := webrtc.MediaEngine{}
me.RegisterDefaultCodecs()
api := webrtc.NewAPI(webrtc.WithMediaEngine(me))
r, err := api.NewPeerConnection(webrtc.Configuration{})
r.OnTrack(func(*webrtc.Track, *webrtc.RTPReceiver) {
p.subs.Done()
})

assert.NoError(t, err)
_, err = r.CreateDataChannel("ion-sfu", nil)
assert.NoError(t, err)
local := NewPeer(sfu)
p = &peer{id: action.id, remote: r, local: &local}
p := &peer{id: action.id, remote: r, local: &local}
r.OnTrack(func(track *webrtc.Track, recv *webrtc.RTPReceiver) {
mu.Lock()
p.subs.Done()
mu.Unlock()
})

mu.Lock()
for id, existing := range peers {
if id != action.id {
p.subs.Add(len(existing.pubs))
}
}

peers[action.id] = p
mu.Unlock()

p.mu.Lock()
p.remote.OnNegotiationNeeded(func() {
@@ -308,8 +341,14 @@ func TestSFU_SessionScenarios(t *testing.T) {
assert.NoError(t, err)
a, err := p.local.Answer(o)
assert.NoError(t, err)
log.Infof("%v", a)
p.remote.SetRemoteDescription(*a)

for _, pub := range p.pubs {
if pub.start != nil {
close(pub.start)
pub.start = nil
}
}
})

p.local.OnOffer = func(o *webrtc.SessionDescription) {
@@ -334,20 +373,40 @@ func TestSFU_SessionScenarios(t *testing.T) {
answer, err := p.local.Join("test", *p.remote.LocalDescription())
assert.NoError(t, err)
p.remote.SetRemoteDescription(*answer)

p.mu.Unlock()

case "publish":
mu.Lock()
peer := peers[action.id]
peer.mu.Lock()
// all other peers should get sub'd
for id, p := range peers {
if id != p.id {
if id != peer.id {
p.subs.Add(len(action.media))
}
}

p.pubs = append(p.pubs, addMedia(done, t, p.remote, action.media)...)
peer.pubs = append(peer.pubs, addMedia(done, t, peer.remote, action.media)...)
peer.mu.Unlock()
mu.Unlock()

case "unpublish":
mu.Lock()
peer := peers[action.id]
peer.mu.Lock()
for _, media := range action.media {
for _, pub := range peer.pubs {
if pub.transceiver != nil && pub.transceiver.Sender().Track().ID() == media.tid {
peer.remote.RemoveTrack(pub.transceiver.Sender())
pub.transceiver = nil
}
}
}
peer.mu.Unlock()
mu.Unlock()
}
}()
time.Sleep(1 * time.Second)
}
}

4 changes: 4 additions & 0 deletions pkg/simplesender.go
Original file line number Diff line number Diff line change
@@ -219,6 +219,10 @@ func (s *SimpleSender) receiveRTCP() {
if !s.reSync.get() && s.enabled.get() {
fwdPkts = append(fwdPkts, pkt)
}
case *rtcp.ReceiverReport:
if s.enabled.get() && len(pkt.Reports) > 0 && pkt.Reports[0].FractionLost > 25 {
log.Tracef("Slow link for sender %s, fraction packet lost %.2f", s.id, float64(pkt.Reports[0].FractionLost)/256)
}
case *rtcp.TransportLayerNack:
log.Tracef("sender got nack: %+v", pkt)
for _, pair := range pkt.Nacks {
14 changes: 5 additions & 9 deletions pkg/simulcastsender.go
Original file line number Diff line number Diff line change
@@ -2,7 +2,6 @@ package sfu

import (
"bytes"
"context"
"encoding/binary"
"io"
"math/rand"
@@ -18,8 +17,6 @@ import (
// SimulcastSender represents a Sender which writes RTP to a webrtc track
type SimulcastSender struct {
id string
ctx context.Context
cancel context.CancelFunc
router *receiverRouter
sender *webrtc.RTPSender
transceiver *webrtc.RTPTransceiver
@@ -91,7 +88,7 @@ func (s *SimulcastSender) Start() {
// WriteRTP to the track
func (s *SimulcastSender) WriteRTP(pkt *rtp.Packet) {
// Simulcast write RTP is sync, so the packet can be safely modified and restored
if s.ctx.Err() != nil || !s.enabled.get() {
if !s.enabled.get() {
return
}
// Check if packet SSRC is different from before
@@ -249,7 +246,6 @@ func (s *SimulcastSender) CurrentSpatialLayer() uint8 {
// Close track
func (s *SimulcastSender) Close() {
s.close.Do(func() {
s.cancel()
if s.onCloseHandler != nil {
s.onCloseHandler()
}
@@ -274,10 +270,6 @@ func (s *SimulcastSender) receiveRTCP() {
return
}

if s.ctx.Err() != nil {
return
}

if err != nil {
log.Errorf("rtcp err => %v", err)
continue
@@ -303,6 +295,10 @@ func (s *SimulcastSender) receiveRTCP() {
pkt.SenderSSRC = s.lSSRC
fwdPkts = append(fwdPkts, pkt)
}
case *rtcp.ReceiverReport:
if s.enabled.get() && len(pkt.Reports) > 0 && pkt.Reports[0].FractionLost > 25 {
log.Tracef("Slow link for sender %s, fraction packet lost %.2f", s.id, float64(pkt.Reports[0].FractionLost)/256)
}
case *rtcp.TransportLayerNack:
log.Tracef("sender got nack: %+v", pkt)
for _, pair := range pkt.Nacks {
7 changes: 0 additions & 7 deletions pkg/simulcastsender_test.go
Original file line number Diff line number Diff line change
@@ -144,7 +144,6 @@ forLoop:
t.Run(tt.name, func(t *testing.T) {
if tt.fields.checkPacket {
s := &SimulcastSender{
ctx: context.Background(),
enabled: atomicBool{1},
router: r,
track: senderTrack,
@@ -233,10 +232,7 @@ forLoop:
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
wss := &SimulcastSender{
ctx: ctx,
cancel: cancel,
enabled: atomicBool{1},
router: r,
sender: s,
@@ -323,8 +319,6 @@ func TestSimulcastSender_Close(t *testing.T) {
tt := tt
t.Run(tt.name, func(t *testing.T) {
s := &SimulcastSender{
ctx: tt.fields.ctx,
cancel: tt.fields.cancel,
router: tt.fields.router,
onCloseHandler: tt.fields.onCloseHandler,
}
@@ -515,7 +509,6 @@ forLoop:
}

simpleSdr := SimulcastSender{
ctx: context.Background(),
enabled: atomicBool{1},
simulcastSSRC: 1234,
router: r,
20 changes: 3 additions & 17 deletions pkg/webrtctransport.go
Original file line number Diff line number Diff line change
@@ -31,6 +31,7 @@ type WebRTCTransport struct {
senders map[string][]Sender
candidates []webrtc.ICECandidateInit
onTrackHandler func(*webrtc.Track, *webrtc.RTPReceiver)
negotiate func()

subOnce sync.Once
}
@@ -215,9 +216,9 @@ func (p *WebRTCTransport) OnICECandidate(f func(c *webrtc.ICECandidate)) {
// OnNegotiationNeeded handler
func (p *WebRTCTransport) OnNegotiationNeeded(f func()) {
debounced := debounce.New(100 * time.Millisecond)
p.pc.OnNegotiationNeeded(func() {
p.negotiate = func() {
debounced(f)
})
}
}

// OnTrack handler
@@ -227,21 +228,6 @@ func (p *WebRTCTransport) OnTrack(f func(*webrtc.Track, *webrtc.RTPReceiver)) {
p.onTrackHandler = f
}

// OnConnectionStateChange handler
func (p *WebRTCTransport) OnConnectionStateChange(f func(webrtc.PeerConnectionState)) {
p.pc.OnConnectionStateChange(f)
}

// OnDataChannel handler
func (p *WebRTCTransport) OnDataChannel(f func(*webrtc.DataChannel)) {
p.pc.OnDataChannel(f)
}

// AddTransceiverFromKind adds RtpTransceiver on WebRTC Transport
func (p *WebRTCTransport) AddTransceiverFromKind(kind webrtc.RTPCodecType, init ...webrtc.RtpTransceiverInit) (*webrtc.RTPTransceiver, error) {
return p.pc.AddTransceiverFromKind(kind, init...)
}

func (p *WebRTCTransport) SignalingState() webrtc.SignalingState {
return p.pc.SignalingState()
}
208 changes: 0 additions & 208 deletions pkg/webrtctransport_test.go
Original file line number Diff line number Diff line change
@@ -4,7 +4,6 @@ import (
"context"
"reflect"
"testing"
"time"

"github.com/pion/webrtc/v3"
"github.com/stretchr/testify/assert"
@@ -46,43 +45,6 @@ func TestNewWebRTCTransport(t *testing.T) {
}
}

func TestWebRTCTransport_AddTransceiverFromKind(t *testing.T) {
me := MediaEngine{}
me.RegisterDefaultCodecs()

type args struct {
kind webrtc.RTPCodecType
init []webrtc.RtpTransceiverInit
}
tests := []struct {
name string
args args
wantErr bool
}{
{
name: "Must return transceiver without errors",
args: args{
kind: webrtc.RTPCodecTypeAudio,
init: []webrtc.RtpTransceiverInit{{Direction: webrtc.RTPTransceiverDirectionRecvonly}},
},
wantErr: false,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
s := NewSession("test")
p, err := NewWebRTCTransport(s, me, WebRTCTransportConfig{})
assert.NoError(t, err)
_, err = p.AddTransceiverFromKind(tt.args.kind, tt.args.init...)
if (err != nil) != tt.wantErr {
t.Errorf("AddTransceiverFromKind() error = %v, wantErr %v", err, tt.wantErr)
return
}
})
}
}

func TestWebRTCTransport_Close(t *testing.T) {
me := webrtc.MediaEngine{}
me.RegisterDefaultCodecs()
@@ -340,176 +302,6 @@ func TestWebRTCTransport_LocalDescription(t *testing.T) {
}
}

func TestWebRTCTransport_OnConnectionStateChange(t *testing.T) {
me := webrtc.MediaEngine{}
me.RegisterDefaultCodecs()
api := webrtc.NewAPI(webrtc.WithMediaEngine(me))
sfu, err := api.NewPeerConnection(webrtc.Configuration{})
assert.NoError(t, err)

type fields struct {
pc *webrtc.PeerConnection
}
type args struct {
f func(webrtc.PeerConnectionState)
}

tests := []struct {
name string
fields fields
args args
}{
{
name: "Must set peer connection state callback",
fields: fields{
pc: sfu,
},
args: args{
f: func(state webrtc.PeerConnectionState) {
},
},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
p := &WebRTCTransport{
pc: tt.fields.pc,
}
//TODO: Try to force the callback
p.OnConnectionStateChange(tt.args.f)
})
}
}

func TestWebRTCTransport_OnDataChannel(t *testing.T) {
me := webrtc.MediaEngine{}
me.RegisterDefaultCodecs()
api := webrtc.NewAPI(webrtc.WithMediaEngine(me))
sfu, remote, err := newPair(webrtc.Configuration{}, api)
assert.NoError(t, err)
_, err = remote.CreateDataChannel("data", &webrtc.DataChannelInit{})
assert.NoError(t, err)
// Register channel opening handling
dcChan := make(chan struct{}, 1)

type fields struct {
pc *webrtc.PeerConnection
}
type args struct {
f func(*webrtc.DataChannel)
}
tests := []struct {
name string
fields fields
args args
}{
{
name: "Must set and call on data channel method",
fields: fields{
pc: sfu,
},
args: args{
f: func(_ *webrtc.DataChannel) {
dcChan <- struct{}{}
},
},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
p := &WebRTCTransport{
pc: tt.fields.pc,
}
p.OnDataChannel(tt.args.f)
err = signalPair(remote, sfu)
assert.NoError(t, err)
tmr := time.NewTimer(5000 * time.Millisecond)
testLoop:
for {
select {
case <-tmr.C:
t.Fatal("onDataChannel not called")
case <-dcChan:
tmr.Stop()
break testLoop
}
}
})
}
_ = sfu.Close()
_ = remote.Close()
}

func TestWebRTCTransport_OnNegotiationNeeded(t *testing.T) {
me := webrtc.MediaEngine{}
me.RegisterDefaultCodecs()
api := webrtc.NewAPI(webrtc.WithMediaEngine(me))
sfu, remote, err := newPair(webrtc.Configuration{}, api)
assert.NoError(t, err)

remoteTrack, err := sfu.NewTrack(webrtc.DefaultPayloadTypeVP8, 1234, "video", "pion")
assert.NoError(t, err)
_, err = sfu.AddTrack(remoteTrack)
assert.NoError(t, err)

err = signalPair(sfu, remote)

negChan := make(chan struct{}, 1)

type fields struct {
pc *webrtc.PeerConnection
}
type args struct {
f func()
}
tests := []struct {
name string
fields fields
args args
}{
{
name: "Must set and call func on negotiation",
fields: fields{
pc: sfu,
},
args: args{
f: func() {
negChan <- struct{}{}
},
},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
p := &WebRTCTransport{
pc: tt.fields.pc,
}
p.OnNegotiationNeeded(tt.args.f)

senderTrack, err := sfu.NewTrack(webrtc.DefaultPayloadTypeVP8, 5678, "video", "pion")
assert.NoError(t, err)
_, err = sfu.AddTrack(senderTrack)
assert.NoError(t, err)
tmr := time.NewTimer(5000 * time.Millisecond)
testLoop:
for {
select {
case <-tmr.C:
t.Fatal("onNegotiation not called")
case <-negChan:
tmr.Stop()
break testLoop
}
}
})
}
_ = sfu.Close()
_ = remote.Close()
}

func TestWebRTCTransport_OnTrack(t *testing.T) {
type args struct {
f func(*webrtc.Track, *webrtc.RTPReceiver)