diff --git a/server/storage/schema/changes.go b/server/storage/schema/changes.go index 6eb0b751209..c067ea927e2 100644 --- a/server/storage/schema/changes.go +++ b/server/storage/schema/changes.go @@ -21,17 +21,22 @@ type schemaChange interface { downgradeAction() action } -// addNewField represents adding new field when upgrading. Downgrade will remove the field. -func addNewField(bucket backend.Bucket, fieldName []byte, fieldValue []byte) schemaChange { +type NewField struct { + Bucket backend.Bucket + FieldName []byte + FieldValue []byte +} + +func (f *NewField) schemaChange() schemaChange { return simpleSchemaChange{ upgrade: setKeyAction{ - Bucket: bucket, - FieldName: fieldName, - FieldValue: fieldValue, + Bucket: f.Bucket, + FieldName: f.FieldName, + FieldValue: f.FieldValue, }, downgrade: deleteKeyAction{ - Bucket: bucket, - FieldName: fieldName, + Bucket: f.Bucket, + FieldName: f.FieldName, }, } } diff --git a/server/storage/schema/changes_test.go b/server/storage/schema/changes_test.go index 05b8d49cf44..1ad88ace909 100644 --- a/server/storage/schema/changes_test.go +++ b/server/storage/schema/changes_test.go @@ -30,7 +30,7 @@ func TestUpgradeDowngrade(t *testing.T) { }{ { name: "addNewField empty", - change: addNewField(Meta, []byte("/test"), []byte("1")), + change: (&NewField{Meta, []byte("/test"), []byte("1")}).schemaChange(), expectStateAfterUpgrade: map[string]string{"/test": "1"}, }, } diff --git a/server/storage/schema/schema.go b/server/storage/schema/schema.go index eb4fd3a2a5b..180d52832a1 100644 --- a/server/storage/schema/schema.go +++ b/server/storage/schema/schema.go @@ -123,14 +123,37 @@ func schemaChangesForVersion(v semver.Version, isUpgrade bool) ([]schemaChange, return actions, nil } +func NewFieldsForVersion(v semver.Version) []NewField { + if newFields, found := newFieldsMapping[v]; found { + return newFields + } + return nil +} + +func newFieldMappingsToSchemaChanges(newFieldMap map[semver.Version][]NewField) 
map[semver.Version][]schemaChange { + schemaChangeMap := map[semver.Version][]schemaChange{} + for ver, newFields := range newFieldMap { + changes := []schemaChange{} + for _, f := range newFields { + changes = append(changes, f.schemaChange()) + } + schemaChangeMap[ver] = changes + } + return schemaChangeMap +} + var ( - // schemaChanges list changes that were introduced in a particular version. + // newFieldsMapping lists new fields that were introduced in a particular version. // schema was introduced in v3.6 as so its changes were not tracked before. - schemaChanges = map[semver.Version][]schemaChange{ + newFieldsMapping = map[semver.Version][]NewField{ version.V3_6: { - addNewField(Meta, MetaStorageVersionName, emptyStorageVersion), + {Meta, MetaStorageVersionName, emptyStorageVersion}, }, } + // schemaChanges lists changes that were introduced in a particular version. + // schema was introduced in v3.6 and so its changes were not tracked before. + schemaChanges = newFieldMappingsToSchemaChanges(newFieldsMapping) + // emptyStorageVersion is used for v3.6 Step for the first time, in all other version StoragetVersion should be set by migrator. 
// Adding a addNewField for StorageVersion we can reuse logic to remove it when downgrading to v3.5 emptyStorageVersion = []byte("") diff --git a/tests/e2e/cluster_downgrade_test.go b/tests/e2e/cluster_downgrade_test.go index 6fbc75bf7f6..f465c6ab555 100644 --- a/tests/e2e/cluster_downgrade_test.go +++ b/tests/e2e/cluster_downgrade_test.go @@ -202,6 +202,9 @@ func newCluster(t *testing.T, clusterSize int, snapshotCount uint64) *e2e.EtcdPr if errC := epc.Close(); errC != nil { t.Fatalf("error closing etcd processes (%v)", errC) } + for _, proc := range epc.Procs { + require.NoError(t, proc.VerifySchemaVersion(epc.Cfg.Logger)) + } }) return epc } diff --git a/tests/e2e/etcd_mix_versions_test.go b/tests/e2e/etcd_mix_versions_test.go index aed2563898e..b6a99824554 100644 --- a/tests/e2e/etcd_mix_versions_test.go +++ b/tests/e2e/etcd_mix_versions_test.go @@ -92,11 +92,15 @@ func mixVersionsSnapshotTestByAddingMember(t *testing.T, cfg *e2e.EtcdProcessClu epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, e2e.WithConfig(cfg), e2e.WithSnapshotCount(10), + e2e.WithKeepDataDir(true), ) require.NoError(t, err, "failed to start etcd cluster: %v", err) defer func() { derr := epc.Close() require.NoError(t, derr, "failed to close etcd cluster: %v", derr) + for _, proc := range epc.Procs { + require.NoError(t, proc.VerifySchemaVersion(epc.Cfg.Logger)) + } }() t.Log("Writing 20 keys to the cluster (more than SnapshotCount entries to trigger at least a snapshot)") @@ -135,11 +139,15 @@ func mixVersionsSnapshotTestByMockPartition(t *testing.T, cfg *e2e.EtcdProcessCl e2e.WithConfig(cfg), e2e.WithSnapshotCount(10), e2e.WithSnapshotCatchUpEntries(10), + e2e.WithKeepDataDir(true), ) require.NoError(t, err, "failed to start etcd cluster: %v", err) defer func() { derr := epc.Close() require.NoError(t, derr, "failed to close etcd cluster: %v", derr) + for _, proc := range epc.Procs { + require.NoError(t, proc.VerifySchemaVersion(epc.Cfg.Logger)) + } }() toPartitionedMember := 
epc.Procs[mockPartitionNodeIndex] diff --git a/tests/framework/e2e/etcd_process.go b/tests/framework/e2e/etcd_process.go index f9d2089a3e3..21b79253a6b 100644 --- a/tests/framework/e2e/etcd_process.go +++ b/tests/framework/e2e/etcd_process.go @@ -34,6 +34,9 @@ import ( "go.etcd.io/etcd/client/pkg/v3/fileutil" "go.etcd.io/etcd/pkg/v3/expect" "go.etcd.io/etcd/pkg/v3/proxy" + "go.etcd.io/etcd/server/v3/storage/backend" + "go.etcd.io/etcd/server/v3/storage/datadir" + "go.etcd.io/etcd/server/v3/storage/schema" "go.etcd.io/etcd/tests/v3/framework/config" ) @@ -60,6 +63,8 @@ type EtcdProcess interface { LazyFS() *LazyFS Logs() LogsExpect Kill() error + // VerifySchemaVersion verifies the db file schema version is compatible with the binary after the process is closed. + VerifySchemaVersion(lg *zap.Logger) error } type LogsExpect interface { @@ -253,6 +258,43 @@ func (ep *EtcdServerProcess) Close() error { return nil } +func (ep *EtcdServerProcess) VerifySchemaVersion(lg *zap.Logger) error { + currentEtcdVer, err := GetVersionFromBinary(ep.cfg.ExecPath) + if err != nil { + return err + } + currentEtcdVer.Patch = 0 + prevEtcdVer := semver.Version{Major: currentEtcdVer.Major, Minor: currentEtcdVer.Minor - 1} + + dbPath := datadir.ToBackendFileName(ep.cfg.DataDirPath) + be := backend.NewDefaultBackend(lg, dbPath) + defer be.Close() + ver, err := schema.UnsafeDetectSchemaVersion(lg, be.BatchTx()) + if err != nil { + return err + } + + // in a mix version cluster, the storage version would be set to the cluster version, + // which could be lower than the server version by 1 minor version. + if currentEtcdVer.LessThan(ver) || ver.LessThan(prevEtcdVer) { + return fmt.Errorf("expect backend schema version to be between [%s, %s], but got %s", prevEtcdVer.String(), currentEtcdVer.String(), ver.String()) + } + // storage schema is generally backward compatible. No need to check the buckets for higher version. 
+ if ep.cfg.ExecPath == BinPath.Etcd { + return nil + } + lg.Info("verify no new storage schema field is present in the db file of last release process") + nextEtcdVer := semver.Version{Major: currentEtcdVer.Major, Minor: currentEtcdVer.Minor + 1} + newFields := schema.NewFieldsForVersion(nextEtcdVer) + for _, f := range newFields { + _, vs := be.BatchTx().UnsafeRange(f.Bucket, f.FieldName, nil, 1) + if len(vs) != 0 { + return fmt.Errorf("expect %s not exist in the %s bucket, but got %s", f.FieldName, f.Bucket.Name(), vs[0]) + } + } + return nil +} + func (ep *EtcdServerProcess) waitReady(ctx context.Context) error { defer close(ep.donec) return WaitReadyExpectProc(ctx, ep.proc, EtcdServerReadyLines)