Transport Agnostic RPC #249

Closed
1 of 14 tasks
CMCDragonkai opened this issue Oct 18, 2021 · 82 comments · Fixed by #498
Labels
design (Requires design), epic (Big issue with multiple subissues), r&d:polykey:core activity 1 (Secret Vault Sharing and Secret History Management), r&d:polykey:core activity 3 (Peer to Peer Federated Hierarchy)


@CMCDragonkai
Member

CMCDragonkai commented Oct 18, 2021

We have reviewed the gRPC API, and worked with it extensively. It's time to work out a better RPC layer for PK. There are several problems to consider here.

We have 2 main proto files:

  1. proto/schemas/Client.proto
  2. proto/schemas/Agent.proto

There is also a Test.proto, which will need to be used to generate the marshaling code.

Additional context

Tasks

  1. - Review the proto3 guide, naming standard with reference to the proto files, look for ways to abstract or clean up the data
    • Creating .proto definition subdomains. #279
    • - Version the Client.proto and Agent.proto with version names and test out multiple-version services, and find out what the client does when the version wanted is not available
    • - Test out grpc reflection for versioning if necessary
    • - Update all the primitive types, and any usage of 64 bit int should have the string hint
    • - Add some benchmarks to the grpc, and check if MAX_CONCURRENT_CONNECTIONS is used
    • - Clean up types Wrapper methods in GRPCClientClient to get parameter types of the wrapped GRPC call #200 or gRPC abstractions based on new grpc libraries
    • - Consider how to shutdown the grpc server and terminate all client connections as well
  2. - Figure out CQRS, pagination and streaming
    • - Clean up js-pagination client and server side utilities
    • - Pagination on unary calls, and streaming calls that can be initialised with pagination parameters (which would enable some sort of CQRS and event sourcing concepts)
  3. - Review in reference to authentication and sessions
  4. - Check the HTTP API and web gateway compatibility
  5. - Deal with the fact that the new grpc-js 1.4.1 version has some incompatibility: TypeError: http2Server.on is not a function.
@CMCDragonkai CMCDragonkai added the procedure (Action that must be executed) label Oct 18, 2021
@CMCDragonkai CMCDragonkai self-assigned this Oct 18, 2021
@CMCDragonkai
Member Author

Note that all fields in proto3 are optional, and this is facilitated by default values. https://stackoverflow.com/questions/31801257/why-required-and-optional-is-removed-in-protocol-buffers-3

The default values depend on the types of the fields. For primitives like numbers it's 0, but for other subtypes (such as nested messages) it is null/undefined in JS.

The optional keyword returned in protobuf 3.15 (after being experimental since 3.12). Marking a field optional usually generates a way to check whether it was set at all, so you can differentiate between the client not setting a value and the client setting the value to the default. This can be important when you want to change behaviour if the client really didn't set the value.

With optional you get the ability to explicitly check if a field is set. Let's say you have a field int32 confidence. Currently, when receiving a message with such a type, you cannot tell the difference between confidence = 0 and confidence not set, because default values are optimized away in the serialization. If you mark the field as optional, then presumably some extra bits are set in the serialization and a has_confidence() method will be generated, so that you on the receiving end can disambiguate the two.

@CMCDragonkai
Member Author

Regarding the number types in proto3 (several of which are 64-bit):

int32
int64
uint32
uint64
sint32
sint64
double
float

Due to protocolbuffers/protobuf#3666, there's no support for bigint yet, so by default 64-bit integers are decoded as JS numbers, which lose precision above 2^53 - 1.

Work around is:

message dummy {
  uint64 bigInt = 1 [jstype = JS_STRING];
}

This makes the generated JS code represent the field as a string instead of a number.
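A quick way to see why the JS_STRING hint matters: the TypeScript snippet below (plain JS semantics, no protobuf library involved) pushes the largest uint64 through a JS number, as a number-typed generated field effectively would, and compares that with keeping it as a string and converting it to a bigint only when needed.

```typescript
// JS numbers are IEEE-754 doubles: integers are only exact up to 2^53 - 1.
const maxUint64 = '18446744073709551615'; // 2^64 - 1 as a decimal string

// Roughly what a number-typed uint64 field does: parse into a double.
const asNumber = Number(maxUint64);

// What the JS_STRING hint allows: keep the string, convert to bigint on demand.
const asBigInt = BigInt(maxUint64);

console.log(Number.isSafeInteger(asNumber)); // false: precision has been lost
console.log(asNumber === 2 ** 64); // true: 2^64 - 1 was rounded up to 2^64
console.log(asBigInt.toString() === maxUint64); // true: exact round trip
```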

@CMCDragonkai
Member Author

This shows the mapping from the proto3 code to the generated JS code: https://developers.google.com/protocol-buffers/docs/reference/javascript-generated. Of particular interest are map, bytes, oneof, and enums.

@CMCDragonkai
Member Author

The guide says that if we want to stop using a certain field in a message. It's important to reserve the field number and also the field name so that it cannot be used again. This means that older clients won't get confused.

Basically the field numbers don't have to be actually in sequence. They represent unique positions in the message.

message Foo {
  reserved 2, 15, 9 to 11;
  reserved "foo", "bar";
}

enum Foo {
  reserved 2, 15, 9 to 11, 40 to max;
  reserved "FOO", "BAR";
}

@CMCDragonkai
Member Author

Regarding pagination I like this usage of repeated:

message SearchResponse {
  repeated Result results = 1;
}

message Result {
  string url = 1;
  string title = 2;
  repeated string snippets = 3;
}

It seems that this makes it easier to define a single message type and then lists of messages. I had done this previously in OpenAPI when I had situations where you have GET /resource/1 vs GET /resources where the former acquires a single resource, and the latter fetches a page of resources.

I imagine that responses may have further metadata beyond just the core data. So I could imagine types being represented like:

// the actual domain structure
message XDomainStructure {
  // ...
}

// a single resource /resources/1
message YResponse {
  XDomainStructure x_domain = 1;
}

// multiple resources /resources
message ZResponse {
  repeated XDomainStructure x_domains = 1;
}

That way "response" messages are differentiated from domain structures that we want to work with.

So in our proto definitions we can identify the relevant domain structures we want to work with, these should really be derived manually by the programmer by investigating each domain. For example in our notifications domain, we have a domain structure representing a notification message. But this is not the same type as the type of request & response messages being sent back and forth on the gRPC API about notifications.
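A minimal TypeScript sketch of the same split on the application side, assuming the generated messages are mapped into plain interfaces. The field names (id, name), the cursor-based nextCursor and the getPage/getOne helpers are hypothetical; they only illustrate how response-level metadata such as pagination can live on the response type rather than on the domain structure.

```typescript
// Domain structure: what the rest of the code base works with.
interface XDomainStructure {
  id: string;
  name: string;
}

// Single-resource response (like GET /resources/1).
interface YResponse {
  xDomain: XDomainStructure;
}

// Multi-resource response (like GET /resources). nextCursor is illustrative
// response-level metadata, not part of the domain type.
interface ZResponse {
  xDomains: XDomainStructure[];
  nextCursor?: string;
}

function getOne(all: XDomainStructure[], id: string): YResponse | undefined {
  const xDomain = all.find((x) => x.id === id);
  return xDomain === undefined ? undefined : { xDomain };
}

function getPage(all: XDomainStructure[], limit: number, cursor?: string): ZResponse {
  // The cursor is the id of the last item on the previous page
  // (an unknown cursor falls back to the first page).
  const start = cursor === undefined ? 0 : all.findIndex((x) => x.id === cursor) + 1;
  const xDomains = all.slice(start, start + limit);
  const last = xDomains[xDomains.length - 1];
  const hasMore = start + limit < all.length;
  return { xDomains, nextCursor: hasMore && last !== undefined ? last.id : undefined };
}
```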

@CMCDragonkai
Member Author

It's possible to also import proto files. This can make it easier to manage our proto definitions if we separate our type definitions from our message definitions from our service definitions. It could allow us to form subdirectories in src/proto/schemas. We would have to check if our CLI command scripts/proto-generate.sh can actually do this though.

Importing is just a matter of:

import "myproject/other_protos.proto";

To re-export, just use import public "other.proto";.

I believe import paths are resolved relative to the --proto_path, which should be src/proto/schemas.

@CMCDragonkai
Member Author

I forgot if we were ever able to use any of the google protos. Like import "google/protobuf/any.proto";. The any type allows us to nest any kind of message type. It will be serialized as bytes. We are likely not going to use this particular type google.protobuf.Any though.

However there are a number of other types that are quite useful like google/protobuf/timestamp.proto. That's all located here: https://developers.google.com/protocol-buffers/docs/reference/google.protobuf

@CMCDragonkai
Member Author

CMCDragonkai commented Oct 18, 2021

Each field in a oneof uses its own field number, but setting any one of those fields clears all the others.

The map only works for fixed types for keys and values. So for arbitrary values, we should use JSON encoding and return it as a string.

Maps also can't be nested directly: a map's value can't be another map.

Although you should be able to put another map inside a message type, and that would be sufficient to nest maps.
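A sketch of the JSON workaround for map values, in TypeScript. A Map<string, string> stands in for a generated map<string, string> field (the real generated accessors are not used here), and encodeMapField/decodeMapField are hypothetical helper names. Because each value is an opaque JSON string, nested structures are no longer constrained by proto's map rules, at the cost of losing schema checking for those values.

```typescript
type JSONValue = null | boolean | number | string | JSONValue[] | { [key: string]: JSONValue };

// Encode arbitrary JSON-serialisable values into a string-to-string map field.
function encodeMapField(record: Record<string, JSONValue>): Map<string, string> {
  const field = new Map<string, string>();
  for (const [key, value] of Object.entries(record)) {
    field.set(key, JSON.stringify(value));
  }
  return field;
}

// Decode each JSON string back into a value on the receiving side.
function decodeMapField(field: Map<string, string>): Record<string, JSONValue> {
  const record: Record<string, JSONValue> = {};
  for (const [key, encoded] of field) {
    record[key] = JSON.parse(encoded) as JSONValue;
  }
  return record;
}
```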

@CMCDragonkai
Member Author

Need to point out that re-reading https://medium.com/expedia-group-tech/the-weird-world-of-grpc-tooling-for-node-js-part-3-d994de02bedc suggests that, because we currently use static code generation, importing packages may not work. Not sure at this point.

@CMCDragonkai
Member Author

This issue seems to indicate all we need to do is copy the proto source code verbatim in our src/proto/schemas and then it should work. agreatfool/grpc_tools_node_protoc_ts#72

@CMCDragonkai
Member Author

CMCDragonkai commented Oct 19, 2021

Note that each HTTP/2 connection used by a gRPC channel usually has a limit on the number of concurrent streams (once it is reached, further RPC requests are queued). I believe we have separate channels to each different node. We aren't likely to hit this limit in production right now, but the chattiness may increase when we integrate gestalt synchronisation #190. This issue has more details: grpc/grpc#21386. The current workaround is to create new channels and load balance between them; however, this is considered to be a temporary solution.

Use a pool of gRPC channels to distribute RPCs over multiple connections (channels must have different channel args to prevent re-use so define a use-specific channel arg such as channel number).

This will impact any benchmarking we do that involves launching multiple concurrent gRPC requests.

This limit is mentioned here: https://github.com/grpc/grpc-node/blob/master/PACKAGE-COMPARISON.md and also compares the @grpc/grpc-js vs grpc library (that is now deprecated).
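The channel-pool workaround can be sketched as a small round-robin pool. This TypeScript is library-agnostic: ChannelPool is a hypothetical name, and the createChannel factory receives its slot index precisely so that each channel can be created with a distinct channel arg and therefore not be re-used.

```typescript
// Generic round-robin pool of channels (or clients wrapping channels).
class ChannelPool<C> {
  protected channels: C[];
  protected next = 0;

  // createChannel gets the slot index so each channel can be configured
  // with a distinct, use-specific channel arg.
  constructor(size: number, createChannel: (index: number) => C) {
    if (size < 1) throw new RangeError('Pool size must be at least 1');
    this.channels = Array.from({ length: size }, (_, i) => createChannel(i));
  }

  // Hand out channels in round-robin order to spread concurrent RPCs.
  get(): C {
    const channel = this.channels[this.next];
    this.next = (this.next + 1) % this.channels.length;
    return channel;
  }
}
```

With @grpc/grpc-js, the factory would construct one client per index with an index-specific key in its channel options; which key to use, and whether the library's subchannel pooling honours it, is an assumption to verify against the grpc-js version in use.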

@CMCDragonkai
Member Author

CMCDragonkai commented Oct 19, 2021

Regarding API versioning. gRPC is naturally backwards compatible. So as long as there are no breaking changes to the API, it's possible to keep using the same version, and clients can continue to work as normal. However when there is a breaking change, the MS guide recommends using package names.

So right now we have agentInterface, but we can instead call it agentInterface.v1. And then increment this version specifier whenever we have backwards incompatible changes.

This basically means one can create different service interfaces, and it is possible to run multiple versions of the same API. So right now we may have agent service, but we may also run 2 agent services on the same port. This would be similar to having api/v1 and api/v2 in a RESTful URL where both still work. The only challenge here is to maintain handlers for the old API as well.

However, this doesn't tell the client side which versions are available. It's just a matter of the client selecting which version it wants to use, and hoping that the agent side still offers that version. Maybe the absence of that versioned service is enough to signal to the client that its source code is too old and needs to be updated. This will need to be prototyped in the test GRPC to see which exceptions/errors occur when the relevant versioned package is no longer offered as a service.

This would benefit from being able to break up the proto files into subdirectories so that way common messages and type declarations can be shared.


It seems that there was an idea to use server reflection for the client to query about the protobuf descriptions. https://stackoverflow.com/a/41646864/582917

Server reflection is not available directly on grpc-js grpc/grpc-node#79. However there are libraries that have built on top to include it:

The examples given are that grpcurl can then be used directly without having access to the proto files since it is possible to request the proto files from the service.

It has some relationship to the descriptor proto: https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/descriptor.proto

Also there's option declaration. Not sure how that works yet.

@CMCDragonkai
Member Author

Found an interesting library that builds on top of @grpc/grpc-js https://github.com/deeplay-io/nice-grpc they seem to have done a lot of work on gRPC js and making it more user friendly. It's too complicated right now to integrate, but would be worth checking out later.

@CMCDragonkai
Member Author

Relevant errors that we should have more tests for: https://www.grpc.io/docs/guides/error/

@CMCDragonkai CMCDragonkai added the design (Requires design) label Oct 19, 2021
@CMCDragonkai
Member Author

Recommend reading over this @tegefaulkes when you're finished with your checklist in vaults refactoring.

@tegefaulkes
Contributor

Playing around with getting the message definitions to exist within their own .proto files and packages.

It seems doable. For example, I can define the vault messages inside domains/Vaults.proto and import them into Client.proto with import public "domains/Vaults.proto";. From there I can use the messages inside a service by doing:

//OLD
rpc VaultsRename(VaultRenameMessage) returns (VaultMessage) {};

//NEW
rpc VaultsRename(Vault.Rename) returns (Vault.Vault);

When constructing the messages, we can now do VaultMessage = new clientPB.Vaults.Rename(). I'm likely going to rename clientPB to messages at some point.

@CMCDragonkai
Member Author

CMCDragonkai commented Oct 20, 2021 via email

@tegefaulkes
Contributor

I didn't check what it should be but I can change it if needed.

@CMCDragonkai
Member Author

I think we should have in our src/config.ts:

  • stateVersion - for the node state
  • serviceVersion - for the GRPC service (or HTTP service)
  • sourceVersion - same as package.json version (ideally we can import this)

The sourceVersion is ideally fetched from the package.json. However, we don't want to use import packageJson from '../package.json'; because that screws up the dist build. Instead, config.ts can read package.json from one directory up at runtime: src/config.ts compiles to dist/config.js, which can then access the ../package.json that will be in the NPM distribution.
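A sketch of that runtime lookup, assuming a CommonJS build where __dirname is available. readSourceVersion is a hypothetical helper; whether the '..' relative path still holds inside a vercel/pkg bundle is exactly the open question below and is not addressed here.

```typescript
import * as fs from 'fs';
import * as path from 'path';

// Hypothetical helper for src/config.ts. Once compiled to dist/config.js,
// __dirname is dist/, so '../package.json' is the package root's manifest.
function readSourceVersion(configDir: string = __dirname): string {
  const packageJsonPath = path.join(configDir, '..', 'package.json');
  const packageJson = JSON.parse(fs.readFileSync(packageJsonPath, 'utf-8'));
  if (typeof packageJson.version !== 'string') {
    throw new Error(`No version field in ${packageJsonPath}`);
  }
  return packageJson.version;
}
```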

However I'm not sure if this will work under vercel/pkg. @tegefaulkes can you check this out as well.

@CMCDragonkai
Member Author

@tegefaulkes remember to tick off things as you're making progress. And if you have a new PR/branch, create a PR for it and link it here too. The task list should be copied there.

@tegefaulkes
Contributor

I'm still working on creating common domain types and common message types. Almost done with that.

I've created a new branch off of master for this under API_Review

@tegefaulkes tegefaulkes mentioned this issue Oct 27, 2021
24 tasks
@tegefaulkes
Contributor

Consider how to shutdown the grpc server and terminate all client connections as well

Looks like GRPCServer.ts already has a method for doing this.

public closeServerForce(): void {
   this.server.forceShutdown();
}
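For the shutdown task, a common pattern is to attempt a graceful shutdown first and only fall back to forceShutdown() after a timeout. The sketch below assumes the two methods of @grpc/grpc-js's Server, tryShutdown(callback) and forceShutdown(), and types them through a minimal interface so it does not depend on the library; shutdownServer and the timeout policy are hypothetical.

```typescript
// Structural subset of a grpc-js Server that this sketch relies on.
interface ShutdownableServer {
  tryShutdown(callback: (error?: Error) => void): void;
  forceShutdown(): void;
}

// Graceful first (stop accepting calls, let in-flight calls finish);
// if that has not completed within timeoutMs, force-close, which cancels
// pending calls and tears down client sessions.
function shutdownServer(server: ShutdownableServer, timeoutMs: number): Promise<'graceful' | 'forced'> {
  return new Promise((resolve, reject) => {
    let settled = false;
    const timer = setTimeout(() => {
      if (settled) return;
      settled = true;
      server.forceShutdown();
      resolve('forced');
    }, timeoutMs);
    server.tryShutdown((error) => {
      // Ignore a late callback after we have already forced shutdown
      if (settled) return;
      settled = true;
      clearTimeout(timer);
      if (error != null) reject(error);
      else resolve('graceful');
    });
  });
}
```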

@CMCDragonkai
Member Author

CMCDragonkai commented Nov 15, 2022

I wonder how backpressure works in the case of rxjs streams. Normally it would depend on the underlying network protocol. But if the RPC client is not subscribed to the rxjs stream, it is possible the rxjs stream is reading it into an internal buffer. Or maybe not... it sort of depends on how the data from the socket/stream is being read to local rxjs stream. More info here: https://xgrommx.github.io/rx-book/content/getting_started_with_rxjs/creating_and_querying_observable_sequences/backpressure.html

With pull streams, on the other hand, backpressure is driven by the consumer.

@CMCDragonkai
Member Author

I've tried out MuxRPC and it's revealed some interesting ideas, but it also has some problems.

Similar to deepkit, there's this notion that each side can have a manifest. So upon creating a "MRPC" instance, you pass a "remote manifest" and a "local manifest":

const rpc = MRPC(remoteManifest, localManifest);

The idea being that servers would only have a localManifest, while clients would only have a remoteManifest, while peers would have both manifests.

These manifests correspond to the controllers in deepkit RPC.

However again, it isn't clear in the docs nor the examples how the handlers on each side's manifest are supposed to make use of the other's manifest.

There's no mention of any "remote side" in any of the handlers. I checked the arguments to the handlers, and there's still nothing. It appears you'd have to somehow acquire this information when a connection is established, but it seems very underbaked here.

It seems that it would be easy if each handler just received a ctx parameter like we do with our context decorated functions, and inside this ctx, every handler could be supplied with the remote information. We could combine this with our timedCancellable decorator that means every handler would have access to deadlines, abortion signals, client information and potentially the client's remote handlers too.
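The ctx idea could look roughly like the following TypeScript sketch. Everything here is hypothetical (HandlerContext, UnaryHandler and invokeWithDeadline are illustrative names, not MuxRPC or existing PK APIs); it only shows a handler receiving a deadline and an AbortSignal alongside its input.

```typescript
// Hypothetical context passed to every handler.
interface HandlerContext {
  signal: AbortSignal; // aborted once the deadline passes
  deadline: number; // absolute deadline in milliseconds since the epoch
}

type UnaryHandler<I, O> = (input: I, ctx: HandlerContext) => Promise<O>;

// Server-side invocation: build the ctx and abort it when the timeout elapses.
// The handler is expected to cooperate by listening to ctx.signal.
async function invokeWithDeadline<I, O>(
  handler: UnaryHandler<I, O>,
  input: I,
  timeoutMs: number,
): Promise<O> {
  const abortController = new AbortController();
  const deadline = Date.now() + timeoutMs;
  const timer = setTimeout(() => abortController.abort(), timeoutMs);
  try {
    return await handler(input, { signal: abortController.signal, deadline });
  } finally {
    clearTimeout(timer);
  }
}
```

The remote side's information (client identity, or handlers the caller exposes) could be added to the same ctx object.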

One good thing is that the underlying socket is not part of MuxRPC at all; you handle it yourself. All MuxRPC really needs is a "duplex reliable stream".

It makes use of an abstraction called "pull-stream" as the duplex reliable stream that it consumes. Any NodeJS stream can be converted to a "pull stream" using their utility packages such as stream-to-pull-stream.

More info to be written.

@CMCDragonkai
Member Author

I wrote a comparison of some multiplexed transport protocols here: #234 (comment).

The conclusion is that based on the transport protocols that we are going to use, we do not require muxing/demuxing to be built into the RPC system. The RPC system should instead just rely on the underlying transport to provide the muxing/demuxing system.

Here are some examples:

  1. For the client service, when using websockets, just create a new websocket connection for every single RPC call. If this is HTTP1, then this is a new TCP connection each time. If this is HTTP2, then this is a new HTTP2 stream within 1 TCP connection.
  2. For the client service, when using HTTP, just do a new HTTP request/response for each RPC call. See: https://github.com/davedoesdev/browser-http2-duplex for hacking together "streaming" support with HTTP.
  3. For the agent service, when using QUIC, just create a new QUIC stream for every single RPC call. QUIC will multiplex every QUIC stream within 1 QUIC "connection".

This is great news, since this means our RPC system is significantly simpler, and no custom muxing/demuxing is required and third party integration is simpler as they don't need a custom muxer/demuxer either.

@CMCDragonkai
Member Author

Now regarding the streaming calls in MuxRPC, there are some interesting ideas here.

Unlike deepkit RPC, the RPC system in MuxRPC requires runtime specification about how the handlers are supposed to be called. Deepkit seems to rely on TS types and decorators and maybe some meta-programming.

The manifest basically specifies 4 types of handlers:

  1. async - unary
  2. source - server stream
  3. sink - client stream
  4. duplex - duplex stream

For the client calling a handler, even if the server states that it is synchronous, the handler will automatically be converted to asynchronous because it goes over the network.

For async, this is just a standard unary function call. There's a bit of confusion here: RPC makes network calls look like normal function calls, and this is actually considered a disadvantage of RPC, because network calls are not like normal function calls; that's why message passing always provides an explicit API. However, RPC is more convenient. So a middle ground might be to provide a function call, but in a specific "pattern"/"structure" that makes it clear to users that this is an RPC call and not a normal function. For example, parameters and return values must be serialisable/deserialisable; they cannot just be pointers to rich in-memory objects, since we do not have distributed memory here. Parameters and return values are always "copied", so there is no reference counting of any objects inside the handler scopes.

So in MuxRPC, the handler is just a regular function (with any number of parameters). I recommend that we change this so that the function always takes a single "input" and a single context object, and returns a single output. This enables explicit parsing of the input, and also explicit plumbing of exceptions. I believe this is better than letting the RPC system decide how to parse the input and how to plumb exceptions. Functional composition is better than implicit magic. However, if a local exception occurs in the handler, this could prevent returning anything. In that case, the RPC system should catch this exception and provide a particular default error response to the caller (a sort of 500 error in HTTP).
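A minimal TypeScript sketch of that shape: a handler takes one input and a ctx and returns one output, and a hypothetical withDefaultError wrapper turns any uncaught local exception into a generic error result, analogous to an HTTP 500. The RPCResult shape and the INTERNAL code are illustrative assumptions, not an existing wire format.

```typescript
// Hypothetical wire-level result: either an output or a generic error.
type RPCResult<O> =
  | { type: 'ok'; output: O }
  | { type: 'error'; code: 'INTERNAL'; message: string };

// One input, one ctx, one output.
type Handler<I, O> = (input: I, ctx: { signal: AbortSignal }) => Promise<O>;

// Wrap a handler so an uncaught local exception becomes a default error
// response instead of leaving the caller without any reply.
function withDefaultError<I, O>(handler: Handler<I, O>): Handler<I, RPCResult<O>> {
  return async (input, ctx) => {
    try {
      return { type: 'ok', output: await handler(input, ctx) };
    } catch {
      // Deliberately do not leak internal details to the caller
      return { type: 'error', code: 'INTERNAL', message: 'Internal handler error' };
    }
  };
}
```

Handlers that want to report meaningful errors would still do so explicitly in their output; the wrapper only covers the unexpected case.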

Source handlers return a "source" stream, sink handlers return a "sink" stream, and duplex handlers return a compound source and sink stream object.

Notice that in all 3 cases, the stream is the returned object. There is no processing of the stream via input parameters. In fact in all 3 cases, the handler can still be called with arbitrary serialisable/deserialisable parameters. The stream object that it returns is a custom "pull-stream" concept. What exactly is a pull stream?

A pull stream is an abstraction that enables sinks to pull from sources. The sources are not allowed to emit any data until the sink reads from the source. I looked into the code for pull-streams, and both sinks and sources are functional closures that communicate through callbacks. I think the original reason to develop this was to provide some sort of backpressure.
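To make the pull-stream idea concrete, here is a hand-rolled TypeScript illustration of its callback protocol (not the pull-stream package itself; values and collect are written here for illustration). A source is a function (abort, cb); the sink calls it to ask for one item, and the source answers through cb(end, data). Nothing is produced until the sink reads.

```typescript
// end: null = more data may follow, true = normal end, Error = failure/abort
type End = null | true | Error;
type SourceCallback<T> = (end: End, data?: T) => void;
// A source is called by the sink whenever the sink wants the next item
type Source<T> = (abort: End, cb: SourceCallback<T>) => void;
// A sink receives the source's read function and decides when to call it
type Sink<T> = (read: Source<T>) => void;

function values<T>(items: T[]): Source<T> {
  let i = 0;
  return (abort, cb) => {
    if (abort) {
      cb(abort);
      return;
    }
    if (i >= items.length) {
      cb(true);
      return;
    }
    cb(null, items[i++]);
  };
}

function collect<T>(done: (err: Error | null, results: T[]) => void): Sink<T> {
  const results: T[] = [];
  return (read) => {
    const next: SourceCallback<T> = (end, data) => {
      if (end === true) {
        done(null, results);
        return;
      }
      if (end) {
        done(end, results);
        return;
      }
      results.push(data as T);
      // Only ask for the next item after this one has been handled
      read(null, next);
    };
    read(null, next);
  };
}
```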

NodeJS streams also have native backpressure. They apply it by pausing the source once the internal buffer reaches its highWaterMark, until the sink reads again. This is why the examples in muxrpc show how to convert NodeJS streams to pull streams.

Web streams do the same thing. They were optimised for IO patterns and are used by the fetch API (a response body is a ReadableStream).

This article compares streams (specifically web streams) to RxJS observables: https://surma.dev/things/streams-for-reactive-programming/

This leads to the conclusion that RxJS observables are not the right kind of abstraction to use for streams in RPC. Due to the need for backpressure, pull-oriented streams are better suited for RPC. It's always possible to build push-flow on top of pull-flow if you need it. Pull-flow is also designed around a single consumer, which is the case for RPC, while push-flow is designed around multiple consumers. I think pull-flow as the default is better for RPC design. If the application requires push-flow, it can design that around the RPC's pull-flow and subsequently transform the RPC's pull-stream into an rxjs observable.

To build push on top of pull, constantly be pulling from the pull API, and then push out the chunks to any consumers.
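A TypeScript sketch of building push on top of pull, assuming the pull side is an async iterable. PushAdapter is a hypothetical name: it keeps pulling and fans each value out to all current subscribers.

```typescript
type Subscriber<T> = (value: T) => void;

class PushAdapter<T> {
  protected subscribers = new Set<Subscriber<T>>();

  // Returns an unsubscribe function.
  subscribe(subscriber: Subscriber<T>): () => void {
    this.subscribers.add(subscriber);
    return () => {
      this.subscribers.delete(subscriber);
    };
  }

  // Pull continuously and push every value to every subscriber.
  // Resolves once the source is exhausted.
  async run(source: AsyncIterable<T>): Promise<void> {
    for await (const value of source) {
      for (const subscriber of this.subscribers) {
        subscriber(value);
      }
    }
  }
}
```

Note the trade-off: because the adapter pulls as fast as the source allows, pull-based backpressure stops at the adapter, and slow subscribers would need their own buffering.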

The opposite, building pull on top of push, takes much more code, because pushed values have to be buffered until the consumer asks for them.

This sort of means there's no need to use pull-streams anymore. We can just use web streams, which are more portable and more modern.

But hold on... there's another more "language-level" abstraction that is also a pull-based dataflow that we already use all over the PK codebase. That's the iterators and async iterators provided by the async generator functions.

At the same time, Node streams and web streams implement the async iterable interface, so they can easily be worked with as if they were async iterators.

https://nodejs.org/api/stream.html#streams-compatibility-with-async-generators-and-async-iterators

// Readable stream to async iterator
// Node Readable streams implement Symbol.asyncIterator
for await (const chunk of readableStream) {
  // use the chunks
}
// Async iterator to Readable stream (source)
// Using https://nodejs.org/api/stream.html#streamreadablefromiterable-options
import { Readable } from 'stream';

const abortController = new AbortController();

async function *generate () {
  let n = 0;
  while (!abortController.signal.aborted) {
    yield n;
    n++;
  }
}

const readableStream = Readable.from(generate());

// Stop the generator once the stream has been closed or destroyed
readableStream.on('close', () => {
  abortController.abort();
});
// Piping async iterator to writable stream

import { pipeline }  from 'stream/promises';

const abortController = new AbortController();

async function *generate() {
  let n = 0;
  while (!abortController.signal.aborted) {
    yield n;
    n++;
  }
}

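// pipeline() accepts an async iterable as its source
const iterator = generate();

try {
  // Resolves (with undefined) once writableStream has finished,
  // and rejects if any stage of the pipeline fails
  await pipeline(iterator, writableStream);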
} catch (e) {
  abortController.abort();
  throw e;
}

The above examples are for node streams. But web streams also support the same concepts.

// Readable stream to async iterator

// If the loop exits early (break, return or throw), the readable stream is cancelled
for await (const chunk of readableStream) {
  // do something
}

// With preventCancel, exiting the loop early leaves the readable stream open
for await (const chunk of readableStream.values({ preventCancel: true })) {
  // do something
}
// Async iterator to Readable Stream (source)
function iteratorToStream(iterator) {
  return new ReadableStream({
    async pull(controller) {
      const { value, done } = await iterator.next();
      if (done) {
        controller.close();
      } else {
        controller.enqueue(value);
      }
    },
  });
}
// Piping async iterator to writable web stream

import { WritableStream } from 'web-streams-polyfill/ponyfill';

const abortController = new AbortController();

async function *generate() {
  let n = 0;
  while (!abortController.signal.aborted) {
    yield n;
    n++;
  }
}

const iterator = generate();

const writableStream = new WritableStream({
  close() {
    abortController.abort();
  },
});

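const writer = writableStream.getWriter();
// Note: generate() only stops once abortController is aborted, so this loop
// keeps writing until a write rejects (e.g. the stream errors or is aborted)
for await (const chunk of iterator) {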
  // Wait for it to be ready
  await writer.ready;
  // Write a chunk
  await writer.write(chunk);
}
// Wait for it to be ready (flushed)
await writer.ready;
// Close the writable stream
await writer.close();

Note that NodeJS streams support both objects and buffers. Web streams can also carry arbitrary chunks, but the byte-oriented ones used for IO (such as fetch bodies) carry Uint8Array. For network communication, assuming that we want to make parsing/serialisation/deserialisation explicit, it makes sense to focus specifically on buffers or Uint8Array rather than objects. This also ensures that the data is in fact serialisable, since rich objects cannot just be passed around over the network.
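Making serialisation explicit could look like this: the RPC layer only moves Uint8Array chunks, and each handler encodes and decodes them itself. JSON via TextEncoder/TextDecoder is just one assumed choice, and serialise/deserialise are hypothetical names.

```typescript
const encoder = new TextEncoder();
const decoder = new TextDecoder();

// Handler-side encoding: structured value -> bytes for the transport.
function serialise(value: unknown): Uint8Array {
  return encoder.encode(JSON.stringify(value));
}

// Handler-side decoding: bytes from the transport -> structured value.
// The cast is unchecked; a real handler would validate the parsed value.
function deserialise<T>(data: Uint8Array): T {
  return JSON.parse(decoder.decode(data)) as T;
}
```

It also shows why rich in-memory objects can't simply be passed: a Map, for example, has no JSON representation, so deserialise(serialise(new Map([['k', 'v']]))) comes back as an empty object.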

Let's suppose our RPC handlers looked like this:

// Stream oriented

const rpc = new RPC({
  async handler(input: ReadableStream<Uint8Array>): Promise<ReadableStream<Uint8Array>> {
    // Parse the input data (parse is a placeholder that produces
    // an async iterable of output chunks)
    const inputParsed = parse(input);
    // A TransformStream gives a writable side to write the output into,
    // and a readable side to hand back to the RPC system
    const { readable, writable } = new TransformStream<Uint8Array, Uint8Array>();
    const writer = writable.getWriter();
    // The writing must be done asynchronously,
    // not within here, because the stream must be returned first
    void (async () => {
      for await (const chunk of inputParsed) {
        await writer.ready;
        await writer.write(chunk);
      }
      await writer.ready;
      await writer.close();
    })();
    return readable;
  }
});

You can see here it's a bit awkward to write, because you have to return the stream first before asynchronously writing to it.

Alternatively, we can use first-class features of the language: instead of taking streams and returning streams, we can take async iterators and return async iterators.

const rpc = new RPC({
  async *handler(input: AsyncGenerator<Buffer>): AsyncGenerator<Buffer> {
    // Now that you have 2 generators
    // you can independently read data from the first generator
    // while yielding data to the output generator
    for await (const chunk of input) {
      yield chunk;
    }
    // Or more simply `yield* input;`
    // Which simply is a "echo" handler
  }
});

Notice here that the AsyncGenerator type is a composition of AsyncIterable and AsyncIterator (just as Generator combines Iterable and Iterator).

Why use a generator instead of an Iterable or an Iterator?

Well, the Iterator type is flexible but not convenient: you have to call next() manually.

The Iterable type is more convenient with the for..of syntax, but you have to call the [Symbol.iterator] to get access to the iterator, if you need to do more complex interactions.

2 independent generators are necessary to have concurrent sources and sinks.

We naturally have a form of backpressure here. We only acquire data from the input generator when we pull from it by asking for the next piece of data. At the same time, our yield stays suspended until the caller pulls from us.
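The backpressure claim is easy to check with plain async generators. In this small TypeScript demo (producer and consumer are illustrative names), the log records that each value is produced only after the previous one has been consumed, because the generator stays suspended at yield until next() is called.

```typescript
const log: string[] = [];

async function* producer(): AsyncGenerator<number> {
  for (let i = 0; i < 3; i++) {
    log.push(`produce ${i}`);
    yield i; // suspended here until the consumer pulls again
  }
}

async function consumer(): Promise<void> {
  for await (const value of producer()) {
    log.push(`consume ${value}`);
  }
}
```

After consumer() resolves, log is: produce 0, consume 0, produce 1, consume 1, produce 2, consume 2. The producer never runs ahead of the consumer.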

The client call then has to do something like this:

// And we want to pass in a generator here

async function* input () {
  yield Buffer.from('hello');
  yield Buffer.from('world');
}

// Pass the generator object (not the generator function)
const g = client.handler(input());

for await (const chunk of g) {
  console.log(chunk);
}

Combining this with ixjs increases the power and flexibility of the generators. We can perform transformations easily on the generators.

This design starts with first-class language features. It also focuses on the most powerful communication paradigm first, that is duplex communication, and then it's possible to abstract over duplex to constrain it to server streaming, client streaming or unary calls.

The key point here is that server streaming just means no input generator is provided... or more appropriately, the input generator immediately returns.

Client streaming means the output generator immediately returns.

Unary calls are where both the input generator and the output generator immediately return.
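A sketch of deriving the three constrained call types from a duplex handler, in TypeScript. It assumes (an interpretation, not something the design pins down) that a constrained input or output carries exactly one message; unary, serverStreaming and clientStreaming are hypothetical helper names.

```typescript
// Duplex is the base shape; the other three are constrained forms of it.
type DuplexHandler<I, O> = (input: AsyncIterable<I>) => AsyncIterable<O>;

// Take exactly one message; returning early also closes the input iterator.
async function firstMessage<I>(input: AsyncIterable<I>): Promise<I> {
  for await (const message of input) {
    return message;
  }
  throw new Error('Expected one input message, got none');
}

// One message in, one message out.
function unary<I, O>(f: (input: I) => Promise<O>): DuplexHandler<I, O> {
  return async function* (input) {
    yield await f(await firstMessage(input));
  };
}

// One message in, a stream out.
function serverStreaming<I, O>(f: (input: I) => AsyncIterable<O>): DuplexHandler<I, O> {
  return async function* (input) {
    yield* f(await firstMessage(input));
  };
}

// A stream in, one message out.
function clientStreaming<I, O>(f: (input: AsyncIterable<I>) => Promise<O>): DuplexHandler<I, O> {
  return async function* (input) {
    yield await f(input);
  };
}
```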

Behind the scenes, the RPC system has to plumb the generator data into the streams offered by the underlying transport layer. Ideally these would be web streams, so we would use the appropriate transformations. Serialisation/deserialisation is not actually part of the RPC system, but is instead done inside each handler. This allows a significant amount of flexibility, since each individual handler can pick how it wants to serialise/deserialise its data. The generators are limited to working with buffer chunks (Uint8Array).
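One possible shape for that plumbing, assuming the transport hands each RPC call a web ReadableStream/WritableStream pair (here from Node's stream/web module). readableToIterable and serveCall are hypothetical names; the handler only ever sees async iterables of Uint8Array, and awaiting writer.ready lets the transport's backpressure reach the handler's generator.

```typescript
import { ReadableStream, WritableStream } from 'stream/web';

type DuplexHandler = (input: AsyncIterable<Uint8Array>) => AsyncIterable<Uint8Array>;

// Expose the transport's readable side as an async generator of chunks.
async function* readableToIterable(stream: ReadableStream<Uint8Array>): AsyncGenerator<Uint8Array> {
  const reader = stream.getReader();
  try {
    while (true) {
      const result = await reader.read();
      if (result.done) return;
      yield result.value;
    }
  } finally {
    reader.releaseLock();
  }
}

// Plumb one RPC call: transport input -> handler -> transport output.
async function serveCall(
  handler: DuplexHandler,
  inbound: ReadableStream<Uint8Array>,
  outbound: WritableStream<Uint8Array>,
): Promise<void> {
  const writer = outbound.getWriter();
  try {
    for await (const chunk of handler(readableToIterable(inbound))) {
      await writer.ready; // wait while the transport is applying backpressure
      await writer.write(chunk);
    }
    await writer.close();
  } catch (e) {
    await writer.abort(e);
    throw e;
  }
}
```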

We can prototype this first without any RPC serialisation/deserialisation in a single process.

Further prototypes can be done to demonstrate how to convert such pull-dataflow into push data flow. I'm starting to see how network communication should be pull-based which can be converted to push-flows either in-memory through rxjs or other kinds of protocols that are non-rpc, but instead messaging based.... this might end up being a very elegant design!

@CMCDragonkai
Member Author

Still need to address how "metadata" gets passed, like "header" information or session token information (for authentication), how exceptions are passed around, and how these interact with the ctx and the ability to call back on the client.

@CMCDragonkai
Member Author

CMCDragonkai commented Nov 18, 2022

Here's an idea for communicating stream exceptions to the handler's input async iterator.

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function *gf() {
  while (true) {
    await sleep(100);
    try {
      yield 'string';
    } catch (e) {
      yield; // <-- this is the trick
      throw e;
    } 
  }
}

async function main() {
  const g = gf();
  setTimeout(() => {
    void g.throw(new Error('Stream Failure'));
  }, 250);

  for (let i = 0; i < 10; i++) {
    try {
      console.log(await g.next());
    } catch (e) {
      console.log('Consumed an exception!', (e as Error).message);
      break;
    }
  }
}

void main();

It was a bit tricky to realise how this works. The g.throw() method actually returns a promise. When you throw an exception into the generator and it is not caught inside the generator function, the generator rethrows it, and that rethrown exception becomes the rejection of the promise returned by g.throw(). If you don't catch it there, it becomes an unhandled promise rejection (and thus kills the Node process).

What we want is to "signal" to the handler consuming the input iterator that there is a problem. But because the input iterator is a "pull-based" data flow, this error can only be signalled when the handler calls next() on the iterator.

When I first tried this, I thought that rethrowing the exception inside the generator would be sufficient. It didn't work, because the rethrown exception rejects the promise returned by throw(), not the handler's next() call.

So the trick is, inside the generator function, to catch the exception at yield 'string';, then execute a bare yield;, which fulfils the promise returned by throw(), and only afterwards rethrow the exception, which rejects the next g.next() call.

This way, the handler receives the relevant exception only when it next calls next() on the generator.

@CMCDragonkai
Member Author

Alternatively, if the async generator function is itself what pulls data from the stream, it can simply throw the exception there, and that will reject the consumer's next() call.

The web streams API is actually promise-oriented, not event-oriented, so technically it fits well with the async iterator API too.

We could also just use web streams directly. See: https://github.com/ReactiveX/IxJS/blob/master/src/asynciterable/fromdomstream.ts
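A sketch of that alternative, similar in spirit to the IxJS helper linked above but not a copy of it. The async generator pulls with reader.read() itself, so when the stream errors, the rejected read() propagates out of the generator and rejects the handler's next() call with no yield; trick needed. fromReadableStream is a hypothetical name, and the sketch assumes a runtime with the global web ReadableStream (Node 18+ or a browser).

```typescript
// Wrap a web ReadableStream as an async generator. Because the generator
// itself pulls with reader.read(), a stream error rejects that read(), and
// the rejection surfaces from the consumer's next() call.
async function* fromReadableStream<T>(
  stream: ReadableStream<T>,
): AsyncGenerator<T, void, undefined> {
  const reader = stream.getReader();
  try {
    while (true) {
      const result = await reader.read();
      if (result.done) return;
      yield result.value;
    }
  } finally {
    // Runs on completion, on error, and when the consumer breaks early.
    reader.releaseLock();
  }
}
```

Breaking out of the consuming loop early releases the reader lock but does not cancel the stream; a real implementation would probably call reader.cancel() in that case.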

@CMCDragonkai
Member Author

CMCDragonkai commented Nov 19, 2022

Just a note about GRPC metadata.

All GRPC calls can have 1 leading metadata and 1 trailing metadata.

For a given handler, for example a unary call handler, you're given a call object.

For the server to send the leading metadata, it calls call.sendMetadata(). This must be done before any other data is sent.

To send trailing metadata, the handler passes it to the callback, which sends it along with the response value.

In a duplex stream handler, one can again call call.sendMetadata() for the leading metadata.

When it ends the stream with call.end(trailingMetadata), it can pass the trailing metadata.

Therefore the important thing to remember here is that the leading and trailing metadata each occur only once: at the beginning and at the end respectively.

Each side is capable of receiving and processing the metadata sent by the other side.

In the case of unary handlers, they can get the leading metadata with call.metadata. To get the call's trailing metadata, I think they need to do call.on('metadata').

On the unary caller side, the leading metadata from the server can be obtained with call.on('metadata'). I believe this handler may actually be called twice, with the second call carrying the trailing metadata.

As you can see, I've completely forgotten how the API works; it's too complex in grpc.

For our RPC, we need to simplify this metadata situation. It does make sense to have leading and trailing metadata, but this needs to be much simpler. Assuming we have 2 abstraction levels, web streams and async generators, the web streams will basically emit the raw data, while the async generators can provide processed data (we can also have raw async generators).

If the data is processed, we would naturally expect the initial message to be the leading metadata and the last message to be the trailing metadata.

But this depends on our "protocol abstraction" above the raw binary stream. For RPC purposes, we will need at least:

  1. Leading and trailing metadata
  2. Forwarding of exceptions (through the metadata system)
  3. Structured messages (where unary call simply means 1 structured message)

If a handler is only expecting 1 structured message, we can parse until that message is complete; if there is more than 1 structured message, or data that we don't expect, this can be considered a protocol error.

So unlike other RPC systems, we want to be able to expose the lower level in case we need to do something special, and then specialise into higher-level abstractions like JSON RPC or BSON RPC.
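One possible shape for that protocol abstraction, sketched under assumptions rather than taken from an existing design: the names Message, parseUnary, ProtocolError and RemoteError are hypothetical. The first message is the leading metadata, the last is the trailing metadata and may carry a forwarded exception, and a unary call is exactly one data message in between; anything else is reported as a protocol error.

```typescript
// Hypothetical envelope for messages above the raw byte stream.
type Metadata = Record<string, string>;

type Message<T> =
  | { kind: 'leading'; metadata: Metadata }
  | { kind: 'data'; data: T }
  | { kind: 'trailing'; metadata: Metadata; error?: { name: string; message: string } };

class ProtocolError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'ProtocolError';
  }
}

// Raised locally when the other side forwarded an exception in the trailer.
class RemoteError extends Error {
  readonly remoteName: string;
  constructor(remoteName: string, message: string) {
    super(message);
    this.name = 'RemoteError';
    this.remoteName = remoteName;
  }
}

// Parse a unary exchange: leading metadata, exactly one data message, then
// trailing metadata. Messages after the trailing metadata are not read.
async function parseUnary<T>(
  messages: AsyncIterable<Message<T>>,
): Promise<{ leading: Metadata; data: T; trailing: Metadata }> {
  let leading: Metadata | undefined;
  let data: { value: T } | undefined;
  for await (const msg of messages) {
    if (leading === undefined) {
      if (msg.kind !== 'leading') throw new ProtocolError('first message must be leading metadata');
      leading = msg.metadata;
      continue;
    }
    if (msg.kind === 'data') {
      if (data !== undefined) throw new ProtocolError('unary call received more than one data message');
      data = { value: msg.data };
      continue;
    }
    if (msg.kind === 'trailing') {
      if (msg.error !== undefined) throw new RemoteError(msg.error.name, msg.error.message);
      if (data === undefined) throw new ProtocolError('unary call received no data message');
      return { leading, data: data.value, trailing: msg.metadata };
    }
    throw new ProtocolError('leading metadata sent more than once');
  }
  throw new ProtocolError('stream ended without trailing metadata');
}
```

A streaming handler would consume the same Message<T> sequence directly; only the unary case collapses it into a single value, which keeps the lower level exposed.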

@CMCDragonkai
Member Author

Moving this to todo, will be the next priority for @tegefaulkes after merging the crypto feature upgrade.

@CMCDragonkai
Member Author

CMCDragonkai commented Nov 22, 2022

Once we build higher-level RPC protocols like JSONRPC or BSONRPC with defined error behaviour and metadata structure, that's also when we can add in rules for versioning/backwards-compatibility and pagination.

I'm considering supporting JSON RPC first, as it's the easiest for third-party APIs to consume.

Then we can add a more efficient interface that supports binary data. Choices are:

  • BSON
  • CBOR
  • MessagePack
  • Protobuf

The key point is that JSON should be supported first. Then we also need to define a consistent way of representing binary data. Right now that has always been { type: 'Buffer', data: [123, ... ] }, which is obviously very inefficient. Instead we should define a representation based on base64-encoded data, agreed upon as a spec by both the client and server side.
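A sketch of one base64 convention, using JSON.stringify and JSON.parse with a replacer and a reviver. The tag { type: 'base64', data: '...' } is an assumption standing in for whatever the client and server end up agreeing on, and the sketch relies on Node's Buffer for the base64 conversion. One subtlety: Node's Buffer.prototype.toJSON runs before the replacer sees the value, so the replacer inspects the original property through this[key].

```typescript
// Hypothetical wire representation for binary data inside JSON messages.
// The tag name 'base64' is an assumption, not an agreed spec.
interface EncodedBinary {
  type: 'base64';
  data: string;
}

// JSON.stringify replacer. Buffer#toJSON has already turned Buffers into
// { type: 'Buffer', data: [...] } by the time `value` arrives, so check the
// original property on the holder object (`this[key]`) instead.
function binaryReplacer(this: Record<string, unknown>, key: string, value: unknown): unknown {
  const original = this[key];
  if (original instanceof Uint8Array) {
    const encoded: EncodedBinary = { type: 'base64', data: Buffer.from(original).toString('base64') };
    return encoded;
  }
  return value;
}

// JSON.parse reviver: turn tagged objects back into Uint8Array.
function binaryReviver(_key: string, value: unknown): unknown {
  if (
    typeof value === 'object' &&
    value !== null &&
    (value as { type?: unknown }).type === 'base64' &&
    typeof (value as { data?: unknown }).data === 'string'
  ) {
    return new Uint8Array(Buffer.from((value as EncodedBinary).data, 'base64'));
  }
  return value;
}

const encodeMessage = (message: unknown): string => JSON.stringify(message, binaryReplacer);
const decodeMessage = (text: string): unknown => JSON.parse(text, binaryReviver);
```

For the bytes [1, 2, 3], Node's default gives {"type":"Buffer","data":[1,2,3]} while this gives {"type":"base64","data":"AQID"}; base64 costs 4 characters per 3 bytes instead of up to 4 characters per byte. The catch is that any genuine object shaped like the tag would be decoded as binary, so a real spec would need to reserve it.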

We can review Ethereum's API style to see how ours fits in. https://ethereum.org/da/developers/docs/apis/json-rpc/

As well as https://github.com/microsoft/vs-streamjsonrpc/blob/main/doc/index.md which discusses why JSON RPC is suitable for peer to peer communication.
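Since JSON RPC is to be supported first, here is a minimal sketch of a single-request JSON-RPC 2.0 dispatcher. The message shapes and the error codes used (-32700 Parse error, -32600 Invalid Request, -32601 Method not found, -32603 Internal error) follow the JSON-RPC 2.0 specification; the Handlers table, errorResponse and handleJsonRpc are hypothetical, and batch requests, params validation (-32602) and streaming are left out.

```typescript
// Message shapes from the JSON-RPC 2.0 specification.
type JsonRpcId = string | number | null;

interface JsonRpcRequest {
  jsonrpc: '2.0';
  method: string;
  params?: unknown;
  id?: JsonRpcId;
}

type JsonRpcResponse =
  | { jsonrpc: '2.0'; result: unknown; id: JsonRpcId }
  | { jsonrpc: '2.0'; error: { code: number; message: string; data?: unknown }; id: JsonRpcId };

// Hypothetical handler table: method name -> async function of the params.
type Handlers = Record<string, (params: unknown) => Promise<unknown>>;

function errorResponse(code: number, message: string, id: JsonRpcId, data?: unknown): JsonRpcResponse {
  return { jsonrpc: '2.0', error: data === undefined ? { code, message } : { code, message, data }, id };
}

// Handles one serialised request and returns the serialised response, or
// undefined for a notification (a request without an "id" member).
// Batch requests (JSON arrays) are not supported and are rejected as invalid.
async function handleJsonRpc(handlers: Handlers, text: string): Promise<string | undefined> {
  let parsed: unknown;
  try {
    parsed = JSON.parse(text);
  } catch {
    return JSON.stringify(errorResponse(-32700, 'Parse error', null));
  }
  if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
    return JSON.stringify(errorResponse(-32600, 'Invalid Request', null));
  }
  const candidate = parsed as { jsonrpc?: unknown; method?: unknown };
  if (candidate.jsonrpc !== '2.0' || typeof candidate.method !== 'string') {
    return JSON.stringify(errorResponse(-32600, 'Invalid Request', null));
  }
  const request = parsed as JsonRpcRequest;
  const isNotification = !('id' in request);
  const id: JsonRpcId = request.id ?? null;
  // Own-property check so names like "toString" are not treated as methods.
  const handler = Object.prototype.hasOwnProperty.call(handlers, request.method)
    ? handlers[request.method]
    : undefined;
  let response: JsonRpcResponse;
  if (handler === undefined) {
    response = errorResponse(-32601, 'Method not found', id);
  } else {
    try {
      response = { jsonrpc: '2.0', result: await handler(request.params), id };
    } catch (e) {
      response = errorResponse(-32603, 'Internal error', id, String(e));
    }
  }
  return isNotification ? undefined : JSON.stringify(response);
}
```

In this shape, the (de)serialisation stays inside the JSON RPC layer, so the same handlers could sit on any transport that can deliver one request string and accept one response string.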

@CMCDragonkai
Member Author

@tegefaulkes my initial experiments with generator based API is located in https://github.com/MatrixAI/Polykey/tree/feature-rpc.

Please take over that branch and get a new PR for it. The experiments can be copied over after rebasing. The only relevant code is in the test-* files at the project root.

@CMCDragonkai
Member Author

I think, based on our discussions last time, it's worthwhile to spec out the details of how this will work in #495. This issue is about transport agnostic RPC in general, while #495 is specifically about how we use JSON RPC. A combination of our generator ideas and decorators should be used to enable JSON RPC based handlers. We're still at a prototyping stage, so nothing is set in stone.

We will need to meet again Friday to discuss.

@CMCDragonkai CMCDragonkai removed their assignment Dec 8, 2022
@marcj

marcj commented Dec 8, 2022

Not sure why I got so many emails from this thread, but let me say something I noticed before I unsubscribe: In the hundreds of hours you spent looking at all these implementations, trying them out, and especially writing so much text, you could have instead written your own RPC implementation that is perfectly tailored to your needs.

@CMCDragonkai
Member Author

That's very strange... how did you get so many emails from this thread...?

We are working on our own RPC implementation now! The text is just examining all the trade-offs.

@CMCDragonkai
Member Author

All the tasks above regarding GRPC should be crossed out @tegefaulkes, and replaced with tasks related to JSONRPC.

@tegefaulkes
Contributor

In the future we shouldn't re-purpose issues like this; it makes it a little confusing to follow what's going on in review. Where we change track on an issue like this, I think we should close the old one, outline the change of approach (such as creating our own RPC to solve the problems), and create new issues for the problem.

To a degree I think issues should be immutable. Normal changes like updating the spec is fine. But a complete revamp of the topic isn't good.

@CMCDragonkai
Member Author

CMCDragonkai commented Feb 14, 2023

While this has been closed, this work has now been broken down into smaller issues:

@CMCDragonkai
Member Author

@tegefaulkes CQRS is still a pending problem but only when we start tackling the GUI app again.

4 participants