diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumerImpl.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumerImpl.java new file mode 100644 index 000000000..f9d4bd15a --- /dev/null +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumerImpl.java @@ -0,0 +1,37 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.pubsub.v1; + +import com.google.api.core.SettableApiFuture; + +public class AckReplyConsumerImpl implements AckReplyConsumer { + final SettableApiFuture ackReplySettableApiFuture; + + public AckReplyConsumerImpl( + final SettableApiFuture ackReplySettableApiFuture) { + this.ackReplySettableApiFuture = ackReplySettableApiFuture; + } + + @Override + public void ack() { + ackReplySettableApiFuture.set(MessageDispatcher.AckReply.ACK); + } + + @Override + public void nack() { + ackReplySettableApiFuture.set(MessageDispatcher.AckReply.NACK); + } +} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumerWithResponseImpl.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumerWithResponseImpl.java new file mode 100644 index 000000000..d0a0316af --- /dev/null +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumerWithResponseImpl.java @@ -0,0 +1,43 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.pubsub.v1; + +import com.google.api.core.SettableApiFuture; +import java.util.concurrent.Future; + +public class AckReplyConsumerWithResponseImpl implements AckReplyConsumerWithResponse { + final SettableApiFuture ackReplySettableApiFuture; + final SettableApiFuture messageFuture; + + public AckReplyConsumerWithResponseImpl( + SettableApiFuture ackReplySettableApiFuture, + SettableApiFuture messageFuture) { + this.ackReplySettableApiFuture = ackReplySettableApiFuture; + this.messageFuture = messageFuture; + } + + @Override + public Future ack() { + ackReplySettableApiFuture.set(MessageDispatcher.AckReply.ACK); + return messageFuture; + } + + @Override + public Future nack() { + ackReplySettableApiFuture.set(MessageDispatcher.AckReply.NACK); + return messageFuture; + } +} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index de51d3bf5..3c772819a 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -35,7 +35,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; -import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -434,33 +433,11 @@ public void run() { SettableApiFuture messageFuture = ackHandler.getMessageFutureIfExists(); final AckReplyConsumerWithResponse ackReplyConsumerWithResponse = - new AckReplyConsumerWithResponse() { - @Override - public Future ack() { - ackReplySettableApiFuture.set(AckReply.ACK); - return messageFuture; - } - - @Override - public Future nack() { - ackReplySettableApiFuture.set(AckReply.NACK); - return messageFuture; - } - }; + new AckReplyConsumerWithResponseImpl(ackReplySettableApiFuture, messageFuture); receiverWithAckResponse.receiveMessage(message, ackReplyConsumerWithResponse); } else { final AckReplyConsumer ackReplyConsumer = - new AckReplyConsumer() { - @Override - public void ack() { - ackReplySettableApiFuture.set(AckReply.ACK); - } - - @Override - public void nack() { - ackReplySettableApiFuture.set(AckReply.NACK); - } - }; + new AckReplyConsumerImpl(ackReplySettableApiFuture); receiver.receiveMessage(message, ackReplyConsumer); } } catch (Exception e) {