Skip to content

Commit

Permalink
JSRPC: Extend js-rpc-test to test over real sockets, and fix a bug.
Browse files Browse the repository at this point in the history
Without creating a PendingEvent for every incoming RpcTarget, the IoContext may be canceled prematurely, because the system thinks that there's no way more events could arrive.

This problem didn't come up when using direct service bindings because the PendingEvent system does not cancel the IoContext until the thread's event loop has been emptied, just before going back to the OS to wait for I/O. With local service bindings, the communications back and forth between the services all happens entirely in the local event loop without requiring any I/O, so the test completes before the PendingEvent system can kick in and cause a problem. When we use a real socket, communications between Workers requires actual OS I/O, so the PendingEvent has a chance to do its thing.
  • Loading branch information
kentonv committed Mar 5, 2024
1 parent 24f2073 commit be71b89
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 10 deletions.
22 changes: 20 additions & 2 deletions src/workerd/api/tests/js-rpc-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,8 @@ export let namedServiceBinding = {
assert.strictEqual(sawFinally, true);

// `fetch()` is special, the params get converted into a Request.
let resp = await env.MyService.fetch("http://foo", {method: "POST"});
assert.strictEqual(await resp.text(), "method = POST, url = http://foo");
let resp = await env.MyService.fetch("http://foo/", {method: "POST"});
assert.strictEqual(await resp.text(), "method = POST, url = http://foo/");

await assert.rejects(() => env.MyService.instanceMethod(), {
name: "TypeError",
Expand Down Expand Up @@ -483,3 +483,21 @@ export let crossContextSharingDoesntWork = {
await assert.rejects(() => env.MyService.tryUseGlobalRpcPromisePipeline(), expectedError);
},
}

function withRealSocket(inner) {
return {
async test(controller, env, ctx) {
let subEnv = {...env};
subEnv.MyService = subEnv.MyServiceOverRealSocket;
await inner.test(controller, subEnv, ctx);
}
}
}

// Export versions of various tests above using MyService over a real socket. We only bother
// with thests that use MyService. The `z_` prefix makes these tests run last.
export let z_namedServiceBinding_realSocket = withRealSocket(namedServiceBinding);
export let z_sendStubOverRpc_realSocket = withRealSocket(sendStubOverRpc);
export let z_receiveStubOverRpc_realSocket = withRealSocket(receiveStubOverRpc);
export let z_promisePipelining_realSocket = withRealSocket(promisePipelining);
export let z_crossContextSharingDoesntWork_realSocket = withRealSocket(crossContextSharingDoesntWork);
14 changes: 14 additions & 0 deletions src/workerd/api/tests/js-rpc-test.wd-test
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const unitTests :Workerd.Config = (
bindings = [
(name = "self", service = (name = "js-rpc-test", entrypoint = "nonClass")),
(name = "MyService", service = (name = "js-rpc-test", entrypoint = "MyService")),
(name = "MyServiceOverRealSocket", service = "loop"),
(name = "MyActor", durableObjectNamespace = "MyActor"),
(name = "ActorNoExtends", durableObjectNamespace = "ActorNoExtends"),
(name = "defaultExport", service = "js-rpc-test"),
Expand All @@ -26,5 +27,18 @@ const unitTests :Workerd.Config = (
durableObjectStorage = (inMemory = void),
)
),
( name = "loop",
external = (
address = "loopback:loop",
http = (capnpConnectHost = "cappy")
)
)
],
sockets = [
( name = "loop",
address = "loopback:loop",
service = (name = "js-rpc-test", entrypoint = "MyService"),
http = (capnpConnectHost = "cappy")
)
],
);
12 changes: 11 additions & 1 deletion src/workerd/api/worker-rpc.c++
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,8 @@ public:
TransientJsRpcTarget(IoContext& ioCtx, jsg::JsRef<jsg::JsObject> object,
bool allowInstanceProperties = false)
: JsRpcTargetBase(ioCtx), object(kj::mv(object)),
allowInstanceProperties(allowInstanceProperties) {}
allowInstanceProperties(allowInstanceProperties),
pendingEvent(ioCtx.registerPendingEvent()) {}

TargetInfo getTargetInfo(Worker::Lock& lock, IoContext& ioCtx) {
return {
Expand All @@ -768,6 +769,15 @@ public:
private:
jsg::JsRef<jsg::JsObject> object;
bool allowInstanceProperties;

// An RpcTarget could receive a new call (in the existing IoContext) at any time, therefore
// its existence counts as a PendingEvent. If we don't hold a PendingEvent, then the IoContext
// may decide that there's nothing more than can possibly happen in this context, and cancel
// itself.
//
// Note that it's OK if we hold this past the lifetime of the IoContext itself; the PendingEvent
// becomes detached in that case and has no effect.
kj::Own<void> pendingEvent;
};

static kj::Maybe<rpc::JsRpcTarget::Client> makeCallPipeline(jsg::Lock& js, jsg::JsValue value) {
Expand Down
24 changes: 17 additions & 7 deletions src/workerd/io/io-context.c++
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,6 @@ IoContext::IncomingRequest::~IoContext_IncomingRequest() noexcept(false) {
metrics->jsDone();
}

IoContext::~IoContext() noexcept(false) {
// Kill the sentinel so that no weak references can refer to this IoContext anymore.
selfRef->invalidate();
}

InputGate::Lock IoContext::getInputLock() {
return KJ_ASSERT_NONNULL(currentInputLock,
"no input lock available in this context").addRef();
Expand Down Expand Up @@ -492,14 +487,29 @@ kj::Promise<bool> IoContext::IncomingRequest::finishScheduled() {

class IoContext::PendingEvent: public kj::Refcounted {
public:
explicit PendingEvent(IoContext& context): context(context) {}
explicit PendingEvent(IoContext& context): maybeContext(context) {}
~PendingEvent() noexcept(false);
KJ_DISALLOW_COPY_AND_MOVE(PendingEvent);

IoContext& context;
kj::Maybe<IoContext&> maybeContext;
};

IoContext::~IoContext() noexcept(false) {
// Detach the PendingEvent if it still exists.
KJ_IF_SOME(pe, pendingEvent) {
pe.maybeContext = kj::none;
}

// Kill the sentinel so that no weak references can refer to this IoContext anymore.
selfRef->invalidate();
}

IoContext::PendingEvent::~PendingEvent() noexcept(false) {
IoContext& context = KJ_UNWRAP_OR(maybeContext, {
// IoContext must have been destroyed before the PendingEvent was.
return;
});

context.pendingEvent = nullptr;

// We can't execute finalizers just yet. We need to run the event loop to see if any queued
Expand Down

0 comments on commit be71b89

Please sign in to comment.