Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dgram: make UDPWrap more reusable #31871

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion lib/dgram.js
Expand Up @@ -231,7 +231,9 @@ Socket.prototype.bind = function(port_, address_ /* , callback */) {
this.on('listening', onListening);
}

if (port instanceof UDP) {
if (port !== null &&
typeof port === 'object' &&
typeof port.recvStart === 'function') {
jasnell marked this conversation as resolved.
Show resolved Hide resolved
replaceHandle(this, port);
startListening(this);
return this;
Expand Down
205 changes: 151 additions & 54 deletions src/udp_wrap.cc
Expand Up @@ -69,18 +69,57 @@ SendWrap::SendWrap(Environment* env,
}


inline bool SendWrap::have_callback() const {
bool SendWrap::have_callback() const {
return have_callback_;
}

UDPListener::~UDPListener() {
if (wrap_ != nullptr)
wrap_->set_listener(nullptr);
}

UDPWrapBase::~UDPWrapBase() {
set_listener(nullptr);
}

UDPListener* UDPWrapBase::listener() const {
CHECK_NOT_NULL(listener_);
return listener_;
}

void UDPWrapBase::set_listener(UDPListener* listener) {
if (listener_ != nullptr)
listener_->wrap_ = nullptr;
listener_ = listener;
if (listener_ != nullptr) {
CHECK_NULL(listener_->wrap_);
listener_->wrap_ = this;
}
}

UDPWrapBase* UDPWrapBase::FromObject(Local<Object> obj) {
CHECK_GT(obj->InternalFieldCount(), UDPWrapBase::kUDPWrapBaseField);
return static_cast<UDPWrapBase*>(
obj->GetAlignedPointerFromInternalField(UDPWrapBase::kUDPWrapBaseField));
}

void UDPWrapBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
env->SetProtoMethod(t, "recvStart", RecvStart);
env->SetProtoMethod(t, "recvStop", RecvStop);
}

UDPWrap::UDPWrap(Environment* env, Local<Object> object)
: HandleWrap(env,
object,
reinterpret_cast<uv_handle_t*>(&handle_),
AsyncWrap::PROVIDER_UDPWRAP) {
object->SetAlignedPointerInInternalField(
UDPWrapBase::kUDPWrapBaseField, static_cast<UDPWrapBase*>(this));

int r = uv_udp_init(env->event_loop(), &handle_);
CHECK_EQ(r, 0); // can't fail anyway

set_listener(this);
}


Expand All @@ -91,7 +130,8 @@ void UDPWrap::Initialize(Local<Object> target,
Environment* env = Environment::GetCurrent(context);

Local<FunctionTemplate> t = env->NewFunctionTemplate(New);
t->InstanceTemplate()->SetInternalFieldCount(UDPWrap::kInternalFieldCount);
t->InstanceTemplate()->SetInternalFieldCount(
UDPWrapBase::kInternalFieldCount);
Local<String> udpString =
FIXED_ONE_BYTE_STRING(env->isolate(), "UDP");
t->SetClassName(udpString);
Expand All @@ -112,6 +152,7 @@ void UDPWrap::Initialize(Local<Object> target,
Local<FunctionTemplate>(),
attributes);

UDPWrapBase::AddMethods(env, t);
env->SetProtoMethod(t, "open", Open);
env->SetProtoMethod(t, "bind", Bind);
env->SetProtoMethod(t, "connect", Connect);
Expand All @@ -120,8 +161,6 @@ void UDPWrap::Initialize(Local<Object> target,
env->SetProtoMethod(t, "connect6", Connect6);
env->SetProtoMethod(t, "send6", Send6);
env->SetProtoMethod(t, "disconnect", Disconnect);
env->SetProtoMethod(t, "recvStart", RecvStart);
env->SetProtoMethod(t, "recvStop", RecvStop);
env->SetProtoMethod(t, "getpeername",
GetSockOrPeerName<UDPWrap, uv_udp_getpeername>);
env->SetProtoMethod(t, "getsockname",
Expand Down Expand Up @@ -220,6 +259,9 @@ void UDPWrap::DoBind(const FunctionCallbackInfo<Value>& args, int family) {
flags);
}

if (err == 0)
wrap->listener()->OnAfterBind();

args.GetReturnValue().Set(err);
}

Expand Down Expand Up @@ -464,14 +506,10 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
CHECK(args[3]->IsBoolean());
}

Local<Object> req_wrap_obj = args[0].As<Object>();
Local<Array> chunks = args[1].As<Array>();
// it is faster to fetch the length of the
// array in js-land
size_t count = args[2].As<Uint32>()->Value();
const bool have_callback = sendto ? args[5]->IsTrue() : args[3]->IsTrue();

size_t msg_size = 0;

MaybeStackBuffer<uv_buf_t, 16> bufs(count);

Expand All @@ -482,7 +520,6 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
size_t length = Buffer::Length(chunk);

bufs[i] = uv_buf_init(Buffer::Data(chunk), length);
msg_size += length;
}

int err = 0;
Expand All @@ -492,14 +529,36 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
const unsigned short port = args[3].As<Uint32>()->Value();
node::Utf8Value address(env->isolate(), args[4]);
err = sockaddr_for_family(family, address.out(), port, &addr_storage);
if (err == 0) {
if (err == 0)
addr = reinterpret_cast<sockaddr*>(&addr_storage);
}
}

uv_buf_t* bufs_ptr = *bufs;
if (err == 0 && !UNLIKELY(env->options()->test_udp_no_try_send)) {
err = uv_udp_try_send(&wrap->handle_, bufs_ptr, count, addr);
if (err == 0) {
wrap->current_send_req_wrap_ = args[0].As<Object>();
wrap->current_send_has_callback_ =
sendto ? args[5]->IsTrue() : args[3]->IsTrue();

err = wrap->Send(*bufs, count, addr);

wrap->current_send_req_wrap_.Clear();
wrap->current_send_has_callback_ = false;
}
jasnell marked this conversation as resolved.
Show resolved Hide resolved

args.GetReturnValue().Set(err);
}

ssize_t UDPWrap::Send(uv_buf_t* bufs_ptr,
size_t count,
const sockaddr* addr) {
if (IsHandleClosing()) return UV_EBADF;

size_t msg_size = 0;
for (size_t i = 0; i < count; i++)
msg_size += bufs_ptr[i].len;

int err = 0;
if (!UNLIKELY(env()->options()->test_udp_no_try_send)) {
err = uv_udp_try_send(&handle_, bufs_ptr, count, addr);
if (err == UV_ENOSYS || err == UV_EAGAIN) {
err = 0;
} else if (err >= 0) {
Expand All @@ -517,28 +576,41 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
CHECK_EQ(static_cast<size_t>(err), msg_size);
// + 1 so that the JS side can distinguish 0-length async sends from
// 0-length sync sends.
args.GetReturnValue().Set(static_cast<uint32_t>(msg_size) + 1);
return;
return msg_size + 1;
}
}
}

if (err == 0) {
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap);
SendWrap* req_wrap = new SendWrap(env, req_wrap_obj, have_callback);
req_wrap->msg_size = msg_size;

err = req_wrap->Dispatch(uv_udp_send,
&wrap->handle_,
bufs_ptr,
count,
addr,
OnSend);
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(this);
ReqWrap<uv_udp_send_t>* req_wrap = listener()->CreateSendWrap(msg_size);
if (req_wrap == nullptr) return UV_ENOSYS;

err = req_wrap->Dispatch(
uv_udp_send,
&handle_,
bufs_ptr,
count,
addr,
uv_udp_send_cb{[](uv_udp_send_t* req, int status) {
UDPWrap* self = ContainerOf(&UDPWrap::handle_, req->handle);
self->listener()->OnSendDone(
ReqWrap<uv_udp_send_t>::from_req(req), status);
}});
if (err)
delete req_wrap;
}

args.GetReturnValue().Set(err);
return err;
}


ReqWrap<uv_udp_send_t>* UDPWrap::CreateSendWrap(size_t msg_size) {
SendWrap* req_wrap = new SendWrap(env(),
current_send_req_wrap_,
current_send_has_callback_);
req_wrap->msg_size = msg_size;
return req_wrap;
}


Expand All @@ -552,31 +624,46 @@ void UDPWrap::Send6(const FunctionCallbackInfo<Value>& args) {
}


void UDPWrap::RecvStart(const FunctionCallbackInfo<Value>& args) {
UDPWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap,
args.Holder(),
args.GetReturnValue().Set(UV_EBADF));
int err = uv_udp_recv_start(&wrap->handle_, OnAlloc, OnRecv);
AsyncWrap* UDPWrap::GetAsyncWrap() {
return this;
}

int UDPWrap::GetPeerName(sockaddr* name, int* namelen) {
return uv_udp_getpeername(&handle_, name, namelen);
}

int UDPWrap::GetSockName(sockaddr* name, int* namelen) {
return uv_udp_getsockname(&handle_, name, namelen);
}

void UDPWrapBase::RecvStart(const FunctionCallbackInfo<Value>& args) {
UDPWrapBase* wrap = UDPWrapBase::FromObject(args.Holder());
args.GetReturnValue().Set(wrap == nullptr ? UV_EBADF : wrap->RecvStart());
}

int UDPWrap::RecvStart() {
if (IsHandleClosing()) return UV_EBADF;
int err = uv_udp_recv_start(&handle_, OnAlloc, OnRecv);
// UV_EALREADY means that the socket is already bound but that's okay
if (err == UV_EALREADY)
err = 0;
args.GetReturnValue().Set(err);
return err;
}


void UDPWrap::RecvStop(const FunctionCallbackInfo<Value>& args) {
UDPWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap,
args.Holder(),
args.GetReturnValue().Set(UV_EBADF));
int r = uv_udp_recv_stop(&wrap->handle_);
args.GetReturnValue().Set(r);
void UDPWrapBase::RecvStop(const FunctionCallbackInfo<Value>& args) {
UDPWrapBase* wrap = UDPWrapBase::FromObject(args.Holder());
args.GetReturnValue().Set(wrap == nullptr ? UV_EBADF : wrap->RecvStop());
}

int UDPWrap::RecvStop() {
if (IsHandleClosing()) return UV_EBADF;
return uv_udp_recv_stop(&handle_);
}


void UDPWrap::OnSend(uv_udp_send_t* req, int status) {
std::unique_ptr<SendWrap> req_wrap{static_cast<SendWrap*>(req->data)};
void UDPWrap::OnSendDone(ReqWrap<uv_udp_send_t>* req, int status) {
std::unique_ptr<SendWrap> req_wrap{static_cast<SendWrap*>(req)};
if (req_wrap->have_callback()) {
Environment* env = req_wrap->env();
HandleScope handle_scope(env->isolate());
Expand All @@ -593,43 +680,53 @@ void UDPWrap::OnSend(uv_udp_send_t* req, int status) {
void UDPWrap::OnAlloc(uv_handle_t* handle,
size_t suggested_size,
uv_buf_t* buf) {
UDPWrap* wrap = static_cast<UDPWrap*>(handle->data);
*buf = wrap->env()->AllocateManaged(suggested_size).release();
UDPWrap* wrap = ContainerOf(&UDPWrap::handle_,
reinterpret_cast<uv_udp_t*>(handle));
*buf = wrap->listener()->OnAlloc(suggested_size);
}

uv_buf_t UDPWrap::OnAlloc(size_t suggested_size) {
return env()->AllocateManaged(suggested_size).release();
}

void UDPWrap::OnRecv(uv_udp_t* handle,
ssize_t nread,
const uv_buf_t* buf_,
const struct sockaddr* addr,
const uv_buf_t* buf,
const sockaddr* addr,
unsigned int flags) {
UDPWrap* wrap = static_cast<UDPWrap*>(handle->data);
Environment* env = wrap->env();
UDPWrap* wrap = ContainerOf(&UDPWrap::handle_, handle);
wrap->listener()->OnRecv(nread, *buf, addr, flags);
}

AllocatedBuffer buf(env, *buf_);
void UDPWrap::OnRecv(ssize_t nread,
const uv_buf_t& buf_,
const sockaddr* addr,
unsigned int flags) {
Environment* env = this->env();
AllocatedBuffer buf(env, buf_);
if (nread == 0 && addr == nullptr) {
return;
}

HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());

Local<Object> wrap_obj = wrap->object();
Local<Value> argv[] = {
Integer::New(env->isolate(), nread),
wrap_obj,
object(),
Undefined(env->isolate()),
Undefined(env->isolate())
};

if (nread < 0) {
wrap->MakeCallback(env->onmessage_string(), arraysize(argv), argv);
MakeCallback(env->onmessage_string(), arraysize(argv), argv);
return;
}

buf.Resize(nread);
argv[2] = buf.ToBuffer().ToLocalChecked();
argv[3] = AddressToJS(env, addr);
wrap->MakeCallback(env->onmessage_string(), arraysize(argv), argv);
MakeCallback(env->onmessage_string(), arraysize(argv), argv);
}

MaybeLocal<Object> UDPWrap::Instantiate(Environment* env,
Expand Down