
Pacer interface to send packets #1718

Draft · wants to merge 210 commits into master
Conversation

boks1971 (Contributor)

Starting some work to be able to do send-side bandwidth estimation.

This adds a Pacer interface so there is a central place all packets go through. Still a ways to go before we can do TWCC, but setting this up to get some 👀 and feedback on the approach.

All down tracks get a Pacer to enqueue data to. The Pacer invokes a callback when a packet is sent, and DownTrack does things like accounting in that callback (sketched below).

A few things to discuss:

  • With the Pacer, all packets are copied in the DownTrack.
  • Is the per-packet callback going to be prohibitive? A quick check did not show sendPacketMetadata escaping to the heap, but my Go chops are not strong enough to be certain.

Will leave inline notes on implementation.
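To make the flow concrete, here is a minimal sketch of the DownTrack side under this design. The type and field names (downTrack, packet, pacerLike, onPacketSent) are stand-ins of mine, not this PR's actual code: the track copies the packet data, hands it to the pacer, and does its accounting in the OnSent callback. On the heap-escape question above, `go build -gcflags=-m` prints the compiler's escape analysis and is one way to double-check whether the per-packet metadata allocates.

```go
package sfu

import (
	"sync/atomic"
	"time"

	"github.com/pion/rtp"
	"github.com/pion/webrtc/v3"
)

// Stand-in types for this sketch; the real DownTrack and Pacer live in the PR.
type pacerLike interface {
	Enqueue(p packet)
}

type packet struct {
	Header      *rtp.Header
	Payload     []byte
	WriteStream webrtc.TrackLocalWriter
	Metadata    interface{}
	OnSent      func(md interface{}, sentHeader *rtp.Header, payloadSize int, sentTime time.Time)
}

type downTrack struct {
	pacer       pacerLike
	writeStream webrtc.TrackLocalWriter
	bytesSent   atomic.Uint64
}

// writeRTP hands the packet to the pacer instead of writing it synchronously.
func (d *downTrack) writeRTP(hdr *rtp.Header, payload []byte) {
	d.pacer.Enqueue(packet{
		Header:      hdr,
		Payload:     payload,
		WriteStream: d.writeStream,
		OnSent:      d.onPacketSent,
	})
}

// onPacketSent is the per-packet callback where the DownTrack now does the
// accounting it used to do inline after a synchronous write.
func (d *downTrack) onPacketSent(_ interface{}, sentHeader *rtp.Header, payloadSize int, _ time.Time) {
	d.bytesSent.Add(uint64(sentHeader.MarshalSize() + payloadSize))
}
```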

@@ -198,6 +202,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
params.Telemetry),
supervisor: supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger}),
tracksQuality: make(map[livekit.TrackID]livekit.ConnectionQuality),
pacer: pacer.NewPassThrough(params.Logger),
boks1971 (Contributor Author):

Pacer is an interface for enqueuing packets to be sent. Currently, I have two implementations:

  1. pacer.PassThrough -> functionally equivalent to how things are today. It just sends the packet immediately in the calling goroutine (a sketch follows after the notes below).
  2. pacer.NoQueue -> sends all packets from another goroutine. The use case here is TWCC (adding the TWCC extension header and timestamping the packet at send time). It gives control over packet send order, i.e. for bandwidth estimation/TWCC it would be good to ensure that the send order matches the sequence number/timestamp order, giving the receive side a better chance of receiving packets in the same order and producing a more stable bandwidth estimate.

NOTES:

  1. There is more work needed, like maybe completely bypassing audio tracks (or tracks which do not have the TWCC extension) and sending them immediately.
  2. May need to implement more pacer types that meter data out at channel capacity to prevent congestion.
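A minimal sketch of the PassThrough strategy from point 1, assuming Packet carries the header, payload, write stream and callback shown later in this diff; the real constructor also takes a logger (see the NewPassThrough(params.Logger) call above), so treat this as illustrative only.

```go
package pacer

import (
	"time"

	"github.com/pion/rtp"
	"github.com/pion/webrtc/v3"
)

// Packet is a stand-in so this sketch is self-contained; the actual
// struct appears further down in this diff.
type Packet struct {
	Header      *rtp.Header
	Payload     []byte
	WriteStream webrtc.TrackLocalWriter
	Metadata    interface{}
	OnSent      func(md interface{}, sentHeader *rtp.Header, payloadSize int, sentTime time.Time)
}

// PassThrough: no queue and no extra goroutine, Enqueue writes the packet
// immediately in the calling goroutine, matching today's behaviour.
type PassThrough struct{}

func NewPassThrough() *PassThrough {
	return &PassThrough{}
}

func (p *PassThrough) Enqueue(pkt Packet) {
	if _, err := pkt.WriteStream.WriteRTP(pkt.Header, pkt.Payload); err != nil {
		return
	}
	if pkt.OnSent != nil {
		pkt.OnSent(pkt.Metadata, pkt.Header, len(pkt.Payload), time.Now())
	}
}

func (p *PassThrough) Stop() {}
```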


type Pacer interface {
Enqueue(p Packet)
Stop()
boks1971 (Contributor Author), May 18, 2023:

A simple interface to enqueue a packet to be sent and to stop a pacer. At least one more method will be needed to set the target bitrate if we add a pacer that meters data out per the channel capacity (a hypothetical shape is sketched below).

Implementations will enqueue/dequeue packets per their strategy.
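For concreteness, a hypothetical shape of the interface once a rate-metering pacer exists; SetTargetBitrate is a placeholder name and signature of mine, not something defined in this PR.

```go
package pacer

// Packet is a stand-in so this sketch compiles on its own.
type Packet struct{}

type Pacer interface {
	Enqueue(p Packet)
	Stop()

	// SetTargetBitrate would let the congestion controller / stream
	// allocator tell a metering pacer how fast it may drain its queue.
	SetTargetBitrate(bps int64)
}
```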

p := n.packets.PopFront()
n.lock.Unlock()

n.Base.SendPacket(&p)
boks1971 (Contributor Author):

In this Pacer implementation, all packets go out from this goroutine. The idea is to use it with TWCC/send-side bandwidth estimation: sending everything from a single goroutine ensures that the sequence numbers and timestamps we add for TWCC do not get out of order before the packet leaves the node. But we have to see whether this creates a bottleneck or performance issue.
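A condensed sketch of that pattern, with illustrative names rather than the PR's actual NoQueue type: a small buffered wake channel plus a drain loop, so every packet leaves from a single goroutine in FIFO order. Dropping a wake when one is already buffered is harmless here because the worker drains the whole queue on each pass.

```go
package pacer

import "sync"

// Packet is a stand-in so this sketch is self-contained.
type Packet struct{}

// noQueue sends all packets from one worker goroutine in FIFO order so the
// TWCC sequence numbers/timestamps leave the node in the order they were set.
type noQueue struct {
	lock    sync.Mutex
	packets []Packet
	wake    chan struct{}
	send    func(p *Packet) // the actual write to the track's WriteStream
}

func newNoQueue(send func(p *Packet)) *noQueue {
	n := &noQueue{
		wake: make(chan struct{}, 1),
		send: send,
	}
	go n.sendWorker()
	return n
}

func (n *noQueue) Enqueue(p Packet) {
	n.lock.Lock()
	n.packets = append(n.packets, p)
	n.lock.Unlock()

	// Non-blocking wake: if a wake is already pending, the worker will pick
	// this packet up on its next drain pass anyway.
	select {
	case n.wake <- struct{}{}:
	default:
	}
}

// Stop assumes no more Enqueue calls; closing wake ends the worker loop.
func (n *noQueue) Stop() {
	close(n.wake)
}

func (n *noQueue) sendWorker() {
	for range n.wake {
		for {
			n.lock.Lock()
			if len(n.packets) == 0 {
				n.lock.Unlock()
				break
			}
			p := n.packets[0]
			n.packets = n.packets[1:]
			n.lock.Unlock()

			n.send(&p)
		}
	}
}
```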

boks1971 (Contributor Author):

It is also possible to try this with REMB (receive-side BWE) to ensure packets are sent in timestamp order and see if that improves bandwidth estimation, especially during probing.

WriteStream webrtc.TrackLocalWriter
Metadata interface{}
OnSent func(md interface{}, sentHeader *rtp.Header, payloadSize int, sentTime time.Time)
}
boks1971 (Contributor Author):

A Packet is enqueued with all the data it needs. Extensions are added as necessary and the packet is written to WriteStream. OnSent is the per-packet callback; wondering if that is going to cause performance issues.
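On the per-packet callback cost: one rough way to answer that is a micro-benchmark of just the callback invocation, run with `go test -bench=. -benchmem` to see the time per call and whether boxing the metadata into interface{} allocates. The names below (sendPacketMetadata, the callback shape) follow the diff above, but the benchmark itself is only a sketch.

```go
package pacer

import (
	"testing"
	"time"

	"github.com/pion/rtp"
)

// sendPacketMetadata is a stand-in for whatever per-packet state the
// DownTrack attaches via Packet.Metadata.
type sendPacketMetadata struct {
	layer int
}

func BenchmarkOnSentCallback(b *testing.B) {
	hdr := &rtp.Header{SequenceNumber: 1, Timestamp: 1}
	md := sendPacketMetadata{layer: 2}
	var total int

	onSent := func(md interface{}, sentHeader *rtp.Header, payloadSize int, sentTime time.Time) {
		total += payloadSize
	}

	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		// The interface{} conversion of md is what may or may not allocate.
		onSent(md, hdr, 1200, time.Now())
	}
	_ = total
}
```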

@@ -741,6 +721,7 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool) int {
bytesSent += hdr.MarshalSize() + len(payload)
}

// STREAM_ALLOCATOR-TODO: change this to pull this counter from stream allocator so that the counter can be updated in the pacer callback
boks1971 (Contributor Author):

Adding this TODO here because we were returning bytes sent when a write failed synchronously. With the pacer, we cannot get the error synchronously, so this potentially returns bytes sent speculatively for all padding packets. A few options (option 1 is sketched below):

  1. Keep the counter here and let the stream allocator pull from it periodically.
  2. Post periodically to the stream allocator how much padding has been sent.
  3. Use a wait group or some other blocking mechanism to ensure all packets have been processed, count bytes sent in the callback, and return that.
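A hypothetical sketch of option 1, with names of my own choosing: an atomic counter bumped in the padding packets' OnSent callback, which the stream allocator drains on its periodic tick.

```go
package sfu

import (
	"sync/atomic"
	"time"

	"github.com/pion/rtp"
)

// paddingAccounting counts padding bytes as they are actually sent.
type paddingAccounting struct {
	bytes atomic.Uint64
}

// onPaddingSent would be wired up as the OnSent callback for padding packets.
func (p *paddingAccounting) onPaddingSent(_ interface{}, hdr *rtp.Header, payloadSize int, _ time.Time) {
	p.bytes.Add(uint64(hdr.MarshalSize() + payloadSize))
}

// TakePaddingBytes is what the stream allocator would call on its periodic
// tick: it returns the bytes sent since the last call and resets the counter.
func (p *paddingAccounting) TakePaddingBytes() uint64 {
	return p.bytes.Swap(0)
}
```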

@@ -1217,22 +1200,17 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan
return done
}

func (d *DownTrack) writeOpusBlankFrame(hdr *rtp.Header, frameEndNeeded bool) (int, error) {
func (d *DownTrack) getOpusBlankFrame(_frameEndNeeded bool) ([]byte, error) {
boks1971 (Contributor Author):

Simplifying these. These actually do not need to be struct methods (except for the VP8 one), but I am leaving them here because of the VP8 one.
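A sketch of what the simplified helper might reduce to: it only builds the payload now, and header construction plus the actual send go through the shared path. The function name and signature come from the diff above; the 3-byte payload is the conventional Opus silence frame and is an assumption on my part, not quoted from this PR.

```go
package sfu

// getOpusBlankFrame returns just the payload for a blank (silence) Opus
// frame; the frameEndNeeded flag is not needed for Opus, hence unused.
func getOpusBlankFrame(_frameEndNeeded bool) ([]byte, error) {
	// Conventional Opus silence frame (assumed, see note above).
	return []byte{0xf8, 0xff, 0xfe}, nil
}
```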

@paulwe (Contributor) left a comment:

lgtm 👍

select {
case n.wake <- struct{}{}:
default:
n.logger.Warnw("wake channel full", nil)
@paulwe (Contributor):

is this path really concerning? if the source is bursty this seems like it could get really noisy even if the queue is healthy

boks1971 (Contributor Author):

Not really, Paul. I can remove that. Was just being paranoid about missing something and not having visibility.

@davidzhao (Member) left a comment:

lgtm!!

Question: should we merge these into a new twcc feature branch, so that other patches/releases can still be done on master?

@boks1971 (Contributor Author) replied:

> Question: should we merge these into a new twcc feature branch, so that other patches/releases can still be done on master?

Yeah, I can just keep it in this branch for a long time. I update it regularly (I always use merge rather than rebase), so this branch can be the feature branch. Also just set up another PR to merge into this branch.

@boks1971 boks1971 mentioned this pull request May 20, 2023
@boks1971 (Contributor Author):

Sorry @paulwe, did not mean to ask for review. This is a long-standing branch which I need to work on at some point. Once in a while I merge to ensure that it is still okay. Will change it to a draft.

@boks1971 boks1971 marked this pull request as draft April 26, 2024 08:23