fix: properly propagate internal errors (#8113)
### Description

I believe this addresses JavaScriptBach's issue in #8088.

We weren't properly propagating unrecoverable errors to `turbo`'s exit code, so a failure such as being unable to write a log file could still produce a zero exit code even though `turbo` hit a failure that left us in a bad state. This PR propagates internal errors up to the `run` command so that `turbo` exits non-zero when they are encountered.
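
In outline, the fix works like this (a condensed sketch using the same names as the diff below; the error enums are trimmed to one variant each and the surrounding machinery is omitted): each spawned task executor now returns `Result<(), InternalError>` instead of `()`, and the visitor drains the task set, collects any internal errors, and joins them into a single `Error::InternalErrors` that bubbles up to `run`:

```rust
use futures::{stream::FuturesUnordered, StreamExt};
use itertools::Itertools;

// Condensed stand-ins for the real error types in visitor.rs.
#[derive(Debug, thiserror::Error)]
pub enum InternalError {
    #[error(transparent)]
    Io(#[from] std::io::Error),
}

#[derive(Debug, thiserror::Error)]
pub enum Error {
    #[error("internal errors encountered: {0}")]
    InternalErrors(String),
}

pub async fn wait_for_tasks(
    mut tasks: FuturesUnordered<tokio::task::JoinHandle<Result<(), InternalError>>>,
) -> Result<(), Error> {
    let mut internal_errors = Vec::new();
    while let Some(result) = tasks.next().await {
        // A join error means the executor panicked; an Err is an internal
        // error that previously would have been swallowed.
        if let Err(e) = result.unwrap_or_else(|e| panic!("task executor panicked: {e}")) {
            internal_errors.push(e);
        }
    }
    if !internal_errors.is_empty() {
        // One joined message gives `run` a single error to report, and a
        // non-zero exit code for what used to be a silent success.
        return Err(Error::InternalErrors(
            internal_errors.into_iter().map(|e| e.to_string()).join(","),
        ));
    }
    Ok(())
}
```

The messages are joined with `,` into one string, which is what the `internal errors encountered:` prefix of the new `Error::InternalErrors` variant reports.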

### Testing Instructions

Force an internal failure, e.g. by making a log file owned by root.

Before
<img width="965" alt="Screenshot 2024-05-08 at 11 45 19 AM"
src="https://github.com/vercel/turbo/assets/4131117/f148809a-7c25-47a3-8b92-f79e1ca27d01">
(Note the zero exit code and that the run summary is still displayed)

After

<img width="906" alt="Screenshot 2024-05-08 at 11 39 52 AM"
src="https://github.com/vercel/turbo/assets/4131117/d0a0d41c-eda2-4f70-bacc-10cfecc85249">

(Note the non-zero exit code)


Closes TURBO-3034
chris-olszewski committed May 8, 2024
1 parent 3e00d0f commit 2ce4f75
Showing 2 changed files with 81 additions and 43 deletions.
crates/turborepo-lib/src/run/mod.rs (2 changes: 1 addition & 1 deletion)

```diff
@@ -14,7 +14,7 @@ pub mod watch;

 use std::{collections::HashSet, io::Write, sync::Arc};

-pub use cache::{CacheOutput, ConfigCache, RunCache, TaskCache};
+pub use cache::{CacheOutput, ConfigCache, Error as CacheError, RunCache, TaskCache};
 use chrono::{DateTime, Local};
 use rayon::iter::ParallelBridge;
 use tracing::debug;
```
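
(The re-export of `Error as CacheError` above is what allows the new `InternalError::Logs` variant in visitor.rs to name the cache error type as `crate::run::CacheError`.)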
crates/turborepo-lib/src/task_graph/visitor.rs (122 changes: 80 additions & 42 deletions)

```diff
@@ -9,6 +9,7 @@ use std::{
 use console::{Style, StyledObject};
 use either::Either;
 use futures::{stream::FuturesUnordered, StreamExt};
+use itertools::Itertools;
 use regex::Regex;
 use tokio::sync::{mpsc, oneshot};
 use tracing::{debug, error, Instrument, Span};
@@ -84,6 +85,8 @@ pub enum Error {
     TaskHash(#[from] task_hash::Error),
     #[error(transparent)]
     RunSummary(#[from] summary::Error),
+    #[error("internal errors encountered: {0}")]
+    InternalErrors(String),
 }

 impl<'a> Visitor<'a> {
@@ -253,7 +256,7 @@ impl<'a> Visitor<'a> {
                         factory.dry_run_exec_context(info.clone(), task_cache);
                     let tracker = self.run_tracker.track_task(info.into_owned());
                     tasks.push(tokio::spawn(async move {
-                        dry_run_exec_context.execute_dry_run(tracker).await;
+                        dry_run_exec_context.execute_dry_run(tracker).await
                     }));
                 }
                 false => {
@@ -302,7 +305,7 @@ impl<'a> Visitor<'a> {
                             spaces_client,
                             &execution_telemetry,
                         )
-                        .await;
+                        .await
                     }));
                 }
             }
@@ -311,8 +314,11 @@ impl<'a> Visitor<'a> {
         // Wait for the engine task to finish and for all of our tasks to finish
         engine_handle.await.expect("engine execution panicked")?;
         // This will poll the futures until they are all completed
+        let mut internal_errors = Vec::new();
         while let Some(result) = tasks.next().await {
-            result.unwrap_or_else(|e| panic!("task executor panicked: {e}"));
+            if let Err(e) = result.unwrap_or_else(|e| panic!("task executor panicked: {e}")) {
+                internal_errors.push(e);
+            }
         }
         drop(factory);
         if let Some(handle) = ui {
@@ -326,6 +332,12 @@ impl<'a> Visitor<'a> {
             }
         }

+        if !internal_errors.is_empty() {
+            return Err(Error::InternalErrors(
+                internal_errors.into_iter().map(|e| e.to_string()).join(","),
+            ));
+        }
+
         // Write out the traced-config.json file if we have one
         self.task_access.save().await;

@@ -560,6 +572,22 @@ enum TaskErrorCause {
     Spawn { msg: String },
     #[error("command {command} exited ({exit_code})")]
     Exit { command: String, exit_code: i32 },
+    #[error("turbo has internal error processing task")]
+    Internal,
 }

+#[derive(Debug, thiserror::Error)]
+pub enum InternalError {
+    #[error(transparent)]
+    Io(#[from] std::io::Error),
+    #[error("unable to determine why task exited")]
+    UnknownChildExit,
+    #[error("unable to find package manager binary: {0}")]
+    Which(#[from] which::Error),
+    #[error("external process killed a task")]
+    ExternalKill,
+    #[error("error writing logs: {0}")]
+    Logs(#[from] crate::run::CacheError),
+}
+
 impl TaskError {
@@ -698,13 +726,13 @@ struct ExecContext {
 enum ExecOutcome {
     // All operations during execution succeeded
     Success(SuccessOutcome),
-    // An internal error that indicates a shutdown should be performed
-    Internal,
     // An error with the task execution
     Task {
         exit_code: Option<i32>,
         message: String,
     },
+    // Task didn't execute normally due to a shutdown being initiated by another task
+    Shutdown,
 }

 enum SuccessOutcome {
@@ -729,7 +757,7 @@ impl ExecContext {
         callback: oneshot::Sender<Result<(), StopExecution>>,
         spaces_client: Option<SpacesTaskClient>,
         telemetry: &PackageTaskEventBuilder,
-    ) {
+    ) -> Result<(), InternalError> {
         let tracker = tracker.start().await;
         let span = tracing::debug_span!("execute_task", task = %self.task_id.task());
         span.follows_from(parent_span_id);
@@ -740,19 +768,19 @@ impl ExecContext {

         // If the task resulted in an error, do not group in order to better highlight
         // the error.
-        let is_error = matches!(result, ExecOutcome::Task { .. });
+        let is_error = matches!(result, Ok(ExecOutcome::Task { .. }));
         let logs = match output_client.finish(is_error) {
             Ok(logs) => logs,
             Err(e) => {
                 telemetry.track_error(TrackedErrors::DaemonFailedToMarkOutputsAsCached);
                 error!("unable to flush output client: {e}");
-                result = ExecOutcome::Internal;
+                result = Err(InternalError::Io(e));
                 None
             }
         };

         match result {
-            ExecOutcome::Success(outcome) => {
+            Ok(ExecOutcome::Success(outcome)) => {
                 let task_summary = match outcome {
                     SuccessOutcome::CacheHit => tracker.cached().await,
                     SuccessOutcome::Run => tracker.build_succeeded(0).await,
@@ -764,12 +792,7 @@ impl ExecContext {
                     client.finish_task(info).await.ok();
                 }
             }
-            ExecOutcome::Internal => {
-                tracker.cancel();
-                callback.send(Err(StopExecution)).ok();
-                self.manager.stop().await;
-            }
-            ExecOutcome::Task { exit_code, message } => {
+            Ok(ExecOutcome::Task { exit_code, message }) => {
                 let task_summary = tracker.build_failed(exit_code, message).await;
                 callback
                     .send(match self.continue_on_error {
@@ -800,7 +823,22 @@ impl ExecContext {
                     }
                 }
             }
+            Ok(ExecOutcome::Shutdown) => {
+                tracker.cancel();
+                callback.send(Err(StopExecution)).ok();
+                // Probably overkill here, but we should make sure the process manager is
+                // stopped if we think we're shutting down.
+                self.manager.stop().await;
+            }
+            Err(e) => {
+                tracker.cancel();
+                callback.send(Err(StopExecution)).ok();
+                self.manager.stop().await;
+                return Err(e);
+            }
         }
+
+        Ok(())
     }

     fn prefixed_ui<'a, W: Write>(
@@ -823,7 +861,7 @@ impl ExecContext {
         &mut self,
         output_client: &TaskOutput<impl std::io::Write>,
         telemetry: &PackageTaskEventBuilder,
-    ) -> ExecOutcome {
+    ) -> Result<ExecOutcome, InternalError> {
         let task_start = Instant::now();
         let mut prefixed_ui = self.prefixed_ui(output_client);

@@ -846,7 +884,7 @@ impl ExecContext {
                 );
                 self.hash_tracker
                     .insert_cache_status(self.task_id.clone(), status);
-                return ExecOutcome::Success(SuccessOutcome::CacheHit);
+                return Ok(ExecOutcome::Success(SuccessOutcome::CacheHit));
             }
             Ok(None) => (),
             Err(e) => {
@@ -855,9 +893,7 @@
             }
         }

-        let Ok(package_manager_binary) = which(self.package_manager.command()) else {
-            return ExecOutcome::Internal;
-        };
+        let package_manager_binary = which(self.package_manager.command())?;

         let mut cmd = Command::new(package_manager_binary);
         let mut args = vec!["run".to_string(), self.task_id.task().to_string()];
@@ -899,14 +935,14 @@ impl ExecContext {
                     .lock()
                     .expect("lock poisoned")
                     .push(TaskError::from_spawn(self.task_id_for_display.clone(), e));
-                return ExecOutcome::Task {
+                return Ok(ExecOutcome::Task {
                     exit_code: None,
                     message: error_string,
-                };
+                });
             }
             // Turbo is shutting down
             None => {
-                return ExecOutcome::Internal;
+                return Ok(ExecOutcome::Shutdown);
             }
         };

@@ -918,29 +954,27 @@ impl ExecContext {
             }
         }

-        let mut stdout_writer = match self.task_cache.output_writer(prefixed_ui.task_writer()) {
-            Ok(w) => w,
-            Err(e) => {
+        let mut stdout_writer = self
+            .task_cache
+            .output_writer(prefixed_ui.task_writer())
+            .map_err(|e| {
                 telemetry.track_error(TrackedErrors::FailedToCaptureOutputs);
                 error!("failed to capture outputs for \"{}\": {e}", self.task_id);
-                return ExecOutcome::Internal;
-            }
-        };
+                e
+            })?;

         let exit_status = match process.wait_with_piped_outputs(&mut stdout_writer).await {
             Ok(Some(exit_status)) => exit_status,
             Err(e) => {
                 telemetry.track_error(TrackedErrors::FailedToPipeOutputs);
-                error!("unable to pipe outputs from command: {e}");
-                return ExecOutcome::Internal;
+                return Err(e.into());
             }
             Ok(None) => {
                 // TODO: how can this happen? we only update the
                 // exit status with Some and it is only initialized with
                 // None. Is it still running?
                 telemetry.track_error(TrackedErrors::UnknownChildExit);
-                error!("unable to determine why child exited");
-                return ExecOutcome::Internal;
+                return Err(InternalError::UnknownChildExit);
             }
         };
         let task_duration = task_start.elapsed();
@@ -957,6 +991,7 @@ impl ExecContext {
         {
             if let Err(e) = self.task_cache.save_outputs(task_duration, telemetry).await {
                 error!("error caching output: {e}");
+                return Err(e.into());
             } else {
                 // If no errors, update hash tracker with expanded outputs
                 self.hash_tracker.insert_expanded_outputs(
@@ -967,7 +1002,7 @@ impl ExecContext {
                 }

                 // Return success outcome
-                ExecOutcome::Success(SuccessOutcome::Run)
+                Ok(ExecOutcome::Success(SuccessOutcome::Run))
             }
             ChildExit::Finished(Some(code)) => {
                 // If there was an error, flush the buffered output
@@ -988,16 +1023,18 @@ impl ExecContext {
                     task_id: self.task_id_for_display.clone(),
                     cause: error,
                 });
-                ExecOutcome::Task {
+                Ok(ExecOutcome::Task {
                     exit_code: Some(code),
                     message,
-                }
+                })
             }
-            // All of these indicate a failure where we don't know how to recover
-            ChildExit::Finished(None)
-            | ChildExit::Killed
-            | ChildExit::KilledExternal
-            | ChildExit::Failed => ExecOutcome::Internal,
+            // The child exited in a way where we can't figure out how it finished so we assume it
+            // failed.
+            ChildExit::Finished(None) | ChildExit::Failed => Err(InternalError::UnknownChildExit),
+            // Something else killed the child
+            ChildExit::KilledExternal => Err(InternalError::ExternalKill),
+            // The child was killed by turbo indicating a shutdown
+            ChildExit::Killed => Ok(ExecOutcome::Shutdown),
         }
     }

@@ -1029,13 +1066,14 @@ struct DryRunExecContext {
 }

 impl DryRunExecContext {
-    pub async fn execute_dry_run(&self, tracker: TaskTracker<()>) {
+    pub async fn execute_dry_run(&self, tracker: TaskTracker<()>) -> Result<(), InternalError> {
         // may also need to do framework & command stuff?
         if let Ok(Some(status)) = self.task_cache.exists().await {
            self.hash_tracker
                .insert_cache_status(self.task_id.clone(), status);
         }
         tracker.dry_run().await;
+        Ok(())
     }
 }
```
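
For context on how this surfaces to users: once the visitor returns `Error::InternalErrors`, the `run` command's existing error handling reports it and `turbo` exits non-zero. A hypothetical minimal illustration of that behavior (the `run` stub and `main` below are not code from this PR):

```rust
// Hypothetical illustration, not code from this PR: once the visitor
// surfaces Error::InternalErrors, the run command's Result propagates
// it to the process exit code.
#[derive(Debug, thiserror::Error)]
enum Error {
    #[error("internal errors encountered: {0}")]
    InternalErrors(String),
}

// Stand-in for a run where a task hit an internal error (e.g. a log
// file that couldn't be written).
fn run() -> Result<(), Error> {
    Err(Error::InternalErrors("error writing logs: permission denied".into()))
}

fn main() {
    if let Err(err) = run() {
        eprintln!("{err}");
        // Non-zero exit instead of the previous silent success.
        std::process::exit(1);
    }
}
```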