Unexpected Error Waiting for Headers #337

Open
Tonyhaenn opened this issue Nov 15, 2023 · 9 comments

Tonyhaenn commented Nov 15, 2023

Describe the bug
We're using elixir-grpc to communicate with the Google Cloud Speech V2 models. Intermittently, the call fails, and it's not entirely clear why. Is this caused by a bad or unexpected response from the Speech V2 model, or by a subtle bug in elixir-grpc?

%GRPC.RPCError{status: 2, message: "unexpected when waiting for headers: {:trailers, [{"grpc-status", "0"}, {"content-disposition", "attachment"}, {"x-goog-ext-~~~~-bin", "DR+~~~~=="}, {"x-goog-ext-~~~~-bin", "DWG0/EA="}]}"}

To Reproduce
Steps to reproduce the behavior: unclear, unfortunately. The failure seems indeterminate and appears to depend on the Google response.

Additional context
Looking for some guidance on how to diagnose and correct this. We're catching and handling the error, but it'd be nice not to have it in the first place.

Tonyhaenn added the bug label Nov 15, 2023
mickel8 commented Nov 16, 2023

Sometimes, instead of trailers we get

%GRPC.RPCError{status: 2, message: "unexpected when waiting for headers: {:data, <<some data>>}"}

mickel8 commented Nov 18, 2023

I found the problem.

For context, we use gRPC streams.

By default, gun sends its messages to the calling process. This means that if you use grpc in a GenServer and a message arrives before you call GRPC.Stub.recv, it is dropped by the default implementation of GenServer's handle_info.

That's why we get unexpected when waiting for headers. The messages are sent by the server correctly (confirmed with Wireshark), and gun receives and forwards them correctly too, but they are dropped by the GenServer's handle_info 🤔
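
To make the failure mode concrete, here is a minimal sketch (the stub module and RPC below are placeholders, not the reporter's actual code) of a GenServer that owns the connection and therefore receives gun's active-mode messages itself:

defmodule SttClient do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    # gun is spawned (indirectly) by this process, so every :gun_* message
    # from the connection is delivered to this GenServer's mailbox.
    {:ok, channel} = GRPC.Stub.connect(opts[:host])
    {:ok, %{channel: channel, stream: nil}}
  end

  @impl true
  def handle_cast({:send, request}, %{channel: channel} = state) do
    stream =
      channel
      # placeholder for any bidirectional-streaming RPC stub
      |> Routeguide.RouteGuide.Stub.route_chat()
      |> GRPC.Stub.send_request(request)

    # The server may already be replying at this point; those replies arrive
    # as plain messages to this process and fall through to handle_info/2.
    {:noreply, %{state | stream: stream}}
  end

  @impl true
  def handle_call(:finish, _from, %{stream: stream} = state) do
    # By the time recv reaches :gun.await, some :gun_data / :gun_trailers
    # messages may already have been consumed (and discarded) by handle_info/2,
    # which is what surfaces as "unexpected when waiting for headers".
    reply = stream |> GRPC.Stub.end_stream() |> GRPC.Stub.recv()
    {:reply, reply, state}
  end

  @impl true
  def handle_info(_msg, state) do
    # A catch-all clause (or GenServer's default handle_info) silently drops
    # the gun messages that :gun.await expects to find in the mailbox.
    {:noreply, state}
  end
end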

polvalente (Contributor)

This seems like a user-side problem in handling gRPC's messages. I'm closing the issue because of that, but feel free to comment if there are any improvements that could be made on our side!

If you're using a multi-purpose GenServer (which I wouldn't advise, due to serial message-processing issues), you could match on gRPC's messages specifically and handle them differently from the generic ones.

mickel8 commented Nov 27, 2023

This seems like a user-side problem in handling gRPC's messages.

Doesn't that basically mean we can't use gRPC streams in a GenServer?

Even if we knew which messages from gun are gRPC-related, we couldn't feed them into the gRPC library. The library assumes we always call GRPC.Stub.recv, which under the hood calls :gun.await. This fails if we call GRPC.Stub.recv too late, because we end up in a state where some of gun's messages have been received by our GenServer's handle_info and others are received by :gun.await 🤔

To make our implementation work, we had to wrap the calls to the grpc library in a plain process with a dedicated receive block that has no default branch.
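
A rough sketch of that workaround (the module, RPC stub, and message shapes below are illustrative placeholders): the whole connect → send → recv cycle runs inside a plain spawned process, so gun's messages stay in that process's mailbox until :gun.await, inside GRPC.Stub.recv, selectively receives them.

defmodule StreamRunner do
  # Illustrative wrapper: the parent process starts the runner and then only
  # handles {:grpc_response, reply} and {:grpc_done, pid} messages.
  def start(host, requests, parent) do
    spawn_link(fn -> run(host, requests, parent) end)
  end

  defp run(host, requests, parent) do
    # gun is spawned from this plain process, so every :gun_* message lands
    # in this mailbox and nowhere else.
    {:ok, channel} = GRPC.Stub.connect(host)

    stream =
      requests
      # placeholder for any bidirectional-streaming RPC stub
      |> Enum.reduce(Routeguide.RouteGuide.Stub.route_chat(channel), fn req, s ->
        GRPC.Stub.send_request(s, req)
      end)
      |> GRPC.Stub.end_stream()

    # recv ends up in :gun.await, a selective receive over this mailbox.
    # Since this process has no other receive and no catch-all branch, nothing
    # can steal the gun messages before :gun.await gets to them.
    {:ok, replies} = GRPC.Stub.recv(stream)

    Enum.each(replies, fn {:ok, reply} -> send(parent, {:grpc_response, reply}) end)
    send(parent, {:grpc_done, self()})
  end
end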

Another option could be to use the Mint adapter, since Mint provides both passive and active modes, but unfortunately it crashes.
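
For reference, the adapter is chosen at connect time; assuming an elixir-grpc release that ships the Mint adapter, it looks roughly like this:

# Select the Mint-based client adapter instead of the default Gun adapter
# (option name per the elixir-grpc README; verify against the installed version).
{:ok, channel} = GRPC.Stub.connect("localhost:50051", adapter: GRPC.Client.Adapters.Mint)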

polvalente (Contributor)

I took a look at the Gun adapter, and I can't actually see what would make your GenServer receive the messages that should go to :gun.await instead.

Do you have a minimal example of how you're calling gRPC to get the error?

polvalente reopened this Nov 27, 2023
mickel8 commented Nov 27, 2023

I can try to explain this in more detail:

  1. We start sending data to Google Speech to Text (STT) service
  2. While we are still sending data, STT starts sending responses
  3. Those responses are received by gun. Gun sends them to the process that spawned it. Because our GenServer calls grpc functions like connect, it's our GenServer that (implicitly, under the hood) spawns the gun process.
  4. Our GenServer gets those first responses in handle_info and drops them.
  5. After we finish streaming to STT, we call GRPC.Stub.recv, which calls :gun.await. :gun.await is a simple receive do ... end block that goes through our GenServer's mailbox. It reads the HTTP/2 responses and returns them, and those responses are then parsed by grpc. The problem is that some of those responses have already been consumed by handle_info.

In other words, I believe it all comes down to gun working in active mode - i.e. it sends HTTP/2 responses to the calling process, which in the case of grpc is always the process that calls GRPC.Stub.connect.

Does it sound clearer now? I can also try to create a minimal reproducible example in my free time.

polvalente (Contributor)

Ah, I think I get it more clearly now. A minimal reproduction would be helpful!

However, I think that the issue at hand has more to do with event ordering than anything else. From the workflow described, it seems that your GenServer logic expects to receive data only after you finish sending it, and that doesn't seem to correspond to reality.

A possible (palliative) solution would be to spawn a separate process linked to your GenServer, such that this process only receives messages from gun (or maybe GenServer.calls from its parent). This would enable you to either just re-enqueue those messages or simply never read them (depending on whether you're spawning a plain process or a GenServer of sorts).

It might make sense for this process encapsulation to be absorbed by this library, but I'd need the reproduction to experiment with this.

beligante (Contributor)

I can come up with an example for this this week. I've been working on an internal library at my workplace to manage connection pools using elixir-grpc, so I know the best way to show this.

beligante (Contributor)

@mickel8 Not sure if you found a workaround for this, but here's what I got

In my code design, I've wrapped the processing of grpc streams inside a GenServer. What I was able to do was:

  • I wrapped the stream processing inside a task
  • That task iterates over the Stream struct (which calls :gun.await)
  • My GenServer receives the HTTP packets and forwards them to the task pid
  • Inside the task, the calls to :gun.await no longer hang

You're right in your assumption that this is a problem with Gun in active mode. There are some other underlying issues with the Gun adapter leaking messages to the process that starts the connection as well. Anyway, I hope you find the code below useful.

defmodule Processor do
  use GenServer
  require Logger

  # .. my own code
  defp request(%{stream: nil, processor_meta: processor_meta} = _state, request) do
    channel = ConnectionPool.channel!(processor_meta[:pool])
    %{grpc_module: grpc_module, rpc_name: rpc_name} = processor_meta[:stream_rpc]
    stub = Module.concat(grpc_module, Stub)

    stream =
      stub
      |> apply(rpc_name, [channel])
      |> GRPC.Stub.send_request(request)

    # Start the gRPC streaming request
    {:ok, ex_stream} = GRPC.Stub.recv(stream, timeout: :infinity)
    my_pid = self()

    stream_processor_sup = processor_meta[:task_supervisor]

    task =
      Task.Supervisor.async(stream_processor_sup, fn ->
        # Wrap this gRPC Streaming processing inside a task and store the task pid
        ex_stream
        |> Stream.each(&send(my_pid, {:process_response, &1}))
        |> Stream.run()

        send(my_pid, {:process_response, :done})
      end)

    {stream, task.pid, make_ref()}
  end

  defp do_send_request(%{stream: stream} = state, query_request) do
    stream = GRPC.Stub.send_request(stream, query_request)
    {stream, nil, state.ref}
  end

  @impl GenServer
  def handle_info(
        {:process_response, {:ok, response}},
        state
      ) do
    state.processor_meta[:receive_function].(response)

    {:noreply, state}
  end

  def handle_info({:process_response, :done}, state) do
    {:noreply, state, {:continue, :reset}}
  end

  def handle_info({_ref, {:process_response, :done}}, state) do
    {:noreply, state}
  end

  def handle_info({:process_response, {:error, error}}, state) do
    Logger.error("received error from stream: #{inspect(error)}")
    {:noreply, state, {:continue, :reset}}
  end

  def handle_info(msg, state) do
    # async actions
    case msg do
      # gun-specific messages: forward them to the task, where :gun.await does the selective receive
      {:gun_data, _pid, _ref, _is_fin, _data} ->
        if is_pid(state.stream_task), do: send(state.stream_task, msg)

      {:gun_error, _pid, _ref, _reason} ->
        if is_pid(state.stream_task), do: send(state.stream_task, msg)

      {:gun_trailers, _pid, _ref, _headers} ->
        if is_pid(state.stream_task), do: send(state.stream_task, msg)

      # messages received when the async task finish
      {_ref, :ok} ->
        :ok

      {:DOWN, _ref, :process, _pid, :normal} ->
        :ok

      _other ->
        Logger.warning("unexpected msg: #{inspect(msg)}")
    end

    # gen server reply
    case msg do
      {:DOWN, _ref, :process, _pid, :normal} -> {:noreply, state, {:continue, :reconnect}}
      _other -> {:noreply, state}
    end
  end
end
