Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
CarterLi committed May 6, 2023
1 parent 3e5f8fb commit e42d01d
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 55 deletions.
39 changes: 20 additions & 19 deletions .vscode/c_cpp_properties.json
@@ -1,20 +1,21 @@
{
"configurations": [
{
"name": "Linux",
"includePath": [
"${workspaceFolder}/**"
],
"defines": [],
"compilerPath": "/usr/bin/clang++",
"cStandard": "c11",
"cppStandard": "c++20",
"intelliSenseMode": "clang-x64",
"compilerArgs": [
"-fcoroutines-ts",
"-stdlib=libc++"
]
}
],
"version": 4
}
"configurations": [
{
"name": "Linux",
"includePath": [
"${workspaceFolder}/**",
"${workspaceFolder}/include"
],
"defines": [],
"compilerPath": "/usr/bin/clang++",
"cStandard": "c17",
"cppStandard": "c++20",
"intelliSenseMode": "linux-clang-arm64",
"compilerArgs": [
"-fcoroutines"
],
"configurationProvider": "ms-vscode.cmake-tools"
}
],
"version": 4
}
9 changes: 5 additions & 4 deletions demo/echo_server.cpp
Expand Up @@ -20,8 +20,8 @@ enum {
int runningCoroutines = 0;

uio::task<> accept_connection(uio::io_service& service, int serverfd) {
while (int clientfd = co_await service.accept(serverfd, nullptr, nullptr)) {
[](uio::io_service& service, int clientfd) -> uio::task<> {
service.accept_multishot(serverfd, nullptr, nullptr).set_callback([&service, serverfd](int clientfd, unsigned flags) -> uio::task<> {

fmt::print("sockfd {} is accepted; number of running coroutines: {}\n",
clientfd, ++runningCoroutines);
#if USE_SPLICE
Expand Down Expand Up @@ -63,8 +63,9 @@ uio::task<> accept_connection(uio::io_service& service, int serverfd) {
co_await service.close(clientfd);
fmt::print("sockfd {} is closed; number of running coroutines: {}\n",
clientfd, --runningCoroutines);
}(service, clientfd);
}
});

co_return;
}

int main(int argc, char *argv[]) {
Expand Down
20 changes: 19 additions & 1 deletion include/liburing/io_service.hpp
Expand Up @@ -430,6 +430,24 @@ class io_service {
return await_work(sqe, iflags);
}

/** Accept connections on a socket asynchronously
* @see accept4(2)
* @see io_uring_enter(2) IORING_ACCEPT_MULTISHOT
* @param iflags IOSQE_* flags
* @return
*/
sqe_awaitable accept_multishot(
int fd,
sockaddr *addr,
socklen_t *addrlen,
int flags = 0,
uint8_t iflags = 0
) noexcept {
auto* sqe = io_uring_get_sqe_safe();
io_uring_prep_multishot_accept(sqe, fd, addr, addrlen, flags);
return await_work(sqe, iflags);
}

/** Initiate a connection on a socket asynchronously
* @see connect(2)
* @see io_uring_enter(2) IORING_OP_CONNECT
Expand Down Expand Up @@ -721,7 +739,7 @@ class io_service {
io_uring_for_each_cqe(&ring, head, cqe) {
++cqe_count;
auto coro = static_cast<resolver *>(io_uring_cqe_get_data(cqe));
if (coro) coro->resolve(cqe->res);
if (coro) coro->resolve(cqe->res, cqe->flags);
}

printf_if_verbose(__FILE__ ": Found %u cqe(s), looping...\n", cqe_count);
Expand Down
46 changes: 15 additions & 31 deletions include/liburing/sqe_awaitable.hpp
Expand Up @@ -9,59 +9,42 @@

namespace uio {
struct resolver {
virtual void resolve(int result) noexcept = 0;
virtual void resolve(int32_t result, uint32_t flags) noexcept = 0;
};

struct resume_resolver final: resolver {
friend struct sqe_awaitable;

void resolve(int result) noexcept override {
void resolve(int32_t result, uint32_t flags) noexcept override {
this->result = result;
this->flags = flags;
handle.resume();
}

private:
std::coroutine_handle<> handle;
int result = 0;
uint32_t result = 0;
uint32_t flags = 0;
};
static_assert(std::is_trivially_destructible_v<resume_resolver>);

struct deferred_resolver final: resolver {
void resolve(int result) noexcept override {
this->result = result;
}

#ifndef NDEBUG
~deferred_resolver() {
assert(!!result && "deferred_resolver is destructed before it's resolved");
}
#endif

std::optional<int> result;
};

struct callback_resolver final: resolver {
callback_resolver(std::function<void (int result)>&& cb): cb(std::move(cb)) {}
callback_resolver(std::function<void (int32_t result, uint32_t flags)>&& cb): cb(std::move(cb)) {}

void resolve(int result) noexcept override {
this->cb(result);
delete this;
void resolve(int32_t result, uint32_t flags) noexcept override {
this->cb(result, flags);
if (!(flags & IORING_CQE_F_MORE)) delete this;
}

private:
std::function<void (int result)> cb;
std::function<void (int32_t result, uint32_t flags)> cb;
};

struct sqe_awaitable {
// TODO: use cancel_token to implement cancellation
sqe_awaitable(io_uring_sqe* sqe) noexcept: sqe(sqe) {}

// User MUST keep resolver alive before the operation is finished
void set_deferred(deferred_resolver& resolver) {
io_uring_sqe_set_data(sqe, &resolver);
}

void set_callback(std::function<void (int result)> cb) {
void set_callback(std::function<void (int32_t result, uint32_t flags)> cb) {
io_uring_sqe_set_data(sqe, new callback_resolver(std::move(cb)));
}

Expand All @@ -72,14 +55,15 @@ struct sqe_awaitable {

await_sqe(io_uring_sqe* sqe): sqe(sqe) {}

constexpr bool await_ready() const noexcept { return false; }
constexpr bool await_ready() const noexcept { return !!sqe; }

void await_suspend(std::coroutine_handle<> handle) noexcept {
resolver.handle = handle;
io_uring_sqe_set_data(sqe, &resolver);
if (sqe) io_uring_sqe_set_data(sqe, &resolver);
sqe = nullptr;
}

constexpr int await_resume() const noexcept { return resolver.result; }
constexpr int await_resume() noexcept { return resolver.result; }
};

return await_sqe(sqe);
Expand Down

0 comments on commit e42d01d

Please sign in to comment.