How to save transport context? #392

Open
axos88 opened this issue Feb 9, 2023 · 24 comments · May be fixed by #393 or tikue/tarpc#10

Comments

@axos88

axos88 commented Feb 9, 2023

I'm trying to implement an MQTTv5 transport, and I have it more-or-less up and running, with fixed request and response topics.

There are two challenges I'm facing:

  1. Mqtt is not a one-to-one protocol, where each client will make a different connection to the server, but an aggregating one, where the messages of all clients will be seen through the same stream. The problem with this is that the message ids will collide. I'm thinking about sending randomized message ids instead of the sequential ones, that should mitigate the issue more-or-less. Is that correct?
  2. The way MQTTv5 request-response works is that the client publishing the message publishes two pieces of information together with the message: A correlation data (which will be the message id in this case), and a response topic where the client is awaiting the response. Which will be different in cases of requests coming from different clients, but the protocol allows for each message to contain different response topics as well. How can I save this information when decoding the ClientReq, so that when the Response arrives at the Sink I can publish the response to the correct topic?

Basically I'd need a way to save a transport context through the lifecycle of the request, so that it can be accessed when sending the response through the MQTT broker.

@tikue
Collaborator

tikue commented Feb 10, 2023

Hi, thanks for your questions!

  1. In my mind, implementers of the transport trait are obliged to transmit the messages as-is: at the time the server begins processing a message received from the transport, the message contents should be unaltered from the original client message. However, that does not mean the transport can't alter the contents while the message is in transmission, as long as the original can be reconstructed later. For example, TCP also breaks the contents of a message into potentially multiple TCP/IP packets that each have their own stream identifiers so that the packets can be reordered and the original message can be reconstructed. I don't know anything about MQTT, but I suggest seeing if something similar is possible, where you wrap the inner message with additional headers and the inner message is just an uninterpreted payload. In this case, I think the transport probably needs to be generating its own IDs.
  2. I think this is similar to the first question, but I'm not totally sure. Basically, can you create your own packet that wraps the client message as a payload?

@axos88
Author

axos88 commented Feb 10, 2023

Yes, basically that is what I am asking, "wrapping" the packet can be done to include the response topic and the correlation data information, and on the other side the transport can unpackage keeping only the actual client request, however after the service has processed it and calculated the response, I would need access to that data to "wrap" the response with the same information that the request was "wrapped" with. And this is causing my headache, I'm not sure how I can save some contextual data during the lifecycle of the request on the server side, between the time the request arrives at the stream, and the response is written to the sink.

@tikue
Collaborator

tikue commented Feb 13, 2023

Can you link me to an overview of the MQTTv5 protocol? I'd like to understand what's contained in an MQTTv5 packet. Is there any differentiation between stream ID and message ID?

@axos88
Author

axos88 commented Feb 14, 2023

This is an explanation of the request response flow in mqtt v5. The correlation id has roughly the same purpose as the message id; the response topic doesn't have an equivalent, and each client will want to listen to responses on another topic, so they only see their own.

@axos88
Author

axos88 commented Feb 14, 2023

@axos88
Author

axos88 commented Feb 14, 2023

MQTT specifies the correlation ID as basically anything the client wants, and the server needs to send it back unmodified. The protocol also allows a response topic to be sent, to tell the server where the client expects the response, which may be different from client to client, or even for each request made by a single client. My first idea was to encode the request id in the correlation data, but I'm reluctant to add this constraint, because if another client (written in another language, for example) comes around that simply assumes that it can send any data there (or has its own conflicting assumptions on what to use it for), that would cause issues. Basically we'd make it part of the tarpc protocol that not any data can be encoded there, and some clients may or may not be okay with that, so I'm more reassured if it is handled as an opaque value.

Either way the response topic still needs to be kept throughout the request's lifecycle somehow, because there could be multiple clients sending requests to the server and they should obviously not see the responses the server sends to one another, and they may have different permissions to see one topic or another.

A hack would be to compile the response topic from the request id, such as tarpc/responses/<request_id>, but that would be a nightmare to set up permissions for.

I'll release a repo shortly with the implementation of the MQTT transport, just need to clean the code up a bit.

@axos88
Author

axos88 commented Feb 14, 2023

https://github.com/axos88/tarpc-mqtt-transport - still needs some work

@axos88
Author

axos88 commented Feb 15, 2023

Ah one more thought - Having this context info saved would make it possible for tarpc to work over UDP as well, since the source IP/port can be saved in the context and the response could be sent to the correct place. The two problems are somewhat analogous.
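
To make the UDP analogy concrete, here is a minimal sketch using only the standard library. The wire format (an 8-byte request id followed by the payload), the uppercase "service", and all function names are assumptions made up for this illustration; none of it is tarpc API. The peer address returned by `recv_from` plays the role of the transport context: it is saved when the request arrives and consumed when the response is sent.

```rust
use std::collections::HashMap;
use std::io;
use std::net::{SocketAddr, UdpSocket};
use std::time::Duration;

/// Hypothetical wire format: 8-byte big-endian request id, then the payload.
fn encode(request_id: u64, payload: &[u8]) -> Vec<u8> {
    let mut buf = request_id.to_be_bytes().to_vec();
    buf.extend_from_slice(payload);
    buf
}

fn decode(datagram: &[u8]) -> io::Result<(u64, Vec<u8>)> {
    if datagram.len() < 8 {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "short datagram"));
    }
    let mut id = [0u8; 8];
    id.copy_from_slice(&datagram[..8]);
    Ok((u64::from_be_bytes(id), datagram[8..].to_vec()))
}

/// Serves a single request. The peer address is the "transport context":
/// saved on receive, looked up and removed again when replying.
fn serve_one(server: &UdpSocket) -> io::Result<()> {
    let mut reply_to: HashMap<u64, SocketAddr> = HashMap::new();
    let mut buf = [0u8; 1024];
    let (len, peer) = server.recv_from(&mut buf)?;
    let (request_id, payload) = decode(&buf[..len])?;
    reply_to.insert(request_id, peer);

    // Stand-in for the service: uppercase the payload.
    let response = payload.to_ascii_uppercase();

    if let Some(addr) = reply_to.remove(&request_id) {
        server.send_to(&encode(request_id, &response), addr)?;
    }
    Ok(())
}

/// Sends one request over loopback and returns the decoded reply.
fn round_trip(request_id: u64, payload: &[u8]) -> io::Result<(u64, Vec<u8>)> {
    let server = UdpSocket::bind("127.0.0.1:0")?;
    let client = UdpSocket::bind("127.0.0.1:0")?;
    server.set_read_timeout(Some(Duration::from_secs(2)))?;
    client.set_read_timeout(Some(Duration::from_secs(2)))?;

    client.send_to(&encode(request_id, payload), server.local_addr()?)?;
    serve_one(&server)?;

    let mut buf = [0u8; 1024];
    let (len, _) = client.recv_from(&mut buf)?;
    decode(&buf[..len])
}
```

Calling `round_trip(7, b"ping")` sends one datagram to the loopback server and gets the reply back on the client socket, without the service code ever seeing the address.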

@axos88
Author

axos88 commented Apr 10, 2023

@tikue ^

@tikue
Collaborator

tikue commented Apr 10, 2023

Sorry for the delay, @axos88! I'll try to prioritize this in the next few weeks.

@tikue
Collaborator

tikue commented Apr 14, 2023

Ok, I have now taken a very quick look at MQTT and understand a little better what you're trying to do! I have a few clarifying questions:

  1. You suggested each request's correlation data needs to be the tarpc request ID. What's the reason it has to exactly match? The tarpc transport's job is to transmit the request ID somewhere, but as far as I can tell, MQTT doesn't care at all what you put in the correlation data. If the correlation data can be anything you want it to be, then what if, instead of having the correlation data just contain the request ID, it contained a serialized struct that looks like this:
    struct TarpcCorrelationData {
      client_id: String,
      request_id: u64,
    }
    Then it should be straightforward for the client-side transport to forward it back to the correct client?
  2. I'm still not totally sure about the question about saving the response topic. Based on my reading of tarpc-mqtt-transport/src/client_transport.rs, the client subscribes to one topic when the client is initialized. How are you imagining the response topic subscription to work? It seems like all clients publishing over the transport would be using the same response topic?
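
As an illustration of the serialized struct from point 1, here is a small sketch that turns it into the opaque bytes of a correlation data property and back. The byte layout (8-byte big-endian request id followed by the UTF-8 client id) is an assumption chosen to keep the example dependency-free; a real transport would more likely use serde with whatever codec it already has.

```rust
#[derive(Debug, Clone, PartialEq)]
struct TarpcCorrelationData {
    client_id: String,
    request_id: u64,
}

impl TarpcCorrelationData {
    /// Hypothetical layout: 8-byte big-endian request id, then the client id as UTF-8.
    fn to_bytes(&self) -> Vec<u8> {
        let mut out = self.request_id.to_be_bytes().to_vec();
        out.extend_from_slice(self.client_id.as_bytes());
        out
    }

    /// Returns None if the bytes were not produced by `to_bytes`
    /// (too short, or the client id is not valid UTF-8).
    fn from_bytes(bytes: &[u8]) -> Option<Self> {
        if bytes.len() < 8 {
            return None;
        }
        let (id, rest) = bytes.split_at(8);
        let mut raw = [0u8; 8];
        raw.copy_from_slice(id);
        Some(Self {
            client_id: String::from_utf8(rest.to_vec()).ok()?,
            request_id: u64::from_be_bytes(raw),
        })
    }
}
```

Only the side that created the correlation data would call `from_bytes`; everything else can treat the bytes as opaque.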

@axos88
Author

axos88 commented Apr 15, 2023

Thanks for looking into this.

  1. No, it doesn't have to be an exact match. What is sent as correlation data is an implementation detail on the client side and treated opaquely on the server side; it just seemed like a comfortable thing to use that will make it unique. Also the client id of the sender is not something you have access to when reading a message off the topic (unless the client decides to send it as part of the message in a preagreed manner). The main design goal of mqtt would be to not care who sent the message, just care about the data being sent.... There is also no concern about sending the data back to the correct client. All messages are broadcast to all subscribers of a topic, so as long as it is sent to the correct topic, the server does not need to concern itself with targeting a specific client. Note however that as described below, a custom client could start sending requests with random correlation data and still be compliant with the mqtt spec. Actually afaik the spec specifically discourages parsing the correlation data in any way; it should be treated as opaque.

  2. The client side of the current implementation does subscribe to a static topic, but that's an implementation choice. The protocol itself states that each request can be made with a different response topic. In reality I started talking with tarpc from an angular frontend, and in that case sending each request with a unique response topic was easier to implement.

What I'm trying to say is that if we want to be compliant with the spec, we need to implement a way to let the client, not the transport, define the request topic and the correlation data.

@axos88
Author

axos88 commented Apr 15, 2023

But I also think that having a context being produced and consumed by the transport could be useful for other purposes as well. I'm not sure it's possible to supply that to the context in the requests, but that would be awesome.
Imagine leaving authentication to the transport, and the request handlers would not need to concern themselves with that.
Such as sending the authentication info in, say, an HTTP header as per spec or an MQTT property: the transport would be able to extract it from there, and the service could check whether the requestor has authenticated and is authorized to perform an action, without having to concern itself with how the auth/authz proof is sent on the wire.

Currently I send an auth field in each request, and parse the JWT token from the request itself, but it feels quirky and unnatural. What I'd imagine doing is something like context.get::<Permissions>()?.require(Read)?;

With this feature the requests themselves would not need to include an auth field, and the transport could read and check auth from the place that is most natural or as described in their own spec. Authorization header for http, Mqtt property for Mqtt.
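
A rough sketch of what such a lookup could look like, assuming a type-keyed map attached to the request context. `Extensions`, `Permissions`, `Permission` and `AuthError` are hypothetical names invented for this example; tarpc's `context::Context` has no such API at the time of this thread.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

/// Hypothetical type-keyed store that a transport could fill in per request.
#[derive(Default)]
struct Extensions {
    map: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
}

impl Extensions {
    /// Stores `value`, replacing any previous value of the same type.
    fn insert<T: Any + Send + Sync>(&mut self, value: T) {
        self.map.insert(TypeId::of::<T>(), Box::new(value));
    }

    /// Looks up the value of type `T`, if the transport stored one.
    fn get<T: Any + Send + Sync>(&self) -> Option<&T> {
        self.map.get(&TypeId::of::<T>()).and_then(|v| v.downcast_ref::<T>())
    }
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum Permission {
    Read,
    Write,
}

/// What the transport would derive from, e.g., a JWT in a header or MQTT property.
struct Permissions(Vec<Permission>);

#[derive(Debug, PartialEq)]
enum AuthError {
    Unauthenticated,
    Forbidden(Permission),
}

impl Permissions {
    fn require(&self, needed: Permission) -> Result<(), AuthError> {
        if self.0.contains(&needed) {
            Ok(())
        } else {
            Err(AuthError::Forbidden(needed))
        }
    }
}

/// A handler that never sees how the auth proof travelled on the wire.
fn read_handler(ctx: &Extensions) -> Result<&'static str, AuthError> {
    ctx.get::<Permissions>()
        .ok_or(AuthError::Unauthenticated)?
        .require(Permission::Read)?;
    Ok("contents")
}
```

The presence of `Permissions` is only checked at runtime here, which is the trade-off such a map would bring.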

@tikue
Collaborator

tikue commented Apr 15, 2023

Hm, I'm a little confused about the issue around compliance with the spec. From my perspective, the tarpc transport is the MQTT client? MQTT is being used to implement a tarpc transport; from the tarpc user's perspective, it's an implementation detail. MQTT is not being exposed directly to the tarpc user.

I agree it would be cool to be able to stash arbitrary metadata in the context! Auth is the example I always think about, too. There is data that pretty much every RPC implementation needs, and it's orthogonal to the application itself. I've experimented with context extensions before, but I haven't landed on a design that I really like. The most straightforward solution is to add some kind of an AnyMap to the context, but I don't love that it's not type safe—e.g. it would be nice if a service could specify a bound on the context requiring that a username be present, but you can't really do that with a simple AnyMap. But maybe I'm overthinking it; something is better than nothing, after all.

@axos88
Author

axos88 commented Apr 15, 2023

So the MQTTv5 spec defined how a request-response should look like over MQTT.
It defines the CorrelationData and the ResponseTopic properties to be used for this purpose (the CorrelationData to be able to pair up a response with the request, and the response topic so that the "server" responds to a topic the "client" is listening/subscribed to). It also states that "The value of the Correlation Data only has meaning to the sender of the Request Message and receiver of the Response Message", which is a quirky way to say that it should be handled opaquely by the "server".

So if we want our MqttTransport for TARPC to be compliant with the MQTTv5 Request-Response spec, we cannot say that "the client needs to send this or that (the client Id, the request ID) in the CorrelationData", since the server needs to simply parrot back the CD without making any assumptions about its contents. It MAY do that, but the server cannot rely on the fact. If a random client comes along (such as my TS client), and sends a differently generated CD, we need to be able to respond to that client as well as long as the message payload is a valid TARPC request.

Yep, I was also thinking about an anymap, and was wondering as well if we could make this more typesafe. Actix does something like that I guess with its extractors, but I'm not sure if we'd be able to make use of that code for this purpose.

All in all something IS better than nothing, but I don't have such strong negative feelings about the use of an AnyMap either. I wouldn't say it's typeless, you just can't reason about something being there at compile time, the same way you can't reason about the presence of a key in a HashMap.

As far as I remember, an older version of actix (not sure, maybe it was another fw) also used an anymap internally (for extensions of an HttpRequest), and failed the requests if the data the handler was looking for was not there. Before running the handler, the framework would look at the anymap, and if it could not extract either one of the requested context, it would fail the request without running the handler. If successful, the handler would be handed the extracted context params, and would not actually see the raw AnyMap. It was handled magically by some macro I think.
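
The extractor idea described above could be sketched roughly as follows. This is not actix's implementation and not tarpc API: `FromContext`, `dispatch`, `Username` and `MissingContext` are hypothetical names, and the map is a bare `HashMap` keyed by `TypeId`. The point is only that the handler declares what it needs and never runs if the context lacks it.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

/// The raw, runtime-checked store. Handlers never see it directly.
type Extensions = HashMap<TypeId, Box<dyn Any + Send + Sync>>;

/// Hypothetical extractor trait: how to pull a typed value out of the store.
trait FromContext: Sized {
    fn from_context(ctx: &Extensions) -> Option<Self>;
}

#[derive(Clone, Debug, PartialEq)]
struct Username(String);

impl FromContext for Username {
    fn from_context(ctx: &Extensions) -> Option<Self> {
        ctx.get(&TypeId::of::<Username>())?
            .downcast_ref::<Username>()
            .cloned()
    }
}

#[derive(Debug, PartialEq)]
struct MissingContext;

/// Extracts `E` first and fails the request if it is absent;
/// only then is the handler invoked with the typed value.
fn dispatch<E: FromContext, R>(
    ctx: &Extensions,
    handler: impl FnOnce(E) -> R,
) -> Result<R, MissingContext> {
    let extracted = E::from_context(ctx).ok_or(MissingContext)?;
    Ok(handler(extracted))
}
```

The handler signature (`FnOnce(Username) -> R`) documents the requirement, even though the check itself still happens at request time.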

@axos88
Author

axos88 commented Apr 15, 2023

To answer your question. Yes, the tarpc transport IS the mqtt client, on both sides of the transport, and in a perfect world where everyone is using rust and this library, it would be an implementation detail. (Even in that case I'd prefer staying compliant with the MQTT spec), but this becomes even more desirable if we admit that some clients may be generating requests from code that is not written in rust or using our library. Then the way we send messages on the mqtt broker is not an implementation detail anymore, since this other piece of software needs to be compatible with it. IF they use an MQTT library that has its own opinion on how to generate a CD (because the spec lets them do that as they desire), they may have to fight the library before being able to generate valid messages understood by the tarpc server, which is to be avoided IMHO.

@tikue
Collaborator

tikue commented Apr 15, 2023

> IF they use an MQTT library that has its own opinion on how to generate a CD (because the spec lets them do that as they desire)

Now I'm confused :) Is the MQTT library in control of the correlation data, or is the MQTT client (who is using the MQTT library) in control of the correlation data?

> this becomes even more desirable if we admit that some clients may be generating requests from code that is not written in rust or using our library.

I'm confused, how do other clients affect what's done by the rust client? A UID should be globally unique, so I don't think other MQTT clients connecting to the same server would interfere. Unless you're saying you want the tarpc client to receive messages from other clients? (but then that would not really be implementing the tarpc spec, which is request/response between two peers).

@axos88
Author

axos88 commented Apr 16, 2023

Sorry, it was pretty late, may have not answered clearly.
I'm not entirely sure, as I'm just getting acquainted with Mqtt as well, but I think either the Mqtt client library, or the user calling into the library can be in control of what is sent in the CD.
The client I'm using from typescript does not natively support request-responses, so I'm manually publishing a conforming request message to the topic, and I'm subscribing to the response topic. Then using the CD to match the response with the request. I'd imagine that being handled automatically by a more evolved Mqtt client, where the user doesn't need to care about the CD.

For the second point. The confusion may come from mixing up terminologies. In the Mqtt world both the tarpc server and the tarpc client act as mqtt clients, and talk to each other via the mqtt broker. So the server is a client.....
I'll try to use tarpc server (processing the RPC) and tarpc client (making the RPC request) from now on.

The tarpc server itself should be able to receive messages from a tarpc client using this library, but should also receive messages from a tarpc client written in typescript, or assembly as long as they both send the correct message format.
To make implementing a tarpc client in typescript as easy as possible, we should aim to comply with the mqtt spec on how the CD should be handled by the tarpc server (opaquely), because the mqtt library available to them may or may not give enough control for them to use a specially constructed correlation data.

@tikue
Collaborator

tikue commented Apr 16, 2023

Ok, got it, thanks! Here is the way I'm envisioning this working.

  • I think it is compliant with the MQTT spec, but please let me know if I'm still confused
  • It should not require any changes to the tarpc library.
  • Rust types I'll use in my descriptions below:
    type RequestId = u64;
    type ClientUid = String;
    
    #[derive(Serialize, Deserialize)]
    struct CorrelationData {
      request_id: RequestId,
      client_uid: ClientUid,
    }

The tarpc client transport has two layers:

  1. First, there is the handle that each "client" holds. This is what actually implements the tarpc Transport trait, but it is really just a shim, basically an in-memory channel similar to tarpc::transport::channel, but instead of writing ClientMessage into the channel, it will write a tuple, (ClientUid, ClientMessage). Besides the MQTT transport, it just has one piece of state, the ClientUid, which the user specifies when initializing the transport.

  2. The second layer of the transport, which does all the heavy lifting, is basically running on a tokio task that is spawned when the transport is initialized. It performs two actions concurrently (via tokio::select! or something—this is similar to how I would model a bidirectional transport; it's also similar to how the tarpc::client::RequestDispatch works):

    1. receiving requests from its in-memory channel and publishing them with the mqtt transport
    2. receiving responses from the mqtt transport and sending them back to the appropriate client.

    To do these two things, it needs to manage some state:

    1. The receiving end of the in-memory channel, something like Receiver<(ClientUid, ClientMessage)>
    2. The MQTT transport
    3. A map from client uid to the sending end of the in-memory transport that each logical client holds, like Map<ClientUid, Sender<Response>>.

    When this task receives a (ClientUid, ClientMessage), it sets the correlation data property equal to CorrelationData { request_id: request.id, client_uid }. When it reads a response from its MQTT transport, it reads the ClientUid from the correlation data and uses that to look up the correct channel to forward the response to. (It doesn't need the request_id at this point; it's only used by the server side).
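
The client-side routing described above can be sketched with standard-library channels standing in for the tokio task and the MQTT transport. `Envelope`, `Dispatcher`, `wrap` and `route` are hypothetical names for this illustration, and payloads are plain strings rather than tarpc messages.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

type RequestId = u64;
type ClientUid = String;

#[derive(Debug, Clone, PartialEq)]
struct CorrelationData {
    request_id: RequestId,
    client_uid: ClientUid,
}

/// Stand-in for an MQTT message: correlation data plus an uninterpreted payload.
#[derive(Debug, Clone, PartialEq)]
struct Envelope {
    correlation: CorrelationData,
    payload: String,
}

/// Outbound direction: tag a client's request before publishing it.
fn wrap(client_uid: &str, request_id: RequestId, payload: &str) -> Envelope {
    Envelope {
        correlation: CorrelationData { request_id, client_uid: client_uid.to_string() },
        payload: payload.to_string(),
    }
}

/// State of the second layer: one sender per logical client.
#[derive(Default)]
struct Dispatcher {
    clients: HashMap<ClientUid, Sender<String>>,
}

impl Dispatcher {
    /// Registers a logical client and returns the receiving end it will hold.
    fn register(&mut self, uid: &str) -> Receiver<String> {
        let (tx, rx) = channel();
        self.clients.insert(uid.to_string(), tx);
        rx
    }

    /// Inbound direction: forward a response to the client named in its
    /// correlation data. Returns false if no such client is registered
    /// or its receiver has been dropped.
    fn route(&self, response: Envelope) -> bool {
        match self.clients.get(&response.correlation.client_uid) {
            Some(tx) => tx.send(response.payload).is_ok(),
            None => false,
        }
    }
}
```

Two clients can both use request id 0 here, because routing is keyed on the client uid rather than on the request id.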

The tarpc server transport is a little bit simpler; it shouldn't need a spawned task to manage anything. It would need one piece of state besides the MQTT transport, a Map<RequestId, CorrelationData>, so every time it receives a request from the MQTT transport, it would insert the correlation data keyed on the request ID:

correlation_map.insert(request_id, msg.properties().get_binary(PropertyCode::CorrelationData));

The tarpc server transport wouldn't have to inspect the correlation data at all—it'd still be entirely opaque. And the tarpc server itself would not even know about the correlation data. When the tarpc server sends a tarpc::Response into the server mqtt transport, the transport would remove from its map the correlation data keyed on response.request_id and attach that in the MQTT message properties.
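
A minimal sketch of that server-side bookkeeping, with the correlation data kept as opaque bytes. `ServerTransportState`, `on_request` and `on_response` are hypothetical names; the actual MQTT property handling is left out.

```rust
use std::collections::HashMap;

type RequestId = u64;

/// Extra state held by the server transport next to the MQTT connection.
#[derive(Default)]
struct ServerTransportState {
    /// Correlation data per in-flight request, never inspected.
    pending: HashMap<RequestId, Vec<u8>>,
}

impl ServerTransportState {
    /// Called when a request arrives: remember its correlation data.
    fn on_request(&mut self, request_id: RequestId, correlation_data: Vec<u8>) {
        self.pending.insert(request_id, correlation_data);
    }

    /// Called when the server produces a response: take the correlation
    /// data back out so it can be attached to the outgoing message.
    fn on_response(&mut self, request_id: RequestId) -> Option<Vec<u8>> {
        self.pending.remove(&request_id)
    }

    /// Number of requests that have not been answered yet.
    fn in_flight(&self) -> usize {
        self.pending.len()
    }
}
```

Entries for requests that are never answered stay in the map until something removes them, so a real transport would need some form of cleanup.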

@axos88
Author

axos88 commented Apr 16, 2023

Yeah, I think this approach could work, with minor modifications - besides the CD, the server needs to track the response topic as well, so it knows which topic to send the response to, but it can be part of the same Map<RequestId, (CorrelationData, ResponseTopic)>.

I don't follow why you'd want to have two layers on the client side. I think this approach can be implemented without having an in-memory transport in between.

Anyways, when I was working on the transport, I tried to avoid saving the Map<RequestId, (CD, RT)> state in the transport itself, because you'd then need to do some housekeeping on both sides of the channel in case some requests are never answered for example, which complicates things. (When to do the housekeeping? How would that be triggered? What to do if an entry for a RequestId is not found? How does that fit in with request cancellations? Etc.)

Adding the (CD, RT) state to the transport request itself on the server side would avoid this problem, since they'd be cleaned up when the request itself is cleaned up, and the compiler would guarantee it's always there.
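
To illustrate the idea of carrying the (CD, RT) pair with the request instead of in a side map, here is a small sketch. `WithContext`, `MqttContext`, `Publish` and `to_publish` are hypothetical names for this example and are not taken from the linked pull request. Because the context is a field of the value that flows through the server, it is dropped together with the request and cannot be missing when the response is built.

```rust
/// What the MQTT transport needs in order to answer a request.
#[derive(Debug, Clone, PartialEq)]
struct MqttContext {
    correlation_data: Option<Vec<u8>>,
    response_topic: String,
}

/// A message paired with the transport context it arrived with.
#[derive(Debug, PartialEq)]
struct WithContext<T> {
    message: T,
    context: MqttContext,
}

impl<T> WithContext<T> {
    /// Turns a request into a response while keeping the context attached.
    fn map<U>(self, f: impl FnOnce(T) -> U) -> WithContext<U> {
        WithContext { message: f(self.message), context: self.context }
    }
}

/// Stand-in for the outgoing MQTT message.
#[derive(Debug, PartialEq)]
struct Publish {
    topic: String,
    correlation_data: Option<Vec<u8>>,
    payload: String,
}

/// The sink side: everything needed to publish is already on the response.
fn to_publish(response: WithContext<String>) -> Publish {
    Publish {
        topic: response.context.response_topic,
        correlation_data: response.context.correlation_data,
        payload: response.message,
    }
}
```

A cancelled request simply drops its `WithContext` value, so there is no separate table to clean up.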

With all the context now available, can you take another look at tikue#10? Definitely needs cleanup, but it should give you an idea of what I was trying to achieve (and it works).

Something similar could be used for saving authentication/whatever context as well I think.

@tikue
Collaborator

tikue commented Apr 16, 2023

> I don't follow why you'd want to have two layers on the client side. I think this approach can be implemented without having an in-memory transport in between.

You were mentioning wanting to have multiple clients using the same mqtt transport, right? How does that work exactly? From a quick glance, I thought the mqtt transport would only allow you to set up one AsyncReceiver. But if you can have one AsyncReceiver per client, all sharing the same underlying mqtt transport, then I don't think any multiplexing would be needed. I trust that you know how paho_mqtt works better than I do :)

> I tried to avoid saving the Map<RequestId, (CD, RT)> state in the transport itself, because you'd then need to do some housekeeping on both sides of the channel in case some requests are never answered for example, which complicates things.

Ahh, yes, this is indeed a problem! I agree, the context would be a nice place to store this mapping.

> With all the context now available, can you take another look at tikue#10?

Yes, I'll take a look once it's cleaned up! I kind of implied it earlier, but to be explicit, I think the only modification needed is to add some kind of AnyMap to context::Context. This map would not need to be serializable; it would just be for passing context between the transport and application, and back. So that means:

  1. I don't want to add a RequestSequencer. It shouldn't be necessary if the correlation data contains a ClientUid.
  2. I don't want to add a ContextualChannel. It shouldn't be necessary if context::Context is extensible.

@axos88
Author

axos88 commented Apr 16, 2023

  1. In my current implementation all clients send their requests to the same Mqtt topic that the server listens to.

The request sequencer was added to avoid collisions between the request IDs (two different clients sending two requests with the same request id would confuse the server since the incoming channel is shared), although it is a half-assed solution, I agree. Now that I'm thinking about it (I'm writing from mobile), I think an approach where each client would send to a unique topic /request/<unique thing such as Mqtt client id> could be used instead. Then the server side transport could use the topic to salt and hash the incoming request id, creating a practically globally unique request id for all requests across clients.

  2. The current doc says the context's purpose is to track a request throughout the system. If we allow the transport to inject data, and retrieve data from there, we would be changing its purpose a bit, especially if we intend to store derived data there, such as auth info. I'm fine with that, but just wanted to double check with you.
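
The salt-and-hash idea for per-client request topics could look roughly like this. `global_request_id` is a hypothetical helper, and `DefaultHasher` is used only because it ships with the standard library; its output is not guaranteed to be stable across Rust releases, so a real transport would probably pick a fixed hash function. The transport would also have to remember the original request id to put it back into the response.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Derives a server-side request id from the topic the request arrived on
/// and the client-chosen request id. Collisions are possible in principle,
/// but unlikely with a 64-bit hash.
fn global_request_id(request_topic: &str, request_id: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    request_topic.hash(&mut hasher);
    request_id.hash(&mut hasher);
    hasher.finish()
}
```

Two clients that both send request id 0 on different request topics would then end up with different ids on the server side.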

@tikue
Collaborator

tikue commented Apr 16, 2023

Re 2: that sounds like trace::Context? context::Context says "A request context that carries request-scoped information like deadlines and trace information." I think this wording applies to the use case we're considering.

@tikue
Collaborator

tikue commented Apr 16, 2023

Regarding the server-side request ID issue, I think there shouldn't be a need to change the request ID at all, though I realize now what I fleshed out earlier would be insufficient. It can be done, though, with a similar setup to what I described for the client multiplexing.

  1. The server should have a separate BaseChannel for each client. It's just that the channels will all share a common MQTT transport. As long as each client has its own channel, request IDs won't collide. This does require the server to know all the clients sharing the mqtt.
  2. The transport layer can do what I fleshed out for the client side, where a spawned task reads from the MQTT transport and forwards to the correct server channel; and each server channel sends responses into a channel that the task reads from and forwards into the MQTT transport.
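
The reason separate per-client channels avoid collisions can be shown with a small sketch that tracks in-flight request ids per client. `PerClientChannels` is a hypothetical stand-in for one BaseChannel per client; it models only the id bookkeeping, not tarpc's actual channel type.

```rust
use std::collections::{HashMap, HashSet};

type ClientUid = String;
type RequestId = u64;

/// In-flight request ids, tracked separately for each client.
#[derive(Default)]
struct PerClientChannels {
    in_flight: HashMap<ClientUid, HashSet<RequestId>>,
}

impl PerClientChannels {
    /// Starts a request. Returns false only if this same client already
    /// has a request with this id in flight.
    fn start(&mut self, client: &str, id: RequestId) -> bool {
        self.in_flight.entry(client.to_string()).or_default().insert(id)
    }

    /// Finishes a request. Returns false if it was not in flight.
    fn finish(&mut self, client: &str, id: RequestId) -> bool {
        self.in_flight
            .get_mut(client)
            .map_or(false, |ids| ids.remove(&id))
    }
}
```

With a single shared set of ids, the second client's request 0 would be rejected as a duplicate; keyed per client, both are accepted.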
