From 7f9497cb37bc5fd253dcba4be336030cd8b6af31 Mon Sep 17 00:00:00 2001 From: Jasper Bekkers Date: Tue, 1 Feb 2022 17:20:01 +0100 Subject: [PATCH 1/4] Specify a name for every thread notify creates --- src/fsevent.rs | 2 +- src/inotify.rs | 4 ++-- src/kqueue.rs | 2 +- src/poll.rs | 2 +- src/windows.rs | 2 +- tests/race-with-remove-dir.rs | 2 +- 6 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/fsevent.rs b/src/fsevent.rs index 9a366943..59a9990e 100644 --- a/src/fsevent.rs +++ b/src/fsevent.rs @@ -432,7 +432,7 @@ impl FsEventWatcher { // channel to pass runloop around let (rl_tx, rl_rx) = unbounded(); - let thread_handle = thread::spawn(move || { + let thread_handle = thread::Builder::new().name("notify-rs fsevents".to_string()).spawn(move || { let stream = stream.0; unsafe { diff --git a/src/inotify.rs b/src/inotify.rs index 4fcd55af..19d73040 100644 --- a/src/inotify.rs +++ b/src/inotify.rs @@ -128,7 +128,7 @@ impl EventLoop { // Run the event loop. pub fn run(self) { - thread::spawn(|| self.event_loop_thread()); + thread::Builder::new().name("notify-rs inotify".to_string()).spawn(|| self.event_loop_thread()); } fn event_loop_thread(mut self) { @@ -418,7 +418,7 @@ impl EventLoop { let event_loop_tx = self.event_loop_tx.clone(); let waker = self.event_loop_waker.clone(); let cookie = rename_event.tracker().unwrap(); // unwrap is safe because rename_event is always set with some cookie - thread::spawn(move || { + thread::Builder::new().name("notify-rs inotify rename".to_string()).spawn(move || { thread::sleep(Duration::from_millis(10)); // wait up to 10 ms for a subsequent event // An error here means the other end of the channel was closed, a thing that can diff --git a/src/kqueue.rs b/src/kqueue.rs index ba72bb87..0427e609 100644 --- a/src/kqueue.rs +++ b/src/kqueue.rs @@ -76,7 +76,7 @@ impl EventLoop { // Run the event loop. pub fn run(self) { - thread::spawn(|| self.event_loop_thread()); + thread::Builder::new().name("notify-rs kqueue".to_string()).spawn(|| self.event_loop_thread()); } fn event_loop_thread(mut self) { diff --git a/src/poll.rs b/src/poll.rs index eb3e8539..9e27a1e8 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -76,7 +76,7 @@ impl PollWatcher { let event_handler = self.event_handler.clone(); let event_handler = move |res| emit_event(&event_handler, res); - thread::spawn(move || { + thread::Builder::new().name("notify-rs poll".to_string()).spawn(move || { // In order of priority: // TODO: handle metadata events // TODO: handle renames diff --git a/src/windows.rs b/src/windows.rs index eebb2bbb..e7e7a2df 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -84,7 +84,7 @@ impl ReadDirectoryChangesServer { let (action_tx, action_rx) = unbounded(); // it is, in fact, ok to send the semaphore across threads let sem_temp = wakeup_sem as u64; - thread::spawn(move || { + thread::Builder::new().name("notify-rs windows".to_string()).spawn(move || { let wakeup_sem = sem_temp as HANDLE; let server = ReadDirectoryChangesServer { rx: action_rx, diff --git a/tests/race-with-remove-dir.rs b/tests/race-with-remove-dir.rs index a09f53ff..c45b1f38 100644 --- a/tests/race-with-remove-dir.rs +++ b/tests/race-with-remove-dir.rs @@ -10,7 +10,7 @@ fn test_race_with_remove_dir() { { let tmpdir = tmpdir.path().to_path_buf(); - thread::spawn(move || { + thread::Builder::new().name("notify-rs test-race-with-remove-dir".to_string()).spawn(move || { let mut watcher = notify::recommended_watcher(move |result| { eprintln!("received event: {:?}", result); }) From 2a2f61699e15f7bf3100b0144fe574e1150220ec Mon Sep 17 00:00:00 2001 From: Jasper Bekkers Date: Tue, 1 Feb 2022 17:20:20 +0100 Subject: [PATCH 2/4] cargo fmt --- src/fsevent.rs | 50 ++++---- src/inotify.rs | 22 ++-- src/kqueue.rs | 4 +- src/poll.rs | 218 +++++++++++++++++----------------- src/windows.rs | 26 ++-- tests/race-with-remove-dir.rs | 16 +-- 6 files changed, 177 insertions(+), 159 deletions(-) diff --git a/src/fsevent.rs b/src/fsevent.rs index 59a9990e..3989c323 100644 --- a/src/fsevent.rs +++ b/src/fsevent.rs @@ -432,30 +432,32 @@ impl FsEventWatcher { // channel to pass runloop around let (rl_tx, rl_rx) = unbounded(); - let thread_handle = thread::Builder::new().name("notify-rs fsevents".to_string()).spawn(move || { - let stream = stream.0; - - unsafe { - let cur_runloop = cf::CFRunLoopGetCurrent(); - - fs::FSEventStreamScheduleWithRunLoop( - stream, - cur_runloop, - cf::kCFRunLoopDefaultMode, - ); - fs::FSEventStreamStart(stream); - - // the calling to CFRunLoopRun will be terminated by CFRunLoopStop call in drop() - rl_tx - .send(CFSendWrapper(cur_runloop)) - .expect("Unable to send runloop to watcher"); - - cf::CFRunLoopRun(); - fs::FSEventStreamStop(stream); - fs::FSEventStreamInvalidate(stream); - fs::FSEventStreamRelease(stream); - } - }); + let thread_handle = thread::Builder::new() + .name("notify-rs fsevents".to_string()) + .spawn(move || { + let stream = stream.0; + + unsafe { + let cur_runloop = cf::CFRunLoopGetCurrent(); + + fs::FSEventStreamScheduleWithRunLoop( + stream, + cur_runloop, + cf::kCFRunLoopDefaultMode, + ); + fs::FSEventStreamStart(stream); + + // the calling to CFRunLoopRun will be terminated by CFRunLoopStop call in drop() + rl_tx + .send(CFSendWrapper(cur_runloop)) + .expect("Unable to send runloop to watcher"); + + cf::CFRunLoopRun(); + fs::FSEventStreamStop(stream); + fs::FSEventStreamInvalidate(stream); + fs::FSEventStreamRelease(stream); + } + }); // block until runloop has been sent self.runloop = Some((rl_rx.recv().unwrap().0, thread_handle)); diff --git a/src/inotify.rs b/src/inotify.rs index 19d73040..e622266f 100644 --- a/src/inotify.rs +++ b/src/inotify.rs @@ -128,7 +128,9 @@ impl EventLoop { // Run the event loop. pub fn run(self) { - thread::Builder::new().name("notify-rs inotify".to_string()).spawn(|| self.event_loop_thread()); + thread::Builder::new() + .name("notify-rs inotify".to_string()) + .spawn(|| self.event_loop_thread()); } fn event_loop_thread(mut self) { @@ -418,14 +420,16 @@ impl EventLoop { let event_loop_tx = self.event_loop_tx.clone(); let waker = self.event_loop_waker.clone(); let cookie = rename_event.tracker().unwrap(); // unwrap is safe because rename_event is always set with some cookie - thread::Builder::new().name("notify-rs inotify rename".to_string()).spawn(move || { - thread::sleep(Duration::from_millis(10)); // wait up to 10 ms for a subsequent event - - // An error here means the other end of the channel was closed, a thing that can - // happen normally. - let _ = event_loop_tx.send(EventLoopMsg::RenameTimeout(cookie)); - let _ = waker.wake(); - }); + thread::Builder::new() + .name("notify-rs inotify rename".to_string()) + .spawn(move || { + thread::sleep(Duration::from_millis(10)); // wait up to 10 ms for a subsequent event + + // An error here means the other end of the channel was closed, a thing that can + // happen normally. + let _ = event_loop_tx.send(EventLoopMsg::RenameTimeout(cookie)); + let _ = waker.wake(); + }); } } Err(e) => { diff --git a/src/kqueue.rs b/src/kqueue.rs index 0427e609..b16bc6fa 100644 --- a/src/kqueue.rs +++ b/src/kqueue.rs @@ -76,7 +76,9 @@ impl EventLoop { // Run the event loop. pub fn run(self) { - thread::Builder::new().name("notify-rs kqueue".to_string()).spawn(|| self.event_loop_thread()); + thread::Builder::new() + .name("notify-rs kqueue".to_string()) + .spawn(|| self.event_loop_thread()); } fn event_loop_thread(mut self) { diff --git a/src/poll.rs b/src/poll.rs index 9e27a1e8..d5df4fb2 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -76,106 +76,113 @@ impl PollWatcher { let event_handler = self.event_handler.clone(); let event_handler = move |res| emit_event(&event_handler, res); - thread::Builder::new().name("notify-rs poll".to_string()).spawn(move || { - // In order of priority: - // TODO: handle metadata events - // TODO: handle renames - // TODO: DRY it up + thread::Builder::new() + .name("notify-rs poll".to_string()) + .spawn(move || { + // In order of priority: + // TODO: handle metadata events + // TODO: handle renames + // TODO: DRY it up - loop { - if !open.load(Ordering::SeqCst) { - break; - } + loop { + if !open.load(Ordering::SeqCst) { + break; + } - if let Ok(mut watches) = watches.lock() { - let current_time = Instant::now(); + if let Ok(mut watches) = watches.lock() { + let current_time = Instant::now(); - for ( - watch, - &mut WatchData { - is_recursive, - ref mut paths, - }, - ) in watches.iter_mut() - { - match fs::metadata(watch) { - Err(e) => { - let err = Err(Error::io(e).add_path(watch.clone())); - event_handler(err); - continue; - } - Ok(metadata) => { - if !metadata.is_dir() { - let mtime = - FileTime::from_last_modification_time(&metadata).seconds(); - match paths.insert( - watch.clone(), - PathData { - mtime, - last_check: current_time, - }, - ) { - None => { - unreachable!(); - } - Some(PathData { - mtime: old_mtime, .. - }) => { - if mtime > old_mtime { - let kind = MetadataKind::WriteTime; - let meta = ModifyKind::Metadata(kind); - let kind = EventKind::Modify(meta); - let ev = Event::new(kind).add_path(watch.clone()); - event_handler(Ok(ev)); + for ( + watch, + &mut WatchData { + is_recursive, + ref mut paths, + }, + ) in watches.iter_mut() + { + match fs::metadata(watch) { + Err(e) => { + let err = Err(Error::io(e).add_path(watch.clone())); + event_handler(err); + continue; + } + Ok(metadata) => { + if !metadata.is_dir() { + let mtime = + FileTime::from_last_modification_time(&metadata) + .seconds(); + match paths.insert( + watch.clone(), + PathData { + mtime, + last_check: current_time, + }, + ) { + None => { + unreachable!(); + } + Some(PathData { + mtime: old_mtime, .. + }) => { + if mtime > old_mtime { + let kind = MetadataKind::WriteTime; + let meta = ModifyKind::Metadata(kind); + let kind = EventKind::Modify(meta); + let ev = + Event::new(kind).add_path(watch.clone()); + event_handler(Ok(ev)); + } } } - } - } else { - let depth = if is_recursive { usize::max_value() } else { 1 }; - for entry in WalkDir::new(watch) - .follow_links(true) - .max_depth(depth) - .into_iter() - .filter_map(|e| e.ok()) - { - let path = entry.path(); + } else { + let depth = + if is_recursive { usize::max_value() } else { 1 }; + for entry in WalkDir::new(watch) + .follow_links(true) + .max_depth(depth) + .into_iter() + .filter_map(|e| e.ok()) + { + let path = entry.path(); - match entry.metadata() { - Err(e) => { - let err = Error::io(e.into()) - .add_path(path.to_path_buf()); - event_handler(Err(err)); - } - Ok(m) => { - let mtime = - FileTime::from_last_modification_time(&m) - .seconds(); - match paths.insert( - path.to_path_buf(), - PathData { - mtime, - last_check: current_time, - }, - ) { - None => { - let kind = - EventKind::Create(CreateKind::Any); - let ev = Event::new(kind) - .add_path(path.to_path_buf()); - event_handler(Ok(ev)); - } - Some(PathData { - mtime: old_mtime, .. - }) => { - if mtime > old_mtime { - let kind = MetadataKind::WriteTime; - let meta = ModifyKind::Metadata(kind); - let kind = EventKind::Modify(meta); - // TODO add new mtime as attr + match entry.metadata() { + Err(e) => { + let err = Error::io(e.into()) + .add_path(path.to_path_buf()); + event_handler(Err(err)); + } + Ok(m) => { + let mtime = + FileTime::from_last_modification_time(&m) + .seconds(); + match paths.insert( + path.to_path_buf(), + PathData { + mtime, + last_check: current_time, + }, + ) { + None => { + let kind = + EventKind::Create(CreateKind::Any); let ev = Event::new(kind) .add_path(path.to_path_buf()); event_handler(Ok(ev)); } + Some(PathData { + mtime: old_mtime, .. + }) => { + if mtime > old_mtime { + let kind = MetadataKind::WriteTime; + let meta = + ModifyKind::Metadata(kind); + let kind = EventKind::Modify(meta); + // TODO add new mtime as attr + let ev = Event::new(kind) + .add_path(path.to_path_buf()); + event_handler(Ok(ev)); + } + } } } } @@ -184,27 +191,26 @@ impl PollWatcher { } } } - } - for (_, &mut WatchData { ref mut paths, .. }) in watches.iter_mut() { - let mut removed = Vec::new(); - for (path, &PathData { last_check, .. }) in paths.iter() { - if last_check < current_time { - let ev = Event::new(EventKind::Remove(RemoveKind::Any)) - .add_path(path.clone()); - event_handler(Ok(ev)); - removed.push(path.clone()); + for (_, &mut WatchData { ref mut paths, .. }) in watches.iter_mut() { + let mut removed = Vec::new(); + for (path, &PathData { last_check, .. }) in paths.iter() { + if last_check < current_time { + let ev = Event::new(EventKind::Remove(RemoveKind::Any)) + .add_path(path.clone()); + event_handler(Ok(ev)); + removed.push(path.clone()); + } + } + for path in removed { + (*paths).remove(&path); } - } - for path in removed { - (*paths).remove(&path); } } - } - thread::sleep(delay); - } - }); + thread::sleep(delay); + } + }); } fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> { diff --git a/src/windows.rs b/src/windows.rs index e7e7a2df..617f03f7 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -84,18 +84,20 @@ impl ReadDirectoryChangesServer { let (action_tx, action_rx) = unbounded(); // it is, in fact, ok to send the semaphore across threads let sem_temp = wakeup_sem as u64; - thread::Builder::new().name("notify-rs windows".to_string()).spawn(move || { - let wakeup_sem = sem_temp as HANDLE; - let server = ReadDirectoryChangesServer { - rx: action_rx, - event_handler, - meta_tx, - cmd_tx, - watches: HashMap::new(), - wakeup_sem, - }; - server.run(); - }); + thread::Builder::new() + .name("notify-rs windows".to_string()) + .spawn(move || { + let wakeup_sem = sem_temp as HANDLE; + let server = ReadDirectoryChangesServer { + rx: action_rx, + event_handler, + meta_tx, + cmd_tx, + watches: HashMap::new(), + wakeup_sem, + }; + server.run(); + }); action_tx } diff --git a/tests/race-with-remove-dir.rs b/tests/race-with-remove-dir.rs index c45b1f38..98fd463f 100644 --- a/tests/race-with-remove-dir.rs +++ b/tests/race-with-remove-dir.rs @@ -10,14 +10,16 @@ fn test_race_with_remove_dir() { { let tmpdir = tmpdir.path().to_path_buf(); - thread::Builder::new().name("notify-rs test-race-with-remove-dir".to_string()).spawn(move || { - let mut watcher = notify::recommended_watcher(move |result| { - eprintln!("received event: {:?}", result); - }) - .unwrap(); + thread::Builder::new() + .name("notify-rs test-race-with-remove-dir".to_string()) + .spawn(move || { + let mut watcher = notify::recommended_watcher(move |result| { + eprintln!("received event: {:?}", result); + }) + .unwrap(); - watcher.watch(&tmpdir, RecursiveMode::NonRecursive).unwrap(); - }); + watcher.watch(&tmpdir, RecursiveMode::NonRecursive).unwrap(); + }); } let subdir = tmpdir.path().join("146d921d.tmp"); From 8cb889381dfa3fc9280ca50ef9ff0c53b93e1729 Mon Sep 17 00:00:00 2001 From: Jasper Bekkers Date: Tue, 1 Feb 2022 17:24:02 +0100 Subject: [PATCH 3/4] Discard Result<> from spawn whenever a JoinHandle isn't required --- src/fsevent.rs | 2 +- src/inotify.rs | 4 ++-- src/kqueue.rs | 2 +- src/poll.rs | 2 +- src/windows.rs | 2 +- tests/race-with-remove-dir.rs | 2 +- 6 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/fsevent.rs b/src/fsevent.rs index 3989c323..af124842 100644 --- a/src/fsevent.rs +++ b/src/fsevent.rs @@ -457,7 +457,7 @@ impl FsEventWatcher { fs::FSEventStreamInvalidate(stream); fs::FSEventStreamRelease(stream); } - }); + })?; // block until runloop has been sent self.runloop = Some((rl_rx.recv().unwrap().0, thread_handle)); diff --git a/src/inotify.rs b/src/inotify.rs index e622266f..c930be64 100644 --- a/src/inotify.rs +++ b/src/inotify.rs @@ -128,7 +128,7 @@ impl EventLoop { // Run the event loop. pub fn run(self) { - thread::Builder::new() + let _ = thread::Builder::new() .name("notify-rs inotify".to_string()) .spawn(|| self.event_loop_thread()); } @@ -420,7 +420,7 @@ impl EventLoop { let event_loop_tx = self.event_loop_tx.clone(); let waker = self.event_loop_waker.clone(); let cookie = rename_event.tracker().unwrap(); // unwrap is safe because rename_event is always set with some cookie - thread::Builder::new() + let _ = thread::Builder::new() .name("notify-rs inotify rename".to_string()) .spawn(move || { thread::sleep(Duration::from_millis(10)); // wait up to 10 ms for a subsequent event diff --git a/src/kqueue.rs b/src/kqueue.rs index b16bc6fa..6c4c338d 100644 --- a/src/kqueue.rs +++ b/src/kqueue.rs @@ -76,7 +76,7 @@ impl EventLoop { // Run the event loop. pub fn run(self) { - thread::Builder::new() + let _ = thread::Builder::new() .name("notify-rs kqueue".to_string()) .spawn(|| self.event_loop_thread()); } diff --git a/src/poll.rs b/src/poll.rs index d5df4fb2..a2f74836 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -76,7 +76,7 @@ impl PollWatcher { let event_handler = self.event_handler.clone(); let event_handler = move |res| emit_event(&event_handler, res); - thread::Builder::new() + let _ = thread::Builder::new() .name("notify-rs poll".to_string()) .spawn(move || { // In order of priority: diff --git a/src/windows.rs b/src/windows.rs index 617f03f7..3c007be6 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -84,7 +84,7 @@ impl ReadDirectoryChangesServer { let (action_tx, action_rx) = unbounded(); // it is, in fact, ok to send the semaphore across threads let sem_temp = wakeup_sem as u64; - thread::Builder::new() + let _ = thread::Builder::new() .name("notify-rs windows".to_string()) .spawn(move || { let wakeup_sem = sem_temp as HANDLE; diff --git a/tests/race-with-remove-dir.rs b/tests/race-with-remove-dir.rs index 98fd463f..c25d93e0 100644 --- a/tests/race-with-remove-dir.rs +++ b/tests/race-with-remove-dir.rs @@ -10,7 +10,7 @@ fn test_race_with_remove_dir() { { let tmpdir = tmpdir.path().to_path_buf(); - thread::Builder::new() + let _ = thread::Builder::new() .name("notify-rs test-race-with-remove-dir".to_string()) .spawn(move || { let mut watcher = notify::recommended_watcher(move |result| { From 28902f6cc52e39a89bf2d517ff1b771b0b9c5710 Mon Sep 17 00:00:00 2001 From: Jasper Bekkers Date: Mon, 14 Feb 2022 17:22:47 +0100 Subject: [PATCH 4/4] Clarified names a bit more --- src/fsevent.rs | 2 +- src/inotify.rs | 2 +- src/kqueue.rs | 2 +- src/poll.rs | 2 +- src/windows.rs | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/fsevent.rs b/src/fsevent.rs index af124842..c1200903 100644 --- a/src/fsevent.rs +++ b/src/fsevent.rs @@ -433,7 +433,7 @@ impl FsEventWatcher { let (rl_tx, rl_rx) = unbounded(); let thread_handle = thread::Builder::new() - .name("notify-rs fsevents".to_string()) + .name("notify-rs fsevents loop".to_string()) .spawn(move || { let stream = stream.0; diff --git a/src/inotify.rs b/src/inotify.rs index c930be64..83481cb2 100644 --- a/src/inotify.rs +++ b/src/inotify.rs @@ -129,7 +129,7 @@ impl EventLoop { // Run the event loop. pub fn run(self) { let _ = thread::Builder::new() - .name("notify-rs inotify".to_string()) + .name("notify-rs inotify loop".to_string()) .spawn(|| self.event_loop_thread()); } diff --git a/src/kqueue.rs b/src/kqueue.rs index 6c4c338d..51a1990d 100644 --- a/src/kqueue.rs +++ b/src/kqueue.rs @@ -77,7 +77,7 @@ impl EventLoop { // Run the event loop. pub fn run(self) { let _ = thread::Builder::new() - .name("notify-rs kqueue".to_string()) + .name("notify-rs kqueue loop".to_string()) .spawn(|| self.event_loop_thread()); } diff --git a/src/poll.rs b/src/poll.rs index a2f74836..04e60caa 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -77,7 +77,7 @@ impl PollWatcher { let event_handler = move |res| emit_event(&event_handler, res); let _ = thread::Builder::new() - .name("notify-rs poll".to_string()) + .name("notify-rs poll loop".to_string()) .spawn(move || { // In order of priority: // TODO: handle metadata events diff --git a/src/windows.rs b/src/windows.rs index 3c007be6..bfa3fa54 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -85,7 +85,7 @@ impl ReadDirectoryChangesServer { // it is, in fact, ok to send the semaphore across threads let sem_temp = wakeup_sem as u64; let _ = thread::Builder::new() - .name("notify-rs windows".to_string()) + .name("notify-rs windows loop".to_string()) .spawn(move || { let wakeup_sem = sem_temp as HANDLE; let server = ReadDirectoryChangesServer {