Skip to content

Commit

Permalink
e2e test: add schema version verification in mix_version_test.
Browse files Browse the repository at this point in the history
Signed-off-by: Siyuan Zhang <sizhang@google.com>
  • Loading branch information
siyuanfoundation committed Apr 30, 2024
1 parent c2a3ca6 commit ca3d48c
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 0 deletions.
3 changes: 3 additions & 0 deletions tests/e2e/cluster_downgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ func newCluster(t *testing.T, clusterSize int, snapshotCount uint64) *e2e.EtcdPr
if errC := epc.Close(); errC != nil {
t.Fatalf("error closing etcd processes (%v)", errC)
}
for _, proc := range epc.Procs {
require.NoError(t, proc.VerifySchemaVersion(epc.Cfg.Logger))
}
})
return epc
}
Expand Down
8 changes: 8 additions & 0 deletions tests/e2e/etcd_mix_versions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,15 @@ func mixVersionsSnapshotTestByAddingMember(t *testing.T, cfg *e2e.EtcdProcessClu
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t,
e2e.WithConfig(cfg),
e2e.WithSnapshotCount(10),
e2e.WithKeepDataDir(true),
)
require.NoError(t, err, "failed to start etcd cluster: %v", err)
defer func() {
derr := epc.Close()
require.NoError(t, derr, "failed to close etcd cluster: %v", derr)
for _, proc := range epc.Procs {
require.NoError(t, proc.VerifySchemaVersion(epc.Cfg.Logger))
}
}()

t.Log("Writing 20 keys to the cluster (more than SnapshotCount entries to trigger at least a snapshot)")
Expand Down Expand Up @@ -135,11 +139,15 @@ func mixVersionsSnapshotTestByMockPartition(t *testing.T, cfg *e2e.EtcdProcessCl
e2e.WithConfig(cfg),
e2e.WithSnapshotCount(10),
e2e.WithSnapshotCatchUpEntries(10),
e2e.WithKeepDataDir(true),
)
require.NoError(t, err, "failed to start etcd cluster: %v", err)
defer func() {
derr := epc.Close()
require.NoError(t, derr, "failed to close etcd cluster: %v", derr)
for _, proc := range epc.Procs {
require.NoError(t, proc.VerifySchemaVersion(epc.Cfg.Logger))
}
}()
toPartitionedMember := epc.Procs[mockPartitionNodeIndex]

Expand Down
57 changes: 57 additions & 0 deletions tests/framework/e2e/etcd_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,13 @@ import (
"github.com/coreos/go-semver/semver"
"go.uber.org/zap"

"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/pkg/v3/expect"
"go.etcd.io/etcd/pkg/v3/proxy"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/datadir"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/etcd/tests/v3/framework/config"
)

Expand All @@ -60,6 +64,8 @@ type EtcdProcess interface {
LazyFS() *LazyFS
Logs() LogsExpect
Kill() error
// VerifySchemaVersion verifies the db file schema version is compatible with the binary after the process is closed.
VerifySchemaVersion(lg *zap.Logger) error
}

type LogsExpect interface {
Expand Down Expand Up @@ -253,6 +259,57 @@ func (ep *EtcdServerProcess) Close() error {
return nil
}

func (ep *EtcdServerProcess) VerifySchemaVersion(lg *zap.Logger) error {
currentEtcdVer, err := ep.getEtcdVersion(lg)
if err != nil {
return err
}
prevEtcdVer := semver.Version{Major: currentEtcdVer.Major, Minor: currentEtcdVer.Minor - 1}

dbPath := datadir.ToBackendFileName(ep.cfg.DataDirPath)
be := backend.NewDefaultBackend(lg, dbPath)
defer be.Close()
ver, err := schema.UnsafeDetectSchemaVersion(lg, be.BatchTx())
if err != nil {
return err
}

// in a mix version cluster, the storage version would be set to the cluster version,
// which could be lower than the server version by 1 minor version.
if currentEtcdVer.LessThan(ver) || ver.LessThan(prevEtcdVer) {
return fmt.Errorf("expect backend schema version to be between [%s, %s], but got %s", prevEtcdVer.String(), currentEtcdVer.String(), ver.String())
}
// check new fields introduced in V3_6 do not exist in V3_5 data file.
// V3_6 contains all the fields in V3_5, so no need to check for V3_6 servers.
if *currentEtcdVer == version.V3_5 {
_, vs := be.BatchTx().UnsafeRange(schema.Meta, schema.MetaStorageVersionName, nil, 1)
if len(vs) != 0 {
return fmt.Errorf("expect storageVersion not exist in the meta bucket, but got %s", string(vs[0]))
}
}
return nil
}

// getEtcdVersion returns the binary version directly from the ExecPath.
func (ep *EtcdServerProcess) getEtcdVersion(lg *zap.Logger) (*semver.Version, error) {
proc, err := SpawnCmd(append([]string{ep.cfg.ExecPath}, "--version"), nil)
if err != nil {
return nil, err
}
defer proc.Close()
line, err := proc.Expect("etcd Version:")
if err != nil {
return nil, err
}
verStr := strings.TrimSpace(strings.Split(line, ":")[1])
lg.Info("getEtcdVersion", zap.String("etcd_version", verStr))
v, err := semver.NewVersion(verStr)
if err != nil {
return nil, err
}
return &semver.Version{Major: v.Major, Minor: v.Minor}, nil
}

func (ep *EtcdServerProcess) waitReady(ctx context.Context) error {
defer close(ep.donec)
return WaitReadyExpectProc(ctx, ep.proc, EtcdServerReadyLines)
Expand Down

0 comments on commit ca3d48c

Please sign in to comment.