Skip to content

Commit

Permalink
introduce the chain of responsibility pattern to the processing of th…
Browse files Browse the repository at this point in the history
…e routing service
  • Loading branch information
t-horikawa committed May 1, 2024
1 parent 0312bf2 commit 900bcc3
Show file tree
Hide file tree
Showing 15 changed files with 181 additions and 110 deletions.
61 changes: 49 additions & 12 deletions src/tateyama/endpoint/common/worker_common.h
Expand Up @@ -35,6 +35,8 @@

#include <tateyama/proto/endpoint/request.pb.h>
#include <tateyama/proto/endpoint/response.pb.h>
#include <tateyama/proto/core/request.pb.h>
#include <tateyama/proto/core/response.pb.h>
#include <tateyama/proto/diagnostics.pb.h>

#include "request.h"
Expand Down Expand Up @@ -249,19 +251,43 @@ class worker_common {
return true;
}

case tateyama::proto::endpoint::request::Request::kShutdown:
default: // error
{
VLOG_LP(log_trace) << "received shutdown request, slot = " << slot; //NOLINT
std::stringstream ss;
ss << "bad request for endpoint: " << rq.command_case();
LOG(INFO) << ss.str();
notify_client(res.get(), tateyama::proto::diagnostics::Code::INVALID_REQUEST, ss.str());
}
return false;
}
}

bool routing_service_chain(const std::shared_ptr<tateyama::api::server::request>& req,
const std::shared_ptr<tateyama::api::server::response>& res) {
auto data = req->payload();
tateyama::proto::core::request::Request rq{};
if(! rq.ParseFromArray(data.data(), static_cast<int>(data.size()))) {
std::string error_message{"request parse error"};
LOG(INFO) << error_message;
notify_client(res.get(), tateyama::proto::diagnostics::Code::INVALID_REQUEST, error_message);
return false;
}

switch (rq.command_case()) {

case tateyama::proto::core::request::Request::kShutdown:
{
VLOG_LP(log_trace) << "received shutdown request"; //NOLINT
{
tateyama::session::shutdown_request_type shutdown_type{};
switch (rq.shutdown().type()) {
case tateyama::proto::endpoint::request::ShutdownType::SHUTDOWN_TYPE_NOT_SET:
case tateyama::proto::core::request::ShutdownType::SHUTDOWN_TYPE_NOT_SET:
shutdown_type = tateyama::session::shutdown_request_type::forceful;
break;
case tateyama::proto::endpoint::request::ShutdownType::GRACEFUL:
case tateyama::proto::core::request::ShutdownType::GRACEFUL:
shutdown_type = tateyama::session::shutdown_request_type::graceful;
break;
case tateyama::proto::endpoint::request::ShutdownType::FORCEFUL:
case tateyama::proto::core::request::ShutdownType::FORCEFUL:
shutdown_type = tateyama::session::shutdown_request_type::forceful;
break;
default: // error
Expand All @@ -270,22 +296,33 @@ class worker_common {
request_shutdown(shutdown_type);

// FIXME confirm when response should be sent
tateyama::proto::endpoint::response::Shutdown rp{};
tateyama::proto::core::response::Shutdown rp{};
auto body = rp.SerializeAsString();
res->body(body);
return true;
}
return true;
}

default: // error
case tateyama::proto::core::request::Request::kUpdateExpirationTime:
{
std::stringstream ss;
ss << "bad request (cancel in endpoint): " << rq.command_case();
LOG(INFO) << ss.str();
notify_client(res.get(), tateyama::proto::diagnostics::Code::INVALID_REQUEST, ss.str());
// mock impl. for UpdateExpirationTime // TODO
auto et = rq.update_expiration_time().expiration_time();
VLOG_LP(log_debug) <<
"UpdateExpirationTime received session_id:" << req->session_id() <<
" expiration_time:" << et;
tateyama::proto::core::response::UpdateExpirationTime rp{};
rp.mutable_success();
res->session_id(req->session_id());
auto body = rp.SerializeAsString();
res->body(body);
rp.clear_success();
return true;
}
return false;

default:
// this request can not be handled here;
return false;
}
}

Expand Down
39 changes: 30 additions & 9 deletions src/tateyama/endpoint/ipc/bootstrap/ipc_worker.cpp
Expand Up @@ -81,23 +81,44 @@ void Worker::do_work() {
register_reqres(index,
std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::endpoint::common::response>(response));
if (request->service_id() != tateyama::framework::service_id_endpoint_broker) {
bool exit_frag = false;
switch (request->service_id()) {
case tateyama::framework::service_id_endpoint_broker:
if (!endpoint_service(std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::api::server::response>(response),
index)) {
VLOG_LP(log_info) << "terminate worker because endpoint service returns an error";
exit_frag = true;
}
break;

case tateyama::framework::service_id_routing:
if (routing_service_chain(std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::api::server::response>(response))) {
break;
}
if (!service_(std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::api::server::response>(response))) {
VLOG_LP(log_info) << "terminate worker because service returns an error";
exit_frag = true;
}
break;

default:
if (!is_shuttingdown()) {
if (!service_(std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::api::server::response>(response))) {
VLOG_LP(log_info) << "terminate worker because service returns an error";
break;
exit_frag = true;
}
} else {
notify_client(response.get(), tateyama::proto::diagnostics::SESSION_CLOSED, "");
}
} else {
if (!endpoint_service(std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::api::server::response>(response),
index)) {
VLOG_LP(log_info) << "terminate worker because endpoint service returns an error";
break;
}
break;

}
if (exit_frag) {
break;
}
request->dispose();
request = nullptr;
Expand Down
31 changes: 23 additions & 8 deletions src/tateyama/endpoint/stream/bootstrap/stream_worker.cpp
Expand Up @@ -85,7 +85,29 @@ void stream_worker::do_work()
register_reqres(slot,
std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::endpoint::common::response>(response));
if (request->service_id() != tateyama::framework::service_id_endpoint_broker) {
switch (request->service_id()) {
case tateyama::framework::service_id_endpoint_broker:
if (endpoint_service(std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::api::server::response>(response),
slot)) {
continue;
}
VLOG_LP(log_info) << "terminate worker because endpoint service returns an error";
break;

case tateyama::framework::service_id_routing:
if (routing_service_chain(std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::api::server::response>(response))) {
continue;
}
if (service_(std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::api::server::response>(response))) {
continue;
}
VLOG_LP(log_info) << "terminate worker because service returns an error";
break;

default:
if (!is_shuttingdown()) {
if(service_(std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::api::server::response>(response))) {
Expand All @@ -96,13 +118,6 @@ void stream_worker::do_work()
notify_client(response.get(), tateyama::proto::diagnostics::SESSION_CLOSED, "");
continue;
}
} else {
if (endpoint_service(std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::api::server::response>(response),
slot)) {
continue;
}
VLOG_LP(log_info) << "terminate worker because endpoint service returns an error";
}
request = nullptr;
response = nullptr;
Expand Down
33 changes: 14 additions & 19 deletions src/tateyama/framework/routing_service.cpp
Expand Up @@ -31,6 +31,7 @@

#include <tateyama/proto/core/request.pb.h>
#include <tateyama/proto/core/response.pb.h>
#include <tateyama/proto/diagnostics.pb.h>

namespace tateyama::framework {

Expand Down Expand Up @@ -59,7 +60,7 @@ bool routing_service::shutdown(environment&) {
return true;
}

bool handle_update_expiration_time(
static bool service_(
std::shared_ptr<request> const& req,
std::shared_ptr<response> const& res
) {
Expand All @@ -70,26 +71,20 @@ bool handle_update_expiration_time(
VLOG_LP(log_error) << "request parse error";
return false;
}
if(rq.command_case() != ns::Request::kUpdateExpirationTime) {
tateyama::proto::diagnostics::Record record{};
switch (rq.command_case()) {
case ns::Request::kUpdateExpirationTime:
LOG_LP(INFO) << "does not support UpdateExpirationTime for this endpoint";
record.set_code(tateyama::proto::diagnostics::Code::UNSUPPORTED_OPERATION);
res->error(record);
return true;

default:
record.set_code(tateyama::proto::diagnostics::Code::UNKNOWN);
res->error(record);
LOG_LP(ERROR) << "bad request destination (routing service): " << rq.command_case();
return false;
}
if(! rq.has_update_expiration_time()) {
LOG_LP(ERROR) << "bad request content";
return false;
}
// mock impl. for UpdateExpirationTime // TODO
auto et = rq.update_expiration_time().expiration_time();
VLOG_LP(log_debug) <<
"UpdateExpirationTime received session_id:" << req->session_id() <<
" expiration_time:" << et;
tateyama::proto::core::response::UpdateExpirationTime rp{};
rp.mutable_success();
res->session_id(req->session_id());
auto body = rp.SerializeAsString();
res->body(body);
rp.clear_success();
return true;
}

bool routing_service::operator()(std::shared_ptr<request> req, std::shared_ptr<response> res) {
Expand All @@ -99,7 +94,7 @@ bool routing_service::operator()(std::shared_ptr<request> req, std::shared_ptr<r
}
if (req->service_id() == tag) {
// must be UpdateExpirationTime
return handle_update_expiration_time(req, res);
return service_(req, res);
}
if (auto destination = services_->find_by_id(req->service_id()); destination != nullptr) {
destination->operator()(std::move(req), std::move(res));
Expand Down
26 changes: 25 additions & 1 deletion src/tateyama/proto/core/request.proto
Expand Up @@ -19,9 +19,13 @@ message Request {

// the request command.
oneof command {
// update session expiration time operation.
UpdateExpirationTime update_expiration_time = 11;

// shutdown operation.
Shutdown shutdown = 12;
}
reserved 12 to 99;
reserved 13 to 99;
}

// update session expiration time
Expand All @@ -30,3 +34,23 @@ message UpdateExpirationTime {
// the expiration time (milliseconds from now) to be set
uint64 expiration_time = 1;
}

// kind of shutdown type.
enum ShutdownType {

// The default shutdown type.
SHUTDOWN_TYPE_NOT_SET = 0;

// Waits for the ongoing requests and safely shutdown the session.
GRACEFUL = 1;

// Cancelling the ongoing requests and safely shutdown the session.
FORCEFUL = 2;
}

// request shutdown to the session.
message Shutdown {

// the shutdown type.
ShutdownType type = 1;
}
5 changes: 5 additions & 0 deletions src/tateyama/proto/core/response.proto
Expand Up @@ -28,3 +28,8 @@ message UpdateExpirationTime {
UnknownError unknown_error = 12;
}
}

// shutdown operation.
message Shutdown {
// no special message
}
27 changes: 2 additions & 25 deletions src/tateyama/proto/endpoint/request.proto
Expand Up @@ -24,11 +24,8 @@ message Request {

// cancel operation.
Cancel cancel = 12;

// shutdown operation.
Shutdown shutdown = 13;
}
reserved 14 to 99;
reserved 13 to 99;
}

// handshake operation.
Expand Down Expand Up @@ -83,24 +80,4 @@ message WireInformation {
// cancel operation.
message Cancel {
// no special properties.
}

// kind of shutdown type.
enum ShutdownType {

// The default shutdown type.
SHUTDOWN_TYPE_NOT_SET = 0;

// Waits for the ongoing requests and safely shutdown the session.
GRACEFUL = 1;

// Cancelling the ongoing requests and safely shutdown the session.
FORCEFUL = 2;
}

// request shutdown to the session.
message Shutdown {

// the shutdown type.
ShutdownType type = 1;
}
}
5 changes: 0 additions & 5 deletions src/tateyama/proto/endpoint/response.proto
Expand Up @@ -38,9 +38,4 @@ message Handshake {
// the session id.
uint64 session_id = 11;
}
}

// shutdown operation.
message Shutdown {
// no special message
}
5 changes: 3 additions & 2 deletions test/tateyama/endpoint/ipc/ipc_info_test.cpp
Expand Up @@ -45,6 +45,7 @@ static constexpr std::size_t datachannel_buffer_size = 64 * 1024;
static constexpr tateyama::common::wire::message_header::index_type index_ = 1;
static constexpr std::string_view response_test_message = "opqrstuvwxyz";
static constexpr std::string_view request_test_message = "abcdefgh";
static constexpr std::size_t service_id_of_info_service = 101;

class info_service : public tateyama::framework::routing_service {
public:
Expand All @@ -53,7 +54,7 @@ class info_service : public tateyama::framework::routing_service {
bool shutdown(tateyama::framework::environment&) { return true; }
std::string_view label() const noexcept { return __func__; }

id_type id() const noexcept { return 100; } // dummy
id_type id() const noexcept { return service_id_of_info_service; }
bool operator ()(std::shared_ptr<tateyama::api::server::request> req,
std::shared_ptr<tateyama::api::server::response> res) override {
req_ = req;
Expand Down Expand Up @@ -109,7 +110,7 @@ TEST_F(ipc_info_test, basic) {
cci.release_credential();
hs.release_client_information();

client->send(0, std::string(request_test_message)); // we do not care service_id nor request message here
client->send(service_id_of_info_service, std::string(request_test_message)); // we do not care service_id nor request message here
std::string res{};
client->receive(res);

Expand Down

0 comments on commit 900bcc3

Please sign in to comment.