-
Notifications
You must be signed in to change notification settings - Fork 42
/
pod_test.go
397 lines (294 loc) · 8.49 KB
/
pod_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
package bus
import (
"bytes"
"errors"
"fmt"
"testing"
"time"
"github.com/suborbital/e2core/foundation/bus/testutil"
)
// TestPodFilter verifies that a pod never receives messages it sent itself:
// the sender's own handler must not fire for its outbound messages.
func TestPodFilter(t *testing.T) {
	g := New()

	counter := testutil.NewAsyncCounter(100)

	countingFunc := func(msg Message) error {
		counter.Count()
		return nil
	}

	sender := g.Connect()
	sender.On(countingFunc)

	receiver := g.Connect()
	receiver.On(countingFunc)

	for n := 0; n < 10; n++ {
		sender.Send(NewMsg(MsgTypeDefault, []byte(fmt.Sprintf("hello, world %d", n))))
	}

	// only 10 should be tracked: the sender filters out the messages it sent
	// itself, so they never reach its own handler — only the receiver counts them
	if err := counter.Wait(10, 1); err != nil {
		t.Error(err)
	}
}
// TestPodFilterMessageSentBySelf verifies self-filtering in both directions:
// a pod must not receive its own messages, but must receive a peer's.
func TestPodFilterMessageSentBySelf(t *testing.T) {
	g := New()
	counter := testutil.NewAsyncCounter(100)
	onFunc := func(msg Message) error {
		counter.Count()
		return nil
	}
	p1 := g.Connect()
	p1.On(onFunc)
	p2 := g.Connect()
	// p2 deliberately registers no handler; only p1's onFunc can count.
	//p2.On(onFunc)
	p1.Send(NewMsg(MsgTypeDefault, []byte("hello p2")))
	// the message sent by p1 must NOT be delivered back to p1 itself,
	// so the counter stays at zero
	if err := counter.Wait(0, 1); err != nil {
		t.Error(err)
	}
	p2.Send(NewMsg(MsgTypeDefault, []byte("hello p1")))
	// the message sent by p2 should reach p1, counting exactly once
	if err := counter.Wait(1, 1); err != nil {
		t.Error(err)
	}
}
// TestWaitOn verifies that WaitOn blocks until its handler accepts a message,
// returning nil when the handler returns nil and propagating the handler's
// own error otherwise (ErrMsgNotWanted keeps waiting).
func TestWaitOn(t *testing.T) {
	g := New()

	p1 := g.Connect()

	go func() {
		// staggered sends so each WaitOn call below has a message to match
		time.Sleep(500 * time.Millisecond)
		p1.Send(NewMsg(MsgTypeDefault, []byte("hello, world")))
		time.Sleep(500 * time.Millisecond)
		p1.Send(NewMsg(MsgTypeDefault, []byte("goodbye, world")))
	}()

	errGoodbye := errors.New("goodbye")

	p2 := g.Connect()

	// WaitOn should return nil when the handler accepts "hello, world"
	if err := p2.WaitOn(func(msg Message) error {
		if bytes.Equal(msg.Data(), []byte("hello, world")) {
			return nil
		}

		return ErrMsgNotWanted
	}); err != nil {
		t.Errorf("expected nil error, got %s", err)
	}

	// WaitOn should surface the handler's own error for "goodbye, world"
	if err := p2.WaitOn(func(msg Message) error {
		if bytes.Equal(msg.Data(), []byte("goodbye, world")) {
			return errGoodbye
		}

		return ErrMsgNotWanted
	}); !errors.Is(err, errGoodbye) {
		t.Errorf("expected errGoodbye error, got %s", err)
	}
}
// msgTypeBad is the message type the failure-mode tests treat as an error.
const msgTypeBad = "test.bad"
// TestPodFailure verifies that a pod whose handler keeps returning errors is
// eventually cut off once its failure count passes the highwater mark, while
// the remaining healthy pods continue to receive messages.
func TestPodFailure(t *testing.T) {
	g := New()
	counter := testutil.NewAsyncCounter(200)
	// create one pod that returns errors on "bad" messages
	p := g.Connect()
	p.On(func(msg Message) error {
		counter.Count()
		if msg.Type() == msgTypeBad {
			return errors.New("bad message")
		}
		return nil
	})
	// and another 9 that don't
	for i := 0; i < 9; i++ {
		p2 := g.Connect()
		p2.On(func(msg Message) error {
			counter.Count()
			return nil
		})
	}
	pod := g.Connect()
	// send 64 "bad" messages (64 reaches the highwater mark)
	for i := 0; i < 64; i++ {
		pod.Send(NewMsg(msgTypeBad, []byte(fmt.Sprintf("hello, world %d", i))))
	}
	time.Sleep(time.Second)
	// send 10 more "bad" messages
	for i := 0; i < 10; i++ {
		pod.Send(NewMsg(msgTypeBad, []byte(fmt.Sprintf("hello, world %d", i))))
	}
	// 730 because the 64th message to the "bad" pod put it over the highwater
	// mark and so the last 10 messages would never be delivered to it.
	// the sleep is needed to allow all of the internal goroutines to finish execing
	// in the worst case scenario of a single process machine (which lots of containers are)
	if err := counter.Wait(730, 1); err != nil {
		t.Error(err)
	}
	// the first pod should now have been disconnected, leaving only 9 receivers; reset and test again:
	// send 10 "normal" messages
	for i := 0; i < 10; i++ {
		pod.Send(NewMsg(MsgTypeDefault, []byte(fmt.Sprintf("hello, world %d", i))))
	}
	if err := counter.Wait(90, 1); err != nil {
		t.Error(err)
	}
}
// TestPodFailurePt2 repeats TestPodFailure with the failing pod connected in
// the "middle" of the ring (4 healthy pods before it, 5 after) to verify that
// eviction works regardless of the pod's position.
func TestPodFailurePt2(t *testing.T) {
	// test where the "bad" pod is somewhere in the "middle" of the ring
	g := New()
	counter := testutil.NewAsyncCounter(200)

	// 4 healthy pods ahead of the failing one
	for i := 0; i < 4; i++ {
		p2 := g.Connect()
		p2.On(func(msg Message) error {
			counter.Count()
			return nil
		})
	}

	// create one pod that returns errors on "bad" messages
	p := g.Connect()
	p.On(func(msg Message) error {
		counter.Count()
		if msg.Type() == msgTypeBad {
			return errors.New("bad message")
		}
		return nil
	})

	// and another 5 healthy pods after it (10 receivers total)
	for i := 0; i < 5; i++ {
		p2 := g.Connect()
		p2.On(func(msg Message) error {
			counter.Count()
			return nil
		})
	}

	pod := g.Connect()

	// send 64 "bad" messages (64 reaches the highwater mark)
	for i := 0; i < 64; i++ {
		pod.Send(NewMsg(msgTypeBad, []byte(fmt.Sprintf("hello, world %d", i))))
	}

	time.Sleep(time.Second)

	// send 10 more "bad" messages
	for i := 0; i < 10; i++ {
		pod.Send(NewMsg(msgTypeBad, []byte(fmt.Sprintf("hello, world %d", i))))
	}

	// 730 because the 64th message to the "bad" pod put it over the highwater
	// mark and so the last 10 messages would never be delivered to it.
	// the sleep is needed to allow all of the internal goroutines to finish execing
	// in the worst case scenario of a single process machine (which lots of containers are)
	if err := counter.Wait(730, 1); err != nil {
		t.Error(err)
	}

	// the failing pod should now have been disconnected, leaving only 9 receivers; test again:
	// send 10 "normal" messages
	for i := 0; i < 10; i++ {
		pod.Send(NewMsg(MsgTypeDefault, []byte(fmt.Sprintf("hello, world %d", i))))
	}

	if err := counter.Wait(90, 1); err != nil {
		t.Error(err)
	}
}
// TestPodFlushFailed verifies that messages rejected by a pod's handler are
// flushed back to that pod (and re-counted) once its handler starts succeeding.
func TestPodFlushFailed(t *testing.T) {
	g := New()
	counter := testutil.NewAsyncCounter(200)

	// create a pod that returns errors on "bad" messages
	p := g.Connect()
	p.On(func(msg Message) error {
		counter.Count()
		if msg.Type() == msgTypeBad {
			return errors.New("bad message")
		}
		return nil
	})

	sender := g.Connect()

	// send 5 "bad" messages
	for i := 0; i < 5; i++ {
		sender.Send(NewMsg(msgTypeBad, []byte(fmt.Sprintf("hello, world %d", i))))
	}

	<-time.After(time.Second)

	// replace the OnFunc to not error when the flushed messages come back through
	p.On(func(msg Message) error {
		counter.Count()
		return nil
	})

	// send 9 "normal" messages
	for i := 0; i < 9; i++ {
		sender.Send(NewMsg(MsgTypeDefault, []byte(fmt.Sprintf("hello, world %d", i))))
	}

	<-time.After(time.Second)

	// yes this is stupid, but on single-CPU machines (such as GitHub actions), this test won't allow things to be flushed properly.
	sender.Send(NewMsg(MsgTypeDefault, []byte("flushing!")))

	// 20 because upon handling the first "good" message, the bus should flush
	// the 5 "failed" messages back into the connection thus repeating them
	// (5 bad + 5 flushed + 9 normal + 1 flush trigger = 20)
	if err := counter.Wait(20, 1); err != nil {
		t.Error(err)
	}
}
// TestPodReplay verifies that a pod connected with replay receives the
// messages that were sent before it connected.
func TestPodReplay(t *testing.T) {
	g := New()
	counter := testutil.NewAsyncCounter(500)
	// create a pod that counts every message it receives
	p1 := g.Connect()
	p1.On(func(msg Message) error {
		counter.Count()
		return nil
	})
	sender := g.Connect()
	// send 100 messages and ensure they're received by p1
	for i := 0; i < 100; i++ {
		sender.Send(NewMsg(MsgTypeDefault, []byte(fmt.Sprintf("hello, world %d", i))))
	}
	if err := counter.Wait(100, 1); err != nil {
		t.Error(err)
	}
	// connect a second pod with replay to ensure the same messages come through
	p2 := g.ConnectWithReplay()
	p2.On(func(msg Message) error {
		counter.Count()
		return nil
	})
	sender.Send(NewMsg(MsgTypeDefault, []byte("let's get it started")))
	// 102: presumably the 100 replayed messages plus the new message seen by
	// both pods, counted after the previous Wait — confirm AsyncCounter semantics
	if err := counter.Wait(102, 1); err != nil {
		t.Error(err)
	}
}
// TestPodReplayPt2 verifies replay behavior when more messages were sent than
// the replay buffer holds: only the most recent messages get replayed to the
// newly connected pod.
func TestPodReplayPt2(t *testing.T) {
	g := New()
	counter := testutil.NewAsyncCounter(2000)

	p1 := g.Connect()
	p1.On(func(msg Message) error {
		counter.Count()
		return nil
	})

	sender := g.Connect()

	// send 1000 messages and ensure they're received by p1
	for i := 0; i < 1000; i++ {
		sender.Send(NewMsg(MsgTypeDefault, []byte(fmt.Sprintf("hello, world %d", i))))
	}

	if err := counter.Wait(1000, 1); err != nil {
		t.Error(err)
	}

	// connect a second pod with replay to ensure the same messages come through
	p2 := g.ConnectWithReplay()
	p2.On(func(msg Message) error {
		counter.Count()
		return nil
	})

	sender.Send(NewMsg(MsgTypeDefault, []byte("let's get started")))

	// 130: the replayed buffer contents plus the new message delivered to both
	// pods — NOTE(review): implies a 128-message replay buffer (128 + 2); confirm
	// against the bus implementation
	if err := counter.Wait(130, 1); err != nil {
		t.Error(err)
	}
}
// TestPodDisconnect verifies that a disconnected pod stops receiving messages
// while the remaining connected pods still receive them.
func TestPodDisconnect(t *testing.T) {
	g := New()

	counter := testutil.NewAsyncCounter(10)

	countingFunc := func(msg Message) error {
		counter.Count()
		return nil
	}

	leaver := g.Connect()
	leaver.On(countingFunc)

	stayer := g.Connect()
	stayer.On(countingFunc)

	sender := g.Connect()

	leaver.Disconnect()

	for n := 0; n < 5; n++ {
		sender.Send(NewMsg(MsgTypeDefault, []byte("testing disconnect")))
	}

	// since the first pod disconnected, only the still-connected pod should
	// count the 5 messages
	if err := counter.Wait(5, 1); err != nil {
		t.Error(err)
	}
}