Skip to content

Commit

Permalink
Using engine methods in persistent task re-running
Browse files Browse the repository at this point in the history
  • Loading branch information
NicholasLYang committed Apr 18, 2024
1 parent 24a0c13 commit 8931497
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 16 deletions.
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/run/builder.rs
Expand Up @@ -408,7 +408,7 @@ impl RunBuilder {
run_telemetry,
task_access,
repo_root: self.repo_root,
opts: self.opts,
opts: Arc::new(self.opts),
api_client: self.api_client,
api_auth: self.api_auth,
env_at_execution_start,
Expand Down
19 changes: 18 additions & 1 deletion crates/turborepo-lib/src/run/mod.rs
Expand Up @@ -40,6 +40,7 @@ use crate::{
turbo_json::TurboJson,
};

#[derive(Clone)]
pub struct Run {
version: &'static str,
ui: UI,
Expand All @@ -48,7 +49,7 @@ pub struct Run {
processes: ProcessManager,
run_telemetry: GenericEventBuilder,
repo_root: AbsoluteSystemPathBuf,
opts: Opts,
opts: Arc<Opts>,
api_client: APIClient,
api_auth: Option<APIAuth>,
env_at_execution_start: EnvironmentVariableMap,
Expand Down Expand Up @@ -98,6 +99,22 @@ impl Run {
}
}

pub fn create_run_for_persistent_tasks(&self) -> Self {
let mut new_run = self.clone();
let new_engine = new_run.engine.create_engine_for_persistent_tasks();
new_run.engine = Arc::new(new_engine);

new_run
}

pub fn create_run_without_persistent_tasks(&self) -> Self {
let mut new_run = self.clone();
let new_engine = new_run.engine.create_engine_without_persistent_tasks();
new_run.engine = Arc::new(new_engine);

new_run
}

pub async fn run(&self) -> Result<i32, Error> {
if self.should_print_prelude {
self.print_run_prelude();
Expand Down
35 changes: 21 additions & 14 deletions crates/turborepo-lib/src/run/watch.rs
Expand Up @@ -168,7 +168,6 @@ impl WatchClient {
let new_base =
CommandBase::new(args, base.repo_root.clone(), get_version(), base.ui);

// TODO: Add logic on when to abort vs wait
if let Some(run) = current_runs.remove(&package_name) {
run.abort();
}
Expand Down Expand Up @@ -215,23 +214,31 @@ impl WatchClient {
let ui = base.ui;
let telemetry = telemetry.clone();
let handler = handler.clone();
let run_fut = async move {
let base = CommandBase::new(args, repo_root, get_version(), ui);
let base = CommandBase::new(args, repo_root, get_version(), ui);

let run = RunBuilder::new(base)?
.hide_prelude()
.build(&handler, telemetry)
.await?;

run.run().await
};
let run = RunBuilder::new(base)?
.hide_prelude()
.build(&handler, telemetry)
.await?;
*filtered_pkgs = run.filtered_pkgs.clone();

if has_persistent_tasks {
// If we have a persistent task, we re-run as a spawned task
// since persistent tasks can be long-running
*main_run_handle = Some(tokio::spawn(run_fut));
// Abort old run
if let Some(run) = main_run_handle.take() {
run.abort();
}

let persistent_run = run.create_run_for_persistent_tasks();
// If we have persistent tasks, we run them on a separate thread
// since persistent tasks don't finish
*main_run_handle =
Some(tokio::spawn(async move { persistent_run.run().await }));

// But we still run the regular tasks blocking
let non_persistent_run = run.create_run_without_persistent_tasks();
non_persistent_run.run().await?;
} else {
let _ = run_fut.await;
run.run().await?;
}
}
proto::package_change_event::Event::Error(proto::PackageChangeError { message }) => {
Expand Down

0 comments on commit 8931497

Please sign in to comment.