Skip to content

Commit

Permalink
preserve compatibility with previous clients
Browse files Browse the repository at this point in the history
  • Loading branch information
t-horikawa committed Apr 25, 2024
1 parent c10849e commit a8ad424
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 55 deletions.
40 changes: 25 additions & 15 deletions src/tateyama/endpoint/ipc/bootstrap/ipc_worker.cpp
Expand Up @@ -26,15 +26,17 @@
namespace tateyama::endpoint::ipc::bootstrap {

void Worker::do_work() {
tateyama::common::wire::message_header hdr{};
while(true) {
auto hdr = request_wire_container_->peep();
if (hdr.get_length() == 0 && hdr.get_idx() == tateyama::common::wire::message_header::null_request) {
if (request_wire_container_->terminate_requested()) {
VLOG_LP(log_trace) << "received shutdown request: session_id = " << std::to_string(session_id_);
return;
}
try {
hdr = request_wire_container_->peep();
} catch (std::runtime_error &ex) {
continue;
}
if (hdr.get_length() == 0 && hdr.get_idx() == tateyama::common::wire::message_header::terminate_request) {
VLOG_LP(log_trace) << "received shutdown request: session_id = " << std::to_string(session_id_);
return;
}

ipc_request request_obj{*wire_, hdr, database_info_, session_info_, session_store_};
ipc_response response_obj{wire_, hdr.get_idx(), [](){}};
Expand All @@ -50,12 +52,20 @@ void Worker::do_work() {
#endif
while(true) {
try {
auto h = request_wire_container_->peep();
if (h.get_length() == 0 && h.get_idx() == tateyama::common::wire::message_header::null_request) {
if (request_wire_container_->terminate_requested()) {
dispose_session_store();
request_shutdown(tateyama::session::shutdown_request_type::forceful);
}
hdr = request_wire_container_->peep();
} catch (std::runtime_error &ex) {
care_reqreses();
if (is_shuttingdown() && is_completed()) {
wire_->get_response_wire().notify_shutdown();
VLOG_LP(log_info) << "terminate worker because shutdown completed";
break;
}
continue;
}
try {
if (hdr.get_length() == 0 && hdr.get_idx() == tateyama::common::wire::message_header::terminate_request) {
dispose_session_store();
request_shutdown(tateyama::session::shutdown_request_type::forceful);
care_reqreses();
if (is_shuttingdown() && is_completed()) {
wire_->get_response_wire().notify_shutdown();
Expand All @@ -65,9 +75,9 @@ void Worker::do_work() {
continue;
}

auto request = std::make_shared<ipc_request>(*wire_, h, database_info_, session_info_, session_store_);
std::size_t index = h.get_idx();
auto response = std::make_shared<ipc_response>(wire_, h.get_idx(), [this, index](){remove_reqres(index);});
auto request = std::make_shared<ipc_request>(*wire_, hdr, database_info_, session_info_, session_store_);
std::size_t index = hdr.get_idx();
auto response = std::make_shared<ipc_response>(wire_, hdr.get_idx(), [this, index](){remove_reqres(index);});
register_reqres(index,
std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::endpoint::common::response>(response));
Expand Down
5 changes: 0 additions & 5 deletions src/tateyama/endpoint/ipc/bootstrap/server_wires_impl.h
Expand Up @@ -439,7 +439,6 @@ class server_wire_container_impl : public server_wire_container
std::size_t read_point() override { return wire_->read_point(); }
void dispose() override { wire_->dispose(); }
void notify() { wire_->notify(); }
[[nodiscard]] bool terminate_requested() const { return wire_->terminate_requested(); }

// for mainly client, except for terminate request from server
void write(const char* from, const std::size_t len, tateyama::common::wire::message_header::index_type index) {
Expand Down Expand Up @@ -560,10 +559,6 @@ class server_wire_container_impl : public server_wire_container
response_wire_.notify_shutdown();
}

[[nodiscard]] bool terminate_requested() const {
return request_wire_.terminate_requested();
}

// for client
std::unique_ptr<resultset_wires_container_impl> create_resultset_wires_for_client(std::string_view name) {
return std::make_unique<resultset_wires_container_impl>(managed_shared_memory_.get(), name, mtx_shm_);
Expand Down
28 changes: 13 additions & 15 deletions src/tateyama/endpoint/ipc/wire.h
Expand Up @@ -42,7 +42,7 @@ class message_header {
public:
using length_type = std::uint32_t;
using index_type = std::uint16_t;
static constexpr index_type null_request = 0xffff;
static constexpr index_type terminate_request = 0xffff;

static constexpr std::size_t size = sizeof(length_type) + sizeof(index_type);

Expand Down Expand Up @@ -376,18 +376,23 @@ class unidirectional_message_wire : public simple_wire<message_header> {

/**
* @brief wait a request message arives and peep the current header.
* @returnm the essage_header if request message has been received,
* otherwise, say timeout or termination requested, dummy request message whose length is 0 and index is message_header::null_request.
* @returnm the essage_header if request message has been received, for normal reception of request message.
* otherwise, dummy request message whose length is 0 and index is message_header::termination_request for termination request,
* and dummy request message whose length is 0 and index is message_header::timeout for timeout.
*/
message_header peep(const char* base) {
while (true) {
if(stored() >= message_header::size) {
copy_header(base);
return header_received_;
}
if (termination_requested_.load() || onetime_notification_.load()) {
if (termination_requested_.load()) {
termination_requested_.store(false);
return {message_header::terminate_request, 0};
}
if (onetime_notification_.load()) {
onetime_notification_.store(false);
return {message_header::null_request, 0};
return {message_header::terminate_request, 0};
}
boost::interprocess::scoped_lock lock(m_mutex_);
wait_for_read_ = true;
Expand All @@ -396,19 +401,11 @@ class unidirectional_message_wire : public simple_wire<message_header> {
boost::get_system_time() + boost::posix_time::microseconds(u_cap(u_round(watch_interval * 1000 * 1000))),
[this](){ return (stored() >= message_header::size) || termination_requested_.load() || onetime_notification_.load(); })) {
wait_for_read_ = false;
header_received_ = message_header(message_header::null_request, 0);
return header_received_;
throw std::runtime_error("request has not been received within the specified time");
}
wait_for_read_ = false;
}
}
/**
* @brief check if an termination request has been made
* @retrun true if terminate request has been made
*/
[[nodiscard]] bool terminate_requested() {
return termination_requested_.load();
}
/**
* @brief wake up the worker immediately.
*/
Expand Down Expand Up @@ -462,8 +459,9 @@ class unidirectional_message_wire : public simple_wire<message_header> {

// for response
class unidirectional_response_wire : public simple_wire<response_header> {
constexpr static std::size_t watch_interval = 5;
public:
constexpr static std::size_t watch_interval = 5;

unidirectional_response_wire(boost::interprocess::managed_shared_memory* managed_shm_ptr, std::size_t capacity) : simple_wire<response_header>(managed_shm_ptr, capacity) {}

/**
Expand Down
11 changes: 9 additions & 2 deletions test/tateyama/endpoint/ipc/ipc_client.h
Expand Up @@ -38,11 +38,17 @@ class ipc_client {
ipc_client(std::string_view database_name, std::size_t session_id, tateyama::proto::endpoint::request::Handshake& hs);
ipc_client(std::string_view database_name, std::size_t session_id) : ipc_client(database_name, session_id, default_endpoint_handshake_) {
}
~ipc_client() { disconnect(); }

void send(const std::size_t tag, const std::string &message, std::size_t index_offset = 0);
void receive(std::string &message);
void receive(std::string &message, tateyama::proto::framework::response::Header::PayloadType& type);
void disconnect() { request_wire_->disconnect(); }

void disconnect() {
if (!disconnected_) {
request_wire_->disconnect();
disconnected_ = true;
}
}
resultset_wires_container* create_resultset_wires();
void dispose_resultset_wires(resultset_wires_container *rwc);

Expand Down Expand Up @@ -73,6 +79,7 @@ class ipc_client {
tsubakuro::common::wire::session_wire_container::wire_container *request_wire_ { };
tsubakuro::common::wire::session_wire_container::response_wire_container *response_wire_ { };
tateyama::proto::endpoint::request::Handshake default_endpoint_handshake_{ };
bool disconnected_{ };

void handshake();
void receive(std::string &message, tateyama::proto::framework::response::Header::PayloadType type, bool do_check);
Expand Down
8 changes: 4 additions & 4 deletions test/tateyama/endpoint/ipc/ipc_session_limit_test.cpp
Expand Up @@ -66,10 +66,10 @@ class ipc_session_limit_test_server_client: public server_client_gtest_base {
}

void client_thread_serial() {
std::vector<ipc_client> clients { };
std::vector<std::unique_ptr<ipc_client>> clients { };
for (int i = 1; i <= nsession_; i++) {
try {
clients.emplace_back(cfg_);
clients.emplace_back(std::make_unique<ipc_client>(cfg_));
} catch (std::exception &ex) {
std::cout << "nproc=" << i << ", ipc_max_session=" << ipc_max_session_ << " : " << ex.what()
<< std::endl;
Expand All @@ -83,8 +83,8 @@ class ipc_session_limit_test_server_client: public server_client_gtest_base {
return;
}
}
for (ipc_client &client : clients) {
send_receive_loop(client);
for (auto& e : clients) {
send_receive_loop(*e);
}
}

Expand Down
26 changes: 12 additions & 14 deletions test/tsubakuro/common/wire/wire.h
Expand Up @@ -42,7 +42,7 @@ class message_header {
public:
using length_type = std::uint32_t;
using index_type = std::uint16_t;
static constexpr index_type null_request = 0xffff;
static constexpr index_type terminate_request = 0xffff;

static constexpr std::size_t size = sizeof(length_type) + sizeof(index_type);

Expand Down Expand Up @@ -376,18 +376,23 @@ class unidirectional_message_wire : public simple_wire<message_header> {

/**
* @brief wait a request message arives and peep the current header.
* @returnm the essage_header if request message has been received,
* otherwise, say timeout or termination requested, dummy request message whose length is 0 and index is message_header::null_request.
* @returnm the essage_header if request message has been received, for normal reception of request message.
* otherwise, dummy request message whose length is 0 and index is message_header::termination_request for termination request,
* and dummy request message whose length is 0 and index is message_header::timeout for timeout.
*/
message_header peep(const char* base) {
while (true) {
if(stored() >= message_header::size) {
copy_header(base);
return header_received_;
}
if (termination_requested_.load() || onetime_notification_.load()) {
if (termination_requested_.load()) {
termination_requested_.store(false);
return {message_header::terminate_request, 0};
}
if (onetime_notification_.load()) {
onetime_notification_.store(false);
return {message_header::null_request, 0};
return {message_header::terminate_request, 0};
}
boost::interprocess::scoped_lock lock(m_mutex_);
wait_for_read_ = true;
Expand All @@ -396,19 +401,11 @@ class unidirectional_message_wire : public simple_wire<message_header> {
boost::get_system_time() + boost::posix_time::microseconds(u_cap(u_round(watch_interval * 1000 * 1000))),
[this](){ return (stored() >= message_header::size) || termination_requested_.load() || onetime_notification_.load(); })) {
wait_for_read_ = false;
header_received_ = message_header(message_header::null_request, 0);
return header_received_;
throw std::runtime_error("request has not been received within the specified time");
}
wait_for_read_ = false;
}
}
/**
* @brief check if an termination request has been made
* @retrun true if terminate request has been made
*/
[[nodiscard]] bool terminate_requested() {
return termination_requested_.load();
}
/**
* @brief wake up the worker immediately.
*/
Expand Down Expand Up @@ -474,6 +471,7 @@ class unidirectional_response_wire : public simple_wire<response_header> {
if (timeout == 0) {
timeout = watch_interval * 1000 * 1000;
}

while (true) {
if (closed_.load()) {
header_received_ = response_header(0, 0, 0);
Expand Down

0 comments on commit a8ad424

Please sign in to comment.