Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(turborepo): new ui + watch mode #7962

Merged
merged 7 commits on May 14, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 12 additions & 1 deletion crates/turborepo-lib/src/commands/run.rs
@@ -1,5 +1,6 @@
use std::future::Future;

use tracing::error;
use turborepo_telemetry::events::command::CommandEventBuilder;

use crate::{commands::CommandBase, run, run::builder::RunBuilder, signal::SignalHandler};
Expand Down Expand Up @@ -42,12 +43,22 @@ pub async fn run(base: CommandBase, telemetry: CommandEventBuilder) -> Result<i3
.with_analytics_sender(analytics_sender)
.build(&handler, telemetry)
.await?;
let result = run.run().await;

let (sender, handle) = run.start_experimental_ui().unzip();

let result = run.run(sender.clone()).await;

if let Some(analytics_handle) = analytics_handle {
analytics_handle.close_with_timeout().await;
}

if let (Some(handle), Some(sender)) = (handle, sender) {
sender.stop();
if let Err(e) = handle.await.expect("render thread panicked") {
error!("error encountered rendering tui: {e}");
}
}

result
};

Expand Down
26 changes: 22 additions & 4 deletions crates/turborepo-lib/src/run/mod.rs
Expand Up @@ -17,6 +17,7 @@ use std::{collections::HashSet, io::Write, sync::Arc};
pub use cache::{CacheOutput, ConfigCache, Error as CacheError, RunCache, TaskCache};
use chrono::{DateTime, Local};
use rayon::iter::ParallelBridge;
use tokio::task::JoinHandle;
use tracing::debug;
use turbopath::AbsoluteSystemPathBuf;
use turborepo_api_client::{APIAuth, APIClient};
Expand All @@ -25,7 +26,7 @@ use turborepo_env::EnvironmentVariableMap;
use turborepo_repository::package_graph::{PackageGraph, PackageName};
use turborepo_scm::SCM;
use turborepo_telemetry::events::generic::GenericEventBuilder;
use turborepo_ui::{cprint, cprintln, BOLD_GREY, GREY, UI};
use turborepo_ui::{cprint, cprintln, tui, tui::AppSender, BOLD_GREY, GREY, UI};

pub use crate::run::error::Error;
use crate::{
Expand All @@ -45,7 +46,6 @@ use crate::{
pub struct Run {
version: &'static str,
ui: UI,
experimental_ui: bool,
start_at: DateTime<Local>,
processes: ProcessManager,
run_telemetry: GenericEventBuilder,
Expand All @@ -64,6 +64,7 @@ pub struct Run {
task_access: TaskAccess,
daemon: Option<DaemonClient<DaemonConnector>>,
should_print_prelude: bool,
experimental_ui: bool,
}

impl Run {
Expand Down Expand Up @@ -117,7 +118,23 @@ impl Run {
new_run
}

pub async fn run(&mut self) -> Result<i32, Error> {
/// Returns `true` when this run was configured to use the experimental
/// terminal UI; callers use this to suppress the plain-text prelude and
/// summary output that the TUI renders itself.
pub fn has_experimental_ui(&self) -> bool {
    self.experimental_ui
}

/// Spawns the experimental TUI on a blocking worker thread.
///
/// Returns `None` when the experimental UI is disabled. Otherwise returns
/// the [`AppSender`] used to push task events to the UI together with the
/// [`JoinHandle`] of the render thread, which the caller must stop and
/// await to surface any rendering error.
pub fn start_experimental_ui(&self) -> Option<(AppSender, JoinHandle<Result<(), tui::Error>>)> {
    // `bool::then` keeps the spawn lazy: nothing is created unless enabled.
    self.experimental_ui.then(|| {
        let tasks = self.engine.tasks_with_command(&self.pkg_dep_graph);
        let (app_sender, app_receiver) = AppSender::new();
        // `run_app` blocks for the lifetime of the UI, so it must not run
        // on the async executor's core threads.
        let render_handle = tokio::task::spawn_blocking(move || tui::run_app(tasks, app_receiver));
        (app_sender, render_handle)
    })
}

pub async fn run(&mut self, experimental_ui_sender: Option<AppSender>) -> Result<i32, Error> {
if self.should_print_prelude {
self.print_run_prelude();
}
Expand Down Expand Up @@ -240,7 +257,7 @@ impl Run {
self.processes.clone(),
&self.repo_root,
global_env,
self.experimental_ui,
experimental_ui_sender,
);

if self.opts.run_opts.dry_run.is_some() {
Expand Down Expand Up @@ -279,6 +296,7 @@ impl Run {
&self.engine,
&self.env_at_execution_start,
self.opts.scope_opts.pkg_inference_root.as_deref(),
self.experimental_ui,
)
.await?;

Expand Down
14 changes: 9 additions & 5 deletions crates/turborepo-lib/src/run/summary/mod.rs
Expand Up @@ -278,6 +278,7 @@ impl RunTracker {
engine: &'a Engine,
hash_tracker: TaskHashTracker,
env_at_execution_start: &'a EnvironmentVariableMap,
has_experimental_ui: bool,
) -> Result<(), Error> {
let end_time = Local::now();

Expand Down Expand Up @@ -305,7 +306,7 @@ impl RunTracker {
.await?;

run_summary
.finish(end_time, exit_code, pkg_dep_graph, ui)
.finish(end_time, exit_code, pkg_dep_graph, ui, has_experimental_ui)
.await
}

Expand Down Expand Up @@ -380,6 +381,7 @@ impl<'a> RunSummary<'a> {
exit_code: i32,
pkg_dep_graph: &PackageGraph,
ui: UI,
has_experimental_ui: bool,
) -> Result<(), Error> {
if matches!(self.run_type, RunType::DryJson | RunType::DryText) {
return self.close_dry_run(pkg_dep_graph, ui);
Expand All @@ -391,10 +393,12 @@ impl<'a> RunSummary<'a> {
}
}

if let Some(execution) = &self.execution {
let path = self.get_path();
let failed_tasks = self.get_failed_tasks();
execution.print(ui, path, failed_tasks);
if !has_experimental_ui {
if let Some(execution) = &self.execution {
let path = self.get_path();
let failed_tasks = self.get_failed_tasks();
execution.print(ui, path, failed_tasks);
}
}

if let Some(spaces_client_handle) = self.spaces_client_handle.take() {
Expand Down
31 changes: 26 additions & 5 deletions crates/turborepo-lib/src/run/watch.rs
Expand Up @@ -10,6 +10,7 @@ use tokio::{
};
use turborepo_repository::package_graph::PackageName;
use turborepo_telemetry::events::command::CommandEventBuilder;
use turborepo_ui::{tui, tui::AppSender};

use crate::{
cli::{Command, RunArgs},
Expand Down Expand Up @@ -49,6 +50,8 @@ pub struct WatchClient {
base: CommandBase,
telemetry: CommandEventBuilder,
handler: SignalHandler,
ui_sender: Option<AppSender>,
ui_handle: Option<JoinHandle<Result<(), tui::Error>>>,
}

#[derive(Debug, Error, Diagnostic)]
Expand Down Expand Up @@ -88,6 +91,8 @@ pub enum Error {
SignalInterrupt,
#[error("package change error")]
PackageChange(#[from] tonic::Status),
#[error("could not connect to UI thread")]
UISend(String),
}

impl WatchClient {
Expand All @@ -109,6 +114,8 @@ impl WatchClient {
.build(&handler, telemetry.clone())
.await?;

let (sender, handle) = run.start_experimental_ui().unzip();

let connector = DaemonConnector {
can_start_server: true,
can_kill_server: true,
Expand All @@ -122,6 +129,8 @@ impl WatchClient {
handler,
telemetry,
persistent_tasks_handle: None,
ui_sender: sender,
ui_handle: handle,
})
}

Expand All @@ -131,7 +140,9 @@ impl WatchClient {

let mut events = client.package_changes().await?;

self.run.print_run_prelude();
if !self.run.has_experimental_ui() {
self.run.print_run_prelude();
}

let signal_subscriber = self.handler.subscribe().ok_or(Error::NoSignalHandler)?;

Expand Down Expand Up @@ -254,7 +265,7 @@ impl WatchClient {
.build(&signal_handler, telemetry)
.await?;

Ok(run.run().await?)
Ok(run.run(self.ui_sender.clone()).await?)
}
ChangedPackages::All => {
let mut args = self.base.args().clone();
Expand Down Expand Up @@ -286,23 +297,33 @@ impl WatchClient {
.build(&self.handler, self.telemetry.clone())
.await?;

if let Some(sender) = &self.ui_sender {
let task_names = self.run.engine.tasks_with_command(&self.run.pkg_dep_graph);
sender
.update_tasks(task_names)
.map_err(|err| Error::UISend(err.to_string()))?;
}

if self.run.has_persistent_tasks() {
// Abort old run
if let Some(run) = self.persistent_tasks_handle.take() {
run.abort();
}

let mut persistent_run = self.run.create_run_for_persistent_tasks();
let ui_sender = self.ui_sender.clone();
// If we have persistent tasks, we run them on a separate thread
// since persistent tasks don't finish
self.persistent_tasks_handle =
Some(tokio::spawn(async move { persistent_run.run().await }));
Some(tokio::spawn(
async move { persistent_run.run(ui_sender).await },
));

// But we still run the regular tasks blocking
let mut non_persistent_run = self.run.create_run_without_persistent_tasks();
Ok(non_persistent_run.run().await?)
Ok(non_persistent_run.run(self.ui_sender.clone()).await?)
} else {
Ok(self.run.run().await?)
Ok(self.run.run(self.ui_sender.clone()).await?)
}
}
}
Expand Down
38 changes: 11 additions & 27 deletions crates/turborepo-lib/src/task_graph/visitor.rs
Expand Up @@ -24,7 +24,7 @@ use turborepo_telemetry::events::{
generic::GenericEventBuilder, task::PackageTaskEventBuilder, EventBuilder, TrackedErrors,
};
use turborepo_ui::{
tui::{self, TuiTask},
tui::{self, AppSender, TuiTask},
ColorSelector, OutputClient, OutputSink, OutputWriter, PrefixedUI, UI,
};
use which::which;
Expand Down Expand Up @@ -63,7 +63,7 @@ pub struct Visitor<'a> {
sink: OutputSink<StdWriter>,
task_hasher: TaskHasher<'a>,
ui: UI,
experimental_ui: bool,
experimental_ui_sender: Option<AppSender>,
}

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -108,7 +108,7 @@ impl<'a> Visitor<'a> {
manager: ProcessManager,
repo_root: &'a AbsoluteSystemPath,
global_env: EnvironmentVariableMap,
experimental_ui: bool,
experimental_ui_sender: Option<AppSender>,
) -> Self {
let task_hasher = TaskHasher::new(
package_inputs_hashes,
Expand All @@ -135,7 +135,7 @@ impl<'a> Visitor<'a> {
task_hasher,
ui,
global_env,
experimental_ui,
experimental_ui_sender,
}
}

Expand All @@ -148,16 +148,6 @@ impl<'a> Visitor<'a> {
let concurrency = self.run_opts.concurrency as usize;
let (node_sender, mut node_stream) = mpsc::channel(concurrency);

let (ui, render_thread_handle) = if self.experimental_ui {
let task_names = engine.tasks_with_command(&self.package_graph);

let (handle, receiver) = tui::AppSender::new();
let app = tokio::task::spawn_blocking(move || tui::run_app(task_names, receiver));
(Some(handle), Some(app))
} else {
(None, None)
};

let engine_handle = {
let engine = engine.clone();
tokio::spawn(engine.execute(ExecutionOptions::new(false, concurrency), node_sender))
Expand Down Expand Up @@ -285,7 +275,7 @@ impl<'a> Visitor<'a> {
let vendor_behavior =
Vendor::infer().and_then(|vendor| vendor.behavior.as_ref());

let output_client = if let Some(handle) = &ui {
let output_client = if let Some(handle) = &self.experimental_ui_sender {
TaskOutput::UI(handle.task(info.to_string()))
} else {
TaskOutput::Direct(self.output_client(&info, vendor_behavior))
Expand Down Expand Up @@ -321,16 +311,6 @@ impl<'a> Visitor<'a> {
}
}
drop(factory);
if let Some(handle) = ui {
handle.stop();
if let Err(e) = render_thread_handle
.unwrap()
.await
.expect("render thread panicked")
{
error!("error encountered rendering tui: {e}");
}
}

if !internal_errors.is_empty() {
return Err(Error::InternalErrors(
Expand All @@ -351,6 +331,8 @@ impl<'a> Visitor<'a> {

/// Finishes visiting the tasks, creates the run summary, and either
/// prints, saves, or sends it to spaces.

#[allow(clippy::too_many_arguments)]
#[tracing::instrument(skip(
self,
packages,
Expand All @@ -366,6 +348,7 @@ impl<'a> Visitor<'a> {
engine: &Engine,
env_at_execution_start: &EnvironmentVariableMap,
pkg_inference_root: Option<&AnchoredSystemPath>,
has_experimental_ui: bool,
) -> Result<(), Error> {
let Self {
package_graph,
Expand Down Expand Up @@ -394,6 +377,7 @@ impl<'a> Visitor<'a> {
engine,
task_hasher.task_hash_tracker(),
env_at_execution_start,
has_experimental_ui,
)
.await?)
}
Expand Down Expand Up @@ -495,7 +479,7 @@ impl<'a> Visitor<'a> {
pub fn dry_run(&mut self) {
self.dry = true;
// No need to start a TUI on dry run
self.experimental_ui = false;
self.experimental_ui_sender = None;
}
}

Expand Down Expand Up @@ -665,7 +649,7 @@ impl<'a> ExecContextFactory<'a> {
ExecContext {
engine: self.engine.clone(),
ui: self.visitor.ui,
experimental_ui: self.visitor.experimental_ui,
experimental_ui: self.visitor.experimental_ui_sender.is_some(),
is_github_actions: self.visitor.run_opts.is_github_actions,
pretty_prefix: self
.visitor
Expand Down