diff --git a/node.gyp b/node.gyp
index c0e2dbe908ecb6..ad572f89728ee5 100644
--- a/node.gyp
+++ b/node.gyp
@@ -636,6 +636,8 @@
       'src/base_object.h',
       'src/base_object-inl.h',
       'src/base64.h',
+      'src/callback_queue.h',
+      'src/callback_queue-inl.h',
       'src/connect_wrap.h',
       'src/connection_wrap.h',
       'src/debug_utils.h',
diff --git a/src/callback_queue-inl.h b/src/callback_queue-inl.h
new file mode 100644
index 00000000000000..e83c81cd0dd802
--- /dev/null
+++ b/src/callback_queue-inl.h
@@ -0,0 +1,97 @@
+#ifndef SRC_CALLBACK_QUEUE_INL_H_
+#define SRC_CALLBACK_QUEUE_INL_H_
+
+#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
+
+#include "callback_queue.h"
+
+namespace node {
+
+template <typename R, typename... Args>
+template <typename Fn>
+std::unique_ptr<typename CallbackQueue<R, Args...>::Callback>
+CallbackQueue<R, Args...>::CreateCallback(Fn&& fn, bool refed) {
+  return std::make_unique<CallbackImpl<Fn>>(std::move(fn), refed);
+}
+
+template <typename R, typename... Args>
+std::unique_ptr<typename CallbackQueue<R, Args...>::Callback>
+CallbackQueue<R, Args...>::Shift() {
+  std::unique_ptr<Callback> ret = std::move(head_);
+  if (ret) {
+    head_ = ret->get_next();
+    if (!head_)
+      tail_ = nullptr;  // The queue is now empty.
+  }
+  size_--;
+  return ret;
+}
+
+template <typename R, typename... Args>
+void CallbackQueue<R, Args...>::Push(std::unique_ptr<Callback> cb) {
+  Callback* prev_tail = tail_;
+
+  size_++;
+  tail_ = cb.get();
+  if (prev_tail != nullptr)
+    prev_tail->set_next(std::move(cb));
+  else
+    head_ = std::move(cb);
+}
+
+template <typename R, typename... Args>
+void CallbackQueue<R, Args...>::ConcatMove(CallbackQueue<R, Args...>&& other) {
+  size_ += other.size_;
+  if (tail_ != nullptr)
+    tail_->set_next(std::move(other.head_));
+  else
+    head_ = std::move(other.head_);
+  tail_ = other.tail_;
+  other.tail_ = nullptr;
+  other.size_ = 0;
+}
+
+template <typename R, typename... Args>
+size_t CallbackQueue<R, Args...>::size() const {
+  return size_.load();
+}
+
+template <typename R, typename... Args>
+CallbackQueue<R, Args...>::Callback::Callback(bool refed)
+  : refed_(refed) {}
+
+template <typename R, typename... Args>
+bool CallbackQueue<R, Args...>::Callback::is_refed() const {
+  return refed_;
+}
+
+template <typename R, typename... Args>
+std::unique_ptr<typename CallbackQueue<R, Args...>::Callback>
+CallbackQueue<R, Args...>::Callback::get_next() {
+  return std::move(next_);
+}
+
+template <typename R, typename... Args>
+void CallbackQueue<R, Args...>::Callback::set_next(
+    std::unique_ptr<Callback> next) {
+  next_ = std::move(next);
+}
+
+template <typename R, typename... Args>
+template <typename Fn>
+CallbackQueue<R, Args...>::CallbackImpl<Fn>::CallbackImpl(
+    Fn&& callback, bool refed)
+  : Callback(refed),
+    callback_(std::move(callback)) {}
+
+template <typename R, typename... Args>
+template <typename Fn>
+R CallbackQueue<R, Args...>::CallbackImpl<Fn>::Call(Args... args) {
+  return callback_(std::forward<Args>(args)...);
+}
+
+}  // namespace node
+
+#endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
+
+#endif  // SRC_CALLBACK_QUEUE_INL_H_
diff --git a/src/callback_queue.h b/src/callback_queue.h
new file mode 100644
index 00000000000000..ebf975e6391d13
--- /dev/null
+++ b/src/callback_queue.h
@@ -0,0 +1,70 @@
+#ifndef SRC_CALLBACK_QUEUE_H_
+#define SRC_CALLBACK_QUEUE_H_
+
+#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
+
+#include <atomic>
+
+namespace node {
+
+// A queue of C++ functions that take Args... as arguments and return R
+// (this is similar to the signature of std::function).
+// New entries are added using `CreateCallback()`/`Push()`, and removed using
+// `Shift()`.
+// The `refed` flag is left for easier use in situations in which some of these
+// should be run even if nothing else is keeping the event loop alive.
+template <typename R, typename... Args>
+class CallbackQueue {
+ public:
+  class Callback {
+   public:
+    explicit inline Callback(bool refed);
+
+    virtual ~Callback() = default;
+    virtual R Call(Args... args) = 0;
+
+    inline bool is_refed() const;
+
+   private:
+    inline std::unique_ptr<Callback> get_next();
+    inline void set_next(std::unique_ptr<Callback> next);
+
+    bool refed_;
+    std::unique_ptr<Callback> next_;
+
+    friend class CallbackQueue;
+  };
+
+  template <typename Fn>
+  inline std::unique_ptr<Callback> CreateCallback(Fn&& fn, bool refed);
+
+  inline std::unique_ptr<Callback> Shift();
+  inline void Push(std::unique_ptr<Callback> cb);
+  // ConcatMove adds elements from 'other' to the end of this list, and clears
+  // 'other' afterwards.
+  inline void ConcatMove(CallbackQueue&& other);
+
+  // size() is atomic and may be called from any thread.
+  inline size_t size() const;
+
+ private:
+  template <typename Fn>
+  class CallbackImpl final : public Callback {
+   public:
+    CallbackImpl(Fn&& callback, bool refed);
+    R Call(Args... args) override;
+
+   private:
+    Fn callback_;
+  };
+
+  std::atomic<size_t> size_ {0};
+  std::unique_ptr<Callback> head_;
+  Callback* tail_ = nullptr;
+};
+
+}  // namespace node
+
+#endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
+
+#endif  // SRC_CALLBACK_QUEUE_H_
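As an aside (not part of the patch): the header comment above only sketches the CreateCallback()/Push()/Shift() flow, so here is a minimal usage sketch. It assumes a translation unit inside the Node.js source tree with NODE_WANT_INTERNALS defined so that callback_queue-inl.h is available; the Demo() function and the use of int instead of Environment* as the argument type are purely illustrative.

    // Illustrative only; not part of the patch.
    #include "callback_queue-inl.h"

    #include <cstdio>
    #include <memory>

    void Demo() {
      // A queue of callbacks that take an int and return void, mirroring the
      // NativeImmediateQueue typedef introduced in env.h below, but with int
      // instead of Environment*.
      node::CallbackQueue<void, int> queue;

      // CreateCallback() type-erases any callable into a heap-allocated
      // CallbackImpl<Fn>; Push() appends it to the singly linked list.
      queue.Push(queue.CreateCallback(
          [](int n) { std::printf("first: %d\n", n); }, /* refed */ true));
      queue.Push(queue.CreateCallback(
          [](int n) { std::printf("second: %d\n", n); }, /* refed */ false));

      // Shift() pops entries in FIFO order. is_refed() reports the flag given
      // at creation time; the event loop uses it to decide whether pending
      // immediates should keep the loop alive.
      while (auto cb = queue.Shift()) {
        if (cb->is_refed()) {
          // A real caller (see the env.cc hunks below) counts these entries.
        }
        cb->Call(42);
      }
    }

Note that ConcatMove() splices an entire queue in O(1) by relinking the head_/tail_ pointers, and size() reads an atomic counter, so it can be checked from other threads without holding a lock.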
diff --git a/src/env-inl.h b/src/env-inl.h
index fcb58dc6709d13..b08b4f77b26ab5 100644
--- a/src/env-inl.h
+++ b/src/env-inl.h
@@ -25,6 +25,7 @@
 #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
 
 #include "aliased_buffer.h"
+#include "callback_queue-inl.h"
 #include "env.h"
 #include "node.h"
 #include "util-inl.h"
@@ -730,50 +731,9 @@ inline void IsolateData::set_options(
   options_ = std::move(options);
 }
 
-std::unique_ptr<Environment::NativeImmediateCallback>
-Environment::NativeImmediateQueue::Shift() {
-  std::unique_ptr<NativeImmediateCallback> ret = std::move(head_);
-  if (ret) {
-    head_ = ret->get_next();
-    if (!head_)
-      tail_ = nullptr;  // The queue is now empty.
-  }
-  size_--;
-  return ret;
-}
-
-void Environment::NativeImmediateQueue::Push(
-    std::unique_ptr<NativeImmediateCallback> cb) {
-  NativeImmediateCallback* prev_tail = tail_;
-
-  size_++;
-  tail_ = cb.get();
-  if (prev_tail != nullptr)
-    prev_tail->set_next(std::move(cb));
-  else
-    head_ = std::move(cb);
-}
-
-void Environment::NativeImmediateQueue::ConcatMove(
-    NativeImmediateQueue&& other) {
-  size_ += other.size_;
-  if (tail_ != nullptr)
-    tail_->set_next(std::move(other.head_));
-  else
-    head_ = std::move(other.head_);
-  tail_ = other.tail_;
-  other.tail_ = nullptr;
-  other.size_ = 0;
-}
-
-size_t Environment::NativeImmediateQueue::size() const {
-  return size_.load();
-}
-
 template <typename Fn>
 void Environment::CreateImmediate(Fn&& cb, bool ref) {
-  auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
-      std::move(cb), ref);
+  auto callback = native_immediates_.CreateCallback(std::move(cb), ref);
   native_immediates_.Push(std::move(callback));
 }
 
@@ -793,8 +753,8 @@ void Environment::SetUnrefImmediate(Fn&& cb) {
 
 template <typename Fn>
 void Environment::SetImmediateThreadsafe(Fn&& cb) {
-  auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
-      std::move(cb), false);
+  auto callback =
+      native_immediates_threadsafe_.CreateCallback(std::move(cb), false);
   {
     Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
     native_immediates_threadsafe_.Push(std::move(callback));
@@ -804,8 +764,8 @@ void Environment::SetImmediateThreadsafe(Fn&& cb) {
 
 template <typename Fn>
 void Environment::RequestInterrupt(Fn&& cb) {
-  auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
-      std::move(cb), false);
+  auto callback =
+      native_immediates_interrupts_.CreateCallback(std::move(cb), false);
   {
     Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
     native_immediates_interrupts_.Push(std::move(callback));
@@ -814,34 +774,6 @@ void Environment::RequestInterrupt(Fn&& cb) {
   RequestInterruptFromV8();
 }
 
-Environment::NativeImmediateCallback::NativeImmediateCallback(bool refed)
-  : refed_(refed) {}
-
-bool Environment::NativeImmediateCallback::is_refed() const {
-  return refed_;
-}
-
-std::unique_ptr<Environment::NativeImmediateCallback>
-Environment::NativeImmediateCallback::get_next() {
-  return std::move(next_);
-}
-
-void Environment::NativeImmediateCallback::set_next(
-    std::unique_ptr<NativeImmediateCallback> next) {
-  next_ = std::move(next);
-}
-
-template <typename Fn>
-Environment::NativeImmediateCallbackImpl<Fn>::NativeImmediateCallbackImpl(
-    Fn&& callback, bool refed)
-  : NativeImmediateCallback(refed),
-    callback_(std::move(callback)) {}
-
-template <typename Fn>
-void Environment::NativeImmediateCallbackImpl<Fn>::Call(Environment* env) {
-  callback_(env);
-}
-
 inline bool Environment::can_call_into_js() const {
   return can_call_into_js_ && !is_stopping();
 }
diff --git a/src/env.cc b/src/env.cc
index 5b8a63a0017c4f..36a1932310d5d6 100644
--- a/src/env.cc
+++ b/src/env.cc
@@ -690,7 +690,7 @@ void Environment::RunAndClearInterrupts() {
     }
 
     DebugSealHandleScope seal_handle_scope(isolate());
-    while (std::unique_ptr<NativeImmediateCallback> head = queue.Shift())
+    while (auto head = queue.Shift())
      head->Call(this);
   }
 }
@@ -716,8 +716,7 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) {
   auto drain_list = [&]() {
     TryCatchScope try_catch(this);
     DebugSealHandleScope seal_handle_scope(isolate());
-    while (std::unique_ptr<NativeImmediateCallback> head =
-               native_immediates_.Shift()) {
+    while (auto head = native_immediates_.Shift()) {
       if (head->is_refed())
         ref_count++;
 
diff --git a/src/env.h b/src/env.h
index deeeb2a385ab75..1e13f7c21f5b62 100644
--- a/src/env.h
+++ b/src/env.h
@@ -29,6 +29,7 @@
 #include "inspector_agent.h"
 #include "inspector_profiler.h"
 #endif
+#include "callback_queue.h"
 #include "debug_utils.h"
 #include "handle_wrap.h"
 #include "node.h"
@@ -1395,49 +1396,7 @@ class Environment : public MemoryRetainer {
 
   std::list<ExitCallback> at_exit_functions_;
 
-  class NativeImmediateCallback {
-   public:
-    explicit inline NativeImmediateCallback(bool refed);
-
-    virtual ~NativeImmediateCallback() = default;
-    virtual void Call(Environment* env) = 0;
-
-    inline bool is_refed() const;
-    inline std::unique_ptr<NativeImmediateCallback> get_next();
-    inline void set_next(std::unique_ptr<NativeImmediateCallback> next);
-
-   private:
-    bool refed_;
-    std::unique_ptr<NativeImmediateCallback> next_;
-  };
-
-  template <typename Fn>
-  class NativeImmediateCallbackImpl final : public NativeImmediateCallback {
-   public:
-    NativeImmediateCallbackImpl(Fn&& callback, bool refed);
-    void Call(Environment* env) override;
-
-   private:
-    Fn callback_;
-  };
-
-  class NativeImmediateQueue {
-   public:
-    inline std::unique_ptr<NativeImmediateCallback> Shift();
-    inline void Push(std::unique_ptr<NativeImmediateCallback> cb);
-    // ConcatMove adds elements from 'other' to the end of this list, and clears
-    // 'other' afterwards.
-    inline void ConcatMove(NativeImmediateQueue&& other);
-
-    // size() is atomic and may be called from any thread.
-    inline size_t size() const;
-
-   private:
-    std::atomic<size_t> size_ {0};
-    std::unique_ptr<NativeImmediateCallback> head_;
-    NativeImmediateCallback* tail_ = nullptr;
-  };
-
+  typedef CallbackQueue<void, Environment*> NativeImmediateQueue;
   NativeImmediateQueue native_immediates_;
   Mutex native_immediates_threadsafe_mutex_;
   NativeImmediateQueue native_immediates_threadsafe_;
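Again as an aside (not part of the patch): the env.cc hunks above only show the changed while-loops, so here is a trimmed-down model of how the typedef'd NativeImmediateQueue gets drained. It combines the move-the-queue-aside pattern from RunAndClearInterrupts() with the ref_count bookkeeping from RunAndClearNativeImmediates(); Env and DrainSketch() are hypothetical stand-ins for Environment and those methods, and the mutex locking, TryCatchScope and DebugSealHandleScope details are omitted.

    // Illustrative only; not part of the patch.
    #include "callback_queue-inl.h"

    #include <cstddef>
    #include <memory>
    #include <utility>

    struct Env {
      node::CallbackQueue<void, Env*> native_immediates_;

      size_t DrainSketch() {
        size_t ref_count = 0;

        // Move the current entries aside, so callbacks that schedule new
        // immediates while running append to the (now empty) member queue
        // instead of extending this drain pass.
        node::CallbackQueue<void, Env*> queue;
        queue.ConcatMove(std::move(native_immediates_));

        // Shift() hands back ownership of one entry at a time; is_refed()
        // drives the same kind of ref-count bookkeeping as in env.cc.
        while (auto head = queue.Shift()) {
          if (head->is_refed())
            ref_count++;
          head->Call(this);
        }
        return ref_count;
      }
    };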