Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Create Scan parallel iterator #1036

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions rayon-demo/Cargo.toml
Expand Up @@ -16,6 +16,7 @@ once_cell = "1.17.1"
rand = "0.8"
rand_xorshift = "0.3"
regex = "1"
ndarray = "0.15.6"

[dependencies.serde]
version = "1.0.85"
Expand Down
2 changes: 2 additions & 0 deletions rayon-demo/src/main.rs
Expand Up @@ -33,6 +33,8 @@ mod sort;
mod str_split;
#[cfg(test)]
mod vec_collect;
#[cfg(test)]
mod scan;

#[cfg(test)]
extern crate test;
Expand Down
124 changes: 124 additions & 0 deletions rayon-demo/src/scan/mod.rs
@@ -0,0 +1,124 @@
use ndarray::{Array, Dim};
use rayon::iter::*;
use std::time::{Duration, Instant};
use std::num::Wrapping;

/// Number of elements in the input vector built by `scan_sequential` and
/// `scan_parallel`; the parallel helper also derives its minimum split
/// length from it (`SIZE / 100`).
const SIZE: usize = 10000;

/// Selects which scan implementation a benchmark or test should exercise.
enum Procs {
/// Run the scan through `scan_sequential` (standard-library `Iterator::scan`).
Sequential,
/// Run the scan through `scan_parallel` (rayon's parallel `scan`).
Parallel,
}

/// Sequential baseline: builds a vector holding `SIZE` clones of `init()` and
/// runs the standard-library `Iterator::scan` over it.
///
/// * `init` - produces the value that every input element is cloned from.
/// * `id` - initial scan state handed to `Iterator::scan`.
/// * `scan_op` - stateful step in the `Iterator::scan` shape: it receives the
///   mutable state and the current element, and returns the value to emit
///   (returning `None` ends the scan early).
///
/// Returns every value emitted by `scan_op`, in input order.
fn scan_sequential<T, P, I>(init: I, id: T, scan_op: P) -> Vec<T>
where
    T: Clone,
    I: Fn() -> T,
    P: FnMut(&mut T, &T) -> Option<T>,
{
    let input = vec![init(); SIZE];
    input.iter().scan(id, scan_op).collect()
}

/// Parallel counterpart of `scan_sequential`: builds a vector holding `SIZE`
/// clones of `init()` and runs the parallel `scan` adaptor over it.
///
/// * `init` - produces the value that every input element is cloned from.
/// * `id` - identity value passed to the parallel `scan`.
/// * `scan_op` - binary operation combining two items by reference; it is
///   lent to the adaptor by reference rather than moved into it.
///
/// The parallel iterator is asked not to split below `SIZE / 100` items.
fn scan_parallel<T, P, I>(init: I, id: T, scan_op: P) -> Vec<T>
where
    T: Clone + Send + Sync,
    I: Fn() -> T,
    P: Fn(&T, &T) -> T + Sync,
{
    let min_len = SIZE / 100;
    let input = vec![init(); SIZE];
    let prefixes: Vec<T> = input
        .into_par_iter()
        .with_min_len(min_len)
        .scan(&scan_op, id)
        .collect();
    prefixes
}

/******* Addition with artificial delay *******/

/// Artificial per-operation delay used to make each scan step measurably slow.
const DELAY: Duration = Duration::from_nanos(10);

/// Busy-waits until at least `DELAY` has elapsed since the call started.
///
/// Returns the number of spin iterations performed, so that callers can feed
/// the result to `black_box` and keep the loop from being optimized away.
fn wait() -> i32 {
    let started = Instant::now();

    let mut spins = 0;
    loop {
        if started.elapsed() >= DELAY {
            break spins;
        }
        spins += 1;
    }
}

/// Computes the running sums of `SIZE` copies of `2`, starting from `0`,
/// using the implementation selected by `procs`.
///
/// Every application of the addition first calls `wait()` (wrapped in
/// `test::black_box`) to simulate an expensive operation.
fn scan_add(procs: Procs) -> Vec<i32> {
    let make_item = || 2;
    let zero = 0;

    match procs {
        Procs::Sequential => scan_sequential(make_item, zero, |acc: &mut i32, item: &i32| {
            test::black_box(wait());
            *acc += item;
            Some(*acc)
        }),
        Procs::Parallel => scan_parallel(make_item, zero, |lhs: &i32, rhs: &i32| {
            test::black_box(wait());
            lhs + rhs
        }),
    }
}

/// Benchmarks the delayed-addition scan on the sequential implementation.
#[bench]
fn scan_add_sequential(b: &mut test::Bencher) {
    let run = || scan_add(Procs::Sequential);
    b.iter(run);
}

/// Benchmarks the delayed-addition scan on the parallel implementation.
#[bench]
fn scan_add_parallel(b: &mut test::Bencher) {
    let run = || scan_add(Procs::Parallel);
    b.iter(run);
}

/// The parallel addition scan must produce exactly the sequential result.
#[test]
fn test_scan_add() {
    let sequential = scan_add(Procs::Sequential);
    let parallel = scan_add(Procs::Parallel);
    assert_eq!(sequential, parallel);
}

/******** Matrix multiplication with wrapping arithmetic *******/

/// Square matrix of wrapping 32-bit integers used by the matmul scan.
type Matrix = Array<Wrapping<i32>, Dim<[usize; 2]>>;

/// Computes the running matrix products of `SIZE` copies of one
/// `MAT_SIZE` x `MAT_SIZE` matrix, starting from the identity matrix, using
/// the implementation selected by `procs`.
///
/// The repeated matrix holds the values `0..MAT_SIZE * MAT_SIZE`; arithmetic
/// is wrapping, so the growing products never overflow-panic.
fn scan_matmul(procs: Procs) -> Vec<Matrix> {
    const MAT_SIZE: usize = 50;
    const MAT_LEN: i32 = (MAT_SIZE * MAT_SIZE) as i32;

    let init = || {
        Array::from_iter((0..MAT_LEN).map(Wrapping))
            .into_shape((MAT_SIZE, MAT_SIZE))
            .unwrap()
    };
    let id = Array::eye(MAT_SIZE);

    match procs {
        Procs::Sequential => scan_sequential(init, id, |acc: &mut Matrix, factor: &Matrix| {
            *acc = acc.dot(factor);
            Some(acc.clone())
        }),
        Procs::Parallel => {
            let multiply = |lhs: &Matrix, rhs: &Matrix| lhs.dot(rhs);
            scan_parallel(init, id, multiply)
        }
    }
}

/// Benchmarks the matrix-product scan on the sequential implementation.
#[bench]
fn scan_matmul_sequential(b: &mut test::Bencher) {
    let run = || scan_matmul(Procs::Sequential);
    b.iter(run);
}

/// Benchmarks the matrix-product scan on the parallel implementation.
#[bench]
fn scan_matmul_parallel(b: &mut test::Bencher) {
    let run = || scan_matmul(Procs::Parallel);
    b.iter(run);
}

/// The parallel matrix-product scan must produce exactly the sequential result.
#[test]
fn test_scan_matmul() {
    let sequential = scan_matmul(Procs::Sequential);
    let parallel = scan_matmul(Procs::Parallel);
    assert_eq!(sequential, parallel);
}
45 changes: 45 additions & 0 deletions src/iter/mod.rs
Expand Up @@ -81,6 +81,7 @@

use self::plumbing::*;
use self::private::Try;
use self::scan::Scan;
pub use either::Either;
use std::cmp::{self, Ordering};
use std::iter::{Product, Sum};
Expand Down Expand Up @@ -158,6 +159,8 @@ mod while_some;
mod zip;
mod zip_eq;

mod scan;

pub use self::{
chain::Chain,
chunks::Chunks,
Expand Down Expand Up @@ -1384,6 +1387,48 @@ pub trait ParallelIterator: Sized + Send {
sum::sum(self)
}

/// Folds the items in the iterator using `scan_op`, and produces a
/// new iterator with all of the intermediate results.
///
/// Specifically, the nth element of the scan iterator will be the
/// result of reducing the first n elements of the input with `scan_op`,
/// so the first output is derived from the first input alone.
///
/// # Examples
///
/// ```
/// // Iterate over a sequence of numbers `x0, ..., xN`
/// // and use scan to compute the partial sums
/// use rayon::prelude::*;
/// let partial_sums = [1, 2, 3, 4, 5]
///     .into_par_iter()       // iterating over i32
///     .scan(|a, b| *a + *b,  // add (&i32, &i32) -> i32
///           0)               // identity
///     .collect::<Vec<i32>>();
/// assert_eq!(partial_sums, vec![1, 3, 6, 10, 15]);
/// ```
///
/// **Note:** Unlike a sequential `scan` operation, the order in
/// which `scan_op` will be applied to produce the result is not fully
/// specified. So `scan_op` should be [associative] or else the results
/// will be non-deterministic. Also unlike sequential `scan`, there is
/// no internal state for this operation, so the operation has a
/// different signature: `scan_op` receives two items by reference and
/// returns a new item.
///
/// The argument `identity` should be an "identity" value for
/// `scan_op`, which may be inserted into the sequence as
/// needed to create opportunities for parallel execution. So, for
/// example, if you are doing a summation, then `identity` ought
/// to represent the zero for your type.
///
/// [associative]: https://en.wikipedia.org/wiki/Associative_property
fn scan<F>(self, scan_op: F, identity: Self::Item) -> Scan<Self::Item, F>
where
F: Fn(&Self::Item, &Self::Item) -> Self::Item + Sync + Send,
<Self as ParallelIterator>::Item: Send + Sync,
{
scan::scan(self, scan_op, identity)
}

/// Multiplies all the items in the iterator.
///
/// Note that the order in items will be reduced is not specified,
Expand Down