Skip to content

Commit a5e8c5c

Browse files
addaleaxcodebytere
authored andcommittedJun 7, 2020
src: split out callback queue implementation from Environment
This isn’t conceptually tied to anything Node.js-specific at all. PR-URL: #33272 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
1 parent 77caf92 commit a5e8c5c

File tree

6 files changed

+179
-120
lines changed

6 files changed

+179
-120
lines changed
 

‎node.gyp

+2
Original file line numberDiff line numberDiff line change
@@ -636,6 +636,8 @@
636636
'src/base_object.h',
637637
'src/base_object-inl.h',
638638
'src/base64.h',
639+
'src/callback_queue.h',
640+
'src/callback_queue-inl.h',
639641
'src/connect_wrap.h',
640642
'src/connection_wrap.h',
641643
'src/debug_utils.h',

‎src/callback_queue-inl.h

+97
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
#ifndef SRC_CALLBACK_QUEUE_INL_H_
2+
#define SRC_CALLBACK_QUEUE_INL_H_
3+
4+
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5+
6+
#include "callback_queue.h"
7+
8+
namespace node {
9+
10+
template <typename R, typename... Args>
11+
template <typename Fn>
12+
std::unique_ptr<typename CallbackQueue<R, Args...>::Callback>
13+
CallbackQueue<R, Args...>::CreateCallback(Fn&& fn, bool refed) {
14+
return std::make_unique<CallbackImpl<Fn>>(std::move(fn), refed);
15+
}
16+
17+
template <typename R, typename... Args>
18+
std::unique_ptr<typename CallbackQueue<R, Args...>::Callback>
19+
CallbackQueue<R, Args...>::Shift() {
20+
std::unique_ptr<Callback> ret = std::move(head_);
21+
if (ret) {
22+
head_ = ret->get_next();
23+
if (!head_)
24+
tail_ = nullptr; // The queue is now empty.
25+
}
26+
size_--;
27+
return ret;
28+
}
29+
30+
template <typename R, typename... Args>
31+
void CallbackQueue<R, Args...>::Push(std::unique_ptr<Callback> cb) {
32+
Callback* prev_tail = tail_;
33+
34+
size_++;
35+
tail_ = cb.get();
36+
if (prev_tail != nullptr)
37+
prev_tail->set_next(std::move(cb));
38+
else
39+
head_ = std::move(cb);
40+
}
41+
42+
template <typename R, typename... Args>
43+
void CallbackQueue<R, Args...>::ConcatMove(CallbackQueue<R, Args...>&& other) {
44+
size_ += other.size_;
45+
if (tail_ != nullptr)
46+
tail_->set_next(std::move(other.head_));
47+
else
48+
head_ = std::move(other.head_);
49+
tail_ = other.tail_;
50+
other.tail_ = nullptr;
51+
other.size_ = 0;
52+
}
53+
54+
template <typename R, typename... Args>
55+
size_t CallbackQueue<R, Args...>::size() const {
56+
return size_.load();
57+
}
58+
59+
template <typename R, typename... Args>
60+
CallbackQueue<R, Args...>::Callback::Callback(bool refed)
61+
: refed_(refed) {}
62+
63+
template <typename R, typename... Args>
64+
bool CallbackQueue<R, Args...>::Callback::is_refed() const {
65+
return refed_;
66+
}
67+
68+
template <typename R, typename... Args>
69+
std::unique_ptr<typename CallbackQueue<R, Args...>::Callback>
70+
CallbackQueue<R, Args...>::Callback::get_next() {
71+
return std::move(next_);
72+
}
73+
74+
template <typename R, typename... Args>
75+
void CallbackQueue<R, Args...>::Callback::set_next(
76+
std::unique_ptr<Callback> next) {
77+
next_ = std::move(next);
78+
}
79+
80+
template <typename R, typename... Args>
81+
template <typename Fn>
82+
CallbackQueue<R, Args...>::CallbackImpl<Fn>::CallbackImpl(
83+
Fn&& callback, bool refed)
84+
: Callback(refed),
85+
callback_(std::move(callback)) {}
86+
87+
template <typename R, typename... Args>
88+
template <typename Fn>
89+
R CallbackQueue<R, Args...>::CallbackImpl<Fn>::Call(Args... args) {
90+
return callback_(std::forward<Args>(args)...);
91+
}
92+
93+
} // namespace node
94+
95+
#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
96+
97+
#endif // SRC_CALLBACK_QUEUE_INL_H_

‎src/callback_queue.h

+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
#ifndef SRC_CALLBACK_QUEUE_H_
2+
#define SRC_CALLBACK_QUEUE_H_
3+
4+
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5+
6+
#include <atomic>
7+
8+
namespace node {
9+
10+
// A queue of C++ functions that take Args... as arguments and return R
11+
// (this is similar to the signature of std::function).
12+
// New entries are added using `CreateCallback()`/`Push()`, and removed using
13+
// `Shift()`.
14+
// The `refed` flag is left for easier use in situations in which some of these
15+
// should be run even if nothing else is keeping the event loop alive.
16+
template <typename R, typename... Args>
17+
class CallbackQueue {
18+
public:
19+
class Callback {
20+
public:
21+
explicit inline Callback(bool refed);
22+
23+
virtual ~Callback() = default;
24+
virtual R Call(Args... args) = 0;
25+
26+
inline bool is_refed() const;
27+
28+
private:
29+
inline std::unique_ptr<Callback> get_next();
30+
inline void set_next(std::unique_ptr<Callback> next);
31+
32+
bool refed_;
33+
std::unique_ptr<Callback> next_;
34+
35+
friend class CallbackQueue;
36+
};
37+
38+
template <typename Fn>
39+
inline std::unique_ptr<Callback> CreateCallback(Fn&& fn, bool refed);
40+
41+
inline std::unique_ptr<Callback> Shift();
42+
inline void Push(std::unique_ptr<Callback> cb);
43+
// ConcatMove adds elements from 'other' to the end of this list, and clears
44+
// 'other' afterwards.
45+
inline void ConcatMove(CallbackQueue&& other);
46+
47+
// size() is atomic and may be called from any thread.
48+
inline size_t size() const;
49+
50+
private:
51+
template <typename Fn>
52+
class CallbackImpl final : public Callback {
53+
public:
54+
CallbackImpl(Fn&& callback, bool refed);
55+
R Call(Args... args) override;
56+
57+
private:
58+
Fn callback_;
59+
};
60+
61+
std::atomic<size_t> size_ {0};
62+
std::unique_ptr<Callback> head_;
63+
Callback* tail_ = nullptr;
64+
};
65+
66+
} // namespace node
67+
68+
#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
69+
70+
#endif // SRC_CALLBACK_QUEUE_H_

‎src/env-inl.h

+6-74
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
2626

2727
#include "aliased_buffer.h"
28+
#include "callback_queue-inl.h"
2829
#include "env.h"
2930
#include "node.h"
3031
#include "util-inl.h"
@@ -730,50 +731,9 @@ inline void IsolateData::set_options(
730731
options_ = std::move(options);
731732
}
732733

733-
std::unique_ptr<Environment::NativeImmediateCallback>
734-
Environment::NativeImmediateQueue::Shift() {
735-
std::unique_ptr<Environment::NativeImmediateCallback> ret = std::move(head_);
736-
if (ret) {
737-
head_ = ret->get_next();
738-
if (!head_)
739-
tail_ = nullptr; // The queue is now empty.
740-
}
741-
size_--;
742-
return ret;
743-
}
744-
745-
void Environment::NativeImmediateQueue::Push(
746-
std::unique_ptr<Environment::NativeImmediateCallback> cb) {
747-
NativeImmediateCallback* prev_tail = tail_;
748-
749-
size_++;
750-
tail_ = cb.get();
751-
if (prev_tail != nullptr)
752-
prev_tail->set_next(std::move(cb));
753-
else
754-
head_ = std::move(cb);
755-
}
756-
757-
void Environment::NativeImmediateQueue::ConcatMove(
758-
NativeImmediateQueue&& other) {
759-
size_ += other.size_;
760-
if (tail_ != nullptr)
761-
tail_->set_next(std::move(other.head_));
762-
else
763-
head_ = std::move(other.head_);
764-
tail_ = other.tail_;
765-
other.tail_ = nullptr;
766-
other.size_ = 0;
767-
}
768-
769-
size_t Environment::NativeImmediateQueue::size() const {
770-
return size_.load();
771-
}
772-
773734
template <typename Fn>
774735
void Environment::CreateImmediate(Fn&& cb, bool ref) {
775-
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
776-
std::move(cb), ref);
736+
auto callback = native_immediates_.CreateCallback(std::move(cb), ref);
777737
native_immediates_.Push(std::move(callback));
778738
}
779739

@@ -793,8 +753,8 @@ void Environment::SetUnrefImmediate(Fn&& cb) {
793753

794754
template <typename Fn>
795755
void Environment::SetImmediateThreadsafe(Fn&& cb) {
796-
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
797-
std::move(cb), false);
756+
auto callback =
757+
native_immediates_threadsafe_.CreateCallback(std::move(cb), false);
798758
{
799759
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
800760
native_immediates_threadsafe_.Push(std::move(callback));
@@ -804,8 +764,8 @@ void Environment::SetImmediateThreadsafe(Fn&& cb) {
804764

805765
template <typename Fn>
806766
void Environment::RequestInterrupt(Fn&& cb) {
807-
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
808-
std::move(cb), false);
767+
auto callback =
768+
native_immediates_interrupts_.CreateCallback(std::move(cb), false);
809769
{
810770
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
811771
native_immediates_interrupts_.Push(std::move(callback));
@@ -814,34 +774,6 @@ void Environment::RequestInterrupt(Fn&& cb) {
814774
RequestInterruptFromV8();
815775
}
816776

817-
Environment::NativeImmediateCallback::NativeImmediateCallback(bool refed)
818-
: refed_(refed) {}
819-
820-
bool Environment::NativeImmediateCallback::is_refed() const {
821-
return refed_;
822-
}
823-
824-
std::unique_ptr<Environment::NativeImmediateCallback>
825-
Environment::NativeImmediateCallback::get_next() {
826-
return std::move(next_);
827-
}
828-
829-
void Environment::NativeImmediateCallback::set_next(
830-
std::unique_ptr<NativeImmediateCallback> next) {
831-
next_ = std::move(next);
832-
}
833-
834-
template <typename Fn>
835-
Environment::NativeImmediateCallbackImpl<Fn>::NativeImmediateCallbackImpl(
836-
Fn&& callback, bool refed)
837-
: NativeImmediateCallback(refed),
838-
callback_(std::move(callback)) {}
839-
840-
template <typename Fn>
841-
void Environment::NativeImmediateCallbackImpl<Fn>::Call(Environment* env) {
842-
callback_(env);
843-
}
844-
845777
inline bool Environment::can_call_into_js() const {
846778
return can_call_into_js_ && !is_stopping();
847779
}

‎src/env.cc

+2-3
Original file line numberDiff line numberDiff line change
@@ -690,7 +690,7 @@ void Environment::RunAndClearInterrupts() {
690690
}
691691
DebugSealHandleScope seal_handle_scope(isolate());
692692

693-
while (std::unique_ptr<NativeImmediateCallback> head = queue.Shift())
693+
while (auto head = queue.Shift())
694694
head->Call(this);
695695
}
696696
}
@@ -716,8 +716,7 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) {
716716
auto drain_list = [&]() {
717717
TryCatchScope try_catch(this);
718718
DebugSealHandleScope seal_handle_scope(isolate());
719-
while (std::unique_ptr<NativeImmediateCallback> head =
720-
native_immediates_.Shift()) {
719+
while (auto head = native_immediates_.Shift()) {
721720
if (head->is_refed())
722721
ref_count++;
723722

‎src/env.h

+2-43
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "inspector_agent.h"
3030
#include "inspector_profiler.h"
3131
#endif
32+
#include "callback_queue.h"
3233
#include "debug_utils.h"
3334
#include "handle_wrap.h"
3435
#include "node.h"
@@ -1395,49 +1396,7 @@ class Environment : public MemoryRetainer {
13951396

13961397
std::list<ExitCallback> at_exit_functions_;
13971398

1398-
class NativeImmediateCallback {
1399-
public:
1400-
explicit inline NativeImmediateCallback(bool refed);
1401-
1402-
virtual ~NativeImmediateCallback() = default;
1403-
virtual void Call(Environment* env) = 0;
1404-
1405-
inline bool is_refed() const;
1406-
inline std::unique_ptr<NativeImmediateCallback> get_next();
1407-
inline void set_next(std::unique_ptr<NativeImmediateCallback> next);
1408-
1409-
private:
1410-
bool refed_;
1411-
std::unique_ptr<NativeImmediateCallback> next_;
1412-
};
1413-
1414-
template <typename Fn>
1415-
class NativeImmediateCallbackImpl final : public NativeImmediateCallback {
1416-
public:
1417-
NativeImmediateCallbackImpl(Fn&& callback, bool refed);
1418-
void Call(Environment* env) override;
1419-
1420-
private:
1421-
Fn callback_;
1422-
};
1423-
1424-
class NativeImmediateQueue {
1425-
public:
1426-
inline std::unique_ptr<NativeImmediateCallback> Shift();
1427-
inline void Push(std::unique_ptr<NativeImmediateCallback> cb);
1428-
// ConcatMove adds elements from 'other' to the end of this list, and clears
1429-
// 'other' afterwards.
1430-
inline void ConcatMove(NativeImmediateQueue&& other);
1431-
1432-
// size() is atomic and may be called from any thread.
1433-
inline size_t size() const;
1434-
1435-
private:
1436-
std::atomic<size_t> size_ {0};
1437-
std::unique_ptr<NativeImmediateCallback> head_;
1438-
NativeImmediateCallback* tail_ = nullptr;
1439-
};
1440-
1399+
typedef CallbackQueue<void, Environment*> NativeImmediateQueue;
14411400
NativeImmediateQueue native_immediates_;
14421401
Mutex native_immediates_threadsafe_mutex_;
14431402
NativeImmediateQueue native_immediates_threadsafe_;

0 commit comments

Comments
 (0)
Please sign in to comment.