Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Watcher object safe #336

Merged
merged 4 commits into from Jun 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -19,9 +19,11 @@

## unreleased

- CHANGE: Make `Watcher` object safe [#336]
- CHANGE: Change EventFn to take FnMut [#333]

[#333]: https://github.com/notify-rs/notify/pull/333
[#336]: https://github.com/notify-rs/notify/pull/336

## 5.0.0-pre.10 (2021-06-04)

Expand Down
4 changes: 2 additions & 2 deletions examples/async_monitor.rs
Expand Up @@ -7,7 +7,7 @@ fn async_watcher() -> notify::Result<(RecommendedWatcher, Receiver<notify::Resul

// Automatically select the best implementation for your platform.
// You can also access each implementation directly e.g. INotifyWatcher.
let watcher = Watcher::new_immediate(move |res| {
let watcher = RecommendedWatcher::new(move |res| {
futures::executor::block_on(async {
tx.send(res).await.unwrap();
})
Expand All @@ -21,7 +21,7 @@ async fn async_watch<P: AsRef<Path>>(path: P) -> notify::Result<()> {

// Add a path to be watched. All files and directories at that path and
// below will be monitored for changes.
watcher.watch(path, RecursiveMode::Recursive)?;
watcher.watch(path.as_ref(), RecursiveMode::Recursive)?;

while let Some(res) = rx.next().await {
match res {
Expand Down
7 changes: 4 additions & 3 deletions examples/hot_reload_tide/src/main.rs
Expand Up @@ -8,6 +8,7 @@ use notify::{
event::ModifyKind,
Error, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher,
};
use std::path::Path;
use std::sync::{Arc, Mutex};
use tide::{Body, Response};

Expand All @@ -28,11 +29,11 @@ async fn main() -> tide::Result<()> {

// We listen to file changes by giving Notify
// a function that will get called when events happen
let mut watcher: RecommendedWatcher =
let mut watcher =
// To make sure that the config lives as long as the function
// we need to move the ownership of the config inside the function
// To learn more about move please read [Using move Closures with Threads](https://doc.rust-lang.org/book/ch16-01-threads.html?highlight=move#using-move-closures-with-threads)
Watcher::new_immediate(move |result: Result<Event, Error>| {
RecommendedWatcher::new(move |result: Result<Event, Error>| {
let event = result.unwrap();

if event.kind == EventKind::Modify(ModifyKind::Any) {
Expand All @@ -43,7 +44,7 @@ async fn main() -> tide::Result<()> {
}
})?;

watcher.watch(CONFIG_PATH, RecursiveMode::Recursive)?;
watcher.watch(Path::new(CONFIG_PATH), RecursiveMode::Recursive)?;

// We set up a web server using [Tide](https://github.com/http-rs/tide)
let mut app = tide::with_state(config);
Expand Down
4 changes: 2 additions & 2 deletions examples/monitor_raw.rs
Expand Up @@ -6,11 +6,11 @@ fn watch<P: AsRef<Path>>(path: P) -> notify::Result<()> {

// Automatically select the best implementation for your platform.
// You can also access each implementation directly e.g. INotifyWatcher.
let mut watcher: RecommendedWatcher = Watcher::new_immediate(move |res| tx.send(res).unwrap())?;
let mut watcher = RecommendedWatcher::new(move |res| tx.send(res).unwrap())?;

// Add a path to be watched. All files and directories at that path and
// below will be monitored for changes.
watcher.watch(path, RecursiveMode::Recursive)?;
watcher.watch(path.as_ref(), RecursiveMode::Recursive)?;

for res in rx {
match res {
Expand Down
48 changes: 24 additions & 24 deletions src/fsevent.rs
Expand Up @@ -20,7 +20,6 @@ use crossbeam_channel::{unbounded, Receiver, Sender};
use fsevent_sys as fs;
use fsevent_sys::core_foundation as cf;
use std::collections::HashMap;
use std::convert::AsRef;
use std::ffi::CStr;
use std::os::raw;
use std::path::{Path, PathBuf};
Expand Down Expand Up @@ -250,6 +249,11 @@ extern "C" {
}

impl FsEventWatcher {
/// Create a new watcher.
pub fn new<F: EventFn>(event_fn: F) -> Result<Self> {
Self::from_event_fn(Arc::new(Mutex::new(event_fn)))
}

fn from_event_fn(event_fn: Arc<Mutex<dyn EventFn>>) -> Result<Self> {
Ok(FsEventWatcher {
paths: unsafe {
Expand Down Expand Up @@ -309,14 +313,14 @@ impl FsEventWatcher {
}
}

fn remove_path<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
let str_path = path.as_ref().to_str().unwrap();
fn remove_path(&mut self, path: &Path) -> Result<()> {
let str_path = path.to_str().unwrap();
unsafe {
let mut err: cf::CFErrorRef = ptr::null_mut();
let cf_path = cf::str_path_to_cfstring_ref(str_path, &mut err);
if cf_path.is_null() {
cf::CFRelease(err as cf::CFRef);
return Err(Error::watch_not_found().add_path(path.as_ref().into()));
return Err(Error::watch_not_found().add_path(path.into()));
}

let mut to_remove = Vec::new();
Expand All @@ -335,10 +339,10 @@ impl FsEventWatcher {
cf::CFArrayRemoveValueAtIndex(self.paths, *idx);
}
}
let p = if let Ok(canonicalized_path) = path.as_ref().canonicalize() {
let p = if let Ok(canonicalized_path) = path.canonicalize() {
canonicalized_path
} else {
path.as_ref().to_owned()
path.to_owned()
};
match self.recursive_info.remove(&p) {
Some(_) => Ok(()),
Expand All @@ -347,29 +351,29 @@ impl FsEventWatcher {
}

// https://github.com/thibaudgg/rb-fsevent/blob/master/ext/fsevent_watch/main.c
fn append_path<P: AsRef<Path>>(
fn append_path(
&mut self,
path: P,
path: &Path,
recursive_mode: RecursiveMode,
) -> Result<()> {
if !path.as_ref().exists() {
return Err(Error::path_not_found().add_path(path.as_ref().into()));
if !path.exists() {
return Err(Error::path_not_found().add_path(path.into()));
}
let str_path = path.as_ref().to_str().unwrap();
let str_path = path.to_str().unwrap();
unsafe {
let mut err: cf::CFErrorRef = ptr::null_mut();
let cf_path = cf::str_path_to_cfstring_ref(str_path, &mut err);
if cf_path.is_null() {
// Most likely the directory was deleted, or permissions changed,
// while the above code was running.
cf::CFRelease(err as cf::CFRef);
return Err(Error::path_not_found().add_path(path.as_ref().into()));
return Err(Error::path_not_found().add_path(path.into()));
}
cf::CFArrayAppendValue(self.paths, cf_path);
cf::CFRelease(cf_path);
}
self.recursive_info.insert(
path.as_ref().to_path_buf().canonicalize().unwrap(),
path.to_path_buf().canonicalize().unwrap(),
recursive_mode.is_recursive(),
);
Ok(())
Expand Down Expand Up @@ -539,16 +543,12 @@ unsafe fn callback_impl(
}

impl Watcher for FsEventWatcher {
fn new_immediate<F: EventFn>(event_fn: F) -> Result<FsEventWatcher> {
FsEventWatcher::from_event_fn(Arc::new(Mutex::new(event_fn)))
}

fn watch<P: AsRef<Path>>(&mut self, path: P, recursive_mode: RecursiveMode) -> Result<()> {
self.watch_inner(path.as_ref(), recursive_mode)
fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
self.watch_inner(path, recursive_mode)
}

fn unwatch<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
self.unwatch_inner(path.as_ref())
fn unwatch(&mut self, path: &Path) -> Result<()> {
self.unwatch_inner(path)
}

fn configure(&mut self, config: Config) -> Result<bool> {
Expand Down Expand Up @@ -578,7 +578,7 @@ fn test_fsevent_watcher_drop() {
let event_fn = move |res| tx.send(res).unwrap();

{
let mut watcher: RecommendedWatcher = Watcher::new_immediate(event_fn).unwrap();
let mut watcher = FsEventWatcher::new(event_fn).unwrap();
watcher.watch(dir.path(), RecursiveMode::Recursive).unwrap();
thread::sleep(Duration::from_millis(2000));
println!("is running -> {}", watcher.is_running());
Expand All @@ -599,7 +599,7 @@ fn test_fsevent_watcher_drop() {
}

#[test]
fn test_steam_context_info_send() {
fn check_send<T: Send>() {}
fn test_steam_context_info_send_and_sync() {
fn check_send<T: Send + Sync>() {}
check_send::<StreamContextInfo>();
}
23 changes: 15 additions & 8 deletions src/inotify.rs
Expand Up @@ -554,6 +554,11 @@ fn filter_dir(e: walkdir::Result<walkdir::DirEntry>) -> Option<walkdir::DirEntry
}

impl INotifyWatcher {
/// Create a new watcher.
pub fn new<F: EventFn>(event_fn: F) -> Result<Self> {
Self::from_event_fn(Box::new(event_fn))
}

fn from_event_fn(event_fn: Box<dyn EventFn>) -> Result<Self> {
let inotify = Inotify::init()?;
let event_loop = EventLoop::new(inotify, event_fn)?;
Expand Down Expand Up @@ -597,16 +602,12 @@ impl INotifyWatcher {
}

impl Watcher for INotifyWatcher {
fn new_immediate<F: EventFn>(event_fn: F) -> Result<INotifyWatcher> {
INotifyWatcher::from_event_fn(Box::new(event_fn))
}

fn watch<P: AsRef<Path>>(&mut self, path: P, recursive_mode: RecursiveMode) -> Result<()> {
self.watch_inner(path.as_ref(), recursive_mode)
fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
self.watch_inner(path, recursive_mode)
}

fn unwatch<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
self.unwatch_inner(path.as_ref())
fn unwatch(&mut self, path: &Path) -> Result<()> {
self.unwatch_inner(path)
}

fn configure(&mut self, config: Config) -> Result<bool> {
Expand All @@ -624,3 +625,9 @@ impl Drop for INotifyWatcher {
self.waker.wake().unwrap();
}
}

#[test]
fn inotify_watcher_is_send_and_sync() {
fn check<T: Send + Sync>() {}
check::<INotifyWatcher>();
}
48 changes: 27 additions & 21 deletions src/lib.rs
Expand Up @@ -20,11 +20,12 @@
//! # Examples
//!
//! ```
//! # use std::path::Path;
//! use notify::{Watcher, RecommendedWatcher, RecursiveMode, Result};
//!
//! fn main() -> Result<()> {
//! // Automatically select the best implementation for your platform.
//! let mut watcher: RecommendedWatcher = Watcher::new_immediate(|res| {
//! let mut watcher = notify::recommended_watcher(|res| {
//! match res {
//! Ok(event) => println!("event: {:?}", event),
//! Err(e) => println!("watch error: {:?}", e),
Expand All @@ -33,7 +34,7 @@
//!
//! // Add a path to be watched. All files and directories at that path and
//! // below will be monitored for changes.
//! watcher.watch(".", RecursiveMode::Recursive)?;
//! watcher.watch(Path::new("."), RecursiveMode::Recursive)?;
//!
//! Ok(())
//! }
Expand All @@ -48,10 +49,11 @@
//!
//! ```
//! # use notify::{Watcher, RecommendedWatcher, RecursiveMode, Result};
//! # use std::path::Path;
//! # use std::time::Duration;
//! # fn main() -> Result<()> {
//! # // Automatically select the best implementation for your platform.
//! # let mut watcher: RecommendedWatcher = Watcher::new_immediate(|res| {
//! # let mut watcher = RecommendedWatcher::new(|res| {
//! # match res {
//! # Ok(event) => println!("event: {:?}", event),
//! # Err(e) => println!("watch error: {:?}", e),
Expand All @@ -60,7 +62,7 @@
//!
//! # // Add a path to be watched. All files and directories at that path and
//! # // below will be monitored for changes.
//! # watcher.watch(".", RecursiveMode::Recursive)?;
//! # watcher.watch(Path::new("."), RecursiveMode::Recursive)?;
//!
//! use notify::Config;
//! watcher.configure(Config::PreciseEvents(true))?;
Expand All @@ -77,6 +79,7 @@
//!
//! ```
//! # use notify::{RecommendedWatcher, RecursiveMode, Result, Watcher};
//! # use std::path::Path;
//! #
//! # fn main() -> Result<()> {
//! fn event_fn(res: Result<notify::Event>) {
Expand All @@ -86,10 +89,10 @@
//! }
//! }
//!
//! let mut watcher1: RecommendedWatcher = Watcher::new_immediate(event_fn)?;
//! let mut watcher2: RecommendedWatcher = Watcher::new_immediate(event_fn)?;
//! # watcher1.watch(".", RecursiveMode::Recursive)?;
//! # watcher2.watch(".", RecursiveMode::Recursive)?;
//! let mut watcher1 = notify::recommended_watcher(event_fn)?;
//! let mut watcher2 = notify::recommended_watcher(event_fn)?;
//! # watcher1.watch(Path::new("."), RecursiveMode::Recursive)?;
//! # watcher2.watch(Path::new("."), RecursiveMode::Recursive)?;
//! #
//! # Ok(())
//! # }
Expand All @@ -100,7 +103,6 @@
pub use config::{Config, RecursiveMode};
pub use error::{Error, ErrorKind, Result};
pub use event::{Event, EventKind};
use std::convert::AsRef;
use std::path::Path;

#[cfg(target_os = "macos")]
Expand Down Expand Up @@ -136,14 +138,7 @@ impl<F> EventFn for F where F: 'static + FnMut(Result<Event>) + Send {}
/// Watcher is implemented per platform using the best implementation available on that platform.
/// In addition to such event driven implementations, a polling implementation is also provided
/// that should work on any platform.
pub trait Watcher: Sized {
/// Create a new watcher in _immediate_ mode.
///
/// Events will be sent using the provided `tx` immediately after they occur.
fn new_immediate<F>(event_fn: F) -> Result<Self>
where
F: EventFn;

pub trait Watcher {
/// Begin watching a new path.
///
/// If the `path` is a directory, `recursive_mode` will be evaluated. If `recursive_mode` is
Expand All @@ -159,15 +154,15 @@ pub trait Watcher: Sized {
///
/// [#165]: https://github.com/notify-rs/notify/issues/165
/// [#166]: https://github.com/notify-rs/notify/issues/166
fn watch<P: AsRef<Path>>(&mut self, path: P, recursive_mode: RecursiveMode) -> Result<()>;
fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()>;

/// Stop watching a path.
///
/// # Errors
///
/// Returns an error in the case that `path` has not been watched or if removing the watch
/// fails.
fn unwatch<P: AsRef<Path>>(&mut self, path: P) -> Result<()>;
fn unwatch(&mut self, path: &Path) -> Result<()>;

/// Configure the watcher at runtime.
///
Expand Down Expand Up @@ -200,9 +195,20 @@ pub type RecommendedWatcher = PollWatcher;
/// _immediate_ mode.
///
/// See [`Watcher::new_immediate`](trait.Watcher.html#tymethod.new_immediate).
pub fn immediate_watcher<F>(event_fn: F) -> Result<RecommendedWatcher>
pub fn recommended_watcher<F>(event_fn: F) -> Result<RecommendedWatcher>
where
F: EventFn,
{
Watcher::new_immediate(event_fn)
// All recommended watchers currently implement `new`, so just call that.
RecommendedWatcher::new(event_fn)
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_object_safe() {
let _watcher: &dyn Watcher = &NullWatcher;
}
}