Fix/daemon fixups #4831

Merged 8 commits on May 8, 2023
Changes from 7 commits
16 changes: 14 additions & 2 deletions Cargo.lock

Generated file; diff not rendered.

8 changes: 6 additions & 2 deletions cli/internal/daemon/connector/connector.go
@@ -128,6 +128,7 @@ const (
_maxAttempts = 10
_shutdownTimeout = 1 * time.Second
_socketPollTimeout = 1 * time.Second
_notReadyTimeout = 3 * time.Millisecond
)

// killLiveServer tells a running server to shut down. This method is also responsible
@@ -254,8 +255,11 @@ func (c *Connector) connectInternal(ctx context.Context) (*Client, error) {
}
// Loops back around and tries again.
} else if errors.Is(err, errUnavailable) {
// close the client, see if we can kill the stale daemon
// The rust daemon will open the socket a few ms before it's ready to accept connections.
// If we get here, we know that the socket exists, but the server isn't ready yet.
// We'll wait a few ms and try again.
c.Logger.Debug("server not ready yet")
time.Sleep(_notReadyTimeout)
Contributor:

Any idea why it takes so long to start up the Rust daemon compared to Go? I've noticed I'm still getting the whole

WARNING  failed to contact turbod. Continuing in standalone mode: connection to turbo daemon process failed. Please ensure the following

on the first run if the daemon isn't already up.

} else if err != nil {
// Some other error occurred, close the client and
// report the error to the user
@@ -371,7 +375,7 @@ func (c *Connector) waitForSocket() error {

// startDaemon starts the daemon and returns the pid for the new process
func (c *Connector) startDaemon() (int, error) {
args := []string{"daemon"}
args := []string{"--skip-infer", "daemon"}
if c.Opts.ServerTimeout != 0 {
args = append(args, fmt.Sprintf("--idle-time=%v", c.Opts.ServerTimeout.String()))
}
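For context on the connector change above: the not-ready handling amounts to the retry loop sketched below. This is a minimal, self-contained sketch rather than the connector's actual code; dial and connectWithRetry are hypothetical stand-ins, while errUnavailable, _maxAttempts, and _notReadyTimeout mirror names from the diff.

package main

import (
	"errors"
	"fmt"
	"time"
)

// Hypothetical stand-ins mirroring the names used in connector.go.
var errUnavailable = errors.New("server not ready")

const (
	_maxAttempts     = 10
	_notReadyTimeout = 3 * time.Millisecond
)

// dial simulates a connection attempt; the real connector dials gRPC.
func dial(attempt int) error {
	if attempt < 3 {
		// Socket exists, but the server is not accepting connections yet.
		return errUnavailable
	}
	return nil
}

// connectWithRetry backs off briefly on errUnavailable instead of treating
// the daemon as stale, which is the behavior this diff introduces.
func connectWithRetry() error {
	for i := 0; i < _maxAttempts; i++ {
		err := dial(i)
		if err == nil {
			return nil
		}
		if errors.Is(err, errUnavailable) {
			time.Sleep(_notReadyTimeout)
			continue
		}
		return err
	}
	return fmt.Errorf("connection failed after %d attempts", _maxAttempts)
}

func main() {
	if err := connectWithRetry(); err != nil {
		fmt.Println("failed:", err)
		return
	}
	fmt.Println("connected")
}

The key point is that errUnavailable now means "the socket exists but the server is still starting", so the connector waits a few milliseconds and retries rather than closing the client and killing what it assumes is a stale daemon.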
21 changes: 18 additions & 3 deletions crates/globwatch/src/lib.rs
@@ -62,7 +62,7 @@ impl GlobWatcher {
std::fs::create_dir_all(&flush_dir).ok();
let flush_dir = flush_dir.canonicalize()?;

let mut watcher = notify::recommended_watcher(move |event: Result<Event, Error>| {
let watcher = notify::recommended_watcher(move |event: Result<Event, Error>| {
let span = span!(tracing::Level::TRACE, "watcher");
let _ = span.enter();

@@ -83,10 +83,25 @@ impl GlobWatcher {
}
})?;

watcher.watch(flush_dir.as_path(), notify::RecursiveMode::Recursive)?;

let watcher = Arc::new(Mutex::new(watcher));

// registering to watch this directory takes a few ms,
// so we just fire and forget a thread to do it in the
// background, to cut our startup time in half.
let flush = watcher.clone();
let path = flush_dir.as_path().to_owned();
tokio::task::spawn_blocking(move || {
if let Err(e) = flush
.lock()
.expect("only fails if poisoned")
.watch(&path, notify::RecursiveMode::Recursive)
{
warn!("failed to watch flush dir: {}", e);
} else {
trace!("watching flush dir: {:?}", path);
}
});

Ok((
Self {
flush_dir,
2 changes: 1 addition & 1 deletion crates/pidlock/src/lib.rs
@@ -12,7 +12,7 @@ use log::warn;
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum PidlockError {
/// A lock already exists
#[error("lock exists at {0}")]
#[error("lock exists at \"{0}\", please remove it")]
LockExists(PathBuf),
/// An operation was attempted in the wrong state, e.g. releasing before
/// acquiring.
1 change: 1 addition & 0 deletions crates/turborepo-lib/Cargo.toml
@@ -85,6 +85,7 @@ node-semver = "2.1.0"
owo-colors.workspace = true
regex.workspace = true
tracing-appender = "0.2.2"
tracing-chrome = { version = "0.7.1", optional = true }
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
tracing.workspace = true
turbo-updater = { workspace = true }
22 changes: 20 additions & 2 deletions crates/turborepo-lib/src/commands/daemon.rs
@@ -1,11 +1,13 @@
use std::{path::PathBuf, time::Duration};

use pidlock::PidlockError::AlreadyOwned;
use tracing::{trace, warn};
use turbopath::{AbsoluteSystemPathBuf, RelativeSystemPathBuf};

use super::CommandBase;
use crate::{
cli::DaemonCommand,
daemon::{DaemonConnector, DaemonError},
daemon::{endpoint::SocketOpenError, CloseReason, DaemonConnector, DaemonError},
tracing::TurboSubscriber,
};

@@ -65,6 +67,7 @@ pub async fn daemon_client(command: &DaemonCommand, base: &CommandBase) -> Resul
Ok(())
}

#[tracing::instrument(skip(base, logging), fields(repo_root = %base.repo_root))]
pub async fn daemon_server(
base: &CommandBase,
idle_time: &String,
@@ -101,7 +104,22 @@ pub async fn daemon_server(
.map(|d| Duration::from_nanos(d as u64))?;

let server = crate::daemon::DaemonServer::new(base, timeout, log_file)?;
server.serve().await;
let reason = server.serve().await;

match reason {
CloseReason::SocketOpenError(SocketOpenError::LockError(AlreadyOwned)) => {
warn!("daemon already running");
}
CloseReason::SocketOpenError(e) => return Err(e.into()),
CloseReason::Interrupt
| CloseReason::ServerClosed
| CloseReason::WatcherClosed
| CloseReason::Timeout
| CloseReason::Shutdown => {
// these are all ok, just exit
trace!("shutting down daemon: {:?}", reason);
}
};

Ok(())
}
7 changes: 6 additions & 1 deletion crates/turborepo-lib/src/daemon/client.rs
@@ -3,7 +3,10 @@ use tonic::{Code, Status};
use tracing::info;

use self::proto::turbod_client::TurbodClient;
use super::connector::{DaemonConnector, DaemonConnectorError};
use super::{
connector::{DaemonConnector, DaemonConnectorError},
endpoint::SocketOpenError,
};
use crate::get_version;

pub mod proto {
@@ -123,6 +126,8 @@ pub enum DaemonError {
/// The server was connected but is now unavailable.
#[error("server is unavailable")]
Unavailable,
#[error("error opening socket: {0}")]
SocketOpen(#[from] SocketOpenError),
/// The server is running a different version of turborepo.
#[error("version mismatch")]
VersionMismatch,
1 change: 1 addition & 0 deletions crates/turborepo-lib/src/daemon/connector.rs
@@ -148,6 +148,7 @@ impl DaemonConnector {
// this creates a new process group for the given command
// in a cross platform way, directing all output to /dev/null
let mut group = tokio::process::Command::new(binary_path)
.arg("--skip-infer")
.arg("daemon")
.stderr(Stdio::null())
.stdout(Stdio::null())
11 changes: 8 additions & 3 deletions crates/turborepo-lib/src/daemon/endpoint.rs
@@ -5,14 +5,17 @@ use std::{io::ErrorKind, sync::atomic::Ordering, time::Duration};
use futures::Stream;
use tokio::io::{AsyncRead, AsyncWrite};
use tonic::transport::server::Connected;
use tracing::debug;
use tracing::{debug, trace};
use turbopath::{AbsoluteSystemPathBuf, RelativeSystemPathBuf};

#[derive(thiserror::Error, Debug)]
pub enum SocketOpenError {
/// Returned when there is an IO error opening the socket,
/// such as the path being too long, or the path being
/// invalid.
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("pidlock error")]
#[error("pidlock error: {0}")]
LockError(#[from] pidlock::PidlockError),
}

@@ -25,6 +28,7 @@ const WINDOWS_POLL_DURATION: Duration = Duration::from_millis(1);
///
/// note: the running param is used by the windows
/// code path to shut down the non-blocking polling
#[tracing::instrument]
pub async fn listen_socket(
path: AbsoluteSystemPathBuf,
#[allow(unused)] running: Arc<AtomicBool>,
@@ -39,11 +43,12 @@
let sock_path = path.join_relative(RelativeSystemPathBuf::new("turbod.sock").unwrap());
let mut lock = pidlock::Pidlock::new(pid_path.as_path().to_owned());

trace!("acquiring pidlock");
// this will fail if the pid is already owned
lock.acquire()?;
std::fs::remove_file(&sock_path).ok();

debug!("pidlock acquired as {}", pid_path);
debug!("pidlock acquired at {}", pid_path);
debug!("listening on socket at {}", sock_path);

#[cfg(unix)]
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/daemon/mod.rs
@@ -7,7 +7,7 @@ mod server;

pub use client::{DaemonClient, DaemonError};
pub use connector::DaemonConnector;
pub use server::DaemonServer;
pub use server::{CloseReason, DaemonServer};

pub(crate) mod proto {
tonic::include_proto!("turbodprotocol");
2 changes: 2 additions & 0 deletions crates/turborepo-lib/src/daemon/server.rs
@@ -71,6 +71,7 @@ pub enum CloseReason {
}

impl DaemonServer<notify::RecommendedWatcher> {
#[tracing::instrument(skip(base), fields(repo_root = %base.repo_root))]
pub fn new(
base: &CommandBase,
timeout: Duration,
@@ -112,6 +113,7 @@ impl<T: Watcher> Drop for DaemonServer<T> {

impl<T: Watcher + Send + 'static> DaemonServer<T> {
/// Serve the daemon server, while also watching for filesystem changes.
#[tracing::instrument(skip(self))]
pub async fn serve(mut self) -> CloseReason {
let stop = StopSource::new();
let watcher = self.watcher.clone();
2 changes: 2 additions & 0 deletions crates/turborepo-lib/src/globwatcher/mod.rs
@@ -46,6 +46,7 @@ pub struct GlobSet {
}

impl HashGlobWatcher<RecommendedWatcher> {
#[tracing::instrument]
pub fn new(
relative_to: AbsoluteSystemPathBuf,
flush_folder: PathBuf,
@@ -64,6 +65,7 @@
impl<T: Watcher> HashGlobWatcher<T> {
/// Watches a given path, using the flush_folder as temporary storage to
/// make sure that file events are handled in the appropriate order.
#[tracing::instrument(skip(self, token))]
pub async fn watch(&self, token: StopToken) {
let start_globs = {
let lock = self.hash_globs.lock().expect("only fails if poisoned");
21 changes: 19 additions & 2 deletions crates/turborepo-lib/src/tracing.rs
@@ -5,7 +5,7 @@ use owo_colors::{
colors::{Black, Default, Red, Yellow},
Color, OwoColorize,
};
use tracing::{field::Visit, metadata::LevelFilter, Event, Level, Subscriber};
use tracing::{field::Visit, metadata::LevelFilter, trace, Event, Level, Subscriber};
use tracing_appender::{
non_blocking::{NonBlocking, WorkerGuard},
rolling::RollingFileAppender,
@@ -46,6 +46,9 @@ pub struct TurboSubscriber {
/// The non-blocking file logger only continues to log while this guard is
/// held. We keep it here so that it doesn't get dropped.
guard: Mutex<Option<WorkerGuard>>,

#[cfg(feature = "tracing-chrome")]
chrome_guard: tracing_chrome::FlushGuard,
}

impl TurboSubscriber {
@@ -90,19 +93,33 @@ impl TurboSubscriber {
// we set this layer to None to start with, effectively disabling it
let (logrotate, update) = reload::Layer::new(Option::<DaemonLog>::None);

Registry::default().with(stdout).with(logrotate).init();
let registry = Registry::default().with(stdout).with(logrotate);

#[cfg(feature = "tracing-chrome")]
let (registry, chrome_guard) = {
let (chrome_layer, guard) = tracing_chrome::ChromeLayerBuilder::new()
.file("./tracing.json")
.build();
(registry.with(chrome_layer), guard)
};

registry.init();

Self {
update,
guard: Mutex::new(None),
#[cfg(feature = "tracing-chrome")]
chrome_guard,
}
}

/// Enables daemon logging with the specified rotation settings.
///
/// Daemon logging uses the standard tracing formatter.
#[tracing::instrument(skip(self))]
pub fn set_daemon_logger(&self, appender: RollingFileAppender) -> Result<(), Error> {
let (file_writer, guard) = tracing_appender::non_blocking(appender);
trace!("created non-blocking file writer");

let layer = tracing_subscriber::fmt::layer().with_writer(file_writer);

1 change: 1 addition & 0 deletions crates/turborepo/Cargo.toml
@@ -13,6 +13,7 @@ default = ["rustls-tls"]
native-tls = ["turborepo-lib/native-tls"]
rustls-tls = ["turborepo-lib/rustls-tls"]
http = ["turborepo-lib/http"]
tracing-chrome = ["turborepo-lib/tracing-chrome"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[build-dependencies]
1 change: 1 addition & 0 deletions turborepo-tests/helpers/setup.sh
@@ -5,3 +5,4 @@ ROOT_DIR="${THIS_DIR}/../.."

TURBO=${ROOT_DIR}/target/debug/turbo
VERSION=${ROOT_DIR}/version.txt
TMPDIR=$(mktemp -d)
13 changes: 13 additions & 0 deletions turborepo-tests/integration/tests/daemon.t
@@ -0,0 +1,13 @@
Setup
$ . ${TESTDIR}/../../helpers/setup.sh

The daemon exits when there is a stale pid file
$ ${TURBO} daemon & sleep 1 && kill $! && ${TURBO} daemon
WARN stale pid file at ".+" (re)
ERROR error opening socket: pidlock error: lock exists at ".+", please remove it (re)
/bin/bash: line 4: .+ (re)
[1]

A message is printed when the daemon is running already
$ rm -r ${TMPDIR}/turbod; ${TURBO} daemon & (export PID=$!; sleep 1 && ${TURBO} daemon && kill $PID && kill $PID && kill $PID)
WARN daemon already running