Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JSRPC: Support capnp-over-http for external server bindings #1757

Merged
merged 3 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 22 additions & 2 deletions src/workerd/api/tests/js-rpc-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,8 @@ export let namedServiceBinding = {
assert.strictEqual(sawFinally, true);

// `fetch()` is special, the params get converted into a Request.
let resp = await env.MyService.fetch("http://foo", {method: "POST"});
assert.strictEqual(await resp.text(), "method = POST, url = http://foo");
let resp = await env.MyService.fetch("http://foo/", {method: "POST"});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, why is adding the trailing / here necessary?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When sending the request over actual HTTP, the trailing / gets added if it is missing. So in order to make the test consistent between when it runs over a real socket vs. local service binding, I had to add the /.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, ok. Per fetch spec the trailing slash is supposed to be implicit in this case so it technically should be fine to pass in without it (since the url arg here is technically supposed to be passed through the whatwg url parser if we're following spec)... so it just feels a bit odd to have to modify the test to make it work correctly. But I think this is fine for now. I do think at some point we need to revisit how the input url is handled on fetch calls tho.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is if I don't add the / here, then the assert on the next line fails either when using real HTTP (if it doesn't expect a /) or when not using real HTTP (if it does expect a /). In order to be able to write a single assert which works in both cases, I had to add the / on this line.

Arguably this is a bug in service bindings, that they do not implicitly add the / as regular HTTP would, but that bug has been around a while, not introduced in this PR.

assert.strictEqual(await resp.text(), "method = POST, url = http://foo/");

await assert.rejects(() => env.MyService.instanceMethod(), {
name: "TypeError",
Expand Down Expand Up @@ -653,3 +653,23 @@ export let testAsyncStackTrace = {
}
}
}

function withRealSocket(inner) {
return {
async test(controller, env, ctx) {
let subEnv = {...env};
subEnv.MyService = subEnv.MyServiceOverRealSocket;
await inner.test(controller, subEnv, ctx);
}
}
}

// Export versions of various tests above using MyService over a real socket. We only bother
// with thests that use MyService. The `z_` prefix makes these tests run last.
export let z_namedServiceBinding_realSocket = withRealSocket(namedServiceBinding);
export let z_sendStubOverRpc_realSocket = withRealSocket(sendStubOverRpc);
export let z_receiveStubOverRpc_realSocket = withRealSocket(receiveStubOverRpc);
export let z_promisePipelining_realSocket = withRealSocket(promisePipelining);
export let z_crossContextSharingDoesntWork_realSocket = withRealSocket(crossContextSharingDoesntWork);
export let z_serializeRpcPromiseOrProprety_realSocket = withRealSocket(serializeRpcPromiseOrProprety);
export let z_testAsyncStackTrace_realSocket = withRealSocket(testAsyncStackTrace);
14 changes: 14 additions & 0 deletions src/workerd/api/tests/js-rpc-test.wd-test
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const unitTests :Workerd.Config = (
bindings = [
(name = "self", service = (name = "js-rpc-test", entrypoint = "nonClass")),
(name = "MyService", service = (name = "js-rpc-test", entrypoint = "MyService")),
(name = "MyServiceOverRealSocket", service = "loop"),
(name = "MyActor", durableObjectNamespace = "MyActor"),
(name = "ActorNoExtends", durableObjectNamespace = "ActorNoExtends"),
(name = "defaultExport", service = "js-rpc-test"),
Expand All @@ -26,5 +27,18 @@ const unitTests :Workerd.Config = (
durableObjectStorage = (inMemory = void),
)
),
( name = "loop",
external = (
address = "loopback:loop",
http = (capnpConnectHost = "cappy")
)
)
],
sockets = [
( name = "loop",
address = "loopback:loop",
service = (name = "js-rpc-test", entrypoint = "MyService"),
http = (capnpConnectHost = "cappy")
)
Comment on lines +30 to +42
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should work well for us! Thanks! 😃

],
);
12 changes: 11 additions & 1 deletion src/workerd/api/worker-rpc.c++
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,8 @@ public:
TransientJsRpcTarget(IoContext& ioCtx, jsg::JsRef<jsg::JsObject> object,
bool allowInstanceProperties = false)
: JsRpcTargetBase(ioCtx), object(kj::mv(object)),
allowInstanceProperties(allowInstanceProperties) {}
allowInstanceProperties(allowInstanceProperties),
pendingEvent(ioCtx.registerPendingEvent()) {}

TargetInfo getTargetInfo(Worker::Lock& lock, IoContext& ioCtx) {
return {
Expand All @@ -828,6 +829,15 @@ public:
private:
jsg::JsRef<jsg::JsObject> object;
bool allowInstanceProperties;

// An RpcTarget could receive a new call (in the existing IoContext) at any time, therefore
// its existence counts as a PendingEvent. If we don't hold a PendingEvent, then the IoContext
// may decide that there's nothing more than can possibly happen in this context, and cancel
// itself.
//
// Note that it's OK if we hold this past the lifetime of the IoContext itself; the PendingEvent
// becomes detached in that case and has no effect.
kj::Own<void> pendingEvent;
};

static kj::Maybe<rpc::JsRpcTarget::Client> makeCallPipeline(jsg::Lock& js, jsg::JsValue value) {
Expand Down
24 changes: 17 additions & 7 deletions src/workerd/io/io-context.c++
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,6 @@ IoContext::IncomingRequest::~IoContext_IncomingRequest() noexcept(false) {
metrics->jsDone();
}

IoContext::~IoContext() noexcept(false) {
// Kill the sentinel so that no weak references can refer to this IoContext anymore.
selfRef->invalidate();
}

InputGate::Lock IoContext::getInputLock() {
return KJ_ASSERT_NONNULL(currentInputLock,
"no input lock available in this context").addRef();
Expand Down Expand Up @@ -492,14 +487,29 @@ kj::Promise<bool> IoContext::IncomingRequest::finishScheduled() {

class IoContext::PendingEvent: public kj::Refcounted {
public:
explicit PendingEvent(IoContext& context): context(context) {}
explicit PendingEvent(IoContext& context): maybeContext(context) {}
~PendingEvent() noexcept(false);
KJ_DISALLOW_COPY_AND_MOVE(PendingEvent);

IoContext& context;
kj::Maybe<IoContext&> maybeContext;
};

IoContext::~IoContext() noexcept(false) {
// Detach the PendingEvent if it still exists.
KJ_IF_SOME(pe, pendingEvent) {
pe.maybeContext = kj::none;
}

// Kill the sentinel so that no weak references can refer to this IoContext anymore.
selfRef->invalidate();
}

IoContext::PendingEvent::~PendingEvent() noexcept(false) {
IoContext& context = KJ_UNWRAP_OR(maybeContext, {
// IoContext must have been destroyed before the PendingEvent was.
return;
});

context.pendingEvent = nullptr;

// We can't execute finalizers just yet. We need to run the event loop to see if any queued
Expand Down
10 changes: 10 additions & 0 deletions src/workerd/io/worker-interface.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -334,3 +334,13 @@ interface EventDispatcher @0xf20697475ec1752d {
# Other methods might be added to handle other kinds of events, e.g. TCP connections, or maybe
# even native Cap'n Proto RPC eventually.
}

interface WorkerdBootstrap {
# Bootstrap interface exposed by workerd when serving Cap'n Proto RPC.

startEvent @0 () -> (dispatcher :EventDispatcher);
# Start a new event. Exactly one event should be delivered to the returned EventDispatcher.
#
# TODO(someday): Pass cfBlobJson? Currently doesn't matter since the cf blob is only present for
# HTTP requests which can be delivered over regular HTTP instead of capnp.
}
51 changes: 51 additions & 0 deletions src/workerd/server/server-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,11 @@ private:
: test(test), peerFilter(peerFilter), address(kj::mv(address)) {}

kj::Promise<kj::Own<kj::AsyncIoStream>> connect() override {
KJ_IF_SOME(addr, test.sockets.find(address)) {
// If someone is listening on this address, connect directly to them.
return addr->connect();
}

auto [promise, fulfiller] = kj::newPromiseAndFulfiller<kj::Own<kj::AsyncIoStream>>();

test.getSubrequestQueue(address).push({
Expand Down Expand Up @@ -3524,6 +3529,52 @@ KJ_TEST("Server: cache name is passed through to service") {

// =======================================================================================

KJ_TEST("Server: JS RPC over HTTP connections") {
// Test that we can send RPC over an ExternalServer pointing back to our own loopback socket,
// as long as both are configured with a `capnpConnectHost`.

TestServer test(R"((
services = [
( name = "hello",
worker = (
compatibilityDate = "2024-02-23",
compatibilityFlags = ["experimental"],
modules = [
( name = "main.js",
esModule =
`import {WorkerEntrypoint} from "cloudflare:workers";
`export default {
` async fetch(request, env) {
` return new Response("got: " + await env.OUT.frob(3, 11));
` }
`}
`export class MyRpc extends WorkerEntrypoint {
` async frob(a, b) { return a * b + 2; }
`}
)
],
bindings = [( name = "OUT", service = "outbound")]
)
),
(name = "outbound", external = (address = "loopback", http = (capnpConnectHost = "cappy")))
],
sockets = [
( name = "main", address = "test-addr", service = "hello" ),
( name = "alt1", address = "loopback",
service = (name = "hello", entrypoint = "MyRpc"),
http = (capnpConnectHost = "cappy")),
]
))"_kj);

test.server.allowExperimental();
test.start();

auto conn = test.connect("test-addr");
conn.httpGet200("/", "got: 35");
}

// =======================================================================================

// TODO(beta): Test TLS (send and receive)
// TODO(beta): Test CLI overrides

Expand Down