Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/reduce-feedback-interceptor-bu…
Browse files Browse the repository at this point in the history
…ffer' into changed
  • Loading branch information
AnshulMalik committed Apr 19, 2024
2 parents 1449b4f + 926b8de commit b837b64
Show file tree
Hide file tree
Showing 14 changed files with 180 additions and 62 deletions.
13 changes: 7 additions & 6 deletions internal/cc/acknowledgment.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ type Acknowledgment struct {
}

func (a Acknowledgment) String() string {
s := "ACK:\n"
s += fmt.Sprintf("\tTLCC:\t%v\n", a.SequenceNumber)
s += fmt.Sprintf("\tSIZE:\t%v\n", a.Size)
s += fmt.Sprintf("\tDEPARTURE:\t%v\n", int64(float64(a.Departure.UnixNano())/1e+6))
s += fmt.Sprintf("\tARRIVAL:\t%v\n", int64(float64(a.Arrival.UnixNano())/1e+6))
return s
return fmt.Sprintf("ssrc:%d sn:%d size:%d departure:%s arrival:%s rtt:%s",
a.SSRC,
a.SequenceNumber,
a.Size,
a.Departure.String(), a.Arrival.String(),
a.Arrival.Sub(a.Departure).String(),
)
}
31 changes: 25 additions & 6 deletions internal/cc/feedback_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/pion/interceptor"
"github.com/pion/interceptor/internal/ntp"
"github.com/pion/logging"
"github.com/pion/rtcp"
"github.com/pion/rtp"
)
Expand All @@ -29,11 +30,16 @@ var (
type FeedbackAdapter struct {
lock sync.Mutex
history *feedbackHistory

log logging.LeveledLogger
}

// NewFeedbackAdapter returns a new FeedbackAdapter
func NewFeedbackAdapter() *FeedbackAdapter {
return &FeedbackAdapter{history: newFeedbackHistory(250)}
func NewFeedbackAdapter(factory logging.LoggerFactory) *FeedbackAdapter {
return &FeedbackAdapter{
history: newFeedbackHistory(5000),
log: factory.NewLogger("feedback_adapter"),
}
}

func (f *FeedbackAdapter) onSentRFC8888(ts time.Time, header *rtp.Header, size int) error {
Expand Down Expand Up @@ -61,6 +67,8 @@ func (f *FeedbackAdapter) onSentTWCC(ts time.Time, extID uint8, header *rtp.Head

f.lock.Lock()
defer f.lock.Unlock()

f.log.Infof("[twcc] recorded packet sn:%d ts:%d\n", tccExt.TransportSequence, ts.Unix())
f.history.add(Acknowledgment{
SequenceNumber: tccExt.TransportSequence,
SSRC: 0,
Expand Down Expand Up @@ -111,11 +119,18 @@ func (f *FeedbackAdapter) unpackRunLengthChunk(start uint16, refTime time.Time,
return deltaIndex, refTime, result, nil
}

func (f *FeedbackAdapter) unpackStatusVectorChunk(start uint16, refTime time.Time, chunk *rtcp.StatusVectorChunk, deltas []*rtcp.RecvDelta) (consumedDeltas int, nextRef time.Time, acks []Acknowledgment, err error) {
result := make([]Acknowledgment, len(chunk.SymbolList))
func (f *FeedbackAdapter) unpackStatusVectorChunk(start uint16, refTime time.Time, chunk *rtcp.StatusVectorChunk,
deltas []*rtcp.RecvDelta, maxBitsToRead int) (consumedDeltas int, nextRef time.Time, acks []Acknowledgment, err error) {

bitsToRead := len(chunk.SymbolList)
if bitsToRead > maxBitsToRead {
bitsToRead = maxBitsToRead
}
result := make([]Acknowledgment, bitsToRead)
deltaIndex := 0
resultIndex := 0
for i, symbol := range chunk.SymbolList {
for i := 0; i < bitsToRead; i++ {
symbol := chunk.SymbolList[i]
key := feedbackHistoryKey{
ssrc: 0,
sequenceNumber: start + uint16(i),
Expand Down Expand Up @@ -148,9 +163,11 @@ func (f *FeedbackAdapter) OnTransportCCFeedback(_ time.Time, feedback *rtcp.Tran
refTime := time.Time{}.Add(time.Duration(feedback.ReferenceTime) * 64 * time.Millisecond)
recvDeltas := feedback.RecvDeltas

processedPacketNum := 0
for _, chunk := range feedback.PacketChunks {
switch chunk := chunk.(type) {
case *rtcp.RunLengthChunk:
processedPacketNum += int(chunk.RunLength)
n, nextRefTime, acks, err := f.unpackRunLengthChunk(index, refTime, chunk, recvDeltas)
if err != nil {
return nil, err
Expand All @@ -160,11 +177,13 @@ func (f *FeedbackAdapter) OnTransportCCFeedback(_ time.Time, feedback *rtcp.Tran
recvDeltas = recvDeltas[n:]
index = uint16(int(index) + len(acks))
case *rtcp.StatusVectorChunk:
n, nextRefTime, acks, err := f.unpackStatusVectorChunk(index, refTime, chunk, recvDeltas)
maxBitsToRead := int(feedback.PacketStatusCount) - processedPacketNum
n, nextRefTime, acks, err := f.unpackStatusVectorChunk(index, refTime, chunk, recvDeltas, maxBitsToRead)
if err != nil {
return nil, err
}
refTime = nextRefTime
processedPacketNum += len(acks)
result = append(result, acks...)
recvDeltas = recvDeltas[n:]
index = uint16(int(index) + len(acks))
Expand Down
30 changes: 18 additions & 12 deletions pkg/gcc/adaptive_threshold.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

const (
maxDeltas = 60
maxDeltas = 20
)

type adaptiveThresholdOption func(*adaptiveThreshold)
Expand Down Expand Up @@ -49,7 +49,7 @@ func newAdaptiveThreshold(opts ...adaptiveThresholdOption) *adaptiveThreshold {
thresh: time.Duration(12500 * float64(time.Microsecond)),
overuseCoefficientUp: 0.01,
overuseCoefficientDown: 0.00018,
min: 6 * time.Millisecond,
min: 30 * time.Millisecond,
max: 600 * time.Millisecond,
lastUpdate: time.Time{},
numDeltas: 0,
Expand All @@ -60,7 +60,7 @@ func newAdaptiveThreshold(opts ...adaptiveThresholdOption) *adaptiveThreshold {
return at
}

func (a *adaptiveThreshold) compare(estimate, _ time.Duration) (usage, time.Duration, time.Duration) {
func (a *adaptiveThreshold) compare(estimate, arrivalDelta time.Duration) (usage, time.Duration, time.Duration) {
a.numDeltas++
if a.numDeltas < 2 {
return usageNormal, estimate, a.max
Expand All @@ -73,29 +73,35 @@ func (a *adaptiveThreshold) compare(estimate, _ time.Duration) (usage, time.Dura
use = usageUnder
}
thresh := a.thresh
a.update(t)
a.update(t, arrivalDelta)
return use, t, thresh
}

func (a *adaptiveThreshold) update(estimate time.Duration) {
now := time.Now()
if a.lastUpdate.IsZero() {
a.lastUpdate = now
}
func (a *adaptiveThreshold) update(estimate time.Duration, arrivalDelta time.Duration) {
// This code was taken from GCC-REMB
// in GCC-REMB, this calculation happens on the subscriber end, so it is valid to look at system time diff
// but in TWCC, we need to look at arrival-difference rather than system time difference
// now := time.Now()
// if a.lastUpdate.IsZero() {
// a.lastUpdate = now
// }
absEstimate := time.Duration(math.Abs(float64(estimate.Microseconds()))) * time.Microsecond
if absEstimate > a.thresh+15*time.Millisecond {
a.lastUpdate = now
//fmt.Println("absestimate<15+threshold, skipping, abs:", absEstimate, ",comparing:", a.thresh+15*time.Millisecond)
//a.lastUpdate = now
return
}
k := a.overuseCoefficientUp
if absEstimate < a.thresh {
k = a.overuseCoefficientDown
}
maxTimeDelta := 100 * time.Millisecond
timeDelta := time.Duration(minInt(int(now.Sub(a.lastUpdate).Milliseconds()), int(maxTimeDelta.Milliseconds()))) * time.Millisecond
timeDelta := time.Duration(minInt(int(arrivalDelta.Milliseconds()), int(maxTimeDelta.Milliseconds()))) * time.Millisecond
d := absEstimate - a.thresh
add := k * float64(d.Milliseconds()) * float64(timeDelta.Milliseconds())
// fmt.Println("changing threshold, delta:", time.Duration(add*1000)*time.Microsecond, ",add:", add,
// ",timedelta:", timeDelta, ",d:", d)
a.thresh += time.Duration(add*1000) * time.Microsecond
a.thresh = clampDuration(a.thresh, a.min, a.max)
a.lastUpdate = now
//a.lastUpdate = now
}
17 changes: 15 additions & 2 deletions pkg/gcc/arrival_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,28 @@ import (
)

type arrivalGroup struct {
packets []cc.Acknowledgment
packets []cc.Acknowledgment
// departure holds the highest departure timestamp till now
departure time.Time
arrival time.Time
// firstDeparture holds the lowest departure timestamp till now
firstDeparture time.Time
}

func (g *arrivalGroup) add(a cc.Acknowledgment) {
if len(g.packets) == 0 {
g.firstDeparture = a.Departure
}
g.packets = append(g.packets, a)
g.arrival = a.Arrival
g.departure = a.Departure

if a.Departure.After(g.departure) {
g.departure = a.Departure
}
if a.Departure.Before(g.firstDeparture) {
g.firstDeparture = a.Departure
}

}

func (g arrivalGroup) String() string {
Expand Down
7 changes: 6 additions & 1 deletion pkg/gcc/arrival_group_accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ func (a *arrivalGroupAccumulator) run(in <-chan []cc.Acknowledgment, agWriter fu
continue
}

// fmt.Println("different group, dep delay:", interDepartureTimePkt(group, next),
// ", interArrivalTimePkt:", interArrivalTimePkt(group, next),
// ",interGroupDelayVariationPkt:", interGroupDelayVariationPkt(group, next))
agWriter(group)
group = arrivalGroup{}
group.add(next)
Expand All @@ -61,11 +64,13 @@ func interArrivalTimePkt(a arrivalGroup, b cc.Acknowledgment) time.Duration {
return b.Arrival.Sub(a.arrival)
}

// libwebrtc takes a difference between first departure and last departure
// pion was looking at the difference between penultimate and last departures
func interDepartureTimePkt(a arrivalGroup, b cc.Acknowledgment) time.Duration {
if len(a.packets) == 0 {
return 0
}
return b.Departure.Sub(a.packets[len(a.packets)-1].Departure)
return b.Departure.Sub(a.firstDeparture)
}

func interGroupDelayVariationPkt(a arrivalGroup, b cc.Acknowledgment) time.Duration {
Expand Down
6 changes: 3 additions & 3 deletions pkg/gcc/delay_based_bwe.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type delayControllerConfig struct {
maxBitrate int
}

func newDelayController(c delayControllerConfig) *delayController {
func newDelayController(c delayControllerConfig, factory logging.LoggerFactory) *delayController {
ackPipe := make(chan []cc.Acknowledgment)
ackRatePipe := make(chan []cc.Acknowledgment)

Expand All @@ -58,7 +58,7 @@ func newDelayController(c delayControllerConfig) *delayController {
rateController: nil,
onUpdateCallback: nil,
wg: sync.WaitGroup{},
log: logging.NewDefaultLoggerFactory().NewLogger("gcc_delay_controller"),
log: factory.NewLogger("gcc_delay_controller"),
}

rateController := newRateController(c.nowFn, c.initialBitrate, c.minBitrate, c.maxBitrate, func(ds DelayStats) {
Expand All @@ -68,7 +68,7 @@ func newDelayController(c delayControllerConfig) *delayController {
}
})
delayController.rateController = rateController
overuseDetector := newOveruseDetector(newAdaptiveThreshold(), 10*time.Millisecond, rateController.onDelayStats)
overuseDetector := newOveruseDetector(newAdaptiveThreshold(), 10*time.Millisecond, rateController.onDelayStats, factory)
slopeEstimator := newSlopeEstimator(newKalman(), overuseDetector.onDelayStats)
arrivalGroupAccumulator := newArrivalGroupAccumulator()

Expand Down
30 changes: 24 additions & 6 deletions pkg/gcc/kalman.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ const (
type kalmanOption func(*kalman)

type kalman struct {
gain float64
estimate time.Duration
processUncertainty float64 // Q_i
estimateError float64
measurementUncertainty float64

gain float64
estimate time.Duration
processUncertainty float64 // Q_i
estimateError float64
measurementUncertainty float64
lastUpdate time.Time
disableMeasurementUncertaintyUpdates bool
}

Expand Down Expand Up @@ -62,6 +62,7 @@ func newKalman(opts ...kalmanOption) *kalman {
estimateError: 0.1,
measurementUncertainty: 0,
disableMeasurementUncertaintyUpdates: false,
lastUpdate: time.Time{},
}
for _, opt := range opts {
opt(k)
Expand All @@ -70,6 +71,21 @@ func newKalman(opts ...kalmanOption) *kalman {
}

func (k *kalman) updateEstimate(measurement time.Duration) time.Duration {
now := time.Now()
if k.lastUpdate.IsZero() {
k.lastUpdate = now
}
// if there is a large gap, reset the state and ignore the first twcc feedback
if now.Sub(k.lastUpdate).Milliseconds() > 5000 {
k.measurementUncertainty = 0
k.estimate = 0
k.estimateError = 0.1
k.gain = 0
k.processUncertainty = 1e-3
k.lastUpdate = now
return k.estimate
}
k.lastUpdate = now
z := measurement - k.estimate

zms := float64(z.Microseconds()) / 1000.0
Expand All @@ -90,5 +106,7 @@ func (k *kalman) updateEstimate(measurement time.Duration) time.Duration {
k.estimate += time.Duration(k.gain * zms * float64(time.Millisecond))

k.estimateError = (1 - k.gain) * estimateUncertainty
// fmt.Println("k.measurementUncertainty:", k.measurementUncertainty, ",k.gain:", k.gain,
// ",estimateUncertainty", estimateUncertainty, ",k.estimateError", k.estimateError, ",k.estimate:", k.estimate)
return k.estimate
}
4 changes: 2 additions & 2 deletions pkg/gcc/leaky_bucket_pacer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ type LeakyBucketPacer struct {
}

// NewLeakyBucketPacer initializes a new LeakyBucketPacer
func NewLeakyBucketPacer(initialBitrate int) *LeakyBucketPacer {
func NewLeakyBucketPacer(initialBitrate int, factory logging.LoggerFactory) *LeakyBucketPacer {
p := &LeakyBucketPacer{
log: logging.NewDefaultLoggerFactory().NewLogger("pacer"),
log: factory.NewLogger("pacer"),
f: 1.5,
targetBitrate: initialBitrate,
pacingInterval: 5 * time.Millisecond,
Expand Down

0 comments on commit b837b64

Please sign in to comment.