Spring Cloud GCP Pub/Sub Binder: enabling/disabling service account causes different behaviour for subscriptions #2158

Closed
mayur-solace opened this issue Sep 11, 2023 · 12 comments
Labels
priority: p2 type: bug Something isn't working

Comments

@mayur-solace

mayur-solace commented Sep 11, 2023

Setup

  1. Using Spring Cloud GCP v3.6.3.
  2. A service account is set up with the permissions the Pub/Sub binder needs.
  3. There are two subscriptions: one with message ordering and exactly-once delivery enabled, and one without message ordering.
  4. A Pub/Sub binder application runs two consumer functions, each consuming from one of the subscriptions.

Steps:

  1. With the service account enabled, start the Pub/Sub binder application.
  2. Publish some messages to the topics and confirm that both consumers receive them and that the application logs them.
  3. Disable the service account and wait for the Pub/Sub binder to log the following exception:
c.g.c.p.v.StreamingSubscriberConnection  : terminated streaming with exception

com.google.api.gax.rpc.UnauthenticatedException: com.google.api.gax.rpc.UnauthenticatedException: io.grpc.StatusRuntimeException: UNAUTHENTICATED: Request had invalid authentication credentials. Expected OAuth 2 access token, login cookie or other valid authentication credential. See https://developers.google.com/identity/sign-in/web/devconsole-project.
  4. Re-enable the service account.
  5. Publish new messages to the topics. Only the consumer reading from the subscription with ordering enabled receives messages; the second consumer receives nothing.

I'm using this sample sink for reproduction, with the configuration below.

spring.cloud.stream.function.bindings.logUserMessage1-in-0=input1
spring.cloud.stream.bindings.input1.destination=test-topic1

spring.cloud.stream.function.bindings.logUserMessage2-in-0=input2
spring.cloud.stream.bindings.input2.destination=test-topic2

# Optional, as Spring Cloud Stream will autodiscover the correct functional bean.
spring.cloud.function.definition=logUserMessage1;logUserMessage2

# Custom port to avoid conflict with the Source app.
server.port=8081

#spring.cloud.gcp.project-id=[YOUR_GCP_PROJECT_ID]
#spring.cloud.gcp.credentials.location=file:[LOCAL_PATH_TO_CREDENTIALS]

spring.cloud.stream.gcp.pubsub.default.consumer.auto-create-resources=false
# Manually created subscription with ordering enabled.
spring.cloud.stream.gcp.pubsub.bindings.input1.consumer.subscription-name=test-topic1-sub
# Manually created subscription without ordering enabled.
spring.cloud.stream.gcp.pubsub.bindings.input2.consumer.subscription-name=test-topic2-sub
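
import java.util.function.Consumer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class Sink {

  private static final Log LOGGER = LogFactory.getLog(Sink.class);

  // Bound to input1 (test-topic1-sub, ordering enabled).
  @Bean
  public Consumer<String> logUserMessage1() {
    return userMessage ->
        LOGGER.info(String.format("Sub1: New message received > %s:", userMessage));
  }

  // Bound to input2 (test-topic2-sub, ordering not enabled).
  @Bean
  public Consumer<String> logUserMessage2() {
    return userMessage ->
        LOGGER.info(String.format("Sub2: New message received > %s:", userMessage));
  }
}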
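
One caveat worth checking with configuration like the above: in the .properties format, a `#` starts a comment only at the beginning of a line, so any annotation written after a value on the same line becomes part of that value. The sketch below demonstrates this with plain `java.util.Properties`; the class name `InlineCommentCheck` is made up for illustration, and whether Spring Boot's own loader behaves identically is an assumption to verify against your version.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Sketch: shows how java.util.Properties treats text that follows a value on the same line.
final class InlineCommentCheck {

  // Parses .properties-formatted text held in a string.
  static Properties parse(String text) {
    Properties props = new Properties();
    try {
      props.load(new StringReader(text));
    } catch (IOException e) {
      // Reading from a StringReader is not expected to fail; rethrow unchecked just in case.
      throw new UncheckedIOException(e);
    }
    return props;
  }

  public static void main(String[] args) {
    String key = "spring.cloud.stream.gcp.pubsub.bindings.input1.consumer.subscription-name";

    // Annotation on the same line as the value: '#' is not treated as a comment here.
    Properties inline = parse(key + "=test-topic1-sub # ordering enabled\n");
    System.out.println("[" + inline.getProperty(key) + "]");

    // Annotation on its own line: the value is just the subscription name.
    Properties separate = parse("# ordering enabled\n" + key + "=test-topic1-sub\n");
    System.out.println("[" + separate.getProperty(key) + "]");
  }
}
```

If the binder is given a subscription name that includes the trailing annotation, it would look up a subscription that does not exist, so keeping such notes on their own lines is the safer layout.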

Background
I'm basically trying to simulate a scenario where the consumer may temporarily shut down for some reason.

@mayur-solace mayur-solace changed the title Spring Cloud GCP Pub/Sub Binder: enabling/disabling service account causes different behaviour for subscripter Spring Cloud GCP Pub/Sub Binder: enabling/disabling service account causes different behaviour for subscriptions Sep 11, 2023
@diegomarquezp diegomarquezp added priority: p2 type: bug Something isn't working labels Sep 12, 2023
@diegomarquezp
Contributor

Hi @mayur-solace, thanks for reporting this. Would you mind sharing a reproducible project so we can look into it? Are the configuration and the functions the only differences from the available sample applications?

@mayur-solace
Author

mayur-solace commented Sep 12, 2023

> Hi @mayur-solace, thanks for reporting this. Would you mind sharing a reproducible project so we can look into it? Are the configuration and the functions the only differences from the available sample applications?

Yes, only the configuration shared above and the functions are different.
I'm using the service account key JSON file for spring.cloud.gcp.credentials.location=file:[LOCAL_PATH_TO_CREDENTIALS].

I pushed the modified sample here: https://github.com/mayur-solace/spring-cloud-gcp/pull/1/files

@diegomarquezp

@meltsufin
Member

@mayur-solace If I understand correctly, you are observing that after disabling and re-enabling the service account, only the consumer with ordering enabled recovers?
I wonder if both consumers get the auth exception when you disable the account. It's strange that ordering would somehow have an effect. Maybe it's just a coincidence. What if both subscriptions don't have ordering enabled?

@mayur-solace
Author

mayur-solace commented Sep 18, 2023

@meltsufin,

> If I understand correctly, you are observing that after disabling and re-enabling the service account, only the consumer with ordering enabled recovers?

Your understanding is correct. In other words, the issue I'm reporting is: why doesn't the consumer without ordering enabled recover?

> I wonder if both consumers get the auth exception when you disable the account.

In my test, both consumers run in the same application. When the service account is disabled, I see only a single thread printing the com.google.api.gax.rpc.UnauthenticatedException. That thread is the one consuming from the subscription without ordering enabled. I can say this because of another issue I observed, described below.

Prompted by your point above, I ran another test to see what happens when I send a message through the Google Cloud console to the subscription with ordering enabled while the service account used by the Spring Cloud Stream (SCSt) application is still disabled.
This revealed another, even bigger issue: the application successfully receives a message from the subscription with ordering enabled even though the service account is disabled. Sample log:

2023-09-18 10:23:58.663  WARN 83720 --- [bscriber-SE-1-2] c.g.c.p.v.StreamingSubscriberConnection  : failed to send operations

com.google.api.gax.rpc.UnauthenticatedException: io.grpc.StatusRuntimeException: UNAUTHENTICATED: Request had invalid authentication credentials. Expected OAuth 2 access token, login cookie or other valid authentication credential. See https://developers.google.com/identity/sign-in/web/devconsole-project.
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:116) ~[gax-2.32.0.jar:2.32.0]
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98) ~[gax-grpc-2.32.0.jar:2.32.0]
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66) ~[gax-grpc-2.32.0.jar:2.32.0]
	at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97) ~[gax-grpc-2.32.0.jar:2.32.0]
	at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:84) ~[api-common-2.15.0.jar:2.15.0]
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1127) ~[guava-32.1.2-jre.jar:na]
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31) ~[guava-32.1.2-jre.jar:na]
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1286) ~[guava-32.1.2-jre.jar:na]
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1055) ~[guava-32.1.2-jre.jar:na]
	at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:807) ~[guava-32.1.2-jre.jar:na]
	at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:578) ~[grpc-stub-1.56.1.jar:1.56.1]
	at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:548) ~[grpc-stub-1.56.1.jar:1.56.1]
	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) ~[grpc-api-1.56.1.jar:1.56.1]
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) ~[grpc-api-1.56.1.jar:1.56.1]
	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) ~[grpc-api-1.56.1.jar:1.56.1]
	at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:541) ~[gax-grpc-2.32.0.jar:2.32.0]
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:567) ~[grpc-core-1.56.1.jar:1.56.1]
	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:71) ~[grpc-core-1.56.1.jar:1.56.1]
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:735) ~[grpc-core-1.56.1.jar:1.56.1]
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:716) ~[grpc-core-1.56.1.jar:1.56.1]
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[grpc-core-1.56.1.jar:1.56.1]
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) ~[grpc-core-1.56.1.jar:1.56.1]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: io.grpc.StatusRuntimeException: UNAUTHENTICATED: Request had invalid authentication credentials. Expected OAuth 2 access token, login cookie or other valid authentication credential. See https://developers.google.com/identity/sign-in/web/devconsole-project.
	at io.grpc.Status.asRuntimeException(Status.java:539) ~[grpc-api-1.56.1.jar:1.56.1]
	... 18 common frames omitted

2023-09-18 10:23:59.524  INFO 83720 --- [sub-subscriber4] com.example.Sink                         : Sub1: New message received > test test test :
2023-09-18 10:23:59.573  INFO 83720 --- [sub-subscriber2] com.example.Sink                         : Sub1: New message received > test test test :
2023-09-18 10:23:59.591  WARN 83720 --- [bscriber-SE-1-4] c.g.c.p.v.StreamingSubscriberConnection  : failed to send operations

com.google.api.gax.rpc.UnauthenticatedException: io.grpc.StatusRuntimeException: UNAUTHENTICATED: Request had invalid authentication credentials. Expected OAuth 2 access token, login cookie or other valid authentication credential. See https://developers.google.com/identity/sign-in/web/devconsole-project.
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:116) ~[gax-2.32.0.jar:2.32.0]
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98) ~[gax-grpc-2.32.0.jar:2.32.0]
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66) ~[gax-grpc-2.32.0.jar:2.32.0]
	at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97) ~[gax-grpc-2.32.0.jar:2.32.0]
	at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:84) ~[api-common-2.15.0.jar:2.15.0]
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1127) ~[guava-32.1.2-jre.jar:na]
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31) ~[guava-32.1.2-jre.jar:na]
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1286) ~[guava-32.1.2-jre.jar:na]
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1055) ~[guava-32.1.2-jre.jar:na]
	at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:807) ~[guava-32.1.2-jre.jar:na]
	at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:578) ~[grpc-stub-1.56.1.jar:1.56.1]
	at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:548) ~[grpc-stub-1.56.1.jar:1.56.1]
	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) ~[grpc-api-1.56.1.jar:1.56.1]
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) ~[grpc-api-1.56.1.jar:1.56.1]
	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) ~[grpc-api-1.56.1.jar:1.56.1]
	at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:541) ~[gax-grpc-2.32.0.jar:2.32.0]
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:567) ~[grpc-core-1.56.1.jar:1.56.1]
	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:71) ~[grpc-core-1.56.1.jar:1.56.1]
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:735) ~[grpc-core-1.56.1.jar:1.56.1]
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:716) ~[grpc-core-1.56.1.jar:1.56.1]
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[grpc-core-1.56.1.jar:1.56.1]
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) ~[grpc-core-1.56.1.jar:1.56.1]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: io.grpc.StatusRuntimeException: UNAUTHENTICATED: Request had invalid authentication credentials. Expected OAuth 2 access token, login cookie or other valid authentication credential. See https://developers.google.com/identity/sign-in/web/devconsole-project.
	at io.grpc.Status.asRuntimeException(Status.java:539) ~[grpc-api-1.56.1.jar:1.56.1]
	... 18 common frames omitted

2023-09-18 10:23:59.598  WARN 83720 --- [bscriber-SE-1-1] c.g.c.p.v.StreamingSubscriberConnection  : failed to send operations

Notice that while threads bscriber-SE-1-4 and bscriber-SE-1-1 are throwing the exception, threads sub-subscriber4 and sub-subscriber2 received a message from the subscription with ordering enabled.
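
The thread names in the log above appear shortened. Spring Boot's default console pattern prints the thread name with Logback's `%15.15t`, which keeps only the last 15 characters of a longer name. The sketch below mimics that rule. The full names it uses (`Subscriber-SE-1-4` for a Pub/Sub client executor thread and `gcp-pubsub-subscriber4` for a Spring Cloud GCP subscriber thread) are assumptions chosen to match the truncated forms, not values taken from this thread, and `ThreadColumn` is a made-up class name.

```java
// Sketch: reproduces the 15-character thread column of a "%15.15t"-style log pattern.
final class ThreadColumn {
  static final int WIDTH = 15;

  // Keeps the last WIDTH characters of long names and left-pads short ones with spaces.
  static String render(String threadName) {
    if (threadName.length() > WIDTH) {
      return threadName.substring(threadName.length() - WIDTH);
    }
    return " ".repeat(WIDTH - threadName.length()) + threadName;
  }

  public static void main(String[] args) {
    // Both full names are assumptions chosen to match the truncated forms in the log.
    System.out.println("[" + render("Subscriber-SE-1-4") + "]");
    System.out.println("[" + render("gcp-pubsub-subscriber4") + "]");
  }
}
```

Read this way, the threads logging the exception and the threads delivering messages would belong to different executors, which may help when matching log lines to subscriptions.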

> What if both subscriptions don't have ordering enabled?

When neither subscription has ordering enabled, neither consumer receives messages after the service account is disabled and re-enabled.

@meltsufin
Member

@TimurSadykov @maitrimangal Do you have any advice for us here? I believe this has more to do with auth and Pub/Sub than the wiring in Spring Cloud GCP. I suspect that auth tokens are still active for a short time after the account is disabled.

@mayur-solace
Author

@meltsufin were you able to reproduce the issue on your end? Is there a plan for fixing the bug?

I see that the priority assigned to this bug is P2. I think it should be higher, because a subscriber can still consume messages if the service account is disabled after the initial connection.

@meltsufin
Member

@mayur-solace Would you be able to reproduce the issue with the Pub/Sub client library alone? This would help direct the issue to the team that is better positioned to investigate.

@mayur-solace
Author

OK, I will try to write a reproduction with the Pub/Sub client library.

@mayur-solace
Author

@meltsufin Please find the Pub/Sub client library samples for reproduction below.

SpringCloudGCPBug2158Subscriber.java
SpringCloudGCPBug2158Publisher.java

Note: I used two separate service account keys for the publisher and the subscriber, because for this test I want the publisher to keep working when I disable the key used by the subscriber.

To summarize:

  1. I was NOT able to reproduce the first issue that I reported (i.e. after the service account is disabled and re-enabled, message flow on the ordered subscription resumes while message flow on the non-ordered subscription stalls).
    In these standalone Pub/Sub client samples, message flow resumes for both the ordered and the non-ordered subscription.

  2. The second issue (messages are still received while the service account is disabled) is reproduced by the SpringCloudGCPBug2158Subscriber sample.

Steps:

  1. Start SpringCloudGCPBug2158Subscriber using the service account key set up for the subscriber.

  2. Start SpringCloudGCPBug2158Publisher using the service account key or ADC set up for the publisher.

  3. Check the subscriber logs and confirm that messages are received as expected.

  4. Disable the service account key set up for the subscriber.

  5. Check the subscriber logs: messages are still received, alongside com.google.api.gax.rpc.UnauthenticatedException: io.grpc.StatusRuntimeException: UNAUTHENTICATED: (as reported in the second issue).

  6. Wait for a few minutes.

  7. Re-enable the service account key.

  8. After a few (10-15) seconds, the UnauthenticatedException stops and normal message flow resumes, for both the ordered and the non-ordered subscription. This differs from the Spring Cloud GCP Pub/Sub binder, where message flow on the non-ordered subscription stalls (see the sample shared in the earlier comment).

  9. Set withOrderingEnabled=false in SpringCloudGCPBug2158Subscriber.java and repeat the steps above for the subscription without ordering.

@meltsufin
Member

Thank you so much for the repro @mayur-solace!
Would you mind filing a separate issue for (2) in https://github.com/googleapis/java-pubsub/issues and referencing the repro? Note that it's not a Spring Cloud GCP issue, so you might want to rename the classes there.

For (1), it might be an issue in Spring Cloud GCP. Can you file that as a separate issue too, but in this repo?

@meltsufin
Member

Some more context on (1).

  • A similar question had been asked before, and the conclusion was that the Pub/Sub client library is not supposed to reconnect after an auth failure. So, I'm not sure why you're seeing reconnects in some cases.
  • If the Pub/Sub service goes down, there is a way to reconnect according to this.
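
To make the distinction in the two points above concrete, here is a minimal sketch of a reconnect policy that treats an auth failure as terminal and a service outage as transient. The status names are standard gRPC status codes, but the `ReconnectPolicy` class and the choice of which codes count as transient are assumptions made for illustration; this is not the Pub/Sub client library's actual code.

```java
import java.util.EnumSet;
import java.util.Set;

// Sketch: a hypothetical reconnect policy keyed on gRPC status code names.
final class ReconnectPolicy {

  // A subset of the standard gRPC status codes.
  enum Code {
    CANCELLED,
    DEADLINE_EXCEEDED,
    PERMISSION_DENIED,
    RESOURCE_EXHAUSTED,
    ABORTED,
    INTERNAL,
    UNAVAILABLE,
    UNAUTHENTICATED
  }

  // Assumption: the set of transient codes is chosen for illustration only.
  static final Set<Code> TRANSIENT =
      EnumSet.of(
          Code.DEADLINE_EXCEEDED,
          Code.RESOURCE_EXHAUSTED,
          Code.ABORTED,
          Code.INTERNAL,
          Code.UNAVAILABLE);

  // Returns true when a stream that ended with this code should be reopened.
  static boolean shouldReconnect(Code code) {
    return TRANSIENT.contains(code);
  }

  public static void main(String[] args) {
    for (Code code : Code.values()) {
      System.out.println(code + " -> reconnect: " + shouldReconnect(code));
    }
  }
}
```

Under a policy like this, a subscriber that stops with UNAUTHENTICATED stays down until the application restarts it, which would match a consumer that never recovers after the service account is re-enabled.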

@zhumin8
Contributor

zhumin8 commented Dec 26, 2023

Filed the second issue with the Pub/Sub client library as googleapis/java-pubsub#1852.
Closing this issue as the part relevant to Spring Cloud GCP is not reproduced.
Please reopen if this is still relevant and reproducible.

@zhumin8 zhumin8 closed this as completed Dec 26, 2023