
for_each_init calls init several times per thread #742

Open
pchampin opened this issue Mar 31, 2020 · 4 comments

@pchampin

Here is my use case: I must process all elements of ts: Vec<T> with a function f(&mut T, &mut Buffer). The Buffer type is expensive to allocate and initialize, but its state after executing f is guaranteed to be identical to its initial state. So my (single-threaded) loop looks like this:

let mut b = allocate_and_init_buffer();
for t in ts.iter_mut() {
    f(t, &mut b);
}

I would like to make this parallel, so I turned it into:

ts.par_iter_mut().for_each_init(
    allocate_and_init_buffer,
    |b, t| f(t, b),
);

This runs faster (thanks rayon 😄), but I noticed that allocate_and_init_buffer is called more times than the number of threads in the thread pool... I'm guessing that the threads get "reset" every once in a while, making it necessary to call the init function again?

Is there a way I can control how/when this "reset" occurs, in order to minimize the number of times allocate_and_init_buffer is called?

@cuviper
Member

cuviper commented Mar 31, 2020

Your init will be called once per job split, and yes, the number of splits will usually exceed the number of threads. That's a consequence of the "adaptive" job splitting, where we don't just assume all parts will take equal time to compute.

The high-level rayon iterators can't really predict where each job will run as a result of work stealing, and the low-level rayon-core that does the work stealing doesn't know the specifics of for_each_init's code. Even if it were all in one crate, I'm not sure how we could dynamically toggle whether to use the same value or make a new one, depending on whether it was stolen. I think at the very least that would require unsafe code to get around Send/Sync requirements.

One option for you is to set a manual floor on the job granularity using with_min_len(), perhaps with a value of ts.len() / rayon::current_num_threads().
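
For example, a minimal sketch reusing the names from your snippet (allocate_and_init_buffer and f are your own functions; the max(1) guards against a zero minimum when ts is shorter than the thread count):

let min_len = (ts.len() / rayon::current_num_threads()).max(1);
ts.par_iter_mut()
    .with_min_len(min_len)
    .for_each_init(allocate_and_init_buffer, |b, t| f(t, b));

With that floor, each job covers at least min_len elements, so init should run roughly once per thread, at the cost of coarser-grained work stealing.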

In the future we might try to have more flexibility, like the static scheduler in #735.

@pchampin
Author

pchampin commented Apr 1, 2020

Thanks, this makes things a little clearer.

The with_min_len trick seems to work (my init function is only called RAYON_NUM_THREADS times). As I understand it, the drawback is that if one of the threads finishes sooner than the others, it won't be able to steal work from them, right?

I'll do some experiments, but since the elements in ts may indeed be quite heterogeneous in size, I guess I'll leave it to Rayon to decide when to call init, then. But at least now I understand why 😉 Thanks again.

@chaueve

chaueve commented Sep 25, 2020

I originally wrote the following test to determine whether all inits were dropped before for_each_init returned. Imagine my surprise when I found out that only one init was reused, and even then only once. This behaviour recurs consistently on my machine.

use rayon::prelude::*;
use rayon::ThreadPoolBuilder;
use std::sync::{Arc, Mutex};
use std::thread::sleep;
use std::time::Duration;

// Tracks each init value by id and reports when it is dropped.
struct InitData {
    id: u64,
}

impl Drop for InitData {
    fn drop(&mut self) {
        println!("Dropping init {}.", self.id);
        sleep(Duration::from_millis(10 + self.id));
        println!("Init {} dropped.", self.id);
    }
}

fn main() {
    // Shared counter handing out a unique id to every init call.
    let next_id = Arc::new(Mutex::new(0u64));

    ThreadPoolBuilder::new().num_threads(3).build().unwrap().install(|| {
        println!("Calling for each init.");
        (0u64..10).into_par_iter().for_each_init(
            || {
                let mut l = next_id.lock().unwrap();
                let id = *l;
                *l += 1;
                println!("Initializing init {}.", id);
                sleep(Duration::from_millis(10));
                println!("Init {} initialized.", id);
                InitData { id }
            },
            |init_data, item| {
                println!("Processing {} with init {}.", item, init_data.id);
                sleep(Duration::from_millis(10 + init_data.id + item));
                println!("Processed {} with init {}.", item, init_data.id);
            },
        );
        println!("For each init returned.");
    });
}

Is this a bug or still just suboptimal behaviour?

@cuviper
Member

cuviper commented Sep 29, 2020

@chaueve as mentioned above, the adaptive splitting can be overly aggressive, and you may want to tune it manually. Unfortunately you can't use with_min_len on a Range<u64> because that is not an IndexedParallelIterator, but it should work with Range<u32>.
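
For example, a minimal standalone sketch (assuming a small fixed range like in your test; the exact number of init calls still depends on the scheduler):

use rayon::prelude::*;

fn main() {
    let len = 10u32;
    // Floor the job size at one chunk per thread, so init runs
    // roughly once per thread instead of once per split.
    let min_len = (len as usize / rayon::current_num_threads()).max(1);
    (0..len).into_par_iter()
        .with_min_len(min_len)
        .for_each_init(
            || println!("init called"),
            |_init, item| println!("processing {}", item),
        );
}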
