Skip to content

Commit

Permalink
Merge pull request #2126 from nghttp2/nghttpx-worker-process-thread
Browse files Browse the repository at this point in the history
nghttpx: Split thread into worker_process and thread
  • Loading branch information
tatsuhiro-t committed Mar 31, 2024
2 parents cd7d516 + edd2070 commit 5483edd
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 56 deletions.
59 changes: 37 additions & 22 deletions src/shrpx.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ struct WorkerProcess {
WorkerProcess(struct ev_loop *loop, pid_t worker_pid, int ipc_fd
#ifdef ENABLE_HTTP3
,
int quic_ipc_fd, std::vector<WorkerID> worker_ids
int quic_ipc_fd, std::vector<WorkerID> worker_ids, uint16_t seq
#endif // ENABLE_HTTP3
)
: loop(loop),
Expand All @@ -214,7 +214,8 @@ struct WorkerProcess {
#ifdef ENABLE_HTTP3
,
quic_ipc_fd(quic_ipc_fd),
worker_ids(std::move(worker_ids))
worker_ids(std::move(worker_ids)),
seq(seq)
#endif // ENABLE_HTTP3
{
ev_child_init(&worker_process_childev, worker_process_child_cb, worker_pid,
Expand Down Expand Up @@ -246,6 +247,7 @@ struct WorkerProcess {
#ifdef ENABLE_HTTP3
int quic_ipc_fd;
std::vector<WorkerID> worker_ids;
uint16_t seq;
#endif // ENABLE_HTTP3
};

Expand All @@ -255,6 +257,10 @@ void reload_config();

namespace {
std::deque<std::unique_ptr<WorkerProcess>> worker_processes;

#ifdef ENABLE_HTTP3
uint16_t worker_process_seq;
#endif // ENABLE_HTTP3
} // namespace

namespace {
Expand Down Expand Up @@ -1288,11 +1294,19 @@ get_inherited_quic_lingering_worker_process_from_env() {
p = end + 1;
}

std::sort(std::begin(worker_ids), std::end(worker_ids));

lwps.emplace_back(std::move(worker_ids), fd);
}

if (!lwps.empty()) {
const auto &lwp = lwps.back();

if (!lwp.worker_ids.empty() &&
worker_process_seq <= lwp.worker_ids[0].worker_process) {
worker_process_seq = lwp.worker_ids[0].worker_process;
++worker_process_seq;
}
}

return lwps;
}
} // namespace
Expand Down Expand Up @@ -1422,7 +1436,7 @@ int create_quic_ipc_socket(std::array<int, 2> &quic_ipc_fd) {
} // namespace

namespace {
int generate_worker_id(std::vector<WorkerID> &worker_ids,
int generate_worker_id(std::vector<WorkerID> &worker_ids, uint16_t wp_seq,
const Config *config) {
auto &apiconf = config->api;
auto &quicconf = config->quic;
Expand All @@ -1443,14 +1457,14 @@ int generate_worker_id(std::vector<WorkerID> &worker_ids,

worker_ids.resize(num_wid);

uint16_t idx = 0;

for (auto &wid : worker_ids) {
if (create_worker_id(wid, quicconf.server_id) != 0) {
return -1;
}
wid.server = quicconf.server_id;
wid.worker_process = wp_seq;
wid.thread = idx++;
}

std::sort(std::begin(worker_ids), std::end(worker_ids));

return 0;
}
} // namespace
Expand Down Expand Up @@ -1840,7 +1854,7 @@ int event_loop() {

std::vector<WorkerID> worker_ids;

if (generate_worker_id(worker_ids, config) != 0) {
if (generate_worker_id(worker_ids, worker_process_seq, config) != 0) {
return -1;
}
#endif // ENABLE_HTTP3
Expand Down Expand Up @@ -1872,13 +1886,13 @@ int event_loop() {
ev_timer_init(&worker_process_grace_period_timer,
worker_process_grace_period_timercb, 0., 0.);

worker_process_add(std::make_unique<WorkerProcess>(loop, pid, ipc_fd
worker_process_add(std::make_unique<WorkerProcess>(
loop, pid, ipc_fd
#ifdef ENABLE_HTTP3
,
quic_ipc_fd,
std::move(worker_ids)
,
quic_ipc_fd, std::move(worker_ids), worker_process_seq++
#endif // ENABLE_HTTP3
));
));

// Write PID file when we are ready to accept connection from peer.
// This makes easier to write restart script for nghttpx. Because
Expand Down Expand Up @@ -4010,7 +4024,8 @@ void reload_config() {

std::vector<WorkerID> worker_ids;

if (generate_worker_id(worker_ids, new_config.get()) != 0) {
if (generate_worker_id(worker_ids, worker_process_seq, new_config.get()) !=
0) {
close_not_inherited_fd(new_config.get(), iaddrs);
return;
}
Expand Down Expand Up @@ -4046,13 +4061,13 @@ void reload_config() {

close_unused_inherited_addr(iaddrs);

worker_process_add(std::make_unique<WorkerProcess>(loop, pid, ipc_fd
worker_process_add(std::make_unique<WorkerProcess>(
loop, pid, ipc_fd
#ifdef ENABLE_HTTP3
,
quic_ipc_fd,
std::move(worker_ids)
,
quic_ipc_fd, std::move(worker_ids), worker_process_seq++
#endif // ENABLE_HTTP3
));
));

worker_process_adjust_limit();

Expand Down
19 changes: 17 additions & 2 deletions src/shrpx_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2975,13 +2975,28 @@ int parse_config(Config *config, int optid, const StringRef &opt,

return 0;
}
case SHRPX_OPTID_WORKERS:
case SHRPX_OPTID_WORKERS: {
#ifdef NOTHREADS
LOG(WARN) << "Threading disabled at build time, no threads created.";
return 0;
#else // !NOTHREADS
return parse_uint(&config->num_worker, opt, optarg);
size_t n;

if (parse_uint(&n, opt, optarg) != 0) {
return -1;
}

if (n > 65530) {
LOG(ERROR) << opt << ": the number of workers must not exceed 65530";

return -1;
}

config->num_worker = n;

return 0;
#endif // !NOTHREADS
}
case SHRPX_OPTID_HTTP2_MAX_CONCURRENT_STREAMS: {
LOG(WARN) << opt << ": deprecated. Use "
<< SHRPX_OPT_FRONTEND_HTTP2_MAX_CONCURRENT_STREAMS << " and "
Expand Down
25 changes: 19 additions & 6 deletions src/shrpx_connection_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1039,22 +1039,35 @@ void ConnectionHandler::set_worker_ids(std::vector<WorkerID> worker_ids) {
worker_ids_ = std::move(worker_ids);
}

namespace {
ssize_t find_worker_index(const std::vector<WorkerID> &worker_ids,
const WorkerID &wid) {
assert(!worker_ids.empty());

if (wid.server != worker_ids[0].server ||
wid.worker_process != worker_ids[0].worker_process ||
wid.thread >= worker_ids.size()) {
return -1;
}

return wid.thread;
}
} // namespace

Worker *ConnectionHandler::find_worker(const WorkerID &wid) const {
auto it =
std::lower_bound(std::begin(worker_ids_), std::end(worker_ids_), wid);
if (it == std::end(worker_ids_) || *it != wid) {
auto idx = find_worker_index(worker_ids_, wid);
if (idx == -1) {
return nullptr;
}

return workers_[std::distance(std::begin(worker_ids_), it)].get();
return workers_[idx].get();
}

QUICLingeringWorkerProcess *
ConnectionHandler::match_quic_lingering_worker_process_worker_id(
const WorkerID &wid) {
for (auto &lwps : quic_lingering_worker_processes_) {
if (std::binary_search(std::begin(lwps.worker_ids),
std::end(lwps.worker_ids), wid)) {
if (find_worker_index(lwps.worker_ids, wid) != -1) {
return &lwps;
}
}
Expand Down
2 changes: 0 additions & 2 deletions src/shrpx_connection_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ enum class QUICIPCType {

// WorkerProcesses which are in graceful shutdown period.
struct QUICLingeringWorkerProcess {
// |worker_ids| must be sorted in the lexicographical order.
QUICLingeringWorkerProcess(std::vector<WorkerID> worker_ids, int quic_ipc_fd)
: worker_ids{std::move(worker_ids)}, quic_ipc_fd{quic_ipc_fd} {}

Expand Down Expand Up @@ -202,7 +201,6 @@ class ConnectionHandler {
void set_quic_keying_materials(std::shared_ptr<QUICKeyingMaterials> qkms);
const std::shared_ptr<QUICKeyingMaterials> &get_quic_keying_materials() const;

// |worker_ids| must be sorted in the lexicographical order.
void set_worker_ids(std::vector<WorkerID> worker_ids);
Worker *find_worker(const WorkerID &wid) const;

Expand Down
7 changes: 2 additions & 5 deletions src/shrpx_quic.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ struct WorkerID {
union {
struct {
uint32_t server;
uint32_t thread;
uint16_t worker_process;
uint16_t thread;
};
uint64_t worker;
};
Expand All @@ -104,10 +105,6 @@ inline bool operator!=(const WorkerID &lhd, const WorkerID &rhd) {
return lhd.worker != rhd.worker;
}

inline bool operator<(const WorkerID &lhd, const WorkerID &rhd) {
return lhd.worker < rhd.worker;
}

struct ConnectionID {
WorkerID worker;
uint64_t client;
Expand Down
13 changes: 0 additions & 13 deletions src/shrpx_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1441,17 +1441,4 @@ void downstream_failure(DownstreamAddr *addr, const Address *raddr) {
}
}

#ifdef ENABLE_HTTP3
int create_worker_id(WorkerID &dest, uint32_t server_id) {
dest.server = server_id;

if (RAND_bytes(reinterpret_cast<unsigned char *>(&dest.thread),
sizeof(dest.thread)) != 1) {
return -1;
}

return 0;
}
#endif // ENABLE_HTTP3

} // namespace shrpx
6 changes: 0 additions & 6 deletions src/shrpx_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -468,12 +468,6 @@ size_t match_downstream_addr_group(
// nullptr. This function may schedule live check.
void downstream_failure(DownstreamAddr *addr, const Address *raddr);

#ifdef ENABLE_HTTP3
// Creates WorkerID used as a prefix of QUIC Connection ID. This
// function returns -1 on failure.
int create_worker_id(WorkerID &dest, uint32_t server_id);
#endif // ENABLE_HTTP3

} // namespace shrpx

#endif // SHRPX_WORKER_H

0 comments on commit 5483edd

Please sign in to comment.