
New API Proposal #270

Open
sleipnir opened this issue Sep 2, 2022 · 9 comments

@sleipnir
Collaborator

sleipnir commented Sep 2, 2022

Is your feature request related to a problem? Please describe.

One thing that has always bothered me about the elixir-grpc API is how much it differs from the way gRPC streams are handled in other languages, and the fact that we need a separate module and functions to handle responses from a gRPC stream. I would like to handle a gRPC stream as fluidly as we do in other languages, where the types generated for the input and output of a streaming rpc are basically the language's own stream type. In elixir-grpc, by contrast, the output type of a streaming rpc is not the output type of the Elixir function; instead we use a helper module (Server.send_reply) to emit each response, and this is not reflected in the function's typespec as a stream.
I know it's hard to visualize what I'm saying, so I'll sketch the current shape first and then put some examples from other languages here to demonstrate.
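Current elixir-grpc (a sketch from memory of the route_guide example; exact generated signatures may vary by version):

def route_chat(req_enum, stream) do
  # Replies go out through a helper, not through the return value,
  # so the typespec never shows an output stream.
  Enum.each(req_enum, fn note ->
    GRPC.Server.send_reply(stream, note)
  end)
end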

Kotlin Raw:

override fun routeChat(requests: Flow<RouteNote>): Flow<RouteNote> =
  flow {
    // could use transform, but it's currently experimental
    requests.collect { note ->
      val notes: MutableList<RouteNote> = routeNotes.computeIfAbsent(note.location) {
        Collections.synchronizedList(mutableListOf<RouteNote>())
      }
      for (prevNote in notes.toTypedArray()) { // thread-safe snapshot
        emit(prevNote)
      }
      notes += note
    }
  }

Kotlin with Akka gRPC:

override fun handle(streamIn: Source<EventSourcedProto.EventSourcedStreamIn, NotUsed>?): Source<EventSourcedProto.EventSourcedStreamOut, NotUsed> =
        streamIn!!.log("CloudState-User")
                .map {
                    if (it.hasInit()) {
                        return@map it.init
                    }

                    if (it.hasCommand()) {
                        return@map it.command
                    }

                    if (it.hasEvent()) {
                        return@map it.event
                    }
                }

Java with Akka gRPC:

@Override
  public Source<HelloReply, NotUsed> sayHelloToAll(Source<HelloRequest, NotUsed> in) {
    return in.runWith(inboundHub, system);
  }

Java Raw:

@Override
public StreamObserver<Stock> bidirectionalStreamingGetListsStockQuotes(StreamObserver<StockQuote> responseObserver) {
    return new StreamObserver<Stock>() {
        @Override
        public void onNext(Stock request) {
            for (int i = 1; i <= 5; i++) {
                StockQuote stockQuote = StockQuote.newBuilder()
                  .setPrice(fetchStockPriceBid(request))
                  .setOfferNumber(i)
                  .setDescription("Price for stock:" + request.getTickerSymbol())
                  .build();
                responseObserver.onNext(stockQuote);
            }
        }

        @Override
        public void onCompleted() {
            responseObserver.onCompleted();
        }

        //handle OnError() ...
    };
}

Rust:

async fn route_chat(
    &self,
    request: Request<tonic::Streaming<RouteNote>>,
) -> Result<Response<Self::RouteChatStream>, Status> {
    let mut notes = HashMap::new();
    let mut stream = request.into_inner();

    let output = async_stream::try_stream! {
        while let Some(note) = stream.next().await {
            let note = note?;

            let location = note.location.clone().unwrap();

            let location_notes = notes.entry(location).or_insert(vec![]);
            location_notes.push(note);

            for note in location_notes {
                yield note.clone();
            }
        }
    };

    Ok(Response::new(Box::pin(output)
        as Self::RouteChatStream))

}

Python:

def StreamCall(
        self, request_iterator: Iterable[phone_pb2.StreamCallRequest],
        context: grpc.ServicerContext
    ) -> Iterable[phone_pb2.StreamCallResponse]:
        try:
            request = next(request_iterator)
            logging.info("Received a phone call request for number [%s]",
                         request.phone_number)
        except StopIteration:
            raise RuntimeError("Failed to receive call request")
        # Simulate the acceptance of call request
        time.sleep(1)
        yield create_state_response(phone_pb2.CallState.NEW)

Go:

func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
  for {
    in, err := stream.Recv()
    if err == io.EOF {
      return nil
    }
    if err != nil {
      return err
    }
    key := serialize(in.Location)
    // ... look for notes to be sent to the client
    for _, note := range s.routeNotes[key] {
      if err := stream.Send(note); err != nil {
        return err
      }
    }
  }
}

Notice that whenever an rpc declares an output stream, the handler's output data type is also a stream type in that language.

Describe the solution you'd like

I would like that, given the following rpc declaration:

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

the equivalent implementation code in Elixir would look like this:

@spec route_chat(Enumerable.t()) :: Enumerable.t()
def route_chat(stream) do
  Flow.from_enumerable(stream)
  |> Flow.map(fn item -> do_something(item) end)
  # |> Flow.run() # We should not materialize the stream here; that should be the library's responsibility
end

This would make stream development feel more natural to those coming from other languages, and would also make development more compatible with Elixir's own Stream type.

But notice that in the code above there are some subtleties:

First, Flow.map or Stream.map does not return materialized data; it only returns a "DAG" (I'm using "DAG" just as an illustration of an execution-plan type) of the operations to be performed on the stream. Even something like Stream.run or Flow.run would not return a stream either; it would just execute the DAG.
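A minimal illustration of that laziness:

# Stream.map/2 only builds a lazy description; nothing executes yet
doubled = Stream.map(1..3, fn x -> x * 2 end)
# => #Stream<...> (a recipe, not data)

# Only materializing the stream runs the pipeline
Enum.to_list(doubled)
# => [2, 4, 6]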

Second, gRPC streams are by definition unbounded, that is, they are infinite and "decoupled" from each other (by "decoupled" I mean the input and output streams are independent), so calling Enum.to_list(stream) would not solve the problem either. In this case an API that makes more sense for Elixir, and that also holds for other languages and frameworks (see the Java with Akka gRPC example above), might be something like:

alias GRPC.Server.Stream, as: GrpcStream

@spec route_chat(GRPC.Server.Stream.t()) :: GRPC.Server.Stream.t()
def route_chat(stream) do
  output_stream =
    Flow.from_enumerable(stream)
    |> Flow.map(fn item -> do_something(item) end)
    # |> Flow.run() # We should not materialize the stream here; that should be the library's responsibility

  GrpcStream.run_with(output_stream) # or GrpcStream.materialize(output_stream)
end

@polvalente wdyt?

@beligante
Contributor

beligante commented Oct 22, 2022

@sleipnir I think that's a really good idea. I've run into the same problem as you.

One thing I thought about doing here is a more Elixir-wise solution: instead of returning a stream, we could start sending messages to the process that started the stream, similarly to what gun/Mint does for requests.

So the process would receive already-parsed messages from the stream, and you could implement something like a GenServer to receive them. For example:

defmodule MyStreamHandlerProcess do
  use GenServer
  
  # ... boilerplate for GS
  
  def handle_info({:elixir_grpc, {:headers, headers}}, state) do
    # ...
  end
  
  def handle_info({:elixir_grpc, {:data, data}}, state) do
    # ...  
  end
  
  def handle_info({:elixir_grpc, {:trailers, trailers}}, state) do
    # ...
  end
  
  def handle_info({:elixir_grpc, :done}, state) do
    # ...
  end
  
end

This GenServer could be responsible not only for receiving the messages, but also for sending data (in the case of a bidirectional stream).
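For the sending half, a minimal sketch (assuming the open client stream is kept in the GenServer state; I believe GRPC.Stub.send_request/3 is the relevant call, but double-check the exact signature):

def handle_call({:send, msg}, _from, %{stream: stream} = state) do
  # push one message onto the open stream; end_stream: true would half-close it
  stream = GRPC.Stub.send_request(stream, msg, end_stream: false)
  {:reply, :ok, %{state | stream: stream}}
end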

One workaround I found in my projects, to avoid having to wait for the server stream to end before processing messages, is something like the following. It adds a bit of async processing to your code.

defmodule MyStreamHandlerProcess do
  use GenServer

  # ... boilerplate for GS

  def send_request(grpc_stream) do
    {:ok, ex_stream} = GRPC.Stub.recv(grpc_stream)
    pid = self()
    
    # async because Stream.run/1 will block until the stream is done
    Task.async(fn -> 
      ex_stream
      |> Stream.each(fn resp ->
        case resp do
          {:ok, data} -> send(pid, {:elixir_grpc, {:data, data}})
          # Covers
          # {:headers, headers}
          # {:trailers, trailers}
          # {:error, error}
          _other -> send(pid, {:elixir_grpc, resp})
        end
      end)
      |> Stream.run() # blocks until the stream ends
    
      # send a message to tell the process that the stream has ended
      send(pid, {:elixir_grpc, :done})
    end)
  end

  def handle_info({:elixir_grpc, {:headers, headers}}, state) do
    # ...
  end

  def handle_info({:elixir_grpc, {:data, data}}, state) do
    # ...
  end

  def handle_info({:elixir_grpc, {:trailers, trailers}}, state) do
    # ...
  end

  def handle_info({:elixir_grpc, :done}, state) do
    # ...
  end

end

What do you think?

@sleipnir
Collaborator Author

> This GenServer could be responsible not only for receiving the messages, but also for sending data (in the case of a bidirectional stream).

> One workaround I found in my projects, to avoid having to wait for the server stream to end before processing messages, is something like the following. It adds a bit of async processing to your code.

Hi @beligante, thanks for the interaction here. Sorry for the late reply, I've been quite busy on another project lately.

Overall I agree with your approach and have adopted similar strategies in my own projects. However, one thing you need to be careful with is managing the GenServer: you need to create a GenServer process for each incoming connection, or use a pool of processes; otherwise the GenServer itself becomes a bottleneck.
This also adds overhead overall, since it is an extra active process for the duration of the stream connection.

To be honest I would still like a flow-based API, though I understand that it might be complicated.

A reasonable alternative might be to use the GenStage API, along the lines of what Broadway already does. The advantage of something based on GenStage is that the gRPC library would natively gain backpressure capabilities, something Akka gRPC has already demonstrated to be the state of the art for gRPC libraries.
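To make that concrete, here is a rough sketch of a GenStage producer that buffers incoming gRPC messages and emits them according to downstream demand (a hypothetical module based on the buffering pattern from the GenStage docs; a real integration would propagate demand back to the transport rather than buffer without bound):

defmodule GrpcMessageProducer do
  use GenStage

  def start_link(opts \\ []), do: GenStage.start_link(__MODULE__, :ok, opts)

  # called by whatever reads the raw gRPC stream
  def push(producer, msg), do: GenStage.cast(producer, {:push, msg})

  def init(:ok), do: {:producer, {:queue.new(), 0}}

  def handle_cast({:push, msg}, {queue, pending_demand}) do
    dispatch(:queue.in(msg, queue), pending_demand, [])
  end

  def handle_demand(demand, {queue, pending_demand}) do
    dispatch(queue, pending_demand + demand, [])
  end

  # emit buffered events while there is demand; otherwise remember the deficit
  defp dispatch(queue, 0, events), do: {:noreply, Enum.reverse(events), {queue, 0}}

  defp dispatch(queue, demand, events) do
    case :queue.out(queue) do
      {{:value, msg}, rest} -> dispatch(rest, demand - 1, [msg | events])
      {:empty, rest} -> {:noreply, Enum.reverse(events), {rest, demand}}
    end
  end
end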

I strongly recommend taking a look at the Akka gRPC API and at general Akka Streams concepts. They can be very useful as inspiration here, as other Elixir libraries like Flow have followed similar paths in essence and have been very successful.

What do you think?

@SimonWoolf

SimonWoolf commented Oct 27, 2022

@beligante I'm just attempting to integrate elixir-grpc now, and before I even found this issue I was trying to use a workaround like the one you sketched out here (doing Stream.run() in a Task on the stream of responses), to avoid the problem of it blocking the GenServer.

However, as far as I can tell, that doesn't work. The stream generation calls :gun.await, but that is just a receive that intercepts gun-specific messages sent to the calling process, so it achieves nothing when called from a Task process: the Task never processes anything, and the calling process is just sent raw :gun_data messages.

I have a workaround, which is to intercept messages from gun and forward them to the receiver (whose pid can be stored in the state), e.g.:

def handle_info(data, state) do
  # forward raw gun messages to the process that is actually consuming the stream
  if is_tuple(data) && data |> elem(0) |> to_string() |> String.starts_with?("gun") do
    send(state.receiver_pid, data)
  else ...

This works, but is hacky. You said you'd implemented this in your own projects, in which case I suspect I'm overlooking a better way of doing things? Thanks in advance 🙂

@beligante
Contributor

@sleipnir - Sorry for the late reply too - same problem you're having hahahaha - I think we can skip apologies next time

I gave a second look at what you're saying, but I'm not sure I'm following; I think I misunderstood your request.
(Correct me if I'm wrong.)

  • Is your request about how servers handle streams, or about both - how client and server handle them?
  • Is your pain that server and client have to deal with other things besides data when working with streams?
    • I mean, on the streams we also receive headers and trailers.

In an ideal world, you would like the library to add more abstraction so that you can work with streams more purely. That's what I understood - is that correct?

@beligante
Contributor

beligante commented Oct 29, 2022

Hey @SimonWoolf

About your question: yeah! I had to take a similar approach because of that strange behavior of gun - I just omitted it for brevity. IDK why that happens, to be honest, hahaha. But yeah, that workaround is unfortunately needed :/

I've added a PR to add a new adapter to use Mint => #272 - Would love some reviews BTW 👁️

And with that adapter we don't have this problem, as I'm wrapping the messages inside a GenServer.
If you want to test it, check out my branch and follow the instructions here:
https://gist.github.com/beligante/f715d3c8896df8b42d771f2b66106743

(I've been testing that PR on my staging environments for a while; it's in a pretty good state, FYI)

@sleipnir
Collaborator Author

sleipnir commented Oct 29, 2022

Hi @beligante

> @sleipnir - Sorry for the late reply too - same problem you're having hahahaha - I think we can skip apologies next time

> I gave a second look at what you're saying, but I'm not sure I'm following; I think I misunderstood your request. (Correct me if I'm wrong.)

> • Is your request about how servers handle streams, or about both - how client and server handle them?

I'm referring to the server side; I think the biggest gap in the library is there. Obviously the client side would benefit from such an approach as well.

> Is your pain that server and client have to deal with other things besides data when working with streams?
> I mean, on the streams we also receive headers and trailers.

> In an ideal world, you would like the library to add more abstraction so that you can work with streams more purely. That's what I understood - is that correct?

By Stream I mean stream-processing semantics, not the gRPC specification. I think stream-processing semantics can be a good fit for dealing with gRPC streams, especially unbounded, i.e. infinite, ones.

This would bring the side benefit I referred to: backpressure. If we just add semantics, like your "more elixir-wise solution", we still won't be able to handle backpressure, at least not the kind I'm referring to (throttling, buffering, windowing, etc.), without pushing a lot of the work onto the user side of the library. That's why I cite two Elixir libraries that solved this problem, GenStage and Broadway.
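To illustrate the kind of operators I mean, here is plain Flow over an ordinary enumerable (not gRPC, just the existing ecosystem; it shows demand-driven pull and windowing):

window = Flow.Window.count(100) # group events into batches of 100

1..10_000
|> Flow.from_enumerable(max_demand: 500) # downstream demand limits how fast we pull
|> Flow.partition(window: window)
|> Flow.reduce(fn -> 0 end, &(&1 + &2)) # one running sum per window and partition
|> Enum.to_list()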

I usually use a GenServer and a DynamicSupervisor, in addition to including a start-of-stream message in my protos, to help handle gRPC streams. But I think it's a bad hack, mainly because you have to monitor the process PIDs, which adds complexity to the code and can lead to failures that are difficult to track in production.
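For reference, the shape of that hack (a minimal sketch; MyStreamHandler is a hypothetical per-stream GenServer):

# one handler process per incoming stream connection
{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)
{:ok, pid} = DynamicSupervisor.start_child(sup, {MyStreamHandler, stream_id})

# and you have to monitor every handler yourself to notice failures
ref = Process.monitor(pid)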

I really believe the library can go a step further and provide an API with batteries included like Akka gRPC.

@wingyplus
Contributor

I'm not sure if we should discuss the error details mentioned in #109?

@sleipnir
Collaborator Author

> I'm not sure if we should discuss the error details mentioned in #109?

Maybe yes

@polvalente
Contributor

> I'm not sure if we should discuss the error details mentioned in #109?

> Maybe yes

Since there's a specific issue for that and a WIP PR, perhaps this discussion would be more productive in #109, but I agree that it's relevant.
