Skip to content

Commit

Permalink
improve worker termination method
Browse files Browse the repository at this point in the history
  • Loading branch information
t-horikawa committed Apr 19, 2024
1 parent b24f148 commit ff17d98
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 38 deletions.
2 changes: 1 addition & 1 deletion src/tateyama/endpoint/common/worker_common.h
Expand Up @@ -103,7 +103,7 @@ class worker_common {
}

[[nodiscard]] bool is_quiet() {
return !has_incomplete_response() && !has_incomplete_resultset();
return !has_incomplete_response() && !has_incomplete_resultset() && terminated();
}

/**
Expand Down
36 changes: 17 additions & 19 deletions src/tateyama/endpoint/ipc/bootstrap/ipc_listener.h
Expand Up @@ -145,14 +145,10 @@ class ipc_listener {
auto& worker = workers_.at(index);
worker->register_worker_in_context(worker);
worker->run();
std::shared_ptr<Worker> worker_ptr{};
{
std::unique_lock<std::mutex> lock(mtx_workers_);
worker_ptr = std::move(worker);
}
{
std::unique_lock<std::mutex> lock(mtx_undertakers_);
undertakers_.emplace(std::move(worker_ptr));
std::unique_lock<std::mutex> lock_w(mtx_workers_);
std::unique_lock<std::mutex> lock_u(mtx_undertakers_);
undertakers_.emplace(std::move(worker));
}
connection_queue.disconnect(index);
});
Expand Down Expand Up @@ -229,21 +225,23 @@ class ipc_listener {
void confirm_workers_termination() {
bool message_output{false};
while (true) {
bool worker_remain{false};
for (auto&& worker : workers_) {
std::shared_ptr<Worker> worker_ptr = worker;
if (worker_ptr) {
if (!message_output) { // message output for the first worker only
VLOG_LP(log_trace) << "wait for remaining worker thread, session id = " << worker_ptr->session_id();
message_output = true;
{
std::unique_lock<std::mutex> lock(mtx_workers_);
bool worker_remain{false};
for (auto& worker : workers_) {
if (worker) {
if (!message_output) { // message output for the first worker only
VLOG_LP(log_trace) << "wait for remaining worker thread, session id = " << worker->session_id();
message_output = true;
}
worker_remain = true;
}
worker_remain = true;
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
if (!worker_remain) {
break;
}
}
if (!worker_remain) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
while (!care_undertakers()) {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
Expand Down
35 changes: 17 additions & 18 deletions src/tateyama/endpoint/stream/bootstrap/stream_listener.h
Expand Up @@ -164,14 +164,10 @@ class stream_listener {
auto& worker = workers_.at(index);
worker->register_worker_in_context(worker);
worker->run();
std::shared_ptr<stream_worker> worker_ptr{};
{
std::unique_lock<std::mutex> lock(mtx_workers_);
worker_ptr = std::move(worker);
}
{
std::unique_lock<std::mutex> lock(mtx_undertakers_);
undertakers_.emplace(std::move(worker_ptr));
std::unique_lock<std::mutex> lock_w(mtx_workers_);
std::unique_lock<std::mutex> lock_u(mtx_undertakers_);
undertakers_.emplace(std::move(worker));
}
});
session_id++;
Expand Down Expand Up @@ -227,20 +223,23 @@ class stream_listener {
void confirm_workers_termination() {
bool message_output{false};
while (true) {
bool worker_remain{false};
for (auto&& worker : workers_) {
std::shared_ptr<stream_worker> worker_ptr = worker;
if (worker_ptr) {
if (!message_output) {
VLOG_LP(log_trace) << "wait for remaining worker thread, session id = " << worker->session_id();
message_output = true;
{
std::unique_lock<std::mutex> lock(mtx_workers_);
bool worker_remain{false};
for (auto& worker : workers_) {
if (worker) {
if (!message_output) { // message output for the first worker only
VLOG_LP(log_trace) << "wait for remaining worker thread, session id = " << worker->session_id();
message_output = true;
}
worker_remain = true;
}
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
if (!worker_remain) {
break;
}
}
if (!worker_remain) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
while (!care_undertakers()) {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
Expand Down

0 comments on commit ff17d98

Please sign in to comment.