Skip to content

Commit

Permalink
Fix/daemon fixups (vercel#4831)
Browse files Browse the repository at this point in the history
Co-authored-by: Greg Soltis <greg.soltis@vercel.com>
Co-authored-by: Greg Soltis <Greg Soltis>
  • Loading branch information
2 people authored and NicholasLYang committed May 9, 2023
1 parent 426a2a4 commit e238c2a
Show file tree
Hide file tree
Showing 15 changed files with 165 additions and 15 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions cli/internal/daemon/connector/connector.go
Expand Up @@ -128,6 +128,7 @@ const (
_maxAttempts = 10
_shutdownTimeout = 1 * time.Second
_socketPollTimeout = 1 * time.Second
_notReadyTimeout = 3 * time.Millisecond
)

// killLiveServer tells a running server to shut down. This method is also responsible
Expand Down Expand Up @@ -254,8 +255,11 @@ func (c *Connector) connectInternal(ctx context.Context) (*Client, error) {
}
// Loops back around and tries again.
} else if errors.Is(err, errUnavailable) {
// close the client, see if we can kill the stale daemon
// The rust daemon will open the socket a few ms before it's ready to accept connections.
// If we get here, we know that the socket exists, but the server isn't ready yet.
// We'll wait a few ms and try again.
c.Logger.Debug("server not ready yet")
time.Sleep(_notReadyTimeout)
} else if err != nil {
// Some other error occurred, close the client and
// report the error to the user
Expand Down Expand Up @@ -371,7 +375,7 @@ func (c *Connector) waitForSocket() error {

// startDaemon starts the daemon and returns the pid for the new process
func (c *Connector) startDaemon() (int, error) {
args := []string{"daemon"}
args := []string{"--skip-infer", "daemon"}
if c.Opts.ServerTimeout != 0 {
args = append(args, fmt.Sprintf("--idle-time=%v", c.Opts.ServerTimeout.String()))
}
Expand Down
21 changes: 18 additions & 3 deletions crates/turborepo-globwatch/src/lib.rs
Expand Up @@ -62,7 +62,7 @@ impl GlobWatcher {
std::fs::create_dir_all(&flush_dir).ok();
let flush_dir = flush_dir.canonicalize()?;

let mut watcher = notify::recommended_watcher(move |event: Result<Event, Error>| {
let watcher = notify::recommended_watcher(move |event: Result<Event, Error>| {
let span = span!(tracing::Level::TRACE, "watcher");
let _ = span.enter();

Expand All @@ -83,10 +83,25 @@ impl GlobWatcher {
}
})?;

watcher.watch(flush_dir.as_path(), notify::RecursiveMode::Recursive)?;

let watcher = Arc::new(Mutex::new(watcher));

// registering to watch this directory takes a few ms,
// so we just fire and forget a thread to do it in the
// background, to cut our startup time in half.
let flush = watcher.clone();
let path = flush_dir.as_path().to_owned();
tokio::task::spawn_blocking(move || {
if let Err(e) = flush
.lock()
.expect("only fails if poisoned")
.watch(&path, notify::RecursiveMode::Recursive)
{
warn!("failed to watch flush dir: {}", e);
} else {
trace!("watching flush dir: {:?}", path);
}
});

Ok((
Self {
flush_dir,
Expand Down
1 change: 1 addition & 0 deletions crates/turborepo-lib/Cargo.toml
Expand Up @@ -85,6 +85,7 @@ node-semver = "2.1.0"
owo-colors.workspace = true
regex.workspace = true
tracing-appender = "0.2.2"
tracing-chrome = { version = "0.7.1", optional = true }
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
tracing.workspace = true
turbo-updater = { workspace = true }
Expand Down
22 changes: 20 additions & 2 deletions crates/turborepo-lib/src/commands/daemon.rs
@@ -1,11 +1,13 @@
use std::{path::PathBuf, time::Duration};

use pidlock::PidlockError::AlreadyOwned;
use tracing::{trace, warn};
use turbopath::{AbsoluteSystemPathBuf, RelativeSystemPathBuf};

use super::CommandBase;
use crate::{
cli::DaemonCommand,
daemon::{DaemonConnector, DaemonError},
daemon::{endpoint::SocketOpenError, CloseReason, DaemonConnector, DaemonError},
tracing::TurboSubscriber,
};

Expand Down Expand Up @@ -65,6 +67,7 @@ pub async fn daemon_client(command: &DaemonCommand, base: &CommandBase) -> Resul
Ok(())
}

#[tracing::instrument(skip(base, logging), fields(repo_root = %base.repo_root))]
pub async fn daemon_server(
base: &CommandBase,
idle_time: &String,
Expand Down Expand Up @@ -101,7 +104,22 @@ pub async fn daemon_server(
.map(|d| Duration::from_nanos(d as u64))?;

let server = crate::daemon::DaemonServer::new(base, timeout, log_file)?;
server.serve().await;
let reason = server.serve().await;

match reason {
CloseReason::SocketOpenError(SocketOpenError::LockError(AlreadyOwned)) => {
warn!("daemon already running");
}
CloseReason::SocketOpenError(e) => return Err(e.into()),
CloseReason::Interrupt
| CloseReason::ServerClosed
| CloseReason::WatcherClosed
| CloseReason::Timeout
| CloseReason::Shutdown => {
// these are all ok, just exit
trace!("shutting down daemon: {:?}", reason);
}
};

Ok(())
}
Expand Down
7 changes: 6 additions & 1 deletion crates/turborepo-lib/src/daemon/client.rs
Expand Up @@ -3,7 +3,10 @@ use tonic::{Code, Status};
use tracing::info;

use self::proto::turbod_client::TurbodClient;
use super::connector::{DaemonConnector, DaemonConnectorError};
use super::{
connector::{DaemonConnector, DaemonConnectorError},
endpoint::SocketOpenError,
};
use crate::get_version;

pub mod proto {
Expand Down Expand Up @@ -123,6 +126,8 @@ pub enum DaemonError {
/// The server was connected but is now unavailable.
#[error("server is unavailable")]
Unavailable,
#[error("error opening socket: {0}")]
SocketOpen(#[from] SocketOpenError),
/// The server is running a different version of turborepo.
#[error("version mismatch")]
VersionMismatch,
Expand Down
1 change: 1 addition & 0 deletions crates/turborepo-lib/src/daemon/connector.rs
Expand Up @@ -148,6 +148,7 @@ impl DaemonConnector {
// this creates a new process group for the given command
// in a cross platform way, directing all output to /dev/null
let mut group = tokio::process::Command::new(binary_path)
.arg("--skip-infer")
.arg("daemon")
.stderr(Stdio::null())
.stdout(Stdio::null())
Expand Down
77 changes: 74 additions & 3 deletions crates/turborepo-lib/src/daemon/endpoint.rs
Expand Up @@ -5,14 +5,17 @@ use std::{io::ErrorKind, sync::atomic::Ordering, time::Duration};
use futures::Stream;
use tokio::io::{AsyncRead, AsyncWrite};
use tonic::transport::server::Connected;
use tracing::debug;
use tracing::{debug, trace};
use turbopath::{AbsoluteSystemPathBuf, RelativeSystemPathBuf};

#[derive(thiserror::Error, Debug)]
pub enum SocketOpenError {
/// Returned when there is an IO error opening the socket,
/// such as the path being too long, or the path being
/// invalid.
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("pidlock error")]
#[error("pidlock error: {0}")]
LockError(#[from] pidlock::PidlockError),
}

Expand All @@ -25,6 +28,7 @@ const WINDOWS_POLL_DURATION: Duration = Duration::from_millis(1);
///
/// note: the running param is used by the windows
/// code path to shut down the non-blocking polling
#[tracing::instrument]
pub async fn listen_socket(
path: AbsoluteSystemPathBuf,
#[allow(unused)] running: Arc<AtomicBool>,
Expand All @@ -39,11 +43,12 @@ pub async fn listen_socket(
let sock_path = path.join_relative(RelativeSystemPathBuf::new("turbod.sock").unwrap());
let mut lock = pidlock::Pidlock::new(pid_path.as_path().to_owned());

trace!("acquiring pidlock");
// this will fail if the pid is already owned
lock.acquire()?;
std::fs::remove_file(&sock_path).ok();

debug!("pidlock acquired as {}", pid_path);
debug!("pidlock acquired at {}", pid_path);
debug!("listening on socket at {}", sock_path);

#[cfg(unix)]
Expand Down Expand Up @@ -167,3 +172,69 @@ impl<T> Connected for UdsWindowsStream<T> {
type ConnectInfo = ();
fn connect_info(&self) -> Self::ConnectInfo {}
}

#[cfg(test)]
mod test {
    use std::{
        assert_matches::assert_matches,
        path::Path,
        process::Command,
        sync::{atomic::AtomicBool, Arc},
    };

    use pidlock::PidlockError;
    use turbopath::AbsoluteSystemPathBuf;

    use super::listen_socket;
    use crate::daemon::endpoint::SocketOpenError;

    /// Builds the absolute path of a `turbod.pid` file inside `tmp_path`.
    ///
    /// Panics if the joined path is rejected by `AbsoluteSystemPathBuf::new`.
    fn pid_path(tmp_path: &Path) -> AbsoluteSystemPathBuf {
        AbsoluteSystemPathBuf::new(tmp_path.join("turbod.pid")).unwrap()
    }

    /// Asserts that `result` is an `Err` wrapping `PidlockError::LockExists`.
    ///
    /// Generic over the success type because the value returned by
    /// `listen_socket` on success is not `Debug` (per the note in the
    /// original tests), which rules out `Result::unwrap_err()`.
    fn assert_lock_exists<T>(result: Result<T, SocketOpenError>) {
        match result {
            Err(err) => {
                assert_matches!(err, SocketOpenError::LockError(PidlockError::LockExists(_)))
            }
            Ok(_) => panic!("expected an error"),
        }
    }

    /// A pid file naming a process that is not running must make
    /// `listen_socket` fail with `LockExists`.
    #[tokio::test]
    async fn test_stale_pid() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let pid_path = pid_path(tmp_dir.path());
        // A pid that will never be running and is guaranteed not to be us
        pid_path.create_with_contents("100000").unwrap();

        let running = Arc::new(AtomicBool::new(true));
        let result = listen_socket(pid_path, running).await;

        assert_lock_exists(result);
    }

    /// A pid file naming a live process must make `listen_socket` fail with
    /// `LockExists`.
    #[tokio::test]
    async fn test_existing_process() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let pid_path = pid_path(tmp_dir.path());

        #[cfg(windows)]
        let node_bin = "node.exe";
        #[cfg(not(windows))]
        let node_bin = "node";

        // Any live process works as the lock owner; `node` is expected to be
        // on PATH wherever these tests run.
        let mut child = Command::new(node_bin).spawn().unwrap();
        pid_path
            .create_with_contents(child.id().to_string().as_str())
            .unwrap();

        let running = Arc::new(AtomicBool::new(true));
        let result = listen_socket(pid_path, running).await;

        // Dropping a `Child` neither kills nor reaps it, so tear the helper
        // process down explicitly. This happens before the assertion so a
        // failing test does not leak it either. Both calls are best-effort:
        // the child may already have exited on its own.
        let _ = child.kill();
        let _ = child.wait();

        assert_lock_exists(result);
    }
}
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/daemon/mod.rs
Expand Up @@ -7,7 +7,7 @@ mod server;

pub use client::{DaemonClient, DaemonError};
pub use connector::DaemonConnector;
pub use server::DaemonServer;
pub use server::{CloseReason, DaemonServer};

pub(crate) mod proto {
tonic::include_proto!("turbodprotocol");
Expand Down
2 changes: 2 additions & 0 deletions crates/turborepo-lib/src/daemon/server.rs
Expand Up @@ -71,6 +71,7 @@ pub enum CloseReason {
}

impl DaemonServer<notify::RecommendedWatcher> {
#[tracing::instrument(skip(base), fields(repo_root = %base.repo_root))]
pub fn new(
base: &CommandBase,
timeout: Duration,
Expand Down Expand Up @@ -112,6 +113,7 @@ impl<T: Watcher> Drop for DaemonServer<T> {

impl<T: Watcher + Send + 'static> DaemonServer<T> {
/// Serve the daemon server, while also watching for filesystem changes.
#[tracing::instrument(skip(self))]
pub async fn serve(mut self) -> CloseReason {
let stop = StopSource::new();
let watcher = self.watcher.clone();
Expand Down
2 changes: 2 additions & 0 deletions crates/turborepo-lib/src/globwatcher/mod.rs
Expand Up @@ -46,6 +46,7 @@ pub struct GlobSet {
}

impl HashGlobWatcher<RecommendedWatcher> {
#[tracing::instrument]
pub fn new(
relative_to: AbsoluteSystemPathBuf,
flush_folder: PathBuf,
Expand All @@ -64,6 +65,7 @@ impl HashGlobWatcher<RecommendedWatcher> {
impl<T: Watcher> HashGlobWatcher<T> {
/// Watches a given path, using the flush_folder as temporary storage to
/// make sure that file events are handled in the appropriate order.
#[tracing::instrument(skip(self, token))]
pub async fn watch(&self, token: StopToken) {
let start_globs = {
let lock = self.hash_globs.lock().expect("only fails if poisoned");
Expand Down
21 changes: 19 additions & 2 deletions crates/turborepo-lib/src/tracing.rs
Expand Up @@ -5,7 +5,7 @@ use owo_colors::{
colors::{Black, Default, Red, Yellow},
Color, OwoColorize,
};
use tracing::{field::Visit, metadata::LevelFilter, Event, Level, Subscriber};
use tracing::{field::Visit, metadata::LevelFilter, trace, Event, Level, Subscriber};
use tracing_appender::{
non_blocking::{NonBlocking, WorkerGuard},
rolling::RollingFileAppender,
Expand Down Expand Up @@ -46,6 +46,9 @@ pub struct TurboSubscriber {
/// The non-blocking file logger only continues to log while this guard is
/// held. We keep it here so that it doesn't get dropped.
guard: Mutex<Option<WorkerGuard>>,

#[cfg(feature = "tracing-chrome")]
chrome_guard: tracing_chrome::FlushGuard,
}

impl TurboSubscriber {
Expand Down Expand Up @@ -90,19 +93,33 @@ impl TurboSubscriber {
// we set this layer to None to start with, effectively disabling it
let (logrotate, update) = reload::Layer::new(Option::<DaemonLog>::None);

Registry::default().with(stdout).with(logrotate).init();
let registry = Registry::default().with(stdout).with(logrotate);

#[cfg(feature = "tracing-chrome")]
let (registry, chrome_guard) = {
let (chrome_layer, guard) = tracing_chrome::ChromeLayerBuilder::new()
.file("./tracing.json")
.build();
(registry.with(chrome_layer), guard)
};

registry.init();

Self {
update,
guard: Mutex::new(None),
#[cfg(feature = "tracing-chrome")]
chrome_guard,
}
}

/// Enables daemon logging with the specified rotation settings.
///
/// Daemon logging uses the standard tracing formatter.
#[tracing::instrument(skip(self))]
pub fn set_daemon_logger(&self, appender: RollingFileAppender) -> Result<(), Error> {
let (file_writer, guard) = tracing_appender::non_blocking(appender);
trace!("created non-blocking file writer");

let layer = tracing_subscriber::fmt::layer().with_writer(file_writer);

Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-pidlock/src/lib.rs
Expand Up @@ -12,7 +12,7 @@ use log::warn;
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum PidlockError {
/// A lock already exists
#[error("lock exists at {0}")]
#[error("lock exists at \"{0}\", please remove it")]
LockExists(PathBuf),
/// An operation was attempted in the wrong state, e.g. releasing before
/// acquiring.
Expand Down

0 comments on commit e238c2a

Please sign in to comment.