From dfff96f7db1b0399bb93b83b7edce3b0d3063ef1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cgifi-siby=E2=80=9D?= Date: Mon, 6 Jan 2025 10:30:35 -0800 Subject: [PATCH] GCP native support (for the Google Cloud Platform provider) Signed-off-by: gifi-siby --- README.md | 7 +- cmd/backup_yaml.go | 29 +- configs/backup.yaml | 7 +- core/backup_context.go | 3 + core/paramtable/base_table.go | 11 + core/paramtable/params.go | 58 ++-- core/storage/chunk_manager.go | 2 + core/storage/gcp_native_chunk_manager.go | 340 +++++++++++++++++++++++ core/storage/options.go | 8 + go.mod | 12 +- go.sum | 10 + 11 files changed, 449 insertions(+), 38 deletions(-) create mode 100644 core/storage/gcp_native_chunk_manager.go diff --git a/README.md b/README.md index 87b0d62..86c4da4 100644 --- a/README.md +++ b/README.md @@ -104,20 +104,22 @@ Below is a summary of the configurations supported in `backup.yaml`: | | `tlsMode` | TLS mode (0: none, 1: one-way, 2: two-way). | `0` | | | `user` | Username for Milvus. | `root` | | | `password` | Password for Milvus. | `Milvus` | -| `minio` | `storageType` | Storage type for Milvus (e.g., `local`, `minio`, `s3`, `aws`, `gcp`, `ali(aliyun)`, `azure`, `tc(tencent)`). | `minio` | +| `minio` | `storageType` | Storage type for Milvus (e.g., `local`, `minio`, `s3`, `aws`, `gcp`, `ali(aliyun)`, `azure`, `tc(tencent)`, `gcpnative`). Use `gcpnative` for the Google Cloud Platform provider. | `minio` | | | `address` | MinIO/S3 address. | `localhost` | | | `port` | MinIO/S3 port. | `9000` | | | `accessKeyID` | MinIO/S3 access key ID. | `minioadmin` | | | `secretAccessKey` | MinIO/S3 secret access key. | `minioadmin` | +| | `gcpCredentialJSON` | Path to your GCP credential JSON key file. Used only for the "gcpnative" cloud provider. | `/path/to/file` | | | `useSSL` | Whether to use SSL for MinIO/S3. | `false` | | | `bucketName` | Bucket name in MinIO/S3. | `a-bucket` | | | `rootPath` | Storage root path in MinIO/S3. 
| `files` | -| `minio (backup)` | `backupStorageType` | Backup storage type (e.g., `local`, `minio`, `s3`, `aws`, `gcp`, `ali(aliyun)`, `azure`, `tc(tencent)`). | `minio` | +| `minio (backup)` | `backupStorageType` | Backup storage type for Milvus (e.g., `local`, `minio`, `s3`, `aws`, `gcp`, `ali(aliyun)`, `azure`, `tc(tencent)`, `gcpnative`). Use `gcpnative` for the Google Cloud Platform provider. | `minio` | | | `backupAddress` | Address of backup storage. | `localhost` | | | `backupPort` | Port of backup storage. | `9000` | | | `backupUseSSL` | Whether to use SSL for backup storage. | `false` | | | `backupAccessKeyID` | Backup storage access key ID. | `minioadmin` | | | `backupSecretAccessKey` | Backup storage secret access key. | `minioadmin` | +| | `backupGcpCredentialJSON` | Path to your GCP credential JSON key file. Used only for the "gcpnative" cloud provider. | `/path/to/file` | | | `backupBucketName` | Bucket name for backups. | `a-bucket` | | | `backupRootPath` | Root path to store backup data. | `backup` | | | `crossStorage` | Enable cross-storage backups (e.g., MinIO to AWS S3). | `false` | @@ -153,6 +155,7 @@ backupAddress: s3.{your-aws-region}.amazonaws.com # Address of AWS S3 (replace backupPort: 443 # Default port for AWS S3 backupAccessKeyID: # Access key ID for your AWS S3 backupSecretAccessKey: # Secret access key for AWS S3 +backupGcpCredentialJSON: "/path/to/file" # Path to your GCP credential JSON key file. Used only for the "gcpnative" cloud provider. 
backupBucketName: "your-bucket-name" # Bucket name where the backups will be stored backupRootPath: "backups" # Root path inside the bucket to store backups backupUseSSL: true # Use SSL for secure connections (Required) diff --git a/cmd/backup_yaml.go b/cmd/backup_yaml.go index 20e4492..793a747 100644 --- a/cmd/backup_yaml.go +++ b/cmd/backup_yaml.go @@ -3,6 +3,7 @@ package cmd import ( "fmt" "strings" + "github.com/spf13/cobra" "github.com/zilliztech/milvus-backup/core/paramtable" "gopkg.in/yaml.v3" @@ -41,18 +42,20 @@ type YAMLConFig struct { Password string `yaml:"password"` } `yaml:"milvus"` Minio struct { - Address string `yaml:"address"` - Port int `yaml:"port"` - AccessKeyID string `yaml:"accessKeyID"` - secretAccessKey string `yaml:"secretAccessKey"` - UseSSL bool `yaml:"useSSL"` - UseIAM bool `yaml:"useIAM"` - CloudProvider string `yaml:"cloudProvider"` - IamEndpoint string `yaml:"iamEndpoint"` - BucketName string `yaml:"bucketName"` - RootPath string `yaml:"rootPath"` - BackupBucketName string `yaml:"backupBucketName"` - BackupRootPath string `yaml:"backupRootPath"` + Address string `yaml:"address"` + Port int `yaml:"port"` + AccessKeyID string `yaml:"accessKeyID"` + secretAccessKey string `yaml:"secretAccessKey"` + GcpCredentialJSON string `yaml:"gcpCredentialJSON"` + UseSSL bool `yaml:"useSSL"` + UseIAM bool `yaml:"useIAM"` + CloudProvider string `yaml:"cloudProvider"` + IamEndpoint string `yaml:"iamEndpoint"` + BucketName string `yaml:"bucketName"` + RootPath string `yaml:"rootPath"` + BackupGcpCredentialJSON string `yaml:"backupGcpCredentialJSON"` + BackupBucketName string `yaml:"backupBucketName"` + BackupRootPath string `yaml:"backupRootPath"` } `yaml:"minio"` Backup struct { MaxSegmentGroupSize string `yaml:"maxSegmentGroupSize"` @@ -82,12 +85,14 @@ func printParams(base *paramtable.BackupParams) { yml.Minio.Port = base.ParseIntWithDefault("minio.port", 9000) yml.Minio.AccessKeyID = base.BaseTable.LoadWithDefault("minio.accessKeyID", "") 
yml.Minio.secretAccessKey = base.BaseTable.LoadWithDefault("minio.secretAccessKey", "") + yml.Minio.GcpCredentialJSON = base.BaseTable.LoadWithDefault("minio.gcpCredentialJSON", "") yml.Minio.UseSSL = base.ParseBool("minio.useSSL", false) yml.Minio.UseIAM = base.ParseBool("minio.useIAM", false) yml.Minio.CloudProvider = base.BaseTable.LoadWithDefault("minio.cloudProvider", "aws") yml.Minio.IamEndpoint = base.BaseTable.LoadWithDefault("minio.iamEndpoint", "") yml.Minio.BucketName = base.BaseTable.LoadWithDefault("minio.bucketName", "") yml.Minio.RootPath = base.LoadWithDefault("minio.rootPath", "") + yml.Minio.BackupGcpCredentialJSON = base.BaseTable.LoadWithDefault("minio.backupGcpCredentialJSON", "") yml.Minio.BackupBucketName = base.LoadWithDefault("minio.backupBucketName", "") yml.Minio.BackupRootPath = base.LoadWithDefault("minio.backupRootPath", "") diff --git a/configs/backup.yaml b/configs/backup.yaml index ae5a4bb..bfcabd5 100644 --- a/configs/backup.yaml +++ b/configs/backup.yaml @@ -22,11 +22,13 @@ milvus: # Related configuration of minio, which is responsible for data persistence for Milvus. minio: # Milvus storage configs, make them the same with milvus config - storageType: "minio" # support storage type: local, minio, s3, aws, gcp, ali(aliyun), azure, tc(tencent) + storageType: "minio" # support storage type: local, minio, s3, aws, gcp, ali(aliyun), azure, tc(tencent), gcpnative + # You can use "gcpnative" for the Google Cloud Platform provider. Uses service account credentials for authentication. address: localhost # Address of MinIO/S3 port: 9000 # Port of MinIO/S3 accessKeyID: minioadmin # accessKeyID of MinIO/S3 secretAccessKey: minioadmin # MinIO/S3 encryption string + gcpCredentialJSON: "" # Path to the JSON key file holding the GCS service account credentials (the file is read at startup). Used only for the "gcpnative" cloud provider. 
useSSL: false # Access to MinIO/S3 with SSL useIAM: false iamEndpoint: "" @@ -34,11 +36,12 @@ minio: rootPath: "files" # Milvus storage root path in MinIO/S3, make it the same as your milvus instance # Backup storage configs, the storage you want to put the backup data - backupStorageType: "minio" # support storage type: local, minio, s3, aws, gcp, ali(aliyun), azure, tc(tencent) + backupStorageType: "minio" # support storage type: local, minio, s3, aws, gcp, ali(aliyun), azure, tc(tencent), gcpnative backupAddress: localhost # Address of MinIO/S3 backupPort: 9000 # Port of MinIO/S3 backupAccessKeyID: minioadmin # accessKeyID of MinIO/S3 backupSecretAccessKey: minioadmin # MinIO/S3 encryption string + backupGcpCredentialJSON: "" # Path to the JSON key file holding the GCS service account credentials (the file is read at startup). Used only for the "gcpnative" cloud provider. backupBucketName: "a-bucket" # Bucket name to store backup data. Backup data will store to backupBucketName/backupRootPath backupRootPath: "backup" # Rootpath to store backup data. 
Backup data will store to backupBucketName/backupRootPath diff --git a/core/backup_context.go b/core/backup_context.go index 7c76e95..086b953 100644 --- a/core/backup_context.go +++ b/core/backup_context.go @@ -101,6 +101,7 @@ func createStorageClient(ctx context.Context, params paramtable.BackupParams) (s BucketName: params.MinioCfg.BucketName, AccessKeyID: params.MinioCfg.AccessKeyID, SecretAccessKeyID: params.MinioCfg.SecretAccessKey, + GcpCredentialJSON: params.MinioCfg.GcpCredentialJSON, UseSSL: params.MinioCfg.UseSSL, UseIAM: params.MinioCfg.UseIAM, IAMEndpoint: params.MinioCfg.IAMEndpoint, @@ -169,6 +170,7 @@ func (b *BackupContext) getMilvusStorageClient() storage.ChunkManager { BucketName: b.params.MinioCfg.BucketName, AccessKeyID: b.params.MinioCfg.AccessKeyID, SecretAccessKeyID: b.params.MinioCfg.SecretAccessKey, + GcpCredentialJSON: b.params.MinioCfg.GcpCredentialJSON, UseSSL: b.params.MinioCfg.UseSSL, UseIAM: b.params.MinioCfg.UseIAM, IAMEndpoint: b.params.MinioCfg.IAMEndpoint, @@ -200,6 +202,7 @@ func (b *BackupContext) getBackupStorageClient() storage.ChunkManager { BucketName: b.params.MinioCfg.BackupBucketName, AccessKeyID: b.params.MinioCfg.BackupAccessKeyID, SecretAccessKeyID: b.params.MinioCfg.BackupSecretAccessKey, + GcpCredentialJSON: b.params.MinioCfg.BackupGcpCredentialJSON, UseSSL: b.params.MinioCfg.BackupUseSSL, UseIAM: b.params.MinioCfg.BackupUseIAM, IAMEndpoint: b.params.MinioCfg.BackupIAMEndpoint, diff --git a/core/paramtable/base_table.go b/core/paramtable/base_table.go index 23087b9..1caf6aa 100644 --- a/core/paramtable/base_table.go +++ b/core/paramtable/base_table.go @@ -37,6 +37,7 @@ const ( DefaultMinioPort = "9000" DefaultMinioAccessKey = "minioadmin" DefaultMinioSecretAccessKey = "minioadmin" + DefaultGcpCredentialJSON = "" DefaultMinioUseSSL = "false" DefaultMinioBucketName = "a-bucket" DefaultMinioRootPath = "files" @@ -428,6 +429,11 @@ func (gp *BaseTable) loadMinioConfig() { _ = gp.Save("minio.secretAccessKey", 
minioSecretKey) } + gcpCredentialJSON := os.Getenv("GCP_KEY_JSON") + if gcpCredentialJSON != "" { + _ = gp.Save("minio.gcpCredentialJSON", gcpCredentialJSON) + } + minioUseSSL := os.Getenv("MINIO_USE_SSL") if minioUseSSL != "" { _ = gp.Save("minio.useSSL", minioUseSSL) @@ -483,6 +489,11 @@ func (gp *BaseTable) loadMinioConfig() { _ = gp.Save("minio.backupSecretAccessKey", minioBackupSecretKey) } + backupGcpCredentialJSON := os.Getenv("BACKUP_GCP_KEY_JSON") + if backupGcpCredentialJSON != "" { + _ = gp.Save("minio.backupGcpCredentialJSON", backupGcpCredentialJSON) + } + minioBackupUseSSL := os.Getenv("MINIO_BACKUP_USE_SSL") if minioBackupUseSSL != "" { _ = gp.Save("minio.backupUseSSL", minioBackupUseSSL) diff --git a/core/paramtable/params.go b/core/paramtable/params.go index 7a20f7e..7505cc2 100644 --- a/core/paramtable/params.go +++ b/core/paramtable/params.go @@ -171,6 +171,7 @@ const ( S3 = "s3" CloudProviderAWS = "aws" CloudProviderGCP = "gcp" + CloudProviderGCPNative = "gcpnative" CloudProviderAli = "ali" CloudProviderAliyun = "aliyun" CloudProviderAzure = "azure" @@ -184,6 +185,7 @@ var supportedStorageType = map[string]bool{ S3: true, CloudProviderAWS: true, CloudProviderGCP: true, + CloudProviderGCPNative: true, CloudProviderAli: true, CloudProviderAliyun: true, CloudProviderAzure: true, @@ -196,27 +198,29 @@ type MinioConfig struct { StorageType string // Deprecated - CloudProvider string - Address string - Port string - AccessKeyID string - SecretAccessKey string - UseSSL bool - BucketName string - RootPath string - UseIAM bool - IAMEndpoint string - - BackupStorageType string - BackupAddress string - BackupPort string - BackupAccessKeyID string - BackupSecretAccessKey string - BackupUseSSL bool - BackupBucketName string - BackupRootPath string - BackupUseIAM bool - BackupIAMEndpoint string + CloudProvider string + Address string + Port string + AccessKeyID string + SecretAccessKey string + GcpCredentialJSON string + UseSSL bool + BucketName string + 
RootPath string + UseIAM bool + IAMEndpoint string + + BackupStorageType string + BackupAddress string + BackupPort string + BackupAccessKeyID string + BackupSecretAccessKey string + BackupGcpCredentialJSON string + BackupUseSSL bool + BackupBucketName string + BackupRootPath string + BackupUseIAM bool + BackupIAMEndpoint string CrossStorage bool } @@ -229,6 +233,7 @@ func (p *MinioConfig) init(base *BaseTable) { p.initPort() p.initAccessKeyID() p.initSecretAccessKey() + p.initGcpCredentialJSON() p.initUseSSL() p.initBucketName() p.initRootPath() @@ -241,6 +246,7 @@ func (p *MinioConfig) init(base *BaseTable) { p.initBackupPort() p.initBackupAccessKeyID() p.initBackupSecretAccessKey() + p.initBackupGcpCredentialJSON() p.initBackupUseSSL() p.initBackupBucketName() p.initBackupRootPath() @@ -270,6 +276,11 @@ func (p *MinioConfig) initSecretAccessKey() { p.SecretAccessKey = key } +func (p *MinioConfig) initGcpCredentialJSON() { + gcpCredentialJSON := p.Base.LoadWithDefault("minio.gcpCredentialJSON", DefaultGcpCredentialJSON) + p.GcpCredentialJSON = gcpCredentialJSON +} + func (p *MinioConfig) initUseSSL() { usessl := p.Base.LoadWithDefault("minio.useSSL", DefaultMinioUseSSL) var err error @@ -380,6 +391,11 @@ func (p *MinioConfig) initBackupSecretAccessKey() { p.BackupSecretAccessKey = key } +func (p *MinioConfig) initBackupGcpCredentialJSON() { + gcpCredentialJSON := p.Base.LoadWithDefault("minio.backupGcpCredentialJSON", DefaultGcpCredentialJSON) + p.BackupGcpCredentialJSON = gcpCredentialJSON +} + func (p *MinioConfig) initBackupBucketName() { bucketName := p.Base.LoadWithDefault("minio.backupBucketName", p.Base.LoadWithDefault("minio.bucketName", DefaultMinioBackupBucketName)) diff --git a/core/storage/chunk_manager.go b/core/storage/chunk_manager.go index 947435b..79a24de 100644 --- a/core/storage/chunk_manager.go +++ b/core/storage/chunk_manager.go @@ -42,6 +42,8 @@ func NewChunkManager(ctx context.Context, params paramtable.BackupParams, config case 
paramtable.CloudProviderAzure: // todo @wayblink return newAzureChunkManagerWithParams(ctx, params, config) + case paramtable.CloudProviderGCPNative: + return newGCPNativeChunkManagerWithConfig(ctx, config) default: return NewMinioChunkManagerWithConfig(ctx, config) } diff --git a/core/storage/gcp_native_chunk_manager.go b/core/storage/gcp_native_chunk_manager.go new file mode 100644 index 0000000..6c3303a --- /dev/null +++ b/core/storage/gcp_native_chunk_manager.go @@ -0,0 +1,340 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package storage
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"os"
+	"strings"
+
+	"cloud.google.com/go/storage"
+	"github.com/cockroachdb/errors"
+	"github.com/zilliztech/milvus-backup/internal/log"
+	"github.com/zilliztech/milvus-backup/internal/util/retry"
+	"go.uber.org/zap"
+	"golang.org/x/oauth2/google"
+	"golang.org/x/sync/errgroup"
+	"google.golang.org/api/iterator"
+	"google.golang.org/api/option"
+)
+
+// GCPNativeChunkManager stores and retrieves objects through the native
+// Google Cloud Storage client.
+type GCPNativeChunkManager struct {
+	client *storage.Client
+	config *StorageConfig
+}
+
+// Config returns the storage configuration this manager was created with.
+func (gcm *GCPNativeChunkManager) Config() *StorageConfig {
+	return gcm.config
+}
+
+// newGCPNativeChunkManagerWithConfig builds a GCPNativeChunkManager from config.
+//
+// config.GcpCredentialJSON is the path of a credential JSON key file. The file
+// is read once: its content authenticates the client and supplies the project
+// ID used when a missing bucket has to be created. A non-empty config.Address
+// overrides the client endpoint, using https when config.UseSSL is set and
+// http otherwise. The bucket named by config.BucketName must already exist
+// unless config.CreateBucket is set.
+func newGCPNativeChunkManagerWithConfig(ctx context.Context, config *StorageConfig) (*GCPNativeChunkManager, error) {
+	// Validate before any client exists so a bad config cannot leak one.
+	if config.BucketName == "" {
+		return nil, fmt.Errorf("invalid empty bucket name")
+	}
+
+	var opts []option.ClientOption
+	if config.Address != "" {
+		scheme := "https://"
+		if !config.UseSSL {
+			scheme = "http://"
+		}
+		opts = append(opts, option.WithEndpoint(scheme+config.Address+"/storage/v1/"))
+	}
+
+	// Read the credentials file
+	jsonData, err := os.ReadFile(config.GcpCredentialJSON)
+	if err != nil {
+		return nil, fmt.Errorf("unable to read credentials file: %w", err)
+	}
+	creds, err := google.CredentialsFromJSON(ctx, jsonData, storage.ScopeReadWrite)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create credentials from JSON: %w", err)
+	}
+	projectID, err := projectIDFromJSON(jsonData)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get project ID: %w", err)
+	}
+	opts = append(opts, option.WithCredentials(creds))
+
+	client, err := storage.NewClient(ctx, opts...)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create GCS client: %w", err)
+	}
+
+	// Check bucket validity
+	checkBucketFn := func() error {
+		bucket := client.Bucket(config.BucketName)
+		_, err := bucket.Attrs(ctx)
+		// errors.Is also matches when the sentinel has been wrapped.
+		if errors.Is(err, storage.ErrBucketNotExist) && config.CreateBucket {
+			log.Info("gcs bucket does not exist, create bucket.", zap.String("bucket name", config.BucketName))
+			if err := bucket.Create(ctx, projectID, nil); err != nil {
+				return fmt.Errorf("failed to create bucket: %w", err)
+			}
+			return nil
+		}
+		return err
+	}
+	if err := retry.Do(ctx, checkBucketFn, retry.Attempts(CheckBucketRetryAttempts)); err != nil {
+		// Best effort: the bucket error is the one worth reporting.
+		_ = client.Close()
+		return nil, fmt.Errorf("bucket check failed: %w", err)
+	}
+
+	log.Info("GCP native chunk manager init success.", zap.String("bucketname", config.BucketName))
+	return &GCPNativeChunkManager{
+		client: client,
+		config: config,
+	}, nil
+}
+
+// getProjectId returns the "project_id" value stored in the credential JSON
+// file at credentialPath.
+func getProjectId(credentialPath string) (string, error) {
+	if credentialPath == "" {
+		return "", errors.New("the path to the JSON file is empty")
+	}
+
+	// Read the credentials file
+	jsonData, err := os.ReadFile(credentialPath)
+	if err != nil {
+		return "", fmt.Errorf("failed to read credentials file: %w", err)
+	}
+	return projectIDFromJSON(jsonData)
+}
+
+// projectIDFromJSON extracts the "project_id" string from credential JSON
+// content. It fails when the content is not a JSON object, when the key is
+// absent, or when its value is not a string.
+func projectIDFromJSON(jsonData []byte) (string, error) {
+	var data map[string]interface{}
+	if err := json.Unmarshal(jsonData, &data); err != nil {
+		return "", errors.New("failed to parse Google Cloud credentials as JSON")
+	}
+
+	propertyValue, ok := data["project_id"]
+	if !ok {
+		return "", errors.New("project_id doesn't exist")
+	}
+	projectID, ok := propertyValue.(string)
+	if !ok {
+		return "", errors.New("project_id is not a string")
+	}
+	return projectID, nil
+}
+
+// Write writes data to filePath in the specified GCS bucket
+//
+// The object is only stored once the writer is closed successfully, so the
+// error from Close is returned to the caller rather than merely logged.
+func (gcm *GCPNativeChunkManager) Write(ctx context.Context, bucketName string, filePath string, content []byte) error {
+	// Cancelling the writer's context is how an unfinished upload is abandoned.
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	wc := gcm.client.Bucket(bucketName).Object(filePath).NewWriter(ctx)
+	if _, err := wc.Write(content); err != nil {
+		cancel()
+		// Releases the writer; the write error is the one to report.
+		_ = wc.Close()
+		return err
+	}
+	return wc.Close()
+}
+
+// Exist checks whether chunk is saved to GCS storage.
+//
+// The check is a non-recursive listing with filePath as prefix, so it reports
+// true when any object or directory-like prefix starts with filePath.
+func (gcm *GCPNativeChunkManager) Exist(ctx context.Context, bucketName string, filePath string) (bool, error) {
+	objectKeys, _, err := gcm.ListWithPrefix(ctx, bucketName, filePath, false)
+	if err != nil {
+		return false, err
+	}
+	return len(objectKeys) > 0, nil
+}
+
+// Read reads the storage data if exists.
+//
+// The whole object is loaded into memory and returned.
+func (gcm *GCPNativeChunkManager) Read(ctx context.Context, bucketName string, filePath string) ([]byte, error) {
+	rc, err := gcm.client.Bucket(bucketName).Object(filePath).NewReader(ctx)
+	if err != nil {
+		return nil, err
+	}
+	defer func() {
+		if err := rc.Close(); err != nil {
+			log.Error("Failed to close reader", zap.Error(err))
+		}
+	}()
+
+	return io.ReadAll(rc)
+}
+
+// ListWithPrefix returns objects with provided prefix.
+// The two returned slices are parallel: sizes[i] is the size of objectKeys[i].
+// With recursive set to false the listing is delimited by "/", and each
+// directory-like prefix is reported once with size 0.
+func (gcm *GCPNativeChunkManager) ListWithPrefix(ctx context.Context, bucketName string, prefix string, recursive bool) ([]string, []int64, error) {
+	query := &storage.Query{Prefix: prefix}
+	if !recursive {
+		query.Delimiter = "/"
+	}
+
+	var (
+		keys  []string
+		sizes []int64
+	)
+	objects := gcm.client.Bucket(bucketName).Objects(ctx, query)
+	for {
+		entry, err := objects.Next()
+		if err == iterator.Done {
+			return keys, sizes, nil
+		}
+		if err != nil {
+			return nil, nil, fmt.Errorf("failed to list objects: %w", err)
+		}
+
+		switch {
+		case entry.Name != "":
+			// A real object: report its key and size.
+			keys = append(keys, entry.Name)
+			sizes = append(sizes, entry.Size)
+		case entry.Prefix != "":
+			// A directory-like entry carries only Prefix and has no size.
+			keys = append(keys, entry.Prefix)
+			sizes = append(sizes, 0)
+		}
+	}
+}
+
+// Remove deletes an object with @key.
+func (gcm *GCPNativeChunkManager) Remove(ctx context.Context, bucketName string, filePath string) error {
+	return gcm.client.Bucket(bucketName).Object(filePath).Delete(ctx)
+}
+
+// RemoveWithPrefix deletes every object whose key starts with prefix.
+//
+// Keys are removed deepest level first (by number of "/" in the key), in
+// batches of at most ten concurrent deletions. Afterwards an object named
+// exactly like prefix without its trailing "/" is removed if it exists.
+func (gcm *GCPNativeChunkManager) RemoveWithPrefix(ctx context.Context, bucketName string, prefix string) error {
+	// The listing must be recursive: a delimited listing yields only the first
+	// level, and its directory-like entries are not objects that can be deleted.
+	objectKeys, _, err := gcm.ListWithPrefix(ctx, bucketName, prefix, true)
+	if err != nil {
+		return err
+	}
+
+	// Group objects by their depth (number of / in the key)
+	groupedByLevel := make(map[int][]string)
+	maxLevel := 0
+	for _, key := range objectKeys {
+		level := strings.Count(key, "/")
+		groupedByLevel[level] = append(groupedByLevel[level], key)
+		if level > maxLevel {
+			maxLevel = level
+		}
+	}
+
+	const maxGoroutines = 10
+	for level := maxLevel; level >= 0; level-- {
+		keysAtLevel := groupedByLevel[level]
+		for start := 0; start < len(keysAtLevel); start += maxGoroutines {
+			end := start + maxGoroutines
+			if end > len(keysAtLevel) {
+				end = len(keysAtLevel)
+			}
+			if err := gcm.removeBatch(ctx, bucketName, keysAtLevel[start:end]); err != nil {
+				return err
+			}
+		}
+	}
+
+	// A missing object here is the normal case, not a failure.
+	err = gcm.Remove(ctx, bucketName, strings.TrimSuffix(prefix, "/"))
+	if err != nil && !errors.Is(err, storage.ErrObjectNotExist) {
+		log.Warn("failed to remove object", zap.String("bucket", bucketName), zap.String("path", prefix), zap.Error(err))
+		return err
+	}
+	return nil
+}
+
+// removeBatch deletes keys concurrently and returns the first error seen.
+// The group context derives from ctx, so caller cancellation stops the batch.
+func (gcm *GCPNativeChunkManager) removeBatch(ctx context.Context, bucketName string, keys []string) error {
+	group, groupCtx := errgroup.WithContext(ctx)
+	for _, key := range keys {
+		key := key // per-iteration copy for toolchains before Go 1.22
+		group.Go(func() error {
+			if err := gcm.Remove(groupCtx, bucketName, key); err != nil {
+				log.Warn("failed to remove object", zap.String("bucket", bucketName), zap.String("path", key), zap.Error(err))
+				return err
+			}
+			return nil
+		})
+	}
+	return group.Wait()
+}
+
+// Copy files from fromPath into toPath recursively
+//
+// Objects are copied one at a time; the first failure stops the copy and is
+// returned. Objects copied before the failure are left in place.
+func (gcm *GCPNativeChunkManager) Copy(ctx context.Context, fromBucketName string, toBucketName string, fromPath string, toPath string) error {
+	objectKeys, _, err := gcm.ListWithPrefix(ctx, fromBucketName, fromPath, true)
+	if err != nil {
+		log.Error("Error listing objects", zap.Error(err))
+		return err
+	}
+	for _, srcObjectKey := range objectKeys {
+		dstObjectKey := strings.Replace(srcObjectKey, fromPath, toPath, 1)
+		if err := gcm.copyObject(ctx, fromBucketName, toBucketName, srcObjectKey, dstObjectKey); err != nil {
+			log.Error("Error copying object", zap.String("from", srcObjectKey), zap.String("to", dstObjectKey), zap.Error(err))
+			return err
+		}
+	}
+	return nil
+}
+
+// copyObject streams one object from the source bucket to the destination
+// bucket. Doing this in its own function closes the reader and writer after
+// each object instead of keeping all of them open until Copy returns.
+func (gcm *GCPNativeChunkManager) copyObject(ctx context.Context, fromBucketName, toBucketName, srcObjectKey, dstObjectKey string) error {
+	// Cancelling the writer's context is how an unfinished upload is abandoned.
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	srcReader, err := gcm.client.Bucket(fromBucketName).Object(srcObjectKey).NewReader(ctx)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		if err := srcReader.Close(); err != nil {
+			log.Error("Error closing source reader", zap.String("srcObjectKey", srcObjectKey), zap.Error(err))
+		}
+	}()
+
+	dstWriter := gcm.client.Bucket(toBucketName).Object(dstObjectKey).NewWriter(ctx)
+	if _, err := io.Copy(dstWriter, srcReader); err != nil {
+		cancel()
+		// Releases the writer; the copy error is the one to report.
+		_ = dstWriter.Close()
+		return err
+	}
+	// The object is only stored once Close succeeds, so its error is returned.
+	return dstWriter.Close()
+}
+
+// ListObjectsPage is not implemented for the GCP native manager and panics.
+func (gcm *GCPNativeChunkManager) ListObjectsPage(ctx context.Context, bucket, prefix string) (ListObjectsPaginator, error) {
+	panic("not implemented")
+}
+
+// HeadObject is not implemented for the GCP native manager and panics.
+func (gcm *GCPNativeChunkManager) HeadObject(ctx context.Context, bucket, key string) (ObjectAttr, error) {
+	panic("not implemented")
+}
+
+// GetObject is not implemented for the GCP native manager and panics.
+func (gcm *GCPNativeChunkManager) GetObject(ctx context.Context, bucketName, key string) (*Object, error) {
+	panic("not implemented")
+}
+
+// UploadObject is not implemented for the GCP native manager and panics.
+func (gcm *GCPNativeChunkManager) UploadObject(ctx context.Context, i UploadObjectInput) error {
+	panic("not implemented")
+}
diff --git a/core/storage/options.go b/core/storage/options.go index db976f8..cc24ef2 100644 --- a/core/storage/options.go +++ b/core/storage/options.go @@ -7,6 +7,7 @@ type StorageConfig struct { BucketName string AccessKeyID string SecretAccessKeyID string + GcpCredentialJSON string UseSSL bool CreateBucket bool RootPath string @@ -15,6 +16,7 @@ type StorageConfig struct { backupAccessKeyID string backupSecretAccessKeyID string + BackupGcpCredentialJSON string backupBucketName string backupRootPath string } @@ -49,6 +51,12 @@ func SecretAccessKeyID(secretAccessKeyID string) Option { } } +func GcpCredentialJSON(gcpCredentialJSON string) Option { + return func(c *StorageConfig) { + c.GcpCredentialJSON = gcpCredentialJSON + } +} + func UseSSL(useSSL bool) Option { return func(c *StorageConfig) { c.UseSSL = useSSL diff --git a/go.mod b/go.mod index 4d776c2..e08f78d 100644 --- a/go.mod +++ b/go.mod @@ -42,7 +42,11 @@ require ( replace github.com/milvus-io/milvus-sdk-go/v2 => github.com/wayblink/milvus-sdk-go/v2 v2.3.0-beta4.0.20241030091852-d6eb85c1a8ff -require github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240909041258-8f8ca67816cd +require ( + cloud.google.com/go/storage v1.10.0 + github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240909041258-8f8ca67816cd + google.golang.org/api v0.44.0 +) require ( cloud.google.com/go v0.81.0 // indirect @@ -68,10 +72,13 @@ require ( github.com/goccy/go-json v0.9.7 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt/v5 v5.0.0 // indirect + github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect + github.com/googleapis/gax-go/v2 v2.0.5 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/josharian/intern v1.0.0 // indirect + github.com/jstemmer/go-junit-report v0.9.1 // indirect github.com/klauspost/compress v1.16.7 // 
indirect github.com/klauspost/cpuid/v2 v2.2.5 // indirect github.com/kr/pretty v0.3.1 // indirect @@ -101,9 +108,12 @@ require ( github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect github.com/ugorji/go/codec v1.2.7 // indirect + go.opencensus.io v0.23.0 // indirect go.uber.org/multierr v1.6.0 // indirect golang.org/x/crypto v0.14.0 // indirect golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect + golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect + golang.org/x/mod v0.12.0 // indirect golang.org/x/net v0.17.0 // indirect golang.org/x/sys v0.13.0 // indirect golang.org/x/text v0.13.0 // indirect diff --git a/go.sum b/go.sum index 689b2ed..ab48493 100644 --- a/go.sum +++ b/go.sum @@ -36,6 +36,7 @@ cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiy cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos= cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk= cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= +cloud.google.com/go/storage v1.10.0 h1:STgFzyU5/8miMl0//zKh2aQeTyeaUH3WN9bSUiJ09bA= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= @@ -202,6 +203,7 @@ github.com/golang-jwt/jwt/v5 v5.0.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVI github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= 
+github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= @@ -249,8 +251,10 @@ github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= +github.com/google/martian/v3 v3.1.0 h1:wCKgOCHuUEVfsaQLpPSJb7VdYCdTVZQAuOdYm1yc/60= github.com/google/martian/v3 v3.1.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= @@ -268,6 +272,7 @@ github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= +github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM= github.com/googleapis/gax-go/v2 v2.0.5/go.mod 
h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= @@ -317,6 +322,7 @@ github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= +github.com/jstemmer/go-junit-report v0.9.1 h1:6QPYqodiu3GuPL+7mfx+NwDdp2eTkp9IfEUpgAwUN0o= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88/go.mod h1:3w7q1U84EfirKl04SVQ/s7nPm1ZPhiXd34z40TNz36k= @@ -559,6 +565,7 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= +go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= @@ -611,6 +618,7 @@ golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRu golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/lint 
v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= +golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug= golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE= golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o= @@ -625,6 +633,7 @@ golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc= +golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -873,6 +882,7 @@ google.golang.org/api v0.36.0/go.mod h1:+z5ficQTmoYpPn8LCUNVpK5I7hwkpjbcgqA7I34q google.golang.org/api v0.40.0/go.mod h1:fYKFpnQN0DsDSKRVRcQSDQNtqWPfM9i+zNPxepjRCQ8= google.golang.org/api v0.41.0/go.mod h1:RkxM5lITDfTzmyKFPt+wGrCJbVfniCr2ool8kTBzRTU= google.golang.org/api v0.43.0/go.mod h1:nQsDGjRXMo4lvh5hP0TKqF244gqhGcr/YSIykhUk/94= +google.golang.org/api v0.44.0 h1:URs6qR1lAxDsqWITsQXI4ZkGiYJ5dHtRNiCpfs2OeKA= google.golang.org/api v0.44.0/go.mod h1:EBOGZqzyhtvMDoxwS97ctnh0zUmYY6CxqXsc1AvkYD8= google.golang.org/appengine v1.1.0/go.mod 
h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=