New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pacer interface to send packets #1718
base: master
Are you sure you want to change the base?
Conversation
pkg/rtc/participant.go
Outdated
@@ -198,6 +202,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { | |||
params.Telemetry), | |||
supervisor: supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger}), | |||
tracksQuality: make(map[livekit.TrackID]livekit.ConnectionQuality), | |||
pacer: pacer.NewPassThrough(params.Logger), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pacer
is an interface to enqueue packets to be sent. Currently, I have two implementations
pacer.PassThrough
-> functionally equivalent to how things are today. This one just immediately sends the packet in the calling go routine.pacer.NoQueue
-> this one sends all packets from another go routine. Use case here is to do TWCC (adding TWCC extension header and send timestamp the packet). Gives control over order of packet send, i. e. for bandwidth estimation/TWCC, would be good to ensure that send order matches the sequence number/timestamp order to give receive side a better chance of receiving in same order and get more stable bandwidth estimation.
NOTES:
- There is more work needed like maybe completely bypassing audio tracks (or tracks which do not have TWCC extension) and send them immediately.
- May need to implement more pacer types to meter data at channel capacity to prevent congestion.
pkg/sfu/pacer/pacer.go
Outdated
|
||
type Pacer interface { | ||
Enqueue(p Packet) | ||
Stop() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Simple interface to enqueue a packet to be sent and stop a pacer. At least one more method that will be needed is to set the target bitrate if we add a pacer that meters data out per the channel capacity.
Implementations will enqueue/dequeue packets per their strategy.
pkg/sfu/pacer/no_queue.go
Outdated
p := n.packets.PopFront() | ||
n.lock.Unlock() | ||
|
||
n.Base.SendPacket(&p) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All packets will go out in this go routine in this Pacer
implementation. Idea is to use this with TWCC/send side bandwidth estimation. Note that all packets will go out from this go routine to ensure that the sequence number and time stamp we add for TWCC purposes do not get out-of-order before packet leaves the node. But, have to see if this creates a bottleneck or performance issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is also possible to try this with REMB (receive side BWE) to ensure packets are sent in order of time stamping to see if that improves bandwidth estimation, especially during probing.
pkg/sfu/pacer/pacer.go
Outdated
WriteStream webrtc.TrackLocalWriter | ||
Metadata interface{} | ||
OnSent func(md interface{}, sentHeader *rtp.Header, payloadSize int, sentTime time.Time) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Packet is enqueued with all the data. Extensions are added as necessary and packet sent to WriteStream
. OnSent
is the per packet callback. Wondering if that is going to cause performance issues.
pkg/sfu/downtrack.go
Outdated
@@ -741,6 +721,7 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool) int { | |||
bytesSent += hdr.MarshalSize() + len(payload) | |||
} | |||
|
|||
// STREAM_ALLOCATOR-TODO: change this to pull this counter from stream allocator so that counter can be update in pacer callback |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding this TODO here as we were returning bytes sent when a write failed synchronously. With pacer, cannot get error synchronously. So, this is speculatively returning bytes sent for all padding packets potentially. Few options
- Keep counter here and stream allocator can pull from here periodically.
- Post periodically to stream allocator how much padding is sent.
- Use a wait group or some blocking mechanism to ensure all packets have been processed, count bytes sent in the callback and return that.
pkg/sfu/downtrack.go
Outdated
@@ -1217,22 +1200,17 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan | |||
return done | |||
} | |||
|
|||
func (d *DownTrack) writeOpusBlankFrame(hdr *rtp.Header, frameEndNeeded bool) (int, error) { | |||
func (d *DownTrack) getOpusBlankFrame(_frameEndNeeded bool) ([]byte, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Simplifying these. Actually these do not need to be struct method (expect for the VP8 one), but because of the VP8 one leaving these here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm 👍
pkg/sfu/pacer/no_queue.go
Outdated
select { | ||
case n.wake <- struct{}{}: | ||
default: | ||
n.logger.Warnw("wake channel full", nil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this path really concerning? if the source is bursty this seems like it could get really noisy even if the queue is healthy
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not really Paul. I can remove that. Was just being paranoid about missing something and not having visibility.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm!!
Question, should we merge these into a new twcc feature branch? so that we could allow other patches/releases to be done on master
Yeah, I can just keep it in this branch for a long time. I normally rebase (actually I always use merge and not rebase) regularly. So, this branch can be the feature branch. Also, just set up another PR to merge into this branch. |
* add packet time * WIP commit * WIP commit * WIP commit * minor comments
* WIP commit * some notes * WIP commit
Sorry @paulwe , did not mean to ask for review. This is a long standing branch which I need to work at some point. Once in a while I merge to ensure that it is still okay. Will change it to a draft. |
Starting some work to be able to do send side bandwidth estimation.
This is adding a
Pacer
interface to have a central place where all packets can go through. Still a ways to go before we can do TWCC, but setting this up to get some 👀 and feedback on the approach.All down tracks get a
Pacer
to enqueue data to. There is a callback fromPacer
when packet is sent.DownTrack
does things like accounting in this callback.A few things to discuss
Pacer
, all packets are copied in theDownTrack
.sendPacketMetadata
escaping to heap, but my go chops is not strong enough to be certain.Will leave inline notes on implementation.