From 5f690022551ac584e5c66af4324a17d7044a898d Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Tue, 13 Sep 2022 21:24:03 -0700 Subject: [PATCH] fix(pubsub): remove unused AckResult map (#6656) --- pubsub/iterator.go | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/pubsub/iterator.go b/pubsub/iterator.go index 0ebb600f80d..ced6a5c9008 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -96,9 +96,6 @@ type messageIterator struct { eoMu sync.RWMutex enableExactlyOnceDelivery bool sendNewAckDeadline bool - // This stores pending AckResults for cleaner shutdown when sub.Receive's ctx is cancelled. - // If exactly once delivery is not enabled, this map should not be populated. - pendingAckResults map[string]*AckResult } // newMessageIterator starts and returns a new messageIterator. @@ -144,7 +141,6 @@ func newMessageIterator(subc *vkit.SubscriberClient, subName string, po *pullOpt pendingAcks: map[string]*AckResult{}, pendingNacks: map[string]*AckResult{}, pendingModAcks: map[string]*AckResult{}, - pendingAckResults: map[string]*AckResult{}, } it.wg.Add(1) go it.sender() @@ -198,7 +194,6 @@ func (it *messageIterator) done(ackID string, ack bool, r *AckResult, receiveTim it.mu.Lock() defer it.mu.Unlock() delete(it.keepAliveDeadlines, ackID) - delete(it.pendingAckResults, ackID) if ack { it.pendingAcks[ackID] = r } else { @@ -261,9 +256,6 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) { maxExt := time.Now().Add(it.po.maxExtension) ackIDs := map[string]*AckResult{} it.mu.Lock() - it.eoMu.RLock() - enableExactlyOnceDelivery := it.enableExactlyOnceDelivery - it.eoMu.RUnlock() for _, m := range msgs { ackID := msgAckID(m) addRecv(m.ID, ackID, now) @@ -277,15 +269,6 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) { // close the channel without checking if it exists. ackIDs[ackID] = newSuccessAckResult() } - // If exactly once is enabled, keep track of all pending AckResults - // so we can cleanly close them all at shutdown. - if enableExactlyOnceDelivery { - ackh, ok := ipubsub.MessageAckHandler(m).(*psAckHandler) - if !ok { - it.fail(errors.New("failed to assert type as psAckHandler")) - } - it.pendingAckResults[ackID] = ackh.ackResult - } } deadline := it.ackDeadline() it.mu.Unlock()