Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nghttpx: Split thread into worker_process and thread #2126

Merged
merged 1 commit into from
Mar 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
59 changes: 37 additions & 22 deletions src/shrpx.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ struct WorkerProcess {
WorkerProcess(struct ev_loop *loop, pid_t worker_pid, int ipc_fd
#ifdef ENABLE_HTTP3
,
int quic_ipc_fd, std::vector<WorkerID> worker_ids
int quic_ipc_fd, std::vector<WorkerID> worker_ids, uint16_t seq
#endif // ENABLE_HTTP3
)
: loop(loop),
Expand All @@ -214,7 +214,8 @@ struct WorkerProcess {
#ifdef ENABLE_HTTP3
,
quic_ipc_fd(quic_ipc_fd),
worker_ids(std::move(worker_ids))
worker_ids(std::move(worker_ids)),
seq(seq)
#endif // ENABLE_HTTP3
{
ev_child_init(&worker_process_childev, worker_process_child_cb, worker_pid,
Expand Down Expand Up @@ -246,6 +247,7 @@ struct WorkerProcess {
#ifdef ENABLE_HTTP3
int quic_ipc_fd;
std::vector<WorkerID> worker_ids;
uint16_t seq;
#endif // ENABLE_HTTP3
};

Expand All @@ -255,6 +257,10 @@ void reload_config();

namespace {
std::deque<std::unique_ptr<WorkerProcess>> worker_processes;

#ifdef ENABLE_HTTP3
uint16_t worker_process_seq;
#endif // ENABLE_HTTP3
} // namespace

namespace {
Expand Down Expand Up @@ -1288,11 +1294,19 @@ get_inherited_quic_lingering_worker_process_from_env() {
p = end + 1;
}

std::sort(std::begin(worker_ids), std::end(worker_ids));

lwps.emplace_back(std::move(worker_ids), fd);
}

if (!lwps.empty()) {
const auto &lwp = lwps.back();

if (!lwp.worker_ids.empty() &&
worker_process_seq <= lwp.worker_ids[0].worker_process) {
worker_process_seq = lwp.worker_ids[0].worker_process;
++worker_process_seq;
}
}

return lwps;
}
} // namespace
Expand Down Expand Up @@ -1422,7 +1436,7 @@ int create_quic_ipc_socket(std::array<int, 2> &quic_ipc_fd) {
} // namespace

namespace {
int generate_worker_id(std::vector<WorkerID> &worker_ids,
int generate_worker_id(std::vector<WorkerID> &worker_ids, uint16_t wp_seq,
const Config *config) {
auto &apiconf = config->api;
auto &quicconf = config->quic;
Expand All @@ -1443,14 +1457,14 @@ int generate_worker_id(std::vector<WorkerID> &worker_ids,

worker_ids.resize(num_wid);

uint16_t idx = 0;

for (auto &wid : worker_ids) {
if (create_worker_id(wid, quicconf.server_id) != 0) {
return -1;
}
wid.server = quicconf.server_id;
wid.worker_process = wp_seq;
wid.thread = idx++;
}

std::sort(std::begin(worker_ids), std::end(worker_ids));

return 0;
}
} // namespace
Expand Down Expand Up @@ -1840,7 +1854,7 @@ int event_loop() {

std::vector<WorkerID> worker_ids;

if (generate_worker_id(worker_ids, config) != 0) {
if (generate_worker_id(worker_ids, worker_process_seq, config) != 0) {
return -1;
}
#endif // ENABLE_HTTP3
Expand Down Expand Up @@ -1872,13 +1886,13 @@ int event_loop() {
ev_timer_init(&worker_process_grace_period_timer,
worker_process_grace_period_timercb, 0., 0.);

worker_process_add(std::make_unique<WorkerProcess>(loop, pid, ipc_fd
worker_process_add(std::make_unique<WorkerProcess>(
loop, pid, ipc_fd
#ifdef ENABLE_HTTP3
,
quic_ipc_fd,
std::move(worker_ids)
,
quic_ipc_fd, std::move(worker_ids), worker_process_seq++
#endif // ENABLE_HTTP3
));
));

// Write PID file when we are ready to accept connection from peer.
// This makes easier to write restart script for nghttpx. Because
Expand Down Expand Up @@ -4010,7 +4024,8 @@ void reload_config() {

std::vector<WorkerID> worker_ids;

if (generate_worker_id(worker_ids, new_config.get()) != 0) {
if (generate_worker_id(worker_ids, worker_process_seq, new_config.get()) !=
0) {
close_not_inherited_fd(new_config.get(), iaddrs);
return;
}
Expand Down Expand Up @@ -4046,13 +4061,13 @@ void reload_config() {

close_unused_inherited_addr(iaddrs);

worker_process_add(std::make_unique<WorkerProcess>(loop, pid, ipc_fd
worker_process_add(std::make_unique<WorkerProcess>(
loop, pid, ipc_fd
#ifdef ENABLE_HTTP3
,
quic_ipc_fd,
std::move(worker_ids)
,
quic_ipc_fd, std::move(worker_ids), worker_process_seq++
#endif // ENABLE_HTTP3
));
));

worker_process_adjust_limit();

Expand Down
19 changes: 17 additions & 2 deletions src/shrpx_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2975,13 +2975,28 @@ int parse_config(Config *config, int optid, const StringRef &opt,

return 0;
}
case SHRPX_OPTID_WORKERS:
case SHRPX_OPTID_WORKERS: {
#ifdef NOTHREADS
LOG(WARN) << "Threading disabled at build time, no threads created.";
return 0;
#else // !NOTHREADS
return parse_uint(&config->num_worker, opt, optarg);
size_t n;

if (parse_uint(&n, opt, optarg) != 0) {
return -1;
}

if (n > 65530) {
LOG(ERROR) << opt << ": the number of workers must not exceed 65530";

return -1;
}

config->num_worker = n;

return 0;
#endif // !NOTHREADS
}
case SHRPX_OPTID_HTTP2_MAX_CONCURRENT_STREAMS: {
LOG(WARN) << opt << ": deprecated. Use "
<< SHRPX_OPT_FRONTEND_HTTP2_MAX_CONCURRENT_STREAMS << " and "
Expand Down
25 changes: 19 additions & 6 deletions src/shrpx_connection_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1039,22 +1039,35 @@ void ConnectionHandler::set_worker_ids(std::vector<WorkerID> worker_ids) {
worker_ids_ = std::move(worker_ids);
}

namespace {
ssize_t find_worker_index(const std::vector<WorkerID> &worker_ids,
const WorkerID &wid) {
assert(!worker_ids.empty());

if (wid.server != worker_ids[0].server ||
wid.worker_process != worker_ids[0].worker_process ||
wid.thread >= worker_ids.size()) {
return -1;
}

return wid.thread;
}
} // namespace

Worker *ConnectionHandler::find_worker(const WorkerID &wid) const {
auto it =
std::lower_bound(std::begin(worker_ids_), std::end(worker_ids_), wid);
if (it == std::end(worker_ids_) || *it != wid) {
auto idx = find_worker_index(worker_ids_, wid);
if (idx == -1) {
return nullptr;
}

return workers_[std::distance(std::begin(worker_ids_), it)].get();
return workers_[idx].get();
}

QUICLingeringWorkerProcess *
ConnectionHandler::match_quic_lingering_worker_process_worker_id(
const WorkerID &wid) {
for (auto &lwps : quic_lingering_worker_processes_) {
if (std::binary_search(std::begin(lwps.worker_ids),
std::end(lwps.worker_ids), wid)) {
if (find_worker_index(lwps.worker_ids, wid) != -1) {
return &lwps;
}
}
Expand Down
2 changes: 0 additions & 2 deletions src/shrpx_connection_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ enum class QUICIPCType {

// WorkerProcesses which are in graceful shutdown period.
struct QUICLingeringWorkerProcess {
// |worker_ids| must be sorted in the lexicographical order.
QUICLingeringWorkerProcess(std::vector<WorkerID> worker_ids, int quic_ipc_fd)
: worker_ids{std::move(worker_ids)}, quic_ipc_fd{quic_ipc_fd} {}

Expand Down Expand Up @@ -202,7 +201,6 @@ class ConnectionHandler {
void set_quic_keying_materials(std::shared_ptr<QUICKeyingMaterials> qkms);
const std::shared_ptr<QUICKeyingMaterials> &get_quic_keying_materials() const;

// |worker_ids| must be sorted in the lexicographical order.
void set_worker_ids(std::vector<WorkerID> worker_ids);
Worker *find_worker(const WorkerID &wid) const;

Expand Down
7 changes: 2 additions & 5 deletions src/shrpx_quic.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ struct WorkerID {
union {
struct {
uint32_t server;
uint32_t thread;
uint16_t worker_process;
uint16_t thread;
};
uint64_t worker;
};
Expand All @@ -104,10 +105,6 @@ inline bool operator!=(const WorkerID &lhd, const WorkerID &rhd) {
return lhd.worker != rhd.worker;
}

inline bool operator<(const WorkerID &lhd, const WorkerID &rhd) {
return lhd.worker < rhd.worker;
}

struct ConnectionID {
WorkerID worker;
uint64_t client;
Expand Down
13 changes: 0 additions & 13 deletions src/shrpx_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1441,17 +1441,4 @@ void downstream_failure(DownstreamAddr *addr, const Address *raddr) {
}
}

#ifdef ENABLE_HTTP3
int create_worker_id(WorkerID &dest, uint32_t server_id) {
dest.server = server_id;

if (RAND_bytes(reinterpret_cast<unsigned char *>(&dest.thread),
sizeof(dest.thread)) != 1) {
return -1;
}

return 0;
}
#endif // ENABLE_HTTP3

} // namespace shrpx
6 changes: 0 additions & 6 deletions src/shrpx_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -468,12 +468,6 @@ size_t match_downstream_addr_group(
// nullptr. This function may schedule live check.
void downstream_failure(DownstreamAddr *addr, const Address *raddr);

#ifdef ENABLE_HTTP3
// Creates WorkerID used as a prefix of QUIC Connection ID. This
// function returns -1 on failure.
int create_worker_id(WorkerID &dest, uint32_t server_id);
#endif // ENABLE_HTTP3

} // namespace shrpx

#endif // SHRPX_WORKER_H