/
partition_state.go
175 lines (150 loc) · 3.97 KB
/
partition_state.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
package main
import (
"container/list"
"github.com/benbjohnson/clock"
log "github.com/sirupsen/logrus"
"sync"
"time"
)
// State describes the availability of the monitored partition.
type State int8

// The possible values of State.
const (
	// Before is the zero value: no state has been computed yet.
	Before State = 0
	// Available means responses are arriving within the timeout.
	Available State = 1
	// Unavailable means responses are late or missing.
	Unavailable State = 2
)
// EventSink receives the events produced by the status updater loop: the
// latency of every matched response and every change of the partition State.
type EventSink interface {
// submitLatency is called with the latency measured for a received message.
submitLatency(latency time.Duration)
// changeState is called with the new state whenever the computed state
// differs from the previously recorded one.
changeState(state State)
}
// StatsEventSink is an EventSink that forwards latencies to a StatSink and
// logs state changes.
type StatsEventSink struct {
// statSink receives every latency passed to submitLatency.
statSink *StatSink
}
// submitLatency logs the measured latency at debug level and then hands it
// to the underlying StatSink.
func (ses StatsEventSink) submitLatency(latency time.Duration) {
	// time.Duration implements fmt.Stringer, so %s renders it via String().
	log.Debugf("Received latency %s", latency)
	ses.statSink.Put(latency)
}
// changeState logs a transition to Available or Unavailable at info level.
// Any other state is ignored silently.
func (StatsEventSink) changeState(state State) {
	if state == Available {
		log.Infof("Topic is available")
		return
	}
	if state == Unavailable {
		log.Infof("Topic is no longer available")
	}
}
// StateWatcher is the handle used to notify the status updater goroutine
// about sent and received messages and to shut that goroutine down.
type StateWatcher struct {
// channel carries Sent/Received notifications to the updater loop; closing
// it asks the loop to exit.
channel chan msg
// waitGroup is marked done by the updater loop when it exits, which
// unblocks stop().
waitGroup *sync.WaitGroup
}
// sent reports that the message with sequence number seq has been sent.
// The call blocks until the notification has been handed to the channel.
func (s *StateWatcher) sent(seq uint64) {
	log.Debugf("Called sent with seq %d", seq)
	notification := msg{t: Sent, seq: seq}
	s.channel <- notification
}
// received reports that the message with sequence number seq has come back.
// The call blocks until the notification has been handed to the channel.
func (s *StateWatcher) received(seq uint64) {
	log.Debugf("Called received with seq %d", seq)
	notification := msg{t: Received, seq: seq}
	s.channel <- notification
}
// stop closes the notification channel, which asks the updater loop to exit,
// and then blocks until the wait group has been marked done.
// NOTE(review): closing an already-closed channel panics, as does sending on
// a closed channel, so stop must be called at most once and sent/received
// must not be called afterwards.
func (s *StateWatcher) stop() {
close(s.channel)
s.waitGroup.Wait()
}
// NewStateWatcher starts a status updater goroutine that reports to sink and
// returns the StateWatcher controlling it. Time is taken from the clock
// returned by clock.New().
func NewStateWatcher(sink EventSink) *StateWatcher {
	systemClock := clock.New()
	return newStateWatcherWithClock(sink, systemClock)
}
// newStateWatcherWithClock builds a StateWatcher with an unbuffered
// notification channel and launches statusUpdaterLoop in its own goroutine,
// using the supplied clock as the time source.
func newStateWatcherWithClock(sink EventSink, clock clock.Clock) *StateWatcher {
	// One unit for the single updater goroutine; the loop marks it done on exit.
	done := new(sync.WaitGroup)
	done.Add(1)

	watcher := &StateWatcher{
		channel:   make(chan msg),
		waitGroup: done,
	}
	go statusUpdaterLoop(watcher, sink, clock)
	return watcher
}
// AdditionalTimerTime is added to the timeout when arming the timer, so that
// the timeout threshold has definitely been passed by the time the timer
// wakes the updater loop.
const AdditionalTimerTime time.Duration = 10 * time.Millisecond
// partitionState holds the data the updater loop uses to decide whether the
// partition is currently available.
type partitionState struct {
// state is the state most recently reported to the EventSink (zero value: Before).
state State
// clock is the time source used when evaluating the state.
clock clock.Clock
// timeout bounds both the accepted latency and the accepted time since the
// last matched response; beyond it the partition counts as Unavailable.
timeout time.Duration
// lastLatency is the latency of the most recently matched response.
lastLatency time.Duration
// lastReceived is the time at which the most recently matched response arrived.
lastReceived time.Time
}
// statusUpdaterLoop is intended to run in its own goroutine. It consumes the
// Sent/Received notifications published on stateWatcher.channel, reports the
// latency of every matched response to sink, and calls sink.changeState()
// whenever the computed partition state changes. The loop returns once the
// channel is closed, marking stateWatcher.waitGroup as done on the way out.
//
// NOTE(review): the timer is re-armed on every Sent notification, so it only
// fires after no message has been sent for timeout+AdditionalTimerTime;
// confirm that senders pause while the partition is down.
// NOTE(review): entries for messages that never get a response are never
// removed from sentTimes.
func statusUpdaterLoop(stateWatcher *StateWatcher, sink EventSink, clock clock.Clock) {
	// Deferred so that stop() is released on every exit path, including a
	// panic raised by the sink.
	defer stateWatcher.waitGroup.Done()

	state := &partitionState{
		clock:   clock,
		timeout: 10 * time.Second,
	}
	timerDuration := state.timeout + AdditionalTimerTime
	sentTimes := list.New()

	timer := clock.Timer(timerDuration)
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			// Nothing was sent or received for the whole timer period.
			updateState(state, sink)

		case m, more := <-stateWatcher.channel:
			if !more {
				log.Info("Updater loop is being asked to quit, exiting")
				return
			}

			if m.t == Sent {
				// Give the response a full timeout period to arrive.
				timer.Reset(timerDuration)
				sentTimes.PushBack(seqSentTime{seq: m.seq, sent: clock.Now()})
				continue
			}

			sentTime := removeBySeq(sentTimes, m.seq)
			if sentTime == nil {
				log.Warnf("Received a response without channel time, probably a duplicate")
				continue
			}

			// Read the clock once so that lastReceived and lastLatency
			// describe the same instant.
			now := clock.Now()
			state.lastReceived = now
			state.lastLatency = now.Sub(*sentTime)
			sink.submitLatency(state.lastLatency)
			updateState(state, sink)
			// The response arrived; the timer is re-armed by the next Sent.
			timer.Stop()
		}
	}
}
// msgType tells the updater loop which kind of notification a msg carries.
type msgType int8

// The possible values of msgType.
const (
	// Sent marks a notification about an outgoing message.
	Sent msgType = 0
	// Received marks a notification about an incoming response.
	Received msgType = 1
)
// msg is the notification passed over StateWatcher.channel to the updater loop.
// NOTE: it is constructed with positional literals, so field order matters.
type msg struct {
// t says whether the message was sent or received.
t msgType
// seq is the sequence number of the message the notification refers to.
seq uint64
}
// removeBySeq looks for the first seqSentTime in sentTimes whose sequence
// number equals seq. If one is found, it is removed from the list and a
// pointer to a copy of its sent time is returned; otherwise the list is left
// untouched and nil is returned.
//
// Every element of sentTimes must hold a seqSentTime value; any other value
// makes the type assertion panic.
func removeBySeq(sentTimes *list.List, seq uint64) *time.Time {
	elem := sentTimes.Front()
	for elem != nil {
		entry := elem.Value.(seqSentTime)
		if entry.seq != seq {
			elem = elem.Next()
			continue
		}
		sentTimes.Remove(elem)
		sentAt := entry.sent
		return &sentAt
	}
	return nil
}
// seqSentTime records when the message with a given sequence number was sent.
// Values of this type are stored in the updater loop's list of outstanding
// messages.
type seqSentTime struct {
// seq is the sequence number of the sent message.
seq uint64
// sent is the clock reading taken when the Sent notification was processed.
sent time.Time
}
// updateState recomputes the partition state. When the result differs from
// the recorded state, sink is notified first and the new state is stored
// afterwards; otherwise nothing happens.
func updateState(state *partitionState, sink EventSink) {
	next := calculateState(state)
	if next == state.state {
		return
	}
	sink.changeState(next)
	state.state = next
}
// calculateState derives the current State from the given partitionState.
// The partition is Unavailable when more than timeout has passed since the
// last matched response, or when the latency of that response was not below
// timeout. In every other case it is Available.
func calculateState(state *partitionState) State {
	now := state.clock.Now()
	deadline := state.lastReceived.Add(state.timeout)
	if now.After(deadline) {
		log.Debugf("The last Received message was more than %s ago, partition is unavailable", state.timeout)
		return Unavailable
	}
	if state.lastLatency >= state.timeout {
		return Unavailable
	}
	return Available
}