-
Notifications
You must be signed in to change notification settings - Fork 9.6k
/
deterministic.go
399 lines (359 loc) · 10.1 KB
/
deterministic.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
import (
"encoding/json"
"errors"
"fmt"
"hash/fnv"
"reflect"
"sort"
"github.com/anishathalye/porcupine"
)
// DeterministicModel is a porcupine model that treats every request issued by
// a client as executed and persisted by etcd. That makes it a good fit for
// simulating etcd (i.e. writing a fake), but a poor fit for validating
// correctness: real requests can be lost or interrupted, and the model needs
// perfect knowledge of what happened to each request, which real systems
// cannot provide.
//
// The model may still answer with an error or a partial response:
//   - An error for failures etcd itself reports, such as asking for a future
//     revision.
//   - A partial response when the request is valid but the model lacks the
//     data to answer it fully, e.g. a stale read, because the model does not
//     keep the whole change history the way real etcd does.
//
// The porcupine state is the JSON encoding of an EtcdState, so every step
// decodes the state, applies the request and encodes the result again.
var DeterministicModel = porcupine.Model{
	Init: func() any {
		encoded, err := json.Marshal(freshEtcdState())
		if err != nil {
			panic(err)
		}
		return string(encoded)
	},
	Step: func(st any, in any, out any) (bool, any) {
		var current EtcdState
		if err := json.Unmarshal([]byte(st.(string)), &current); err != nil {
			panic(err)
		}
		matched, next := current.apply(in.(EtcdRequest), out.(EtcdResponse))
		encoded, err := json.Marshal(next)
		if err != nil {
			panic(err)
		}
		return matched, string(encoded)
	},
	DescribeOperation: func(in, out any) string {
		request := in.(EtcdRequest)
		requestText := describeEtcdRequest(request)
		responseText := describeEtcdResponse(request, MaybeEtcdResponse{EtcdResponse: out.(EtcdResponse)})
		return fmt.Sprintf("%s -> %s", requestText, responseText)
	},
}
// EtcdState is the modelled state of etcd that requests are applied to.
type EtcdState struct {
// Revision is the current store revision. A fresh state starts at 1 and
// Step increments it when a request changes the key space.
Revision int64
// CompactRevision is the revision passed in the last Compact request.
CompactRevision int64
// KeyValues maps each existing key to its value and modification revision.
KeyValues map[string]ValueRevision
// KeyLeases maps a key to the ID of the lease it is attached to.
KeyLeases map[string]int64
// Leases maps a lease ID to the lease and the set of keys attached to it.
Leases map[int64]EtcdLease
}
// apply runs request through Step and reports whether the observed response
// matches the response the model produced, together with the resulting state.
func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, EtcdState) {
	next, expected := s.Step(request)
	observed := MaybeEtcdResponse{EtcdResponse: response}
	return Match(observed, expected), next
}
// freshEtcdState returns the initial model state: revision 1 and empty,
// non-nil maps for key values, key-to-lease bindings and leases.
func freshEtcdState() EtcdState {
	var state EtcdState
	state.Revision = 1
	state.KeyValues = make(map[string]ValueRevision)
	state.KeyLeases = make(map[string]int64)
	state.Leases = make(map[int64]EtcdLease)
	return state
}
// Step handles a successful request, returning updated state and response it would generate.
//
// The receiver is left untouched: EtcdState is passed by value, but its maps
// are references, so every map (including the key set of each lease) is copied
// before the request is applied. The returned state owns the copies.
//
// Behaviour per request type:
//   - Range: answers from the current key space when the requested revision is
//     0 or the current one, returns ErrEtcdFutureRev for a future revision and
//     a partial response (revision only) for an older revision.
//   - Txn: evaluates the conditions, runs the matching operation list and bumps
//     the revision at most once if any operation changed the key space.
//   - LeaseGrant / LeaseRevoke: creates a lease, or removes a lease together
//     with the keys attached to it.
//   - Defragment: no state change.
//   - Compact: records the requested compact revision.
//
// It panics on an unknown request or operation type.
func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
	newKVs := make(map[string]ValueRevision, len(s.KeyValues))
	for k, v := range s.KeyValues {
		newKVs[k] = v
	}
	s.KeyValues = newKVs

	newKeyLeases := make(map[string]int64, len(s.KeyLeases))
	for k, v := range s.KeyLeases {
		newKeyLeases[k] = v
	}
	s.KeyLeases = newKeyLeases

	newLeases := make(map[int64]EtcdLease, len(s.Leases))
	for id, lease := range s.Leases {
		// The lease struct is copied by value, but its key set must be
		// cloned too, otherwise attach/detach would still alias the input.
		keys := make(map[string]struct{}, len(lease.Keys))
		for k := range lease.Keys {
			keys[k] = struct{}{}
		}
		lease.Keys = keys
		newLeases[id] = lease
	}
	s.Leases = newLeases

	switch request.Type {
	case Range:
		if request.Range.Revision == 0 || request.Range.Revision == s.Revision {
			resp := s.getRange(request.Range.RangeOptions)
			return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Range: &resp, Revision: s.Revision}}
		}
		if request.Range.Revision > s.Revision {
			return s, MaybeEtcdResponse{Error: ErrEtcdFutureRev.Error()}
		}
		// Stale read: the model keeps no history, so only the revision is known.
		return s, MaybeEtcdResponse{PartialResponse: true, EtcdResponse: EtcdResponse{Revision: s.Revision}}
	case Txn:
		failure := false
		for _, cond := range request.Txn.Conditions {
			// A missing key yields the zero ValueRevision, i.e. ModRevision 0.
			if val := s.KeyValues[cond.Key]; val.ModRevision != cond.ExpectedRevision {
				failure = true
				break
			}
		}
		operations := request.Txn.OperationsOnSuccess
		if failure {
			operations = request.Txn.OperationsOnFailure
		}
		opResp := make([]EtcdOperationResult, len(operations))
		increaseRevision := false
		for i, op := range operations {
			switch op.Type {
			case RangeOperation:
				opResp[i] = EtcdOperationResult{
					RangeResponse: s.getRange(op.Range),
				}
			case PutOperation:
				_, leaseExists := s.Leases[op.Put.LeaseID]
				if op.Put.LeaseID != 0 && !leaseExists {
					// Put referencing an unknown lease is skipped; this
					// break leaves the switch, not the loop.
					break
				}
				// All writes of one transaction share the same revision.
				s.KeyValues[op.Put.Key] = ValueRevision{
					Value:       op.Put.Value,
					ModRevision: s.Revision + 1,
				}
				increaseRevision = true
				s = detachFromOldLease(s, op.Put.Key)
				if leaseExists {
					s = attachToNewLease(s, op.Put.LeaseID, op.Put.Key)
				}
			case DeleteOperation:
				if _, ok := s.KeyValues[op.Delete.Key]; ok {
					delete(s.KeyValues, op.Delete.Key)
					increaseRevision = true
					s = detachFromOldLease(s, op.Delete.Key)
					opResp[i].Deleted = 1
				}
			default:
				panic("unsupported operation")
			}
		}
		if increaseRevision {
			s.Revision++
		}
		return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Txn: &TxnResponse{Failure: failure, Results: opResp}, Revision: s.Revision}}
	case LeaseGrant:
		lease := EtcdLease{
			LeaseID: request.LeaseGrant.LeaseID,
			Keys:    map[string]struct{}{},
		}
		s.Leases[request.LeaseGrant.LeaseID] = lease
		return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Revision: s.Revision, LeaseGrant: &LeaseGrantReponse{}}}
	case LeaseRevoke:
		// Delete the keys attached to the lease; same effect as a delete.
		keyDeleted := false
		for key := range s.Leases[request.LeaseRevoke.LeaseID].Keys {
			if _, ok := s.KeyValues[key]; ok {
				keyDeleted = true
				delete(s.KeyValues, key)
				delete(s.KeyLeases, key)
			}
		}
		// Delete the lease itself.
		delete(s.Leases, request.LeaseRevoke.LeaseID)
		// The revision only moves when at least one key was removed.
		if keyDeleted {
			s.Revision++
		}
		return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Revision: s.Revision, LeaseRevoke: &LeaseRevokeResponse{}}}
	case Defragment:
		return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Defragment: &DefragmentResponse{}, Revision: s.Revision}}
	case Compact:
		s.CompactRevision = request.Compact.Revision
		return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Compact: &CompactResponse{}, Revision: s.Revision}}
	default:
		panic(fmt.Sprintf("Unknown request type: %v", request.Type))
	}
}
// getRange answers a range query against the current key space.
//
// When options.End is empty it is a single-key lookup of options.Start.
// Otherwise it returns the keys in the half-open interval
// [options.Start, options.End), sorted by key. Count is the number of matching
// keys before the limit is applied. A positive options.Limit truncates KVs to
// that many entries; zero or negative means no limit.
func (s EtcdState) getRange(options RangeOptions) RangeResponse {
	// KVs stays a non-nil empty slice when nothing matches, because responses
	// are compared with reflect.DeepEqual, which distinguishes nil from empty.
	response := RangeResponse{
		KVs: []KeyValue{},
	}
	if options.End == "" {
		if value, ok := s.KeyValues[options.Start]; ok {
			response.KVs = append(response.KVs, KeyValue{
				Key:           options.Start,
				ValueRevision: value,
			})
			response.Count = 1
		}
		return response
	}
	for k, v := range s.KeyValues {
		if k >= options.Start && k < options.End {
			response.KVs = append(response.KVs, KeyValue{Key: k, ValueRevision: v})
		}
	}
	// Map iteration order is random; sort for a deterministic response.
	sort.Slice(response.KVs, func(j, k int) bool {
		return response.KVs[j].Key < response.KVs[k].Key
	})
	response.Count = int64(len(response.KVs))
	// Only a positive limit truncates. A negative limit used to reach the
	// slice expression below and panic.
	if options.Limit > 0 && response.Count > options.Limit {
		response.KVs = response.KVs[:options.Limit]
	}
	return response
}
// detachFromOldLease removes key from the lease it is currently attached to,
// if any, and drops its key-to-lease binding. The maps inside s are modified
// in place and s is returned.
func detachFromOldLease(s EtcdState, key string) EtcdState {
	oldLeaseID, attached := s.KeyLeases[key]
	if !attached {
		return s
	}
	delete(s.Leases[oldLeaseID].Keys, key)
	delete(s.KeyLeases, key)
	return s
}
// attachToNewLease binds key to the lease identified by leaseID, recording the
// binding both in s.KeyLeases and in the lease's key set. The maps inside s
// are modified in place and s is returned. The lease must already exist with
// a non-nil key set.
func attachToNewLease(s EtcdState, leaseID int64, key string) EtcdState {
	s.KeyLeases[key] = leaseID
	lease := s.Leases[leaseID]
	lease.Keys[key] = leased
	return s
}
// RequestType identifies which kind of request an EtcdRequest carries.
type RequestType string
// Request types understood by EtcdState.Step.
const (
Range RequestType = "range"
Txn RequestType = "txn"
Compact RequestType = "compact"
LeaseGrant RequestType = "leaseGrant"
LeaseRevoke RequestType = "leaseRevoke"
Defragment RequestType = "defragment"
)
// EtcdRequest is a request applied to the model. Type selects which of the
// pointer fields Step reads; Step dereferences the matching field for Range,
// Txn, LeaseGrant, LeaseRevoke and Compact requests.
type EtcdRequest struct {
Type RequestType
LeaseGrant *LeaseGrantRequest
LeaseRevoke *LeaseRevokeRequest
Range *RangeRequest
Txn *TxnRequest
Defragment *DefragmentRequest
Compact *CompactRequest
}
// RangeRequest is a read of one key or a key interval.
type RangeRequest struct {
RangeOptions
// Revision is the revision to read at. Zero or the current revision is
// answered from current state; a larger value yields ErrEtcdFutureRev and
// a smaller one a partial response.
Revision int64
}
// RangeOptions selects the keys returned by a range read.
type RangeOptions struct {
// Start is the key to look up, or the inclusive lower bound when End is set.
Start string
// End is the exclusive upper bound. Empty means a single-key lookup of Start.
End string
// Limit caps the number of returned key-values. Zero means no limit.
Limit int64
}
// PutOptions describes a put operation inside a transaction.
type PutOptions struct {
Key string
Value ValueOrHash
// LeaseID is the lease to attach the key to. Step skips the put when
// LeaseID is non-zero and no such lease exists.
LeaseID int64
}
// DeleteOptions describes a delete operation inside a transaction; it removes
// the single key Key.
type DeleteOptions struct {
Key string
}
// CompactResponse is the (empty) response to a Compact request.
type CompactResponse struct {
}
// CompactRequest asks to compact at Revision; Step stores the value in
// EtcdState.CompactRevision.
type CompactRequest struct {
Revision int64
}
// TxnRequest is a transaction: OperationsOnSuccess run when every condition
// holds, otherwise OperationsOnFailure run.
type TxnRequest struct {
Conditions []EtcdCondition
OperationsOnSuccess []EtcdOperation
OperationsOnFailure []EtcdOperation
}
// EtcdCondition holds when the modification revision of Key equals
// ExpectedRevision. A key that does not exist compares as revision 0.
type EtcdCondition struct {
Key string
ExpectedRevision int64
}
// EtcdOperation is a single operation inside a transaction. Type selects which
// of Range, Put or Delete is used.
type EtcdOperation struct {
Type OperationType
Range RangeOptions
Put PutOptions
Delete DeleteOptions
}
// OperationType identifies the kind of an EtcdOperation.
type OperationType string
// Operation types supported inside a transaction; Step panics on any other.
const (
RangeOperation OperationType = "range-operation"
PutOperation OperationType = "put-operation"
DeleteOperation OperationType = "delete-operation"
)
// LeaseGrantRequest creates a lease with the given ID and no attached keys.
type LeaseGrantRequest struct {
LeaseID int64
}
// LeaseRevokeRequest removes the lease with the given ID together with every
// key attached to it.
type LeaseRevokeRequest struct {
LeaseID int64
}
// DefragmentRequest is a defragment request; it carries no parameters.
type DefragmentRequest struct{}
// MaybeEtcdResponse extends EtcdResponse to represent partial or failed responses.
// Possible states:
// * Normal response. Only EtcdResponse is set.
// * Partial response. The EtcdResponse.Revision and PartialResponse are set.
// * Failed response. Only Error is set.
type MaybeEtcdResponse struct {
EtcdResponse
// PartialResponse marks a response in which only Revision is meaningful.
PartialResponse bool
// Error is the error message of a failed response, empty otherwise.
Error string
}
// ErrEtcdFutureRev is the error the model reports for a range request whose
// revision is greater than the current revision.
var ErrEtcdFutureRev = errors.New("future rev")
// EtcdResponse is the response to an EtcdRequest. Step sets the pointer field
// that corresponds to the request type and leaves the others nil.
type EtcdResponse struct {
Txn *TxnResponse
Range *RangeResponse
LeaseGrant *LeaseGrantReponse
LeaseRevoke *LeaseRevokeResponse
Defragment *DefragmentResponse
Compact *CompactResponse
// Revision is the store revision after the request was applied.
Revision int64
}
// Match reports whether two responses are compatible. If either one is a
// partial response, equal revisions are enough; failing that, the responses
// must be deeply equal.
func Match(r1, r2 MaybeEtcdResponse) bool {
	if r1.PartialResponse || r2.PartialResponse {
		if r1.Revision == r2.Revision {
			return true
		}
	}
	return reflect.DeepEqual(r1, r2)
}
// TxnResponse is the response to a transaction.
type TxnResponse struct {
// Failure is true when at least one condition did not hold and the
// failure operations were executed.
Failure bool
// Results holds one entry per executed operation, in order.
Results []EtcdOperationResult
}
// RangeResponse is the result of a range read.
type RangeResponse struct {
// KVs holds the returned key-values; interval reads are sorted by key.
KVs []KeyValue
// Count is the number of keys that matched, counted before any limit
// truncates KVs.
Count int64
}
// LeaseGrantReponse is the response to a lease grant. Step returns it with
// LeaseID left at zero.
type LeaseGrantReponse struct {
LeaseID int64
}
// LeaseRevokeResponse is the (empty) response to a lease revoke.
type LeaseRevokeResponse struct{}
// DefragmentResponse is the (empty) response to a defragment request.
type DefragmentResponse struct{}
// EtcdOperationResult is the result of one operation inside a transaction.
type EtcdOperationResult struct {
// RangeResponse is filled in for range operations.
RangeResponse
// Deleted is set to 1 when a delete operation removed an existing key.
Deleted int64
}
// KeyValue is a key together with its value and modification revision.
type KeyValue struct {
Key string
ValueRevision
}
// leased is the placeholder value stored in EtcdLease.Keys, which is used as a set.
var leased = struct{}{}
// EtcdLease is a lease and the set of keys currently attached to it.
type EtcdLease struct {
LeaseID int64
Keys map[string]struct{}
}
// ValueRevision is the stored value of a key and the revision of the
// transaction that last wrote it.
type ValueRevision struct {
Value ValueOrHash
ModRevision int64
}
// ValueOrHash holds either a value verbatim or the hash of a value. Exactly
// one of the fields is populated by ToValueOrHash.
type ValueOrHash struct {
	Value string
	Hash  uint32
}

// ToValueOrHash converts value into its model representation: values shorter
// than 20 bytes are kept as-is, longer ones are replaced by their 32-bit
// FNV-1a hash.
func ToValueOrHash(value string) ValueOrHash {
	const inlineLimit = 20
	if len(value) < inlineLimit {
		return ValueOrHash{Value: value}
	}
	hasher := fnv.New32a()
	hasher.Write([]byte(value))
	return ValueOrHash{Hash: hasher.Sum32()}
}