Skip to content

Commit

Permalink
Rewrite BiLock lock/unlock to be allocation free
Browse files Browse the repository at this point in the history
Inspired by a previous attempt to remove allocation (#606), this version
uses three tokens to synchronize access to the locked value and the waker.
These tokens are swapped between the inner struct of the lock and the
bilock halves. The tokens are initalized with the LOCK token held by the
inner struct, the WAKE token held by one bilock half, and the NULL token
held by the other bilock half.

To poll the lock, our half swaps its token with the inner token:
if we get the LOCK token we now have the lock,
if we get the NULL token we swap again and loop
if we get the WAKE token we store our waker in the inner and swap again,
if we then get the LOCK token we now have the lock, otherwise we return
Poll::Pending

To unlock the lock, our half swaps its token (which we know must be the LOCK
token) with the inner token:
if we get the NULL token, there is no contention so we return
if we get the WAKE token, we wake the waker stored in the inner

Additionally, this change makes the bilock methods require &mut self
  • Loading branch information
exrook committed Apr 10, 2021
1 parent 2be999d commit 37c66f6
Show file tree
Hide file tree
Showing 7 changed files with 238 additions and 110 deletions.
10 changes: 5 additions & 5 deletions futures-util/benches/bilock.rs
Expand Up @@ -16,7 +16,7 @@ mod bench {
let mut ctx = noop_context();

b.iter(|| {
let (x, y) = BiLock::new(1);
let (mut x, mut y) = BiLock::new(1);

for _ in 0..1000 {
let x_guard = match x.poll_lock(&mut ctx) {
Expand Down Expand Up @@ -48,7 +48,7 @@ mod bench {
let mut ctx = noop_context();

b.iter(|| {
let (x, y) = BiLock::new(1);
let (mut x, mut y) = BiLock::new(1);

for _ in 0..1000 {
let x_guard = match x.poll_lock(&mut ctx) {
Expand All @@ -74,7 +74,7 @@ mod bench {
use std::thread;

b.iter(|| {
let (x, y) = BiLock::new(false);
let (mut x, mut y) = BiLock::new(false);
const ITERATION_COUNT: usize = 1000;

let a = thread::spawn(move || {
Expand All @@ -85,7 +85,7 @@ mod bench {
*guard = false;
count += 1;
}
drop(guard);
x = guard.unlock();
}
});

Expand All @@ -97,7 +97,7 @@ mod bench {
*guard = true;
count += 1;
}
drop(guard);
y = guard.unlock();
}
});

Expand Down
26 changes: 13 additions & 13 deletions futures-util/src/io/split.rs
Expand Up @@ -18,7 +18,7 @@ pub struct WriteHalf<T> {
handle: BiLock<T>,
}

fn lock_and_then<T, U, E, F>(lock: &BiLock<T>, cx: &mut Context<'_>, f: F) -> Poll<Result<U, E>>
fn lock_and_then<T, U, E, F>(lock: &mut BiLock<T>, cx: &mut Context<'_>, f: F) -> Poll<Result<U, E>>
where
F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll<Result<U, E>>,
{
Expand Down Expand Up @@ -53,45 +53,45 @@ impl<T: Unpin> WriteHalf<T> {

impl<R: AsyncRead> AsyncRead for ReadHalf<R> {
fn poll_read(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
lock_and_then(&self.handle, cx, |l, cx| l.poll_read(cx, buf))
lock_and_then(&mut self.handle, cx, |l, cx| l.poll_read(cx, buf))
}

fn poll_read_vectored(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<io::Result<usize>> {
lock_and_then(&self.handle, cx, |l, cx| l.poll_read_vectored(cx, bufs))
lock_and_then(&mut self.handle, cx, |l, cx| l.poll_read_vectored(cx, bufs))
}
}

impl<W: AsyncWrite> AsyncWrite for WriteHalf<W> {
fn poll_write(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
lock_and_then(&self.handle, cx, |l, cx| l.poll_write(cx, buf))
lock_and_then(&mut self.handle, cx, |l, cx| l.poll_write(cx, buf))
}

fn poll_write_vectored(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
lock_and_then(&self.handle, cx, |l, cx| l.poll_write_vectored(cx, bufs))
lock_and_then(&mut self.handle, cx, |l, cx| l.poll_write_vectored(cx, bufs))
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
lock_and_then(&self.handle, cx, |l, cx| l.poll_flush(cx))
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
lock_and_then(&mut self.handle, cx, |l, cx| l.poll_flush(cx))
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
lock_and_then(&self.handle, cx, |l, cx| l.poll_close(cx))
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
lock_and_then(&mut self.handle, cx, |l, cx| l.poll_close(cx))
}
}

Expand Down

0 comments on commit 37c66f6

Please sign in to comment.