Rewrite BiLock lock/unlock to be allocation free
Inspired by a previous attempt to remove allocation (#606), this version
uses three tokens to synchronize access to the locked value and the waker.
The tokens are swapped between the lock's inner struct and the two BiLock
halves. Initially, the LOCK token is held by the inner struct, the WAKE
token is held by one BiLock half, and the NULL token is held by the other
half.

To poll the lock, our half swaps its token with the inner token (sketched below):
- if we get the LOCK token, we now hold the lock;
- if we get the NULL token, we swap again and loop;
- if we get the WAKE token, we store our waker in the inner struct and swap
  again; if we then get the LOCK token we now hold the lock, otherwise we
  return Poll::Pending.

To unlock the lock, our half swaps its token (which we know must be the LOCK
token) with the inner token:
- if we get the NULL token, there is no contention, so we just return;
- if we get the WAKE token, we wake the waker stored in the inner struct.
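
Below is a minimal, self-contained sketch of this token protocol, not the code
in this commit: the `Inner`/`Half` names, the `bilock` constructor, and the bare
`&mut T` "guard" are invented for illustration, and the real BiLock's guard type
and drop handling are omitted.

```rust
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Waker};

const NULL: usize = 0; // "nothing interesting" placeholder
const WAKE: usize = 1; // holder may write the shared waker slot
const LOCK: usize = 2; // holder owns the protected value

struct Inner<T> {
    token: AtomicUsize,               // starts out holding LOCK
    waker: UnsafeCell<Option<Waker>>, // written only while holding WAKE
    value: UnsafeCell<T>,             // accessed only while holding LOCK
}

// Safety (sketch): the token exchange guarantees at most one holder of WAKE
// and at most one holder of LOCK at any time.
unsafe impl<T: Send> Sync for Inner<T> {}

struct Half<T> {
    inner: Arc<Inner<T>>,
    token: usize, // the token this half currently holds
}

fn bilock<T>(value: T) -> (Half<T>, Half<T>) {
    let inner = Arc::new(Inner {
        token: AtomicUsize::new(LOCK),
        waker: UnsafeCell::new(None),
        value: UnsafeCell::new(value),
    });
    // The inner struct starts with LOCK; the halves start with WAKE and NULL.
    (
        Half { inner: Arc::clone(&inner), token: WAKE },
        Half { inner, token: NULL },
    )
}

impl<T> Half<T> {
    // Poll for the lock by swapping our token with the inner token.
    fn poll_lock(&mut self, cx: &mut Context<'_>) -> Poll<&mut T> {
        loop {
            self.token = self.inner.token.swap(self.token, Ordering::AcqRel);
            match self.token {
                // We received LOCK: the value is ours until we unlock.
                LOCK => return Poll::Ready(unsafe { &mut *self.inner.value.get() }),
                // We received NULL: swap again and keep looping.
                NULL => continue,
                // We received WAKE: store our waker, then swap one more time.
                _wake => {
                    unsafe { *self.inner.waker.get() = Some(cx.waker().clone()) };
                    self.token = self.inner.token.swap(self.token, Ordering::AcqRel);
                    return if self.token == LOCK {
                        Poll::Ready(unsafe { &mut *self.inner.value.get() })
                    } else {
                        Poll::Pending
                    };
                }
            }
        }
    }

    // Unlock by handing LOCK back to the inner struct.
    fn unlock(&mut self) {
        debug_assert_eq!(self.token, LOCK);
        self.token = self.inner.token.swap(self.token, Ordering::AcqRel);
        if self.token == WAKE {
            // The other half parked itself: wake the waker it stored.
            if let Some(waker) = unsafe { (*self.inner.waker.get()).take() } {
                waker.wake();
            }
        }
    }
}
```

No allocation happens on the lock/unlock path: the only heap allocation is the
single Arc created when the lock is constructed, plus cloning the caller's waker.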

Additionally, this change makes the BiLock methods take &mut self.
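
Continuing the sketch above (its types and imports, not the real futures-util
API), a rough illustration of why both halves now need mut bindings:

```rust
// Uses `bilock`, `Half`, `Context`, and `Poll` from the sketch above.
// A real `Context` would come from the executor polling the task.
fn demo(cx: &mut Context<'_>) {
    // Both halves are `mut` because poll_lock and unlock take `&mut self`.
    let (mut a, _b) = bilock(0u32);

    if let Poll::Ready(value) = a.poll_lock(cx) {
        *value += 1; // exclusive access while we hold the LOCK token
        a.unlock();  // hand LOCK back to the inner struct
    }
    // On Poll::Pending our waker is already stored in the inner struct.
}
```
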
exrook committed Apr 3, 2021
1 parent f13f34a commit 7b5f662
Showing 7 changed files with 240 additions and 112 deletions.
10 changes: 5 additions & 5 deletions futures-util/benches/bilock.rs

@@ -16,7 +16,7 @@ fn contended(b: &mut Bencher) {
     let mut ctx = noop_context();

     b.iter(|| {
-        let (x, y) = BiLock::new(1);
+        let (mut x, mut y) = BiLock::new(1);

         for _ in 0..1000 {
             let x_guard = match x.poll_lock(&mut ctx) {
@@ -48,7 +48,7 @@ fn lock_unlock(b: &mut Bencher) {
     let mut ctx = noop_context();

     b.iter(|| {
-        let (x, y) = BiLock::new(1);
+        let (mut x, mut y) = BiLock::new(1);

         for _ in 0..1000 {
             let x_guard = match x.poll_lock(&mut ctx) {
@@ -74,7 +74,7 @@ fn concurrent(b: &mut Bencher) {
     use std::thread;

     b.iter(|| {
-        let (x, y) = BiLock::new(false);
+        let (mut x, mut y) = BiLock::new(false);
         const ITERATION_COUNT: usize = 1000;

         let a = thread::spawn(move || {
@@ -85,7 +85,7 @@
                     *guard = false;
                     count += 1;
                 }
-                drop(guard);
+                x = guard.unlock();
             }
         });
@@ -97,7 +97,7 @@
                     *guard = true;
                     count += 1;
                 }
-                drop(guard);
+                y = guard.unlock();
             }
         });
26 changes: 13 additions & 13 deletions futures-util/src/io/split.rs

@@ -19,7 +19,7 @@ pub struct WriteHalf<T> {
 }

 fn lock_and_then<T, U, E, F>(
-    lock: &BiLock<T>,
+    lock: &mut BiLock<T>,
     cx: &mut Context<'_>,
     f: F
 ) -> Poll<Result<U, E>>
@@ -55,38 +55,38 @@ impl<T: Unpin> WriteHalf<T> {
 }

 impl<R: AsyncRead> AsyncRead for ReadHalf<R> {
-    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8])
+    fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8])
         -> Poll<io::Result<usize>>
     {
-        lock_and_then(&self.handle, cx, |l, cx| l.poll_read(cx, buf))
+        lock_and_then(&mut self.handle, cx, |l, cx| l.poll_read(cx, buf))
     }

-    fn poll_read_vectored(self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>])
+    fn poll_read_vectored(mut self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>])
         -> Poll<io::Result<usize>>
     {
-        lock_and_then(&self.handle, cx, |l, cx| l.poll_read_vectored(cx, bufs))
+        lock_and_then(&mut self.handle, cx, |l, cx| l.poll_read_vectored(cx, bufs))
     }
 }

 impl<W: AsyncWrite> AsyncWrite for WriteHalf<W> {
-    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8])
+    fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8])
         -> Poll<io::Result<usize>>
     {
-        lock_and_then(&self.handle, cx, |l, cx| l.poll_write(cx, buf))
+        lock_and_then(&mut self.handle, cx, |l, cx| l.poll_write(cx, buf))
     }

-    fn poll_write_vectored(self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>])
+    fn poll_write_vectored(mut self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>])
         -> Poll<io::Result<usize>>
     {
-        lock_and_then(&self.handle, cx, |l, cx| l.poll_write_vectored(cx, bufs))
+        lock_and_then(&mut self.handle, cx, |l, cx| l.poll_write_vectored(cx, bufs))
     }

-    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
-        lock_and_then(&self.handle, cx, |l, cx| l.poll_flush(cx))
+    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+        lock_and_then(&mut self.handle, cx, |l, cx| l.poll_flush(cx))
     }

-    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
-        lock_and_then(&self.handle, cx, |l, cx| l.poll_close(cx))
+    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+        lock_and_then(&mut self.handle, cx, |l, cx| l.poll_close(cx))
     }
 }
