Skip to content

Commit

Permalink
fix(Turborepo): Make package discovery async, and apply a debouncer (#8058)
Browse files Browse the repository at this point in the history

### Description

- move the debouncer to its own module so that it is reusable
- apply the debouncer to package discovery
- make package discovery async so it doesn't block the file watching channel

### Testing Instructions

Existing test suite.

Fixes #3455

Closes TURBO-2909

---------

Co-authored-by: Greg Soltis <Greg Soltis>
Co-authored-by: Alexander Lyon <arlyon@me.com>
  • Loading branch information
gsoltis and arlyon committed Apr 30, 2024
1 parent b70ba36 commit 3db7af3
Show file tree
Hide file tree
Showing 4 changed files with 332 additions and 194 deletions.
127 changes: 127 additions & 0 deletions crates/turborepo-filewatch/src/debouncer.rs
@@ -0,0 +1,127 @@
use std::{fmt::Debug, sync::Mutex, time::Duration};

use tokio::{select, sync, time::Instant};
use tracing::trace;

/// Coalesces bursts of `bump` calls: `debounce` returns only once `timeout`
/// has elapsed without a new bump having been observed.
pub(crate) struct Debouncer {
// Wakes the task awaiting in `debounce` each time `bump` is accepted.
bump: sync::Notify,
// Counter incremented by every accepted bump. Set to `None` when `debounce`
// finishes, after which `bump` rejects further calls.
serial: Mutex<Option<usize>>,
// Quiet period that must pass without a new bump before `debounce` returns.
timeout: Duration,
}

/// Quiet period used by `Debouncer::default()`.
const DEFAULT_DEBOUNCE_TIMEOUT: Duration = Duration::from_millis(10);

impl Default for Debouncer {
fn default() -> Self {
Self::new(DEFAULT_DEBOUNCE_TIMEOUT)
}
}

impl Debug for Debouncer {
    /// Formats the debouncer as `Debouncer { is_pending, timeout }`.
    ///
    /// `is_pending` is `true` while the serial is still `Some`, i.e. while
    /// `bump` calls are still being accepted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Debug output is typically produced while diagnosing a failure, so it
        // must not panic itself: if another thread panicked while holding the
        // lock, recover the inner value instead of propagating the poison.
        // The guard is a temporary of this statement, so the lock is released
        // before anything is written to the formatter.
        let is_pending = self
            .serial
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .is_some();
        f.debug_struct("Debouncer")
            .field("is_pending", &is_pending)
            .field("timeout", &self.timeout)
            .finish()
    }
}

impl Debouncer {
/// Creates a debouncer that accepts bumps and uses `timeout` as its quiet
/// period. The serial starts at `Some(0)`.
pub(crate) fn new(timeout: Duration) -> Self {
let bump = sync::Notify::new();
let serial = Mutex::new(Some(0));
Self {
bump,
serial,
timeout,
}
}

/// Registers new activity with the debouncer.
///
/// Returns `true` if the bump was accepted (the serial was incremented and
/// the waiter notified), or `false` if `debounce` has already finished and
/// set the serial to `None`.
///
/// Panics if the serial mutex is poisoned.
pub(crate) fn bump(&self) -> bool {
let mut serial = self.serial.lock().expect("lock is valid");
match *serial {
None => false,
Some(previous) => {
// The serial is updated under the lock before the notification is sent,
// so a woken waiter always sees the new value.
*serial = Some(previous + 1);
self.bump.notify_one();
true
}
}
}

/// Waits until `timeout` has elapsed without a new bump being observed,
/// then marks the debouncer as finished so later `bump` calls return
/// `false`.
///
/// Panics if the serial mutex is poisoned, or if the serial is already
/// `None` (i.e. a previous `debounce` call on this instance has completed).
pub(crate) async fn debounce(&self) {
// Snapshot of the last serial this task has observed. The guard is a
// temporary inside the block, so the lock is released right away.
let mut serial = {
self.serial
.lock()
.expect("lock is valid")
.expect("only this thread sets the value to None")
};
let mut deadline = Instant::now() + self.timeout;
loop {
// A fresh sleep is created on every iteration so it tracks the current deadline.
let timeout = tokio::time::sleep_until(deadline);
select! {
_ = self.bump.notified() => {
trace!("debouncer notified");
// Re-read the serial to decide whether the deadline needs to be reset.
let current_serial = self.serial.lock().expect("lock is valid").expect("only this thread sets the value to None");
if current_serial == serial {
// we timed out between the serial update and the notification.
// ignore this notification, we've already bumped the timer
continue;
} else {
// A bump we have not yet accounted for: restart the quiet period.
serial = current_serial;
deadline = Instant::now() + self.timeout;
}
}
_ = timeout => {
// check if serial is still valid. It's possible a bump came in before the timeout,
// but we haven't been notified yet.
// The guard is held across the comparison and the write below, so a
// concurrent `bump` (which takes the same lock) cannot slip in between.
let mut current_serial_opt = self.serial.lock().expect("lock is valid");
let current_serial = current_serial_opt.expect("only this thread sets the value to None");
if current_serial == serial {
// if the serial is what we last observed, and the timer expired, we timed out.
// we're done. Mark that we won't accept any more bumps and return
*current_serial_opt = None;
return;
} else {
// A bump arrived before the timer fired: restart the quiet period.
serial = current_serial;
deadline = Instant::now() + self.timeout;
}
}
}
}
}
}

#[cfg(test)]
mod tests {
    use std::{
        sync::Arc,
        time::{Duration, Instant},
    };

    use crate::debouncer::Debouncer;

    /// Repeated bumps keep the debouncer alive past its original timeout, it
    /// still waits after the final bump, and it rejects bumps once finished.
    #[tokio::test]
    async fn test_debouncer() {
        let debouncer = Arc::new(Debouncer::new(Duration::from_millis(10)));
        let waiter = tokio::task::spawn({
            let debouncer = Arc::clone(&debouncer);
            async move { debouncer.debounce().await }
        });
        for _ in 0..10 {
            // ten bumps spaced 2ms apart run past the original 10ms timeout,
            // and every one of them must still be accepted
            tokio::time::sleep(Duration::from_millis(2)).await;
            assert!(debouncer.bump());
        }
        let start = Instant::now();
        waiter.await.unwrap();
        let waited = start.elapsed();
        // leave some wiggle room for scheduling races, but make sure the
        // debouncer did not complete immediately after the last bump
        assert!(waited > Duration::from_millis(5));
        // once the timeout has run out, further bumps must be rejected
        assert!(!debouncer.bump());
    }
}
124 changes: 11 additions & 113 deletions crates/turborepo-filewatch/src/hash_watcher.rs
Expand Up @@ -2,7 +2,7 @@ use std::{
collections::{HashMap, HashSet},
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
Arc,
},
time::Duration,
};
Expand All @@ -12,15 +12,17 @@ use radix_trie::{Trie, TrieCommon};
use thiserror::Error;
use tokio::{
select,
sync::{self, broadcast, mpsc, oneshot, watch},
time::Instant,
sync::{broadcast, mpsc, oneshot, watch},
};
use tracing::{debug, trace};
use turbopath::{AbsoluteSystemPathBuf, AnchoredSystemPath, AnchoredSystemPathBuf};
use turborepo_repository::discovery::DiscoveryResponse;
use turborepo_scm::{package_deps::GitHashes, Error as SCMError, SCM};

use crate::{globwatcher::GlobSet, package_watcher::DiscoveryData, NotifyError, OptionalWatch};
use crate::{
debouncer::Debouncer, globwatcher::GlobSet, package_watcher::DiscoveryData, NotifyError,
OptionalWatch,
};

pub struct HashWatcher {
_exit_tx: oneshot::Sender<()>,
Expand Down Expand Up @@ -125,92 +127,11 @@ enum Query {
/// Newtype wrapper around a `usize` version number.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct Version(usize);

/// Coalesces bursts of `bump` calls: `debounce` returns only once `timeout`
/// has elapsed without a new bump having been observed.
struct HashDebouncer {
// Wakes the task awaiting in `debounce` each time `bump` is accepted.
bump: sync::Notify,
// Counter incremented by every accepted bump. Set to `None` when `debounce`
// finishes, after which `bump` rejects further calls.
serial: Mutex<Option<usize>>,
// Quiet period that must pass without a new bump before `debounce` returns.
timeout: Duration,
}

/// Quiet period used by `HashDebouncer::default()`.
const DEFAULT_DEBOUNCE_TIMEOUT: Duration = Duration::from_millis(10);

impl Default for HashDebouncer {
fn default() -> Self {
Self::new(DEFAULT_DEBOUNCE_TIMEOUT)
}
}

impl HashDebouncer {
/// Creates a debouncer that accepts bumps and uses `timeout` as its quiet
/// period. The serial starts at `Some(0)`.
fn new(timeout: Duration) -> Self {
let bump = sync::Notify::new();
let serial = Mutex::new(Some(0));
Self {
bump,
serial,
timeout,
}
}

/// Registers new activity with the debouncer.
///
/// Returns `true` if the bump was accepted (the serial was incremented and
/// the waiter notified), or `false` if `debounce` has already finished and
/// set the serial to `None`.
///
/// Panics if the serial mutex is poisoned.
fn bump(&self) -> bool {
let mut serial = self.serial.lock().expect("lock is valid");
match *serial {
None => false,
Some(previous) => {
// The serial is updated under the lock before the notification is sent,
// so a woken waiter always sees the new value.
*serial = Some(previous + 1);
self.bump.notify_one();
true
}
}
}

/// Waits until `timeout` has elapsed without a new bump being observed,
/// then marks the debouncer as finished so later `bump` calls return
/// `false`.
///
/// Panics if the serial mutex is poisoned, or if the serial is already
/// `None` (i.e. a previous `debounce` call on this instance has completed).
async fn debounce(&self) {
// Snapshot of the last serial this task has observed. The guard is a
// temporary inside the block, so the lock is released right away.
let mut serial = {
self.serial
.lock()
.expect("lock is valid")
.expect("only this thread sets the value to None")
};
let mut deadline = Instant::now() + self.timeout;
loop {
// A fresh sleep is created on every iteration so it tracks the current deadline.
let timeout = tokio::time::sleep_until(deadline);
select! {
_ = self.bump.notified() => {
trace!("debouncer notified");
// Re-read the serial to decide whether the deadline needs to be reset.
let current_serial = self.serial.lock().expect("lock is valid").expect("only this thread sets the value to None");
if current_serial == serial {
// we timed out between the serial update and the notification.
// ignore this notification, we've already bumped the timer
continue;
} else {
// A bump we have not yet accounted for: restart the quiet period.
serial = current_serial;
deadline = Instant::now() + self.timeout;
}
}
_ = timeout => {
// check if serial is still valid. It's possible a bump came in before the timeout,
// but we haven't been notified yet.
// The guard is held across the comparison and the write below, so a
// concurrent `bump` (which takes the same lock) cannot slip in between.
let mut current_serial_opt = self.serial.lock().expect("lock is valid");
let current_serial = current_serial_opt.expect("only this thread sets the value to None");
if current_serial == serial {
// if the serial is what we last observed, and the timer expired, we timed out.
// we're done. Mark that we won't accept any more bumps and return
*current_serial_opt = None;
return;
} else {
// A bump arrived before the timer fired: restart the quiet period.
serial = current_serial;
deadline = Instant::now() + self.timeout;
}
}
}
}
}
}

enum HashState {
Hashes(GitHashes),
Pending(
Version,
Arc<HashDebouncer>,
Arc<Debouncer>,
Vec<oneshot::Sender<Result<GitHashes, Error>>>,
),
Unavailable(String),
Expand Down Expand Up @@ -545,16 +466,16 @@ impl Subscriber {
spec: &HashSpec,
hash_update_tx: &mpsc::Sender<HashUpdate>,
immediate: bool,
) -> (Version, Arc<HashDebouncer>) {
) -> (Version, Arc<Debouncer>) {
let version = Version(self.next_version.fetch_add(1, Ordering::SeqCst));
let tx = hash_update_tx.clone();
let spec = spec.clone();
let repo_root = self.repo_root.clone();
let scm = self.scm.clone();
let debouncer = if immediate {
HashDebouncer::new(Duration::from_millis(0))
Debouncer::new(Duration::from_millis(0))
} else {
HashDebouncer::default()
Debouncer::default()
};
let debouncer = Arc::new(debouncer);
let debouncer_copy = debouncer.clone();
Expand Down Expand Up @@ -702,7 +623,6 @@ impl Subscriber {
mod tests {
use std::{
assert_matches::assert_matches,
sync::Arc,
time::{Duration, Instant},
};

Expand All @@ -717,7 +637,7 @@ mod tests {
use crate::{
cookies::CookieWriter,
globwatcher::GlobSet,
hash_watcher::{HashDebouncer, HashSpec, HashWatcher},
hash_watcher::{HashSpec, HashWatcher},
package_watcher::PackageWatcher,
FileSystemWatcher,
};
Expand Down Expand Up @@ -1114,28 +1034,6 @@ mod tests {
assert!(result.is_empty());
}

#[tokio::test]
async fn test_debouncer() {
let debouncer = Arc::new(HashDebouncer::new(Duration::from_millis(10)));
let debouncer_copy = debouncer.clone();
let handle = tokio::task::spawn(async move {
debouncer_copy.debounce().await;
});
for _ in 0..10 {
// assert that we can continue bumping it past the original timeout
tokio::time::sleep(Duration::from_millis(2)).await;
assert!(debouncer.bump());
}
let start = Instant::now();
handle.await.unwrap();
let end = Instant::now();
// give some wiggle room to account for race conditions, but assert that we
// didn't immediately complete after the last bump
assert!(end - start > Duration::from_millis(5));
// we shouldn't be able to bump it after it's run out it's timeout
assert!(!debouncer.bump());
}

#[tokio::test]
#[tracing_test::traced_test]
async fn test_basic_file_changes_with_inputs() {
Expand Down
1 change: 1 addition & 0 deletions crates/turborepo-filewatch/src/lib.rs
Expand Up @@ -35,6 +35,7 @@ use {
};

pub mod cookies;
mod debouncer;
#[cfg(target_os = "macos")]
mod fsevent;
pub mod globwatcher;
Expand Down

0 comments on commit 3db7af3

Please sign in to comment.