Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PubSubInboundChannelAdapter swallows assertion errors #1103

Closed
damienhollis opened this issue Apr 27, 2022 · 13 comments
Closed

PubSubInboundChannelAdapter swallows assertion errors #1103

damienhollis opened this issue Apr 27, 2022 · 13 comments

Comments

@damienhollis
Copy link

Describe the bug
When a message is received via PubSubInboundChannelAdapter and during the message flow it causes an assertion error, there is absolutely no logging. If the adapter is configured to auto acknowledge, the message is simply redelivered over and over. This makes errors caused by assertions very hard to find.

All types of exception should be logged by the PubSubInboundChannelAdapter in some form.

Attached is an example program that demonstrates the issue. To run it do the following:

  • docker-compose up -d
  • ./mvnw test

pubsub.zip

@ddixit14 ddixit14 self-assigned this Apr 27, 2022
@elefeint
Copy link
Contributor

@artembilan What is the general Spring Integration philosophy when handling Error type throwables? When implementing Pub/Sub adapters, we've done similarly to Spring Integration and made sure to handle any RuntimeException. It's understandable that Error should not be caught as a matter of Java philosophy, but has the use-case of assertions throwing Error subtypes come up before? The current issue is that we provide helpful logging for a RuntimeException but not for other error types.

@damienhollis I am not sure built-in Java assertions are helpful here -- if a message is malformed, you'd more commonly send it to a dead-letter queue, or do manual logging of some sort. Pub/Sub will redeliver any message that has failed to get acked, so whether auto-ack mode is on or not, you'll still get redelivery, unless the server-side DLQ is enabled. I would suggest doing custom validation logic rather than relying on Assert behavior.

@elefeint
Copy link
Contributor

@damienhollis Also take a look at Spring org.springframework.util.Assert -- those utils throw runtime exceptions.

@artembilan
Copy link
Contributor

It's not PubSubInboundChannelAdapter responsibility to do something with these type of errors.
If you really use assert, then it is better to localize it over there and not try to rely on some "magic" from the server.

The PubSubInboundChannelAdapter is based on the MessageProducerSupport which has a logic like this:

	protected void sendMessage(Message<?> messageArg) {
		Message<?> message = messageArg;
		if (message == null) {
			throw new MessagingException("cannot send a null message");
		}
		message = trackMessageIfAny(message);
		try {
			this.messagingTemplate.send(getRequiredOutputChannel(), message);
		}
		catch (RuntimeException ex) {
			if (!sendErrorMessageIfNecessary(message, ex)) {
				throw ex;
			}
		}
	}

So, if you downstream flow throws a RuntimeException and you configure this channel adapter with an errorChannel, then you are good to handle them in one central point.
That's confirm what @elefeint is talking about.

Just for info: https://stackoverflow.com/questions/2758224/what-does-the-java-assert-keyword-do-and-when-should-it-be-used

@damienhollis
Copy link
Author

@elefeint and @artembilan thanks for your responses.

The fix for the particular case we encountered was to change the assert to a RuntimeException. So we have already implemented your proposed fix in our code.

However, my bigger concern is hitting other assertions in the code and that those will be silently ignored. To us, this seems like a very strange position to take for both spring-integration and for this particular adapter - it does not seem like a sensible default position. Similarly, if we were getting an OutOfMemoryError, I would not want that to just be ignored as it is critical information.

For better or worse, we use assert a lot in our code base (maybe something we need to reconsider), so this is worrying. In this particular case we can wrap our call to the service that might throw an AssertionError and convert it to a RuntimeException but that means wherever something is called by spring-integration we need to add this extra logic.

@elefeint
Copy link
Contributor

The Java Errors won't normally get caught by anything, so they would cause the application to terminate. They would not get ignored. Do you have something in the code catching all Throwables perhaps?

@artembilan
Copy link
Contributor

Yes, that was my thought, too. Spring Integration only catches those RuntimeExceptions. The rest are just re-thrown back to the library generating data for messages in such a channel adapter. In our case it is Pub/Sub subscriber. So, we need to see what configuration is applied over there for those not handled errors.

@damienhollis
Copy link
Author

The Errors are caught in FutureTask

public void run() {
        if (state != NEW ||
            !RUNNER.compareAndSet(this, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

and then nothing else happens with them.

The last pubsub code to deal with them is in MessageDispatcher:

  private void processOutstandingMessage(final PubsubMessage message, final AckHandler ackHandler) {
    // This future is for internal bookkeeping to be sent to the StreamingSubscriberConnection
    // use below in the consumers
    SettableApiFuture<AckReply> ackReplySettableApiFuture = SettableApiFuture.create();
    ApiFutures.addCallback(ackReplySettableApiFuture, ackHandler, MoreExecutors.directExecutor());

    Runnable deliverMessageTask =
        new Runnable() {
          @Override
          public void run() {
            try {
              if (ackHandler
                  .totalExpiration
                  .plusSeconds(messageDeadlineSeconds.get())
                  .isBefore(now())) {
                // Message expired while waiting. We don't extend these messages anymore,
                // so it was probably sent to someone else. Don't work on it.
                // Don't nack it either, because we'd be nacking someone else's message.
                ackHandler.forget();
                return;
              }
              if (shouldSetMessageFuture()) {
                // This is the message future that is propagated to the user
                SettableApiFuture<AckResponse> messageFuture =
                    ackHandler.getMessageFutureIfExists();
                final AckReplyConsumerWithResponse ackReplyConsumerWithResponse =
                    new AckReplyConsumerWithResponse() {
                      @Override
                      public Future<AckResponse> ack() {
                        ackReplySettableApiFuture.set(AckReply.ACK);
                        return messageFuture;
                      }

                      @Override
                      public Future<AckResponse> nack() {
                        ackReplySettableApiFuture.set(AckReply.NACK);
                        return messageFuture;
                      }
                    };
                receiverWithAckResponse.receiveMessage(message, ackReplyConsumerWithResponse);
              } else {
                final AckReplyConsumer ackReplyConsumer =
                    new AckReplyConsumer() {
                      @Override
                      public void ack() {
                        ackReplySettableApiFuture.set(AckReply.ACK);
                      }

                      @Override
                      public void nack() {
                        ackReplySettableApiFuture.set(AckReply.NACK);
                      }
                    };
                receiver.receiveMessage(message, ackReplyConsumer);
              }
            } catch (Exception e) {
              ackReplySettableApiFuture.setException(e);
            }
          }
        };
    if (message.getOrderingKey().isEmpty()) {
      executor.execute(deliverMessageTask);
    } else {
      sequentialExecutor.submit(message.getOrderingKey(), deliverMessageTask);
    }
  }

@oliverhenlich
Copy link

However, my bigger concern is hitting other assertions in the code and that those will be silently ignored. To us, this seems like a very strange position to take for both spring-integration and for this particular adapter - it does not seem like a sensible default position. Similarly, if we were getting an OutOfMemoryError, I would not want that to just be ignored as it is critical information.

I agree with this assessment.

If assertions don't get reported or logged anywhere (i.e. they are swallowed by the framework and it's components) does that mean there is an expectation that applications run with asserts turned off or that all relevant asserts be converted to RuntimeExceptions? It feels like if that is the case, that there should be a warning in the documentation about this. Perhaps in https://docs.spring.io/spring-integration/reference/html/error-handling.html?

@elefeint
Copy link
Contributor

The mismatch is philosophical: Spring Integration expects to handle runtime exceptions, but allow the rest to kill the app (as is proper). Pub/Sub client library, however, must make sure message handling gets completed, no matter what. So the client library catches all Throwables, nacks them and moves on, preventing application shutdown.

We can probably catch-log-rethrow non-RuntimeException throwables in Spring Cloud GCP to make this more user-friendly. As a matter of user application practice, it's definitely not ideal to be throwing Error subtypes in message handling code.

@damienhollis
Copy link
Author

We have treated asserts as checking preconditions but not necessarily as fatal to the whole application, so we have used them a lot. However, thinking more about it and the fact they are subclasses of Error, it makes sense to treat them as fatal in general. I think we will start to replace them with some kind of runtime exception but we can't do the overnight and if other types of errors occur, e.g. OOM it would be useful to log the issue, so we know these things are happening.

I think all you really need to do is when you nack the message, you can also log the Throwable - you don't need to rethrow it (haven't checked the code but that is my recollection).

@elefeint
Copy link
Contributor

elefeint commented Jun 1, 2022

I dug into this behavior again, and while we can work around it in the Spring Integration adapter, the same issue exists in the template level and the underlying client library.
I've filed googleapis/java-pubsub#1156 to fix the underlying issue.

You should definitely not keep java Error-throwing assertions in the production code, but at the same time, you are correct in that the Spring Cloud GCP / client library layers should at some point log useful information about an Error.

@damienhollis
Copy link
Author

Thanks @elefeint - appreciate your work on this and logging Errors will be very helpful in general.

At the same time, we are actively working to remove asserts from our code base based on your suggestions and other reading we have done.

@elefeint
Copy link
Contributor

Thanks! I'll close this issue for now; there will likely be nothing to do in Spring Cloud GCP once the underlying client library logging is added.

@elefeint elefeint added client-library type: enhancement New feature or request and removed awaiting labels Oct 20, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

5 participants