Skip to content

Commit

Permalink
decouple writing from reading in websockets, implement a messaging sy…
Browse files Browse the repository at this point in the history
…stem through JSON, update client page to test messaging
  • Loading branch information
catink123 committed Jan 10, 2024
1 parent 7ff8305 commit ef5ba7b
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 22 deletions.
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ add_executable(
http_session.cpp
websocket_session.hpp
websocket_session.cpp
json_message.hpp
json_message.cpp
)

set_property(TARGET ${PROJECT_NAME} PROPERTY CXX_STANDARD 17)
Expand Down
30 changes: 19 additions & 11 deletions src/client/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,33 @@
<body>
<h1>Test Client Page</h1>

<p style="display: flex; gap: 5px">
<span>Text</span>
<input type="text" id="ws-message">
</p>

<button id="ws-send" disabled>Send</button>
<button disabled id="send-query">Query State</button>
<button disabled id="send-up">Send Raise</button>
<button disabled id="send-down">Send Lower</button>

<script>
let ws = new WebSocket("ws://" + location.host);

ws.addEventListener('message', e => {
alert(`Message from server: ${e.data}`);
let msg = JSON.parse(e.data);
if (msg.type == "text") alert(`Text from Server: ${msg.payload}`);
if (msg.type == "query_state_result") alert(`Current Gate State: ${msg.payload}`);
if (msg.type == "availability") alert(`Availability: ${msg.payload}`);
});

function setListener(selector, type, listener) {
document.querySelector(selector).addEventListener(type, listener);
}

function bindMessage(selector, msgObj) {
setListener(selector, "click", () => ws.send(JSON.stringify(msgObj)));
}

ws.addEventListener('open', () => {
document.querySelector('#ws-send').disabled = false;
document.querySelector('#ws-send').addEventListener('click', () => {
ws.send(document.querySelector('#ws-message').value);
});
document.querySelectorAll('button').forEach(btn => btn.disabled = false);
bindMessage("#send-query", { type: "query_state", payload: null });
bindMessage("#send-up", { type: "change_state", payload: true });
bindMessage("#send-down", { type: "change_state", payload: false });
});
</script>
</body>
Expand Down
54 changes: 54 additions & 0 deletions src/json_message.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#include "json_message.hpp"

std::string json_message::type_to_str(const MessageType& type) {
if (type == QueryState) return "query_state";
if (type == QueryStateResult) return "query_state_result";
if (type == ChangeState) return "change_state";
if (type == Availability) return "availability";
if (type == Text) return "text";
}

Check warning on line 9 in src/json_message.cpp

View workflow job for this annotation

GitHub Actions / build_and_upload

'json_message::type_to_str': not all control paths return a value [D:\a\gate-control\gate-control\out\windows-x64-release\src\GateControl.vcxproj]

Check warning on line 9 in src/json_message.cpp

View workflow job for this annotation

GitHub Actions / build_and_upload

'json_message::type_to_str': not all control paths return a value [D:\a\gate-control\gate-control\out\windows-x64-debug\src\GateControl.vcxproj]

json_message::MessageType json_message::str_to_type(const std::string_view str) {
if (str == "query_state") return QueryState;
if (str == "query_state_result") return QueryStateResult;
if (str == "change_state") return ChangeState;
if (str == "availability") return Availability;
if (str == "text") return Text;
}

Check warning on line 17 in src/json_message.cpp

View workflow job for this annotation

GitHub Actions / build_and_upload

'json_message::str_to_type': not all control paths return a value [D:\a\gate-control\gate-control\out\windows-x64-release\src\GateControl.vcxproj]

Check warning on line 17 in src/json_message.cpp

View workflow job for this annotation

GitHub Actions / build_and_upload

'json_message::str_to_type': not all control paths return a value [D:\a\gate-control\gate-control\out\windows-x64-debug\src\GateControl.vcxproj]

std::string json_message::create_message(
const MessageType type,
const nlohmann::json payload
) {
return nlohmann::json(
{
{ "type", type_to_str(type) },
{ "payload", payload }
}
).dump();
}

std::string json_message::create_message(
const std::pair<const MessageType, const nlohmann::json> msg
) {
return create_message(msg.first, msg.second);
}

json_message json_message::parse_message(
const std::string_view str
) {
nlohmann::json parsed_json = nlohmann::json::parse(str);

if (
!parsed_json.is_object() ||
!parsed_json["type"].is_string() ||
(!parsed_json["payload"].is_primitive() && !parsed_json["payload"].is_structured())
) {
throw json_message_parse_error("malformed JSON data");
}

return json_message(
json_message::str_to_type(parsed_json["type"].get<std::string>()),
parsed_json["payload"]
);
}
45 changes: 45 additions & 0 deletions src/json_message.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#ifndef JSON_MESSAGE_HPP
#define JSON_MESSAGE_HPP

#include <nlohmann/json.hpp>
#include <string>

class json_message {
public:
enum MessageType {
QueryState,
QueryStateResult,
ChangeState,
Availability,
Text
};

class json_message_parse_error : std::runtime_error {
public:
json_message_parse_error(const char* what) : std::runtime_error(what) {}
};

MessageType type;
nlohmann::json::value_t payload;

json_message(
const MessageType type,
nlohmann::json::value_t payload
) : type(type), payload(payload) {}

static std::string type_to_str(const MessageType& type);
static MessageType str_to_type(const std::string_view str);

static std::string create_message(
const MessageType type,
const nlohmann::json payload = nullptr
);

static std::string create_message(
const std::pair<const MessageType, const nlohmann::json> msg
);

static json_message parse_message(const std::string_view str);
};

#endif
67 changes: 56 additions & 11 deletions src/websocket_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ void websocket_session::on_accept(beast::error_code ec) {

// read the message
do_read();

// attempt to clear the write queue
do_write();
}

void websocket_session::do_read() {
Expand All @@ -25,6 +28,34 @@ void websocket_session::do_read() {
);
}

void websocket_session::do_write() {
// if there is something to send, do it
if (write_queue.size() > 0) {
write_buffer = write_queue.front();
write_queue.pop();

ws.async_write(
net::buffer(write_buffer),
beast::bind_front_handler(
&websocket_session::on_write,
shared_from_this()
)
);
}
// if there isn't, call this function again asyncronously
else {
std::this_thread::sleep_for(std::chrono::milliseconds(100));

net::post(
ws.get_executor(),
beast::bind_front_handler(
&websocket_session::do_write,
shared_from_this()
)
);
}
}

void websocket_session::on_read(
beast::error_code ec,
std::size_t bytes_transferred
Expand All @@ -40,15 +71,16 @@ void websocket_session::on_read(
return;
}

ws.text(ws.got_text());
if (ws.got_text()) {
auto buffer_data = buffer.data();
std::string message(reinterpret_cast<const char*>(buffer_data.data()), buffer_data.size());

ws.async_write(
buffer.data(),
beast::bind_front_handler(
&websocket_session::on_write,
shared_from_this()
)
);
handle_message(message);
}

buffer.consume(buffer.size());

do_read();
}

void websocket_session::on_write(
Expand All @@ -62,8 +94,21 @@ void websocket_session::on_write(
return;
}

buffer.consume(buffer.size());
do_write();
}

// start another read
do_read();
void websocket_session::queue_message(std::string_view message) {
write_queue.push(std::string(message));
}

void websocket_session::handle_message(std::string_view message) {
try {
auto parsed_msg = json_message::parse_message(message);

if (parsed_msg.type == json_message::QueryState)
queue_message(json_message::create_message(json_message::QueryStateResult));
if (parsed_msg.type == json_message::ChangeState)
queue_message(json_message::create_message(json_message::Text, "changing of state not implemented!"));
}
catch (...) {}
}
11 changes: 11 additions & 0 deletions src/websocket_session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,17 @@

#include "common.hpp"
#include <boost/beast/websocket.hpp>
#include <boost/asio/placeholders.hpp>
#include <memory>
#include <queue>
#include <thread>
#include "json_message.hpp"

class websocket_session : public std::enable_shared_from_this<websocket_session> {
websocket::stream<beast::tcp_stream> ws;
beast::flat_buffer buffer;
std::string write_buffer;
std::queue<std::string> write_queue;

public:
explicit websocket_session(tcp::socket&& socket);
Expand Down Expand Up @@ -42,9 +48,14 @@ class websocket_session : public std::enable_shared_from_this<websocket_session>

private:
void on_accept(beast::error_code ec);
void do_write();
void do_read();
void on_write(beast::error_code ec, std::size_t bytes_transferred);
void on_read(beast::error_code ec, std::size_t bytes_transferred);

void queue_message(std::string_view message);

void handle_message(std::string_view message);
};

#endif

0 comments on commit ef5ba7b

Please sign in to comment.