From f65c40614f2df9d63fd162869dd0582598ef7fa6 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Mon, 12 Sep 2022 23:42:22 +0000 Subject: [PATCH 1/2] fix(pubsub): remove unused AckResult map --- pubsub/iterator.go | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/pubsub/iterator.go b/pubsub/iterator.go index 0ebb600f80d..b1bb7ac2a2c 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -96,9 +96,6 @@ type messageIterator struct { eoMu sync.RWMutex enableExactlyOnceDelivery bool sendNewAckDeadline bool - // This stores pending AckResults for cleaner shutdown when sub.Receive's ctx is cancelled. - // If exactly once delivery is not enabled, this map should not be populated. - pendingAckResults map[string]*AckResult } // newMessageIterator starts and returns a new messageIterator. @@ -144,7 +141,6 @@ func newMessageIterator(subc *vkit.SubscriberClient, subName string, po *pullOpt pendingAcks: map[string]*AckResult{}, pendingNacks: map[string]*AckResult{}, pendingModAcks: map[string]*AckResult{}, - pendingAckResults: map[string]*AckResult{}, } it.wg.Add(1) go it.sender() @@ -198,7 +194,6 @@ func (it *messageIterator) done(ackID string, ack bool, r *AckResult, receiveTim it.mu.Lock() defer it.mu.Unlock() delete(it.keepAliveDeadlines, ackID) - delete(it.pendingAckResults, ackID) if ack { it.pendingAcks[ackID] = r } else { @@ -277,15 +272,6 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) { // close the channel without checking if it exists. ackIDs[ackID] = newSuccessAckResult() } - // If exactly once is enabled, keep track of all pending AckResults - // so we can cleanly close them all at shutdown. - if enableExactlyOnceDelivery { - ackh, ok := ipubsub.MessageAckHandler(m).(*psAckHandler) - if !ok { - it.fail(errors.New("failed to assert type as psAckHandler")) - } - it.pendingAckResults[ackID] = ackh.ackResult - } } deadline := it.ackDeadline() it.mu.Unlock() From 542652037c471b4e8deb23a4ec351a436e800f4a Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Tue, 13 Sep 2022 13:12:55 -0700 Subject: [PATCH 2/2] remove unused variable --- pubsub/iterator.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pubsub/iterator.go b/pubsub/iterator.go index b1bb7ac2a2c..ced6a5c9008 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -256,9 +256,6 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) { maxExt := time.Now().Add(it.po.maxExtension) ackIDs := map[string]*AckResult{} it.mu.Lock() - it.eoMu.RLock() - enableExactlyOnceDelivery := it.enableExactlyOnceDelivery - it.eoMu.RUnlock() for _, m := range msgs { ackID := msgAckID(m) addRecv(m.ID, ackID, now)