Skip to content

Commit

Permalink
feat(trace): host-controlled tracing (#92)
Browse files Browse the repository at this point in the history
Currently, the `tracing` collector in the Allwinner D1 platform impl is
hardcoded to the DEBUG level. This is unfortunate, as it means that we
will always do the work of collecting and formatting DEBUG traces, even
when no one is listening on the UART tracing port. It would be more
efficient if we only formatted traces when a `crowtty` instance has
subscribed to the UART tracing port. Additionally, because trace
metadata is sent when the callsite is initially registered, a `crowtty`
instance that attaches after the board has started running may miss some
trace metadata. Finally, there is no way to change the trace level on
the fly, as it's hard-coded in the kernel.

This branch changes the `mnemos-trace-proto` wire protocol to include a
host to target message to select a `tracing` level. Now, the board can
start up with all tracing disabled, and `crowtty` can send the desired
tracing level once it connects. This way, we don't collect or format
traces at all when `crowtty` isn't connected. In order to ensure
`crowtty` waits to send the tracing level message only once the board's
tracing code has been initialized, we add a periodic "heartbeat" message
sent by the board on the tracing port while tracing is idle. This is
used by the host which is listening for traces to detect the presence of
the tracing collector on the debug target. The same message is used to
ack a successful request to select the tracing level.

When the tracing level is selected, the target rebuilds `tracing`'s
callsite cache, which means all metadata for callsites which have
previously been hit will be sent to the host. This ensures that
`crowtty` always has the metadata for all tracing spans and events that
are enabled, even if the target encountered them before `crowtty`
connected.

Making this work required fixing an upstream `tracing` bug,
tokio-rs/tracing#2634, where calling into code
that includes `tracing` diagnostics from inside a
`Collect::register_callsite` method would cause a deadlock.
Additionally, I had to add code to the collector for tracking whether
it's inside its own `send` method, in order to temporarily disable `bbq`
tracing. This is because the collector's use of `bbq` creates an
infinite loop when the TRACE level is enabled.

Check this out:

![image](https://github.com/tosc-rs/mnemos/assets/2796466/9a527c27-a5c4-422f-b2b3-bd0a46ba4794)
  • Loading branch information
hawkw committed Jun 24, 2023
1 parent 837887e commit be8ff93
Show file tree
Hide file tree
Showing 10 changed files with 307 additions and 77 deletions.
7 changes: 0 additions & 7 deletions .vscode/settings.json

This file was deleted.

1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions platforms/allwinner-d1/boards/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions platforms/allwinner-d1/boards/lichee-rv/src/main.rs
Expand Up @@ -25,8 +25,7 @@ const HEAP_SIZE: usize = 384 * 1024 * 1024;
#[used]
static AHEAP: Ram<HEAP_SIZE> = Ram::new();

static COLLECTOR: trace::SerialCollector =
trace::SerialCollector::new(trace::level_filters::LevelFilter::DEBUG);
static COLLECTOR: trace::SerialCollector = trace::SerialCollector::new();

/// A helper to initialize the kernel
fn initialize_kernel() -> Result<&'static Kernel, ()> {
Expand Down
1 change: 1 addition & 0 deletions source/kernel/Cargo.toml
Expand Up @@ -21,6 +21,7 @@ name = "kernel"
[dependencies.futures]
version = "0.3.21"
default-features = false
features = ["async-await"]

[dependencies.uuid]
version = "1.1.2"
Expand Down
193 changes: 166 additions & 27 deletions source/kernel/src/trace.rs
@@ -1,8 +1,8 @@
use crate::{comms::bbq, drivers::serial_mux};
use level_filters::LevelFilter;
use mnemos_trace_proto::TraceEvent;
use mnemos_trace_proto::{HostRequest, TraceEvent};
use mycelium_util::sync::InitOnce;
use portable_atomic::{AtomicPtr, AtomicU64, AtomicUsize, Ordering};
use portable_atomic::{AtomicBool, AtomicPtr, AtomicU64, AtomicU8, AtomicUsize, Ordering};

pub use tracing_02::*;
use tracing_core_02::span::Current;
Expand All @@ -25,7 +25,11 @@ pub struct SerialCollector {
// TODO(eliza): Currently, this is recorded but not actually consumed...
dropped_events: AtomicUsize,

max_level: LevelFilter,
max_level: AtomicU8,

/// Tracks whether we are inside of the collector's `send_event` method, so
/// that BBQueue tracing can be disabled.
in_send: AtomicBool,
}

// === impl SerialCollector ===
Expand All @@ -34,35 +38,48 @@ impl SerialCollector {
pub const PORT: u16 = 3;
const CAPACITY: usize = 1024 * 4;

pub const fn new(max_level: LevelFilter) -> Self {
pub const fn new() -> Self {
Self::with_max_level(LevelFilter::OFF)
}

pub const fn with_max_level(max_level: LevelFilter) -> Self {
Self {
tx: InitOnce::uninitialized(),
current_span: AtomicU64::new(0),
current_meta: AtomicPtr::new(core::ptr::null_mut()),
next_id: AtomicU64::new(1),
dropped_events: AtomicUsize::new(0),
max_level,
max_level: AtomicU8::new(level_to_u8(max_level)),
in_send: AtomicBool::new(false),
}
}

pub async fn start(&'static self, k: &'static crate::Kernel) {
let mut mux = serial_mux::SerialMuxClient::from_registry(k)
.await
.expect("cannot initialize serial tracing, no serial mux exists!");
let port = mux
.open_port(3, 1024)
.await
.expect("cannot initialize serial tracing, cannot open port 3!");
// acquire sermux port 3
let port = {
let mut mux = serial_mux::SerialMuxClient::from_registry(k)
.await
.expect("cannot initialize serial tracing, no serial mux exists!");
mux.open_port(3, 1024)
.await
.expect("cannot initialize serial tracing, cannot open port 3!")
};

let (tx, rx) = bbq::new_spsc_channel(k.heap(), Self::CAPACITY).await;
self.tx.init(tx);
k.spawn(Self::worker(rx, port)).await;

// set the default tracing collector
let dispatch = tracing_02::Dispatch::from_static(self);
tracing_02::dispatch::set_global_default(dispatch)
.expect("cannot set global default tracing dispatcher");

// spawn a worker to read from the channel and write to the serial port.
k.spawn(Self::worker(self, rx, port, k)).await;
}

/// Serialize a `TraceEvent`, returning `true` if the event was correctly serialized.
fn send_event<'a>(&self, sz: usize, event: impl FnOnce() -> TraceEvent<'a>) -> bool {
self.in_send.store(true, Ordering::Release);
let Some(mut wgr) = self.tx.get().send_grant_exact_sync(sz) else {
self.dropped_events.fetch_add(1, Ordering::Relaxed);
return false;
Expand All @@ -82,29 +99,118 @@ impl SerialCollector {
}
};
wgr.commit(len);
self.in_send.store(false, Ordering::Release);

// return true if we committed a non-zero number of bytes.
len > 0
}

async fn worker(rx: bbq::Consumer, port: serial_mux::PortHandle) {
loop {
let rgr = rx.read_grant().await;
async fn worker(
&'static self,
rx: bbq::Consumer,
port: serial_mux::PortHandle,
k: &'static crate::Kernel,
) {
use futures::FutureExt;
use maitake::time;
use postcard::accumulator::{CobsAccumulator, FeedResult};

// we probably won't use 256 whole bytes of cobs yet since all the host
// -> target messages are quite small
let mut cobs_buf: CobsAccumulator<16> = CobsAccumulator::new();
let mut read_level = |rgr: bbq::GrantR| {
let mut window = &rgr[..];
let len = rgr.len();
port.send(&rgr[..]).await;
'cobs: while !window.is_empty() {
window = match cobs_buf.feed_ref::<HostRequest>(window) {
FeedResult::Consumed => break 'cobs,
FeedResult::OverFull(new_wind) => new_wind,
FeedResult::DeserError(new_wind) => new_wind,
FeedResult::Success { data, remaining } => {
match data {
HostRequest::SetMaxLevel(lvl) => {
let level = lvl
.map(|lvl| lvl as u8)
.unwrap_or(level_to_u8(LevelFilter::OFF));
let prev = self.max_level.swap(level, Ordering::AcqRel);
if prev != level {
tracing_core_02::callsite::rebuild_interest_cache();
}
}
}

remaining
}
};
}
rgr.release(len);
};

loop {
'idle: loop {
let mut heartbeat = [0u8; 8];
let heartbeat = {
let level = u8_to_level(self.max_level.load(Ordering::Acquire))
.into_level()
.as_ref()
.map(AsSerde::as_serde);
postcard::to_slice_cobs(&TraceEvent::Heartbeat(level), &mut heartbeat[..])
.expect("failed to encode heartbeat msg")
};
port.send(heartbeat).await;
if let Ok(rgr) = k
.timer()
.timeout(time::Duration::from_secs(2), port.consumer().read_grant())
.await
{
read_level(rgr);

// ack the new max level
let mut ack = [0u8; 8];
let ack = {
let level = u8_to_level(self.max_level.load(Ordering::Acquire))
.into_level()
.as_ref()
.map(AsSerde::as_serde);
postcard::to_slice_cobs(&TraceEvent::Heartbeat(level), &mut ack[..])
.expect("failed to encode heartbeat msg")
};
port.send(ack).await;
break 'idle;
}
}

loop {
futures::select_biased! {
rgr = rx.read_grant().fuse() => {
let len = rgr.len();
port.send(&rgr[..]).await;
rgr.release(len)
},
rgr = port.consumer().read_grant().fuse() => {
read_level(rgr);
},
// TODO(eliza): make the host also send a heartbeat, and
// if we don't get it, break back to the idle loop...
}
}
}
}

#[inline]
fn level_enabled(&self, metadata: &Metadata<'_>) -> bool {
// TODO(eliza): more sophisticated filtering
metadata.level() <= &u8_to_level(self.max_level.load(Ordering::Relaxed))
}
}

impl Collect for SerialCollector {
fn enabled(&self, metadata: &Metadata<'_>) -> bool {
// TODO(eliza): more sophisticated filtering
metadata.level() <= &self.max_level
self.level_enabled(metadata) && !self.in_send.load(Ordering::Acquire)
}

fn register_callsite(&self, metadata: &'static Metadata<'static>) -> tracing_core_02::Interest {
if !self.enabled(metadata) {
if !self.level_enabled(metadata) {
return tracing_core_02::Interest::never();
}

Expand All @@ -116,17 +222,28 @@ impl Collect for SerialCollector {
meta: metadata.as_serde(),
});

if sent {
tracing_core_02::Interest::always()
} else {
// if we couldn't send the metadata, skip this callsite, because the
// consumer will not be able to understand it without its metadata.
tracing_core_02::Interest::never()
// If we couldn't send the metadata, skip this callsite, because the
// consumer will not be able to understand it without its metadata.
if !sent {
return tracing_core_02::Interest::never();
}

// Due to the fact that the collector uses `bbq` internally, we must
// return `Interest::sometimes` rather than `Interest::always` for
// `bbq` callsites, so that they can be dynamically enabled/disabled
// by the `enabled` method based on whether or not we are inside the
// collector. This avoids an infinite loop that previously occurred
// when enabling the `TRACE` level.
if metadata.target().starts_with("kernel::comms::bbq") {
return tracing_core_02::Interest::sometimes();
}

// Otherwise, always enable this callsite.
tracing_core_02::Interest::always()
}

fn max_level_hint(&self) -> Option<LevelFilter> {
Some(self.max_level)
Some(u8_to_level(self.max_level.load(Ordering::Relaxed)))
}

fn new_span(&self, span: &span::Attributes<'_>) -> span::Id {
Expand Down Expand Up @@ -195,3 +312,25 @@ impl Collect for SerialCollector {
false
}
}

const fn level_to_u8(level: LevelFilter) -> u8 {
match level {
LevelFilter::TRACE => 0,
LevelFilter::DEBUG => 1,
LevelFilter::INFO => 2,
LevelFilter::WARN => 3,
LevelFilter::ERROR => 4,
LevelFilter::OFF => 5,
}
}

const fn u8_to_level(level: u8) -> LevelFilter {
match level {
0 => LevelFilter::TRACE,
1 => LevelFilter::DEBUG,
2 => LevelFilter::INFO,
3 => LevelFilter::WARN,
4 => LevelFilter::ERROR,
_ => LevelFilter::OFF,
}
}
15 changes: 14 additions & 1 deletion source/trace-proto/src/lib.rs
Expand Up @@ -2,11 +2,14 @@

use core::{fmt, num::NonZeroU64};
use tracing_serde_structured::{
SerializeId, SerializeMetadata, SerializeRecordFields, SerializeSpanFields,
SerializeId, SerializeLevel, SerializeMetadata, SerializeRecordFields, SerializeSpanFields,
};

#[derive(serde::Serialize, serde::Deserialize)]
pub enum TraceEvent<'a> {
/// Sent by the target periodically when not actively tracing, to indicate
/// liveness, or to ack a [`HostRequest::SetMaxLevel`].
Heartbeat(Option<SerializeLevel>),
RegisterMeta {
id: MetaId,

Expand Down Expand Up @@ -36,6 +39,16 @@ pub enum TraceEvent<'a> {
DropSpan(SerializeId),
}

/// Requests sent from a host to a trace target.
#[derive(serde::Serialize, serde::Deserialize)]
pub enum HostRequest {
/// Sets the maximum tracing level. Traces above this verbosity level will
/// be discarded.
///
/// This may cause the trace target to send new metadata to the host.
SetMaxLevel(Option<SerializeLevel>), // TODO(eliza): add a keepalive?
}

#[derive(Copy, Clone, Hash, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct MetaId(NonZeroU64);

Expand Down
9 changes: 8 additions & 1 deletion tools/crowtty/Cargo.toml
Expand Up @@ -21,6 +21,7 @@ features = ["derive"]

[dependencies.postcard]
version = "1"
features = ["alloc"]

[dependencies.owo-colors]
version = "3.5"
Expand All @@ -33,4 +34,10 @@ default-features = true

[dependencies.mnemos-trace-proto]
path = "../../source/trace-proto"
features = ["std"]
features = ["std"]

[dependencies.tracing-02]
package = "tracing"
git = "https://github.com/tokio-rs/tracing"
# branch = "master"
default-features = false

0 comments on commit be8ff93

Please sign in to comment.