Skip to content

Commit 219c6a3

Browse files
committedFeb 5, 2024
Fix data race of RTX packet
Fix data race of RTX packet
1 parent f68b789 commit 219c6a3

File tree

2 files changed

+20
-2
lines changed

2 files changed

+20
-2
lines changed
 

‎rtpreceiver.go

+19-2
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,15 @@ type trackStreams struct {
4343
type rtxPacketWithAttributes struct {
4444
pkt []byte
4545
attributes interceptor.Attributes
46+
pool *sync.Pool
47+
}
48+
49+
func (p *rtxPacketWithAttributes) release() {
50+
if p.pkt != nil {
51+
b := p.pkt[:cap(p.pkt)]
52+
p.pool.Put(b) // nolint:staticcheck
53+
p.pkt = nil
54+
}
4655
}
4756

4857
// RTPReceiver allows an application to inspect the receipt of a TrackRemote
@@ -59,6 +68,8 @@ type RTPReceiver struct {
5968

6069
// A reference to the associated api object
6170
api *API
71+
72+
rtxPool sync.Pool
6273
}
6374

6475
// NewRTPReceiver constructs a new RTPReceiver
@@ -74,6 +85,9 @@ func (api *API) NewRTPReceiver(kind RTPCodecType, transport *DTLSTransport) (*RT
7485
closed: make(chan interface{}),
7586
received: make(chan interface{}),
7687
tracks: []trackStreams{},
88+
rtxPool: sync.Pool{New: func() interface{} {
89+
return make([]byte, api.settingEngine.getReceiveMTU())
90+
}},
7791
}
7892

7993
return r, nil
@@ -411,10 +425,11 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep
411425
track.repairStreamChannel = make(chan rtxPacketWithAttributes)
412426

413427
go func() {
414-
b := make([]byte, r.api.settingEngine.getReceiveMTU())
415428
for {
429+
b := r.rtxPool.Get().([]byte) // nolint:forcetypeassert
416430
i, attributes, err := track.repairInterceptor.Read(b, nil)
417431
if err != nil {
432+
r.rtxPool.Put(b) // nolint:staticcheck
418433
return
419434
}
420435

@@ -435,6 +450,7 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep
435450

436451
if i-int(headerLength)-paddingLength < 2 {
437452
// BWE probe packet, ignore
453+
r.rtxPool.Put(b) // nolint:staticcheck
438454
continue
439455
}
440456

@@ -450,8 +466,9 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep
450466

451467
select {
452468
case <-r.closed:
469+
r.rtxPool.Put(b) // nolint:staticcheck
453470
return
454-
case track.repairStreamChannel <- rtxPacketWithAttributes{pkt: b[:i-2], attributes: attributes}:
471+
case track.repairStreamChannel <- rtxPacketWithAttributes{pkt: b[:i-2], attributes: attributes, pool: &r.rtxPool}:
455472
}
456473
}
457474
}()

‎track_remote.go

+1
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ func (t *TrackRemote) Read(b []byte) (n int, attributes interceptor.Attributes,
131131
if rtxPacketReceived := r.readRTX(t); rtxPacketReceived != nil {
132132
n = copy(b, rtxPacketReceived.pkt)
133133
attributes = rtxPacketReceived.attributes
134+
rtxPacketReceived.release()
134135
err = nil
135136
} else {
136137
// If there's no separate RTX track (or there's a separate RTX track but no RTX packet waiting), wait for and return

0 commit comments

Comments
 (0)
Please sign in to comment.