Skip to content

Commit

Permalink
graceful shutdown on SIGINT/SIGTERM, rate-limiting, basic echo websoc…
Browse files Browse the repository at this point in the history
…ket server
  • Loading branch information
catink123 committed Jan 7, 2024
1 parent f3d19df commit 7ff8305
Show file tree
Hide file tree
Showing 8 changed files with 232 additions and 20 deletions.
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ add_executable(
http_listener.cpp
http_session.hpp
http_session.cpp
websocket_session.hpp
websocket_session.cpp
)

set_property(TARGET ${PROJECT_NAME} PROPERTY CXX_STANDARD 17)
Expand Down
22 changes: 22 additions & 0 deletions src/client/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,27 @@
</head>
<body>
<h1>Test Client Page</h1>

<p style="display: flex; gap: 5px">
<span>Text</span>
<input type="text" id="ws-message">
</p>

<button id="ws-send" disabled>Send</button>

<script>
let ws = new WebSocket("ws://" + location.host);

ws.addEventListener('message', e => {
alert(`Message from server: ${e.data}`);
});

ws.addEventListener('open', () => {
document.querySelector('#ws-send').disabled = false;
document.querySelector('#ws-send').addEventListener('click', () => {
ws.send(document.querySelector('#ws-message').value);
});
});
</script>
</body>
</html>
2 changes: 2 additions & 0 deletions src/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/strand.hpp>
#include <boost/config.hpp>
Expand All @@ -13,5 +14,6 @@ namespace beast = boost::beast;
namespace http = beast::http;
namespace net = boost::beast::net;
using tcp = net::ip::tcp;
namespace websocket = beast::websocket;

#endif
79 changes: 61 additions & 18 deletions src/http_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ http_session::http_session(
tcp::socket&& socket,
const std::shared_ptr<const std::string>& doc_root
) : stream(std::move(socket)),
doc_root(doc_root) {}
doc_root(doc_root)
{
static_assert(queue_limit > 0, "queue limit must be non-zero and positive");
response_queue.reserve(queue_limit);
}

void http_session::run() {
net::dispatch(
Expand All @@ -17,12 +21,15 @@ void http_session::run() {
}

void http_session::do_read() {
// clear the request
req = {};
// make a new parser for each request
parser.emplace();

// set a max body size of 10k to prevent abuse
parser->body_limit(10000);

stream.expires_after(std::chrono::seconds(30));

http::async_read(stream, buffer, req,
http::async_read(stream, buffer, *parser,
beast::bind_front_handler(
&http_session::on_read,
shared_from_this()
Expand All @@ -46,23 +53,57 @@ void http_session::on_read(
return;
}

send_response(
handle_request(*doc_root, std::move(req))
// if the request is a WebSocket Upgrade
if (websocket::is_upgrade(parser->get())) {
// create a new websocket session, moving the socket and request into it
std::make_shared<websocket_session>(
stream.release_socket()
)->do_accept(parser->release());
return;
}

// send the response back
queue_write(
handle_request(*doc_root, parser->release())
);

// if the response queue is not at it's limit, try to add another response to the queue
if (response_queue.size() < queue_limit) {
do_read();
}
}

void http_session::send_response(http::message_generator&& msg) {
bool keep_alive = msg.keep_alive();
void http_session::queue_write(http::message_generator msg) {
// store the work
response_queue.push_back(std::move(msg));

beast::async_write(
stream,
std::move(msg),
beast::bind_front_handler(
&http_session::on_write,
shared_from_this(),
keep_alive
)
);
// if there wasn't any work before, start the write loop
if (response_queue.size() == 1) {
do_write();
}
}

bool http_session::do_write() {
const bool was_full = response_queue.size() == queue_limit;

if (!response_queue.empty()) {
http::message_generator msg = std::move(response_queue.front());
response_queue.erase(response_queue.begin());

bool keep_alive = msg.keep_alive();

beast::async_write(
stream,
std::move(msg),
beast::bind_front_handler(
&http_session::on_write,
shared_from_this(),
keep_alive
)
);
}

return was_full;
}

std::string path_cat(
Expand Down Expand Up @@ -274,7 +315,9 @@ void http_session::on_write(
return do_close();
}

do_read();
if (do_write()) {
do_read();
}
}

void http_session::do_close() {
Expand Down
12 changes: 10 additions & 2 deletions src/http_session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
#define HTTP_SESSION_HPP

#include "common.hpp"
#include <boost/optional.hpp>
#include <memory>
#include <string>
#include <chrono>
#include "websocket_session.hpp"

beast::string_view mime_type(
beast::string_view path
Expand All @@ -26,7 +28,12 @@ class http_session : public std::enable_shared_from_this<http_session> {
beast::tcp_stream stream;
beast::flat_buffer buffer;
std::shared_ptr<const std::string> doc_root;
http::request<http::string_body> req;

// a queue to prevent overload
static constexpr std::size_t queue_limit = 16;
std::vector<http::message_generator> response_queue;

boost::optional<http::request_parser<http::string_body>> parser;

public:
http_session(
Expand All @@ -42,10 +49,11 @@ class http_session : public std::enable_shared_from_this<http_session> {
std::size_t bytes_transferred
);

void send_response(http::message_generator&& msg);
void queue_write(http::message_generator msg);

void do_close();

bool do_write();
void on_write(
bool keep_alive,
beast::error_code ec,
Expand Down
16 changes: 16 additions & 0 deletions src/main.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <nlohmann/json.hpp>
#include <boost/asio/signal_set.hpp>
#include <memory>
#include <string>
#include <vector>
Expand All @@ -22,6 +23,16 @@ int main() {

std::cout << "Server started at " << ADDRESS << ":" << PORT << "." << std::endl;

// graceful shutdown
net::signal_set signals(ioc, SIGINT, SIGTERM);
signals.async_wait(
[&](const beast::error_code&, int) {
// abort all operations
ioc.stop();
std::cout << "Server is stopping..." << std::endl;
}
);

std::vector<std::thread> v;
v.reserve(THREAD_COUNT - 1);
for (auto i = 0; i < THREAD_COUNT - 1; ++i) {
Expand All @@ -33,5 +44,10 @@ int main() {
}
ioc.run();

// if the program is here, the graceful shutdown is in progress, wait for all threads to end
for (std::thread& th : v) {
th.join();
}

return 0;
}
69 changes: 69 additions & 0 deletions src/websocket_session.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#include "websocket_session.hpp"

websocket_session::websocket_session(
tcp::socket&& socket
) : ws(std::move(socket)) {}

void websocket_session::on_accept(beast::error_code ec) {
if (ec) {
std::cerr << "Couldn't accept a WebSocket request: " << ec.message() << std::endl;
return;
}

// read the message
do_read();
}

void websocket_session::do_read() {
// read into buffer
ws.async_read(
buffer,
beast::bind_front_handler(
&websocket_session::on_read,
shared_from_this()
)
);
}

void websocket_session::on_read(
beast::error_code ec,
std::size_t bytes_transferred
) {
boost::ignore_unused(bytes_transferred);

if (ec == websocket::error::closed) {
return;
}

if (ec) {
std::cerr << "Couldn't read incoming WebSocket message: " << ec.message() << std::endl;
return;
}

ws.text(ws.got_text());

ws.async_write(
buffer.data(),
beast::bind_front_handler(
&websocket_session::on_write,
shared_from_this()
)
);
}

void websocket_session::on_write(
beast::error_code ec,
std::size_t bytes_transferred
) {
boost::ignore_unused(bytes_transferred);

if (ec) {
std::cerr << "Couldn't write to a WebSocket stream: " << ec.message() << std::endl;
return;
}

buffer.consume(buffer.size());

// start another read
do_read();
}
50 changes: 50 additions & 0 deletions src/websocket_session.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#ifndef WEBSOCKET_SESSION_HPP
#define WEBSOCKET_SESSION_HPP

#include "common.hpp"
#include <boost/beast/websocket.hpp>
#include <memory>

class websocket_session : public std::enable_shared_from_this<websocket_session> {
websocket::stream<beast::tcp_stream> ws;
beast::flat_buffer buffer;

public:
explicit websocket_session(tcp::socket&& socket);

template<class Body, class Allocator>
void do_accept(
http::request<Body, http::basic_fields<Allocator>> req
) {
ws.set_option(
websocket::stream_base::timeout::suggested(
beast::role_type::server
)
);

// append a Server field to every response
ws.set_option(
websocket::stream_base::decorator(
[](websocket::response_type& res) {
res.set(http::field::server, VERSION);
}
)
);

ws.async_accept(
req,
beast::bind_front_handler(
&websocket_session::on_accept,
shared_from_this()
)
);
}

private:
void on_accept(beast::error_code ec);
void do_read();
void on_write(beast::error_code ec, std::size_t bytes_transferred);
void on_read(beast::error_code ec, std::size_t bytes_transferred);
};

#endif

0 comments on commit 7ff8305

Please sign in to comment.