Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Yet Another Custom Scheduler #735

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
56 changes: 51 additions & 5 deletions rayon-demo/src/nbody/mod.rs
Expand Up @@ -45,6 +45,8 @@ Ported from the RiverTrail demo found at:
/// Selects which n-body tick implementation the demo runs
/// (dispatched in the `match` inside `visualize_benchmarks`).
#[derive(Copy, Clone, PartialEq, Eq, serde::Deserialize)]
pub enum ExecutionMode {
    /// Runs `tick_par` — rayon's default parallel iterator path.
    Par,
    /// Runs `tick_par_schedule` — parallel, driven by the custom `StaticScheduler`.
    ParSched,
    /// Runs `tick_par_bridge`.
    ParBridge,
    /// Runs `tick_par_reduce`.
    ParReduce,
    /// Runs `tick_seq` — the sequential implementation.
    Seq,
}
Expand Down Expand Up @@ -77,6 +79,8 @@ pub fn main(args: &[String]) {

fn run_benchmarks(mode: Option<ExecutionMode>, bodies: usize, ticks: usize) {
let run_par = mode.map(|m| m == ExecutionMode::Par).unwrap_or(true);
let run_par_schedule = mode.map(|m| m == ExecutionMode::ParSched).unwrap_or(true);
let run_par_bridge = mode.map(|m| m == ExecutionMode::ParBridge).unwrap_or(true);
let run_par_reduce = mode.map(|m| m == ExecutionMode::ParReduce).unwrap_or(true);
let run_seq = mode.map(|m| m == ExecutionMode::Seq).unwrap_or(true);

Expand All @@ -90,7 +94,41 @@ fn run_benchmarks(mode: Option<ExecutionMode>, bodies: usize, ticks: usize) {
}

let par_time = par_start.elapsed().as_nanos();
println!("Parallel time : {} ns", par_time);
println!("Parallel time\t: {} ns", par_time);

Some(par_time)
} else {
None
};

let par_schedule_time = if run_par_schedule {
let mut rng = crate::seeded_rng();
let mut benchmark = NBodyBenchmark::new(bodies, &mut rng);
let par_start = Instant::now();

for _ in 0..ticks {
benchmark.tick_par_schedule();
}

let par_time = par_start.elapsed().as_nanos();
println!("ParSched time\t: {} ns", par_time);

Some(par_time)
} else {
None
};

let par_bridge_time = if run_par_bridge {
let mut rng = crate::seeded_rng();
let mut benchmark = NBodyBenchmark::new(bodies, &mut rng);
let par_start = Instant::now();

for _ in 0..ticks {
benchmark.tick_par_bridge();
}

let par_time = par_start.elapsed().as_nanos();
println!("ParBridge time\t: {} ns", par_time);

Some(par_time)
} else {
Expand All @@ -107,7 +145,7 @@ fn run_benchmarks(mode: Option<ExecutionMode>, bodies: usize, ticks: usize) {
}

let par_time = par_start.elapsed().as_nanos();
println!("ParReduce time : {} ns", par_time);
println!("ParReduce time\t: {} ns", par_time);

Some(par_time)
} else {
Expand All @@ -124,18 +162,26 @@ fn run_benchmarks(mode: Option<ExecutionMode>, bodies: usize, ticks: usize) {
}

let seq_time = seq_start.elapsed().as_nanos();
println!("Sequential time : {} ns", seq_time);
println!("Sequential time\t: {} ns", seq_time);

Some(seq_time)
} else {
None
};

if let (Some(pt), Some(st)) = (par_time, seq_time) {
println!("Parallel speedup : {}", (st as f32) / (pt as f32));
println!("Parallel speedup\t: {}", (st as f32) / (pt as f32));
}

if let (Some(pt), Some(st)) = (par_schedule_time, seq_time) {
println!("ParSched speedup\t: {}", (st as f32) / (pt as f32));
}

if let (Some(pt), Some(st)) = (par_bridge_time, seq_time) {
println!("ParBridge speedup\t: {}", (st as f32) / (pt as f32));
}

if let (Some(pt), Some(st)) = (par_reduce_time, seq_time) {
println!("ParReduce speedup: {}", (st as f32) / (pt as f32));
println!("ParReduce speedup\t: {}", (st as f32) / (pt as f32));
}
}
28 changes: 27 additions & 1 deletion rayon-demo/src/nbody/nbody.rs
Expand Up @@ -116,7 +116,33 @@ impl NBodyBenchmark {
out_bodies
}

#[cfg(test)]
/// Advances the simulation one tick in parallel, using the custom
/// `StaticScheduler` to drive the zipped parallel iterator.
/// Returns the buffer holding the freshly computed bodies.
pub fn tick_par_schedule(&mut self) -> &[Body] {
    use rayon::iter::StaticScheduler;

    // Ping-pong between the two body buffers based on tick parity:
    // even ticks read buffer 0 and write buffer 1, odd ticks the reverse.
    let (src, dst) = if self.time & 1 == 0 {
        (&self.bodies.0, &mut self.bodies.1)
    } else {
        (&self.bodies.1, &mut self.bodies.0)
    };

    let tick = self.time;
    dst.par_iter_mut()
        .zip(&src[..])
        .with_scheduler(StaticScheduler::new())
        .for_each(|(body, prev)| {
            // Each output body is derived from the previous state of all bodies.
            let (vel, vel2) = next_velocity(tick, prev, src);
            body.velocity = vel;
            body.velocity2 = vel2;
            body.position = prev.position + (vel - vel2);
        });

    self.time += 1;

    dst
}

pub fn tick_par_bridge(&mut self) -> &[Body] {
let (in_bodies, out_bodies) = if (self.time & 1) == 0 {
(&self.bodies.0, &mut self.bodies.1)
Expand Down
2 changes: 2 additions & 0 deletions rayon-demo/src/nbody/visualize.rs
Expand Up @@ -141,6 +141,8 @@ pub fn visualize_benchmarks(num_bodies: usize, mut mode: ExecutionMode) {
Event::MainEventsCleared => {
let bodies = match mode {
ExecutionMode::Par => benchmark.tick_par(),
ExecutionMode::ParSched => benchmark.tick_par_schedule(),
ExecutionMode::ParBridge => benchmark.tick_par_bridge(),
ExecutionMode::ParReduce => benchmark.tick_par_reduce(),
ExecutionMode::Seq => benchmark.tick_seq(),
};
Expand Down
37 changes: 37 additions & 0 deletions rayon-demo/src/vec_collect.rs
Expand Up @@ -227,3 +227,40 @@ mod vec_i_filtered {

make_bench!(generate, check);
}

/// Tests a big vector of i forall i in 0 to N, with static scheduler.
mod vec_i_schedule {
    use rayon::prelude::*;

    // 4 Mi elements — large enough for scheduling overhead to show up.
    const N: u32 = 4 * 1024 * 1024;

    /// Produces `0..N` as an indexed parallel iterator driven by the
    /// custom `StaticScheduler` with a chunk size of 4.
    fn generate() -> impl IndexedParallelIterator<Item = u32> {
        (0_u32..N)
            .into_par_iter()
            .with_scheduler(rayon::iter::StaticScheduler::with_chunk_size(4))
    }

    /// Asserts the collected slice is exactly the sequence `0..N`.
    fn check(v: &[u32]) {
        assert!(v.iter().cloned().eq(0..N));
    }

    /// Benchmarks `collect_into_vec` allocating a fresh vector per iteration.
    // NOTE(review): `make_bench!` below may already expand to similar
    // collect benchmarks — confirm these two hand-written ones are not
    // duplicates of its output.
    #[bench]
    fn with_collect_into_vec(b: &mut ::test::Bencher) {
        let mut vec = None;
        b.iter(|| {
            let mut v = vec![];
            generate().collect_into_vec(&mut v);
            vec = Some(v);
        });
        check(&vec.unwrap());
    }

    /// Benchmarks `collect_into_vec` reusing one vector across iterations,
    /// so allocation cost is paid only on the first pass.
    #[bench]
    fn with_collect_into_vec_reused(b: &mut ::test::Bencher) {
        let mut vec = vec![];
        b.iter(|| generate().collect_into_vec(&mut vec));
        check(&vec);
    }

    make_bench!(generate, check);
}
14 changes: 14 additions & 0 deletions src/iter/mod.rs
Expand Up @@ -100,6 +100,10 @@ use std::ops::Fn;
mod par_bridge;
pub use self::par_bridge::{IterBridge, ParallelBridge};

pub mod scheduler;
pub use self::scheduler::{Scheduler, UnindexedScheduler, WithScheduler, WithUnindexedScheduler};
pub use self::scheduler::misc::*;

mod chain;
mod find;
mod find_first_last;
Expand Down Expand Up @@ -349,6 +353,11 @@ pub trait ParallelIterator: Sized + Send {
/// [`for_each`]: #method.for_each
type Item: Send;

/// Wraps this iterator so that its work is driven by the given
/// [`UnindexedScheduler`] rather than the default strategy.
// NOTE(review): `S` carries no trait bound here — presumably
// `WithUnindexedScheduler` enforces `S: UnindexedScheduler` on its own
// impls; confirm at that type's definition.
fn with_unindexed_scheduler<S>(self, scheduler: S) -> WithUnindexedScheduler<Self, S> {
    WithUnindexedScheduler::new(self, scheduler)
}

/// Executes `OP` on each item produced by the iterator, in parallel.
///
/// # Examples
Expand Down Expand Up @@ -2085,6 +2094,11 @@ impl<T: ParallelIterator> IntoParallelIterator for T {
///
/// **Note:** Not implemented for `u64`, `i64`, `u128`, or `i128` ranges
pub trait IndexedParallelIterator: ParallelIterator {
/// Wraps this indexed iterator so that its work is driven by the given
/// [`Scheduler`] rather than the default strategy.
// NOTE(review): `S` carries no trait bound here — presumably
// `WithScheduler` enforces `S: Scheduler` on its own impls; confirm
// at that type's definition.
fn with_scheduler<S>(self, scheduler: S) -> WithScheduler<Self, S> {
    WithScheduler::new(self, scheduler)
}

/// Collects the results of the iterator into the specified
/// vector. The vector is always truncated before execution
/// begins. If possible, reusing the vector across calls can lead
Expand Down
164 changes: 164 additions & 0 deletions src/iter/scheduler/misc.rs
@@ -0,0 +1,164 @@
//! This module contains useful schedulers.
use super::*;

/// Default Scheduler.
/// When used as an indexed Scheduler, thief-splitting will be used.
/// When used as an unindexed Scheduler, tasks will be divided into the
/// smallest pieces the producer allows.
#[derive(Debug, Clone, Default)]
pub struct DefaultScheduler;

impl Scheduler for DefaultScheduler {
    /// Delegates to rayon's standard `bridge_producer_consumer`
    /// (adaptive thief-splitting).
    fn bridge<P, C, T>(&mut self, len: usize, producer: P, consumer: C) -> C::Result
    where
        P: Producer<Item = T>,
        C: Consumer<T>,
    {
        bridge_producer_consumer(len, producer, consumer)
    }
}

impl UnindexedScheduler for DefaultScheduler {
    /// Delegates to rayon's standard `bridge_unindexed`.
    fn bridge_unindexed<P, C, T>(&mut self, producer: P, consumer: C) -> C::Result
    where
        P: UnindexedProducer<Item = T>,
        C: UnindexedConsumer<T>,
    {
        bridge_unindexed(producer, consumer)
    }
}

/// Dummy sequential scheduler.
/// No parallelism is used at all: the whole producer is folded on the
/// calling thread. Useful as a baseline or for debugging.
#[derive(Debug, Clone, Default)]
pub struct SequentialScheduler;

impl Scheduler for SequentialScheduler {
    /// Folds the entire producer into the consumer on the current thread;
    /// `_len` is ignored since no splitting ever happens.
    fn bridge<P, C, T>(&mut self, _len: usize, producer: P, consumer: C) -> C::Result
    where
        P: Producer<Item = T>,
        C: Consumer<T>,
    {
        producer.fold_with(consumer.into_folder()).complete()
    }
}

impl UnindexedScheduler for SequentialScheduler {
    /// Folds the entire producer into the consumer on the current thread.
    fn bridge_unindexed<P, C, T>(&mut self, producer: P, consumer: C) -> C::Result
    where
        P: UnindexedProducer<Item = T>,
        C: UnindexedConsumer<T>,
    {
        producer.fold_with(consumer.into_folder()).complete()
    }
}

/// Bridges a producer/consumer pair by splitting at the given list of
/// absolute split positions (indices into the full producer), which must
/// be non-decreasing.
///
/// Positions are consumed divide-and-conquer style so the resulting task
/// tree stays balanced; `bias` tracks how many items have already been
/// split off to the left, because `split_at` expects offsets relative to
/// the current (already partially split) producer.
fn static_partition_bridge<P, C, T>(positions: &[usize], producer: P, consumer: C) -> C::Result
where
    P: Producer<Item = T>,
    C: Consumer<T>,
{
    fn helper<P, C, T>(positions: &[usize], bias: usize, producer: P, consumer: C) -> C::Result
    where
        P: Producer<Item = T>,
        C: Consumer<T>,
    {
        if consumer.full() {
            // Early exit: the consumer needs no more items
            // (e.g. a short-circuiting operation already succeeded).
            consumer.into_folder().complete()
        } else if !positions.is_empty() {
            // Split at the middle position to keep the recursion balanced.
            let mid_index = positions.len() / 2;
            let position = positions[mid_index];

            // `position` is absolute; subtract `bias` to translate it into
            // an offset relative to this sub-producer.
            let (left_producer, right_producer) = producer.split_at(position - bias);
            let (left_consumer, right_consumer, reducer) = consumer.split_at(position - bias);

            use crate::join;
            let (left_result, right_result) = join(
                || helper(&positions[..mid_index], bias, left_producer, left_consumer),
                || {
                    helper(
                        &positions[mid_index + 1..],
                        position,
                        right_producer,
                        right_consumer,
                    )
                },
            );
            reducer.reduce(left_result, right_result)
        } else {
            // No split points remain: run this partition sequentially.
            producer.fold_with(consumer.into_folder()).complete()
        }
    }
    helper(positions, 0, producer, consumer)
}

/// Fixed length scheduler.
/// Every task assigned to a thread will contain a fixed number of items,
/// except for the last task which may contain fewer.
/// The parameters in `with_min_len` and `with_max_len` will be ignored.
#[derive(Debug, Clone)]
pub struct FixedLengthScheduler {
    // Items per task; invariant: always >= 1.
    fixed_length: usize,
}

impl Default for FixedLengthScheduler {
    /// Defaults to one item per task, the smallest valid length.
    ///
    /// A derived `Default` would produce `fixed_length == 0`, violating
    /// the invariant enforced by `new` and panicking in `step_by(0)` the
    /// first time the scheduler is used.
    fn default() -> Self {
        Self { fixed_length: 1 }
    }
}

impl FixedLengthScheduler {
    /// Create fixed length scheduler with assigned length. Length must be greater than or equal to 1.
    ///
    /// # Panics
    ///
    /// Panics if `fixed_length` is zero.
    pub fn new(fixed_length: usize) -> Self {
        assert!(
            fixed_length != 0,
            "Length must be greater than or equal to 1."
        );
        Self { fixed_length }
    }
}

impl Scheduler for FixedLengthScheduler {
    /// Splits the input at every multiple of `fixed_length` inside
    /// `(0, len)`, so each task spans exactly `fixed_length` items except
    /// possibly the last one.
    fn bridge<P, C, T>(&mut self, len: usize, producer: P, consumer: C) -> C::Result
    where
        P: Producer<Item = T>,
        C: Consumer<T>,
    {
        let step = self.fixed_length;
        let positions: Vec<usize> = (step..len).step_by(step).collect();
        static_partition_bridge(&positions, producer, consumer)
    }
}

/// Static split scheduler.
/// Given a chunk size, this scheduler will divide all items evenly based on
/// their length to create `current_num_threads()` number of tasks.
/// The length of each task should be a multiple of the chunk size, except
/// for the last task.
#[derive(Debug, Clone)]
pub struct StaticScheduler {
    // Split granularity; invariant: always >= 1.
    chunk_size: usize,
}

impl Default for StaticScheduler {
    /// Defaults to chunk size 1, matching [`StaticScheduler::new`].
    ///
    /// A derived `Default` would produce `chunk_size == 0`, causing a
    /// divide-by-zero when the scheduler computes its split points.
    fn default() -> Self {
        Self::new()
    }
}

impl StaticScheduler {
    /// Create static split scheduler with default chunk size 1.
    pub fn new() -> Self {
        Self { chunk_size: 1 }
    }

    /// Create static split scheduler with assigned chunk size. Chunk size must be greater than or equal to 1.
    ///
    /// # Panics
    ///
    /// Panics if `chunk_size` is zero.
    pub fn with_chunk_size(chunk_size: usize) -> Self {
        assert!(
            chunk_size != 0,
            "Chunk size must be greater than or equal to 1."
        );
        Self { chunk_size }
    }
}

impl Scheduler for StaticScheduler {
    /// Divides `len` items into one task per worker thread, rounding each
    /// split point down to a multiple of `chunk_size` so only the final
    /// task can hold a partial chunk.
    fn bridge<P, C, T>(&mut self, len: usize, producer: P, consumer: C) -> C::Result
    where
        P: Producer<Item = T>,
        C: Consumer<T>,
    {
        use crate::current_num_threads;

        let workers = current_num_threads();
        let full_chunks = len / self.chunk_size;
        let mut positions = Vec::with_capacity(workers.saturating_sub(1));
        for i in 1..workers {
            // ((i * full_chunks) / workers) chunks, converted back to items.
            positions.push(i * full_chunks / workers * self.chunk_size);
        }

        static_partition_bridge(&positions, producer, consumer)
    }
}