Skip to content

Commit

Permalink
chore: vendor once_cell
Browse files Browse the repository at this point in the history
Fixes #3212.

With the release of `v1.15`, the once_cell crate is breaking our current MSRV. Vendoring it will allow us to keep using certain APIs from that crate without compromising our MSRV.
  • Loading branch information
Noah-Kennedy committed Sep 21, 2022
1 parent 7096a80 commit 4e80909
Show file tree
Hide file tree
Showing 7 changed files with 1,300 additions and 6 deletions.
8 changes: 4 additions & 4 deletions tokio/Cargo.toml
Expand Up @@ -63,7 +63,7 @@ net = [
]
process = [
"bytes",
"once_cell",
"parking_lot_core",
"libc",
"mio/os-poll",
"mio/os-ext",
Expand All @@ -77,13 +77,13 @@ process = [
"winapi/winnt",
]
# Includes basic task execution capabilities
rt = ["once_cell"]
rt = ["parking_lot_core"]
rt-multi-thread = [
"num_cpus",
"rt",
]
signal = [
"once_cell",
"parking_lot_core",
"libc",
"mio/os-poll",
"mio/net",
Expand Down Expand Up @@ -112,11 +112,11 @@ pin-project-lite = "0.2.0"

# Everything else is optional...
bytes = { version = "1.0.0", optional = true }
once_cell = { version = "1.5.2", optional = true }
memchr = { version = "2.2", optional = true }
mio = { version = "0.8.4", optional = true }
num_cpus = { version = "1.8.0", optional = true }
parking_lot = { version = "0.12.0", optional = true }
parking_lot_core = { version = "0.9.3", optional = true }

[target.'cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))'.dependencies]
socket2 = { version = "0.4.4", optional = true, features = [ "all" ] }
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/process/unix/mod.rs
Expand Up @@ -35,9 +35,9 @@ use crate::process::SpawnedChild;
use crate::signal::unix::driver::Handle as SignalHandle;
use crate::signal::unix::{signal, Signal, SignalKind};

use crate::util::once_cell::sync::Lazy;
use mio::event::Source;
use mio::unix::SourceFd;
use once_cell::sync::Lazy;
use std::fmt;
use std::fs::File;
use std::future::Future;
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/signal/registry.rs
Expand Up @@ -4,7 +4,7 @@ use crate::signal::os::{OsExtraData, OsStorage};

use crate::sync::watch;

use once_cell::sync::Lazy;
use crate::util::once_cell::sync::Lazy;
use std::ops;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
Expand Down
3 changes: 3 additions & 0 deletions tokio/src/util/mod.rs
Expand Up @@ -6,6 +6,9 @@ cfg_io_driver! {
#[cfg(feature = "rt")]
pub(crate) mod atomic_cell;

#[cfg(any(feature = "process", feature = "rt", feature = "signal"))]
pub(crate) mod once_cell;

#[cfg(any(
// io driver uses `WakeList` directly
feature = "net",
Expand Down
173 changes: 173 additions & 0 deletions tokio/src/util/once_cell/imp_pl.rs
@@ -0,0 +1,173 @@
use std::{
cell::UnsafeCell,
hint,
panic::{RefUnwindSafe, UnwindSafe},
sync::atomic::{AtomicU8, Ordering},
};

pub(crate) struct OnceCell<T> {
state: AtomicU8,
value: UnsafeCell<Option<T>>,
}

const INCOMPLETE: u8 = 0x0;
const RUNNING: u8 = 0x1;
const COMPLETE: u8 = 0x2;

// Why do we need `T: Send`?
// Thread A creates a `OnceCell` and shares it with
// scoped thread B, which fills the cell, which is
// then destroyed by A. That is, destructor observes
// a sent value.
unsafe impl<T: Sync + Send> Sync for OnceCell<T> {}
unsafe impl<T: Send> Send for OnceCell<T> {}

impl<T: RefUnwindSafe + UnwindSafe> RefUnwindSafe for OnceCell<T> {}
impl<T: UnwindSafe> UnwindSafe for OnceCell<T> {}

impl<T> OnceCell<T> {
pub(crate) const fn new() -> OnceCell<T> {
OnceCell {
state: AtomicU8::new(INCOMPLETE),
value: UnsafeCell::new(None),
}
}

pub(crate) const fn with_value(value: T) -> OnceCell<T> {
OnceCell {
state: AtomicU8::new(COMPLETE),
value: UnsafeCell::new(Some(value)),
}
}

/// Safety: synchronizes with store to value via Release/Acquire.
#[inline]
pub(crate) fn is_initialized(&self) -> bool {
self.state.load(Ordering::Acquire) == COMPLETE
}

/// Safety: synchronizes with store to value via `is_initialized` or mutex
/// lock/unlock, writes value only once because of the mutex.
#[cold]
pub(crate) fn initialize<F, E>(&self, f: F) -> Result<(), E>
where
F: FnOnce() -> Result<T, E>,
{
let mut f = Some(f);
let mut res: Result<(), E> = Ok(());
let slot: *mut Option<T> = self.value.get();
initialize_inner(&self.state, &mut || {
// We are calling user-supplied function and need to be careful.
// - if it returns Err, we unlock mutex and return without touching anything
// - if it panics, we unlock mutex and propagate panic without touching anything
// - if it calls `set` or `get_or_try_init` re-entrantly, we get a deadlock on
// mutex, which is important for safety. We *could* detect this and panic,
// but that is more complicated
// - finally, if it returns Ok, we store the value and store the flag with
// `Release`, which synchronizes with `Acquire`s.
let f = unsafe { super::take_unchecked(&mut f) };
match f() {
Ok(value) => unsafe {
// Safe b/c we have a unique access and no panic may happen
// until the cell is marked as initialized.
debug_assert!((*slot).is_none());
*slot = Some(value);
true
},
Err(err) => {
res = Err(err);
false
}
}
});
res
}

/// Get the reference to the underlying value, without checking if the cell
/// is initialized.
///
/// # Safety
///
/// Caller must ensure that the cell is in initialized state, and that
/// the contents are acquired by (synchronized to) this thread.
pub(crate) unsafe fn get_unchecked(&self) -> &T {
debug_assert!(self.is_initialized());
let slot: &Option<T> = &*self.value.get();
match slot {
Some(value) => value,
// This unsafe does improve performance, see `examples/bench`.
None => {
debug_assert!(false);
hint::unreachable_unchecked()
}
}
}

/// Gets the mutable reference to the underlying value.
/// Returns `None` if the cell is empty.
pub(crate) fn get_mut(&mut self) -> Option<&mut T> {
// Safe b/c we have an exclusive access
let slot: &mut Option<T> = unsafe { &mut *self.value.get() };
slot.as_mut()
}
}

struct Guard<'a> {
state: &'a AtomicU8,
new_state: u8,
}

impl<'a> Drop for Guard<'a> {
fn drop(&mut self) {
self.state.store(self.new_state, Ordering::Release);
unsafe {
let key = self.state as *const AtomicU8 as usize;
parking_lot_core::unpark_all(key, parking_lot_core::DEFAULT_UNPARK_TOKEN);
}
}
}

// Note: this is intentionally monomorphic
#[inline(never)]
fn initialize_inner(state: &AtomicU8, init: &mut dyn FnMut() -> bool) {
loop {
let exchange =
state.compare_exchange_weak(INCOMPLETE, RUNNING, Ordering::Acquire, Ordering::Acquire);
match exchange {
Ok(_) => {
let mut guard = Guard {
state,
new_state: INCOMPLETE,
};
if init() {
guard.new_state = COMPLETE;
}
return;
}
Err(COMPLETE) => return,
Err(RUNNING) => unsafe {
let key = state as *const AtomicU8 as usize;
parking_lot_core::park(
key,
|| state.load(Ordering::Relaxed) == RUNNING,
|| (),
|_, _| (),
parking_lot_core::DEFAULT_PARK_TOKEN,
None,
);
},
Err(INCOMPLETE) => (),
Err(_) => debug_assert!(false),
}
}
}

#[test]
fn test_size() {
use std::mem::size_of;

assert_eq!(
size_of::<OnceCell<bool>>(),
1 * size_of::<bool>() + size_of::<u8>()
);
}

0 comments on commit 4e80909

Please sign in to comment.