Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reproduce #17529 in robustness tests #17680

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 5 additions & 5 deletions server/storage/mvcc/watchable_store.go
Expand Up @@ -372,11 +372,11 @@ func (s *watchableStore) syncWatchers() int {
victims := make(watcherBatch)
wb := newWatcherBatch(wg, evs)
for w := range wg.watchers {
if w.minRev < compactionRev {
// Skip the watcher that failed to send compacted watch response due to w.ch is full.
// Next retry of syncWatchers would try to resend the compacted watch response to w.ch
continue
}
//if w.minRev < compactionRev {
// // Skip the watcher that failed to send compacted watch response due to w.ch is full.
// // Next retry of syncWatchers would try to resend the compacted watch response to w.ch
// continue
//}
w.minRev = curRev + 1

eb, ok := wb[w]
Expand Down
2 changes: 1 addition & 1 deletion tests/framework/e2e/etcd_process.go
Expand Up @@ -393,7 +393,7 @@ func (f *BinaryFailpoints) DeactivateHTTP(ctx context.Context, failpoint string)
}
httpClient := http.Client{
// TODO: Decrease after deactivate is not blocked by sleep https://github.com/etcd-io/gofail/issues/64
Timeout: 2 * time.Second,
Timeout: 3 * time.Second,
}
if f.clientTimeout != 0 {
httpClient.Timeout = f.clientTimeout
Expand Down
29 changes: 15 additions & 14 deletions tests/robustness/failpoint/failpoint.go
Expand Up @@ -37,20 +37,21 @@ const (

var (
allFailpoints = []Failpoint{
KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic, RaftAfterSavePanic,
DefragBeforeCopyPanic, DefragBeforeRenamePanic, BackendBeforePreCommitHookPanic, BackendAfterPreCommitHookPanic,
BackendBeforeStartDBTxnPanic, BackendAfterStartDBTxnPanic, BackendBeforeWritebackBufPanic,
BackendAfterWritebackBufPanic, CompactBeforeCommitScheduledCompactPanic, CompactAfterCommitScheduledCompactPanic,
CompactBeforeSetFinishedCompactPanic, CompactAfterSetFinishedCompactPanic, CompactBeforeCommitBatchPanic,
CompactAfterCommitBatchPanic, RaftBeforeLeaderSendPanic, BlackholePeerNetwork, DelayPeerNetwork,
RaftBeforeFollowerSendPanic, RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic,
RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic, BlackholeUntilSnapshot,
BeforeApplyOneConfChangeSleep,
MemberReplace,
DropPeerNetwork,
RaftBeforeSaveSleep,
RaftAfterSaveSleep,
ApplyBeforeOpenSnapshot,
//KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic, RaftAfterSavePanic,
//DefragBeforeCopyPanic, DefragBeforeRenamePanic, BackendBeforePreCommitHookPanic, BackendAfterPreCommitHookPanic,
//BackendBeforeStartDBTxnPanic, BackendAfterStartDBTxnPanic, BackendBeforeWritebackBufPanic,
//BackendAfterWritebackBufPanic, CompactBeforeCommitScheduledCompactPanic, CompactAfterCommitScheduledCompactPanic,
//CompactBeforeSetFinishedCompactPanic, CompactAfterSetFinishedCompactPanic, CompactBeforeCommitBatchPanic,
//CompactAfterCommitBatchPanic, RaftBeforeLeaderSendPanic, BlackholePeerNetwork, DelayPeerNetwork,
//RaftBeforeFollowerSendPanic, RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic,
//RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic, BlackholeUntilSnapshot,
//BeforeApplyOneConfChangeSleep,
//MemberReplace,
//DropPeerNetwork,
//RaftBeforeSaveSleep,
//RaftAfterSaveSleep,
//ApplyBeforeOpenSnapshot,
beforeSendWatchResponse,
}
)

Expand Down
4 changes: 3 additions & 1 deletion tests/robustness/failpoint/gofail.go
Expand Up @@ -57,6 +57,7 @@ var (
BeforeApplyOneConfChangeSleep Failpoint = killAndGofailSleep{"beforeApplyOneConfChange", time.Second}
RaftBeforeSaveSleep Failpoint = gofailSleepAndDeactivate{"raftBeforeSave", time.Second}
RaftAfterSaveSleep Failpoint = gofailSleepAndDeactivate{"raftAfterSave", time.Second}
beforeSendWatchResponse Failpoint = gofailSleepAndDeactivate{"beforeSendWatchResponse", time.Second}
)

type goPanicFailpoint struct {
Expand Down Expand Up @@ -209,13 +210,14 @@ func (f gofailSleepAndDeactivate) Inject(ctx context.Context, t *testing.T, lg *
lg.Info("goFailpoint setup failed", zap.String("failpoint", f.Name()), zap.Error(err))
return fmt.Errorf("goFailpoint %s setup failed, err:%w", f.Name(), err)
}
time.Sleep(f.time)
time.Sleep(2 * time.Second)
lg.Info("Deactivating gofailpoint", zap.String("failpoint", f.Name()))
err = member.Failpoints().DeactivateHTTP(ctx, f.failpoint)
if err != nil {
lg.Info("goFailpoint deactivate failed", zap.String("failpoint", f.Name()), zap.Error(err))
return fmt.Errorf("goFailpoint %s deactivate failed, err: %w", f.Name(), err)
}
time.Sleep(2 * time.Second)
return nil
}

Expand Down
43 changes: 23 additions & 20 deletions tests/robustness/main_test.go
Expand Up @@ -64,18 +64,18 @@ func TestRobustnessRegression(t *testing.T) {
}

func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testScenario) {
report := report.TestReport{Logger: lg}
r := report.TestReport{Logger: lg}
var err error
report.Cluster, err = e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&s.cluster))
r.Cluster, err = e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&s.cluster))
if err != nil {
t.Fatal(err)
}
defer report.Cluster.Close()
defer r.Cluster.Close()

if s.failpoint == nil {
s.failpoint = failpoint.PickRandom(t, report.Cluster)
s.failpoint = failpoint.PickRandom(t, r.Cluster)
} else {
err = failpoint.Validate(report.Cluster, s.failpoint)
err = failpoint.Validate(r.Cluster, s.failpoint)
if err != nil {
t.Fatal(err)
}
Expand All @@ -86,15 +86,19 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testSce
// Refer to: https://github.com/golang/go/issues/49929
panicked := true
defer func() {
report.Report(t, panicked)
r.Report(t, panicked)
}()
report.Client = s.run(ctx, t, lg, report.Cluster)
forcestopCluster(report.Cluster)
r.Client = s.run(ctx, t, lg, r.Cluster)
persistedRequests, err := report.PersistedRequestsCluster(lg, r.Cluster)
if err != nil {
t.Error(err)
}
forcestopCluster(r.Cluster)

watchProgressNotifyEnabled := report.Cluster.Cfg.ServerConfig.ExperimentalWatchProgressNotifyInterval != 0
validateGotAtLeastOneProgressNotify(t, report.Client, s.watch.requestProgress || watchProgressNotifyEnabled)
//watchProgressNotifyEnabled := r.Cluster.Cfg.ServerConfig.ExperimentalWatchProgressNotifyInterval != 0
//validateGotAtLeastOneProgressNotify(t, r.Client, s.watch.requestProgress || watchProgressNotifyEnabled)
validateConfig := validate.Config{ExpectRevisionUnique: s.traffic.ExpectUniqueRevision()}
report.Visualize = validate.ValidateAndReturnVisualize(t, lg, validateConfig, report.Client, 5*time.Minute)
r.Visualize = validate.ValidateAndReturnVisualize(t, lg, validateConfig, r.Client, persistedRequests, 5*time.Minute)

panicked = false
}
Expand All @@ -121,17 +125,16 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu
lg.Info("Finished injecting failures")
return nil
})
maxRevisionChan := make(chan int64, 1)
//maxRevisionChan := make(chan int64, 1)
//g.Go(func() error {
// watchReport = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, s.watch, baseTime, ids)
// return nil
//})
g.Go(func() error {
defer close(maxRevisionChan)
//defer close(maxRevisionChan)
operationReport = traffic.SimulateTraffic(ctx, t, lg, clus, s.profile, s.traffic, finishTraffic, baseTime, ids)
maxRevision := operationsMaxRevision(operationReport)
maxRevisionChan <- maxRevision
lg.Info("Finished simulating traffic", zap.Int64("max-revision", maxRevision))
return nil
})
g.Go(func() error {
watchReport = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, s.watch, baseTime, ids)
//maxRevisionChan <- operationsMaxRevision(operationReport)
time.Sleep(time.Second)
return nil
})
g.Wait()
Expand Down
4 changes: 3 additions & 1 deletion tests/robustness/model/describe.go
Expand Up @@ -33,7 +33,7 @@ func describeEtcdResponse(request EtcdRequest, response MaybeEtcdResponse) strin
return fmt.Sprintf("%s, rev: %d", describeRangeResponse(request.Range.RangeOptions, *response.Range), response.Revision)
case Txn:
return fmt.Sprintf("%s, rev: %d", describeTxnResponse(request.Txn, response.Txn), response.Revision)
case LeaseGrant, LeaseRevoke, Defragment:
case LeaseGrant, LeaseRevoke, Defragment, Compact:
if response.Revision == 0 {
return "ok"
}
Expand Down Expand Up @@ -63,6 +63,8 @@ func describeEtcdRequest(request EtcdRequest) string {
return fmt.Sprintf("leaseRevoke(%d)", request.LeaseRevoke.LeaseID)
case Defragment:
return fmt.Sprintf("defragment()")
case Compact:
return fmt.Sprintf("compact(%d)", request.Compact.Revision)
default:
return fmt.Sprintf("<! unknown request type: %q !>", request.Type)
}
Expand Down
22 changes: 18 additions & 4 deletions tests/robustness/model/deterministic.go
Expand Up @@ -64,10 +64,11 @@ var DeterministicModel = porcupine.Model{
}

type EtcdState struct {
Revision int64
KeyValues map[string]ValueRevision
KeyLeases map[string]int64
Leases map[int64]EtcdLease
Revision int64
CompactRevision int64
KeyValues map[string]ValueRevision
KeyLeases map[string]int64
Leases map[int64]EtcdLease
}

func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, EtcdState) {
Expand Down Expand Up @@ -178,6 +179,9 @@ func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Revision: s.Revision, LeaseRevoke: &LeaseRevokeResponse{}}}
case Defragment:
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Defragment: &DefragmentResponse{}, Revision: s.Revision}}
case Compact:
s.CompactRevision = request.Compact.Revision
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Compact: &CompactResponse{}, Revision: s.Revision}}
default:
panic(fmt.Sprintf("Unknown request type: %v", request.Type))
}
Expand Down Expand Up @@ -234,6 +238,7 @@ type RequestType string
const (
Range RequestType = "range"
Txn RequestType = "txn"
Compact RequestType = "compact"
LeaseGrant RequestType = "leaseGrant"
LeaseRevoke RequestType = "leaseRevoke"
Defragment RequestType = "defragment"
Expand All @@ -246,6 +251,7 @@ type EtcdRequest struct {
Range *RangeRequest
Txn *TxnRequest
Defragment *DefragmentRequest
Compact *CompactRequest
}

type RangeRequest struct {
Expand All @@ -269,6 +275,13 @@ type DeleteOptions struct {
Key string
}

// CompactResponse is the model's response payload for a Compact request.
// It carries no fields; the revision travels on the enclosing EtcdResponse.
type CompactResponse struct{}

// CompactRequest is the payload carried by an EtcdRequest whose Type is Compact.
type CompactRequest struct {
// Revision is the revision given to the compact call; the model's Step
// stores it as the state's CompactRevision.
Revision int64
}

type TxnRequest struct {
Conditions []EtcdCondition
OperationsOnSuccess []EtcdOperation
Expand Down Expand Up @@ -322,6 +335,7 @@ type EtcdResponse struct {
LeaseGrant *LeaseGrantReponse
LeaseRevoke *LeaseRevokeResponse
Defragment *DefragmentResponse
Compact *CompactResponse
Revision int64
}

Expand Down
27 changes: 27 additions & 0 deletions tests/robustness/model/history.go
Expand Up @@ -169,6 +169,25 @@ func (h *AppendableHistory) AppendDelete(key string, start, end time.Duration, r
})
}

// AppendCompact records a compact call for revision rev in the history.
//
// When err is non-nil the call is recorded through appendFailed using the
// start time only. Otherwise a porcupine operation spanning start..end is
// recorded through appendSuccessful; its output carries the revision from the
// response header, or zero when resp or resp.Header is nil.
func (h *AppendableHistory) AppendCompact(rev int64, start, end time.Duration, resp *clientv3.CompactResponse, err error) {
	req := compactRequest(rev)
	if err != nil {
		h.appendFailed(req, start.Nanoseconds(), err)
		return
	}

	// A missing response or header leaves the reported revision at zero.
	headerRevision := int64(0)
	if resp != nil && resp.Header != nil {
		headerRevision = resp.Header.Revision
	}

	op := porcupine.Operation{
		ClientId: h.streamID,
		Input:    req,
		Call:     start.Nanoseconds(),
		Output:   compactResponse(headerRevision),
		Return:   end.Nanoseconds(),
	}
	h.appendSuccessful(op)
}

func (h *AppendableHistory) AppendTxn(cmp []clientv3.Cmp, clientOnSuccessOps, clientOnFailure []clientv3.Op, start, end time.Duration, resp *clientv3.TxnResponse, err error) {
conds := []EtcdCondition{}
for _, cmp := range cmp {
Expand Down Expand Up @@ -405,6 +424,10 @@ func putResponse(revision int64) MaybeEtcdResponse {
return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Txn: &TxnResponse{Results: []EtcdOperationResult{{}}}, Revision: revision}}
}

// compactRequest builds an EtcdRequest of type Compact targeting revision rev.
func compactRequest(rev int64) EtcdRequest {
	payload := &CompactRequest{Revision: rev}
	return EtcdRequest{Type: Compact, Compact: payload}
}

// deleteRequest builds a Txn-typed EtcdRequest with no conditions whose only
// on-success operation is a delete of key.
func deleteRequest(key string) EtcdRequest {
	deleteOp := EtcdOperation{Type: DeleteOperation, Delete: DeleteOptions{Key: key}}
	txn := &TxnRequest{OperationsOnSuccess: []EtcdOperation{deleteOp}}
	return EtcdRequest{Type: Txn, Txn: txn}
}
Expand All @@ -413,6 +436,10 @@ func deleteResponse(deleted int64, revision int64) MaybeEtcdResponse {
return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Txn: &TxnResponse{Results: []EtcdOperationResult{{Deleted: deleted}}}, Revision: revision}}
}

// compactResponse wraps an empty CompactResponse and the given revision in a
// MaybeEtcdResponse.
func compactResponse(revision int64) MaybeEtcdResponse {
	inner := EtcdResponse{Compact: &CompactResponse{}, Revision: revision}
	return MaybeEtcdResponse{EtcdResponse: inner}
}

// compareRevisionAndPutRequest builds a request through
// txnRequestSingleOperation from the result of
// compareRevision(key, expectedRevision) and putOperation(key, value), passing
// nil as the third argument.
func compareRevisionAndPutRequest(key string, expectedRevision int64, value string) EtcdRequest {
	condition := compareRevision(key, expectedRevision)
	put := putOperation(key, value)
	return txnRequestSingleOperation(condition, put, nil)
}
Expand Down