e2e test: add schema version verification in mix_version_test. #17881

Open · wants to merge 2 commits into base: main
19 changes: 12 additions & 7 deletions server/storage/schema/changes.go
@@ -21,17 +21,22 @@ type schemaChange interface {
 	downgradeAction() action
 }
 
-// addNewField represents adding new field when upgrading. Downgrade will remove the field.
-func addNewField(bucket backend.Bucket, fieldName []byte, fieldValue []byte) schemaChange {
+type NewField struct {
+	Bucket     backend.Bucket
+	FieldName  []byte
+	FieldValue []byte
+}
+
+func (f *NewField) schemaChange() schemaChange {
 	return simpleSchemaChange{
 		upgrade: setKeyAction{
-			Bucket:     bucket,
-			FieldName:  fieldName,
-			FieldValue: fieldValue,
+			Bucket:     f.Bucket,
+			FieldName:  f.FieldName,
+			FieldValue: f.FieldValue,
 		},
 		downgrade: deleteKeyAction{
-			Bucket:    bucket,
-			FieldName: fieldName,
+			Bucket:    f.Bucket,
+			FieldName: f.FieldName,
 		},
 	}
 }
2 changes: 1 addition & 1 deletion server/storage/schema/changes_test.go
@@ -30,7 +30,7 @@ func TestUpgradeDowngrade(t *testing.T) {
 	}{
 		{
 			name:                    "addNewField empty",
-			change:                  addNewField(Meta, []byte("/test"), []byte("1")),
+			change:                  (&NewField{Meta, []byte("/test"), []byte("1")}).schemaChange(),
 			expectStateAfterUpgrade: map[string]string{"/test": "1"},
 		},
 	}
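
The test case above shows the refactored call site: a NewField literal plus schemaChange() replaces the old addNewField helper with identical upgrade/downgrade behavior. A sketch of one more table-driven case in the same style, assuming the same test harness; the field name and value here are made up for illustration:

{
	name:                    "addNewField with non-empty value",
	change:                  (&NewField{Meta, []byte("/flag"), []byte("on")}).schemaChange(),
	expectStateAfterUpgrade: map[string]string{"/flag": "on"},
},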
29 changes: 26 additions & 3 deletions server/storage/schema/schema.go
@@ -123,14 +123,37 @@ func schemaChangesForVersion(v semver.Version, isUpgrade bool) ([]schemaChange, error) {
 	return actions, nil
 }
 
+func NewFieldsForVersion(v semver.Version) []NewField {
+	if newFields, found := newFieldsMapping[v]; found {
+		return newFields
+	}
+	return nil
+}

Member (on NewFieldsForVersion): Should panic/error if user calls with invalid version.

+
+func newFieldMappingsToSchemaChanges(newFieldMap map[semver.Version][]NewField) map[semver.Version][]schemaChange {
+	schemaChangeMap := map[semver.Version][]schemaChange{}
+	for ver, newFields := range newFieldMap {
+		changes := []schemaChange{}
+		for _, f := range newFields {
+			changes = append(changes, f.schemaChange())
+		}
+		schemaChangeMap[ver] = changes
+	}
+	return schemaChangeMap
+}
+
 var (
-	// schemaChanges list changes that were introduced in a particular version.
+	// newFieldsMapping list new fields that were introduced in a particular version.
 	// schema was introduced in v3.6 as so its changes were not tracked before.
-	schemaChanges = map[semver.Version][]schemaChange{
+	newFieldsMapping = map[semver.Version][]NewField{
 		version.V3_6: {
-			addNewField(Meta, MetaStorageVersionName, emptyStorageVersion),
+			{Meta, MetaStorageVersionName, emptyStorageVersion},
 		},
 	}
+	// schemaChanges list changes that were introduced in a particular version.
+	// schema was introduced in v3.6 as so its changes were not tracked before.
+	schemaChanges = newFieldMappingsToSchemaChanges(newFieldsMapping)

Member (on schemaChanges): Don't like this change, it limits the possible schema changes to just adding new empty fields.

 
 	// emptyStorageVersion is used for v3.6 Step for the first time, in all other version StoragetVersion should be set by migrator.
 	// Adding a addNewField for StorageVersion we can reuse logic to remove it when downgrading to v3.5
 	emptyStorageVersion = []byte("")
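
On the reviewer's point about invalid versions: NewFieldsForVersion currently returns nil for any version without a mapping, so a caller cannot distinguish "no new fields" from "unknown version". A minimal sketch of the error-returning variant the comment suggests; the signature change and error text are illustrative, not part of this PR:

func NewFieldsForVersion(v semver.Version) ([]NewField, error) {
	newFields, found := newFieldsMapping[v]
	if !found {
		// Illustrative error; the reviewer also mentions panicking as an option.
		return nil, fmt.Errorf("no new-fields mapping for version %q", v.String())
	}
	return newFields, nil
}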
3 changes: 3 additions & 0 deletions tests/e2e/cluster_downgrade_test.go
@@ -202,6 +202,9 @@ func newCluster(t *testing.T, clusterSize int, snapshotCount uint64) *e2e.EtcdProcessCluster {
 		if errC := epc.Close(); errC != nil {
 			t.Fatalf("error closing etcd processes (%v)", errC)
 		}
+		for _, proc := range epc.Procs {
+			require.NoError(t, proc.VerifySchemaVersion(epc.Cfg.Logger))
+		}
 	})
 	return epc
 }
8 changes: 8 additions & 0 deletions tests/e2e/etcd_mix_versions_test.go
@@ -92,11 +92,15 @@ func mixVersionsSnapshotTestByAddingMember(t *testing.T, cfg *e2e.EtcdProcessClusterConfig
 	epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t,
 		e2e.WithConfig(cfg),
 		e2e.WithSnapshotCount(10),
+		e2e.WithKeepDataDir(true),
 	)
 	require.NoError(t, err, "failed to start etcd cluster: %v", err)
 	defer func() {
 		derr := epc.Close()
 		require.NoError(t, derr, "failed to close etcd cluster: %v", derr)
+		for _, proc := range epc.Procs {
+			require.NoError(t, proc.VerifySchemaVersion(epc.Cfg.Logger))
+		}
 	}()
 
 	t.Log("Writing 20 keys to the cluster (more than SnapshotCount entries to trigger at least a snapshot)")

@@ -135,11 +139,15 @@ func mixVersionsSnapshotTestByMockPartition(t *testing.T, cfg *e2e.EtcdProcessClusterConfig
 		e2e.WithConfig(cfg),
 		e2e.WithSnapshotCount(10),
 		e2e.WithSnapshotCatchUpEntries(10),
+		e2e.WithKeepDataDir(true),
 	)
 	require.NoError(t, err, "failed to start etcd cluster: %v", err)
 	defer func() {
 		derr := epc.Close()
 		require.NoError(t, derr, "failed to close etcd cluster: %v", derr)
+		for _, proc := range epc.Procs {
+			require.NoError(t, proc.VerifySchemaVersion(epc.Cfg.Logger))
+		}
 	}()
 	toPartitionedMember := epc.Procs[mockPartitionNodeIndex]
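
The e2e.WithKeepDataDir(true) option is what makes the deferred check work: VerifySchemaVersion (added in etcd_process.go below) opens the member's bbolt db file from disk after Close(), so the data directory must survive teardown. Since the same Close-then-verify sequence now appears in several tests, a hypothetical helper could bundle it; this is a sketch, not part of the PR, and assumes the require and e2e imports these files already use:

func closeAndVerifySchema(t *testing.T, epc *e2e.EtcdProcessCluster) {
	derr := epc.Close()
	require.NoError(t, derr, "failed to close etcd cluster: %v", derr)
	// Callers must create the cluster with e2e.WithKeepDataDir(true) so
	// the db files are still on disk for offline schema inspection.
	for _, proc := range epc.Procs {
		require.NoError(t, proc.VerifySchemaVersion(epc.Cfg.Logger))
	}
}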
42 changes: 42 additions & 0 deletions tests/framework/e2e/etcd_process.go
@@ -34,6 +34,9 @@ import (
 	"go.etcd.io/etcd/client/pkg/v3/fileutil"
 	"go.etcd.io/etcd/pkg/v3/expect"
 	"go.etcd.io/etcd/pkg/v3/proxy"
+	"go.etcd.io/etcd/server/v3/storage/backend"
+	"go.etcd.io/etcd/server/v3/storage/datadir"
+	"go.etcd.io/etcd/server/v3/storage/schema"
 	"go.etcd.io/etcd/tests/v3/framework/config"
 )
 
@@ -60,6 +63,8 @@ type EtcdProcess interface {
 	LazyFS() *LazyFS
 	Logs() LogsExpect
 	Kill() error
+	// VerifySchemaVersion verifies the db file schema version is compatible with the binary after the process is closed.
+	VerifySchemaVersion(lg *zap.Logger) error
 }
 
 type LogsExpect interface {
@@ -253,6 +258,43 @@ func (ep *EtcdServerProcess) Close() error {
 	return nil
 }
 
+func (ep *EtcdServerProcess) VerifySchemaVersion(lg *zap.Logger) error {
+	currentEtcdVer, err := GetVersionFromBinary(ep.cfg.ExecPath)
+	if err != nil {
+		return err
+	}
+	currentEtcdVer.Patch = 0
+	prevEtcdVer := semver.Version{Major: currentEtcdVer.Major, Minor: currentEtcdVer.Minor - 1}
+
+	dbPath := datadir.ToBackendFileName(ep.cfg.DataDirPath)
+	be := backend.NewDefaultBackend(lg, dbPath)
+	defer be.Close()
+	ver, err := schema.UnsafeDetectSchemaVersion(lg, be.BatchTx())
+	if err != nil {
+		return err
+	}
+
+	// In a mixed-version cluster, the storage version is set to the cluster version,
+	// which can be lower than the server version by one minor version.
+	if currentEtcdVer.LessThan(ver) || ver.LessThan(prevEtcdVer) {
+		return fmt.Errorf("expect backend schema version to be between [%s, %s], but got %s", prevEtcdVer.String(), currentEtcdVer.String(), ver.String())
+	}
+	// The storage schema is generally backward compatible, so there is no need to check the buckets for a higher version.
+	if ep.cfg.ExecPath == BinPath.Etcd {
+		return nil
+	}
+	lg.Info("verify no new storage schema field is present in the db file of last release process")
+	nextEtcdVer := semver.Version{Major: currentEtcdVer.Major, Minor: currentEtcdVer.Minor + 1}
+	newFields := schema.NewFieldsForVersion(nextEtcdVer)

Member (on NewFieldsForVersion usage): Validating consistency of schema should be a feature of the schema package. Same as the bbolt consistency check, schema should be able to confirm that the fields present and their values are consistent with the storage version value.

+	for _, f := range newFields {
+		_, vs := be.BatchTx().UnsafeRange(f.Bucket, f.FieldName, nil, 1)
+		if len(vs) != 0 {
+			return fmt.Errorf("expect %s not to exist in the %s bucket, but got %s", f.FieldName, f.Bucket.Name(), vs[0])
+		}
+	}
+	return nil
+}
+
 func (ep *EtcdServerProcess) waitReady(ctx context.Context) error {
 	defer close(ep.donec)
 	return WaitReadyExpectProc(ctx, ep.proc, EtcdServerReadyLines)
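
To make the version-window check concrete: for a v3.6.x binary the Patch component is zeroed, so the accepted schema versions are exactly [3.5.0, 3.6.0], and anything newer or older fails. A standalone sketch of the same comparison, needing only github.com/coreos/go-semver; the sample versions are illustrative:

package main

import (
	"fmt"

	"github.com/coreos/go-semver/semver"
)

func main() {
	// Binary version with Patch zeroed, as VerifySchemaVersion does.
	current := semver.Version{Major: 3, Minor: 6}
	prev := semver.Version{Major: current.Major, Minor: current.Minor - 1}

	for _, raw := range []string{"3.5.0", "3.6.0", "3.7.0"} {
		ver := *semver.New(raw)
		// Inside the window means neither above current nor below prev.
		inWindow := !current.LessThan(ver) && !ver.LessThan(prev)
		fmt.Printf("schema %s within [%s, %s]: %v\n", raw, prev.String(), current.String(), inWindow)
	}
}

Running it prints true for 3.5.0 and 3.6.0 and false for 3.7.0, matching the bounds error returned by VerifySchemaVersion.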