Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PubSub Subscriber throws StatusRuntimeException when holding many leases #1126

Closed
relud opened this issue May 11, 2022 · 2 comments · Fixed by #1135
Closed

PubSub Subscriber throws StatusRuntimeException when holding many leases #1126

relud opened this issue May 11, 2022 · 2 comments · Fixed by #1135
Assignees
Labels
api: pubsub Issues related to the googleapis/java-pubsub API. 🚨 This issue needs some love. triage me I really want to be triaged.

Comments

@relud
Copy link
Contributor

relud commented May 11, 2022

Environment details

  1. OS type and version: max os x 10.16; linux 5.4.170+ (osImage: Container-Optimized OS from Google)
  2. Java version: Oracle Java 11.0.10 on mac os; openjdk version "11.0.15" 2022-04-19 in docker/gke
  3. version(s): google-cloud-pubsub 1.116.0, 1.116.1, 1.116.2, 1.116.3, 1.116.4, 1.117.0

Steps to reproduce

  1. create pubsub topic and subscription
  2. publish at least 2775 messages to the topic
  3. use com.google.cloud.pubsub.v1.Subscriber to consume at least 2775 messages and hold their lease for at least 15 seconds.
  4. observe stack trace below

Code example

import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class HoldLeases {

  public static void main(String[] args) throws Exception {

    final String projectId = ServiceOptions.getDefaultProjectId();
    final TopicAdminClient topicAdminClient = TopicAdminClient.create();
    final SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create();
    final Subscription subscription = Subscription.newBuilder()
        .setName("projects/" + projectId + "/subscriptions/test-subscription-" + UUID.randomUUID())
        .setTopic("projects/" + projectId + "/topics/test-topic-" + UUID.randomUUID())
        .setAckDeadlineSeconds(10).build();

    topicAdminClient.createTopic(subscription.getTopic());
    subscriptionAdminClient.createSubscription(subscription.getName(), subscription.getTopic(),
        PushConfig.getDefaultInstance(), 0);
    final Publisher publisher = Publisher.newBuilder(subscription.getTopic()).build();

    try {
      // publish enough messages to exceed a single ModifyAckDeadline request
      int sentCount = 2775;
      List<ApiFuture<String>> published = new ArrayList<>(sentCount);
      for (int i = 0; i < sentCount; i++) {
        published.add(publisher
            .publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("" + i)).build()));
      }
      publisher.publishAllOutstanding();
      for (ApiFuture<String> future : published) {
        future.get();
      }

      // pull all messages and hold them for 1 minute
      final CompletableFuture<Void> done = new CompletableFuture<Void>().orTimeout(15,
          TimeUnit.SECONDS);
      final MessageReceiver receiver = (message, consumer) -> {
        done.whenComplete((result, exception) -> {
          if (exception == null) {
            consumer.ack();
          } else {
            consumer.nack();
          }
        });
      };

      final Subscriber subscriber = Subscriber
          .newBuilder(ProjectSubscriptionName.parse(subscription.getName()), receiver)
          .setFlowControlSettings(
              FlowControlSettings.newBuilder().setMaxOutstandingElementCount(2775L)
                  .setMaxOutstandingRequestBytes(30_000_000L).build())
          .build();
      done.whenComplete((v, e) -> subscriber.stopAsync());
      try {
        subscriber.startAsync();
        subscriber.awaitTerminated();
      } finally {
        subscriber.stopAsync();
      }
    } finally {
      topicAdminClient.deleteTopic(subscription.getTopic());
      subscriptionAdminClient.deleteSubscription(subscription.getName());
      publisher.shutdown();
    }
  }
}

Stack trace

May 11, 2022 11:56:01 AM com.google.cloud.pubsub.v1.StreamingSubscriberConnection$2 onFailure
WARNING: failed to send operations
com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Request payload size exceeds the limit: 524288 bytes.
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:92)
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:41)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:86)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66)
	at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
	at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:67)
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1132)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1270)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1038)
	at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:808)
	at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:563)
	at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:533)
	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:535)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:562)
	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:743)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:722)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Request payload size exceeds the limit: 524288 bytes.
	at io.grpc.Status.asRuntimeException(Status.java:535)
	... 17 more

Any additional information below

Issue does not appear with library version 1.115.5 and below.

I don't see the issue with less than 2772 messages in flight, but it's didn't start reliably showing until 2775 messages in flight.

@product-auto-label product-auto-label bot added the api: pubsub Issues related to the googleapis/java-pubsub API. label May 11, 2022
@yoshi-automation yoshi-automation added the triage me I really want to be triaged. label May 12, 2022
@yoshi-automation yoshi-automation added the 🚨 This issue needs some love. label May 16, 2022
@mmicatka
Copy link
Contributor

Thanks for raising this issue, I have put together a fix that is under review now (linked above). Will post a follow-up after the change is merged.

@mmicatka
Copy link
Contributor

The root cause of this issue was an incorrect batching of modacks that arose when the exactly-once delivery changes were implemented. This has been fixed with the above PR (#1135)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: pubsub Issues related to the googleapis/java-pubsub API. 🚨 This issue needs some love. triage me I really want to be triaged.
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants