Skip to content

Commit

Permalink
Add EventHandler trait to replace EventFn (#346)
Browse files Browse the repository at this point in the history
* Add `EventHandler` trait to replace `EventFn`
  • Loading branch information
a1phyr committed Aug 12, 2021
1 parent 6d1ef9c commit d7e2279
Show file tree
Hide file tree
Showing 10 changed files with 146 additions and 105 deletions.
7 changes: 5 additions & 2 deletions examples/async_monitor.rs
@@ -1,6 +1,9 @@
use notify::{RecommendedWatcher, RecursiveMode, Event, Watcher};
use futures::{
channel::mpsc::{channel, Receiver},
SinkExt, StreamExt,
};
use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher};
use std::path::Path;
use futures::{SinkExt, StreamExt, channel::mpsc::{channel, Receiver}};

fn async_watcher() -> notify::Result<(RecommendedWatcher, Receiver<notify::Result<Event>>)> {
let (mut tx, rx) = channel(1);
Expand Down
4 changes: 1 addition & 3 deletions examples/hot_reload_tide/src/main.rs
@@ -1,12 +1,10 @@

// Imagine this is a web app that remembers information about audio messages.
// It has a config.json file that acts as a database,
// you can edit the configuration and the app will pick up changes without the need to restart it.
// This concept is known as hot-reloading.
use hot_reload_tide::messages::{load_config, Config};
use notify::{
event::ModifyKind,
Error, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher,
event::ModifyKind, Error, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher,
};
use std::path::Path;
use std::sync::{Arc, Mutex};
Expand Down
2 changes: 1 addition & 1 deletion examples/monitor_raw.rs
Expand Up @@ -6,7 +6,7 @@ fn watch<P: AsRef<Path>>(path: P) -> notify::Result<()> {

// Automatically select the best implementation for your platform.
// You can also access each implementation directly e.g. INotifyWatcher.
let mut watcher = RecommendedWatcher::new(move |res| tx.send(res).unwrap())?;
let mut watcher = RecommendedWatcher::new(tx)?;

// Add a path to be watched. All files and directories at that path and
// below will be monitored for changes.
Expand Down
31 changes: 13 additions & 18 deletions src/fsevent.rs
Expand Up @@ -15,7 +15,7 @@
#![allow(non_upper_case_globals, dead_code)]

use crate::event::*;
use crate::{Config, Error, EventFn, RecursiveMode, Result, Watcher};
use crate::{Config, Error, EventHandler, RecursiveMode, Result, Watcher};
use crossbeam_channel::{unbounded, Sender};
use fsevent_sys as fs;
use fsevent_sys::core_foundation as cf;
Expand Down Expand Up @@ -63,7 +63,7 @@ pub struct FsEventWatcher {
since_when: fs::FSEventStreamEventId,
latency: cf::CFTimeInterval,
flags: fs::FSEventStreamCreateFlags,
event_fn: Arc<Mutex<dyn EventFn>>,
event_handler: Arc<Mutex<dyn EventHandler>>,
runloop: Option<(cf::CFRunLoopRef, thread::JoinHandle<()>)>,
recursive_info: HashMap<PathBuf, bool>,
}
Expand Down Expand Up @@ -224,7 +224,7 @@ fn translate_flags(flags: StreamFlags, precise: bool) -> Vec<Event> {
}

struct StreamContextInfo {
event_fn: Arc<Mutex<dyn EventFn>>,
event_handler: Arc<Mutex<dyn EventHandler>>,
recursive_info: HashMap<PathBuf, bool>,
}

Expand All @@ -249,15 +249,15 @@ extern "C" {
}

impl FsEventWatcher {
fn from_event_fn(event_fn: Arc<Mutex<dyn EventFn>>) -> Result<Self> {
fn from_event_handler(event_handler: Arc<Mutex<dyn EventHandler>>) -> Result<Self> {
Ok(FsEventWatcher {
paths: unsafe {
cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks)
},
since_when: fs::kFSEventStreamEventIdSinceNow,
latency: 0.0,
flags: fs::kFSEventStreamCreateFlagFileEvents | fs::kFSEventStreamCreateFlagNoDefer,
event_fn,
event_handler,
runloop: None,
recursive_info: HashMap::new(),
})
Expand Down Expand Up @@ -343,11 +343,7 @@ impl FsEventWatcher {
}

// https://github.com/thibaudgg/rb-fsevent/blob/master/ext/fsevent_watch/main.c
fn append_path(
&mut self,
path: &Path,
recursive_mode: RecursiveMode,
) -> Result<()> {
fn append_path(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
if !path.exists() {
return Err(Error::path_not_found().add_path(path.into()));
}
Expand Down Expand Up @@ -382,7 +378,7 @@ impl FsEventWatcher {
// stream is closed. This means we will leak the context if we panic before reacing
// `FSEventStreamRelease`.
let context = Box::into_raw(Box::new(StreamContextInfo {
event_fn: self.event_fn.clone(),
event_handler: self.event_handler.clone(),
recursive_info: self.recursive_info.clone(),
}));

Expand Down Expand Up @@ -486,7 +482,7 @@ unsafe fn callback_impl(
) {
let event_paths = event_paths as *const *const libc::c_char;
let info = info as *const StreamContextInfo;
let event_fn = &(*info).event_fn;
let event_handler = &(*info).event_handler;

for p in 0..num_events {
let path = CStr::from_ptr(*event_paths.add(p))
Expand Down Expand Up @@ -521,16 +517,16 @@ unsafe fn callback_impl(
for ev in translate_flags(flag, true).into_iter() {
// TODO: precise
let ev = ev.add_path(path.clone());
let mut event_fn = event_fn.lock().expect("lock not to be poisoned");
(event_fn)(Ok(ev));
let mut event_handler = event_handler.lock().expect("lock not to be poisoned");
event_handler.handle_event(Ok(ev));
}
}
}

impl Watcher for FsEventWatcher {
/// Create a new watcher.
fn new<F: EventFn>(event_fn: F) -> Result<Self> {
Self::from_event_fn(Arc::new(Mutex::new(event_fn)))
fn new<F: EventHandler>(event_handler: F) -> Result<Self> {
Self::from_event_handler(Arc::new(Mutex::new(event_handler)))
}

fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
Expand Down Expand Up @@ -565,10 +561,9 @@ fn test_fsevent_watcher_drop() {
let dir = tempfile::tempdir().unwrap();

let (tx, rx) = std::sync::mpsc::channel();
let event_fn = move |res| tx.send(res).unwrap();

{
let mut watcher = FsEventWatcher::new(event_fn).unwrap();
let mut watcher = FsEventWatcher::new(tx).unwrap();
watcher.watch(dir.path(), RecursiveMode::Recursive).unwrap();
thread::sleep(Duration::from_millis(2000));
println!("is running -> {}", watcher.is_running());
Expand Down
39 changes: 21 additions & 18 deletions src/inotify.rs
Expand Up @@ -5,7 +5,7 @@
//! will return events for the directory itself, and for files inside the directory.

use super::event::*;
use super::{Config, Error, ErrorKind, EventFn, RecursiveMode, Result, Watcher};
use super::{Config, Error, ErrorKind, EventHandler, RecursiveMode, Result, Watcher};
use crossbeam_channel::{bounded, unbounded, Sender};
use inotify as inotify_sys;
use inotify_sys::{EventMask, Inotify, WatchDescriptor, WatchMask};
Expand Down Expand Up @@ -35,7 +35,7 @@ struct EventLoop {
event_loop_tx: crossbeam_channel::Sender<EventLoopMsg>,
event_loop_rx: crossbeam_channel::Receiver<EventLoopMsg>,
inotify: Option<Inotify>,
event_fn: Box<dyn EventFn>,
event_handler: Box<dyn EventHandler>,
watches: HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>,
paths: HashMap<WatchDescriptor, PathBuf>,
rename_event: Option<Event>,
Expand All @@ -56,9 +56,9 @@ enum EventLoopMsg {
}

#[inline]
fn send_pending_rename_event(rename_event: &mut Option<Event>, event_fn: &mut dyn EventFn) {
fn send_pending_rename_event(rename_event: &mut Option<Event>, event_handler: &mut dyn EventHandler) {
if let Some(e) = rename_event.take() {
event_fn(Ok(e));
event_handler.handle_event(Ok(e));
}
}

Expand Down Expand Up @@ -96,7 +96,7 @@ fn remove_watch_by_event(
}

impl EventLoop {
pub fn new(inotify: Inotify, event_fn: Box<dyn EventFn>) -> Result<Self> {
pub fn new(inotify: Inotify, event_handler: Box<dyn EventHandler>) -> Result<Self> {
let (event_loop_tx, event_loop_rx) = crossbeam_channel::unbounded::<EventLoopMsg>();
let poll = mio::Poll::new()?;

Expand All @@ -114,7 +114,7 @@ impl EventLoop {
event_loop_tx,
event_loop_rx,
inotify: Some(inotify),
event_fn,
event_handler,
watches: HashMap::new(),
paths: HashMap::new(),
rename_event: None,
Expand Down Expand Up @@ -142,7 +142,7 @@ impl EventLoop {

// Process whatever happened.
for event in &events {
self.handle_event(&event);
self.handle_event(event);
}

// Stop, if we're done.
Expand Down Expand Up @@ -188,7 +188,7 @@ impl EventLoop {
let current_cookie = self.rename_event.as_ref().and_then(|e| e.tracker());
// send pending rename event only if the rename event for which the timer has been created hasn't been handled already; otherwise ignore this timeout
if current_cookie == Some(cookie) {
send_pending_rename_event(&mut self.rename_event, &mut *self.event_fn);
send_pending_rename_event(&mut self.rename_event, &mut *self.event_handler);
}
}
EventLoopMsg::Configure(config, tx) => {
Expand Down Expand Up @@ -218,7 +218,7 @@ impl EventLoop {
num_events += 1;
if event.mask.contains(EventMask::Q_OVERFLOW) {
let ev = Ok(Event::new(EventKind::Other).set_flag(Flag::Rescan));
(self.event_fn)(ev);
self.event_handler.handle_event(ev);
}

let path = match event.name {
Expand All @@ -229,7 +229,10 @@ impl EventLoop {
};

if event.mask.contains(EventMask::MOVED_FROM) {
send_pending_rename_event(&mut self.rename_event, &mut *self.event_fn);
send_pending_rename_event(
&mut self.rename_event,
&mut *self.event_handler,
);
remove_watch_by_event(&path, &self.watches, &mut remove_watches);
self.rename_event = Some(
Event::new(EventKind::Modify(ModifyKind::Name(
Expand All @@ -243,7 +246,7 @@ impl EventLoop {
if event.mask.contains(EventMask::MOVED_TO) {
if let Some(e) = self.rename_event.take() {
if e.tracker() == Some(event.cookie as usize) {
(self.event_fn)(Ok(e.clone()));
self.event_handler.handle_event(Ok(e.clone()));
evs.push(
Event::new(EventKind::Modify(ModifyKind::Name(
RenameMode::To,
Expand Down Expand Up @@ -384,12 +387,12 @@ impl EventLoop {
if !evs.is_empty() {
send_pending_rename_event(
&mut self.rename_event,
&mut *self.event_fn,
&mut *self.event_handler,
);
}

for ev in evs {
(self.event_fn)(Ok(ev));
self.event_handler.handle_event(Ok(ev));
}
}
}
Expand Down Expand Up @@ -421,7 +424,7 @@ impl EventLoop {
}
}
Err(e) => {
(self.event_fn)(Err(Error::io(e)));
self.event_handler.handle_event(Err(Error::io(e)));
}
}
}
Expand Down Expand Up @@ -554,9 +557,9 @@ fn filter_dir(e: walkdir::Result<walkdir::DirEntry>) -> Option<walkdir::DirEntry
}

impl INotifyWatcher {
fn from_event_fn(event_fn: Box<dyn EventFn>) -> Result<Self> {
fn from_event_handler(event_handler: Box<dyn EventHandler>) -> Result<Self> {
let inotify = Inotify::init()?;
let event_loop = EventLoop::new(inotify, event_fn)?;
let event_loop = EventLoop::new(inotify, event_handler)?;
let channel = event_loop.event_loop_tx.clone();
let waker = event_loop.event_loop_waker.clone();
event_loop.run();
Expand Down Expand Up @@ -598,8 +601,8 @@ impl INotifyWatcher {

impl Watcher for INotifyWatcher {
/// Create a new watcher.
fn new<F: EventFn>(event_fn: F) -> Result<Self> {
Self::from_event_fn(Box::new(event_fn))
fn new<F: EventHandler>(event_handler: F) -> Result<Self> {
Self::from_event_handler(Box::new(event_handler))
}

fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
Expand Down
21 changes: 11 additions & 10 deletions src/kqueue.rs
Expand Up @@ -5,7 +5,7 @@
//! pieces of kernel code termed filters.

use super::event::*;
use super::{Error, EventFn, RecursiveMode, Result, Watcher};
use super::{Error, EventHandler, RecursiveMode, Result, Watcher};
use crossbeam_channel::{unbounded, Sender};
use kqueue::{EventData, EventFilter, FilterFlag, Ident};
use std::collections::HashMap;
Expand All @@ -32,7 +32,7 @@ struct EventLoop {
event_loop_tx: crossbeam_channel::Sender<EventLoopMsg>,
event_loop_rx: crossbeam_channel::Receiver<EventLoopMsg>,
kqueue: kqueue::Watcher,
event_fn: Box<dyn EventFn>,
event_handler: Box<dyn EventHandler>,
watches: HashMap<PathBuf, bool>,
}

Expand All @@ -49,7 +49,7 @@ enum EventLoopMsg {
}

impl EventLoop {
pub fn new(kqueue: kqueue::Watcher, event_fn: Box<dyn EventFn>) -> Result<Self> {
pub fn new(kqueue: kqueue::Watcher, event_handler: Box<dyn EventHandler>) -> Result<Self> {
let (event_loop_tx, event_loop_rx) = crossbeam_channel::unbounded::<EventLoopMsg>();
let poll = mio::Poll::new()?;

Expand All @@ -67,7 +67,7 @@ impl EventLoop {
event_loop_tx,
event_loop_rx,
kqueue,
event_fn,
event_handler,
watches: HashMap::new(),
};
Ok(event_loop)
Expand All @@ -93,7 +93,7 @@ impl EventLoop {

// Process whatever happened.
for event in &events {
self.handle_event(&event);
self.handle_event(event);
}

// Stop, if we're done.
Expand Down Expand Up @@ -225,7 +225,7 @@ impl EventLoop {
}
}
.add_path(path);
(self.event_fn)(Ok(event));
self.event_handler.handle_event(Ok(event));
}
// as we don't add any other EVFILTER to kqueue we should never get here
kqueue::Event { ident: _, data: _ } => unreachable!(),
Expand Down Expand Up @@ -320,9 +320,9 @@ fn filter_dir(e: walkdir::Result<walkdir::DirEntry>) -> Option<walkdir::DirEntry
}

impl KqueueWatcher {
fn from_event_fn(event_fn: Box<dyn EventFn>) -> Result<Self> {
fn from_event_handler(event_handler: Box<dyn EventHandler>) -> Result<Self> {
let kqueue = kqueue::Watcher::new()?;
let event_loop = EventLoop::new(kqueue, event_fn)?;
let event_loop = EventLoop::new(kqueue, event_handler)?;
let channel = event_loop.event_loop_tx.clone();
let waker = event_loop.event_loop_waker.clone();
event_loop.run();
Expand Down Expand Up @@ -374,10 +374,11 @@ impl KqueueWatcher {

impl Watcher for KqueueWatcher {
/// Create a new watcher.
fn new<F: EventFn>(event_fn: F) -> Result<Self> {
Self::from_event_fn(Box::new(event_fn))
fn new<F: EventHandler>(event_handler: F) -> Result<Self> {
Self::from_event_handler(Box::new(event_handler))
}


fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
self.watch_inner(path, recursive_mode)
}
Expand Down

0 comments on commit d7e2279

Please sign in to comment.