Skip to content

Commit

Permalink
[IMPROVED] Head of line improvements (#4027)
Browse files Browse the repository at this point in the history
 Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Apr 6, 2023
2 parents 8154136 + e76b0b9 commit a5326c9
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 15 deletions.
8 changes: 4 additions & 4 deletions server/ipqueue.go
@@ -1,4 +1,4 @@
// Copyright 2021 The NATS Authors
// Copyright 2021-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand All @@ -23,7 +23,7 @@ const ipQueueDefaultMaxRecycleSize = 4 * 1024
// This is a generic intra-process queue.
type ipQueue[T any] struct {
inprogress int64
sync.RWMutex
sync.Mutex
ch chan struct{}
elts []T
pos int
Expand Down Expand Up @@ -180,9 +180,9 @@ func (q *ipQueue[T]) recycle(elts *[]T) {

// Returns the current length of the queue.
func (q *ipQueue[T]) len() int {
q.RLock()
q.Lock()
l := len(q.elts) - q.pos
q.RUnlock()
q.Unlock()
return l
}

Expand Down
8 changes: 4 additions & 4 deletions server/ipqueue_test.go
Expand Up @@ -221,9 +221,9 @@ func TestIPQueuePopOne(t *testing.T) {
q.push(1)
q.push(2)
// Capture current capacity
q.RLock()
q.Lock()
c := cap(q.elts)
q.RUnlock()
q.Unlock()
e, ok = q.popOne()
if !ok || e != 1 {
t.Fatalf("Invalid value: %v", e)
Expand Down Expand Up @@ -343,9 +343,9 @@ func TestIPQueueRecycle(t *testing.T) {
values = q.pop()
q.recycle(&values)
q.push(1002)
q.RLock()
q.Lock()
recycled := &q.elts == &values
q.RUnlock()
q.Unlock()
if recycled {
t.Fatalf("Unexpected recycled slice")
}
Expand Down
15 changes: 8 additions & 7 deletions server/raft.go
Expand Up @@ -1758,9 +1758,15 @@ func (n *raft) setObserver(isObserver bool, extSt extensionState) {

// Invoked when being notified that there is something in the entryc's queue
func (n *raft) processAppendEntries() {
ok := !n.outOfResources()
if !ok {
n.debug("AppendEntry not processing inbound, no resources")
}
aes := n.entry.pop()
for _, ae := range aes {
n.processAppendEntry(ae, ae.sub)
if ok {
for _, ae := range aes {
n.processAppendEntry(ae, ae.sub)
}
}
n.entry.recycle(&aes)
}
Expand Down Expand Up @@ -2697,11 +2703,6 @@ func (n *raft) runAsCandidate() {

// handleAppendEntry handles an append entry from the wire.
func (n *raft) handleAppendEntry(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
if n.outOfResources() {
n.debug("AppendEntry not processing inbound, no resources")
return
}

msg = copyBytes(msg)
if ae, err := n.decodeAppendEntry(msg, sub, reply); err == nil {
n.entry.push(ae)
Expand Down

0 comments on commit a5326c9

Please sign in to comment.