Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor ipQueue to use generics #3894

Merged
merged 1 commit into from Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 3 additions & 4 deletions server/consumer.go
Expand Up @@ -307,7 +307,7 @@ type consumer struct {
ptail *proposal

// Ack queue
ackMsgs *ipQueue
ackMsgs *ipQueue[*jsAckMsg]

// For stream signaling.
sigSub *subscription
Expand Down Expand Up @@ -743,7 +743,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
}
}
// Create ackMsgs queue now that we have a consumer name
o.ackMsgs = s.newIPQueue(fmt.Sprintf("[ACC:%s] consumer '%s' on stream '%s' ackMsgs", accName, o.name, mset.cfg.Name))
o.ackMsgs = newIPQueue[*jsAckMsg](s, fmt.Sprintf("[ACC:%s] consumer '%s' on stream '%s' ackMsgs", accName, o.name, mset.cfg.Name))

// Create our request waiting queue.
if o.isPullMode() {
Expand Down Expand Up @@ -3168,8 +3168,7 @@ func (o *consumer) processInboundAcks(qch chan struct{}) {
select {
case <-o.ackMsgs.ch:
acks := o.ackMsgs.pop()
for _, acki := range acks {
ack := acki.(*jsAckMsg)
for _, ack := range acks {
o.processAck(ack.subject, ack.reply, ack.hdr, ack.msg)
ack.returnToPool()
}
Expand Down
5 changes: 2 additions & 3 deletions server/events.go
Expand Up @@ -92,7 +92,7 @@ type internal struct {
sweeper *time.Timer
stmr *time.Timer
replies map[string]msgHandler
sendq *ipQueue // of *pubMsg
sendq *ipQueue[*pubMsg]
resetCh chan struct{}
wg sync.WaitGroup
sq *sendq
Expand Down Expand Up @@ -332,8 +332,7 @@ RESET:
select {
case <-sendq.ch:
msgs := sendq.pop()
for _, pmi := range msgs {
pm := pmi.(*pubMsg)
for _, pm := range msgs {
if pm.si != nil {
pm.si.Name = servername
pm.si.Domain = domain
Expand Down
46 changes: 23 additions & 23 deletions server/ipqueue.go
Expand Up @@ -21,11 +21,11 @@ import (
const ipQueueDefaultMaxRecycleSize = 4 * 1024

// This is a generic intra-process queue.
type ipQueue struct {
type ipQueue[T any] struct {
inprogress int64
sync.RWMutex
ch chan struct{}
elts []interface{}
elts []T
pos int
pool *sync.Pool
mrs int
Expand All @@ -47,12 +47,12 @@ func ipQueue_MaxRecycleSize(max int) ipQueueOpt {
}
}

func (s *Server) newIPQueue(name string, opts ...ipQueueOpt) *ipQueue {
func newIPQueue[T any](s *Server, name string, opts ...ipQueueOpt) *ipQueue[T] {
qo := ipQueueOpts{maxRecycleSize: ipQueueDefaultMaxRecycleSize}
for _, o := range opts {
o(&qo)
}
q := &ipQueue{
q := &ipQueue[T]{
ch: make(chan struct{}, 1),
mrs: qo.maxRecycleSize,
pool: &sync.Pool{},
Expand All @@ -66,7 +66,7 @@ func (s *Server) newIPQueue(name string, opts ...ipQueueOpt) *ipQueue {
// Add the element `e` to the queue, notifying the queue channel's `ch` if the
// entry is the first to be added, and returns the length of the queue after
// this element is added.
func (q *ipQueue) push(e interface{}) int {
func (q *ipQueue[T]) push(e T) int {
var signal bool
q.Lock()
l := len(q.elts) - q.pos
Expand All @@ -76,10 +76,10 @@ func (q *ipQueue) push(e interface{}) int {
if eltsi != nil {
// Reason we use pointer to slice instead of slice is explained
// here: https://staticcheck.io/docs/checks#SA6002
q.elts = (*(eltsi.(*[]interface{})))[:0]
q.elts = (*(eltsi.(*[]T)))[:0]
}
if cap(q.elts) == 0 {
q.elts = make([]interface{}, 0, 32)
q.elts = make([]T, 0, 32)
}
}
q.elts = append(q.elts, e)
Expand All @@ -103,8 +103,8 @@ func (q *ipQueue) push(e interface{}) int {
// something, but by the time it calls `pop()`, the drain() would have
// emptied the queue. So the caller should never assume that pop() will
// return a slice of 1 or more, it could return `nil`.
func (q *ipQueue) pop() []interface{} {
var elts []interface{}
func (q *ipQueue[T]) pop() []T {
var elts []T
q.Lock()
if q.pos == 0 {
elts = q.elts
Expand All @@ -117,23 +117,23 @@ func (q *ipQueue) pop() []interface{} {
return elts
}

func (q *ipQueue) resetAndReturnToPool(elts *[]interface{}) {
for i, l := 0, len(*elts); i < l; i++ {
(*elts)[i] = nil
}
func (q *ipQueue[T]) resetAndReturnToPool(elts *[]T) {
(*elts) = (*elts)[:0]
q.pool.Put(elts)
}

// Returns the first element from the queue, if any. See comment above
// regarding calling after being notified that there is something and
// the use of drain(). In short, the caller should always expect that
// pop() or popOne() may return `nil`.
func (q *ipQueue) popOne() interface{} {
// the use of drain(). In short, the caller should always check the
// boolean return value to ensure that the value is genuine and not a
// default empty value.
func (q *ipQueue[T]) popOne() (T, bool) {
q.Lock()
l := len(q.elts) - q.pos
if l < 1 {
q.Unlock()
return nil
var empty T
return empty, false
}
e := q.elts[q.pos]
q.pos++
Expand All @@ -150,7 +150,7 @@ func (q *ipQueue) popOne() interface{} {
q.elts, q.pos = nil, 0
}
q.Unlock()
return e
return e, true
}

// After a pop(), the slice can be recycled for the next push() when
Expand All @@ -159,7 +159,7 @@ func (q *ipQueue) popOne() interface{} {
// of the slice.
// Reason we use pointer to slice instead of slice is explained
// here: https://staticcheck.io/docs/checks#SA6002
func (q *ipQueue) recycle(elts *[]interface{}) {
func (q *ipQueue[T]) recycle(elts *[]T) {
// If invoked with a nil list, nothing to do.
if elts == nil || *elts == nil {
return
Expand All @@ -179,7 +179,7 @@ func (q *ipQueue) recycle(elts *[]interface{}) {
}

// Returns the current length of the queue.
func (q *ipQueue) len() int {
func (q *ipQueue[T]) len() int {
q.RLock()
l := len(q.elts) - q.pos
q.RUnlock()
Expand All @@ -190,7 +190,7 @@ func (q *ipQueue) len() int {
// Note that this could cause a reader go routine that has been
// notified that there is something in the queue (reading from queue's `ch`)
// may then get nothing if `drain()` is invoked before the `pop()` or `popOne()`.
func (q *ipQueue) drain() {
func (q *ipQueue[T]) drain() {
if q == nil {
return
}
Expand All @@ -213,13 +213,13 @@ func (q *ipQueue) drain() {
// For that reason, the queue maintains a count of elements returned through
// the pop() API. When the caller will call q.recycle(), this count will
// be reduced by the size of the slice returned by pop().
func (q *ipQueue) inProgress() int64 {
func (q *ipQueue[T]) inProgress() int64 {
return atomic.LoadInt64(&q.inprogress)
}

// Remove this queue from the server's map of ipQueues.
// All ipQueue operations (such as push/pop/etc..) are still possible.
func (q *ipQueue) unregister() {
func (q *ipQueue[T]) unregister() {
if q == nil {
return
}
Expand Down
50 changes: 25 additions & 25 deletions server/ipqueue_test.go
Expand Up @@ -21,7 +21,7 @@ import (

func TestIPQueueBasic(t *testing.T) {
s := &Server{}
q := s.newIPQueue("test")
q := newIPQueue[int](s, "test")
// Check that pool has been created
if q.pool == nil {
t.Fatal("Expected pool to have been created")
Expand All @@ -42,7 +42,7 @@ func TestIPQueueBasic(t *testing.T) {
}

// Try to change the max recycle size
q2 := s.newIPQueue("test2", ipQueue_MaxRecycleSize(10))
q2 := newIPQueue[int](s, "test2", ipQueue_MaxRecycleSize(10))
if q2.mrs != 10 {
t.Fatalf("Expected max recycle size to be 10, got %v", q2.mrs)
}
Expand Down Expand Up @@ -82,14 +82,14 @@ func TestIPQueueBasic(t *testing.T) {
t.Fatalf("Should have gotten 1 element, got %v", len(elts))
}
q2.push(2)
if e := q2.popOne(); e.(int) != 2 {
if e, ok := q2.popOne(); !ok || e != 2 {
t.Fatalf("popOne failed: %+v", e)
}
}

func TestIPQueuePush(t *testing.T) {
s := &Server{}
q := s.newIPQueue("test")
q := newIPQueue[int](s, "test")
q.push(1)
if l := q.len(); l != 1 {
t.Fatalf("Expected len to be 1, got %v", l)
Expand All @@ -115,7 +115,7 @@ func TestIPQueuePush(t *testing.T) {

func TestIPQueuePop(t *testing.T) {
s := &Server{}
q := s.newIPQueue("test")
q := newIPQueue[int](s, "test")
q.push(1)
<-q.ch
elts := q.pop()
Expand Down Expand Up @@ -154,14 +154,14 @@ func TestIPQueuePop(t *testing.T) {

func TestIPQueuePopOne(t *testing.T) {
s := &Server{}
q := s.newIPQueue("test")
q := newIPQueue[int](s, "test")
q.push(1)
<-q.ch
e := q.popOne()
if e == nil {
e, ok := q.popOne()
if !ok {
t.Fatal("Got nil")
}
if i := e.(int); i != 1 {
if i := e; i != 1 {
t.Fatalf("Expected 1, got %v", i)
}
if l := q.len(); l != 0 {
Expand All @@ -179,11 +179,11 @@ func TestIPQueuePopOne(t *testing.T) {
}
q.push(2)
q.push(3)
e = q.popOne()
if e == nil {
e, ok = q.popOne()
if !ok {
t.Fatal("Got nil")
}
if i := e.(int); i != 2 {
if i := e; i != 2 {
t.Fatalf("Expected 2, got %v", i)
}
if l := q.len(); l != 1 {
Expand All @@ -195,11 +195,11 @@ func TestIPQueuePopOne(t *testing.T) {
default:
t.Fatalf("Should have been notified that there is more")
}
e = q.popOne()
if e == nil {
e, ok = q.popOne()
if !ok {
t.Fatal("Got nil")
}
if i := e.(int); i != 3 {
if i := e; i != 3 {
t.Fatalf("Expected 3, got %v", i)
}
if l := q.len(); l != 0 {
Expand All @@ -213,26 +213,26 @@ func TestIPQueuePopOne(t *testing.T) {
}
// Calling it again now that we know there is nothing, we
// should get nil.
if e = q.popOne(); e != nil {
if e, ok = q.popOne(); ok {
t.Fatalf("Expected nil, got %v", e)
}

q = s.newIPQueue("test2")
q = newIPQueue[int](s, "test2")
q.push(1)
q.push(2)
// Capture current capacity
q.RLock()
c := cap(q.elts)
q.RUnlock()
e = q.popOne()
if e == nil || e.(int) != 1 {
e, ok = q.popOne()
if !ok || e != 1 {
t.Fatalf("Invalid value: %v", e)
}
if l := q.len(); l != 1 {
t.Fatalf("Expected len to be 1, got %v", l)
}
values := q.pop()
if len(values) != 1 || values[0].(int) != 2 {
if len(values) != 1 || values[0] != 2 {
t.Fatalf("Unexpected values: %v", values)
}
if cap(values) != c-1 {
Expand All @@ -247,7 +247,7 @@ func TestIPQueuePopOne(t *testing.T) {

func TestIPQueueMultiProducers(t *testing.T) {
s := &Server{}
q := s.newIPQueue("test")
q := newIPQueue[int](s, "test")

wg := sync.WaitGroup{}
wg.Add(3)
Expand All @@ -269,7 +269,7 @@ func TestIPQueueMultiProducers(t *testing.T) {
case <-q.ch:
values := q.pop()
for _, v := range values {
m[v.(int)] = struct{}{}
m[v] = struct{}{}
}
q.recycle(&values)
if n := q.inProgress(); n != 0 {
Expand All @@ -285,7 +285,7 @@ func TestIPQueueMultiProducers(t *testing.T) {

func TestIPQueueRecycle(t *testing.T) {
s := &Server{}
q := s.newIPQueue("test")
q := newIPQueue[int](s, "test")
total := 1000
for iter := 0; iter < 5; iter++ {
var sz int
Expand Down Expand Up @@ -317,7 +317,7 @@ func TestIPQueueRecycle(t *testing.T) {
}
}

q = s.newIPQueue("test2", ipQueue_MaxRecycleSize(10))
q = newIPQueue[int](s, "test2", ipQueue_MaxRecycleSize(10))
for i := 0; i < 100; i++ {
q.push(i)
}
Expand Down Expand Up @@ -357,7 +357,7 @@ func TestIPQueueRecycle(t *testing.T) {

func TestIPQueueDrain(t *testing.T) {
s := &Server{}
q := s.newIPQueue("test")
q := newIPQueue[int](s, "test")
for iter, recycled := 0, false; iter < 5 && !recycled; iter++ {
for i := 0; i < 100; i++ {
q.push(i + 1)
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream.go
Expand Up @@ -142,7 +142,7 @@ type jsAccount struct {
store TemplateStore

// From server
sendq *ipQueue // of *pubMsg
sendq *ipQueue[*pubMsg]

// Usage/limits related fields that will be protected by usageMu
usageMu sync.RWMutex
Expand Down