-
Notifications
You must be signed in to change notification settings - Fork 42
/
msgbuffer.go
111 lines (88 loc) · 2 KB
/
msgbuffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package bus
import (
"sync"
)
const (
defaultBufferSize = 128
)
// MsgBuffer is a buffer of messages with a particular size limit.
// Oldest messages are automatically evicted as new ones are added
// past said limit. Push(), Next(), and Iter() are thread-safe.
type MsgBuffer struct {
msgs map[string]messageWithOrigin
order []string
limit int
startIndex int
nextIndex int
lock sync.RWMutex
}
func NewMsgBuffer(limit int) *MsgBuffer {
m := &MsgBuffer{
msgs: map[string]messageWithOrigin{},
order: []string{},
limit: limit,
startIndex: 0,
nextIndex: 0,
lock: sync.RWMutex{},
}
return m
}
// Push pushes a new message onto the end of the buffer and evicts the oldest, if needed (based on limit)
func (m *MsgBuffer) Push(msg messageWithOrigin) {
m.lock.Lock()
defer m.lock.Unlock()
m.msgs[msg.UUID()] = msg
lastIndex := len(m.order) - 1
if len(m.order) == m.limit {
delete(m.msgs, m.order[m.startIndex]) // delete the current "first"
m.order[m.startIndex] = msg.UUID()
if m.startIndex == lastIndex {
m.startIndex = 0
} else {
m.startIndex++
}
} else {
m.order = append(m.order, msg.UUID())
}
}
// Next returns the next message in the buffer,
// continually looping over the buffer (all callers share ordering)
func (m *MsgBuffer) Next() Message {
m.lock.Lock()
defer m.lock.Unlock()
if len(m.order) == 0 {
return nil
}
uuid := m.order[m.nextIndex]
msg := m.msgs[uuid]
if m.nextIndex == len(m.order)-1 {
m.nextIndex = 0
} else {
m.nextIndex++
}
return msg
}
// Iter calls msgFunc once per message in the buffer
func (m *MsgBuffer) Iter(msgFunc func(messageWithOrigin) error) {
m.lock.RLock()
defer m.lock.RUnlock()
if len(m.order) == 0 {
return
}
index := m.startIndex
lastIndex := len(m.order) - 1
more := true
for more {
uuid := m.order[index]
msg := m.msgs[uuid]
msgFunc(msg)
if index == lastIndex {
index = 0
} else {
index++
}
if index == m.startIndex {
more = false
}
}
}