Skip to content

Commit

Permalink
Support cross storage backup/restore (#398)
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <[email protected]>
  • Loading branch information
wayblink authored Aug 21, 2024
1 parent c85a7ac commit 2b91f30
Show file tree
Hide file tree
Showing 17 changed files with 1,004 additions and 268 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Variables
BINARY_NAME=milvus-backup
VERSION=$(shell git describe --tags --always --dirty)
VERSION=$(shell git describe --tags --always)
COMMIT=$(shell git rev-parse --short HEAD)
DATE=$(shell date -u '+%Y-%m-%dT%H:%M:%SZ')

Expand Down
17 changes: 11 additions & 6 deletions configs/backup.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,24 @@ milvus:

# Related configuration of minio, which is responsible for data persistence for Milvus.
minio:
# cloudProvider: "minio" # deprecated use storageType instead
# Milvus storage configs, make them the same with milvus config
storageType: "minio" # support storage type: local, minio, s3, aws, gcp, ali(aliyun), azure, tc(tencent)

address: localhost # Address of MinIO/S3
port: 9000 # Port of MinIO/S3
accessKeyID: minioadmin # accessKeyID of MinIO/S3
secretAccessKey: minioadmin # MinIO/S3 encryption string
useSSL: false # Access to MinIO/S3 with SSL
useIAM: false
iamEndpoint: ""

bucketName: "a-bucket" # Milvus Bucket name in MinIO/S3, make it the same as your milvus instance
rootPath: "files" # Milvus storage root path in MinIO/S3, make it the same as your milvus instance

# only for azure
# Backup storage configs, the storage you want to put the backup data
backupStorageType: "minio" # support storage type: local, minio, s3, aws, gcp, ali(aliyun), azure, tc(tencent)
backupAddress: localhost # Address of MinIO/S3
backupPort: 9000 # Port of MinIO/S3
backupAccessKeyID: minioadmin # accessKeyID of MinIO/S3
backupSecretAccessKey: minioadmin # MinIO/S3 encryption string

backupBucketName: "a-bucket" # Bucket name to store backup data. Backup data will store to backupBucketName/backupRootPath
backupRootPath: "backup" # Rootpath to store backup data. Backup data will store to backupBucketName/backupRootPath

Expand All @@ -60,4 +60,9 @@ backup:
gcPause:
enable: true
seconds: 7200
address: http://localhost:9091
address: http://localhost:9091

# If you need to backup or restore data between two different storage systems,
# direct client-side copying is not supported.
# Set this option to true to enable data transfer through Milvus Backup.
copyByServer: "false"
139 changes: 118 additions & 21 deletions core/backup_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ type BackupContext struct {
milvusClient *MilvusClient

// data storage client
storageClient *storage.ChunkManager
milvusStorageClient storage.ChunkManager
backupStorageClient storage.ChunkManager
backupCopier *storage.Copier
restoreCopier *storage.Copier

milvusBucketName string
backupBucketName string
milvusRootPath string
Expand Down Expand Up @@ -82,13 +86,28 @@ func CreateMilvusClient(ctx context.Context, params paramtable.BackupParams) (go
return c, nil
}

func CreateStorageClient(ctx context.Context, params paramtable.BackupParams) (storage.ChunkManager, error) {
// Deprecated
func createStorageClient(ctx context.Context, params paramtable.BackupParams) (storage.ChunkManager, error) {
minioEndPoint := params.MinioCfg.Address + ":" + params.MinioCfg.Port
log.Debug("Start minio client",
zap.String("address", minioEndPoint),
zap.String("bucket", params.MinioCfg.BucketName),
zap.String("backupBucket", params.MinioCfg.BackupBucketName))
minioClient, err := storage.NewChunkManager(ctx, params)

storageConfig := &storage.StorageConfig{
StorageType: params.MinioCfg.StorageType,
Address: minioEndPoint,
BucketName: params.MinioCfg.BucketName,
AccessKeyID: params.MinioCfg.AccessKeyID,
SecretAccessKeyID: params.MinioCfg.SecretAccessKey,
UseSSL: params.MinioCfg.UseSSL,
UseIAM: params.MinioCfg.UseIAM,
IAMEndpoint: params.MinioCfg.IAMEndpoint,
RootPath: params.MinioCfg.RootPath,
CreateBucket: true,
}

minioClient, err := storage.NewChunkManager(ctx, params, storageConfig)
return minioClient, err
}

Expand Down Expand Up @@ -135,16 +154,94 @@ func (b *BackupContext) getMilvusClient() *MilvusClient {
return b.milvusClient
}

func (b *BackupContext) getStorageClient() storage.ChunkManager {
if b.storageClient == nil {
storageClient, err := CreateStorageClient(b.ctx, b.params)
func (b *BackupContext) getMilvusStorageClient() storage.ChunkManager {
if b.milvusStorageClient == nil {
minioEndPoint := b.params.MinioCfg.Address + ":" + b.params.MinioCfg.Port
log.Debug("create milvus storage client",
zap.String("address", minioEndPoint),
zap.String("bucket", b.params.MinioCfg.BucketName),
zap.String("backupBucket", b.params.MinioCfg.BackupBucketName))

storageConfig := &storage.StorageConfig{
StorageType: b.params.MinioCfg.StorageType,
Address: minioEndPoint,
BucketName: b.params.MinioCfg.BucketName,
AccessKeyID: b.params.MinioCfg.AccessKeyID,
SecretAccessKeyID: b.params.MinioCfg.SecretAccessKey,
UseSSL: b.params.MinioCfg.UseSSL,
UseIAM: b.params.MinioCfg.UseIAM,
IAMEndpoint: b.params.MinioCfg.IAMEndpoint,
RootPath: b.params.MinioCfg.RootPath,
CreateBucket: true,
}

storageClient, err := storage.NewChunkManager(b.ctx, b.params, storageConfig)
if err != nil {
log.Error("failed to initial storage client", zap.Error(err))
panic(err)
}
b.storageClient = &storageClient
b.milvusStorageClient = storageClient
}
return *b.storageClient
return b.milvusStorageClient
}

func (b *BackupContext) getBackupStorageClient() storage.ChunkManager {
if b.backupStorageClient == nil {
minioEndPoint := b.params.MinioCfg.BackupAddress + ":" + b.params.MinioCfg.BackupPort
log.Debug("create backup storage client",
zap.String("address", minioEndPoint),
zap.String("bucket", b.params.MinioCfg.BucketName),
zap.String("backupBucket", b.params.MinioCfg.BackupBucketName))

storageConfig := &storage.StorageConfig{
StorageType: b.params.MinioCfg.BackupStorageType,
Address: minioEndPoint,
BucketName: b.params.MinioCfg.BackupBucketName,
AccessKeyID: b.params.MinioCfg.BackupAccessKeyID,
SecretAccessKeyID: b.params.MinioCfg.BackupSecretAccessKey,
UseSSL: b.params.MinioCfg.BackupUseSSL,
UseIAM: b.params.MinioCfg.BackupUseIAM,
IAMEndpoint: b.params.MinioCfg.BackupIAMEndpoint,
RootPath: b.params.MinioCfg.BackupRootPath,
CreateBucket: true,
}

storageClient, err := storage.NewChunkManager(b.ctx, b.params, storageConfig)
if err != nil {
log.Error("failed to initial storage client", zap.Error(err))
panic(err)
}
b.backupStorageClient = storageClient
}
return b.backupStorageClient
}

func (b *BackupContext) getBackupCopier() *storage.Copier {
if b.backupCopier == nil {
b.backupCopier = storage.NewCopier(
b.getMilvusStorageClient(),
b.getBackupStorageClient(),
storage.CopyOption{
WorkerNum: b.params.BackupCfg.BackupCopyDataParallelism,
RPS: RPS,
CopyByServer: b.params.BackupCfg.CopyByServer,
})
}
return b.backupCopier
}

func (b *BackupContext) getRestoreCopier() *storage.Copier {
if b.restoreCopier == nil {
b.restoreCopier = storage.NewCopier(
b.getBackupStorageClient(),
b.getMilvusStorageClient(),
storage.CopyOption{
WorkerNum: b.params.BackupCfg.BackupCopyDataParallelism,
RPS: RPS,
CopyByServer: b.params.BackupCfg.CopyByServer,
})
}
return b.restoreCopier
}

func (b *BackupContext) getBackupCollectionWorkerPool() *common.WorkerPool {
Expand Down Expand Up @@ -308,7 +405,7 @@ func (b *BackupContext) ListBackups(ctx context.Context, request *backuppb.ListB
}

// 1, trigger inner sync to get the newest backup list in the milvus cluster
backupPaths, _, err := b.getStorageClient().ListWithPrefix(ctx, b.backupBucketName, b.backupRootPath+SEPERATOR, false)
backupPaths, _, err := b.getBackupStorageClient().ListWithPrefix(ctx, b.backupBucketName, b.backupRootPath+SEPERATOR, false)
if err != nil {
log.Error("Fail to list backup directory", zap.Error(err))
resp.Code = backuppb.ResponseCode_Fail
Expand Down Expand Up @@ -394,7 +491,7 @@ func (b *BackupContext) DeleteBackup(ctx context.Context, request *backuppb.Dele
BackupName: request.GetBackupName(),
})
// always trigger a remove to make sure it is deleted
err := b.getStorageClient().RemoveWithPrefix(ctx, b.backupBucketName, BackupDirPath(b.backupRootPath, request.GetBackupName()))
err := b.getBackupStorageClient().RemoveWithPrefix(ctx, b.backupBucketName, BackupDirPath(b.backupRootPath, request.GetBackupName()))

if getResp.GetCode() == backuppb.ResponseCode_Request_Object_Not_Found {
resp.Code = backuppb.ResponseCode_Request_Object_Not_Found
Expand Down Expand Up @@ -465,7 +562,7 @@ func (b *BackupContext) readBackup(ctx context.Context, bucketName string, backu
partitionMetaPath := backupMetaDirPath + SEPERATOR + PARTITION_META_FILE
segmentMetaPath := backupMetaDirPath + SEPERATOR + SEGMENT_META_FILE

exist, err := b.getStorageClient().Exist(ctx, bucketName, backupMetaPath)
exist, err := b.getBackupStorageClient().Exist(ctx, bucketName, backupMetaPath)
if err != nil {
log.Error("check backup meta file failed", zap.String("path", backupMetaPath), zap.Error(err))
return nil, err
Expand All @@ -475,22 +572,22 @@ func (b *BackupContext) readBackup(ctx context.Context, bucketName string, backu
return nil, err
}

backupMetaBytes, err := b.getStorageClient().Read(ctx, bucketName, backupMetaPath)
backupMetaBytes, err := b.getBackupStorageClient().Read(ctx, bucketName, backupMetaPath)
if err != nil {
log.Error("Read backup meta failed", zap.String("path", backupMetaPath), zap.Error(err))
return nil, err
}
collectionBackupMetaBytes, err := b.getStorageClient().Read(ctx, bucketName, collectionMetaPath)
collectionBackupMetaBytes, err := b.getBackupStorageClient().Read(ctx, bucketName, collectionMetaPath)
if err != nil {
log.Error("Read collection meta failed", zap.String("path", collectionMetaPath), zap.Error(err))
return nil, err
}
partitionBackupMetaBytes, err := b.getStorageClient().Read(ctx, bucketName, partitionMetaPath)
partitionBackupMetaBytes, err := b.getBackupStorageClient().Read(ctx, bucketName, partitionMetaPath)
if err != nil {
log.Error("Read partition meta failed", zap.String("path", partitionMetaPath), zap.Error(err))
return nil, err
}
segmentBackupMetaBytes, err := b.getStorageClient().Read(ctx, bucketName, segmentMetaPath)
segmentBackupMetaBytes, err := b.getBackupStorageClient().Read(ctx, bucketName, segmentMetaPath)
if err != nil {
log.Error("Read segment meta failed", zap.String("path", segmentMetaPath), zap.Error(err))
return nil, err
Expand Down Expand Up @@ -573,7 +670,7 @@ func (b *BackupContext) Check(ctx context.Context) string {
"backup-rootpath: %s\n",
version, b.milvusBucketName, b.milvusRootPath, b.backupBucketName, b.backupRootPath)

paths, _, err := b.getStorageClient().ListWithPrefix(ctx, b.milvusBucketName, b.milvusRootPath+SEPERATOR, false)
paths, _, err := b.getMilvusStorageClient().ListWithPrefix(ctx, b.milvusBucketName, b.milvusRootPath+SEPERATOR, false)
if err != nil {
return "Failed to connect to storage milvus path\n" + info + err.Error()
}
Expand All @@ -582,27 +679,27 @@ func (b *BackupContext) Check(ctx context.Context) string {
return "Milvus storage is empty. Please verify whether your cluster is really empty. If not, the configs(minio address, port, bucket, rootPath) may be wrong\n" + info
}

paths, _, err = b.getStorageClient().ListWithPrefix(ctx, b.backupBucketName, b.backupRootPath+SEPERATOR, false)
paths, _, err = b.getBackupStorageClient().ListWithPrefix(ctx, b.backupBucketName, b.backupRootPath+SEPERATOR, false)
if err != nil {
return "Failed to connect to storage backup path " + info + err.Error()
}

CHECK_PATH := "milvus_backup_check_" + time.Now().String()

err = b.getStorageClient().Write(ctx, b.milvusBucketName, b.milvusRootPath+SEPERATOR+CHECK_PATH, []byte{1})
err = b.getMilvusStorageClient().Write(ctx, b.milvusBucketName, b.milvusRootPath+SEPERATOR+CHECK_PATH, []byte{1})
if err != nil {
return "Failed to connect to storage milvus path\n" + info + err.Error()
}
defer func() {
b.getStorageClient().Remove(ctx, b.milvusBucketName, b.milvusRootPath+SEPERATOR+CHECK_PATH)
b.getMilvusStorageClient().Remove(ctx, b.milvusBucketName, b.milvusRootPath+SEPERATOR+CHECK_PATH)
}()

err = b.getStorageClient().Copy(ctx, b.milvusBucketName, b.backupBucketName, b.milvusRootPath+SEPERATOR+CHECK_PATH, b.backupRootPath+SEPERATOR+CHECK_PATH)
err = b.getMilvusStorageClient().Copy(ctx, b.milvusBucketName, b.backupBucketName, b.milvusRootPath+SEPERATOR+CHECK_PATH, b.backupRootPath+SEPERATOR+CHECK_PATH)
if err != nil {
return "Failed to copy file from milvus storage to backup storage\n" + info + err.Error()
}
defer func() {
b.getStorageClient().Remove(ctx, b.backupBucketName, b.backupRootPath+SEPERATOR+CHECK_PATH)
b.getBackupStorageClient().Remove(ctx, b.backupBucketName, b.backupRootPath+SEPERATOR+CHECK_PATH)
}()

return "Succeed to connect to milvus and storage.\n" + info
Expand Down
2 changes: 1 addition & 1 deletion core/backup_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func TestGetBackupFaultBackup(t *testing.T) {
resp := backupContext.CreateBackup(context, req)
assert.Equal(t, backuppb.ResponseCode_Success, resp.GetCode())

backupContext.getStorageClient().RemoveWithPrefix(context, params.MinioCfg.BackupBucketName, BackupMetaPath(params.MinioCfg.BackupRootPath, resp.GetData().GetName()))
backupContext.getMilvusStorageClient().RemoveWithPrefix(context, params.MinioCfg.BackupBucketName, BackupMetaPath(params.MinioCfg.BackupRootPath, resp.GetData().GetName()))

backup := backupContext.GetBackup(context, &backuppb.GetBackupRequest{
BackupName: randBackupName,
Expand Down
Loading

0 comments on commit 2b91f30

Please sign in to comment.