Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tracing, core: fix deadlock in register_callsite #2020

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion tracing-core/Cargo.toml
Expand Up @@ -8,7 +8,7 @@ name = "tracing-core"
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.1.23"
version = "0.1.24"
authors = ["Tokio Contributors <team@tokio.rs>"]
license = "MIT"
readme = "README.md"
Expand Down
71 changes: 67 additions & 4 deletions tracing-core/src/callsite.rs
Expand Up @@ -3,12 +3,12 @@
use crate::stdlib::{
fmt,
hash::{Hash, Hasher},
sync::Mutex,
vec::Vec,
};
use crate::{
dispatcher::{self, Dispatch},
metadata::{LevelFilter, Metadata},
mutex::Mutex,
subscriber::Interest,
};

Expand Down Expand Up @@ -125,7 +125,7 @@ pub struct Identifier(
/// [`Interest::sometimes()`]: ../subscriber/struct.Interest.html#method.sometimes
/// [`Subscriber`]: ../subscriber/trait.Subscriber.html
pub fn rebuild_interest_cache() {
let mut registry = REGISTRY.lock().unwrap();
let mut registry = REGISTRY.lock();
registry.rebuild_interest();
}

Expand All @@ -134,13 +134,76 @@ pub fn rebuild_interest_cache() {
/// This should be called once per callsite after the callsite has been
/// constructed.
pub fn register(callsite: &'static dyn Callsite) {
let mut registry = REGISTRY.lock().unwrap();
let mut registry = REGISTRY.lock();
registry.rebuild_callsite_interest(callsite);
registry.callsites.push(callsite);
}

/// Attempts to register a new `Callsite` with the global registry, if the
/// *current* thread is not already registering a callsite.
///
/// This returns `true` if the callsite was successfully registered. Otherwise,
/// if it returns `false`, the callsite registry was already locked by the
/// current thread, and the registration will need to be attempted again.
///
/// This function may need to be called multiple times per callsite before it
/// returns `true`; once it returns `true`, this function will not need to be
/// called again.
#[cfg(feature = "std")]
pub fn try_register(callsite: &'static dyn Callsite) -> bool {
use std::cell::Cell;
std::thread_local! {
static IS_REGISTERING: Cell<bool> = Cell::new(false);
}

IS_REGISTERING
.try_with(|cell| {
if cell.replace(true) {
// this thread is already registering a callsite, bail!
return false;
}

// okay, we can register the callsite.
register(callsite);
cell.set(false);
true
})
.unwrap_or(false)
}

/// Attempts to register a new [`Callsite`] with the global registry, if the
/// global registry can be accessed.
///
/// This returns `true` if the callsite was successfully registered, or `false` if
/// the registry was busy.
///
/// This function may need to be called multiple times per callsite before it
/// returns `true`; once it returns `true`, this function will not need to be
/// called again.
#[cfg(not(feature = "std"))]
pub fn try_register(callsite: &'static dyn Callsite) -> bool {
use core::sync::atomic::{AtomicBool, Ordering};
static IS_REGISTERING: AtomicBool = AtomicBool::new(false);
// If a callsite is currently being registered, bail. This avoids potential
// deadlocks due to a recursive `register_callsite` call.
if IS_REGISTERING
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
return false;
}

// Otherwise, try to register the callsite. This will acquire the mutex, but
// that's okay --- if the cached interests are being re-evaluated, we can
// happily wait for that to complete without deadlocking.
register(callsite);
// Reset the flag once the callsite is registered.
IS_REGISTERING.store(false, Ordering::Release);
true
}

pub(crate) fn register_dispatch(dispatch: &Dispatch) {
let mut registry = REGISTRY.lock().unwrap();
let mut registry = REGISTRY.lock();
registry.dispatchers.push(dispatch.registrar());
registry.rebuild_interest();
}
Expand Down
1 change: 1 addition & 0 deletions tracing-core/src/lib.rs
Expand Up @@ -282,6 +282,7 @@ pub mod dispatcher;
pub mod event;
pub mod field;
pub mod metadata;
pub(crate) mod mutex;
mod parent;
pub mod span;
pub(crate) mod stdlib;
Expand Down
26 changes: 26 additions & 0 deletions tracing-core/src/mutex.rs
@@ -0,0 +1,26 @@
#[cfg(feature = "std")]
pub(crate) use self::imp::*;
#[cfg(feature = "std")]
mod imp {
use std::sync::Mutex as StdMutex;
pub(crate) use std::sync::MutexGuard;

#[derive(Debug)]
pub(crate) struct Mutex<T>(StdMutex<T>);

impl<T> Mutex<T> {
pub(crate) fn new(data: T) -> Self {
Self(StdMutex::new(data))
}

pub(crate) fn lock(&self) -> MutexGuard<'_, T> {
match self.0.lock() {
Ok(guard) => guard,
Err(poison) => poison.into_inner(),
}
}
}
}

#[cfg(not(feature = "std"))]
pub(crate) use crate::spin::{Mutex, MutexGuard};
23 changes: 0 additions & 23 deletions tracing-core/src/stdlib.rs
Expand Up @@ -49,30 +49,7 @@ mod no_std {
}

pub(crate) mod sync {
pub(crate) use crate::spin::MutexGuard;
pub(crate) use alloc::sync::*;
pub(crate) use core::sync::*;

/// This wraps `spin::Mutex` to return a `Result`, so that it can be
/// used with code written against `std::sync::Mutex`.
///
/// Since `spin::Mutex` doesn't support poisoning, the `Result` returned
/// by `lock` will always be `Ok`.
#[derive(Debug, Default)]
pub(crate) struct Mutex<T> {
inner: crate::spin::Mutex<T>,
}

impl<T> Mutex<T> {
pub(crate) fn new(data: T) -> Self {
Self {
inner: crate::spin::Mutex::new(data),
}
}

pub(crate) fn lock(&self) -> Result<MutexGuard<'_, T>, ()> {
Ok(self.inner.lock())
}
}
}
}
2 changes: 1 addition & 1 deletion tracing/Cargo.toml
Expand Up @@ -28,7 +28,7 @@ edition = "2018"
rust-version = "1.49.0"

[dependencies]
tracing-core = { path = "../tracing-core", version = "0.1.22", default-features = false }
tracing-core = { path = "../tracing-core", version = "0.1.24", default-features = false }
log = { version = "0.4", optional = true }
tracing-attributes = { path = "../tracing-attributes", version = "0.1.20", optional = true }
cfg-if = "1.0.0"
Expand Down
73 changes: 60 additions & 13 deletions tracing/src/lib.rs
Expand Up @@ -971,11 +971,10 @@ pub mod __macro_support {
pub use crate::callsite::Callsite;
use crate::stdlib::{
fmt,
sync::atomic::{AtomicUsize, Ordering},
sync::atomic::{AtomicU8, Ordering},
};
use crate::{subscriber::Interest, Metadata};
pub use core::concat;
use tracing_core::Once;

/// Callsite implementation used by macro-generated code.
///
Expand All @@ -986,11 +985,19 @@ pub mod __macro_support {
/// Breaking changes to this module may occur in small-numbered versions
/// without warning.
pub struct MacroCallsite {
interest: AtomicUsize,
interest: AtomicU8,
registration: AtomicU8,
meta: &'static Metadata<'static>,
registration: Once,
}

const INTEREST_NEVER: u8 = 0;
const INTEREST_SOMETIMES: u8 = 1;
const INTEREST_ALWAYS: u8 = 2;

const UNREGISTERED: u8 = 0;
const REGISTERING: u8 = 1;
const REGISTERED: u8 = 2;

impl MacroCallsite {
/// Returns a new `MacroCallsite` with the specified `Metadata`.
///
Expand All @@ -1002,9 +1009,9 @@ pub mod __macro_support {
/// without warning.
pub const fn new(meta: &'static Metadata<'static>) -> Self {
Self {
interest: AtomicUsize::new(0xDEADFACED),
interest: AtomicU8::new(255),
registration: AtomicU8::new(UNREGISTERED),
meta,
registration: Once::new(),
}
}

Expand All @@ -1022,11 +1029,51 @@ pub mod __macro_support {
// This only happens once (or if the cached interest value was corrupted).
#[cold]
pub fn register(&'static self) -> Interest {
self.registration
.call_once(|| crate::callsite::register(self));
loop {
// Attempt to advance the registration state to `REGISTERING`...
match self.registration.compare_exchange(
UNREGISTERED,
REGISTERING,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// Okay, we advanced the state, try to register the callsite.
if crate::callsite::try_register(self) {
// We successfully registered the callsite, advance
// its state to `REGISTERED` so we don't try to
// register again.
self.registration.store(REGISTERED, Ordering::Release);
break;
} else {
// We are already inside of a `register_callsite`
// call (or something weird is going on). Don't
// register the callsite yet, bail out.
self.registration.store(UNREGISTERED, Ordering::Release);
// Returning `Interest::never()` here means that we
// will skip the callsite *this time*. This is
// necessary to ensure subscribers never see
// unregistered callsites.
return Interest::never();
}
}
// Great, the callsite is already registered! Just load its
// previous cached interest.
Err(REGISTERED) => break,
// Someone else is registering...
Err(_state) => {
debug_assert_eq!(_state, REGISTERING, "weird callsite registration state");
// XXX(eliza): it would be nicer if this waited for the
// registry mutex to be released, but there isn't really
// a nice way for `tracing_core` to expose it without
// leaking a ton of impl details...
core::hint::spin_loop();
}
}
}
match self.interest.load(Ordering::Relaxed) {
0 => Interest::never(),
2 => Interest::always(),
INTEREST_NEVER => Interest::never(),
INTEREST_ALWAYS => Interest::always(),
_ => Interest::sometimes(),
}
}
Expand All @@ -1043,9 +1090,9 @@ pub mod __macro_support {
#[inline]
pub fn interest(&'static self) -> Interest {
match self.interest.load(Ordering::Relaxed) {
0 => Interest::never(),
1 => Interest::sometimes(),
2 => Interest::always(),
INTEREST_NEVER => Interest::never(),
INTEREST_SOMETIMES => Interest::sometimes(),
INTEREST_ALWAYS => Interest::always(),
_ => self.register(),
}
}
Expand Down
47 changes: 47 additions & 0 deletions tracing/tests/register_callsite_deadlock.rs
@@ -0,0 +1,47 @@
use std::{sync::mpsc, thread, time::Duration};
use tracing::{
metadata::Metadata,
span,
subscriber::{self, Interest, Subscriber},
Event,
};

#[test]
fn register_callsite_doesnt_deadlock() {
pub struct EvilSubscriber;

impl Subscriber for EvilSubscriber {
fn register_callsite(&self, meta: &'static Metadata<'static>) -> Interest {
tracing::info!(?meta, "registered a callsite");
Interest::always()
}

fn enabled(&self, _: &Metadata<'_>) -> bool {
true
}
fn new_span(&self, _: &span::Attributes<'_>) -> span::Id {
span::Id::from_u64(1)
}
fn record(&self, _: &span::Id, _: &span::Record<'_>) {}
fn record_follows_from(&self, _: &span::Id, _: &span::Id) {}
fn event(&self, _: &Event<'_>) {}
fn enter(&self, _: &span::Id) {}
fn exit(&self, _: &span::Id) {}
}

subscriber::set_global_default(EvilSubscriber).unwrap();

// spawn a thread, and assert it doesn't hang...
let (tx, didnt_hang) = mpsc::channel();
let th = thread::spawn(move || {
tracing::info!("hello world!");
tx.send(()).unwrap();
});

didnt_hang
// Note: 60 seconds is *way* more than enough, but let's be generous in
// case of e.g. slow CI machines.
.recv_timeout(Duration::from_secs(60))
.expect("the thread must not have hung!");
th.join().expect("thread should join successfully");
}