Skip to content

Commit

Permalink
feat(turborepo): Using file hashing for package watching (#8104)
Browse files Browse the repository at this point in the history
### Description

We leverage the file hash watcher to make watch mode more stable.
Basically, once we get a list of changed packages, we query the file
hash watcher to see if there are file hashes available. If so, we check
if we've seen these file hashes before. If we have, then we don't
re-run.

This does have some caveats. For one, file hashes are not an exact
signal for whether a task should be re-run. We really should be
checking task hashes, but that will require more daemon infrastructure.
Also files are most likely the only changes a user will make with watch
mode (will they edit env vars? Probably not), so this is good enough.

Also, this will only work on the *second* run, since we need to get the
file hash and store it. Maybe even the third run, if hashing takes too
long to complete.

### Testing Instructions

I tested on the next.js repo. You can confirm the behavior by looking at
the daemon logs and seeing that we don't get a re-run when running
`touch foo` multiple times, since the file has the same content.

Closes TURBO-3010
  • Loading branch information
NicholasLYang committed May 7, 2024
1 parent 7059c03 commit d9d1b3a
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 8 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion crates/turborepo-lib/src/daemon/server.rs
Expand Up @@ -136,7 +136,11 @@ impl FileWatching {
scm,
));

let package_changes_watcher = Arc::new(PackageChangesWatcher::new(repo_root, recv.clone()));
let package_changes_watcher = Arc::new(PackageChangesWatcher::new(
repo_root,
recv.clone(),
hash_watcher.clone(),
));

Ok(FileWatching {
watcher,
Expand Down
66 changes: 59 additions & 7 deletions crates/turborepo-lib/src/package_changes_watcher.rs
@@ -1,16 +1,25 @@
use std::{cell::RefCell, collections::HashSet, ops::DerefMut};
use std::{
cell::RefCell,
collections::{HashMap, HashSet},
ops::DerefMut,
sync::Arc,
};

use ignore::gitignore::Gitignore;
use notify::Event;
use radix_trie::{Trie, TrieCommon};
use tokio::sync::{broadcast, oneshot, Mutex};
use turbopath::{AbsoluteSystemPathBuf, AnchoredSystemPath, AnchoredSystemPathBuf};
use turborepo_filewatch::{NotifyError, OptionalWatch};
use turborepo_filewatch::{
hash_watcher::{HashSpec, HashWatcher, InputGlobs},
NotifyError, OptionalWatch,
};
use turborepo_repository::{
change_mapper::{ChangeMapper, GlobalDepsPackageChangeMapper, PackageChanges},
package_graph::{PackageGraph, PackageGraphBuilder, PackageName, WorkspacePackage},
package_json::PackageJson,
};
use turborepo_scm::package_deps::GitHashes;

use crate::turbo_json::TurboJson;

Expand All @@ -35,11 +44,17 @@ impl PackageChangesWatcher {
pub fn new(
repo_root: AbsoluteSystemPathBuf,
file_events_lazy: OptionalWatch<broadcast::Receiver<Result<Event, NotifyError>>>,
hash_watcher: Arc<HashWatcher>,
) -> Self {
let (exit_tx, exit_rx) = oneshot::channel();
let (package_change_events_tx, package_change_events_rx) =
broadcast::channel(CHANGE_EVENT_CHANNEL_CAPACITY);
let subscriber = Subscriber::new(repo_root, file_events_lazy, package_change_events_tx);
let subscriber = Subscriber::new(
repo_root,
file_events_lazy,
package_change_events_tx,
hash_watcher,
);

let _handle = tokio::spawn(subscriber.watch(exit_rx));
Self {
Expand Down Expand Up @@ -80,6 +95,7 @@ struct Subscriber {
changed_files: Mutex<RefCell<ChangedFiles>>,
repo_root: AbsoluteSystemPathBuf,
package_change_events_tx: broadcast::Sender<PackageChangeEvent>,
hash_watcher: Arc<HashWatcher>,
}

// This is a workaround because `ignore` doesn't match against a path's
Expand Down Expand Up @@ -127,12 +143,14 @@ impl Subscriber {
repo_root: AbsoluteSystemPathBuf,
file_events_lazy: OptionalWatch<broadcast::Receiver<Result<Event, NotifyError>>>,
package_change_events_tx: broadcast::Sender<PackageChangeEvent>,
hash_watcher: Arc<HashWatcher>,
) -> Self {
Subscriber {
repo_root,
file_events_lazy,
changed_files: Default::default(),
package_change_events_tx,
hash_watcher,
}
}

Expand Down Expand Up @@ -172,6 +190,35 @@ impl Subscriber {
))
}

/// Checks whether `pkg`'s current file hashes match the ones recorded for
/// it on a previous iteration, refreshing the cache as a side effect.
///
/// Returns `true` only when hashes were already cached for this package and
/// they are identical to the freshly fetched ones — i.e. the package does
/// not need to be re-run. Returns `false` (re-run) when the hash watcher
/// has no hashes available yet, when this is the first time we see the
/// package, or when the hashes differ; in the latter two cases
/// `package_file_hashes` is updated with the new hashes.
async fn is_same_hash(
    &self,
    pkg: &WorkspacePackage,
    package_file_hashes: &mut HashMap<AnchoredSystemPathBuf, GitHashes>,
) -> bool {
    // If the hash watcher can't produce hashes (e.g. it hasn't finished
    // hashing this package yet), err on the side of re-running.
    let Ok(hash) = self
        .hash_watcher
        .get_file_hashes(HashSpec {
            package_path: pkg.path.clone(),
            // TODO: Support inputs
            inputs: InputGlobs::Default,
        })
        .await
    else {
        return false;
    };

    // Compare against the cached entry by reference — cloning the whole
    // GitHashes map just to compare was a needless allocation.
    if package_file_hashes.get(&pkg.path) != Some(&hash) {
        package_file_hashes.insert(pkg.path.clone(), hash);
        return false;
    }

    // Matching hashes are the expected steady state, not a problem, so
    // log at debug rather than warn.
    tracing::debug!("hashes are the same, no need to rerun");

    true
}

async fn watch(mut self, exit_rx: oneshot::Receiver<()>) {
let Ok(mut file_events) = self.file_events_lazy.get().await.map(|r| r.resubscribe()) else {
// if we get here, it means that file watching has not started, so we should
Expand Down Expand Up @@ -222,6 +269,9 @@ impl Subscriber {
else {
return;
};
// We store the hash of the package's files. If the hash is already
// in here, we don't need to recompute it
let mut package_file_hashes = HashMap::new();

let mut change_mapper = match repo_state.get_change_mapper() {
Some(change_mapper) => change_mapper,
Expand Down Expand Up @@ -335,11 +385,13 @@ impl Subscriber {
}

for pkg in filtered_pkgs {
let _ =
self.package_change_events_tx
.send(PackageChangeEvent::Package {
if !self.is_same_hash(&pkg, &mut package_file_hashes).await {
let _ = self.package_change_events_tx.send(
PackageChangeEvent::Package {
name: pkg.name.clone(),
});
},
);
}
}
}
Err(err) => {
Expand Down

0 comments on commit d9d1b3a

Please sign in to comment.