diff --git a/google-cloud-pubsub/pom.xml b/google-cloud-pubsub/pom.xml index 27e436e44..22fe71e2d 100644 --- a/google-cloud-pubsub/pom.xml +++ b/google-cloud-pubsub/pom.xml @@ -95,6 +95,11 @@ + + org.mockito + mockito-core + test + junit junit diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumerWithResponse.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumerWithResponse.java new file mode 100644 index 000000000..199186004 --- /dev/null +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumerWithResponse.java @@ -0,0 +1,25 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.v1; + +import java.util.concurrent.Future; + +public interface AckReplyConsumerWithResponse { + Future ack(); + + Future nack(); +} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckRequestData.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckRequestData.java new file mode 100644 index 000000000..3b67ce219 --- /dev/null +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckRequestData.java @@ -0,0 +1,85 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.v1; + +import com.google.api.core.SettableApiFuture; +import java.util.Optional; + +public class AckRequestData { + private final String ackId; + private final Optional> messageFuture; + + protected AckRequestData(Builder builder) { + this.ackId = builder.ackId; + this.messageFuture = builder.messageFuture; + } + + public String getAckId() { + return ackId; + } + + public SettableApiFuture getMessageFutureIfExists() { + return this.messageFuture.orElse(null); + } + + public AckRequestData setResponse(AckResponse ackResponse, boolean setResponseOnSuccess) { + if (this.messageFuture.isPresent() && !this.messageFuture.get().isDone()) { + switch (ackResponse) { + case SUCCESSFUL: + if (setResponseOnSuccess) { + this.messageFuture.get().set(ackResponse); + } + break; + case INVALID: + case OTHER: + case PERMISSION_DENIED: + case FAILED_PRECONDITION: + // Non-succesful messages will get set for both acks, nacks, and modacks + this.messageFuture.get().set(ackResponse); + break; + } + } + return this; + } + + public boolean hasMessageFuture() { + return this.messageFuture.isPresent(); + } + + public static Builder newBuilder(String ackId) { + return new Builder(ackId); + } + + /** Builder of {@link AckRequestData AckRequestData}. */ + protected static final class Builder { + private final String ackId; + private Optional> messageFuture = Optional.empty(); + + protected Builder(String ackId) { + this.ackId = ackId; + } + + public Builder setMessageFuture(SettableApiFuture messageFuture) { + this.messageFuture = Optional.of(messageFuture); + return this; + } + + public AckRequestData build() { + return new AckRequestData(this); + } + } +} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckResponse.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckResponse.java new file mode 100644 index 000000000..162d87bc1 --- /dev/null +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckResponse.java @@ -0,0 +1,25 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.v1; + +public enum AckResponse { + PERMISSION_DENIED, + FAILED_PRECONDITION, + SUCCESSFUL, + INVALID, + OTHER +} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 4177c6e01..a9f73d5c3 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -28,19 +28,8 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.ReceivedMessage; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executor; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; @@ -57,8 +46,8 @@ */ class MessageDispatcher { private static final Logger logger = Logger.getLogger(MessageDispatcher.class.getName()); - private static final double PERCENTILE_FOR_ACK_DEADLINE_UPDATES = 99.9; + @InternalApi static final double PERCENTILE_FOR_ACK_DEADLINE_UPDATES = 99.9; @InternalApi static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis(100); private final Executor executor; @@ -68,24 +57,31 @@ class MessageDispatcher { private final Duration ackExpirationPadding; private final Duration maxAckExtensionPeriod; - private final int maxSecondsPerAckExtension; - private final MessageReceiver receiver; + private int minDurationPerAckExtensionSeconds; + private final boolean minDurationPerAckExtensionDefaultUsed; + private final int maxDurationPerAckExtensionSeconds; + private final boolean maxDurationPerAckExtensionDefaultUsed; + + // Only one of receiver or receiverWithAckResponse will be set + private MessageReceiver receiver; + private MessageReceiverWithAckResponse receiverWithAckResponse; + private final AckProcessor ackProcessor; private final FlowController flowController; + + private AtomicBoolean enableExactlyOnceDelivery; + private final Waiter messagesWaiter; // Maps ID to "total expiration time". If it takes longer than this, stop extending. private final ConcurrentMap pendingMessages = new ConcurrentHashMap<>(); - private final LinkedBlockingQueue pendingAcks = new LinkedBlockingQueue<>(); - private final LinkedBlockingQueue pendingNacks = new LinkedBlockingQueue<>(); - private final LinkedBlockingQueue pendingReceipts = new LinkedBlockingQueue<>(); + private final LinkedBlockingQueue pendingAcks = new LinkedBlockingQueue<>(); + private final LinkedBlockingQueue pendingNacks = new LinkedBlockingQueue<>(); + private final LinkedBlockingQueue pendingReceipts = new LinkedBlockingQueue<>(); - // Start the deadline at the minimum ack deadline so messages which arrive before this is - // updated will not have a long ack deadline. - private final AtomicInteger messageDeadlineSeconds = - new AtomicInteger(Subscriber.MIN_ACK_DEADLINE_SECONDS); + private final AtomicInteger messageDeadlineSeconds = new AtomicInteger(); private final AtomicBoolean extendDeadline = new AtomicBoolean(true); private final Lock jobLock; private ScheduledFuture backgroundJob; @@ -94,28 +90,6 @@ class MessageDispatcher { // To keep track of number of seconds the receiver takes to process messages. private final Distribution ackLatencyDistribution; - /** Stores the data needed to asynchronously modify acknowledgement deadlines. */ - static class PendingModifyAckDeadline { - final List ackIds; - final int deadlineExtensionSeconds; - - PendingModifyAckDeadline(int deadlineExtensionSeconds, String... ackIds) { - this(deadlineExtensionSeconds, Arrays.asList(ackIds)); - } - - private PendingModifyAckDeadline(int deadlineExtensionSeconds, Collection ackIds) { - this.ackIds = new ArrayList(ackIds); - this.deadlineExtensionSeconds = deadlineExtensionSeconds; - } - - @Override - public String toString() { - return String.format( - "PendingModifyAckDeadline{extension: %d sec, ackIds: %s}", - deadlineExtensionSeconds, ackIds); - } - } - /** Internal representation of a reply to a Pubsub message, to be sent back to the service. */ public enum AckReply { ACK, @@ -124,21 +98,30 @@ public enum AckReply { /** Handles callbacks for acking/nacking messages from the {@link MessageReceiver}. */ private class AckHandler implements ApiFutureCallback { - private final String ackId; + private final AckRequestData ackRequestData; private final int outstandingBytes; private final long receivedTimeMillis; private final Instant totalExpiration; - private AckHandler(String ackId, int outstandingBytes, Instant totalExpiration) { - this.ackId = ackId; + private AckHandler( + AckRequestData ackRequestData, int outstandingBytes, Instant totalExpiration) { + this.ackRequestData = ackRequestData; this.outstandingBytes = outstandingBytes; this.receivedTimeMillis = clock.millisTime(); this.totalExpiration = totalExpiration; } + public AckRequestData getAckRequestData() { + return ackRequestData; + } + + public SettableApiFuture getMessageFutureIfExists() { + return this.ackRequestData.getMessageFutureIfExists(); + } + /** Stop extending deadlines for this message and free flow control. */ private void forget() { - if (pendingMessages.remove(ackId) == null) { + if (pendingMessages.remove(this.ackRequestData.getAckId()) == null) { /* * We're forgetting the message for the second time. Probably because we ran out of total * expiration, forget the message, then the user finishes working on the message, and forget @@ -154,64 +137,77 @@ private void forget() { public void onFailure(Throwable t) { logger.log( Level.WARNING, - "MessageReceiver failed to process ack ID: " + ackId + ", the message will be nacked.", + "MessageReceiver failed to process ack ID: " + + this.ackRequestData.getAckId() + + ", the message will be nacked.", t); - pendingNacks.add(ackId); + this.ackRequestData.setResponse(AckResponse.OTHER, false); + pendingNacks.add(this.ackRequestData); forget(); } @Override public void onSuccess(AckReply reply) { - LinkedBlockingQueue destination; switch (reply) { case ACK: - destination = pendingAcks; + pendingAcks.add(this.ackRequestData); // Record the latency rounded to the next closest integer. ackLatencyDistribution.record( Ints.saturatedCast( (long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D))); break; case NACK: - destination = pendingNacks; + pendingNacks.add(this.ackRequestData); break; default: throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply)); } - destination.add(ackId); forget(); } } interface AckProcessor { - void sendAckOperations( - List acksToSend, List ackDeadlineExtensions); + public void sendAckOperations(List ackRequestDataList); + + public void sendModackOperations(List modackRequestDataList); } - MessageDispatcher( - MessageReceiver receiver, - AckProcessor ackProcessor, - Duration ackExpirationPadding, - Duration maxAckExtensionPeriod, - Duration maxDurationPerAckExtension, - Distribution ackLatencyDistribution, - FlowController flowController, - Executor executor, - ScheduledExecutorService systemExecutor, - ApiClock clock) { - this.executor = executor; - this.systemExecutor = systemExecutor; - this.ackExpirationPadding = ackExpirationPadding; - this.maxAckExtensionPeriod = maxAckExtensionPeriod; - this.maxSecondsPerAckExtension = Math.toIntExact(maxDurationPerAckExtension.getSeconds()); - this.receiver = receiver; - this.ackProcessor = ackProcessor; - this.flowController = flowController; - // 601 buckets of 1s resolution from 0s to MAX_ACK_DEADLINE_SECONDS - this.ackLatencyDistribution = ackLatencyDistribution; + private MessageDispatcher(Builder builder) { + executor = builder.executor; + systemExecutor = builder.systemExecutor; + ackExpirationPadding = builder.ackExpirationPadding; + maxAckExtensionPeriod = builder.maxAckExtensionPeriod; + + minDurationPerAckExtensionSeconds = + Math.toIntExact(builder.minDurationPerAckExtension.getSeconds()); + minDurationPerAckExtensionDefaultUsed = builder.minDurationPerAckExtensionDefaultUsed; + maxDurationPerAckExtensionSeconds = + Math.toIntExact(builder.maxDurationPerAckExtension.getSeconds()); + maxDurationPerAckExtensionDefaultUsed = builder.maxDurationPerAckExtensionDefaultUsed; + + // Start the deadline at the minimum ack deadline so messages which arrive before this is + // updated will not have a long ack deadline. + if (minDurationPerAckExtensionDefaultUsed) { + messageDeadlineSeconds.set(Math.toIntExact(Subscriber.MIN_STREAM_ACK_DEADLINE.getSeconds())); + } else { + messageDeadlineSeconds.set(minDurationPerAckExtensionSeconds); + } + + receiver = builder.receiver; + receiverWithAckResponse = builder.receiverWithAckResponse; + + ackProcessor = builder.ackProcessor; + flowController = builder.flowController; + enableExactlyOnceDelivery = new AtomicBoolean(builder.enableExactlyOnceDelivery); + ackLatencyDistribution = builder.ackLatencyDistribution; + clock = builder.clock; jobLock = new ReentrantLock(); messagesWaiter = new Waiter(); - this.clock = clock; - this.sequentialExecutor = new SequentialExecutorService.AutoExecutor(executor); + sequentialExecutor = new SequentialExecutorService.AutoExecutor(builder.executor); + } + + private boolean shouldSetMessageFuture() { + return receiverWithAckResponse != null; } void start() { @@ -256,7 +252,7 @@ public void run() { newDeadlineSec - ackExpirationPadding.getSeconds(), TimeUnit.SECONDS); } - processOutstandingAckOperations(); + processOutstandingOperations(); } catch (Throwable t) { // Catch everything so that one run failing doesn't prevent subsequent runs. logger.log(Level.WARNING, "failed to run periodic job", t); @@ -286,7 +282,7 @@ void stop() { } finally { jobLock.unlock(); } - processOutstandingAckOperations(); + processOutstandingOperations(); } @InternalApi @@ -299,6 +295,43 @@ int getMessageDeadlineSeconds() { return messageDeadlineSeconds.get(); } + @InternalApi + void setEnableExactlyOnceDelivery(boolean enableExactlyOnceDelivery) { + // Sanity check that we are changing the enableExactlyOnceDelivery state + if (enableExactlyOnceDelivery == this.enableExactlyOnceDelivery.get()) { + return; + } + + this.enableExactlyOnceDelivery.set(enableExactlyOnceDelivery); + + // If a custom value for minDurationPerAckExtension, we should respect that + if (!minDurationPerAckExtensionDefaultUsed) { + return; + } + + // We just need to update the minDurationPerAckExtensionSeconds as the + // maxDurationPerAckExtensionSeconds does not change + int possibleNewMinAckDeadlineExtensionSeconds; + + if (enableExactlyOnceDelivery) { + possibleNewMinAckDeadlineExtensionSeconds = + Math.toIntExact( + Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY.getSeconds()); + } else { + possibleNewMinAckDeadlineExtensionSeconds = + Math.toIntExact(Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION.getSeconds()); + } + + // If we are not using the default maxDurationAckExtension, check if the + // minAckDeadlineExtensionExactlyOnce needs to be bounded by the set max + if (!maxDurationPerAckExtensionDefaultUsed + && (possibleNewMinAckDeadlineExtensionSeconds > maxDurationPerAckExtensionSeconds)) { + minDurationPerAckExtensionSeconds = maxDurationPerAckExtensionSeconds; + } else { + minDurationPerAckExtensionSeconds = possibleNewMinAckDeadlineExtensionSeconds; + } + } + private static class OutstandingMessage { private final ReceivedMessage receivedMessage; private final AckHandler ackHandler; @@ -313,9 +346,13 @@ void processReceivedMessages(List messages) { Instant totalExpiration = now().plus(maxAckExtensionPeriod); List outstandingBatch = new ArrayList<>(messages.size()); for (ReceivedMessage message : messages) { + AckRequestData.Builder builder = AckRequestData.newBuilder(message.getAckId()); + if (shouldSetMessageFuture()) { + builder.setMessageFuture(SettableApiFuture.create()); + } + AckRequestData ackRequestData = builder.build(); AckHandler ackHandler = - new AckHandler( - message.getAckId(), message.getMessage().getSerializedSize(), totalExpiration); + new AckHandler(ackRequestData, message.getMessage().getSerializedSize(), totalExpiration); if (pendingMessages.putIfAbsent(message.getAckId(), ackHandler) != null) { // putIfAbsent puts ackHandler if ackID isn't previously mapped, then return the // previously-mapped element. @@ -328,7 +365,7 @@ void processReceivedMessages(List messages) { continue; } outstandingBatch.add(new OutstandingMessage(message, ackHandler)); - pendingReceipts.add(message.getAckId()); + pendingReceipts.add(ackRequestData); } processBatch(outstandingBatch); @@ -363,20 +400,11 @@ private PubsubMessage addDeliveryInfoCount(ReceivedMessage receivedMessage) { } private void processOutstandingMessage(final PubsubMessage message, final AckHandler ackHandler) { - final SettableApiFuture response = SettableApiFuture.create(); - final AckReplyConsumer consumer = - new AckReplyConsumer() { - @Override - public void ack() { - response.set(AckReply.ACK); - } + // This future is for internal bookkeeping to be sent to the StreamingSubscriberConnection + // use below in the consumers + SettableApiFuture ackReplySettableApiFuture = SettableApiFuture.create(); + ApiFutures.addCallback(ackReplySettableApiFuture, ackHandler, MoreExecutors.directExecutor()); - @Override - public void nack() { - response.set(AckReply.NACK); - } - }; - ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor()); Runnable deliverMessageTask = new Runnable() { @Override @@ -392,10 +420,42 @@ public void run() { ackHandler.forget(); return; } + if (shouldSetMessageFuture()) { + // This is the message future that is propagated to the user + SettableApiFuture messageFuture = + ackHandler.getMessageFutureIfExists(); + final AckReplyConsumerWithResponse ackReplyConsumerWithResponse = + new AckReplyConsumerWithResponse() { + @Override + public Future ack() { + ackReplySettableApiFuture.set(AckReply.ACK); + return messageFuture; + } + + @Override + public Future nack() { + ackReplySettableApiFuture.set(AckReply.NACK); + return messageFuture; + } + }; + receiverWithAckResponse.receiveMessage(message, ackReplyConsumerWithResponse); + } else { + final AckReplyConsumer ackReplyConsumer = + new AckReplyConsumer() { + @Override + public void ack() { + ackReplySettableApiFuture.set(AckReply.ACK); + } - receiver.receiveMessage(message, consumer); + @Override + public void nack() { + ackReplySettableApiFuture.set(AckReply.NACK); + } + }; + receiver.receiveMessage(message, ackReplyConsumer); + } } catch (Exception e) { - response.setException(e); + ackReplySettableApiFuture.setException(e); } } }; @@ -409,26 +469,32 @@ public void run() { /** Compute the ideal deadline, set subsequent modacks to this deadline, and return it. */ @InternalApi int computeDeadlineSeconds() { - int sec = ackLatencyDistribution.getPercentile(PERCENTILE_FOR_ACK_DEADLINE_UPDATES); - - if ((maxSecondsPerAckExtension > 0) && (sec > maxSecondsPerAckExtension)) { - sec = maxSecondsPerAckExtension; + int deadlineSeconds = ackLatencyDistribution.getPercentile(PERCENTILE_FOR_ACK_DEADLINE_UPDATES); + + // Bound deadlineSeconds by extensions + if (!maxDurationPerAckExtensionDefaultUsed + && (deadlineSeconds > maxDurationPerAckExtensionSeconds)) { + deadlineSeconds = maxDurationPerAckExtensionSeconds; + } else if (deadlineSeconds < minDurationPerAckExtensionSeconds) { + deadlineSeconds = minDurationPerAckExtensionSeconds; } - // Use Ints.constrainToRange when we get guava 21. - if (sec < Subscriber.MIN_ACK_DEADLINE_SECONDS) { - sec = Subscriber.MIN_ACK_DEADLINE_SECONDS; - } else if (sec > Subscriber.MAX_ACK_DEADLINE_SECONDS) { - sec = Subscriber.MAX_ACK_DEADLINE_SECONDS; + // Bound deadlineSeconds by hard limits in subscriber + if (deadlineSeconds < Subscriber.MIN_STREAM_ACK_DEADLINE.getSeconds()) { + deadlineSeconds = Math.toIntExact(Subscriber.MIN_STREAM_ACK_DEADLINE.getSeconds()); + } else if (deadlineSeconds > Subscriber.MAX_STREAM_ACK_DEADLINE.getSeconds()) { + deadlineSeconds = Math.toIntExact(Subscriber.MAX_STREAM_ACK_DEADLINE.getSeconds()); } - return sec; + + return deadlineSeconds; } @InternalApi void extendDeadlines() { int extendSeconds = getMessageDeadlineSeconds(); - List modacks = new ArrayList<>(); - PendingModifyAckDeadline modack = new PendingModifyAckDeadline(extendSeconds); + int numAckIdToSend = 0; + Map deadlineExtensionModacks = + new HashMap(); Instant now = now(); Instant extendTo = now.plusSeconds(extendSeconds); @@ -436,7 +502,12 @@ void extendDeadlines() { String ackId = entry.getKey(); Instant totalExpiration = entry.getValue().totalExpiration; if (totalExpiration.isAfter(extendTo)) { - modack.ackIds.add(ackId); + ModackRequestData modackRequestData = + deadlineExtensionModacks.computeIfAbsent( + extendSeconds, + deadlineExtensionSeconds -> new ModackRequestData(deadlineExtensionSeconds)); + modackRequestData.addAckRequestData(entry.getValue().getAckRequestData()); + numAckIdToSend++; continue; } @@ -445,43 +516,161 @@ void extendDeadlines() { entry.getValue().forget(); if (totalExpiration.isAfter(now)) { int sec = Math.max(1, (int) now.until(totalExpiration, ChronoUnit.SECONDS)); - modacks.add(new PendingModifyAckDeadline(sec, ackId)); + ModackRequestData modackRequestData = + deadlineExtensionModacks.computeIfAbsent( + sec, extensionSeconds -> new ModackRequestData(extensionSeconds)); + modackRequestData.addAckRequestData(entry.getValue().getAckRequestData()); + numAckIdToSend++; } } - logger.log(Level.FINER, "Sending {0} modacks", modack.ackIds.size() + modacks.size()); - modacks.add(modack); - List acksToSend = Collections.emptyList(); - ackProcessor.sendAckOperations(acksToSend, modacks); + if (numAckIdToSend > 0) { + logger.log(Level.FINER, "Sending {0} modacks", numAckIdToSend); + ackProcessor.sendModackOperations( + new ArrayList(deadlineExtensionModacks.values())); + } } @InternalApi - void processOutstandingAckOperations() { - List modifyAckDeadlinesToSend = new ArrayList<>(); + void processOutstandingOperations() { + List modackRequestData = new ArrayList(); - List acksToSend = new ArrayList<>(); - pendingAcks.drainTo(acksToSend); - logger.log(Level.FINER, "Sending {0} acks", acksToSend.size()); + // Nacks are modacks with an expiration of 0 + List nackRequestDataList = new ArrayList(); + pendingNacks.drainTo(nackRequestDataList); - PendingModifyAckDeadline nacksToSend = new PendingModifyAckDeadline(0); - pendingNacks.drainTo(nacksToSend.ackIds); - logger.log(Level.FINER, "Sending {0} nacks", nacksToSend.ackIds.size()); - if (!nacksToSend.ackIds.isEmpty()) { - modifyAckDeadlinesToSend.add(nacksToSend); + if (!nackRequestDataList.isEmpty()) { + modackRequestData.add(new ModackRequestData(0, nackRequestDataList)); } + logger.log(Level.FINER, "Sending {0} nacks", nackRequestDataList.size()); - PendingModifyAckDeadline receiptsToSend = - new PendingModifyAckDeadline(getMessageDeadlineSeconds()); - pendingReceipts.drainTo(receiptsToSend.ackIds); - logger.log(Level.FINER, "Sending {0} receipts", receiptsToSend.ackIds.size()); - if (!receiptsToSend.ackIds.isEmpty()) { - modifyAckDeadlinesToSend.add(receiptsToSend); + List ackRequestDataReceipts = new ArrayList(); + pendingReceipts.drainTo(ackRequestDataReceipts); + if (!ackRequestDataReceipts.isEmpty()) { + modackRequestData.add( + new ModackRequestData(this.getMessageDeadlineSeconds(), ackRequestDataReceipts)); } + logger.log(Level.FINER, "Sending {0} receipts", ackRequestDataReceipts.size()); + + ackProcessor.sendModackOperations(modackRequestData); + + List ackRequestDataList = new ArrayList(); + pendingAcks.drainTo(ackRequestDataList); + logger.log(Level.FINER, "Sending {0} acks", ackRequestDataList.size()); - ackProcessor.sendAckOperations(acksToSend, modifyAckDeadlinesToSend); + ackProcessor.sendAckOperations(ackRequestDataList); } private Instant now() { return Instant.ofEpochMilli(clock.millisTime()); } + + /** Builder of {@link MessageDispatcher MessageDispatchers}. */ + public static final class Builder { + private MessageReceiver receiver; + private MessageReceiverWithAckResponse receiverWithAckResponse; + + private AckProcessor ackProcessor; + private Duration ackExpirationPadding; + private Duration maxAckExtensionPeriod; + private Duration minDurationPerAckExtension; + private boolean minDurationPerAckExtensionDefaultUsed; + private Duration maxDurationPerAckExtension; + private boolean maxDurationPerAckExtensionDefaultUsed; + + private Distribution ackLatencyDistribution; + private FlowController flowController; + private boolean enableExactlyOnceDelivery; + + private Executor executor; + private ScheduledExecutorService systemExecutor; + private ApiClock clock; + + protected Builder(MessageReceiver receiver) { + this.receiver = receiver; + } + + protected Builder(MessageReceiverWithAckResponse receiverWithAckResponse) { + this.receiverWithAckResponse = receiverWithAckResponse; + } + + public Builder setAckProcessor(AckProcessor ackProcessor) { + this.ackProcessor = ackProcessor; + return this; + } + + public Builder setAckExpirationPadding(Duration ackExpirationPadding) { + this.ackExpirationPadding = ackExpirationPadding; + return this; + } + + public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) { + this.maxAckExtensionPeriod = maxAckExtensionPeriod; + return this; + } + + public Builder setMinDurationPerAckExtension(Duration minDurationPerAckExtension) { + this.minDurationPerAckExtension = minDurationPerAckExtension; + return this; + } + + public Builder setMinDurationPerAckExtensionDefaultUsed( + boolean minDurationPerAckExtensionDefaultUsed) { + this.minDurationPerAckExtensionDefaultUsed = minDurationPerAckExtensionDefaultUsed; + return this; + } + + public Builder setMaxDurationPerAckExtension(Duration maxDurationPerAckExtension) { + this.maxDurationPerAckExtension = maxDurationPerAckExtension; + return this; + } + + public Builder setMaxDurationPerAckExtensionDefaultUsed( + boolean maxDurationPerAckExtensionDefaultUsed) { + this.maxDurationPerAckExtensionDefaultUsed = maxDurationPerAckExtensionDefaultUsed; + return this; + } + + public Builder setAckLatencyDistribution(Distribution ackLatencyDistribution) { + this.ackLatencyDistribution = ackLatencyDistribution; + return this; + } + + public Builder setFlowController(FlowController flowController) { + this.flowController = flowController; + return this; + } + + public Builder setEnableExactlyOnceDelivery(boolean enableExactlyOnceDelivery) { + this.enableExactlyOnceDelivery = enableExactlyOnceDelivery; + return this; + } + + public Builder setExecutor(Executor executor) { + this.executor = executor; + return this; + } + + public Builder setSystemExecutor(ScheduledExecutorService systemExecutor) { + this.systemExecutor = systemExecutor; + return this; + } + + public Builder setApiClock(ApiClock clock) { + this.clock = clock; + return this; + } + + public MessageDispatcher build() { + return new MessageDispatcher(this); + } + } + + public static Builder newBuilder(MessageReceiver receiver) { + return new Builder(receiver); + } + + public static Builder newBuilder(MessageReceiverWithAckResponse receiverWithAckResponse) { + return new Builder(receiverWithAckResponse); + } } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageReceiverWithAckResponse.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageReceiverWithAckResponse.java new file mode 100644 index 000000000..49792be07 --- /dev/null +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageReceiverWithAckResponse.java @@ -0,0 +1,23 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.v1; + +import com.google.pubsub.v1.PubsubMessage; + +public interface MessageReceiverWithAckResponse { + void receiveMessage(PubsubMessage message, AckReplyConsumerWithResponse consumer); +} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/ModackRequestData.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/ModackRequestData.java new file mode 100644 index 000000000..b4d2dae0f --- /dev/null +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/ModackRequestData.java @@ -0,0 +1,52 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.v1; + +import java.util.*; + +class ModackRequestData { + private final int deadlineExtensionSeconds; + private List ackRequestData; + + ModackRequestData(int deadlineExtensionSeconds) { + this.deadlineExtensionSeconds = deadlineExtensionSeconds; + this.ackRequestData = new ArrayList(); + } + + ModackRequestData(int deadlineExtensionSeconds, AckRequestData... ackRequestData) { + this.deadlineExtensionSeconds = deadlineExtensionSeconds; + this.ackRequestData = Arrays.asList(ackRequestData); + } + + ModackRequestData(int deadlineExtensionSeconds, List ackRequestData) { + this.deadlineExtensionSeconds = deadlineExtensionSeconds; + this.ackRequestData = ackRequestData; + } + + public int getDeadlineExtensionSeconds() { + return deadlineExtensionSeconds; + } + + public List getAckRequestData() { + return ackRequestData; + } + + public ModackRequestData addAckRequestData(AckRequestData ackRequestData) { + this.ackRequestData.add(ackRequestData); + return this; + } +} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 249d896b7..d1af3a3e9 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -16,41 +16,31 @@ package com.google.cloud.pubsub.v1; -import static com.google.cloud.pubsub.v1.Subscriber.DEFAULT_MAX_DURATION_PER_ACK_EXTENSION; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; -import com.google.api.core.AbstractApiService; -import com.google.api.core.ApiClock; -import com.google.api.core.ApiFuture; -import com.google.api.core.ApiFutureCallback; -import com.google.api.core.ApiFutures; -import com.google.api.core.InternalApi; -import com.google.api.core.SettableApiFuture; +import com.google.api.core.*; import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.batching.FlowController; import com.google.api.gax.core.Distribution; import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.grpc.GrpcStatusCode; -import com.google.api.gax.rpc.ApiException; -import com.google.api.gax.rpc.ApiExceptionFactory; -import com.google.api.gax.rpc.ClientStream; -import com.google.api.gax.rpc.ResponseObserver; -import com.google.api.gax.rpc.StreamController; +import com.google.api.gax.rpc.*; import com.google.cloud.pubsub.v1.MessageDispatcher.AckProcessor; -import com.google.cloud.pubsub.v1.MessageDispatcher.PendingModifyAckDeadline; import com.google.cloud.pubsub.v1.stub.SubscriberStub; import com.google.common.collect.Lists; import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.Any; import com.google.protobuf.Empty; -import com.google.pubsub.v1.AcknowledgeRequest; -import com.google.pubsub.v1.ModifyAckDeadlineRequest; -import com.google.pubsub.v1.StreamingPullRequest; -import com.google.pubsub.v1.StreamingPullResponse; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.pubsub.v1.*; +import com.google.rpc.ErrorInfo; import io.grpc.Status; -import java.util.List; -import java.util.UUID; +import io.grpc.protobuf.StatusProto; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -64,15 +54,21 @@ final class StreamingSubscriberConnection extends AbstractApiService implements private static final Logger logger = Logger.getLogger(StreamingSubscriberConnection.class.getName()); - @InternalApi static final Duration DEFAULT_STREAM_ACK_DEADLINE = Duration.ofSeconds(60); - @InternalApi static final Duration MAX_STREAM_ACK_DEADLINE = Duration.ofSeconds(600); - @InternalApi static final Duration MIN_STREAM_ACK_DEADLINE = Duration.ofSeconds(10); private static final Duration INITIAL_CHANNEL_RECONNECT_BACKOFF = Duration.ofMillis(100); private static final Duration MAX_CHANNEL_RECONNECT_BACKOFF = Duration.ofSeconds(10); + + private static final long INITIAL_ACK_OPERATIONS_RECONNECT_BACKOFF_MILLIS = 100; + private static final long MAX_ACK_OPERATIONS_RECONNECT_BACKOFF_MILLIS = + Duration.ofSeconds(10).toMillis(); private static final int MAX_PER_REQUEST_CHANGES = 1000; - private final Duration streamAckDeadline; - private final SubscriberStub stub; + private final String PERMANENT_FAILURE_INVALID_ACK_ID_METADATA = + "PERMANENT_FAILURE_INVALID_ACK_ID"; + private final String TRANSIENT_FAILURE_METADATA_PREFIX = "TRANSIENT_"; + + private Duration inititalStreamAckDeadline; + + private final SubscriberStub subscriberStub; private final int channelAffinity; private final String subscription; private final ScheduledExecutorService systemExecutor; @@ -81,6 +77,9 @@ final class StreamingSubscriberConnection extends AbstractApiService implements private final FlowControlSettings flowControlSettings; private final boolean useLegacyFlowControl; + // Keeps track of requests without closed futures + private final Set pendingRequests = ConcurrentHashMap.newKeySet(); + private final AtomicLong channelReconnectBackoffMillis = new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis()); private final Waiter ackOperationsWaiter = new Waiter(); @@ -88,6 +87,8 @@ final class StreamingSubscriberConnection extends AbstractApiService implements private final Lock lock = new ReentrantLock(); private ClientStream clientStream; + private AtomicBoolean exactlyOnceDeliveryEnabled = new AtomicBoolean(false); + /** * The same clientId is used across all streaming pull connections that are created. This is * intentional, as it indicates to the server that any guarantees made for a stream that @@ -95,48 +96,71 @@ final class StreamingSubscriberConnection extends AbstractApiService implements */ private final String clientId = UUID.randomUUID().toString(); - public StreamingSubscriberConnection( - String subscription, - MessageReceiver receiver, - Duration ackExpirationPadding, - Duration maxAckExtensionPeriod, - Duration maxDurationPerAckExtension, - Distribution ackLatencyDistribution, - SubscriberStub stub, - int channelAffinity, - FlowControlSettings flowControlSettings, - boolean useLegacyFlowControl, - FlowController flowController, - ScheduledExecutorService executor, - ScheduledExecutorService systemExecutor, - ApiClock clock) { - this.subscription = subscription; - this.systemExecutor = systemExecutor; - if (maxDurationPerAckExtension.compareTo(DEFAULT_MAX_DURATION_PER_ACK_EXTENSION) == 0) { - this.streamAckDeadline = DEFAULT_STREAM_ACK_DEADLINE; - } else if (maxDurationPerAckExtension.compareTo(MIN_STREAM_ACK_DEADLINE) < 0) { - this.streamAckDeadline = MIN_STREAM_ACK_DEADLINE; - } else if (maxDurationPerAckExtension.compareTo(MAX_STREAM_ACK_DEADLINE) > 0) { - this.streamAckDeadline = MAX_STREAM_ACK_DEADLINE; + private StreamingSubscriberConnection(Builder builder) { + subscription = builder.subscription; + systemExecutor = builder.systemExecutor; + + // We need to set the default stream ack deadline on the initial request, this will be + // updated by modack requests in the message dispatcher + if (builder.maxDurationPerAckExtensionDefaultUsed) { + // If the default is used, check if exactly once is enabled and set appropriately + if (builder.exactlyOnceDeliveryEnabled) { + inititalStreamAckDeadline = Subscriber.STREAM_ACK_DEADLINE_EXACTLY_ONCE_DELIVERY_DEFAULT; + } else { + inititalStreamAckDeadline = Subscriber.STREAM_ACK_DEADLINE_DEFAULT; + } + } else if (builder.maxDurationPerAckExtension.compareTo(Subscriber.MIN_STREAM_ACK_DEADLINE) + < 0) { + // We will not be able to extend more than the default minimum + inititalStreamAckDeadline = Subscriber.MIN_STREAM_ACK_DEADLINE; + } else if (builder.maxDurationPerAckExtension.compareTo(Subscriber.MAX_STREAM_ACK_DEADLINE) + > 0) { + // Will not be able to extend past the max + inititalStreamAckDeadline = Subscriber.MAX_STREAM_ACK_DEADLINE; + } else { + inititalStreamAckDeadline = builder.maxDurationPerAckExtension; + } + + subscriberStub = builder.subscriberStub; + channelAffinity = builder.channelAffinity; + exactlyOnceDeliveryEnabled.set(builder.exactlyOnceDeliveryEnabled); + + MessageDispatcher.Builder messageDispatcherBuilder; + if (builder.receiver != null) { + messageDispatcherBuilder = MessageDispatcher.newBuilder(builder.receiver); } else { - this.streamAckDeadline = maxDurationPerAckExtension; - } - this.stub = stub; - this.channelAffinity = channelAffinity; - this.messageDispatcher = - new MessageDispatcher( - receiver, - this, - ackExpirationPadding, - maxAckExtensionPeriod, - maxDurationPerAckExtension, - ackLatencyDistribution, - flowController, - executor, - systemExecutor, - clock); - this.flowControlSettings = flowControlSettings; - this.useLegacyFlowControl = useLegacyFlowControl; + messageDispatcherBuilder = MessageDispatcher.newBuilder(builder.receiverWithAckResponse); + } + + messageDispatcher = + messageDispatcherBuilder + .setAckProcessor(this) + .setAckExpirationPadding(builder.ackExpirationPadding) + .setMaxAckExtensionPeriod(builder.maxAckExtensionPeriod) + .setMinDurationPerAckExtension(builder.minDurationPerAckExtension) + .setMinDurationPerAckExtensionDefaultUsed(builder.minDurationPerAckExtensionDefaultUsed) + .setMaxDurationPerAckExtension(builder.maxDurationPerAckExtension) + .setMaxDurationPerAckExtensionDefaultUsed(builder.maxDurationPerAckExtensionDefaultUsed) + .setAckLatencyDistribution(builder.ackLatencyDistribution) + .setFlowController(builder.flowController) + .setEnableExactlyOnceDelivery(builder.exactlyOnceDeliveryEnabled) + .setExecutor(builder.executor) + .setSystemExecutor(builder.systemExecutor) + .setApiClock(builder.clock) + .build(); + + flowControlSettings = builder.flowControlSettings; + useLegacyFlowControl = builder.useLegacyFlowControl; + } + + public StreamingSubscriberConnection setExactlyOnceDeliveryEnabled( + boolean isExactlyOnceDeliveryEnabled) { + exactlyOnceDeliveryEnabled.set(isExactlyOnceDeliveryEnabled); + return this; + } + + public boolean isExactlyOnceDeliveryEnabled() { + return exactlyOnceDeliveryEnabled.get(); } @Override @@ -192,7 +216,14 @@ public void onStart(StreamController controller) { @Override public void onResponse(StreamingPullResponse response) { channelReconnectBackoffMillis.set(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis()); + + boolean exactlyOnceDeliveryEnabledResponse = + response.getSubscriptionProperties().getExactlyOnceDeliveryEnabled(); + + setExactlyOnceDeliveryEnabled(exactlyOnceDeliveryEnabledResponse); + messageDispatcher.setEnableExactlyOnceDelivery(exactlyOnceDeliveryEnabledResponse); messageDispatcher.processReceivedMessages(response.getReceivedMessagesList()); + // Only request more if we're not shutdown. // If errorFuture is done, the stream has either failed or hung up, // and we don't need to request. @@ -222,10 +253,13 @@ public void onComplete() { private void initialize() { final SettableApiFuture errorFuture = SettableApiFuture.create(); + final ResponseObserver responseObserver = new StreamingPullResponseObserver(errorFuture); + ClientStream initClientStream = - stub.streamingPullCallable() + subscriberStub + .streamingPullCallable() .splitCall( responseObserver, GrpcCallContext.createDefault().withChannelAffinity(channelAffinity)); @@ -236,7 +270,7 @@ private void initialize() { initClientStream.send( StreamingPullRequest.newBuilder() .setSubscription(subscription) - .setStreamAckDeadlineSeconds((int) streamAckDeadline.getSeconds()) + .setStreamAckDeadlineSeconds(Math.toIntExact(inititalStreamAckDeadline.getSeconds())) .setClientId(clientId) .setMaxOutstandingMessages( this.useLegacyFlowControl @@ -287,6 +321,7 @@ public void onFailure(Throwable cause) { cause, GrpcStatusCode.of(Status.fromThrowable(cause).getCode()), false); logger.log(Level.SEVERE, "terminated streaming with exception", gaxException); runShutdown(); + setFailureFutureOutstandingMessages(cause); notifyFailed(gaxException); return; } @@ -319,52 +354,372 @@ private boolean isAlive() { return state == State.RUNNING || state == State.STARTING; } + public void setResponseOutstandingMessages(AckResponse ackResponse) { + // We will close the futures with ackResponse - if there are multiple references to the same + // future they will be handled appropriately + logger.log( + Level.WARNING, "Setting response: {0} on outstanding messages", ackResponse.toString()); + for (AckRequestData ackRequestData : pendingRequests) { + ackRequestData.setResponse(ackResponse, false); + } + + // Clear our pending requests + pendingRequests.clear(); + } + + private void setFailureFutureOutstandingMessages(Throwable t) { + AckResponse ackResponse; + + if (isExactlyOnceDeliveryEnabled()) { + if (!(t instanceof ApiException)) { + ackResponse = AckResponse.OTHER; + } + + ApiException apiException = (ApiException) t; + switch (apiException.getStatusCode().getCode()) { + case FAILED_PRECONDITION: + ackResponse = AckResponse.FAILED_PRECONDITION; + break; + case PERMISSION_DENIED: + ackResponse = AckResponse.PERMISSION_DENIED; + break; + default: + ackResponse = AckResponse.OTHER; + } + } else { + // We should set success regardless if ExactlyOnceDelivery is not enabled + ackResponse = AckResponse.SUCCESSFUL; + } + + setResponseOutstandingMessages(ackResponse); + } + @Override - public void sendAckOperations( - List acksToSend, List ackDeadlineExtensions) { - ApiFutureCallback loggingCallback = - new ApiFutureCallback() { - @Override - public void onSuccess(Empty empty) { - ackOperationsWaiter.incrementPendingCount(-1); - } + public void sendAckOperations(List ackRequestDataList) { + sendAckOperations(ackRequestDataList, INITIAL_ACK_OPERATIONS_RECONNECT_BACKOFF_MILLIS); + } - @Override - public void onFailure(Throwable t) { - ackOperationsWaiter.incrementPendingCount(-1); - Level level = isAlive() ? Level.WARNING : Level.FINER; - logger.log(level, "failed to send operations", t); - } - }; + @Override + public void sendModackOperations(List modackRequestDataList) { + sendModackOperations(modackRequestDataList, INITIAL_ACK_OPERATIONS_RECONNECT_BACKOFF_MILLIS); + } + private void sendAckOperations( + List ackRequestDataList, long currentBackoffMillis) { int pendingOperations = 0; - for (PendingModifyAckDeadline modack : ackDeadlineExtensions) { - for (List idChunk : Lists.partition(modack.ackIds, MAX_PER_REQUEST_CHANGES)) { - ApiFuture future = - stub.modifyAckDeadlineCallable() + for (List ackRequestDataInRequestList : + Lists.partition(ackRequestDataList, MAX_PER_REQUEST_CHANGES)) { + List ackIdsInRequest = new ArrayList<>(); + for (AckRequestData ackRequestData : ackRequestDataInRequestList) { + ackIdsInRequest.add(ackRequestData.getAckId()); + if (ackRequestData.hasMessageFuture()) { + // Add to our pending requests if we care about the response + pendingRequests.add(ackRequestData); + } + } + ApiFutureCallback callback = + getCallback(ackRequestDataInRequestList, 0, false, currentBackoffMillis); + ApiFuture ackFuture = + subscriberStub + .acknowledgeCallable() + .futureCall( + AcknowledgeRequest.newBuilder() + .setSubscription(subscription) + .addAllAckIds(ackIdsInRequest) + .build()); + ApiFutures.addCallback(ackFuture, callback, directExecutor()); + pendingOperations++; + } + ackOperationsWaiter.incrementPendingCount(pendingOperations); + } + + private void sendModackOperations( + List modackRequestDataList, long currentBackoffMillis) { + // Send modacks + int pendingOperations = 0; + for (ModackRequestData modackRequestData : modackRequestDataList) { + List ackIdsInRequest = new ArrayList<>(); + for (List ackRequestDataInRequestList : + Lists.partition(modackRequestData.getAckRequestData(), MAX_PER_REQUEST_CHANGES)) { + for (AckRequestData ackRequestData : ackRequestDataInRequestList) { + ackIdsInRequest.add(ackRequestData.getAckId()); + if (ackRequestData.hasMessageFuture()) { + // Add to our pending requests if we care about the response + pendingRequests.add(ackRequestData); + } + } + ApiFutureCallback callback = + getCallback( + modackRequestData.getAckRequestData(), + modackRequestData.getDeadlineExtensionSeconds(), + true, + currentBackoffMillis); + ApiFuture modackFuture = + subscriberStub + .modifyAckDeadlineCallable() .futureCall( ModifyAckDeadlineRequest.newBuilder() .setSubscription(subscription) - .addAllAckIds(idChunk) - .setAckDeadlineSeconds(modack.deadlineExtensionSeconds) + .addAllAckIds(ackIdsInRequest) + .setAckDeadlineSeconds(modackRequestData.getDeadlineExtensionSeconds()) .build()); - ApiFutures.addCallback(future, loggingCallback, directExecutor()); + ApiFutures.addCallback(modackFuture, callback, directExecutor()); pendingOperations++; } } + ackOperationsWaiter.incrementPendingCount(pendingOperations); + } - for (List idChunk : Lists.partition(acksToSend, MAX_PER_REQUEST_CHANGES)) { - ApiFuture future = - stub.acknowledgeCallable() - .futureCall( - AcknowledgeRequest.newBuilder() - .setSubscription(subscription) - .addAllAckIds(idChunk) - .build()); - ApiFutures.addCallback(future, loggingCallback, directExecutor()); - pendingOperations++; + private Map getMetadataMapFromThrowable(Throwable t) + throws InvalidProtocolBufferException { + // This converts a Throwable (from a "OK" grpc response) to a map of metadata + // will be of the format: + // { + // "ACK-ID-1": "PERMANENT_*", + // "ACK-ID-2": "TRANSIENT_*" + // } + com.google.rpc.Status status = StatusProto.fromThrowable(t); + Map metadataMap = new HashMap<>(); + if (status != null) { + for (Any any : status.getDetailsList()) { + if (any.is(ErrorInfo.class)) { + ErrorInfo errorInfo = any.unpack(ErrorInfo.class); + metadataMap = errorInfo.getMetadataMap(); + } + } } + return metadataMap; + } - ackOperationsWaiter.incrementPendingCount(pendingOperations); + private ApiFutureCallback getCallback( + List ackRequestDataList, + int deadlineExtensionSeconds, + boolean isModack, + long currentBackoffMillis) { + // This callback handles retries, and sets message futures + + // Check if ack or nack + boolean setResponseOnSuccess = (!isModack || (deadlineExtensionSeconds == 0)) ? true : false; + + return new ApiFutureCallback() { + @Override + public void onSuccess(Empty empty) { + ackOperationsWaiter.incrementPendingCount(-1); + for (AckRequestData ackRequestData : ackRequestDataList) { + // This will check if a response is needed, and if it has already been set + ackRequestData.setResponse(AckResponse.SUCCESSFUL, setResponseOnSuccess); + // Remove from our pending operations + pendingRequests.remove(ackRequestData); + } + } + + @Override + public void onFailure(Throwable t) { + // Remove from our pending operations + ackOperationsWaiter.incrementPendingCount(-1); + + if (!isExactlyOnceDeliveryEnabled()) { + Level level = isAlive() ? Level.WARNING : Level.FINER; + logger.log(level, "failed to send operations", t); + return; + } + + List ackRequestDataArrayRetryList = new ArrayList<>(); + try { + Map metadataMap = getMetadataMapFromThrowable(t); + ackRequestDataList.forEach( + ackRequestData -> { + String ackId = ackRequestData.getAckId(); + if (metadataMap.containsKey(ackId)) { + // An error occured + String errorMessage = metadataMap.get(ackId); + if (errorMessage.startsWith(TRANSIENT_FAILURE_METADATA_PREFIX)) { + // Retry all "TRANSIENT_*" error messages - do not set message future + logger.log(Level.WARNING, "Transient error message, will resend", errorMessage); + ackRequestDataArrayRetryList.add(ackRequestData); + } else if (errorMessage.equals(PERMANENT_FAILURE_INVALID_ACK_ID_METADATA)) { + // Permanent failure, send + logger.log( + Level.WARNING, + "Permanent error invalid ack id message, will not resend", + errorMessage); + ackRequestData.setResponse(AckResponse.INVALID, setResponseOnSuccess); + } else { + logger.log( + Level.WARNING, "Unknown error message, will not resend", errorMessage); + ackRequestData.setResponse(AckResponse.OTHER, setResponseOnSuccess); + } + } else { + ackRequestData.setResponse(AckResponse.SUCCESSFUL, setResponseOnSuccess); + } + // Remove from our pending + pendingRequests.remove(ackRequestData); + }); + } catch (InvalidProtocolBufferException e) { + // If we fail to parse out the errorInfo, we should retry all + logger.log( + Level.WARNING, "Exception occurred when parsing throwable {0} for errorInfo", t); + ackRequestDataArrayRetryList.addAll(ackRequestDataList); + } + + // Handle retries + if (!ackRequestDataArrayRetryList.isEmpty()) { + long newBackoffMillis = + Math.min(currentBackoffMillis * 2, MAX_ACK_OPERATIONS_RECONNECT_BACKOFF_MILLIS); + systemExecutor.schedule( + new Runnable() { + @Override + public void run() { + if (isModack) { + // Create a new modackRequest with only the retries + ModackRequestData modackRequestData = + new ModackRequestData( + deadlineExtensionSeconds, ackRequestDataArrayRetryList); + sendModackOperations( + Collections.singletonList(modackRequestData), newBackoffMillis); + } else { + sendAckOperations(ackRequestDataArrayRetryList, newBackoffMillis); + } + } + }, + currentBackoffMillis, + TimeUnit.MILLISECONDS); + } + + Level level = isAlive() ? Level.WARNING : Level.FINER; + logger.log(level, "failed to send operations", t); + } + }; + } + + /** Builder of {@link StreamingSubscriberConnection StreamingSubscriberConnections}. */ + public static final class Builder { + private MessageReceiver receiver; + private MessageReceiverWithAckResponse receiverWithAckResponse; + private String subscription; + private Duration ackExpirationPadding; + private Duration maxAckExtensionPeriod; + private Duration minDurationPerAckExtension; + private boolean minDurationPerAckExtensionDefaultUsed; + private Duration maxDurationPerAckExtension; + private boolean maxDurationPerAckExtensionDefaultUsed; + + private Distribution ackLatencyDistribution; + private SubscriberStub subscriberStub; + private int channelAffinity; + private FlowController flowController; + private FlowControlSettings flowControlSettings; + private boolean exactlyOnceDeliveryEnabled; + private boolean useLegacyFlowControl; + private ScheduledExecutorService executor; + private ScheduledExecutorService systemExecutor; + private ApiClock clock; + + protected Builder(MessageReceiver receiver) { + this.receiver = receiver; + } + + protected Builder(MessageReceiverWithAckResponse receiverWithAckResponse) { + this.receiverWithAckResponse = receiverWithAckResponse; + } + + public Builder setSubscription(String subscription) { + this.subscription = subscription; + return this; + } + + public Builder setAckExpirationPadding(Duration ackExpirationPadding) { + this.ackExpirationPadding = ackExpirationPadding; + return this; + } + + public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) { + this.maxAckExtensionPeriod = maxAckExtensionPeriod; + return this; + } + + public Builder setMinDurationPerAckExtension(Duration minDurationPerAckExtension) { + this.minDurationPerAckExtension = minDurationPerAckExtension; + return this; + } + + public Builder setMinDurationPerAckExtensionDefaultUsed( + boolean minDurationPerAckExtensionDefaultUsed) { + this.minDurationPerAckExtensionDefaultUsed = minDurationPerAckExtensionDefaultUsed; + return this; + } + + public Builder setMaxDurationPerAckExtension(Duration maxDurationPerAckExtension) { + this.maxDurationPerAckExtension = maxDurationPerAckExtension; + return this; + } + + public Builder setMaxDurationPerAckExtensionDefaultUsed( + boolean maxDurationPerAckExtensionDefaultUsed) { + this.maxDurationPerAckExtensionDefaultUsed = maxDurationPerAckExtensionDefaultUsed; + return this; + } + + public Builder setAckLatencyDistribution(Distribution ackLatencyDistribution) { + this.ackLatencyDistribution = ackLatencyDistribution; + return this; + } + + public Builder setSubscriberStub(SubscriberStub subscriberStub) { + this.subscriberStub = subscriberStub; + return this; + } + + public Builder setChannelAffinity(int channelAffinity) { + this.channelAffinity = channelAffinity; + return this; + } + + public Builder setFlowController(FlowController flowController) { + this.flowController = flowController; + return this; + } + + public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) { + this.flowControlSettings = flowControlSettings; + return this; + } + + public Builder setUseLegacyFlowControl(boolean useLegacyFlowControl) { + this.useLegacyFlowControl = useLegacyFlowControl; + return this; + } + + public Builder setExactlyOnceDeliveryEnabled(boolean exactlyOnceDeliveryEnabled) { + this.exactlyOnceDeliveryEnabled = exactlyOnceDeliveryEnabled; + return this; + } + + public Builder setExecutor(ScheduledExecutorService executor) { + this.executor = executor; + return this; + } + + public Builder setSystemExecutor(ScheduledExecutorService systemExecutor) { + this.systemExecutor = systemExecutor; + return this; + } + + public Builder setClock(ApiClock clock) { + this.clock = clock; + return this; + } + + public StreamingSubscriberConnection build() { + return new StreamingSubscriberConnection(this); + } + } + + public static Builder newBuilder(MessageReceiver receiver) { + return new Builder(receiver); + } + + public static Builder newBuilder(MessageReceiverWithAckResponse receiverWithAckResponse) { + return new Builder(receiverWithAckResponse); } } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 656a19fdd..4ee66b031 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -89,13 +89,28 @@ * details. */ public class Subscriber extends AbstractApiService implements SubscriberInterface { - @InternalApi static final Duration DEFAULT_MAX_DURATION_PER_ACK_EXTENSION = Duration.ofMillis(0); private static final int THREADS_PER_CHANNEL = 5; private static final int MAX_INBOUND_MESSAGE_SIZE = 20 * 1024 * 1024; // 20MB API maximum message size. - @InternalApi static final int MAX_ACK_DEADLINE_SECONDS = 600; - @InternalApi static final int MIN_ACK_DEADLINE_SECONDS = 10; - private static final Duration ACK_EXPIRATION_PADDING = Duration.ofSeconds(5); + + @InternalApi static final Duration DEFAULT_MAX_ACK_EXTENSION_PERIOD = Duration.ofMinutes(60); + + @InternalApi + static final Duration DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY = + Duration.ofMinutes(1); + + @InternalApi static final Duration DEFAULT_MIN_ACK_DEADLINE_EXTENSION = Duration.ofMinutes(0); + @InternalApi static final Duration DEFAULT_MAX_ACK_DEADLINE_EXTENSION = Duration.ofSeconds(0); + + @InternalApi static final Duration MIN_STREAM_ACK_DEADLINE = Duration.ofSeconds(10); + @InternalApi static final Duration MAX_STREAM_ACK_DEADLINE = Duration.ofSeconds(600); + + @InternalApi static final Duration STREAM_ACK_DEADLINE_DEFAULT = Duration.ofSeconds(60); + + @InternalApi + static final Duration STREAM_ACK_DEADLINE_EXACTLY_ONCE_DELIVERY_DEFAULT = Duration.ofSeconds(60); + + @InternalApi static final Duration ACK_EXPIRATION_PADDING_DEFAULT = Duration.ofSeconds(5); private static final Logger logger = Logger.getLogger(Subscriber.class.getName()); @@ -104,32 +119,43 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac private final boolean useLegacyFlowControl; private final Duration maxAckExtensionPeriod; private final Duration maxDurationPerAckExtension; + private final boolean maxDurationPerAckExtensionDefaultUsed; + private final Duration minDurationPerAckExtension; + private final boolean minDurationPerAckExtensionDefaultUsed; + // The ExecutorProvider used to generate executors for processing messages. private final ExecutorProvider executorProvider; // An instantiation of the SystemExecutorProvider used for processing acks // and other system actions. @Nullable private final ScheduledExecutorService alarmsExecutor; private final Distribution ackLatencyDistribution = - new Distribution(MAX_ACK_DEADLINE_SECONDS + 1); + new Distribution(Math.toIntExact(MAX_STREAM_ACK_DEADLINE.getSeconds()) + 1); - private SubscriberStub subStub; + private SubscriberStub subscriberStub; private final SubscriberStubSettings subStubSettings; private final FlowController flowController; + private boolean exactlyOnceDeliveryEnabled = false; private final int numPullers; private final MessageReceiver receiver; + private final MessageReceiverWithAckResponse receiverWithAckResponse; private final List streamingSubscriberConnections; private final ApiClock clock; private final List backgroundResources = new ArrayList<>(); private Subscriber(Builder builder) { receiver = builder.receiver; + receiverWithAckResponse = builder.receiverWithAckResponse; flowControlSettings = builder.flowControlSettings; useLegacyFlowControl = builder.useLegacyFlowControl; - subscriptionName = builder.subscriptionName; + subscriptionName = builder.subscription; maxAckExtensionPeriod = builder.maxAckExtensionPeriod; maxDurationPerAckExtension = builder.maxDurationPerAckExtension; + maxDurationPerAckExtensionDefaultUsed = builder.maxDurationPerAckExtensionDefaultUsed; + minDurationPerAckExtension = builder.minDurationPerAckExtension; + minDurationPerAckExtensionDefaultUsed = builder.minDurationPerAckExtensionDefaultUsed; + clock = builder.clock.isPresent() ? builder.clock.get() : CurrentMillisClock.getDefaultClock(); flowController = @@ -140,6 +166,8 @@ private Subscriber(Builder builder) { .setLimitExceededBehavior(LimitExceededBehavior.Block) .build()); + exactlyOnceDeliveryEnabled = builder.exactlyOnceDeliveryEnabled; + this.numPullers = builder.parallelPullCount; executorProvider = builder.executorProvider; @@ -175,7 +203,7 @@ private Subscriber(Builder builder) { // We regularly look up the distribution for a good subscription deadline. // So we seed the distribution with the minimum value to start with. // Distribution is percentile-based, so this value will eventually lose importance. - ackLatencyDistribution.record(MIN_ACK_DEADLINE_SECONDS); + ackLatencyDistribution.record(Math.toIntExact(MIN_STREAM_ACK_DEADLINE.getSeconds())); } /** @@ -189,6 +217,11 @@ public static Builder newBuilder(ProjectSubscriptionName subscription, MessageRe return newBuilder(subscription.toString(), receiver); } + public static Builder newBuilder( + ProjectSubscriptionName subscription, MessageReceiverWithAckResponse receiver) { + return newBuilder(subscription.toString(), receiver); + } + /** * Constructs a new {@link Builder}. * @@ -200,6 +233,10 @@ public static Builder newBuilder(String subscription, MessageReceiver receiver) return new Builder(subscription, receiver); } + public static Builder newBuilder(String subscription, MessageReceiverWithAckResponse receiver) { + return new Builder(subscription, receiver); + } + /** Returns the delivery attempt count for a received {@link PubsubMessage} */ public static Integer getDeliveryAttempt(PubsubMessage message) { if (!message.containsAttributes("googclient_deliveryattempt")) { @@ -262,7 +299,7 @@ protected void doStart() { logger.log(Level.FINE, "Starting subscriber group."); try { - this.subStub = GrpcSubscriberStub.create(subStubSettings); + this.subscriberStub = GrpcSubscriberStub.create(subStubSettings); } catch (IOException e) { // doesn't matter what we throw, the Service will just catch it and fail to start. throw new IllegalStateException(e); @@ -310,7 +347,7 @@ public void run() { private void runShutdown() { stopAllStreamingConnections(); shutdownBackgroundResources(); - subStub.shutdownNow(); + subscriberStub.shutdownNow(); } private void startStreamingConnections() { @@ -321,22 +358,37 @@ private void startStreamingConnections() { backgroundResources.add(new ExecutorAsBackgroundResource((executor))); } - streamingSubscriberConnections.add( - new StreamingSubscriberConnection( - subscriptionName, - receiver, - ACK_EXPIRATION_PADDING, - maxAckExtensionPeriod, - maxDurationPerAckExtension, - ackLatencyDistribution, - subStub, - i, - flowControlSettings, - useLegacyFlowControl, - flowController, - executor, - alarmsExecutor, - clock)); + StreamingSubscriberConnection.Builder streamingSubscriberConnectionBuilder; + + if (receiverWithAckResponse != null) { + streamingSubscriberConnectionBuilder = + StreamingSubscriberConnection.newBuilder(receiverWithAckResponse); + } else { + streamingSubscriberConnectionBuilder = StreamingSubscriberConnection.newBuilder(receiver); + } + + StreamingSubscriberConnection streamingSubscriberConnection = + streamingSubscriberConnectionBuilder + .setSubscription(subscriptionName) + .setAckExpirationPadding(ACK_EXPIRATION_PADDING_DEFAULT) + .setMaxAckExtensionPeriod(maxAckExtensionPeriod) + .setMinDurationPerAckExtension(minDurationPerAckExtension) + .setMinDurationPerAckExtensionDefaultUsed(minDurationPerAckExtensionDefaultUsed) + .setMaxDurationPerAckExtension(maxDurationPerAckExtension) + .setMaxDurationPerAckExtensionDefaultUsed(maxDurationPerAckExtensionDefaultUsed) + .setAckLatencyDistribution(ackLatencyDistribution) + .setSubscriberStub(subscriberStub) + .setChannelAffinity(i) + .setFlowControlSettings(flowControlSettings) + .setFlowController(flowController) + .setUseLegacyFlowControl(useLegacyFlowControl) + .setExecutor(executor) + .setSystemExecutor(alarmsExecutor) + .setClock(clock) + .setExactlyOnceDeliveryEnabled(exactlyOnceDeliveryEnabled) + .build(); + + streamingSubscriberConnections.add(streamingSubscriberConnection); } startConnections( streamingSubscriberConnections, @@ -402,7 +454,6 @@ private void stopConnections(List connections) { /** Builder of {@link Subscriber Subscribers}. */ public static final class Builder { - private static final Duration DEFAULT_MAX_ACK_EXTENSION_PERIOD = Duration.ofMinutes(60); static final FlowControlSettings DEFAULT_FLOW_CONTROL_SETTINGS = FlowControlSettings.newBuilder() .setMaxOutstandingElementCount(1000L) @@ -415,15 +466,21 @@ public static final class Builder { .build(); private static final AtomicInteger SYSTEM_EXECUTOR_COUNTER = new AtomicInteger(); - private String subscriptionName; + private String subscription; private MessageReceiver receiver; + private MessageReceiverWithAckResponse receiverWithAckResponse; private Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD; - private Duration maxDurationPerAckExtension = DEFAULT_MAX_DURATION_PER_ACK_EXTENSION; + private Duration minDurationPerAckExtension = DEFAULT_MIN_ACK_DEADLINE_EXTENSION; + private boolean minDurationPerAckExtensionDefaultUsed = true; + private Duration maxDurationPerAckExtension = DEFAULT_MAX_ACK_DEADLINE_EXTENSION; + private boolean maxDurationPerAckExtensionDefaultUsed = true; private boolean useLegacyFlowControl = false; private FlowControlSettings flowControlSettings = DEFAULT_FLOW_CONTROL_SETTINGS; + private boolean exactlyOnceDeliveryEnabled = false; + private ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER; private ExecutorProvider systemExecutorProvider = null; private TransportChannelProvider channelProvider = @@ -438,11 +495,16 @@ public static final class Builder { private int parallelPullCount = 1; private String endpoint = SubscriberStubSettings.getDefaultEndpoint(); - Builder(String subscriptionName, MessageReceiver receiver) { - this.subscriptionName = subscriptionName; + Builder(String subscription, MessageReceiver receiver) { + this.subscription = subscription; this.receiver = receiver; } + Builder(String subscription, MessageReceiverWithAckResponse receiverWithAckResponse) { + this.subscription = subscription; + this.receiverWithAckResponse = receiverWithAckResponse; + } + /** * {@code ChannelProvider} to use to create Channels, which must point at Cloud Pub/Sub * endpoint. @@ -511,6 +573,22 @@ public Builder setUseLegacyFlowControl(boolean value) { return this; } + /** + * Enables/Disabled ExactlyOnceDelivery + * + *

Will update the minDurationPerAckExtension if a user-provided value is not set + */ + public Builder setExactlyOnceDeliveryEnabled(boolean exactlyOnceDeliveryEnabled) { + // If exactlyOnceDeliveryIsEnabled we want to update the default minAckDeadlineExtension if + // applicable + if (exactlyOnceDeliveryEnabled && this.minDurationPerAckExtensionDefaultUsed) { + this.minDurationPerAckExtension = DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY; + } + + this.exactlyOnceDeliveryEnabled = exactlyOnceDeliveryEnabled; + return this; + } + /** * Set the maximum period a message ack deadline will be extended. Defaults to one hour. * @@ -537,8 +615,37 @@ public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) { *

MaxDurationPerAckExtension configuration can be disabled by specifying a zero duration. */ public Builder setMaxDurationPerAckExtension(Duration maxDurationPerAckExtension) { - Preconditions.checkArgument(maxDurationPerAckExtension.toMillis() >= 0); + // If a non-default min is set, make sure min is less than max + Preconditions.checkArgument( + maxDurationPerAckExtension.toMillis() >= 0 + && (this.minDurationPerAckExtensionDefaultUsed + || (this.minDurationPerAckExtension.toMillis() + < maxDurationPerAckExtension.toMillis()))); this.maxDurationPerAckExtension = maxDurationPerAckExtension; + this.maxDurationPerAckExtensionDefaultUsed = false; + return this; + } + + /** + * Set the lower bound for a single mod ack extention period. + * + *

The ack deadline will continue to be extended by up to this duration until + * MinAckExtensionPeriod is reached. Setting MinDurationPerAckExtension bounds the minimum + * amount of time before a mesage re-delivery in the event the Subscriber fails to extend the + * deadline. + * + *

MinDurationPerAckExtension configuration can be disabled by specifying a zero duration. + */ + public Builder setMinDurationPerAckExtension(Duration minDurationPerAckExtension) { + // If a non-default max is set, make sure min is less than max + Preconditions.checkArgument( + minDurationPerAckExtension.toMillis() >= 0 + && (this.maxDurationPerAckExtensionDefaultUsed + || (minDurationPerAckExtension.toMillis() + < this.maxDurationPerAckExtension.toMillis()))); + + this.minDurationPerAckExtension = minDurationPerAckExtension; + this.minDurationPerAckExtensionDefaultUsed = false; return this; } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java index 914a95d8c..7fcee6b2e 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java @@ -17,17 +17,14 @@ package com.google.cloud.pubsub.it; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assume.assumeTrue; import com.google.api.gax.rpc.PermissionDeniedException; import com.google.auto.value.AutoValue; import com.google.cloud.ServiceOptions; -import com.google.cloud.pubsub.v1.AckReplyConsumer; -import com.google.cloud.pubsub.v1.MessageReceiver; -import com.google.cloud.pubsub.v1.Publisher; -import com.google.cloud.pubsub.v1.Subscriber; -import com.google.cloud.pubsub.v1.SubscriptionAdminClient; -import com.google.cloud.pubsub.v1.TopicAdminClient; +import com.google.cloud.pubsub.v1.*; import com.google.common.util.concurrent.MoreExecutors; import com.google.iam.v1.Binding; import com.google.iam.v1.GetIamPolicyRequest; @@ -35,15 +32,12 @@ import com.google.iam.v1.SetIamPolicyRequest; import com.google.iam.v1.TestIamPermissionsRequest; import com.google.protobuf.ByteString; -import com.google.pubsub.v1.PubsubMessage; -import com.google.pubsub.v1.PushConfig; -import com.google.pubsub.v1.Subscription; -import com.google.pubsub.v1.SubscriptionName; -import com.google.pubsub.v1.TopicName; +import com.google.pubsub.v1.*; import java.util.Collections; import java.util.List; import java.util.UUID; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.junit.AfterClass; @@ -63,6 +57,8 @@ public class ITPubSubTest { System.getenv("GOOGLE_CLOUD_TESTS_IN_VPCSC") != null && System.getenv("GOOGLE_CLOUD_TESTS_IN_VPCSC").equalsIgnoreCase("true"); + private static final int MAX_INBOUND_MESSAGE_SIZE = 20 * 1024 * 1024; + @Rule public Timeout globalTimeout = Timeout.seconds(300); @AutoValue @@ -76,6 +72,19 @@ static MessageAndConsumer create(PubsubMessage message, AckReplyConsumer consume } } + @AutoValue + abstract static class MessageAndConsumerWithResponse { + abstract PubsubMessage message(); + + abstract AckReplyConsumerWithResponse consumerWithResponse(); + + static MessageAndConsumerWithResponse create( + PubsubMessage message, AckReplyConsumerWithResponse consumerWithResponse) { + return new AutoValue_ITPubSubTest_MessageAndConsumerWithResponse( + message, consumerWithResponse); + } + } + @BeforeClass public static void setupClass() throws Exception { topicAdminClient = TopicAdminClient.create(); @@ -97,12 +106,14 @@ private Subscription getSubscription( SubscriptionName subscriptionName, TopicName topicName, PushConfig pushConfig, - int ackDeadline) { + int ackDeadline, + boolean enableExactlyOnceDelivery) { return Subscription.newBuilder() .setName(subscriptionName.toString()) .setTopic(topicName.toString()) .setPushConfig(pushConfig) .setAckDeadlineSeconds(ackDeadline) + .setEnableExactlyOnceDelivery(enableExactlyOnceDelivery) .build(); } @@ -161,7 +172,8 @@ public void testVPCPushSubscriber() { subscriptionName, topicName, PushConfig.newBuilder().setPushEndpoint("https://random_point").build(), - 10)); + 10, + false)); subscriptionAdminClient.deleteSubscription(subscriptionName); Assert.fail("No exception raised"); } catch (PermissionDeniedException e) { @@ -184,7 +196,7 @@ public void testPublishSubscribe() throws Exception { topicAdminClient.createTopic(topicName); subscriptionAdminClient.createSubscription( - getSubscription(subscriptionName, topicName, PushConfig.newBuilder().build(), 10)); + getSubscription(subscriptionName, topicName, PushConfig.newBuilder().build(), 10, false)); final BlockingQueue receiveQueue = new LinkedBlockingQueue<>(); Subscriber subscriber = @@ -197,6 +209,7 @@ public void receiveMessage( receiveQueue.offer(MessageAndConsumer.create(message, consumer)); } }) + .setExactlyOnceDeliveryEnabled(true) .build(); subscriber.addListener( new Subscriber.Listener() { @@ -217,26 +230,203 @@ public void failed(Subscriber.State from, Throwable failure) { publisher.shutdown(); publisher.awaitTermination(1, TimeUnit.MINUTES); + MessageAndConsumer toAck = pollQueueMessageAndConsumer(receiveQueue); // Ack the first message. - MessageAndConsumer toAck = pollQueue(receiveQueue); toAck.consumer().ack(); - // Nack the other. - MessageAndConsumer toNack = pollQueue(receiveQueue); - assertThat(toNack.message().getData()).isNotEqualTo(toAck.message().getData()); + MessageAndConsumer toNack = pollQueueMessageAndConsumer(receiveQueue); + // Because we are not using ordering keys, we have to compare the received messages to each + // other + assertNotEquals(toNack.message().getData(), toAck.message().getData()); + // Nack toNack.consumer().nack(); // We should get the nacked message back. - MessageAndConsumer redelivered = pollQueue(receiveQueue); - assertThat(redelivered.message().getData()).isEqualTo(toNack.message().getData()); - redelivered.consumer().ack(); + MessageAndConsumer redeliveredToAck = pollQueueMessageAndConsumer(receiveQueue); + assertEquals(toNack.message().getData(), redeliveredToAck.message().getData()); + redeliveredToAck.consumer().ack(); subscriber.stopAsync().awaitTerminated(); subscriptionAdminClient.deleteSubscription(subscriptionName); topicAdminClient.deleteTopic(topicName); } - private MessageAndConsumer pollQueue(BlockingQueue queue) throws InterruptedException { + @Test + public void testPublishSubscribeMessageFutures() throws Exception { + TopicName topicName = + TopicName.newBuilder() + .setProject(projectId) + .setTopic(formatForTest("testing-publish-subscribe-message-futures")) + .build(); + SubscriptionName subscriptionName = + SubscriptionName.of(projectId, formatForTest("testing-publish-subscribe-message-futures")); + + topicAdminClient.createTopic(topicName); + subscriptionAdminClient.createSubscription( + getSubscription(subscriptionName, topicName, PushConfig.newBuilder().build(), 10, false)); + + final BlockingQueue receiveQueue = new LinkedBlockingQueue<>(); + Subscriber subscriber = + Subscriber.newBuilder( + subscriptionName.toString(), + new MessageReceiverWithAckResponse() { + @Override + public void receiveMessage( + final PubsubMessage message, + final AckReplyConsumerWithResponse consumerWithResponse) { + receiveQueue.offer( + MessageAndConsumerWithResponse.create(message, consumerWithResponse)); + } + }) + .setChannelProvider( + SubscriptionAdminSettings.defaultGrpcTransportProviderBuilder() + .setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE) + .build()) + .setExactlyOnceDeliveryEnabled(false) + .build(); + subscriber.addListener( + new Subscriber.Listener() { + public void failed(Subscriber.State from, Throwable failure) { + receiveQueue.offer(failure); + } + }, + MoreExecutors.directExecutor()); + subscriber.startAsync(); + + Publisher publisher = Publisher.newBuilder(topicName).build(); + publisher + .publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("msg1")).build()) + .get(); + publisher + .publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("msg2")).build()) + .get(); + + publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); + + // Ack the first message. + MessageAndConsumerWithResponse toAck = pollQueueMessageAndConsumerWithResponse(receiveQueue); + Future ackResponseFuture = toAck.consumerWithResponse().ack(); + assertEquals(AckResponse.SUCCESSFUL, ackResponseFuture.get()); + + MessageAndConsumerWithResponse toNack = pollQueueMessageAndConsumerWithResponse(receiveQueue); + // Because we are not using ordering keys, we have to compare the received messages to each + // other + assertNotEquals(toNack.message().getData(), toAck.message().getData()); + Future nackResponseFuture = toNack.consumerWithResponse().nack(); + assertEquals(AckResponse.SUCCESSFUL, nackResponseFuture.get()); + + MessageAndConsumerWithResponse redeliveredToAck = + pollQueueMessageAndConsumerWithResponse(receiveQueue); + Future redeliveredToAckResponse = redeliveredToAck.consumerWithResponse().ack(); + + assertEquals(toNack.message().getData(), redeliveredToAck.message().getData()); + assertEquals(AckResponse.SUCCESSFUL, redeliveredToAckResponse.get()); + + subscriber.stopAsync().awaitTerminated(); + subscriptionAdminClient.deleteSubscription(subscriptionName); + topicAdminClient.deleteTopic(topicName); + } + + @Test + public void testPublishSubscribeExactlyOnce() throws Exception { + TopicName topicName = + TopicName.newBuilder() + .setProject(projectId) + .setTopic(formatForTest("testing-publish-subscribe-exactly-once-topic")) + .build(); + SubscriptionName subscriptionName = + SubscriptionName.of( + projectId, formatForTest("testing-publish-subscribe-exactly-once-subscription")); + + topicAdminClient.createTopic(topicName); + subscriptionAdminClient.createSubscription( + getSubscription(subscriptionName, topicName, PushConfig.newBuilder().build(), 60, true)); + + final BlockingQueue receiveQueue = new LinkedBlockingQueue<>(); + Subscriber subscriber = + Subscriber.newBuilder( + subscriptionName.toString(), + new MessageReceiverWithAckResponse() { + @Override + public void receiveMessage( + final PubsubMessage message, + final AckReplyConsumerWithResponse consumerWithResponse) { + receiveQueue.offer( + MessageAndConsumerWithResponse.create(message, consumerWithResponse)); + } + }) + .setChannelProvider( + SubscriptionAdminSettings.defaultGrpcTransportProviderBuilder() + .setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE) + .build()) + .setExactlyOnceDeliveryEnabled(false) + .build(); + subscriber.addListener( + new Subscriber.Listener() { + public void failed(Subscriber.State from, Throwable failure) { + receiveQueue.offer(failure); + } + }, + MoreExecutors.directExecutor()); + subscriber.startAsync(); + + Publisher publisher = Publisher.newBuilder(topicName).build(); + publisher + .publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("msg1")).build()) + .get(); + publisher + .publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("msg2")).build()) + .get(); + + publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); + + // Ack the first message. + MessageAndConsumerWithResponse toAck = pollQueueMessageAndConsumerWithResponse(receiveQueue); + Future ackResponseFuture = toAck.consumerWithResponse().ack(); + assertEquals(AckResponse.SUCCESSFUL, ackResponseFuture.get()); + + MessageAndConsumerWithResponse toNack = pollQueueMessageAndConsumerWithResponse(receiveQueue); + // Because we are not using ordering keys, we have to compare the received messages to each + // other + assertNotEquals(toNack.message().getData(), toAck.message().getData()); + Future nackResponseFuture = toNack.consumerWithResponse().nack(); + assertEquals(AckResponse.SUCCESSFUL, nackResponseFuture.get()); + + MessageAndConsumerWithResponse redeliveredToAck = + pollQueueMessageAndConsumerWithResponse(receiveQueue); + Future redeliveredToAckResponse = redeliveredToAck.consumerWithResponse().ack(); + + assertEquals(toNack.message().getData(), redeliveredToAck.message().getData()); + assertEquals(AckResponse.SUCCESSFUL, redeliveredToAckResponse.get()); + + subscriber.stopAsync().awaitTerminated(); + subscriptionAdminClient.deleteSubscription(subscriptionName); + topicAdminClient.deleteTopic(topicName); + } + + private MessageAndConsumer pollQueueMessageAndConsumer(BlockingQueue queue) + throws InterruptedException { + Object obj = pollQueue(queue); + if (obj instanceof MessageAndConsumer) { + return (MessageAndConsumer) obj; + } + throw new IllegalStateException( + "expected either MessageAndConsumer or Throwable, found: " + obj); + } + + private MessageAndConsumerWithResponse pollQueueMessageAndConsumerWithResponse( + BlockingQueue queue) throws InterruptedException { + Object obj = pollQueue(queue); + if (obj instanceof MessageAndConsumerWithResponse) { + return (MessageAndConsumerWithResponse) obj; + } + throw new IllegalStateException( + "expected either MessageAndConsumerWithResponse or Throwable, found: " + obj); + } + + private Object pollQueue(BlockingQueue queue) throws InterruptedException { Object obj = queue.poll(10, TimeUnit.MINUTES); if (obj == null) { return null; @@ -244,10 +434,7 @@ private MessageAndConsumer pollQueue(BlockingQueue queue) throws Interru if (obj instanceof Throwable) { throw new IllegalStateException("unexpected error", (Throwable) obj); } - if (obj instanceof MessageAndConsumer) { - return (MessageAndConsumer) obj; - } - throw new IllegalStateException( - "expected either MessageAndConsumer or Throwable, found: " + obj); + + return obj; } } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/CustomArgumentMatchers.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/CustomArgumentMatchers.java new file mode 100644 index 000000000..a98e5c4e1 --- /dev/null +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/CustomArgumentMatchers.java @@ -0,0 +1,208 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.v1; + +import com.google.pubsub.v1.AcknowledgeRequest; +import com.google.pubsub.v1.ModifyAckDeadlineRequest; +import java.util.*; +import org.mockito.ArgumentMatcher; + +/** Shared Custom Argument Matchers for Tests w/ Mock Futures */ +public class CustomArgumentMatchers { + public static class AcknowledgeRequestMatcher implements ArgumentMatcher { + private AcknowledgeRequest left; + + AcknowledgeRequestMatcher(AcknowledgeRequest acknowledgeRequest) { + this.left = acknowledgeRequest; + } + + @Override + public boolean matches(AcknowledgeRequest right) { + Set leftAckIdSet = new HashSet(this.left.getAckIdsList()); + Set rightAckIdSet = new HashSet(right.getAckIdsList()); + return this.left.getSubscription().equals(right.getSubscription()) + && leftAckIdSet.equals(rightAckIdSet); + } + } + + public static class ModifyAckDeadlineRequestMatcher + implements ArgumentMatcher { + private ModifyAckDeadlineRequest left; + + ModifyAckDeadlineRequestMatcher(ModifyAckDeadlineRequest modifyAckDeadlineRequest) { + this.left = modifyAckDeadlineRequest; + } + + @Override + public boolean matches(ModifyAckDeadlineRequest right) { + Set leftAckIdSet = new HashSet(this.left.getAckIdsList()); + Set rightAckIdSet = new HashSet(right.getAckIdsList()); + return this.left.getSubscription().equals(right.getSubscription()) + && this.left.getAckDeadlineSeconds() == right.getAckDeadlineSeconds() + && leftAckIdSet.equals(rightAckIdSet); + } + } + + public static class AckRequestDataMatcher implements ArgumentMatcher { + private AckRequestData left; + + private static Comparator comparator = + new Comparator() { + + @Override + public int compare(AckRequestData ackRequestData, AckRequestData t1) { + return ackRequestData.getAckId().compareTo(t1.getAckId()); + } + }; + + AckRequestDataMatcher(AckRequestData left) { + this.left = left; + } + + @Override + public boolean matches(AckRequestData right) { + return this.left.getAckId() == right.getAckId(); + } + } + + public static class AckRequestDataListMatcher implements ArgumentMatcher> { + private List left; + + AckRequestDataListMatcher(List ackRequestDataList) { + this.left = ackRequestDataList; + } + + @Override + public boolean matches(List right) { + // We only really care about the ackIds, the futures will be mocked + if (this.left.size() != right.size()) { + return false; + } + + // We just want to compare the ackIds not the futures and do not care about order (or + // duplicates) + this.left.sort(AckRequestDataMatcher.comparator); + right.sort(AckRequestDataMatcher.comparator); + + Iterator iteratorLeft = this.left.iterator(); + Iterator iteratorRight = right.iterator(); + + while (iteratorLeft.hasNext() && iteratorRight.hasNext()) { + if (iteratorLeft.next().getAckId() != iteratorRight.next().getAckId()) { + return false; + } + } + return true; + } + } + + public static class ModackRequestDataMatcher implements ArgumentMatcher { + private ModackRequestData left; + + private static Comparator comparator = + new Comparator() { + + @Override + public int compare(ModackRequestData left, ModackRequestData right) { + // Compare deadline extensions first + int deadlineExtensionDifference = + left.getDeadlineExtensionSeconds() - right.getDeadlineExtensionSeconds(); + if (deadlineExtensionDifference != 0) { + return deadlineExtensionDifference; + } + + // Then sort and compare ackIds + List ackRequestDataListLeft = left.getAckRequestData(); + List ackRequestDataListRight = right.getAckRequestData(); + + ackRequestDataListLeft.sort(AckRequestDataMatcher.comparator); + ackRequestDataListRight.sort(AckRequestDataMatcher.comparator); + + Iterator iteratorLeft = ackRequestDataListLeft.iterator(); + Iterator iteratorRight = ackRequestDataListRight.iterator(); + int compareAcks; + + while (iteratorLeft.hasNext() && iteratorRight.hasNext()) { + String ackIdLeft = iteratorLeft.next().getAckId(); + String ackIdRight = iteratorRight.next().getAckId(); + compareAcks = ackIdLeft.compareTo(ackIdRight); + + if (compareAcks != 0) { + return compareAcks; + } + } + + if (iteratorLeft.hasNext()) { + return 1; + } + if (iteratorRight.hasNext()) { + return -1; + } else { + return 0; + } + } + }; + + ModackRequestDataMatcher(ModackRequestData left) { + this.left = left; + } + + @Override + public boolean matches(ModackRequestData right) { + return ModackRequestDataMatcher.comparator.compare(this.left, right) == 0; + } + } + + public static class ModackRequestDataListMatcher + implements ArgumentMatcher> { + private List left; + + ModackRequestDataListMatcher(List modackRequestDataList) { + this.left = modackRequestDataList; + } + + @Override + public boolean matches(List right) { + // First check size + if (this.left.size() != right.size()) { + return false; + } + + // Sort first + this.left.sort(ModackRequestDataMatcher.comparator); + right.sort(ModackRequestDataMatcher.comparator); + + Iterator iteratorLeft = this.left.iterator(); + Iterator iteratorRight = right.iterator(); + + ModackRequestData modackRequestDataLeft; + ModackRequestData modackRequestDataRight; + + while (iteratorLeft.hasNext() && iteratorRight.hasNext()) { + + ModackRequestDataMatcher modackRequestDataMatcher = + new ModackRequestDataMatcher(iteratorLeft.next()); + + if (!modackRequestDataMatcher.matches(iteratorRight.next())) { + return false; + } + } + + return true; + } + } +} diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeScheduledExecutorService.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeScheduledExecutorService.java index e63b68748..d8f026fee 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeScheduledExecutorService.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeScheduledExecutorService.java @@ -16,7 +16,6 @@ package com.google.cloud.pubsub.v1; -import com.google.api.core.ApiClock; import com.google.common.primitives.Ints; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; @@ -48,7 +47,7 @@ public class FakeScheduledExecutorService extends AbstractExecutorService private final FakeClock clock = new FakeClock(); private final Deque expectedWorkQueue = new LinkedList<>(); - public ApiClock getClock() { + public FakeClock getClock() { return clock; } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeSubscriberServiceImpl.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeSubscriberServiceImpl.java index 260071b9b..173248041 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeSubscriberServiceImpl.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeSubscriberServiceImpl.java @@ -49,7 +49,7 @@ class FakeSubscriberServiceImpl extends SubscriberImplBase { private final AtomicBoolean subscriptionInitialized = new AtomicBoolean(false); private String subscription = ""; private final AtomicInteger messageAckDeadline = - new AtomicInteger(Subscriber.MIN_ACK_DEADLINE_SECONDS); + new AtomicInteger(Math.toIntExact(Subscriber.STREAM_ACK_DEADLINE_DEFAULT.getSeconds())); private final AtomicInteger getSubscriptionCalled = new AtomicInteger(); private StreamingPullRequest lastSeenRequest; private final List openedStreams = new ArrayList<>(); diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java index 88a015f6e..3ff13acfc 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java @@ -17,23 +17,17 @@ package com.google.cloud.pubsub.v1; import static com.google.common.truth.Truth.assertThat; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; -import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.batching.FlowController; import com.google.api.gax.core.Distribution; -import com.google.auto.value.AutoValue; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.ReceivedMessage; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.*; +import java.util.concurrent.*; import org.junit.Before; import org.junit.Test; import org.threeten.bp.Duration; @@ -41,50 +35,51 @@ public class MessageDispatcherTest { private static final ByteString MESSAGE_DATA = ByteString.copyFromUtf8("message-data"); private static final int DELIVERY_INFO_COUNT = 3; + private static final String ACK_ID = "ACK-ID"; private static final ReceivedMessage TEST_MESSAGE = ReceivedMessage.newBuilder() - .setAckId("ackid") + .setAckId(ACK_ID) .setMessage(PubsubMessage.newBuilder().setData(MESSAGE_DATA).build()) .setDeliveryAttempt(DELIVERY_INFO_COUNT) .build(); - private static final Runnable NOOP_RUNNABLE = - new Runnable() { - @Override - public void run() { - // No-op; don't do anything. - } - }; private static final int MAX_SECONDS_PER_ACK_EXTENSION = 60; + private static final int MIN_ACK_DEADLINE_SECONDS = 10; + private static final Duration MAX_ACK_EXTENSION_PERIOD = Duration.ofMinutes(60); + private static final Duration ACK_EXPIRATION_PADDING_DEFAULT = + Subscriber.ACK_EXPIRATION_PADDING_DEFAULT; - private MessageDispatcher dispatcher; - private LinkedBlockingQueue consumers; - private List sentAcks; - private List sentModAcks; + private Distribution mockAckLatencyDistribution; + + private MessageDispatcher.AckProcessor mockAckProcessor; private FakeClock clock; - private FlowController flowController; private boolean messageContainsDeliveryAttempt; - @AutoValue - abstract static class ModAckItem { - abstract String ackId(); + private FakeScheduledExecutorService systemExecutor; - abstract int seconds(); + private static MessageReceiver messageReceiver; + private static MessageReceiverWithAckResponse messageReceiverWithAckResponse; - static ModAckItem of(String ackId, int seconds) { - return new AutoValue_MessageDispatcherTest_ModAckItem(ackId, seconds); - } - } + private LinkedBlockingQueue consumers; + private LinkedBlockingQueue consumersWithResponse; @Before public void setUp() { + systemExecutor = new FakeScheduledExecutorService(); + clock = new FakeClock(); + mockAckLatencyDistribution = mock(Distribution.class); + + mockAckProcessor = mock(MessageDispatcher.AckProcessor.class); + messageContainsDeliveryAttempt = true; + consumers = new LinkedBlockingQueue<>(); - sentAcks = new ArrayList<>(); - sentModAcks = new ArrayList<>(); + consumersWithResponse = new LinkedBlockingQueue<>(); - MessageReceiver receiver = + // We are instantiating "real" message receivers to easily ack/nack messages + messageReceiver = new MessageReceiver() { @Override - public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) { + public void receiveMessage( + final PubsubMessage message, final AckReplyConsumer ackReplyConsumer) { assertThat(message.getData()).isEqualTo(MESSAGE_DATA); if (messageContainsDeliveryAttempt) { assertTrue(message.containsAttributes("googclient_deliveryattempt")); @@ -93,159 +88,410 @@ public void receiveMessage(final PubsubMessage message, final AckReplyConsumer c } else { assertFalse(message.containsAttributes("googclient_deliveryattempt")); } - consumers.add(consumer); + consumers.add(ackReplyConsumer); } }; - MessageDispatcher.AckProcessor processor = - new MessageDispatcher.AckProcessor() { - public void sendAckOperations( - List acksToSend, - List ackDeadlineExtensions) { - sentAcks.addAll(acksToSend); - for (MessageDispatcher.PendingModifyAckDeadline modack : ackDeadlineExtensions) { - for (String ackId : modack.ackIds) { - sentModAcks.add(ModAckItem.of(ackId, modack.deadlineExtensionSeconds)); - } + + messageReceiverWithAckResponse = + new MessageReceiverWithAckResponse() { + @Override + public void receiveMessage( + PubsubMessage message, AckReplyConsumerWithResponse ackReplyConsumerWithResponse) { + assertThat(message.getData()).isEqualTo(MESSAGE_DATA); + if (messageContainsDeliveryAttempt) { + assertTrue(message.containsAttributes("googclient_deliveryattempt")); + assertThat(message.getAttributesOrThrow("googclient_deliveryattempt")) + .isEqualTo(Integer.toString(DELIVERY_INFO_COUNT)); + } else { + assertFalse(message.containsAttributes("googclient_deliveryattempt")); } + consumersWithResponse.add(ackReplyConsumerWithResponse); } }; + } - // This executor isn't used because we're not actually scheduling anything until we call - // dispatcher.start(), which we're not doing here. - ScheduledThreadPoolExecutor systemExecutor = new ScheduledThreadPoolExecutor(1); - systemExecutor.shutdownNow(); + @Test + public void testSetupAndTeardown() { + MessageDispatcher messageDispatcher = getMessageDispatcher(); - clock = new FakeClock(); - flowController = - new FlowController( - FlowControlSettings.newBuilder() - .setMaxOutstandingElementCount(1L) - .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) - .build()); - - dispatcher = - new MessageDispatcher( - receiver, - processor, - Duration.ofSeconds(5), - Duration.ofMinutes(60), - Duration.ofSeconds(MAX_SECONDS_PER_ACK_EXTENSION), - new Distribution(Subscriber.MAX_ACK_DEADLINE_SECONDS + 1), - flowController, - MoreExecutors.directExecutor(), - systemExecutor, - clock); - dispatcher.setMessageDeadlineSeconds(Subscriber.MIN_ACK_DEADLINE_SECONDS); + messageDispatcher.start(); + messageDispatcher.stop(); + } - messageContainsDeliveryAttempt = true; + @Test + public void testReceiptMessageReceiver() { + MessageReceiver mockMessageReceiver = mock(MessageReceiver.class); + MessageDispatcher messageDispatcher = getMessageDispatcher(mockMessageReceiver); + messageDispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); + messageDispatcher.processOutstandingOperations(); + + // Assert expected behavior + List modackRequestDataList = new ArrayList(); + modackRequestDataList.add( + new ModackRequestData( + MIN_ACK_DEADLINE_SECONDS, AckRequestData.newBuilder(TEST_MESSAGE.getAckId()).build())); + + verify(mockAckProcessor, times(1)) + .sendModackOperations( + argThat( + new CustomArgumentMatchers.ModackRequestDataListMatcher(modackRequestDataList))); + verify(mockMessageReceiver, never()) + .receiveMessage(eq(TEST_MESSAGE.getMessage()), any(AckReplyConsumer.class)); } @Test - public void testReceipt() { - dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); - dispatcher.processOutstandingAckOperations(); - assertThat(sentModAcks) - .contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS)); + public void testReceiptMessageReceiverWithAckResponse() { + MessageReceiverWithAckResponse mockMessageReceiverWithAckResponse = + mock(MessageReceiverWithAckResponse.class); + MessageDispatcher messageDispatcher = getMessageDispatcher(mockMessageReceiverWithAckResponse); + messageDispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); + messageDispatcher.processOutstandingOperations(); + + // Assert expected behavior + List modackRequestDataList = new ArrayList(); + modackRequestDataList.add( + new ModackRequestData( + MIN_ACK_DEADLINE_SECONDS, AckRequestData.newBuilder(TEST_MESSAGE.getAckId()).build())); + + verify(mockAckProcessor, times(1)) + .sendModackOperations( + argThat( + new CustomArgumentMatchers.ModackRequestDataListMatcher(modackRequestDataList))); + verify(mockMessageReceiverWithAckResponse, never()) + .receiveMessage(eq(TEST_MESSAGE.getMessage()), any(AckReplyConsumerWithResponse.class)); } @Test - public void testReceiptNoDeliveryAttempt() { - messageContainsDeliveryAttempt = false; - ReceivedMessage messageNoDeliveryAttempt = - ReceivedMessage.newBuilder() - .setAckId("ackid") - .setMessage(PubsubMessage.newBuilder().setData(MESSAGE_DATA).build()) - .build(); - dispatcher.processReceivedMessages(Collections.singletonList(messageNoDeliveryAttempt)); - dispatcher.processOutstandingAckOperations(); - assertThat(sentModAcks) - .contains( - ModAckItem.of( - messageNoDeliveryAttempt.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS)); + public void testConsumerAckMessageReceiver() { + MessageDispatcher messageDispatcher = getMessageDispatcher(messageReceiver); + messageDispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); + + try { + // Ack a message + consumers.take().ack(); + } catch (Throwable t) { + // In case our consumers fail + throw new AssertionError(); + } + + messageDispatcher.processOutstandingOperations(); + + // Assert expected behavior + List ackRequestDataList = new ArrayList(); + AckRequestData ackRequestData = AckRequestData.newBuilder(TEST_MESSAGE.getAckId()).build(); + ackRequestDataList.add(ackRequestData); + + List modackRequestDataList = new ArrayList(); + modackRequestDataList.add(new ModackRequestData(MIN_ACK_DEADLINE_SECONDS, ackRequestData)); + + verify(mockAckProcessor, times(1)) + .sendModackOperations( + argThat( + new CustomArgumentMatchers.ModackRequestDataListMatcher(modackRequestDataList))); + verify(mockAckProcessor, times(1)) + .sendAckOperations( + argThat(new CustomArgumentMatchers.AckRequestDataListMatcher(ackRequestDataList))); } @Test - public void testAck() throws Exception { - dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); - consumers.take().ack(); - dispatcher.processOutstandingAckOperations(); - assertThat(sentAcks).contains(TEST_MESSAGE.getAckId()); + public void testConsumerAckMessageReceiverWithAckResponse() { + MessageDispatcher messageDispatcher = getMessageDispatcher(messageReceiverWithAckResponse); + messageDispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); + Future ackResponseFuture; + + try { + // Ack a message - at this point we do not care about the message future so just drop it + consumersWithResponse.take().ack(); + } catch (Throwable t) { + // In case our consumers fail + throw new AssertionError(); + } + + messageDispatcher.processOutstandingOperations(); + + // Assert expected behavior + List ackRequestDataList = new ArrayList(); + AckRequestData ackRequestData = AckRequestData.newBuilder(TEST_MESSAGE.getAckId()).build(); + ackRequestDataList.add(ackRequestData); + + List modackRequestDataList = new ArrayList(); + modackRequestDataList.add(new ModackRequestData(MIN_ACK_DEADLINE_SECONDS, ackRequestData)); + + verify(mockAckProcessor, times(1)) + .sendModackOperations( + argThat( + new CustomArgumentMatchers.ModackRequestDataListMatcher(modackRequestDataList))); + verify(mockAckProcessor, times(1)) + .sendAckOperations( + argThat(new CustomArgumentMatchers.AckRequestDataListMatcher(ackRequestDataList))); + } + + @Test + public void testConsumerNackMessageReceiver() { + MessageDispatcher messageDispatcher = getMessageDispatcher(messageReceiver); + messageDispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); + + try { + consumers.take().nack(); + } catch (Throwable t) { + // Just in case something went wrong with our consumers + throw new AssertionError(); + } + + messageDispatcher.processOutstandingOperations(); + + // Assert expected behavior + AckRequestData ackRequestData = AckRequestData.newBuilder(TEST_MESSAGE.getAckId()).build(); + List modackRequestDataList = new ArrayList(); + modackRequestDataList.add(new ModackRequestData(0, ackRequestData)); + modackRequestDataList.add(new ModackRequestData(MIN_ACK_DEADLINE_SECONDS, ackRequestData)); + + verify(mockAckProcessor, times(1)) + .sendModackOperations( + argThat( + new CustomArgumentMatchers.ModackRequestDataListMatcher(modackRequestDataList))); } @Test - public void testNack() throws Exception { - dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); - consumers.take().nack(); - dispatcher.processOutstandingAckOperations(); - assertThat(sentModAcks).contains(ModAckItem.of(TEST_MESSAGE.getAckId(), 0)); + public void testConsumerNackMessageReceiverWithAckResponse() { + MessageDispatcher messageDispatcher = getMessageDispatcher(messageReceiverWithAckResponse); + messageDispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); + + try { + // Ack a message - at this point we do not care about the message future so just drop it + consumersWithResponse.take().nack(); + } catch (Throwable t) { + // Just in case something went wrong with our consumers + throw new AssertionError(); + } + + messageDispatcher.processOutstandingOperations(); + + // Assert expected behavior + AckRequestData ackRequestData = AckRequestData.newBuilder(TEST_MESSAGE.getAckId()).build(); + List modackRequestDataList = new ArrayList(); + modackRequestDataList.add(new ModackRequestData(0, ackRequestData)); + modackRequestDataList.add(new ModackRequestData(MIN_ACK_DEADLINE_SECONDS, ackRequestData)); + + verify(mockAckProcessor, times(1)) + .sendModackOperations( + argThat( + new CustomArgumentMatchers.ModackRequestDataListMatcher(modackRequestDataList))); } @Test - public void testExtension() throws Exception { - dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); - dispatcher.extendDeadlines(); - assertThat(sentModAcks) - .contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS)); + public void testExtension() { + MessageDispatcher messageDispatcher = getMessageDispatcher(); + messageDispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); + messageDispatcher.extendDeadlines(); + + // Assert expected behavior + List ackRequestDataList = new ArrayList(); + + AckRequestData ackRequestData = AckRequestData.newBuilder(TEST_MESSAGE.getAckId()).build(); + ackRequestDataList.add(ackRequestData); + + List modackRequestDataList = new ArrayList(); + modackRequestDataList.add(new ModackRequestData(MIN_ACK_DEADLINE_SECONDS, ackRequestData)); - sentModAcks.clear(); - consumers.take().ack(); - dispatcher.extendDeadlines(); - assertThat(sentModAcks).isEmpty(); + verify(mockAckProcessor, times(1)) + .sendModackOperations( + argThat( + new CustomArgumentMatchers.ModackRequestDataListMatcher(modackRequestDataList))); } @Test - public void testExtension_Close() { - dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); - dispatcher.extendDeadlines(); - assertThat(sentModAcks) - .contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS)); - sentModAcks.clear(); + public void testExtension_ExpirationExtension() { + MessageDispatcher messageDispatcher = getMessageDispatcher(); + messageDispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); + int secondsLeft = 5; + // Advance clock to have 5 seconds left in extension period + clock.advance(MAX_ACK_EXTENSION_PERIOD.getSeconds() - secondsLeft, TimeUnit.SECONDS); + messageDispatcher.extendDeadlines(); - // Default total expiration is an hour (60*60 seconds). We normally would extend by 10s. - // However, only extend by 5s here, since there's only 5s left before total expiration. - clock.advance(60 * 60 - 5, TimeUnit.SECONDS); - dispatcher.extendDeadlines(); - assertThat(sentModAcks).contains(ModAckItem.of(TEST_MESSAGE.getAckId(), 5)); + // Assert expected behavior + List ackRequestDataList = new ArrayList(); + AckRequestData ackRequestData = AckRequestData.newBuilder(TEST_MESSAGE.getAckId()).build(); + ackRequestDataList.add(ackRequestData); + List modackRequestDataList = new ArrayList(); + modackRequestDataList.add(new ModackRequestData(secondsLeft, ackRequestData)); + + verify(mockAckProcessor, times(1)) + .sendModackOperations( + argThat( + new CustomArgumentMatchers.ModackRequestDataListMatcher(modackRequestDataList))); } @Test public void testExtension_GiveUp() throws Exception { - dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); - dispatcher.extendDeadlines(); - assertThat(sentModAcks) - .contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS)); - sentModAcks.clear(); + MessageDispatcher messageDispatcher = getMessageDispatcher(); + messageDispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); // If we run extendDeadlines after totalExpiration, we shouldn't send anything. - // In particular, don't send negative modacks. clock.advance(1, TimeUnit.DAYS); - dispatcher.extendDeadlines(); - assertThat(sentModAcks).isEmpty(); + messageDispatcher.extendDeadlines(); - // We should be able to reserve another item in the flow controller and not block. - flowController.reserve(1, 0); - dispatcher.stop(); + // Assert expected behavior + verify(mockAckProcessor, times(0)).sendAckOperations(eq(Collections.emptyList())); + verify(mockAckProcessor, times(0)).sendModackOperations(eq(Collections.emptyList())); } @Test - public void testDeadlineAdjustment() throws Exception { - assertThat(dispatcher.computeDeadlineSeconds()).isEqualTo(10); + public void testAckExtensionDefaultsExactlyOnceDeliveryOffThenOn() { + // EnableExactlyOnceDelivery is turned off by default + MessageDispatcher messageDispatcher = + MessageDispatcher.newBuilder(mock(MessageReceiver.class)) + .setAckLatencyDistribution(mockAckLatencyDistribution) + .setMinDurationPerAckExtension(Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION) + .setMinDurationPerAckExtensionDefaultUsed(true) + .setMaxDurationPerAckExtension(Subscriber.DEFAULT_MAX_ACK_DEADLINE_EXTENSION) + .setMaxDurationPerAckExtensionDefaultUsed(true) + .build(); + + // We should be using the Subscriber set hard deadlines + assertMinAndMaxAckDeadlines( + messageDispatcher, + Math.toIntExact(Subscriber.MIN_STREAM_ACK_DEADLINE.getSeconds()), + Math.toIntExact(Subscriber.MAX_STREAM_ACK_DEADLINE.getSeconds())); - dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); - clock.advance(42, TimeUnit.SECONDS); - consumers.take().ack(); + messageDispatcher.setEnableExactlyOnceDelivery(true); - assertThat(dispatcher.computeDeadlineSeconds()).isEqualTo(42); + // Should only change min deadline + assertMinAndMaxAckDeadlines( + messageDispatcher, + Math.toIntExact( + Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY.getSeconds()), + Math.toIntExact(Subscriber.MAX_STREAM_ACK_DEADLINE.getSeconds())); } @Test - public void testMaxDurationPerAckExtension() throws Exception { - assertThat(dispatcher.computeDeadlineSeconds()).isEqualTo(10); + public void testAckExtensionDefaultsExactlyOnceDeliveryOnThenOff() { + MessageDispatcher messageDispatcher = + MessageDispatcher.newBuilder(mock(MessageReceiver.class)) + .setAckLatencyDistribution(mockAckLatencyDistribution) + .setEnableExactlyOnceDelivery(true) + .setMinDurationPerAckExtension( + Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY) + .setMinDurationPerAckExtensionDefaultUsed(true) + .setMaxDurationPerAckExtension(Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION) + .setMaxDurationPerAckExtensionDefaultUsed(true) + .build(); + + assertMinAndMaxAckDeadlines( + messageDispatcher, + Math.toIntExact( + Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY.getSeconds()), + Math.toIntExact(Subscriber.MAX_STREAM_ACK_DEADLINE.getSeconds())); + + messageDispatcher.setEnableExactlyOnceDelivery(false); + + // Should change min deadline + assertMinAndMaxAckDeadlines( + messageDispatcher, + Math.toIntExact(Subscriber.MIN_STREAM_ACK_DEADLINE.getSeconds()), + Math.toIntExact(Subscriber.MAX_STREAM_ACK_DEADLINE.getSeconds())); + } + + @Test + public void testAckExtensionCustomMinExactlyOnceDeliveryOffThenOn() { + int customMinSeconds = 30; + MessageDispatcher messageDispatcher = + MessageDispatcher.newBuilder(mock(MessageReceiver.class)) + .setAckLatencyDistribution(mockAckLatencyDistribution) + .setMinDurationPerAckExtension(Duration.ofSeconds(customMinSeconds)) + .setMinDurationPerAckExtensionDefaultUsed(false) + .setMaxDurationPerAckExtension(Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION) + .setMaxDurationPerAckExtensionDefaultUsed(true) + .build(); - dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); - clock.advance(MAX_SECONDS_PER_ACK_EXTENSION + 5, TimeUnit.SECONDS); - consumers.take().ack(); + assertMinAndMaxAckDeadlines( + messageDispatcher, + customMinSeconds, + Math.toIntExact(Subscriber.MAX_STREAM_ACK_DEADLINE.getSeconds())); + + messageDispatcher.setEnableExactlyOnceDelivery(true); + + // no changes should occur + assertMinAndMaxAckDeadlines( + messageDispatcher, + customMinSeconds, + Math.toIntExact(Subscriber.MAX_STREAM_ACK_DEADLINE.getSeconds())); + } + + @Test + public void testAckExtensionCustomMaxExactlyOnceDeliveryOffThenOn() { + int customMaxSeconds = 30; + MessageDispatcher messageDispatcher = + MessageDispatcher.newBuilder(mock(MessageReceiver.class)) + .setAckLatencyDistribution(mockAckLatencyDistribution) + .setMinDurationPerAckExtension(Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION) + .setMinDurationPerAckExtensionDefaultUsed(true) + .setMaxDurationPerAckExtension(Duration.ofSeconds(customMaxSeconds)) + .setMaxDurationPerAckExtensionDefaultUsed(false) + .build(); + + assertMinAndMaxAckDeadlines( + messageDispatcher, + Math.toIntExact(Subscriber.MIN_STREAM_ACK_DEADLINE.getSeconds()), + customMaxSeconds); + + messageDispatcher.setEnableExactlyOnceDelivery(true); + + // Because the customMaxSeconds is above the + // DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY, we should use the customMaxSeconds + // as the new min + assertMinAndMaxAckDeadlines(messageDispatcher, customMaxSeconds, customMaxSeconds); + } + + private void assertMinAndMaxAckDeadlines( + MessageDispatcher messageDispatcher, int minAckDeadline, int maxAckDeadline) { + // Helper function to assert if min and max deadlines are being respected + + // Set distribution to return a low value to assert min value + when(mockAckLatencyDistribution.getPercentile( + MessageDispatcher.PERCENTILE_FOR_ACK_DEADLINE_UPDATES)) + .thenReturn(0); + assertEquals(minAckDeadline, messageDispatcher.computeDeadlineSeconds()); + + // Set distribution to return a high value to assert max value + when(mockAckLatencyDistribution.getPercentile( + MessageDispatcher.PERCENTILE_FOR_ACK_DEADLINE_UPDATES)) + .thenReturn(60 * 60); + assertEquals(maxAckDeadline, messageDispatcher.computeDeadlineSeconds()); + } + + private MessageDispatcher getMessageDispatcher() { + return getMessageDispatcher(mock(MessageReceiver.class)); + } + + private MessageDispatcher getMessageDispatcher(MessageReceiver messageReceiver) { + return getMessageDispatcherFromBuilder(MessageDispatcher.newBuilder(messageReceiver)); + } + + private MessageDispatcher getMessageDispatcher( + MessageReceiverWithAckResponse messageReceiverWithAckResponse) { + return getMessageDispatcherFromBuilder( + MessageDispatcher.newBuilder(messageReceiverWithAckResponse)); + } + + private MessageDispatcher getMessageDispatcherFromBuilder(MessageDispatcher.Builder builder) { + MessageDispatcher messageDispatcher = + builder + .setAckProcessor(mockAckProcessor) + .setAckExpirationPadding(ACK_EXPIRATION_PADDING_DEFAULT) + .setMaxAckExtensionPeriod(MAX_ACK_EXTENSION_PERIOD) + .setMinDurationPerAckExtension(Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION) + .setMinDurationPerAckExtensionDefaultUsed(true) + .setMaxDurationPerAckExtension(Subscriber.DEFAULT_MAX_ACK_DEADLINE_EXTENSION) + .setMaxDurationPerAckExtensionDefaultUsed(true) + .setAckLatencyDistribution(mock(Distribution.class)) + .setFlowController(mock(FlowController.class)) + .setExecutor(MoreExecutors.directExecutor()) + .setSystemExecutor(systemExecutor) + .setApiClock(clock) + .build(); - assertThat(dispatcher.computeDeadlineSeconds()).isEqualTo(MAX_SECONDS_PER_ACK_EXTENSION); + messageDispatcher.setMessageDeadlineSeconds(MIN_ACK_DEADLINE_SECONDS); + return messageDispatcher; } } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java new file mode 100644 index 000000000..6ad951001 --- /dev/null +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java @@ -0,0 +1,490 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.v1; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import com.google.api.core.ApiFutures; +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.batching.FlowController; +import com.google.api.gax.core.Distribution; +import com.google.api.gax.rpc.StatusCode; +import com.google.cloud.pubsub.v1.stub.SubscriberStub; +import com.google.protobuf.Any; +import com.google.pubsub.v1.AcknowledgeRequest; +import com.google.pubsub.v1.ModifyAckDeadlineRequest; +import com.google.rpc.ErrorInfo; +import com.google.rpc.Status; +import io.grpc.StatusException; +import io.grpc.protobuf.StatusProto; +import java.util.*; +import java.util.concurrent.ExecutionException; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.threeten.bp.Duration; + +/** Tests for {@link StreamingSubscriberConnection}. */ +public class StreamingSubscriberConnectionTest { + @Rule public TestName testName = new TestName(); + + private FakeScheduledExecutorService systemExecutor; + private FakeScheduledExecutorService executor; + private FakeClock clock; + private SubscriberStub mockSubscriberStub; + + private static final String MOCK_SUBSCRIPTION_NAME = "MOCK-SUBSCRIPTION"; + private static final String MOCK_ACK_ID_SUCCESS = "MOCK-ACK-ID-SUCCESS"; + private static final String MOCK_ACK_ID_SUCCESS_2 = "MOCK-ACK-ID-SUCCESS-2"; + private static final String MOCK_ACK_ID_NACK_SUCCESS = "MOCK-ACK-ID-NACK-SUCCESS"; + // Successful modacks should not return a message + private static final String MOCK_ACK_ID_SUCCESS_NO_MESSAGE = "MOCK-ACK-ID-SUCCESS-NO-MESSAGE"; + private static final String MOCK_ACK_ID_TRANSIENT_FAILURE_UNORDERED_ACK_ID_THEN_SUCCESS = + "MOCK-ACK-ID-TRANSIENT-FAILURE-UNORDERED-ACK-ID-THEN-SUCCESS"; + private static final String MOCK_ACK_ID_TRANSIENT_FAILURE_SERVICE_UNAVAILABLE_THEN_SUCCESS = + "MOCK-ACK-ID-TRANSIENT-FAILURE-SERVICE-UNAVAILABLE-THEN-SUCCESS"; + private static final String MOCK_ACK_ID_INVALID = "MOCK-ACK-ID-INVALID"; + private static final String MOCK_ACK_ID_OTHER = "MOCK-ACK-ID-OTHER"; + + private static final String PERMANENT_FAILURE_INVALID_ACK_ID = "PERMANENT_FAILURE_INVALID_ACK_ID"; + private static final String TRANSIENT_FAILURE_UNORDERED_ACK_ID = + "TRANSIENT_FAILURE_UNORDERED_ACK_ID"; + private static final String TRANSIENT_FAILURE_SERVICE_UNAVAILABLE = + "TRANSIENT_FAILURE_SERVICE_UNAVAILABLE"; + private static final String PERMANENT_FAILURE_OTHER = "I_DO_NOT_MATCH_ANY_KNOWN_ERRORS"; + + private static int MOCK_ACK_EXTENSION_DEFAULT = 10; + private static Duration ACK_EXPIRATION_PADDING_DEFAULT = Duration.ofSeconds(10); + private static int MAX_DURATION_PER_ACK_EXTENSION_DEFAULT_SECONDS = 10; + + @Before + public void setUp() { + systemExecutor = new FakeScheduledExecutorService(); + clock = systemExecutor.getClock(); + mockSubscriberStub = mock(SubscriberStub.class, RETURNS_DEEP_STUBS); + } + + @After + public void tearDown() { + systemExecutor.shutdown(); + } + + @Test + public void testSetupAndTeardown() { + StreamingSubscriberConnection streamingSubscriberConnection = + getStreamingSubscriberConnection(false); + + streamingSubscriberConnection.startAsync(); + streamingSubscriberConnection.awaitRunning(); + streamingSubscriberConnection.stopAsync(); + streamingSubscriberConnection.awaitTerminated(); + } + + @Test + public void testSendAckOperationsExactlyOnceDisabledNoMessageFutures() { + // Setup mocks + List modackRequestDataList = new ArrayList(); + + ModackRequestData modackRequestDataSuccess = + new ModackRequestData( + MOCK_ACK_EXTENSION_DEFAULT, AckRequestData.newBuilder(MOCK_ACK_ID_SUCCESS).build()); + modackRequestDataList.add(modackRequestDataSuccess); + + ModackRequestData modackRequestDataNack = + new ModackRequestData(0, AckRequestData.newBuilder(MOCK_ACK_ID_SUCCESS).build()); + modackRequestDataList.add(modackRequestDataNack); + + List ackRequestDataList = new ArrayList(); + AckRequestData ackRequestDataSuccess = AckRequestData.newBuilder(MOCK_ACK_ID_SUCCESS).build(); + ackRequestDataList.add(ackRequestDataSuccess); + + // Instantiate class and run operation(s) + StreamingSubscriberConnection streamingSubscriberConnection = + getStreamingSubscriberConnection(false); + streamingSubscriberConnection.sendAckOperations(ackRequestDataList); + streamingSubscriberConnection.sendModackOperations(modackRequestDataList); + + // Assert expected behavior + verify(mockSubscriberStub, times(2)).modifyAckDeadlineCallable(); + verify(mockSubscriberStub, times(1)).acknowledgeCallable(); + } + + @Test + public void testSendAckOperationsExactlyOnceEnabledMessageFuturesModacks() { + // Setup + + // The list(s) of ackIds allows us to mock the grpc response(s) + List ackIdsInitialRequest = new ArrayList<>(); + List ackIdsRetryRequest = new ArrayList<>(); + + Map errorInfoMetadataMapInitialRequest = new HashMap(); + List modackRequestDataList = new ArrayList(); + + ModackRequestData modackRequestDataDefault = new ModackRequestData(MOCK_ACK_EXTENSION_DEFAULT); + + // Nack SUCCESS + SettableApiFuture messageFutureSuccessExpected = SettableApiFuture.create(); + ModackRequestData modackRequestDataSuccess = + new ModackRequestData( + 0, + AckRequestData.newBuilder(MOCK_ACK_ID_NACK_SUCCESS) + .setMessageFuture(messageFutureSuccessExpected) + .build()); + modackRequestDataList.add(modackRequestDataSuccess); + + // SUCCESS - no message + SettableApiFuture messageFutureNotDoneExpected = SettableApiFuture.create(); + modackRequestDataDefault.addAckRequestData( + AckRequestData.newBuilder(MOCK_ACK_ID_SUCCESS_NO_MESSAGE) + .setMessageFuture(messageFutureNotDoneExpected) + .build()); + ackIdsInitialRequest.add(MOCK_ACK_ID_SUCCESS_NO_MESSAGE); + + // INVALID + SettableApiFuture messageFutureInvalidExpected = SettableApiFuture.create(); + modackRequestDataDefault.addAckRequestData( + AckRequestData.newBuilder(MOCK_ACK_ID_INVALID) + .setMessageFuture(messageFutureInvalidExpected) + .build()); + errorInfoMetadataMapInitialRequest.put(MOCK_ACK_ID_INVALID, PERMANENT_FAILURE_INVALID_ACK_ID); + ackIdsInitialRequest.add(MOCK_ACK_ID_INVALID); + + // OTHER + SettableApiFuture messageFutureOtherExpected = SettableApiFuture.create(); + modackRequestDataDefault.addAckRequestData( + AckRequestData.newBuilder(MOCK_ACK_ID_OTHER) + .setMessageFuture(messageFutureOtherExpected) + .build()); + errorInfoMetadataMapInitialRequest.put(MOCK_ACK_ID_OTHER, PERMANENT_FAILURE_OTHER); + ackIdsInitialRequest.add(MOCK_ACK_ID_OTHER); + + // Initial) FAILURE - TRANSIENT SERVICE UNAVAILABLE + // Retry) SUCCESS - but no message future set + SettableApiFuture messageFutureTransientFailureServiceUnavailableThenSuccess = + SettableApiFuture.create(); + modackRequestDataDefault.addAckRequestData( + AckRequestData.newBuilder(MOCK_ACK_ID_TRANSIENT_FAILURE_SERVICE_UNAVAILABLE_THEN_SUCCESS) + .setMessageFuture(messageFutureTransientFailureServiceUnavailableThenSuccess) + .build()); + errorInfoMetadataMapInitialRequest.put( + MOCK_ACK_ID_TRANSIENT_FAILURE_SERVICE_UNAVAILABLE_THEN_SUCCESS, + TRANSIENT_FAILURE_SERVICE_UNAVAILABLE); + ackIdsInitialRequest.add(MOCK_ACK_ID_TRANSIENT_FAILURE_SERVICE_UNAVAILABLE_THEN_SUCCESS); + ackIdsRetryRequest.add(MOCK_ACK_ID_TRANSIENT_FAILURE_SERVICE_UNAVAILABLE_THEN_SUCCESS); + + // Initial) FAILURE - TRANSIENT - UNORDERED ACK ID + // Retry) SUCCESS - but no message future set + SettableApiFuture messageFutureTransientFailureUnorderedAckIdThenSuccess = + SettableApiFuture.create(); + modackRequestDataDefault.addAckRequestData( + AckRequestData.newBuilder(MOCK_ACK_ID_TRANSIENT_FAILURE_UNORDERED_ACK_ID_THEN_SUCCESS) + .setMessageFuture(messageFutureTransientFailureUnorderedAckIdThenSuccess) + .build()); + errorInfoMetadataMapInitialRequest.put( + MOCK_ACK_ID_TRANSIENT_FAILURE_UNORDERED_ACK_ID_THEN_SUCCESS, + TRANSIENT_FAILURE_UNORDERED_ACK_ID); + ackIdsInitialRequest.add(MOCK_ACK_ID_TRANSIENT_FAILURE_UNORDERED_ACK_ID_THEN_SUCCESS); + ackIdsRetryRequest.add(MOCK_ACK_ID_TRANSIENT_FAILURE_UNORDERED_ACK_ID_THEN_SUCCESS); + + modackRequestDataList.add(modackRequestDataDefault); + + // Build our requests so we can set our mock responses + ModifyAckDeadlineRequest modifyAckDeadlineRequestNack = + ModifyAckDeadlineRequest.newBuilder() + .setSubscription(MOCK_SUBSCRIPTION_NAME) + .addAckIds(MOCK_ACK_ID_NACK_SUCCESS) + .setAckDeadlineSeconds(0) + .build(); + + ModifyAckDeadlineRequest modifyAckDeadlineRequestInitial = + ModifyAckDeadlineRequest.newBuilder() + .setSubscription(MOCK_SUBSCRIPTION_NAME) + .addAllAckIds(ackIdsInitialRequest) + .setAckDeadlineSeconds(MOCK_ACK_EXTENSION_DEFAULT) + .build(); + + ModifyAckDeadlineRequest modifyAckDeadlineRequestRetry = + ModifyAckDeadlineRequest.newBuilder() + .setSubscription(MOCK_SUBSCRIPTION_NAME) + .addAllAckIds(ackIdsRetryRequest) + .setAckDeadlineSeconds(MOCK_ACK_EXTENSION_DEFAULT) + .build(); + + // Set mock grpc responses + when(mockSubscriberStub.modifyAckDeadlineCallable().futureCall(modifyAckDeadlineRequestNack)) + .thenReturn(ApiFutures.immediateFuture(null)); + when(mockSubscriberStub.modifyAckDeadlineCallable().futureCall(modifyAckDeadlineRequestInitial)) + .thenReturn( + ApiFutures.immediateFailedFuture( + getMockStatusException(errorInfoMetadataMapInitialRequest))); + when(mockSubscriberStub + .modifyAckDeadlineCallable() + .futureCall( + argThat( + new CustomArgumentMatchers.ModifyAckDeadlineRequestMatcher( + modifyAckDeadlineRequestRetry)))) + .thenReturn(ApiFutures.immediateFuture(null)); + + // Instantiate class and run operation(s) + StreamingSubscriberConnection streamingSubscriberConnection = + getStreamingSubscriberConnection(true); + + streamingSubscriberConnection.sendModackOperations(modackRequestDataList); + + // Backoff + systemExecutor.advanceTime(Duration.ofSeconds(200)); + + // Assert expected behavior + verify(mockSubscriberStub.modifyAckDeadlineCallable(), times(1)) + .futureCall(modifyAckDeadlineRequestNack); + verify(mockSubscriberStub.modifyAckDeadlineCallable(), times(1)) + .futureCall(modifyAckDeadlineRequestInitial); + verify(mockSubscriberStub.modifyAckDeadlineCallable(), times(1)) + .futureCall(modifyAckDeadlineRequestRetry); + verify(mockSubscriberStub, never()).acknowledgeCallable(); + + try { + assertEquals(AckResponse.SUCCESSFUL, messageFutureSuccessExpected.get()); + assertEquals(AckResponse.INVALID, messageFutureInvalidExpected.get()); + assertEquals(AckResponse.OTHER, messageFutureOtherExpected.get()); + assertFalse(messageFutureTransientFailureServiceUnavailableThenSuccess.isDone()); + assertFalse(messageFutureTransientFailureUnorderedAckIdThenSuccess.isDone()); + } catch (InterruptedException | ExecutionException e) { + // In case something goes wrong retrieving the futures + throw new AssertionError(); + } + } + + @Test + public void testSendAckOperationsExactlyOnceEnabledMessageFuturesAcks() { + // Setup + + // The list(s) of ackIds allows us to mock the grpc response(s) + List ackIdsInitialRequest = new ArrayList<>(); + List ackIdsRetryRequest = new ArrayList<>(); + + Map errorInfoMetadataMapInitialRequest = new HashMap(); + List ackRequestDataList = new ArrayList(); + + // SUCCESS + SettableApiFuture messageFutureSuccessExpected = SettableApiFuture.create(); + ackRequestDataList.add( + AckRequestData.newBuilder(MOCK_ACK_ID_SUCCESS) + .setMessageFuture(messageFutureSuccessExpected) + .build()); + ackIdsInitialRequest.add(MOCK_ACK_ID_SUCCESS); + + // INVALID + SettableApiFuture messageFutureInvalidExpected = SettableApiFuture.create(); + ackRequestDataList.add( + AckRequestData.newBuilder(MOCK_ACK_ID_INVALID) + .setMessageFuture(messageFutureInvalidExpected) + .build()); + errorInfoMetadataMapInitialRequest.put(MOCK_ACK_ID_INVALID, PERMANENT_FAILURE_INVALID_ACK_ID); + ackIdsInitialRequest.add(MOCK_ACK_ID_INVALID); + + // OTHER + SettableApiFuture messageFutureOtherExpected = SettableApiFuture.create(); + ackRequestDataList.add( + AckRequestData.newBuilder(MOCK_ACK_ID_OTHER) + .setMessageFuture(messageFutureOtherExpected) + .build()); + errorInfoMetadataMapInitialRequest.put(MOCK_ACK_ID_OTHER, PERMANENT_FAILURE_OTHER); + ackIdsInitialRequest.add(MOCK_ACK_ID_OTHER); + + // Initial) FAILURE - TRANSIENT SERVICE UNAVAILABLE + // Retry) SUCCESS + SettableApiFuture messageFutureTransientFailureServiceUnavailableThenSuccess = + SettableApiFuture.create(); + ackRequestDataList.add( + AckRequestData.newBuilder(MOCK_ACK_ID_TRANSIENT_FAILURE_SERVICE_UNAVAILABLE_THEN_SUCCESS) + .setMessageFuture(messageFutureTransientFailureServiceUnavailableThenSuccess) + .build()); + errorInfoMetadataMapInitialRequest.put( + MOCK_ACK_ID_TRANSIENT_FAILURE_SERVICE_UNAVAILABLE_THEN_SUCCESS, + TRANSIENT_FAILURE_SERVICE_UNAVAILABLE); + ackIdsInitialRequest.add(MOCK_ACK_ID_TRANSIENT_FAILURE_SERVICE_UNAVAILABLE_THEN_SUCCESS); + ackIdsRetryRequest.add(MOCK_ACK_ID_TRANSIENT_FAILURE_SERVICE_UNAVAILABLE_THEN_SUCCESS); + + // Initial) FAILURE - TRANSIENT - UNORDERED ACK ID + // Retry) SUCCESS + SettableApiFuture messageFutureTransientFailureUnorderedAckIdThenSuccess = + SettableApiFuture.create(); + ackRequestDataList.add( + AckRequestData.newBuilder(MOCK_ACK_ID_TRANSIENT_FAILURE_UNORDERED_ACK_ID_THEN_SUCCESS) + .setMessageFuture(messageFutureTransientFailureUnorderedAckIdThenSuccess) + .build()); + errorInfoMetadataMapInitialRequest.put( + MOCK_ACK_ID_TRANSIENT_FAILURE_UNORDERED_ACK_ID_THEN_SUCCESS, + TRANSIENT_FAILURE_UNORDERED_ACK_ID); + ackIdsInitialRequest.add(MOCK_ACK_ID_TRANSIENT_FAILURE_UNORDERED_ACK_ID_THEN_SUCCESS); + ackIdsRetryRequest.add(MOCK_ACK_ID_TRANSIENT_FAILURE_UNORDERED_ACK_ID_THEN_SUCCESS); + + // Build our requests so we can set our mock responses + AcknowledgeRequest acknowledgeRequestInitial = + AcknowledgeRequest.newBuilder() + .setSubscription(MOCK_SUBSCRIPTION_NAME) + .addAllAckIds(ackIdsInitialRequest) + .build(); + + AcknowledgeRequest acknowledgeRequestRetry = + AcknowledgeRequest.newBuilder() + .setSubscription(MOCK_SUBSCRIPTION_NAME) + .addAllAckIds(ackIdsRetryRequest) + .build(); + + // Set mock grpc responses + when(mockSubscriberStub.acknowledgeCallable().futureCall(acknowledgeRequestInitial)) + .thenReturn( + ApiFutures.immediateFailedFuture( + getMockStatusException(errorInfoMetadataMapInitialRequest))); + when(mockSubscriberStub + .acknowledgeCallable() + .futureCall( + argThat( + new CustomArgumentMatchers.AcknowledgeRequestMatcher(acknowledgeRequestRetry)))) + .thenReturn(ApiFutures.immediateFuture(null)); + + // Instantiate class and run operation(s) + StreamingSubscriberConnection streamingSubscriberConnection = + getStreamingSubscriberConnection(true); + + streamingSubscriberConnection.sendAckOperations(ackRequestDataList); + + // Backoff + systemExecutor.advanceTime(Duration.ofMillis(200)); + + // Assert expected behavior; + verify(mockSubscriberStub.acknowledgeCallable(), times(1)) + .futureCall(acknowledgeRequestInitial); + verify(mockSubscriberStub.acknowledgeCallable(), times(1)) + .futureCall( + argThat(new CustomArgumentMatchers.AcknowledgeRequestMatcher(acknowledgeRequestRetry))); + verify(mockSubscriberStub, never()).modifyAckDeadlineCallable(); + + try { + assertEquals(AckResponse.SUCCESSFUL, messageFutureSuccessExpected.get()); + assertEquals(AckResponse.INVALID, messageFutureInvalidExpected.get()); + assertEquals(AckResponse.OTHER, messageFutureOtherExpected.get()); + assertEquals( + AckResponse.SUCCESSFUL, messageFutureTransientFailureServiceUnavailableThenSuccess.get()); + assertEquals( + AckResponse.SUCCESSFUL, messageFutureTransientFailureUnorderedAckIdThenSuccess.get()); + } catch (InterruptedException | ExecutionException e) { + // In case something goes wrong retrieving the futures + throw new AssertionError(); + } + } + + @Test + public void testSetFailureResponseOutstandingMessages() { + // Setup + + List ackRequestDataList = new ArrayList(); + List nackRequestDataList = new ArrayList(); + List> futureList = + new ArrayList>(); + + // Create some acks + for (int i = 0; i < 5; i++) { + SettableApiFuture future = SettableApiFuture.create(); + futureList.add(future); + ackRequestDataList.add( + AckRequestData.newBuilder("ACK-ID-" + i).setMessageFuture(future).build()); + } + + // Create some nacks + for (int i = 5; i < 10; i++) { + SettableApiFuture future = SettableApiFuture.create(); + futureList.add(future); + nackRequestDataList.add( + AckRequestData.newBuilder("ACK-ID-" + i).setMessageFuture(future).build()); + } + + ModackRequestData modackRequestData = new ModackRequestData(0, nackRequestDataList); + + StreamingSubscriberConnection streamingSubscriberConnection = + getStreamingSubscriberConnection(true); + + streamingSubscriberConnection.sendAckOperations(ackRequestDataList); + streamingSubscriberConnection.sendModackOperations( + Collections.singletonList(modackRequestData)); + + // Assert pending status + futureList.forEach( + ackResponseSettableApiFuture -> { + assertFalse(ackResponseSettableApiFuture.isDone()); + }); + + // Set + streamingSubscriberConnection.setResponseOutstandingMessages(AckResponse.PERMISSION_DENIED); + + // Assert futures + + futureList.forEach( + ackResponseSettableApiFuture -> { + try { + assertEquals(ackResponseSettableApiFuture.get(), AckResponse.PERMISSION_DENIED); + } catch (InterruptedException | ExecutionException e) { + // In case something goes wrong retrieving the futures + throw new AssertionError(); + } + }); + } + + private StreamingSubscriberConnection getStreamingSubscriberConnection( + boolean exactlyOnceDeliveryEnabled) { + return getStreamingSubscriberReceiverFromBuilder( + StreamingSubscriberConnection.newBuilder(mock(MessageReceiverWithAckResponse.class)), + exactlyOnceDeliveryEnabled); + } + + private StreamingSubscriberConnection getStreamingSubscriberReceiverFromBuilder( + StreamingSubscriberConnection.Builder builder, boolean exactlyOnceDeliveryEnabled) { + return builder + .setSubscription(MOCK_SUBSCRIPTION_NAME) + .setAckExpirationPadding(ACK_EXPIRATION_PADDING_DEFAULT) + .setAckLatencyDistribution(mock(Distribution.class)) + .setSubscriberStub(mockSubscriberStub) + .setChannelAffinity(0) + .setFlowControlSettings(mock(FlowControlSettings.class)) + .setFlowController(mock(FlowController.class)) + .setExecutor(executor) + .setSystemExecutor(systemExecutor) + .setClock(clock) + .setMinDurationPerAckExtension(Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION) + .setMinDurationPerAckExtensionDefaultUsed(true) + .setMaxDurationPerAckExtension(Subscriber.DEFAULT_MAX_ACK_DEADLINE_EXTENSION) + .setMaxDurationPerAckExtensionDefaultUsed(true) + .setExactlyOnceDeliveryEnabled(exactlyOnceDeliveryEnabled) + .build(); + } + + private StatusException getMockStatusException(Map metadata) { + ErrorInfo errorInfo = ErrorInfo.newBuilder().putAllMetadata(metadata).build(); + Status status = + Status.newBuilder() + .setCode(StatusCode.Code.OK.ordinal()) + .addDetails(Any.pack(errorInfo)) + .build(); + return StatusProto.toStatusException(status); + } +} diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java index 4491b6ef9..ab7021bba 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java @@ -16,12 +16,7 @@ package com.google.cloud.pubsub.v1; -import static com.google.cloud.pubsub.v1.StreamingSubscriberConnection.DEFAULT_STREAM_ACK_DEADLINE; -import static com.google.cloud.pubsub.v1.StreamingSubscriberConnection.MAX_STREAM_ACK_DEADLINE; -import static com.google.cloud.pubsub.v1.StreamingSubscriberConnection.MIN_STREAM_ACK_DEADLINE; -import static com.google.cloud.pubsub.v1.Subscriber.DEFAULT_MAX_DURATION_PER_ACK_EXTENSION; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.core.ExecutorProvider; @@ -30,9 +25,7 @@ import com.google.api.gax.core.NoCredentialsProvider; import com.google.api.gax.grpc.GrpcStatusCode; import com.google.api.gax.grpc.GrpcTransportChannel; -import com.google.api.gax.rpc.ApiException; -import com.google.api.gax.rpc.FixedTransportChannelProvider; -import com.google.api.gax.rpc.StatusCode; +import com.google.api.gax.rpc.*; import com.google.cloud.pubsub.v1.Subscriber.Builder; import com.google.pubsub.v1.ProjectSubscriptionName; import com.google.pubsub.v1.PubsubMessage; @@ -42,9 +35,7 @@ import io.grpc.StatusException; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -62,6 +53,8 @@ public class SubscriberTest { private FakeScheduledExecutorService fakeExecutor; private FakeSubscriberServiceImpl fakeSubscriberServiceImpl; private Server testServer; + private LinkedBlockingQueue consumersWithResponse; + private MessageReceiverWithAckResponse messageReceiverWithAckResponse; private final MessageReceiver testReceiver = new MessageReceiver() { @@ -75,6 +68,7 @@ public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { @Before public void setUp() throws Exception { + consumersWithResponse = new LinkedBlockingQueue<>(); InProcessServerBuilder serverBuilder = InProcessServerBuilder.forName(testName.getMethodName()); fakeSubscriberServiceImpl = new FakeSubscriberServiceImpl(); fakeExecutor = new FakeScheduledExecutorService(); @@ -82,6 +76,16 @@ public void setUp() throws Exception { serverBuilder.addService(fakeSubscriberServiceImpl); testServer = serverBuilder.build(); testServer.start(); + + messageReceiverWithAckResponse = + new MessageReceiverWithAckResponse() { + @Override + public void receiveMessage( + final PubsubMessage message, + final AckReplyConsumerWithResponse consumerWithResponse) { + consumersWithResponse.add(consumerWithResponse); + } + }; } @After @@ -241,7 +245,7 @@ public void testStreamAckDeadlineIsSetCorrectly() throws Exception { assertEquals( expectedChannelCount, fakeSubscriberServiceImpl.waitForOpenedStreams(expectedChannelCount)); assertEquals( - MIN_STREAM_ACK_DEADLINE.getSeconds(), + Math.toIntExact(Subscriber.MIN_STREAM_ACK_DEADLINE.getSeconds()), fakeSubscriberServiceImpl.getLastSeenRequest().getStreamAckDeadlineSeconds()); subscriber.stopAsync().awaitTerminated(); @@ -255,7 +259,7 @@ public void testStreamAckDeadlineIsSetCorrectly() throws Exception { assertEquals( expectedChannelCount, fakeSubscriberServiceImpl.waitForOpenedStreams(expectedChannelCount)); assertEquals( - MAX_STREAM_ACK_DEADLINE.getSeconds(), + Math.toIntExact(Subscriber.MAX_STREAM_ACK_DEADLINE.getSeconds()), fakeSubscriberServiceImpl.getLastSeenRequest().getStreamAckDeadlineSeconds()); subscriber.stopAsync().awaitTerminated(); @@ -275,18 +279,24 @@ public void testStreamAckDeadlineIsSetCorrectly() throws Exception { subscriber.stopAsync().awaitTerminated(); // maxDurationPerAckExtension is unset. - maxDurationPerAckExtension = (int) DEFAULT_MAX_DURATION_PER_ACK_EXTENSION.getSeconds(); - subscriber = - startSubscriber( - getTestSubscriberBuilder(testReceiver) - .setMaxDurationPerAckExtension(Duration.ofSeconds(maxDurationPerAckExtension))); + subscriber = startSubscriber(getTestSubscriberBuilder(testReceiver)); assertEquals( expectedChannelCount, fakeSubscriberServiceImpl.waitForOpenedStreams(expectedChannelCount)); assertEquals( - DEFAULT_STREAM_ACK_DEADLINE.getSeconds(), + Math.toIntExact(Subscriber.STREAM_ACK_DEADLINE_DEFAULT.getSeconds()), fakeSubscriberServiceImpl.getLastSeenRequest().getStreamAckDeadlineSeconds()); subscriber.stopAsync().awaitTerminated(); + + // maxDurationPerAckExtension is unset with exactly once enabled + subscriber = + startSubscriber(getTestSubscriberBuilder(testReceiver).setExactlyOnceDeliveryEnabled(true)); + assertEquals( + expectedChannelCount, fakeSubscriberServiceImpl.waitForOpenedStreams(expectedChannelCount)); + assertEquals( + Math.toIntExact(Subscriber.STREAM_ACK_DEADLINE_EXACTLY_ONCE_DELIVERY_DEFAULT.getSeconds()), + fakeSubscriberServiceImpl.getLastSeenRequest().getStreamAckDeadlineSeconds()); + subscriber.stopAsync().awaitTerminated(); } @Test @@ -325,8 +335,22 @@ private Subscriber startSubscriber(Builder testSubscriberBuilder) { return subscriber; } - private Builder getTestSubscriberBuilder(MessageReceiver receiver) { - return Subscriber.newBuilder(TEST_SUBSCRIPTION, receiver) + private Builder getTestSubscriberBuilder(MessageReceiver messageReceiver) { + return Subscriber.newBuilder(TEST_SUBSCRIPTION, messageReceiver) + .setExecutorProvider(FixedExecutorProvider.create(fakeExecutor)) + .setSystemExecutorProvider(FixedExecutorProvider.create(fakeExecutor)) + .setChannelProvider( + FixedTransportChannelProvider.create(GrpcTransportChannel.create(testChannel))) + .setCredentialsProvider(NoCredentialsProvider.create()) + .setClock(fakeExecutor.getClock()) + .setParallelPullCount(1) + .setFlowControlSettings( + FlowControlSettings.newBuilder().setMaxOutstandingElementCount(1000L).build()); + } + + private Builder getTestSubscriberBuilder( + MessageReceiverWithAckResponse messageReceiverWithAckResponse) { + return Subscriber.newBuilder(TEST_SUBSCRIPTION, messageReceiverWithAckResponse) .setExecutorProvider(FixedExecutorProvider.create(fakeExecutor)) .setSystemExecutorProvider(FixedExecutorProvider.create(fakeExecutor)) .setChannelProvider( @@ -334,7 +358,7 @@ private Builder getTestSubscriberBuilder(MessageReceiver receiver) { .setCredentialsProvider(NoCredentialsProvider.create()) .setClock(fakeExecutor.getClock()) .setParallelPullCount(1) - .setMaxDurationPerAckExtension(Duration.ofSeconds(5)) + .setExactlyOnceDeliveryEnabled(true) .setFlowControlSettings( FlowControlSettings.newBuilder().setMaxOutstandingElementCount(1000L).build()); } diff --git a/pom.xml b/pom.xml index b411341b0..1ebae91da 100644 --- a/pom.xml +++ b/pom.xml @@ -89,6 +89,12 @@ + + org.mockito + mockito-core + 4.3.1 + test + junit junit