From 174c8b414762b2c80c47db3d9a1cfc75671abb18 Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Fri, 10 Jan 2025 17:50:56 +0000 Subject: [PATCH] Create a v2 snapshot when running etcdutl migrate command Also added test to cover the etcdutl migrate command Signed-off-by: Benjamin Wang --- etcdutl/etcdutl/common.go | 88 +++++++++++++++++++++ etcdutl/etcdutl/migrate_command.go | 80 ++++++++++++++----- server/etcdserver/api/membership/cluster.go | 16 ++-- 3 files changed, 160 insertions(+), 24 deletions(-) diff --git a/etcdutl/etcdutl/common.go b/etcdutl/etcdutl/common.go index c7473cc7fd7b..1f9ab34d5013 100644 --- a/etcdutl/etcdutl/common.go +++ b/etcdutl/etcdutl/common.go @@ -16,14 +16,19 @@ package etcdutl import ( "errors" + "fmt" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.etcd.io/etcd/client/pkg/v3/logutil" "go.etcd.io/etcd/pkg/v3/cobrautl" + "go.etcd.io/etcd/server/v3/etcdserver" + "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" + "go.etcd.io/etcd/server/v3/storage/backend" "go.etcd.io/etcd/server/v3/storage/datadir" + "go.etcd.io/etcd/server/v3/storage/schema" "go.etcd.io/etcd/server/v3/storage/wal" "go.etcd.io/etcd/server/v3/storage/wal/walpb" "go.etcd.io/raft/v3/raftpb" ) @@ -68,3 +73,86 @@ func getLatestV2Snapshot(lg *zap.Logger, dataDir string) (*raftpb.Snapshot, erro return snapshot, nil } + +func createV2SnapshotFromV3Store(dataDir string, be backend.Backend) error { + var ( + lg = GetLogger() + + snapDir = datadir.ToSnapDir(dataDir) + walDir = datadir.ToWALDir(dataDir) + ) + + ci, term := schema.ReadConsistentIndex(be.ReadTx()) + + cl := membership.NewCluster(lg) + cl.SetBackend(schema.NewMembershipBackend(lg, be)) + cl.UnsafeLoad() + + latestWALSnap, err := getLatestWALSnap(lg, dataDir) + if err != nil { + return err + } + + // Each time before creating the v2 snapshot, etcdserver always flushes + // the backend storage (bbolt db), so the consistent index should never + // be less than the Index or term 
of the latest snapshot. + if ci < latestWALSnap.Index || term < latestWALSnap.Term { + // This should never happen + return fmt.Errorf("consistent_index [Index: %d, Term: %d] is less than the latest snapshot [Index: %d, Term: %d]", ci, term, latestWALSnap.Index, latestWALSnap.Term) + } + + if ci == latestWALSnap.Index { + lg.Info("The latest snapshot is already up to date", zap.Uint64("consistent_index", ci)) + return nil + } + + voters, learners := getVotersAndLearners(cl) + confState := raftpb.ConfState{ + Voters: voters, + Learners: learners, + } + + // create the v2 snapshot file + raftSnap := raftpb.Snapshot{ + Data: etcdserver.GetMembershipInfoInV2Format(lg, cl), + Metadata: raftpb.SnapshotMetadata{ + Index: ci, + Term: term, + ConfState: confState, + }, + } + sn := snap.New(lg, snapDir) + if err = sn.SaveSnap(raftSnap); err != nil { + return err + } + + // save WAL snapshot record + w, err := wal.Open(lg, walDir, latestWALSnap) + if err != nil { + return err + } + defer w.Close() + // We must read all records to locate the tail of the last valid WAL file. 
+ if _, _, _, err = w.ReadAll(); err != nil { + return err + } + + return w.SaveSnapshot(walpb.Snapshot{Index: ci, Term: term, ConfState: &confState}) +} + +func getVotersAndLearners(cl *membership.RaftCluster) ([]uint64, []uint64) { + var ( + voters []uint64 + learners []uint64 + ) + for _, m := range cl.Members() { + if m.IsLearner { + learners = append(learners, uint64(m.ID)) + continue + } + + voters = append(voters, uint64(m.ID)) + } + + return voters, learners +} diff --git a/etcdutl/etcdutl/migrate_command.go b/etcdutl/etcdutl/migrate_command.go index a265c7baae76..bd491bc0c10d 100644 --- a/etcdutl/etcdutl/migrate_command.go +++ b/etcdutl/etcdutl/migrate_command.go @@ -74,8 +74,9 @@ func (o *migrateOptions) AddFlags(cmd *cobra.Command) { func (o *migrateOptions) Config() (*migrateConfig, error) { c := &migrateConfig{ - force: o.force, - lg: GetLogger(), + force: o.force, + dataDir: o.dataDir, + lg: GetLogger(), } var err error dotCount := strings.Count(o.targetVersion, ".") @@ -90,47 +91,75 @@ func (o *migrateOptions) Config() (*migrateConfig, error) { return nil, fmt.Errorf(`target version %q not supported. 
Minimal "3.5"`, storageVersionToString(c.targetVersion)) } - dbPath := datadir.ToBackendFileName(o.dataDir) - c.be = backend.NewDefaultBackend(GetLogger(), dbPath) + return c, nil +} + +type migrateConfig struct { + lg *zap.Logger + be backend.Backend + targetVersion *semver.Version + walVersion schema.WALVersion + dataDir string + force bool +} - walPath := datadir.ToWALDir(o.dataDir) - walSnap, err := getLatestWALSnap(c.lg, o.dataDir) +func (c *migrateConfig) finalize() error { + walPath := datadir.ToWALDir(c.dataDir) + walSnap, err := getLatestWALSnap(c.lg, c.dataDir) if err != nil { - return nil, fmt.Errorf("failed to get the lastest snapshot: %w", err) + return fmt.Errorf("failed to get the lastest snapshot: %w", err) } w, err := wal.OpenForRead(c.lg, walPath, walSnap) if err != nil { - return nil, fmt.Errorf(`failed to open wal: %w`, err) + return fmt.Errorf(`failed to open wal: %w`, err) } defer w.Close() c.walVersion, err = wal.ReadWALVersion(w) if err != nil { - return nil, fmt.Errorf(`failed to read wal: %w`, err) + return fmt.Errorf(`failed to read wal: %w`, err) } - return c, nil -} - -type migrateConfig struct { - lg *zap.Logger - be backend.Backend - targetVersion *semver.Version - walVersion schema.WALVersion - force bool + return nil } func migrateCommandFunc(c *migrateConfig) error { + dbPath := datadir.ToBackendFileName(c.dataDir) + c.be = backend.NewDefaultBackend(GetLogger(), dbPath) defer c.be.Close() + tx := c.be.BatchTx() current, err := schema.DetectSchemaVersion(c.lg, c.be.ReadTx()) if err != nil { - c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older") + c.lg.Error("failed to detect storage version. 
Please make sure you are using data dir from etcd v3.5 and older", zap.Error(err)) return err } if current == *c.targetVersion { c.lg.Info("storage version up-to-date", zap.String("storage-version", storageVersionToString(¤t))) return nil } + + downgrade, err := isDowngrade(c.lg, tx, c.targetVersion) + if err != nil { + return err + } + if downgrade { + // Update cluster version + be := schema.NewMembershipBackend(c.lg, c.be) + be.MustSaveClusterVersionToBackend(c.targetVersion) + + // forcibly create a v2 snapshot file + // TODO: remove in 3.8 + if err = createV2SnapshotFromV3Store(c.dataDir, c.be); err != nil { + c.lg.Error("Failed to create v2 snapshot file", zap.Error(err)) + return err + } + } + + if err = c.finalize(); err != nil { + c.lg.Error("Failed to finalize config", zap.Error(err)) + return err + } + err = schema.Migrate(c.lg, tx, c.walVersion, *c.targetVersion) if err != nil { if !c.force { @@ -139,7 +168,9 @@ func migrateCommandFunc(c *migrateConfig) error { c.lg.Info("normal migrate failed, trying with force", zap.Error(err)) migrateForce(c.lg, tx, c.targetVersion) } + c.be.ForceCommit() + return nil } @@ -156,6 +187,17 @@ func migrateForce(lg *zap.Logger, tx backend.BatchTx, target *semver.Version) { } } +func isDowngrade(lg *zap.Logger, tx backend.BatchTx, target *semver.Version) (bool, error) { + tx.LockOutsideApply() + defer tx.Unlock() + ver, err := schema.UnsafeDetectSchemaVersion(lg, tx) + if err != nil { + lg.Error("Failed to detect current storage version", zap.Error(err)) + return false, err + } + return target.LessThan(ver), nil +} + func storageVersionToString(ver *semver.Version) string { return fmt.Sprintf("%d.%d", ver.Major, ver.Minor) } diff --git a/server/etcdserver/api/membership/cluster.go b/server/etcdserver/api/membership/cluster.go index 6539b977d233..c40a65c8a0af 100644 --- a/server/etcdserver/api/membership/cluster.go +++ b/server/etcdserver/api/membership/cluster.go @@ -256,10 +256,7 @@ func (c *RaftCluster) 
SetVersionChangedNotifier(n *notify.Notifier) { c.versionChanged = n } -func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) { - c.Lock() - defer c.Unlock() - +func (c *RaftCluster) UnsafeLoad() { if c.be != nil { c.version = c.be.ClusterVersionFromBackend() c.members, c.removed = c.be.MustReadMembersFromBackend() @@ -267,11 +264,20 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) { c.version = clusterVersionFromStore(c.lg, c.v2store) c.members, c.removed = membersFromStore(c.lg, c.v2store) } - c.buildMembershipMetric() if c.be != nil { c.downgradeInfo = c.be.DowngradeInfoFromBackend() } +} + +func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) { + c.Lock() + defer c.Unlock() + + c.UnsafeLoad() + + c.buildMembershipMetric() + sv := semver.Must(semver.NewVersion(version.Version)) if c.downgradeInfo != nil && c.downgradeInfo.Enabled { c.lg.Info(