Use Go in a more idiomatic way #126

Merged
merged 17 commits into main from optimise-the-usage-of-channels on Feb 9, 2023

Conversation

@daniel-abramov (Contributor) commented Feb 6, 2023

This is a result of working on #120, which did not quite work as expected due to some bugs and issues with Pion.

So currently it's more a preparation to properly implement #120 and to get ready for #117.

This PR changes the following things:

  • The way we work with Go channels. It removes some helpers that we used in the past: they were not generic enough, and their implementation would not have worked as expected if used as a generic channel implementation. We also use unbounded channels more than before, which simplifies the backpressure handling between the router and the conference. The only part that still does not look particularly idiomatic is pkg/channel/sink.go (it might get improved in the future).
  • We now use a more idiomatic package structure, i.e. this PR removes the common package: it seems to be an antipattern used to overcome Go limitations, and it ends up holding everything that does not belong to other packages.
  • Prepare for #117 (Make each publisher send RTP packets directly to its subscribers).

Note that there is one significant change in behavior: currently, the router processes the messages from the Matrix SDK and sends them to the conference via an unbounded channel. This means that if one conference freezes, it will affect all other conferences! It's not a problem for now since we primarily use a single conference and conferences should never freeze (so it helps to check that our implementation is sound), but ideally we probably need to get the buffer back for the conference, otherwise we won't be able to make them all independent.
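
For context, a minimal sketch of the kind of unbounded channel this refers to, assuming a generic constructor (the name NewUnbounded and its API are illustrative, not the actual pkg/channel code):

package channel

// NewUnbounded returns a pair of channels that behave like an unbounded channel:
// writes to `in` never block, and buffered values are delivered to `out` in FIFO
// order. Illustrative sketch only, not the actual pkg/channel implementation.
func NewUnbounded[T any]() (in chan<- T, out <-chan T) {
	inCh, outCh := make(chan T), make(chan T)
	go func() {
		defer close(outCh)
		var queue []T
		for {
			if len(queue) == 0 {
				// Nothing buffered: we can only receive.
				v, ok := <-inCh
				if !ok {
					return
				}
				queue = append(queue, v)
				continue
			}
			select {
			case v, ok := <-inCh:
				if !ok {
					// Sender is done: drain whatever is still buffered.
					for _, buffered := range queue {
						outCh <- buffered
					}
					return
				}
				queue = append(queue, v)
			case outCh <- queue[0]:
				queue = queue[1:]
			}
		}
	}()
	return inCh, outCh
}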

There was no point in requesting the key frame by sending a message to the conference: the only thing the conference did was call a single function on the peer, so it was just wasting CPU time. Instead, we can call this function directly and write to the peer connection. The function is thread-safe, since all mutable state is protected by a mutex and the peer connection has a mutex inside.

This solves the problem of managing the queue size on the receiver. It's OK if the Matrix SDK blocks until we're ready to accept new messages.

Use unbounded channels for that. We can't quite use them for multi-conference setups, as a single conference would affect the others, but currently we don't have these problems.

The `common` package is an antipattern.

This is a temporary measure until we can work around bugs in Pion that lead to deadlocks.
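
As an illustration of the direct key-frame request described in the first commit message above, a hedged sketch using Pion (the Peer type and its fields are assumptions, not the actual code):

package peer

import (
	"sync"

	"github.com/pion/rtcp"
	"github.com/pion/webrtc/v3"
)

// Peer is an illustrative stand-in for the SFU's peer abstraction.
type Peer struct {
	mutex          sync.Mutex
	peerConnection *webrtc.PeerConnection
}

// RequestKeyFrame asks the remote side for a key frame by writing a Picture Loss
// Indication directly to the peer connection, instead of routing the request
// through the conference loop. Safe to call from any goroutine: the mutable state
// is guarded by the mutex and the peer connection is internally synchronized.
func (p *Peer) RequestKeyFrame(mediaSSRC uint32) error {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	return p.peerConnection.WriteRTCP([]rtcp.Packet{
		&rtcp.PictureLossIndication{MediaSSRC: mediaSSRC},
	})
}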
@daniel-abramov requested a review from a team on February 6, 2023 17:52
@daniel-abramov changed the title from "Use channels in a more idiomatic way" to "Use Go in a more idiomatic way" on Feb 6, 2023
@dbkr (Member) left a comment


I think this looks good, but this is a huge PR with a lot of unrelated changes, making it very hard to review. Moving things out of common, in particular, feels like it doesn't belong here.

@daniel-abramov (Contributor, Author) commented Feb 7, 2023

> I think this looks good, but this is a huge PR with a lot of unrelated changes, making it very hard to review. Moving things out of common, in particular, feels like it doesn't belong here.

Yeah, sorry, perhaps the common part should have been moved into its own PR.

I tried to separate the changes by commit so that the PR is easier to review commit by commit: each commit does one single thing (I rebased them before creating the PR so that the logic and the changes are easier to follow). I.e. if you review it commit by commit, it should be more manageable, as the commits are mostly small.

cmd/sfu/main.go Outdated

// Start matrix client sync. This function will block until the sync fails.
matrixClient.RunSyncing(func(e *event.Event) {
	routerChannel <- e
	matrixEvents <- e
})
Contributor

matrixClient.RunSyncing runs endlessly until a panic happens. Is that right?

We could avoid this if we change the signature like this:

	matrixEvents, errc := s.matrixClient.RunSyncing()
	routing.RunRouter(matrixClient, connectionFactory, matrixEvents, config.Conference)
	if err := <-errc; err != nil {
		log(err)
	}

Contributor (Author)

Ah, I see. Do I get it right that the advantage is that we don't panic from inside a package, but do it in main instead?

Contributor

We could decide that based on what kind of error occurred. I think we should avoid panicking if it's not needed.

Contributor (Author)

Yeah, makes sense, though we only have one type of error. But please confirm that my understanding is correct: the reason we're doing this is that it would allow us to avoid panicking inside a package? If so, I agree, but couldn't we solve it more simply by just returning this error instead of panicking? I.e. I could slightly change RunSyncing() so that it does not panic once syncing has stopped, but instead returns the error back to the caller.

Contributor

RunSyncing() acts as the main loop. errc is a kind of done channel that can interrupt the loop gracefully. Additionally, errc carries the reason for the termination: nil means an intentional abort, and an error means an abort because of an issue.

Contributor (Author)

Right. But we could simply return an error instead of panicking, which would have the same effect, right?

Anyway, I pushed a couple of new changes, so that:

  1. We don't panic anymore from inside the signaling package; we return an error instead (to be consistent with the API of the Matrix SDK there; it seems like a separate errc is not required in that particular case, but it could be added on top if we ever need it).
  2. I don't store the done channel inside the conference state anymore (as you pointed out, we only use it from the processMessages() loop to close it once the loop is over, so I just pass it as a local variable). That way no other function can [by accident] close a channel that it's not supposed to close 👍

PTAL 🙂
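
A minimal sketch of what point 1 means for the caller, assuming RunSyncing keeps its callback-based signature but returns the error instead of panicking (hypothetical shape, using the standard library log package):

// In cmd/sfu/main.go (sketch): syncing errors are returned to the caller
// rather than causing a panic inside the signaling package.
if err := matrixClient.RunSyncing(func(e *event.Event) { matrixEvents <- e }); err != nil {
	log.Fatalf("matrix syncing stopped: %v", err)
}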

	return ErrSinkSealed
case s.messageSink <- messageWithSender:
	return nil
}
Contributor

You do not know which of the two cases will be executed at runtime. With default you make sure that the sealed case is checked first.

select {
case <-s.sealed:
	return ErrSinkSealed
default:
	s.messageSink <- messageWithSender
	return nil
}

Contributor (Author)

But isn't it semantically different? I.e. what if the sender gets to the s.messageSink <- messageWithSender part and blocks there waiting for the reader to get ready? We must ensure that once the reader is not ready to accept new messages (e.g. not interested in them anymore), we don't continue waiting on the channel. Essentially that's what I tried to describe, i.e. semantically I wanted this:

select {
    case <-done:
        return OkRecipientDoesNotExpectNewMessages
    case s.messageSink <- messageWithSender:
        return nil
}

Though your concern is valid, I tried to catch this case by checking the atomic variable at the beginning of the function (this does not guarantee that those who called Send() just before Close() won't send a value though).

Contributor

OK, got it. Better to send one more message than to block.

}

participantID := participant.ID{UserID: userID, DeviceID: inviteEvent.DeviceID, CallID: inviteEvent.CallID}
if err := conference.onNewParticipant(participantID, inviteEvent); err != nil {
return nil, err
return nil, nil
}

// Start conference "main loop".
go conference.processMessages()
Contributor

Maybe if processMessages had its own done channel that it closes, then we could wait here for processMessages to be done and clean up if necessary.

go func() {
    defer close(done) // close conference after processMessages is done
    pmDone := conference.processMessages()
    <-pmDone
    // ... do some clean ups, currently not needed
}()

I only thought of this because the conference done channel is distributed to the sinks.
And it would have the advantage that we don't have to bind done to the conference structure.
What do you think?

Contributor (Author)

I think your point about storing done inside the conference is valid. This could be solved if I pass done to processMessages() (so that we won't need to store it in the conference) and rename it to signalDone instead of done (so that it doesn't give the impression that it's something we need to listen to).

The rest I think is identical (we start a goroutine for processMessages that then cleans things up), though we don't need to wait on done inside StartConference(), since that done channel is returned for the upstream jobs, so that they know when the conference is closed.

I.e. we return done so that the caller which sends messages to the conference knows when to stop producing new values (when to stop sending values to the conference).

That way, when the sender sends a Matrix message to the conference, the sender's code looks like this:

select {
    case conferenceSink <- message:
        // message sent to the conference, cool
    case <-conferenceDone:
        // conference closed, ok, so we don't send messages to it anymore
        close(conferenceSink)
}
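
For completeness, a hedged sketch of the conference side of this design (hypothetical names and signatures, not the actual implementation):

type Message struct{} // stand-in for the router → conference message type

type Conference struct {
	peerMessages chan Message
}

func (c *Conference) handleMessage(Message) { /* handle one message */ }

// StartConference spawns the conference "main loop" and returns a done channel
// that is closed when the loop exits, so the upstream router knows when to stop
// sending messages to this conference.
func StartConference(peerMessages chan Message) (done <-chan struct{}) {
	conference := &Conference{peerMessages: peerMessages}
	signalDone := make(chan struct{})
	go conference.processMessages(signalDone)
	return signalDone
}

// processMessages owns signalDone locally, so no other function can close it by accident.
func (c *Conference) processMessages(signalDone chan struct{}) {
	defer close(signalDone)
	for message := range c.peerMessages {
		c.handleMessage(message)
	}
}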

Contributor

I agree. At this time we are not running into a panic, because the conference message channel will never be closed. With the cleanup, we could first close the sinks and then close the conference's message channel.

Contributor (Author)

> At this time we are not running into a panic, because the conference message channel will never be closed.

Sorry, I did not quite get it. The channel between the router and a conference gets closed by the sender (the router) once the sender spots that the conference closed the done channel. The done channel is closed by the conference when the conference ends, to inform the upstream stage (the router) that no more messages are expected. When the router spots that, it closes the message channel that it used to send messages to the conference and removes the conference from the list.

Contributor

I meant that this select construct ("select to send to conference") could lead to a panic if the conference.peerMessages channel were closed, because every Sink writes to this channel.

Here in NewSink the channel conference.peerMessages is handed over. I couldn't find a place where we close conference.peerMessages.

Should we decide to close conference.peerMessages, then here would be a good place, after all sinks and processMessages have been closed.

Contributor (Author)

Ah, I see, you mean a different place. Yes, this is the single leftover that is still not refactored (I mentioned it in the description): ideally that part should be rewritten as a fan-in, where we have a single sender for each peer and then a merge function. Though we would need to use Select from the reflect package, since the number of peers changes over time.
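
To illustrate the fan-in idea with Select from the reflect package (a sketch with hypothetical names, not the actual implementation):

package channel

import "reflect"

// mergePeerMessages merges a set of per-peer channels into a single output
// channel using reflect.Select, which works for a number of channels that is
// only known at runtime. Sketch only; the real code would also need to handle
// peers joining after the merge has started.
func mergePeerMessages[T any](inputs []<-chan T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		cases := make([]reflect.SelectCase, len(inputs))
		for i, ch := range inputs {
			cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
		}
		for len(cases) > 0 {
			chosen, value, ok := reflect.Select(cases)
			if !ok {
				// The chosen channel was closed: stop selecting on it.
				cases = append(cases[:chosen], cases[chosen+1:]...)
				continue
			}
			out <- value.Interface().(T)
		}
	}()
	return out
}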

Contributor

👍

@daniel-abramov merged commit ddabfc5 into main on Feb 9, 2023
@daniel-abramov deleted the optimise-the-usage-of-channels branch on February 9, 2023 09:44