Use a single queue for message threads instead of round-robin #93

Open
vincentfretin opened this issue Jul 26, 2021 · 3 comments

@vincentfretin
Contributor

I finished reading the Rust book! While reading it, I reread the code in janus-plugin-sfu and in janus-plugin-rs, where you handle all the Rust to C/C++ conversion and the refcount increments and decrements for session/jansson/libcstring. I understand it all now, wow, beautiful :)

The last chapter, "Final Project: Building a Multithreaded Web Server", is really interesting.
In the current janus-plugin-sfu implementation of the message threads, we use simple round-robin dispatch with a separate sender/receiver channel for each thread. The relevant code is here

janus-plugin-sfu/src/lib.rs

Lines 274 to 282 in ef815ec

thread::Builder::new()
    .name(format!("sfu msg {}", i))
    .spawn(move || {
        for msg in messages_rx.iter() {
            if let Err(e) = handle_message_async(msg) {
                janus_err!("Error processing message: {}", e);
            }
        }
    })

and here

janus-plugin-sfu/src/lib.rs

Lines 775 to 778 in ef815ec

let message_count = MESSAGE_COUNTER.fetch_add(1, Ordering::Relaxed);
let senders = MESSAGE_SENDERS.get().unwrap();
let sender = &senders[message_count % senders.len()];
sender.send(msg).ok();

While reading the chapter and the current janus-plugin-sfu code, I reached the same conclusion you wrote in #49 (comment):
If you're using the websocket transport in naf-janus-adapter you can indeed have lots of quick messages for process_data. Hubs didn't use this transport, so janus handled sdp offer/answer primarily.

In the Rust book chapter, they use a single sender/receiver pair and wrap the receiver in an Arc<Mutex<Receiver>> so that all threads really share a single queue of messages. The full code is at the end of the chapter.
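
For illustration, here is a minimal sketch of that single-queue idea using only the standard library. It is not the plugin's code: `process_with_single_queue` is a hypothetical name, the messages are plain `u32` values, and "handling" a message just doubles it so the result can be checked.

```rust
use std::sync::mpsc::{self, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical helper: spawns `workers` threads that all pull from ONE shared
/// queue, "handles" each message by doubling it, and returns the sorted results.
fn process_with_single_queue(messages: Vec<u32>, workers: usize) -> Vec<u32> {
    let (tx, rx) = mpsc::channel::<u32>();
    // The single receiver is shared by every worker behind Arc<Mutex<..>>.
    let rx: Arc<Mutex<Receiver<u32>>> = Arc::new(Mutex::new(rx));
    let (done_tx, done_rx) = mpsc::channel::<u32>();

    let handles: Vec<_> = (0..workers)
        .map(|i| {
            let rx = Arc::clone(&rx);
            let done_tx = done_tx.clone();
            thread::Builder::new()
                .name(format!("sfu msg {}", i))
                .spawn(move || loop {
                    // The lock guard is a temporary dropped at the end of this
                    // statement, so the mutex is not held while handling.
                    let msg = rx.lock().unwrap().recv();
                    match msg {
                        Ok(n) => done_tx.send(n * 2).unwrap(),
                        Err(_) => break, // every sender was dropped: shut down
                    }
                })
                .expect("failed to spawn worker")
        })
        .collect();
    drop(done_tx); // only the workers keep result senders alive

    for m in messages {
        tx.send(m).unwrap();
    }
    drop(tx); // closing the queue lets the workers exit their loops
    for h in handles {
        h.join().unwrap();
    }

    let mut results: Vec<u32> = done_rx.iter().collect();
    results.sort();
    results
}

fn main() {
    let results = process_with_single_queue((1..=5).collect(), 3);
    println!("{:?}", results);
}
```

Compared with round-robin, whichever thread is idle takes the next message, so one slow message does not delay the messages that would otherwise be queued behind it on the same thread. The cost is a mutex acquisition per message.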

I may propose a PR to change the implementation to that; it would be a good exercise.

@vincentfretin
Contributor Author

I'm thinking out loud, I hope you don't mind :) I may not do anything about this, but I'm writing it all down to share what I learned.
Without proper load testing with simulated messages based on a real use case, it's hard to know whether the change proposed here would actually be better. I think it would be cool to add some code that saves all messages to a file, so they can be replayed x20 for load testing.
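
A rough sketch of what such a record/replay helper could look like, using only the standard library. Everything here is an assumption for illustration: the names `record_message` and `replay` are hypothetical, and each message is assumed to be available as a single-line JSON string (one message per line in the capture file).

```rust
use std::fs::{self, File, OpenOptions};
use std::io::{self, BufRead, BufReader, Write};
use std::path::Path;

/// Hypothetical helper: appends one raw message per line to the capture file.
/// Assumes the message text contains no newline (e.g. compact JSON).
fn record_message(path: &Path, raw_json: &str) -> io::Result<()> {
    let mut file = OpenOptions::new().create(true).append(true).open(path)?;
    writeln!(file, "{}", raw_json)
}

/// Hypothetical helper: feeds every captured message to `handler`, repeating
/// the whole capture `times` times. Returns how many messages were replayed.
fn replay<F: FnMut(&str)>(path: &Path, times: usize, mut handler: F) -> io::Result<usize> {
    let lines: Vec<String> = BufReader::new(File::open(path)?)
        .lines()
        .collect::<io::Result<_>>()?;
    let mut count = 0;
    for _ in 0..times {
        for line in &lines {
            handler(line);
            count += 1;
        }
    }
    Ok(count)
}

fn main() -> io::Result<()> {
    let path = std::env::temp_dir().join(format!("sfu_capture_{}.jsonl", std::process::id()));
    let _ = fs::remove_file(&path); // start from an empty capture

    record_message(&path, r#"{"kind":"data","body":"hello"}"#)?;
    record_message(&path, r#"{"kind":"data","body":"world"}"#)?;

    // Replay the capture x20; a real test would dispatch to the message threads.
    let replayed = replay(&path, 20, |_msg| {})?;
    println!("replayed {} messages", replayed);

    fs::remove_file(&path)
}
```

This replays messages back to back with no timing information, so it only measures raw throughput; reproducing realistic load would also need the original timestamps.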

I also looked at the code of janus-conference: they went full async with the async_std crate. In their handle_message code, instead of sending the message to one of the threads like we do, they call async_std::task::spawn.
https://github.com/foxford/janus-conference/blob/87e459091b2042462dc5fde5d8242c797a1a4319/src/lib.rs#L118-L167
Their use of async code makes sense in their case: they can trigger an upload operation for the mjr dumps they saved with the janus recorders, and without async that upload would block a thread completely for its whole duration.

I looked at the code of async_std and its dependencies. Behind the scenes it creates num_cpus::get() threads and schedules each task on one of them. Each thread has its own queue. Once a thread has no work left, it can steal tasks from another thread's queue.

In the end, the differences come down to three things: the type of channel used to communicate with the threads (sync or not), whether a mutex guards the channel receiver when a single channel is shared by all threads, and whether each thread has its own queue.

@vincentfretin
Contributor Author

Interesting article about a bottleneck with Arc: https://pkolaczk.github.io/server-slower-than-a-laptop/

An alternative to async_std is tokio, which seems popular these days.

If we want better perf (this needs to be measured) with little change to the code, we could switch the channel used by the threads from std::sync::mpsc to crossbeam::channel. I found that suggestion while reading https://tokio.rs/tokio/tutorial/channels
janus-conference is currently using crossbeam_channel::unbounded() for their recorder thread.

@vincentfretin
Contributor Author

Another thing I noted in my todo list some months ago, when I read the janus-conference code: they use an alternate hash function for all the HashMap instances in their switchboard, replacing std::collections::HashMap with fnv::FnvHashMap (see the fnv crate). From the graph you can probably gain 14ns for a RoomId or UserId lookup? I have no idea what real impact this optimization can have, but you probably need several hundred users on the janus instance to be able to measure it and find out. I currently don't have that many users. :D
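
To show what swapping the hash function involves without adding a dependency, here is a small sketch that plugs a hand-rolled 64-bit FNV-1a hasher into the standard HashMap. The `Fnv1a` struct, the `FnvMap` alias and the `fnv1a` helper are hypothetical names written for this example; in practice the fnv crate provides the equivalent type ready-made, and this sketch says nothing about the actual speedup.

```rust
use std::collections::HashMap;
use std::hash::{BuildHasherDefault, Hasher};

/// Hand-rolled 64-bit FNV-1a hasher (illustration only).
struct Fnv1a(u64);

impl Default for Fnv1a {
    fn default() -> Self {
        Fnv1a(0xcbf29ce484222325) // FNV-1a 64-bit offset basis
    }
}

impl Hasher for Fnv1a {
    fn write(&mut self, bytes: &[u8]) {
        for &b in bytes {
            self.0 ^= u64::from(b); // xor the byte in first...
            self.0 = self.0.wrapping_mul(0x100000001b3); // ...then multiply by the FNV prime
        }
    }

    fn finish(&self) -> u64 {
        self.0
    }
}

/// A std HashMap that uses the hasher above instead of the default SipHash.
type FnvMap<K, V> = HashMap<K, V, BuildHasherDefault<Fnv1a>>;

/// Convenience wrapper: hashes a byte slice in one call.
fn fnv1a(bytes: &[u8]) -> u64 {
    let mut hasher = Fnv1a::default();
    hasher.write(bytes);
    hasher.finish()
}

fn main() {
    // Hypothetical room table keyed by a numeric room id.
    let mut rooms: FnvMap<u64, &str> = FnvMap::default();
    rooms.insert(42, "lobby");
    rooms.insert(7, "stage");
    println!("{:?}", rooms.get(&42));
    println!("{:016x}", fnv1a(b"a"));
}
```

FNV is faster than the default hasher on short keys but is not resistant to hash-flooding, so it is better suited to keys the server generates or validates than to arbitrary untrusted input.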
