Skip to content

Commit

Permalink
fix(pubsub): remove unused AckResult map (#6656)
Browse files Browse the repository at this point in the history
  • Loading branch information
hongalex committed Sep 14, 2022
1 parent 6a0080a commit 5f69002
Showing 1 changed file with 0 additions and 17 deletions.
17 changes: 0 additions & 17 deletions pubsub/iterator.go
Expand Up @@ -96,9 +96,6 @@ type messageIterator struct {
eoMu sync.RWMutex
enableExactlyOnceDelivery bool
sendNewAckDeadline bool
// This stores pending AckResults for cleaner shutdown when sub.Receive's ctx is cancelled.
// If exactly once delivery is not enabled, this map should not be populated.
pendingAckResults map[string]*AckResult
}

// newMessageIterator starts and returns a new messageIterator.
Expand Down Expand Up @@ -144,7 +141,6 @@ func newMessageIterator(subc *vkit.SubscriberClient, subName string, po *pullOpt
pendingAcks: map[string]*AckResult{},
pendingNacks: map[string]*AckResult{},
pendingModAcks: map[string]*AckResult{},
pendingAckResults: map[string]*AckResult{},
}
it.wg.Add(1)
go it.sender()
Expand Down Expand Up @@ -198,7 +194,6 @@ func (it *messageIterator) done(ackID string, ack bool, r *AckResult, receiveTim
it.mu.Lock()
defer it.mu.Unlock()
delete(it.keepAliveDeadlines, ackID)
delete(it.pendingAckResults, ackID)
if ack {
it.pendingAcks[ackID] = r
} else {
Expand Down Expand Up @@ -261,9 +256,6 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
maxExt := time.Now().Add(it.po.maxExtension)
ackIDs := map[string]*AckResult{}
it.mu.Lock()
it.eoMu.RLock()
enableExactlyOnceDelivery := it.enableExactlyOnceDelivery
it.eoMu.RUnlock()
for _, m := range msgs {
ackID := msgAckID(m)
addRecv(m.ID, ackID, now)
Expand All @@ -277,15 +269,6 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
// close the channel without checking if it exists.
ackIDs[ackID] = newSuccessAckResult()
}
// If exactly once is enabled, keep track of all pending AckResults
// so we can cleanly close them all at shutdown.
if enableExactlyOnceDelivery {
ackh, ok := ipubsub.MessageAckHandler(m).(*psAckHandler)
if !ok {
it.fail(errors.New("failed to assert type as psAckHandler"))
}
it.pendingAckResults[ackID] = ackh.ackResult
}
}
deadline := it.ackDeadline()
it.mu.Unlock()
Expand Down

0 comments on commit 5f69002

Please sign in to comment.