Skip to content

Commit

Permalink
Merge pull request #3894 from nats-io/neil/ipqueue
Browse files Browse the repository at this point in the history
Refactor `ipQueue` to use generics
  • Loading branch information
derekcollison committed Feb 21, 2023
2 parents 18b5aca + 68961ff commit baf2e47
Show file tree
Hide file tree
Showing 13 changed files with 180 additions and 197 deletions.
7 changes: 3 additions & 4 deletions server/consumer.go
Expand Up @@ -307,7 +307,7 @@ type consumer struct {
ptail *proposal

// Ack queue
ackMsgs *ipQueue
ackMsgs *ipQueue[*jsAckMsg]

// For stream signaling.
sigSub *subscription
Expand Down Expand Up @@ -743,7 +743,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
}
}
// Create ackMsgs queue now that we have a consumer name
o.ackMsgs = s.newIPQueue(fmt.Sprintf("[ACC:%s] consumer '%s' on stream '%s' ackMsgs", accName, o.name, mset.cfg.Name))
o.ackMsgs = newIPQueue[*jsAckMsg](s, fmt.Sprintf("[ACC:%s] consumer '%s' on stream '%s' ackMsgs", accName, o.name, mset.cfg.Name))

// Create our request waiting queue.
if o.isPullMode() {
Expand Down Expand Up @@ -3168,8 +3168,7 @@ func (o *consumer) processInboundAcks(qch chan struct{}) {
select {
case <-o.ackMsgs.ch:
acks := o.ackMsgs.pop()
for _, acki := range acks {
ack := acki.(*jsAckMsg)
for _, ack := range acks {
o.processAck(ack.subject, ack.reply, ack.hdr, ack.msg)
ack.returnToPool()
}
Expand Down
5 changes: 2 additions & 3 deletions server/events.go
Expand Up @@ -92,7 +92,7 @@ type internal struct {
sweeper *time.Timer
stmr *time.Timer
replies map[string]msgHandler
sendq *ipQueue // of *pubMsg
sendq *ipQueue[*pubMsg]
resetCh chan struct{}
wg sync.WaitGroup
sq *sendq
Expand Down Expand Up @@ -332,8 +332,7 @@ RESET:
select {
case <-sendq.ch:
msgs := sendq.pop()
for _, pmi := range msgs {
pm := pmi.(*pubMsg)
for _, pm := range msgs {
if pm.si != nil {
pm.si.Name = servername
pm.si.Domain = domain
Expand Down
46 changes: 23 additions & 23 deletions server/ipqueue.go
Expand Up @@ -21,11 +21,11 @@ import (
const ipQueueDefaultMaxRecycleSize = 4 * 1024

// This is a generic intra-process queue.
type ipQueue struct {
type ipQueue[T any] struct {
inprogress int64
sync.RWMutex
ch chan struct{}
elts []interface{}
elts []T
pos int
pool *sync.Pool
mrs int
Expand All @@ -47,12 +47,12 @@ func ipQueue_MaxRecycleSize(max int) ipQueueOpt {
}
}

func (s *Server) newIPQueue(name string, opts ...ipQueueOpt) *ipQueue {
func newIPQueue[T any](s *Server, name string, opts ...ipQueueOpt) *ipQueue[T] {
qo := ipQueueOpts{maxRecycleSize: ipQueueDefaultMaxRecycleSize}
for _, o := range opts {
o(&qo)
}
q := &ipQueue{
q := &ipQueue[T]{
ch: make(chan struct{}, 1),
mrs: qo.maxRecycleSize,
pool: &sync.Pool{},
Expand All @@ -66,7 +66,7 @@ func (s *Server) newIPQueue(name string, opts ...ipQueueOpt) *ipQueue {
// Add the element `e` to the queue, notifying the queue channel's `ch` if the
// entry is the first to be added, and returns the length of the queue after
// this element is added.
func (q *ipQueue) push(e interface{}) int {
func (q *ipQueue[T]) push(e T) int {
var signal bool
q.Lock()
l := len(q.elts) - q.pos
Expand All @@ -76,10 +76,10 @@ func (q *ipQueue) push(e interface{}) int {
if eltsi != nil {
// Reason we use pointer to slice instead of slice is explained
// here: https://staticcheck.io/docs/checks#SA6002
q.elts = (*(eltsi.(*[]interface{})))[:0]
q.elts = (*(eltsi.(*[]T)))[:0]
}
if cap(q.elts) == 0 {
q.elts = make([]interface{}, 0, 32)
q.elts = make([]T, 0, 32)
}
}
q.elts = append(q.elts, e)
Expand All @@ -103,8 +103,8 @@ func (q *ipQueue) push(e interface{}) int {
// something, but by the time it calls `pop()`, the drain() would have
// emptied the queue. So the caller should never assume that pop() will
// return a slice of 1 or more, it could return `nil`.
func (q *ipQueue) pop() []interface{} {
var elts []interface{}
func (q *ipQueue[T]) pop() []T {
var elts []T
q.Lock()
if q.pos == 0 {
elts = q.elts
Expand All @@ -117,23 +117,23 @@ func (q *ipQueue) pop() []interface{} {
return elts
}

func (q *ipQueue) resetAndReturnToPool(elts *[]interface{}) {
for i, l := 0, len(*elts); i < l; i++ {
(*elts)[i] = nil
}
func (q *ipQueue[T]) resetAndReturnToPool(elts *[]T) {
(*elts) = (*elts)[:0]
q.pool.Put(elts)
}

// Returns the first element from the queue, if any. See comment above
// regarding calling after being notified that there is something and
// the use of drain(). In short, the caller should always expect that
// pop() or popOne() may return `nil`.
func (q *ipQueue) popOne() interface{} {
// the use of drain(). In short, the caller should always check the
// boolean return value to ensure that the value is genuine and not a
// default empty value.
func (q *ipQueue[T]) popOne() (T, bool) {
q.Lock()
l := len(q.elts) - q.pos
if l < 1 {
q.Unlock()
return nil
var empty T
return empty, false
}
e := q.elts[q.pos]
q.pos++
Expand All @@ -150,7 +150,7 @@ func (q *ipQueue) popOne() interface{} {
q.elts, q.pos = nil, 0
}
q.Unlock()
return e
return e, true
}

// After a pop(), the slice can be recycled for the next push() when
Expand All @@ -159,7 +159,7 @@ func (q *ipQueue) popOne() interface{} {
// of the slice.
// Reason we use pointer to slice instead of slice is explained
// here: https://staticcheck.io/docs/checks#SA6002
func (q *ipQueue) recycle(elts *[]interface{}) {
func (q *ipQueue[T]) recycle(elts *[]T) {
// If invoked with a nil list, nothing to do.
if elts == nil || *elts == nil {
return
Expand All @@ -179,7 +179,7 @@ func (q *ipQueue) recycle(elts *[]interface{}) {
}

// Returns the current length of the queue.
func (q *ipQueue) len() int {
func (q *ipQueue[T]) len() int {
q.RLock()
l := len(q.elts) - q.pos
q.RUnlock()
Expand All @@ -190,7 +190,7 @@ func (q *ipQueue) len() int {
// Note that this could cause a reader go routine that has been
// notified that there is something in the queue (reading from queue's `ch`)
// may then get nothing if `drain()` is invoked before the `pop()` or `popOne()`.
func (q *ipQueue) drain() {
func (q *ipQueue[T]) drain() {
if q == nil {
return
}
Expand All @@ -213,13 +213,13 @@ func (q *ipQueue) drain() {
// For that reason, the queue maintains a count of elements returned through
// the pop() API. When the caller will call q.recycle(), this count will
// be reduced by the size of the slice returned by pop().
func (q *ipQueue) inProgress() int64 {
func (q *ipQueue[T]) inProgress() int64 {
return atomic.LoadInt64(&q.inprogress)
}

// Remove this queue from the server's map of ipQueues.
// All ipQueue operations (such as push/pop/etc..) are still possible.
func (q *ipQueue) unregister() {
func (q *ipQueue[T]) unregister() {
if q == nil {
return
}
Expand Down
50 changes: 25 additions & 25 deletions server/ipqueue_test.go
Expand Up @@ -21,7 +21,7 @@ import (

func TestIPQueueBasic(t *testing.T) {
s := &Server{}
q := s.newIPQueue("test")
q := newIPQueue[int](s, "test")
// Check that pool has been created
if q.pool == nil {
t.Fatal("Expected pool to have been created")
Expand All @@ -42,7 +42,7 @@ func TestIPQueueBasic(t *testing.T) {
}

// Try to change the max recycle size
q2 := s.newIPQueue("test2", ipQueue_MaxRecycleSize(10))
q2 := newIPQueue[int](s, "test2", ipQueue_MaxRecycleSize(10))
if q2.mrs != 10 {
t.Fatalf("Expected max recycle size to be 10, got %v", q2.mrs)
}
Expand Down Expand Up @@ -82,14 +82,14 @@ func TestIPQueueBasic(t *testing.T) {
t.Fatalf("Should have gotten 1 element, got %v", len(elts))
}
q2.push(2)
if e := q2.popOne(); e.(int) != 2 {
if e, ok := q2.popOne(); !ok || e != 2 {
t.Fatalf("popOne failed: %+v", e)
}
}

func TestIPQueuePush(t *testing.T) {
s := &Server{}
q := s.newIPQueue("test")
q := newIPQueue[int](s, "test")
q.push(1)
if l := q.len(); l != 1 {
t.Fatalf("Expected len to be 1, got %v", l)
Expand All @@ -115,7 +115,7 @@ func TestIPQueuePush(t *testing.T) {

func TestIPQueuePop(t *testing.T) {
s := &Server{}
q := s.newIPQueue("test")
q := newIPQueue[int](s, "test")
q.push(1)
<-q.ch
elts := q.pop()
Expand Down Expand Up @@ -154,14 +154,14 @@ func TestIPQueuePop(t *testing.T) {

func TestIPQueuePopOne(t *testing.T) {
s := &Server{}
q := s.newIPQueue("test")
q := newIPQueue[int](s, "test")
q.push(1)
<-q.ch
e := q.popOne()
if e == nil {
e, ok := q.popOne()
if !ok {
t.Fatal("Got nil")
}
if i := e.(int); i != 1 {
if i := e; i != 1 {
t.Fatalf("Expected 1, got %v", i)
}
if l := q.len(); l != 0 {
Expand All @@ -179,11 +179,11 @@ func TestIPQueuePopOne(t *testing.T) {
}
q.push(2)
q.push(3)
e = q.popOne()
if e == nil {
e, ok = q.popOne()
if !ok {
t.Fatal("Got nil")
}
if i := e.(int); i != 2 {
if i := e; i != 2 {
t.Fatalf("Expected 2, got %v", i)
}
if l := q.len(); l != 1 {
Expand All @@ -195,11 +195,11 @@ func TestIPQueuePopOne(t *testing.T) {
default:
t.Fatalf("Should have been notified that there is more")
}
e = q.popOne()
if e == nil {
e, ok = q.popOne()
if !ok {
t.Fatal("Got nil")
}
if i := e.(int); i != 3 {
if i := e; i != 3 {
t.Fatalf("Expected 3, got %v", i)
}
if l := q.len(); l != 0 {
Expand All @@ -213,26 +213,26 @@ func TestIPQueuePopOne(t *testing.T) {
}
// Calling it again now that we know there is nothing, we
// should get nil.
if e = q.popOne(); e != nil {
if e, ok = q.popOne(); ok {
t.Fatalf("Expected nil, got %v", e)
}

q = s.newIPQueue("test2")
q = newIPQueue[int](s, "test2")
q.push(1)
q.push(2)
// Capture current capacity
q.RLock()
c := cap(q.elts)
q.RUnlock()
e = q.popOne()
if e == nil || e.(int) != 1 {
e, ok = q.popOne()
if !ok || e != 1 {
t.Fatalf("Invalid value: %v", e)
}
if l := q.len(); l != 1 {
t.Fatalf("Expected len to be 1, got %v", l)
}
values := q.pop()
if len(values) != 1 || values[0].(int) != 2 {
if len(values) != 1 || values[0] != 2 {
t.Fatalf("Unexpected values: %v", values)
}
if cap(values) != c-1 {
Expand All @@ -247,7 +247,7 @@ func TestIPQueuePopOne(t *testing.T) {

func TestIPQueueMultiProducers(t *testing.T) {
s := &Server{}
q := s.newIPQueue("test")
q := newIPQueue[int](s, "test")

wg := sync.WaitGroup{}
wg.Add(3)
Expand All @@ -269,7 +269,7 @@ func TestIPQueueMultiProducers(t *testing.T) {
case <-q.ch:
values := q.pop()
for _, v := range values {
m[v.(int)] = struct{}{}
m[v] = struct{}{}
}
q.recycle(&values)
if n := q.inProgress(); n != 0 {
Expand All @@ -285,7 +285,7 @@ func TestIPQueueMultiProducers(t *testing.T) {

func TestIPQueueRecycle(t *testing.T) {
s := &Server{}
q := s.newIPQueue("test")
q := newIPQueue[int](s, "test")
total := 1000
for iter := 0; iter < 5; iter++ {
var sz int
Expand Down Expand Up @@ -317,7 +317,7 @@ func TestIPQueueRecycle(t *testing.T) {
}
}

q = s.newIPQueue("test2", ipQueue_MaxRecycleSize(10))
q = newIPQueue[int](s, "test2", ipQueue_MaxRecycleSize(10))
for i := 0; i < 100; i++ {
q.push(i)
}
Expand Down Expand Up @@ -357,7 +357,7 @@ func TestIPQueueRecycle(t *testing.T) {

func TestIPQueueDrain(t *testing.T) {
s := &Server{}
q := s.newIPQueue("test")
q := newIPQueue[int](s, "test")
for iter, recycled := 0, false; iter < 5 && !recycled; iter++ {
for i := 0; i < 100; i++ {
q.push(i + 1)
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream.go
Expand Up @@ -142,7 +142,7 @@ type jsAccount struct {
store TemplateStore

// From server
sendq *ipQueue // of *pubMsg
sendq *ipQueue[*pubMsg]

// Usage/limits related fields that will be protected by usageMu
usageMu sync.RWMutex
Expand Down

0 comments on commit baf2e47

Please sign in to comment.