diff --git a/server/storage/mvcc/watchable_store.go b/server/storage/mvcc/watchable_store.go index ad17b2be7ac..f937c5dcdcf 100644 --- a/server/storage/mvcc/watchable_store.go +++ b/server/storage/mvcc/watchable_store.go @@ -372,11 +372,11 @@ func (s *watchableStore) syncWatchers() int { victims := make(watcherBatch) wb := newWatcherBatch(wg, evs) for w := range wg.watchers { - if w.minRev < compactionRev { - // Skip the watcher that failed to send compacted watch response due to w.ch is full. - // Next retry of syncWatchers would try to resend the compacted watch response to w.ch - continue - } + //if w.minRev < compactionRev { + // // Skip the watcher that failed to send compacted watch response due to w.ch is full. + // // Next retry of syncWatchers would try to resend the compacted watch response to w.ch + // continue + //} w.minRev = curRev + 1 eb, ok := wb[w] diff --git a/tests/framework/e2e/etcd_process.go b/tests/framework/e2e/etcd_process.go index af5c437c7c7..0c30408c5d1 100644 --- a/tests/framework/e2e/etcd_process.go +++ b/tests/framework/e2e/etcd_process.go @@ -393,7 +393,7 @@ func (f *BinaryFailpoints) DeactivateHTTP(ctx context.Context, failpoint string) } httpClient := http.Client{ // TODO: Decrease after deactivate is not blocked by sleep https://github.com/etcd-io/gofail/issues/64 - Timeout: 2 * time.Second, + Timeout: 3 * time.Second, } if f.clientTimeout != 0 { httpClient.Timeout = f.clientTimeout diff --git a/tests/robustness/failpoint/failpoint.go b/tests/robustness/failpoint/failpoint.go index 12a72b69d96..3b6da8b503b 100644 --- a/tests/robustness/failpoint/failpoint.go +++ b/tests/robustness/failpoint/failpoint.go @@ -37,20 +37,21 @@ const ( var ( allFailpoints = []Failpoint{ - KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic, RaftAfterSavePanic, - DefragBeforeCopyPanic, DefragBeforeRenamePanic, BackendBeforePreCommitHookPanic, BackendAfterPreCommitHookPanic, - BackendBeforeStartDBTxnPanic, BackendAfterStartDBTxnPanic, BackendBeforeWritebackBufPanic, - BackendAfterWritebackBufPanic, CompactBeforeCommitScheduledCompactPanic, CompactAfterCommitScheduledCompactPanic, - CompactBeforeSetFinishedCompactPanic, CompactAfterSetFinishedCompactPanic, CompactBeforeCommitBatchPanic, - CompactAfterCommitBatchPanic, RaftBeforeLeaderSendPanic, BlackholePeerNetwork, DelayPeerNetwork, - RaftBeforeFollowerSendPanic, RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic, - RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic, BlackholeUntilSnapshot, - BeforeApplyOneConfChangeSleep, - MemberReplace, - DropPeerNetwork, - RaftBeforeSaveSleep, - RaftAfterSaveSleep, - ApplyBeforeOpenSnapshot, + //KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic, RaftAfterSavePanic, + //DefragBeforeCopyPanic, DefragBeforeRenamePanic, BackendBeforePreCommitHookPanic, BackendAfterPreCommitHookPanic, + //BackendBeforeStartDBTxnPanic, BackendAfterStartDBTxnPanic, BackendBeforeWritebackBufPanic, + //BackendAfterWritebackBufPanic, CompactBeforeCommitScheduledCompactPanic, CompactAfterCommitScheduledCompactPanic, + //CompactBeforeSetFinishedCompactPanic, CompactAfterSetFinishedCompactPanic, CompactBeforeCommitBatchPanic, + //CompactAfterCommitBatchPanic, RaftBeforeLeaderSendPanic, BlackholePeerNetwork, DelayPeerNetwork, + //RaftBeforeFollowerSendPanic, RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic, + //RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic, BlackholeUntilSnapshot, + //BeforeApplyOneConfChangeSleep, + //MemberReplace, + //DropPeerNetwork, + //RaftBeforeSaveSleep, + //RaftAfterSaveSleep, + //ApplyBeforeOpenSnapshot, + beforeSendWatchResponse, } ) diff --git a/tests/robustness/failpoint/gofail.go b/tests/robustness/failpoint/gofail.go index 3d90c5ddd8f..c8343261507 100644 --- a/tests/robustness/failpoint/gofail.go +++ b/tests/robustness/failpoint/gofail.go @@ -57,6 +57,7 @@ var ( BeforeApplyOneConfChangeSleep Failpoint = killAndGofailSleep{"beforeApplyOneConfChange", time.Second} RaftBeforeSaveSleep Failpoint = gofailSleepAndDeactivate{"raftBeforeSave", time.Second} RaftAfterSaveSleep Failpoint = gofailSleepAndDeactivate{"raftAfterSave", time.Second} + beforeSendWatchResponse Failpoint = gofailSleepAndDeactivate{"beforeSendWatchResponse", time.Second} ) type goPanicFailpoint struct { @@ -209,13 +210,14 @@ func (f gofailSleepAndDeactivate) Inject(ctx context.Context, t *testing.T, lg * lg.Info("goFailpoint setup failed", zap.String("failpoint", f.Name()), zap.Error(err)) return fmt.Errorf("goFailpoint %s setup failed, err:%w", f.Name(), err) } - time.Sleep(f.time) + time.Sleep(2 * time.Second) lg.Info("Deactivating gofailpoint", zap.String("failpoint", f.Name())) err = member.Failpoints().DeactivateHTTP(ctx, f.failpoint) if err != nil { lg.Info("goFailpoint deactivate failed", zap.String("failpoint", f.Name()), zap.Error(err)) return fmt.Errorf("goFailpoint %s deactivate failed, err: %w", f.Name(), err) } + time.Sleep(2 * time.Second) return nil } diff --git a/tests/robustness/main_test.go b/tests/robustness/main_test.go index ccd853b7c24..9311b780662 100644 --- a/tests/robustness/main_test.go +++ b/tests/robustness/main_test.go @@ -64,18 +64,18 @@ func TestRobustnessRegression(t *testing.T) { } func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testScenario) { - report := report.TestReport{Logger: lg} + r := report.TestReport{Logger: lg} var err error - report.Cluster, err = e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&s.cluster)) + r.Cluster, err = e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&s.cluster)) if err != nil { t.Fatal(err) } - defer report.Cluster.Close() + defer r.Cluster.Close() if s.failpoint == nil { - s.failpoint = failpoint.PickRandom(t, report.Cluster) + s.failpoint = failpoint.PickRandom(t, r.Cluster) } else { - err = failpoint.Validate(report.Cluster, s.failpoint) + err = failpoint.Validate(r.Cluster, s.failpoint) if err != nil { t.Fatal(err) } @@ -86,15 +86,19 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testSce // Refer to: https://github.com/golang/go/issues/49929 panicked := true defer func() { - report.Report(t, panicked) + r.Report(t, panicked) }() - report.Client = s.run(ctx, t, lg, report.Cluster) - forcestopCluster(report.Cluster) + r.Client = s.run(ctx, t, lg, r.Cluster) + persistedRequests, err := report.PersistedRequestsCluster(lg, r.Cluster) + if err != nil { + t.Error(err) + } + forcestopCluster(r.Cluster) - watchProgressNotifyEnabled := report.Cluster.Cfg.ServerConfig.ExperimentalWatchProgressNotifyInterval != 0 - validateGotAtLeastOneProgressNotify(t, report.Client, s.watch.requestProgress || watchProgressNotifyEnabled) + //watchProgressNotifyEnabled := r.Cluster.Cfg.ServerConfig.ExperimentalWatchProgressNotifyInterval != 0 + //validateGotAtLeastOneProgressNotify(t, r.Client, s.watch.requestProgress || watchProgressNotifyEnabled) validateConfig := validate.Config{ExpectRevisionUnique: s.traffic.ExpectUniqueRevision()} - report.Visualize = validate.ValidateAndReturnVisualize(t, lg, validateConfig, report.Client, 5*time.Minute) + r.Visualize = validate.ValidateAndReturnVisualize(t, lg, validateConfig, r.Client, persistedRequests, 5*time.Minute) panicked = false } @@ -121,17 +125,16 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu lg.Info("Finished injecting failures") return nil }) - maxRevisionChan := make(chan int64, 1) + //maxRevisionChan := make(chan int64, 1) + //g.Go(func() error { + // watchReport = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, s.watch, baseTime, ids) + // return nil + //}) g.Go(func() error { - defer close(maxRevisionChan) + //defer close(maxRevisionChan) operationReport = traffic.SimulateTraffic(ctx, t, lg, clus, s.profile, s.traffic, finishTraffic, baseTime, ids) - maxRevision := operationsMaxRevision(operationReport) - maxRevisionChan <- maxRevision - lg.Info("Finished simulating traffic", zap.Int64("max-revision", maxRevision)) - return nil - }) - g.Go(func() error { - watchReport = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, s.watch, baseTime, ids) + //maxRevisionChan <- operationsMaxRevision(operationReport) + time.Sleep(time.Second) return nil }) g.Wait() diff --git a/tests/robustness/model/describe.go b/tests/robustness/model/describe.go index 9b92d3bcfc4..ada1fb10bc7 100644 --- a/tests/robustness/model/describe.go +++ b/tests/robustness/model/describe.go @@ -33,7 +33,7 @@ func describeEtcdResponse(request EtcdRequest, response MaybeEtcdResponse) strin return fmt.Sprintf("%s, rev: %d", describeRangeResponse(request.Range.RangeOptions, *response.Range), response.Revision) case Txn: return fmt.Sprintf("%s, rev: %d", describeTxnResponse(request.Txn, response.Txn), response.Revision) - case LeaseGrant, LeaseRevoke, Defragment: + case LeaseGrant, LeaseRevoke, Defragment, Compact: if response.Revision == 0 { return "ok" } @@ -63,6 +63,8 @@ func describeEtcdRequest(request EtcdRequest) string { return fmt.Sprintf("leaseRevoke(%d)", request.LeaseRevoke.LeaseID) case Defragment: return fmt.Sprintf("defragment()") + case Compact: + return fmt.Sprintf("compact(%d)", request.Compact.Revision) default: return fmt.Sprintf("", request.Type) } diff --git a/tests/robustness/model/deterministic.go b/tests/robustness/model/deterministic.go index 9b743c9a8f6..b554e5ada0c 100644 --- a/tests/robustness/model/deterministic.go +++ b/tests/robustness/model/deterministic.go @@ -64,10 +64,11 @@ var DeterministicModel = porcupine.Model{ } type EtcdState struct { - Revision int64 - KeyValues map[string]ValueRevision - KeyLeases map[string]int64 - Leases map[int64]EtcdLease + Revision int64 + CompactRevision int64 + KeyValues map[string]ValueRevision + KeyLeases map[string]int64 + Leases map[int64]EtcdLease } func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, EtcdState) { @@ -178,6 +179,9 @@ func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) { return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Revision: s.Revision, LeaseRevoke: &LeaseRevokeResponse{}}} case Defragment: return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Defragment: &DefragmentResponse{}, Revision: s.Revision}} + case Compact: + s.CompactRevision = request.Compact.Revision + return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Compact: &CompactResponse{}, Revision: s.Revision}} default: panic(fmt.Sprintf("Unknown request type: %v", request.Type)) } @@ -234,6 +238,7 @@ type RequestType string const ( Range RequestType = "range" Txn RequestType = "txn" + Compact RequestType = "compact" LeaseGrant RequestType = "leaseGrant" LeaseRevoke RequestType = "leaseRevoke" Defragment RequestType = "defragment" @@ -246,6 +251,7 @@ type EtcdRequest struct { Range *RangeRequest Txn *TxnRequest Defragment *DefragmentRequest + Compact *CompactRequest } type RangeRequest struct { @@ -269,6 +275,13 @@ type DeleteOptions struct { Key string } +type CompactResponse struct { +} + +type CompactRequest struct { + Revision int64 +} + type TxnRequest struct { Conditions []EtcdCondition OperationsOnSuccess []EtcdOperation @@ -322,6 +335,7 @@ type EtcdResponse struct { LeaseGrant *LeaseGrantReponse LeaseRevoke *LeaseRevokeResponse Defragment *DefragmentResponse + Compact *CompactResponse Revision int64 } diff --git a/tests/robustness/model/history.go b/tests/robustness/model/history.go index 936597c0e17..0e73c376eb6 100644 --- a/tests/robustness/model/history.go +++ b/tests/robustness/model/history.go @@ -169,6 +169,25 @@ func (h *AppendableHistory) AppendDelete(key string, start, end time.Duration, r }) } +func (h *AppendableHistory) AppendCompact(rev int64, start, end time.Duration, resp *clientv3.CompactResponse, err error) { + request := compactRequest(rev) + if err != nil { + h.appendFailed(request, start.Nanoseconds(), err) + return + } + var revision int64 + if resp != nil && resp.Header != nil { + revision = resp.Header.Revision + } + h.appendSuccessful(porcupine.Operation{ + ClientId: h.streamID, + Input: request, + Call: start.Nanoseconds(), + Output: compactResponse(revision), + Return: end.Nanoseconds(), + }) +} + func (h *AppendableHistory) AppendTxn(cmp []clientv3.Cmp, clientOnSuccessOps, clientOnFailure []clientv3.Op, start, end time.Duration, resp *clientv3.TxnResponse, err error) { conds := []EtcdCondition{} for _, cmp := range cmp { @@ -405,6 +424,10 @@ func putResponse(revision int64) MaybeEtcdResponse { return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Txn: &TxnResponse{Results: []EtcdOperationResult{{}}}, Revision: revision}} } +func compactRequest(rev int64) EtcdRequest { + return EtcdRequest{Type: Compact, Compact: &CompactRequest{Revision: rev}} +} + func deleteRequest(key string) EtcdRequest { return EtcdRequest{Type: Txn, Txn: &TxnRequest{OperationsOnSuccess: []EtcdOperation{{Type: DeleteOperation, Delete: DeleteOptions{Key: key}}}}} } @@ -413,6 +436,10 @@ func deleteResponse(deleted int64, revision int64) MaybeEtcdResponse { return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Txn: &TxnResponse{Results: []EtcdOperationResult{{Deleted: deleted}}}, Revision: revision}} } +func compactResponse(revision int64) MaybeEtcdResponse { + return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Compact: &CompactResponse{}, Revision: revision}} +} + func compareRevisionAndPutRequest(key string, expectedRevision int64, value string) EtcdRequest { return txnRequestSingleOperation(compareRevision(key, expectedRevision), putOperation(key, value), nil) } diff --git a/tests/robustness/report/wal.go b/tests/robustness/report/wal.go new file mode 100644 index 00000000000..e41a244a0d7 --- /dev/null +++ b/tests/robustness/report/wal.go @@ -0,0 +1,240 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package report + +import ( + "errors" + "fmt" + "io" + "os" + "path/filepath" + "strings" + + "github.com/google/go-cmp/cmp" + "go.uber.org/zap" + + pb "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/pkg/v3/pbutil" + "go.etcd.io/etcd/server/v3/storage/wal" + "go.etcd.io/etcd/server/v3/storage/wal/walpb" + "go.etcd.io/etcd/tests/v3/framework/e2e" + "go.etcd.io/etcd/tests/v3/robustness/model" + "go.etcd.io/raft/v3/raftpb" +) + +func LoadClusterPersistedRequests(lg *zap.Logger, path string) ([]model.EtcdRequest, error) { + files, err := os.ReadDir(path) + if err != nil { + return nil, err + } + dataDirs := []string{} + for _, file := range files { + if file.IsDir() && strings.HasPrefix(file.Name(), "server-") { + dataDirs = append(dataDirs, filepath.Join(path, file.Name())) + } + } + return PersistedRequestsDirs(lg, dataDirs) +} + +func PersistedRequestsCluster(lg *zap.Logger, cluster *e2e.EtcdProcessCluster) ([]model.EtcdRequest, error) { + dataDirs := []string{} + for _, proc := range cluster.Procs { + dataDirs = append(dataDirs, proc.Config().DataDirPath) + } + return PersistedRequestsDirs(lg, dataDirs) +} + +func PersistedRequestsDirs(lg *zap.Logger, dataDirs []string) ([]model.EtcdRequest, error) { + persistedRequests := []model.EtcdRequest{} + allowedFailures := len(dataDirs) / 2 + for _, dir := range dataDirs { + memberRequests, err := requestsPersistedInWAL(lg, dir) + if err != nil { + if allowedFailures < 1 { + return nil, err + } + allowedFailures-- + continue + } + minLength := min(len(persistedRequests), len(memberRequests)) + if diff := cmp.Diff(memberRequests[:minLength], persistedRequests[:minLength]); diff != "" { + return nil, fmt.Errorf("unexpected differences between wal entries, diff:\n%s", diff) + } + if len(memberRequests) > len(persistedRequests) { + persistedRequests = memberRequests + } + } + return persistedRequests, nil +} + +func requestsPersistedInWAL(lg *zap.Logger, dataDir string) ([]model.EtcdRequest, error) { + state, ents, err := ReadWAL(lg, dataDir) + if err != nil { + return nil, err + } + requests := make([]model.EtcdRequest, 0, len(ents)) + for _, ent := range ents { + if ent.Type != raftpb.EntryNormal || len(ent.Data) == 0 { + continue + } + if ent.Index > state.Commit { + break + } + request, err := parseEntryNormal(ent) + if err != nil { + return nil, err + } + if request != nil { + requests = append(requests, *request) + } + } + return requests, nil +} + +func ReadWAL(lg *zap.Logger, dataDir string) (state raftpb.HardState, ents []raftpb.Entry, err error) { + walDir := filepath.Join(dataDir, "member", "wal") + repaired := false + for { + w, err := wal.OpenForRead(lg, walDir, walpb.Snapshot{Index: 0}) + if err != nil { + lg.Fatal("failed to open WAL", zap.Error(err)) + } + _, state, ents, err = w.ReadAll() + w.Close() + if err != nil { + if errors.Is(err, wal.ErrSnapshotNotFound) { + return state, ents, nil + } + // we can only repair ErrUnexpectedEOF and we never repair twice. + if repaired || !errors.Is(err, io.ErrUnexpectedEOF) { + return state, nil, fmt.Errorf("failed to read WAL, cannot be repaired, err: %s", err) + } + if !wal.Repair(lg, walDir) { + return state, nil, fmt.Errorf("failed to repair WAL, err: %s", err) + } + lg.Info("repaired WAL", zap.Error(err)) + repaired = true + continue + } + return state, ents, nil + } +} + +func parseEntryNormal(ent raftpb.Entry) (*model.EtcdRequest, error) { + var raftReq pb.InternalRaftRequest + if err := raftReq.Unmarshal(ent.Data); err != nil { + var r pb.Request + isV2Entry := pbutil.MaybeUnmarshal(&r, ent.Data) + if !isV2Entry { + return nil, err + } + return nil, nil + } + switch { + case raftReq.Put != nil: + op := model.PutOptions{ + Key: string(raftReq.Put.Key), + Value: model.ToValueOrHash(string(raftReq.Put.Value)), + LeaseID: raftReq.Put.Lease, + } + request := model.EtcdRequest{ + Type: model.Txn, + Txn: &model.TxnRequest{ + OperationsOnSuccess: []model.EtcdOperation{ + {Type: model.PutOperation, Put: op}, + }, + }, + } + return &request, nil + case raftReq.DeleteRange != nil: + op := model.DeleteOptions{Key: string(raftReq.DeleteRange.Key)} + request := model.EtcdRequest{ + Type: model.Txn, + Txn: &model.TxnRequest{ + OperationsOnSuccess: []model.EtcdOperation{ + {Type: model.DeleteOperation, Delete: op}, + }, + }, + } + return &request, nil + case raftReq.LeaseGrant != nil: + return &model.EtcdRequest{ + Type: model.LeaseGrant, + LeaseGrant: &model.LeaseGrantRequest{LeaseID: raftReq.LeaseGrant.ID}, + }, nil + case raftReq.ClusterMemberAttrSet != nil: + return nil, nil + case raftReq.ClusterVersionSet != nil: + return nil, nil + case raftReq.Compaction != nil: + return nil, nil + case raftReq.Txn != nil: + txn := model.TxnRequest{} + for _, cmp := range raftReq.Txn.Compare { + txn.Conditions = append(txn.Conditions, model.EtcdCondition{ + Key: string(cmp.Key), + ExpectedRevision: cmp.GetModRevision(), + }) + } + for _, op := range raftReq.Txn.Success { + txn.OperationsOnSuccess = append(txn.OperationsOnSuccess, toEtcdOperation(op)) + } + for _, op := range raftReq.Txn.Failure { + txn.OperationsOnFailure = append(txn.OperationsOnFailure, toEtcdOperation(op)) + } + request := model.EtcdRequest{ + Type: model.Txn, + Txn: &txn, + } + return &request, nil + default: + panic(fmt.Sprintf("Unhandled raft request: %+v", raftReq)) + } +} + +func toEtcdOperation(op *pb.RequestOp) (operation model.EtcdOperation) { + switch { + case op.GetRequestRange() != nil: + rangeOp := op.GetRequestRange() + operation = model.EtcdOperation{ + Type: model.RangeOperation, + Range: model.RangeOptions{ + Start: string(rangeOp.Key), + End: string(rangeOp.RangeEnd), + Limit: rangeOp.Limit, + }, + } + case op.GetRequestPut() != nil: + putOp := op.GetRequestPut() + operation = model.EtcdOperation{ + Type: model.PutOperation, + Put: model.PutOptions{ + Key: string(putOp.Key), + Value: model.ToValueOrHash(string(putOp.Value)), + }, + } + case op.GetRequestDeleteRange() != nil: + deleteOp := op.GetRequestDeleteRange() + operation = model.EtcdOperation{ + Type: model.DeleteOperation, + Delete: model.DeleteOptions{ + Key: string(deleteOp.Key), + }, + } + default: + panic(fmt.Sprintf("Unknown op type %v", op)) + } + return operation +} diff --git a/tests/robustness/scenarios.go b/tests/robustness/scenarios.go index 01cd00c8b2b..768a86bc84a 100644 --- a/tests/robustness/scenarios.go +++ b/tests/robustness/scenarios.go @@ -32,18 +32,18 @@ type TrafficProfile struct { } var trafficProfiles = []TrafficProfile{ - { - Traffic: traffic.EtcdPut, - Profile: traffic.HighTrafficProfile, - }, - { - Traffic: traffic.EtcdPutDeleteLease, - Profile: traffic.LowTraffic, - }, - { - Traffic: traffic.Kubernetes, - Profile: traffic.HighTrafficProfile, - }, + //{ + // Traffic: traffic.EtcdPut, + // Profile: traffic.HighTrafficProfile, + //}, + //{ + // Traffic: traffic.EtcdPutDeleteLease, + // Profile: traffic.LowTraffic, + //}, + //{ + // Traffic: traffic.Kubernetes, + // Profile: traffic.HighTrafficProfile, + //}, { Traffic: traffic.Kubernetes, Profile: traffic.LowTraffic, @@ -64,7 +64,7 @@ func exploratoryScenarios(t *testing.T) []testScenario { if err != nil { t.Fatalf("Failed checking etcd version binary, binary: %q, err: %v", e2e.BinPath.Etcd, err) } - enableLazyFS := e2e.BinPath.LazyFSAvailable() + //enableLazyFS := e2e.BinPath.LazyFSAvailable() randomizableOptions := []e2e.EPClusterOption{ options.WithClusterOptionGroups( options.ClusterOptions{options.WithTickMs(29), options.WithElectionMs(271)}, @@ -80,22 +80,22 @@ func exploratoryScenarios(t *testing.T) []testScenario { e2e.WithWatchProcessNotifyInterval(100 * time.Millisecond), } scenarios := []testScenario{} - for _, tp := range trafficProfiles { - name := filepath.Join(tp.Traffic.Name(), tp.Profile.Name, "ClusterOfSize1") - clusterOfSize1Options := baseOptions - clusterOfSize1Options = append(clusterOfSize1Options, e2e.WithClusterSize(1)) - // Add LazyFS only for traffic with lower QPS as it uses a lot of CPU lowering minimal QPS. - if enableLazyFS && tp.Profile.MinimalQPS <= 100 { - clusterOfSize1Options = append(clusterOfSize1Options, e2e.WithLazyFSEnabled(true)) - name = filepath.Join(name, "LazyFS") - } - scenarios = append(scenarios, testScenario{ - name: name, - traffic: tp.Traffic, - profile: tp.Profile, - cluster: *e2e.NewConfig(clusterOfSize1Options...), - }) - } + //for _, tp := range trafficProfiles { + // name := filepath.Join(tp.Traffic.Name(), tp.Profile.Name, "ClusterOfSize1") + // clusterOfSize1Options := baseOptions + // clusterOfSize1Options = append(clusterOfSize1Options, e2e.WithClusterSize(1)) + // // Add LazyFS only for traffic with lower QPS as it uses a lot of CPU lowering minimal QPS. + // if enableLazyFS && tp.Profile.MinimalQPS <= 100 { + // clusterOfSize1Options = append(clusterOfSize1Options, e2e.WithLazyFSEnabled(true)) + // name = filepath.Join(name, "LazyFS") + // } + // scenarios = append(scenarios, testScenario{ + // name: name, + // traffic: tp.Traffic, + // profile: tp.Profile, + // cluster: *e2e.NewConfig(clusterOfSize1Options...), + // }) + //} for _, tp := range trafficProfiles { name := filepath.Join(tp.Traffic.Name(), tp.Profile.Name, "ClusterOfSize3") diff --git a/tests/robustness/traffic/client.go b/tests/robustness/traffic/client.go index 2cfab6a368a..52e58f164c9 100644 --- a/tests/robustness/traffic/client.go +++ b/tests/robustness/traffic/client.go @@ -17,6 +17,7 @@ package traffic import ( "context" "fmt" + "strings" "sync" "time" @@ -135,6 +136,19 @@ func (c *RecordingClient) Delete(ctx context.Context, key string) (*clientv3.Del return resp, err } +func (c *RecordingClient) Compact(ctx context.Context, rev int64) (*clientv3.CompactResponse, error) { + c.kvMux.Lock() + defer c.kvMux.Unlock() + callTime := time.Since(c.baseTime) + fmt.Printf("Compact %d\n", rev) + resp, err := c.client.Compact(ctx, rev) + returnTime := time.Since(c.baseTime) + if err == nil || !strings.Contains(err.Error(), "mvcc: required revision has been compacted") { + c.kvOperations.AppendCompact(rev, callTime, returnTime, resp, err) + } + return resp, err +} + func (c *RecordingClient) Txn(ctx context.Context, conditions []clientv3.Cmp, onSuccess []clientv3.Op, onFailure []clientv3.Op) (*clientv3.TxnResponse, error) { txn := c.client.Txn(ctx).If( conditions..., diff --git a/tests/robustness/traffic/kubernetes.go b/tests/robustness/traffic/kubernetes.go index ff01650035d..aef47eef71c 100644 --- a/tests/robustness/traffic/kubernetes.go +++ b/tests/robustness/traffic/kubernetes.go @@ -36,9 +36,10 @@ var ( resource: "pods", namespace: "default", writeChoices: []choiceWeight[KubernetesRequestType]{ - {choice: KubernetesUpdate, weight: 90}, + {choice: KubernetesUpdate, weight: 89}, {choice: KubernetesDelete, weight: 5}, {choice: KubernetesCreate, weight: 5}, + {choice: KubernetesCompact, weight: 1}, }, } ) @@ -63,6 +64,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter s := newStorage() keyPrefix := "/registry/" + t.resource + "/" g := errgroup.Group{} + limit := int64(t.averageKeyCount) g.Go(func() error { for { @@ -73,7 +75,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter return nil default: } - rev, err := t.Read(ctx, kc, s, limiter, keyPrefix) + rev, err := t.Read(ctx, kc, s, limiter, keyPrefix, limit) if err != nil { continue } @@ -92,7 +94,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter } // Avoid multiple failed writes in a row if lastWriteFailed { - _, err := t.Read(ctx, kc, s, limiter, keyPrefix) + _, err := t.Read(ctx, kc, s, limiter, keyPrefix, 0) if err != nil { continue } @@ -107,8 +109,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter g.Wait() } -func (t kubernetesTraffic) Read(ctx context.Context, kc *kubernetesClient, s *storage, limiter *rate.Limiter, keyPrefix string) (rev int64, err error) { - limit := int64(t.averageKeyCount) +func (t kubernetesTraffic) Read(ctx context.Context, kc *kubernetesClient, s *storage, limiter *rate.Limiter, keyPrefix string, limit int64) (rev int64, err error) { rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix) hasMore := true @@ -166,6 +167,8 @@ func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids _, err = kc.OptimisticUpdate(writeCtx, key, fmt.Sprintf("%d", ids.NewRequestID()), rev) case KubernetesCreate: err = kc.OptimisticCreate(writeCtx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestID())) + case KubernetesCompact: + err = kc.Compact(writeCtx, rev) default: panic(fmt.Sprintf("invalid choice: %q", op)) } @@ -208,9 +211,10 @@ func (t kubernetesTraffic) generateKey() string { type KubernetesRequestType string const ( - KubernetesDelete KubernetesRequestType = "delete" - KubernetesUpdate KubernetesRequestType = "update" - KubernetesCreate KubernetesRequestType = "create" + KubernetesDelete KubernetesRequestType = "delete" + KubernetesUpdate KubernetesRequestType = "update" + KubernetesCreate KubernetesRequestType = "create" + KubernetesCompact KubernetesRequestType = "compact" ) type kubernetesClient struct { @@ -249,6 +253,11 @@ func (k kubernetesClient) RequestProgress(ctx context.Context) error { return k.client.RequestProgress(clientv3.WithRequireLeader(ctx)) } +func (k kubernetesClient) Compact(ctx context.Context, rev int64) error { + _, err := k.client.Compact(ctx, rev) + return err +} + // Kubernetes optimistically assumes that key didn't change since it was last observed, so it executes operations within a transaction conditioned on key not changing. // However, if the keys value changed it wants imminently to read it, thus the Get operation on failure. func (k kubernetesClient) optimisticOperationOrGet(ctx context.Context, key string, operation clientv3.Op, expectedRevision int64) (*mvccpb.KeyValue, error) { diff --git a/tests/robustness/traffic/traffic.go b/tests/robustness/traffic/traffic.go index 7e4d8d69f71..f6f7f71baba 100644 --- a/tests/robustness/traffic/traffic.go +++ b/tests/robustness/traffic/traffic.go @@ -31,7 +31,7 @@ import ( var ( DefaultLeaseTTL int64 = 7200 RequestTimeout = 40 * time.Millisecond - WatchTimeout = 400 * time.Millisecond + WatchTimeout = 10 * time.Second MultiOpTxnOpCount = 4 LowTraffic = Profile{ diff --git a/tests/robustness/validate/patch_history.go b/tests/robustness/validate/patch_history.go index 17e282c8b0a..31744bd469b 100644 --- a/tests/robustness/validate/patch_history.go +++ b/tests/robustness/validate/patch_history.go @@ -15,6 +15,8 @@ package validate import ( + "fmt" + "github.com/anishathalye/porcupine" "go.etcd.io/etcd/tests/v3/robustness/model" @@ -22,10 +24,11 @@ import ( "go.etcd.io/etcd/tests/v3/robustness/traffic" ) -func patchedOperationHistory(reports []report.ClientReport) []porcupine.Operation { +func patchedOperationHistory(reports []report.ClientReport, persistedRequests []model.EtcdRequest) []porcupine.Operation { allOperations := operations(reports) uniqueEvents := uniqueWatchEvents(reports) - return patchOperationsWithWatchEvents(allOperations, uniqueEvents) + operationsReturnTime := persistedOperationsReturnTime(allOperations, persistedRequests) + return patchOperations(allOperations, uniqueEvents, operationsReturnTime) } func operations(reports []report.ClientReport) []porcupine.Operation { @@ -54,28 +57,36 @@ func uniqueWatchEvents(reports []report.ClientReport) map[model.Event]traffic.Ti return persisted } -func patchOperationsWithWatchEvents(operations []porcupine.Operation, watchEvents map[model.Event]traffic.TimedWatchEvent) []porcupine.Operation { - +func patchOperations(operations []porcupine.Operation, watchEvents map[model.Event]traffic.TimedWatchEvent, persistedOperations map[model.EtcdOperation]int64) []porcupine.Operation { newOperations := make([]porcupine.Operation, 0, len(operations)) lastObservedOperation := lastOperationObservedInWatch(operations, watchEvents) for _, op := range operations { request := op.Input.(model.EtcdRequest) resp := op.Output.(model.MaybeEtcdResponse) - if resp.Error == "" || op.Call > lastObservedOperation.Call || request.Type != model.Txn { + if resp.Error == "" || request.Type != model.Txn { // Cannot patch those requests. newOperations = append(newOperations, op) continue } - event := matchWatchEvent(request.Txn, watchEvents) - if event != nil { + canMatchEvents := op.Call <= lastObservedOperation.Call + matchingEvent := matchWatchEvent(request.Txn, watchEvents) + if canMatchEvents && matchingEvent != nil { + eventTime := matchingEvent.Time.Nanoseconds() // Set revision and time based on watchEvent. - op.Return = event.Time.Nanoseconds() - op.Output = model.MaybeEtcdResponse{PartialResponse: true, EtcdResponse: model.EtcdResponse{Revision: event.Revision}} - newOperations = append(newOperations, op) - continue + if eventTime < op.Return { + op.Return = eventTime + } + op.Output = model.MaybeEtcdResponse{PartialResponse: true, EtcdResponse: model.EtcdResponse{Revision: matchingEvent.Revision}} } - if !canBeDiscarded(request.Txn) { + persistedReturnTime := matchReturnTime(request, persistedOperations) + if persistedReturnTime != nil { + // Set return time based on persisted return time. + if *persistedReturnTime < op.Return { + op.Return = *persistedReturnTime + } + } + if persistedReturnTime != nil || (len(persistedOperations) == 0 && canMatchEvents && matchingEvent != nil) || !canBeDiscarded(request.Txn) { // Leave operation as it is as we cannot discard it. newOperations = append(newOperations, op) continue @@ -143,3 +154,104 @@ func hasUniqueWriteOperation(ops []model.EtcdOperation) bool { } return false } + +func persistedOperationsReturnTime(allOperations []porcupine.Operation, persistedRequests []model.EtcdRequest) map[model.EtcdOperation]int64 { + operationReturnTime := operationReturnTime(allOperations) + persisted := map[model.EtcdOperation]int64{} + + lastReturnTime := maxReturnTime(operationReturnTime) + + for i := len(persistedRequests) - 1; i >= 0; i-- { + request := persistedRequests[i] + switch request.Type { + case model.Txn: + hasPut := false + lastReturnTime-- + for _, op := range request.Txn.OperationsOnSuccess { + if op.Type != model.PutOperation { + continue + } + if _, found := persisted[op]; found { + panic(fmt.Sprintf("Unexpected duplicate event in persisted requests. %d %+v", i, op)) + } + hasPut = true + persisted[op] = lastReturnTime + } + if hasPut { + newReturnTime := requestReturnTime(operationReturnTime, request) + if newReturnTime != -1 { + lastReturnTime = min(lastReturnTime, newReturnTime) + } + } + case model.LeaseGrant: + case model.Compact: + default: + panic(fmt.Sprintf("Unknown request type: %q", request.Type)) + } + } + return persisted +} + +func operationReturnTime(operations []porcupine.Operation) map[model.EtcdOperation]int64 { + newOperations := map[model.EtcdOperation]int64{} + for _, op := range operations { + request := op.Input.(model.EtcdRequest) + switch request.Type { + case model.Txn: + for _, etcdOp := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) { + if etcdOp.Type != model.PutOperation { + continue + } + if _, found := newOperations[etcdOp]; found { + panic("Unexpected duplicate event in persisted requests.") + } + newOperations[etcdOp] = op.Return + } + case model.Range: + case model.LeaseGrant: + case model.Compact: + default: + panic(fmt.Sprintf("Unknown request type: %q", request.Type)) + } + } + return newOperations +} + +func maxReturnTime(operationTime map[model.EtcdOperation]int64) int64 { + var maxReturnTime int64 + for _, returnTime := range operationTime { + if returnTime > maxReturnTime { + maxReturnTime = returnTime + } + } + return maxReturnTime +} + +func requestReturnTime(operationTime map[model.EtcdOperation]int64, request model.EtcdRequest) int64 { + switch request.Type { + case model.Txn: + for _, op := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) { + if op.Type != model.PutOperation { + continue + } + if time, found := operationTime[op]; found { + return time + } + } + return -1 + default: + panic(fmt.Sprintf("Unknown request type: %q", request.Type)) + } +} + +func matchReturnTime(request model.EtcdRequest, persistedOperations map[model.EtcdOperation]int64) *int64 { + for _, etcdOp := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) { + if etcdOp.Type != model.PutOperation { + continue + } + if returnTime, found := persistedOperations[etcdOp]; found { + return &returnTime + } + } + return nil +} diff --git a/tests/robustness/validate/patch_history_test.go b/tests/robustness/validate/patch_history_test.go index 9e3877033c0..d1ef8b0e87b 100644 --- a/tests/robustness/validate/patch_history_test.go +++ b/tests/robustness/validate/patch_history_test.go @@ -19,7 +19,6 @@ import ( "testing" "time" - "go.etcd.io/etcd/api/v3/etcdserverpb" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/tests/v3/robustness/identity" "go.etcd.io/etcd/tests/v3/robustness/model" @@ -28,10 +27,10 @@ import ( func TestPatchHistory(t *testing.T) { for _, tc := range []struct { - name string - historyFunc func(baseTime time.Time, h *model.AppendableHistory) - event model.Event - expectRemains bool + name string + historyFunc func(baseTime time.Time, h *model.AppendableHistory) + persistedRequest *model.EtcdRequest + expectedRemainingOperations int }{ { name: "successful range remains", @@ -41,7 +40,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendRange("key", "", 0, 0, start, stop, &clientv3.GetResponse{}) }, - expectRemains: true, + expectedRemainingOperations: 1, }, { name: "successful put remains", @@ -51,7 +50,21 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendPut("key", "value", start, stop, &clientv3.PutResponse{}, nil) }, - expectRemains: true, + persistedRequest: &model.EtcdRequest{ + Type: model.Txn, + Txn: &model.TxnRequest{ + OperationsOnSuccess: []model.EtcdOperation{ + { + Type: model.PutOperation, + Put: model.PutOptions{ + Key: "key", + Value: model.ToValueOrHash("value"), + }, + }, + }, + }, + }, + expectedRemainingOperations: 1, }, { name: "failed put remains if there is a matching event", @@ -61,12 +74,21 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendPut("key", "value", start, stop, nil, errors.New("failed")) }, - event: model.Event{ - Type: model.PutOperation, - Key: "key", - Value: model.ToValueOrHash("value"), + persistedRequest: &model.EtcdRequest{ + Type: model.Txn, + Txn: &model.TxnRequest{ + OperationsOnSuccess: []model.EtcdOperation{ + { + Type: model.PutOperation, + Put: model.PutOptions{ + Key: "key", + Value: model.ToValueOrHash("value"), + }, + }, + }, + }, }, - expectRemains: true, + expectedRemainingOperations: 1, }, { name: "failed put is dropped if event has different key", @@ -75,13 +97,26 @@ func TestPatchHistory(t *testing.T) { time.Sleep(time.Nanosecond) stop := time.Since(baseTime) h.AppendPut("key1", "value", start, stop, nil, errors.New("failed")) + start2 := time.Since(baseTime) + time.Sleep(time.Nanosecond) + stop2 := time.Since(baseTime) + h.AppendPut("key2", "value", start2, stop2, &clientv3.PutResponse{}, nil) }, - event: model.Event{ - Type: model.PutOperation, - Key: "key2", - Value: model.ToValueOrHash("value"), + persistedRequest: &model.EtcdRequest{ + Type: model.Txn, + Txn: &model.TxnRequest{ + OperationsOnSuccess: []model.EtcdOperation{ + { + Type: model.PutOperation, + Put: model.PutOptions{ + Key: "key2", + Value: model.ToValueOrHash("value"), + }, + }, + }, + }, }, - expectRemains: false, + expectedRemainingOperations: 1, }, { name: "failed put is dropped if event has different value", @@ -90,13 +125,26 @@ func TestPatchHistory(t *testing.T) { time.Sleep(time.Nanosecond) stop := time.Since(baseTime) h.AppendPut("key", "value1", start, stop, nil, errors.New("failed")) + start2 := time.Since(baseTime) + time.Sleep(time.Nanosecond) + stop2 := time.Since(baseTime) + h.AppendPut("key", "value2", start2, stop2, &clientv3.PutResponse{}, nil) }, - event: model.Event{ - Type: model.PutOperation, - Key: "key", - Value: model.ToValueOrHash("value2"), + persistedRequest: &model.EtcdRequest{ + Type: model.Txn, + Txn: &model.TxnRequest{ + OperationsOnSuccess: []model.EtcdOperation{ + { + Type: model.PutOperation, + Put: model.PutOptions{ + Key: "key", + Value: model.ToValueOrHash("value2"), + }, + }, + }, + }, }, - expectRemains: false, + expectedRemainingOperations: 1, }, { name: "failed put with lease remains if there is a matching event", @@ -106,12 +154,22 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendPutWithLease("key", "value", 123, start, stop, nil, errors.New("failed")) }, - event: model.Event{ - Type: model.PutOperation, - Key: "key", - Value: model.ToValueOrHash("value"), + persistedRequest: &model.EtcdRequest{ + Type: model.Txn, + Txn: &model.TxnRequest{ + OperationsOnSuccess: []model.EtcdOperation{ + { + Type: model.PutOperation, + Put: model.PutOptions{ + Key: "key", + Value: model.ToValueOrHash("value"), + LeaseID: 123, + }, + }, + }, + }, }, - expectRemains: true, + expectedRemainingOperations: 1, }, { name: "failed put is dropped", @@ -121,7 +179,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendPut("key", "value", start, stop, nil, errors.New("failed")) }, - expectRemains: false, + expectedRemainingOperations: 0, }, { name: "failed put with lease is dropped", @@ -131,7 +189,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendPutWithLease("key", "value", 123, start, stop, nil, errors.New("failed")) }, - expectRemains: false, + expectedRemainingOperations: 0, }, { name: "successful delete remains", @@ -141,7 +199,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendDelete("key", start, stop, &clientv3.DeleteResponse{}, nil) }, - expectRemains: true, + expectedRemainingOperations: 1, }, { name: "failed delete remains", @@ -151,7 +209,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendDelete("key", start, stop, nil, errors.New("failed")) }, - expectRemains: true, + expectedRemainingOperations: 1, }, { name: "successful empty txn remains", @@ -161,7 +219,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendTxn(nil, []clientv3.Op{}, []clientv3.Op{}, start, stop, &clientv3.TxnResponse{}, nil) }, - expectRemains: true, + expectedRemainingOperations: 1, }, { name: "failed empty txn is dropped", @@ -171,7 +229,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendTxn(nil, []clientv3.Op{}, []clientv3.Op{}, start, stop, nil, errors.New("failed")) }, - expectRemains: false, + expectedRemainingOperations: 0, }, { name: "failed txn put is dropped", @@ -181,7 +239,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendTxn(nil, []clientv3.Op{clientv3.OpPut("key", "value")}, []clientv3.Op{}, start, stop, nil, errors.New("failed")) }, - expectRemains: false, + expectedRemainingOperations: 0, }, { name: "failed txn put remains if there is a matching event", @@ -191,12 +249,21 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendTxn(nil, []clientv3.Op{clientv3.OpPut("key", "value")}, []clientv3.Op{}, start, stop, nil, errors.New("failed")) }, - event: model.Event{ - Type: model.PutOperation, - Key: "key", - Value: model.ToValueOrHash("value"), + persistedRequest: &model.EtcdRequest{ + Type: model.Txn, + Txn: &model.TxnRequest{ + OperationsOnSuccess: []model.EtcdOperation{ + { + Type: model.PutOperation, + Put: model.PutOptions{ + Key: "key", + Value: model.ToValueOrHash("value"), + }, + }, + }, + }, }, - expectRemains: true, + expectedRemainingOperations: 1, }, { name: "failed txn delete remains", @@ -206,7 +273,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendTxn(nil, []clientv3.Op{clientv3.OpDelete("key")}, []clientv3.Op{}, start, stop, nil, errors.New("failed")) }, - expectRemains: true, + expectedRemainingOperations: 1, }, { name: "successful txn put/delete remains", @@ -216,7 +283,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendTxn(nil, []clientv3.Op{clientv3.OpPut("key", "value")}, []clientv3.Op{clientv3.OpDelete("key")}, start, stop, &clientv3.TxnResponse{}, nil) }, - expectRemains: true, + expectedRemainingOperations: 1, }, { name: "failed txn put/delete remains", @@ -226,7 +293,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendTxn(nil, []clientv3.Op{clientv3.OpPut("key", "value")}, []clientv3.Op{clientv3.OpDelete("key")}, start, stop, nil, errors.New("failed")) }, - expectRemains: true, + expectedRemainingOperations: 1, }, { name: "failed txn delete/put remains", @@ -236,7 +303,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendTxn(nil, []clientv3.Op{clientv3.OpDelete("key")}, []clientv3.Op{clientv3.OpPut("key", "value")}, start, stop, nil, errors.New("failed")) }, - expectRemains: true, + expectedRemainingOperations: 1, }, { name: "failed txn empty/put is dropped", @@ -246,7 +313,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendTxn(nil, []clientv3.Op{}, []clientv3.Op{clientv3.OpPut("key", "value")}, start, stop, nil, errors.New("failed")) }, - expectRemains: false, + expectedRemainingOperations: 0, }, { name: "failed txn empty/put remains if there is a matching event", @@ -256,12 +323,21 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendTxn(nil, []clientv3.Op{clientv3.OpPut("key", "value")}, []clientv3.Op{}, start, stop, nil, errors.New("failed")) }, - event: model.Event{ - Type: model.PutOperation, - Key: "key", - Value: model.ToValueOrHash("value"), + persistedRequest: &model.EtcdRequest{ + Type: model.Txn, + Txn: &model.TxnRequest{ + OperationsOnSuccess: []model.EtcdOperation{ + { + Type: model.PutOperation, + Put: model.PutOptions{ + Key: "key", + Value: model.ToValueOrHash("value"), + }, + }, + }, + }, }, - expectRemains: true, + expectedRemainingOperations: 1, }, { name: "failed txn empty/delete remains", @@ -271,7 +347,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendTxn(nil, []clientv3.Op{}, []clientv3.Op{clientv3.OpDelete("key")}, start, stop, nil, errors.New("failed")) }, - expectRemains: true, + expectedRemainingOperations: 1, }, { name: "failed txn put&delete is dropped", @@ -281,7 +357,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendTxn(nil, []clientv3.Op{clientv3.OpPut("key", "value1"), clientv3.OpDelete("key")}, []clientv3.Op{}, start, stop, nil, errors.New("failed")) }, - expectRemains: false, + expectedRemainingOperations: 0, }, { name: "failed txn empty/put&delete is dropped", @@ -291,7 +367,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendTxn(nil, []clientv3.Op{}, []clientv3.Op{clientv3.OpPut("key", "value1"), clientv3.OpDelete("key")}, start, stop, nil, errors.New("failed")) }, - expectRemains: false, + expectedRemainingOperations: 0, }, { name: "failed txn put&delete/put&delete is dropped", @@ -301,55 +377,26 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendTxn(nil, []clientv3.Op{clientv3.OpPut("key", "value1"), clientv3.OpDelete("key")}, []clientv3.Op{clientv3.OpPut("key", "value2"), clientv3.OpDelete("key")}, start, stop, nil, errors.New("failed")) }, - expectRemains: false, + expectedRemainingOperations: 0, }, } { t.Run(tc.name, func(t *testing.T) { baseTime := time.Now() history := model.NewAppendableHistory(identity.NewIDProvider()) tc.historyFunc(baseTime, history) - time.Sleep(time.Nanosecond) - start := time.Since(baseTime) - time.Sleep(time.Nanosecond) - stop := time.Since(baseTime) - history.AppendPut("tombstone", "true", start, stop, &clientv3.PutResponse{Header: &etcdserverpb.ResponseHeader{Revision: 3}}, nil) - watch := []model.WatchResponse{ - { - Events: []model.WatchEvent{ - { - PersistedEvent: model.PersistedEvent{ - Event: tc.event, Revision: 2, - }, - }, - }, - Revision: 2, - Time: time.Since(baseTime), - }, - { - Events: []model.WatchEvent{ - { - PersistedEvent: model.PersistedEvent{ - Event: model.Event{ - Type: model.PutOperation, - Key: "tombstone", - Value: model.ToValueOrHash("true"), - }, Revision: 3}, - }, - }, - Revision: 3, - Time: time.Since(baseTime), - }, + requests := []model.EtcdRequest{} + if tc.persistedRequest != nil { + requests = append(requests, *tc.persistedRequest) } operations := patchedOperationHistory([]report.ClientReport{ { ClientID: 0, KeyValue: history.History.Operations(), - Watch: []model.WatchOperation{{Responses: watch}}, + Watch: []model.WatchOperation{}, }, - }) - remains := len(operations) == history.Len() - if remains != tc.expectRemains { - t.Errorf("Unexpected remains, got: %v, want: %v", remains, tc.expectRemains) + }, requests) + if len(operations) != tc.expectedRemainingOperations { + t.Errorf("Unexpected remains, got: %d, want: %d", len(operations), tc.expectedRemainingOperations) } }) } diff --git a/tests/robustness/validate/validate.go b/tests/robustness/validate/validate.go index 3e27ba49164..24328339878 100644 --- a/tests/robustness/validate/validate.go +++ b/tests/robustness/validate/validate.go @@ -29,8 +29,8 @@ import ( ) // ValidateAndReturnVisualize returns visualize as porcupine.linearizationInfo used to generate visualization is private. -func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, reports []report.ClientReport, timeout time.Duration) (visualize func(basepath string) error) { - patchedOperations := patchedOperationHistory(reports) +func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, reports []report.ClientReport, persistedRequests []model.EtcdRequest, timeout time.Duration) (visualize func(basepath string) error) { + patchedOperations := patchedOperationHistory(reports, persistedRequests) linearizable, visualize := validateLinearizableOperationsAndVisualize(lg, patchedOperations, timeout) if linearizable != porcupine.Ok { t.Error("Failed linearization, skipping further validation") @@ -44,7 +44,7 @@ func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, report return visualize } validateWatch(t, lg, cfg, reports, eventHistory) - validateSerializableOperations(t, lg, patchedOperations, eventHistory) + //validateSerializableOperations(t, lg, patchedOperations, eventHistory) return visualize } diff --git a/tests/robustness/validate/validate_test.go b/tests/robustness/validate/validate_test.go index 8bb3bd7231f..026502f26c2 100644 --- a/tests/robustness/validate/validate_test.go +++ b/tests/robustness/validate/validate_test.go @@ -32,13 +32,18 @@ func TestValidate(t *testing.T) { testdataPath := testutils.MustAbsPath("../testdata/") files, err := os.ReadDir(testdataPath) assert.NoError(t, err) - assert.GreaterOrEqual(t, len(files), 1) for _, file := range files { t.Run(file.Name(), func(t *testing.T) { + lg := zaptest.NewLogger(t) path := filepath.Join(testdataPath, file.Name()) reports, err := report.LoadClientReports(path) assert.NoError(t, err) - visualize := ValidateAndReturnVisualize(t, zaptest.NewLogger(t), Config{}, reports, 5*time.Minute) + + persistedRequests, err := report.LoadClusterPersistedRequests(lg, path) + if err != nil { + t.Error(err) + } + visualize := ValidateAndReturnVisualize(t, zaptest.NewLogger(t), Config{}, reports, persistedRequests, 5*time.Minute) if t.Failed() { err := visualize(filepath.Join(path, "history.html")) diff --git a/tests/robustness/validate/watch.go b/tests/robustness/validate/watch.go index eeb00c6cb88..471b6230c0e 100644 --- a/tests/robustness/validate/watch.go +++ b/tests/robustness/validate/watch.go @@ -34,8 +34,8 @@ func validateWatch(t *testing.T, lg *zap.Logger, cfg Config, reports []report.Cl if eventHistory != nil { validateReliable(t, eventHistory, r) validateResumable(t, eventHistory, r) - validatePrevKV(t, r, eventHistory) - validateCreateEvent(t, r, eventHistory) + //validatePrevKV(t, r, eventHistory) + //validateCreateEvent(t, r, eventHistory) } } }