Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fsevent should join on thread shutdown #337

Merged
merged 1 commit into from Jun 28, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 7 additions & 17 deletions src/fsevent.rs
Expand Up @@ -16,7 +16,7 @@

use crate::event::*;
use crate::{Config, Error, EventFn, RecursiveMode, Result, Watcher};
use crossbeam_channel::{unbounded, Receiver, Sender};
use crossbeam_channel::{unbounded, Sender};
use fsevent_sys as fs;
use fsevent_sys::core_foundation as cf;
use std::collections::HashMap;
Expand Down Expand Up @@ -65,7 +65,7 @@ pub struct FsEventWatcher {
latency: cf::CFTimeInterval,
flags: fs::FSEventStreamCreateFlags,
event_fn: Arc<Mutex<dyn EventFn>>,
runloop: Option<(cf::CFRunLoopRef, Receiver<()>)>,
runloop: Option<(cf::CFRunLoopRef, thread::JoinHandle<()>)>,
recursive_info: HashMap<PathBuf, bool>,
}

Expand Down Expand Up @@ -290,7 +290,7 @@ impl FsEventWatcher {
return;
}

if let Some((runloop, done)) = self.runloop.take() {
if let Some((runloop, thread_handle)) = self.runloop.take() {
unsafe {
let runloop = runloop as *mut raw::c_void;

Expand All @@ -301,11 +301,8 @@ impl FsEventWatcher {
cf::CFRunLoopStop(runloop);
}

// sync done channel
match done.recv() {
Ok(()) => (),
Err(_) => panic!("the runloop may not be finished!"),
}
// Wait for the thread to shut down.
thread_handle.join().expect("thread to shut down");
}
}

Expand Down Expand Up @@ -381,9 +378,6 @@ impl FsEventWatcher {
return Err(Error::path_not_found());
}

// done channel is used to sync quit status of runloop thread
let (done_tx, done_rx) = unbounded();

// We need to associate the stream context with our callback in order to propagate events
// to the rest of the system. This will be owned by the stream, and will be freed when the
// stream is closed. This means we will leak the context if we panic before reacing
Expand Down Expand Up @@ -427,7 +421,7 @@ impl FsEventWatcher {
// channel to pass runloop around
let (rl_tx, rl_rx) = unbounded();

thread::spawn(move || {
let thread_handle = thread::spawn(move || {
let stream = stream.0;

unsafe {
Expand All @@ -450,13 +444,9 @@ impl FsEventWatcher {
fs::FSEventStreamInvalidate(stream);
fs::FSEventStreamRelease(stream);
}

done_tx
.send(())
.expect("error while signal run loop is done");
});
// block until runloop has been sent
self.runloop = Some((rl_rx.recv().unwrap().0, done_rx));
self.runloop = Some((rl_rx.recv().unwrap().0, thread_handle));
0xpr03 marked this conversation as resolved.
Show resolved Hide resolved

Ok(())
}
Expand Down