Unable to consume more than 4MB from pubsub #2948

Open
mahendra-sawarkar-exa opened this issue Jan 3, 2023 · 2 comments

Comments

@mahendra-sawarkar-exa

We are using the following dependencies in our project:

"com.google.cloud" % "google-cloud-pubsub" % "1.105.1" exclude("io.grpc", "grpc-alts"),
"io.grpc" % "grpc-alts" % "1.29.0",
"com.lightbend.akka" %% "akka-stream-alpakka-google-cloud-pub-sub-grpc" % "3.0.4"

Below is the Akka code:

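import scala.concurrent.duration._ // needed for 500.millis

val pubsubSubscription = Helper.getSubscriptionPath(customerName)

// Ask for up to 1000 messages per pull and return immediately if none are available.
val pubsubConfig = PullRequest()
  .withSubscription(pubsubSubscription)
  .withReturnImmediately(true)
  .withMaxMessages(1000)

// Poll the subscription every 500 ms and print each received message.
GooglePubSub.subscribePolling(pubsubConfig, 500.millis)
  .map { pubsubMessage => print(s"I got message $pubsubMessage") }
  ......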

But I am getting the following error while consuming data from Pub/Sub:

Error to processing messages for Code Code(testUser).
io.grpc.StatusRuntimeException: RESOURCE_EXHAUSTED: gRPC message exceeds maximum size 4194304: 4910368
at io.grpc.Status.asRuntimeException(Status.java:533)
at akka.grpc.internal.UnaryCallAdapter.onClose(UnaryCallAdapter.scala:40)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:413)
at io.grpc.internal.ClientCallImpl.access$500(ClientCallImpl.java:66)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:721)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Jan 03, 2023 12:16:07 PM io.grpc.internal.AbstractClientStream$TransportState inboundDataReceived
INFO: Received data on closed stream

Please advise. Thanks in advance!

@gkatzioura
Contributor

I ran into the same limitation.
I ended up swapping the SubscriberClient on the GrpcSubscriber with a custom one using reflection.
A PR tackling this would be more suitable.
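
For context on what a custom client would need to change: in grpc-java the inbound limit is a per-channel setting, `ManagedChannelBuilder.maxInboundMessageSize`. The following is only a minimal sketch of building a channel with a higher limit, not the reflection-based swap described above. The object name, the 20 MiB value, and the host and port parameters are illustrative assumptions, and how such a channel would be handed to the Alpakka GrpcSubscriber is not covered in this thread.

```scala
import io.grpc.{ManagedChannel, ManagedChannelBuilder}

// Hypothetical helper, for illustration only.
object LargerInboundLimit {
  // Assumed value: anything above the 4194304-byte limit reported in the error.
  val MaxInboundBytes: Int = 20 * 1024 * 1024

  // host and port are placeholders supplied by the caller.
  def buildChannel(host: String, port: Int): ManagedChannel =
    ManagedChannelBuilder
      .forAddress(host, port)
      .useTransportSecurity()                 // TLS
      .maxInboundMessageSize(MaxInboundBytes) // raise the per-message inbound limit
      .build()
}
```

The limit is enforced on the receiving side, so raising it on the client channel is what matters for pull responses.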

@mahendra-sawarkar-exa
Author

@gkatzioura
Thanks for the suggestion :)
Is this error per Pub/Sub message or per batch of messages?
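
On the per-message versus per-batch question: the size check in the error applies to a single gRPC message, and for a unary pull (the stack trace goes through `UnaryCallAdapter`) that is the whole pull response carrying the batch. If an upper bound on the payload size is known, a rough sketch like the one below could pick a `withMaxMessages` value that keeps a batch under the limit. `safeMaxMessages` and the 1024-byte per-message overhead allowance are hypothetical and not part of Alpakka or Pub/Sub.

```scala
// Hypothetical helper, for illustration only.
object PullBatchSizing {
  // Limit taken from the error message above.
  val GrpcInboundLimitBytes: Long = 4194304L

  /** Largest message count whose worst-case total stays within limitBytes.
    * perMessageOverheadBytes is an assumed allowance for ack IDs, attributes
    * and encoding overhead; tune it for your data.
    */
  def safeMaxMessages(
      maxPayloadBytes: Long,
      perMessageOverheadBytes: Long = 1024L,
      limitBytes: Long = GrpcInboundLimitBytes
  ): Int = {
    require(maxPayloadBytes > 0, "maxPayloadBytes must be positive")
    require(perMessageOverheadBytes >= 0, "perMessageOverheadBytes must not be negative")
    require(limitBytes >= 0, "limitBytes must not be negative")
    val perMessage = maxPayloadBytes + perMessageOverheadBytes
    math.min(limitBytes / perMessage, Int.MaxValue.toLong).toInt
  }
}
```

A result of 0 means a single message can already exceed the limit, in which case lowering the batch size alone will not help and the inbound limit itself has to be raised.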
