Skip to content

Commit

Permalink
update proto
Browse files Browse the repository at this point in the history
  • Loading branch information
wayblink committed Dec 1, 2023
1 parent 50f40eb commit 52bcbbe
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 11 deletions.
22 changes: 15 additions & 7 deletions core/backup_impl_create_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,19 +371,27 @@ func (b *BackupContext) backupCollection(ctx context.Context, backupInfo *backup
log.Error(fmt.Sprintf("fail to flush the collection: %s", collectionBackup.GetCollectionName()))
return err
}
//collectionBackup.BackupTimestamp = utils.ComposeTS(timeOfSeal, 0)
collectionBackup.BackupPhysicalTimestamp = uint64(timeOfSeal)
channelCheckpoints := make(map[string]string, 0)
var minChannelBackupTimeStamp uint64 = 0
for vch, checkpoint := range channelCPs {
channelCheckpoints[vch] = utils.Base64MsgPosition(&checkpoint)
if minChannelBackupTimeStamp == 0 {
minChannelBackupTimeStamp = checkpoint.GetTimestamp()
} else if minChannelBackupTimeStamp > checkpoint.GetTimestamp() {
minChannelBackupTimeStamp = checkpoint.GetTimestamp()
}
}
collectionBackup.ChannelCheckpoints = channelCheckpoints
collectionBackup.BackupTimestamp = minChannelBackupTimeStamp
log.Info("flush segments",
zap.String("collectionName", collectionBackup.GetCollectionName()),
zap.Int64s("newSealedSegmentIDs", newSealedSegmentIDs),
zap.Int64s("flushedSegmentIDs", flushedSegmentIDs),
zap.Int64("timeOfSeal", timeOfSeal),
zap.Uint64("BackupTimestamp", collectionBackup.BackupTimestamp),
zap.Any("channelCPs", channelCPs))
collectionBackup.BackupTimestamp = utils.ComposeTS(timeOfSeal, 0)
collectionBackup.BackupPhysicalTimestamp = uint64(timeOfSeal)
channelCheckpoints := make(map[string]string, 0)
for vch, checkpoint := range channelCPs {
channelCheckpoints[vch] = checkpoint
}
collectionBackup.ChannelCheckpoints = channelCheckpoints

flushSegmentIDs := append(newSealedSegmentIDs, flushedSegmentIDs...)
segmentEntitiesAfterFlush, err := b.getMilvusClient().GetPersistentSegmentInfo(ctx, collectionBackup.GetDbName(), collectionBackup.GetCollectionName())
Expand Down
3 changes: 2 additions & 1 deletion core/milvus_sdk_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package core

import (
"context"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
gomilvus "github.com/milvus-io/milvus-sdk-go/v2/client"
"github.com/milvus-io/milvus-sdk-go/v2/entity"
"github.com/zilliztech/milvus-backup/internal/util/retry"
Expand Down Expand Up @@ -81,7 +82,7 @@ func (m *MilvusClient) GetPersistentSegmentInfo(ctx context.Context, db, collNam
return m.client.GetPersistentSegmentInfo(ctx, collName)
}

func (m *MilvusClient) FlushV2(ctx context.Context, db, collName string, async bool) ([]int64, []int64, int64, map[string]string, error) {
func (m *MilvusClient) FlushV2(ctx context.Context, db, collName string, async bool) ([]int64, []int64, int64, map[string]milvuspb.MsgPosition, error) {
m.mu.Lock()
defer m.mu.Unlock()
err := m.client.UsingDatabase(ctx, db)
Expand Down
11 changes: 11 additions & 0 deletions core/utils/convert_util.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package utils

import (
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"time"

"github.com/zilliztech/milvus-backup/core/proto/backuppb"
Expand Down Expand Up @@ -98,3 +101,11 @@ func MapKeyArray(dict map[int64]bool) []int64 {
}
return arr
}

func Base64MsgPosition(position *milvuspb.MsgPosition) string {
positionByte, err := proto.Marshal(position)
if err != nil {
return ""
}
return base64.StdEncoding.EncodeToString(positionByte)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -117,5 +117,5 @@ require (

replace (
github.com/milvus-io/milvus-proto/go-api/v2 => github.com/wayblink/milvus-proto/go-api/v2 v2.0.0-20231129093819-30d8c2ff887e
github.com/milvus-io/milvus-sdk-go/v2 => github.com/wayblink/milvus-sdk-go/v2 v2.3.0-beta4.0.20231130135235-af8aa511de52
github.com/milvus-io/milvus-sdk-go/v2 => github.com/wayblink/milvus-sdk-go/v2 v2.3.0-beta4.0.20231201113142-5ed2244fb0cc
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -575,8 +575,8 @@ github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
github.com/wayblink/milvus-proto/go-api/v2 v2.0.0-20231129093819-30d8c2ff887e h1:Aim4pP36nj8DX2yrOY9uDq8P/SMN4tpTaOy2wXL3jaQ=
github.com/wayblink/milvus-proto/go-api/v2 v2.0.0-20231129093819-30d8c2ff887e/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/wayblink/milvus-sdk-go/v2 v2.3.0-beta4.0.20231130135235-af8aa511de52 h1:cdj8nCQYI77iWtxi2AMwdXkHarESXpfepP/pDBMHQNw=
github.com/wayblink/milvus-sdk-go/v2 v2.3.0-beta4.0.20231130135235-af8aa511de52/go.mod h1:WNMN98sJMoHDmHT5IgEz8tJVVYVZmHpH7wQ6TK/EjiM=
github.com/wayblink/milvus-sdk-go/v2 v2.3.0-beta4.0.20231201113142-5ed2244fb0cc h1:6T7Q0X60cSbYRyF9iMSGc0Wznj/v+jxClR6R9PRgGCM=
github.com/wayblink/milvus-sdk-go/v2 v2.3.0-beta4.0.20231201113142-5ed2244fb0cc/go.mod h1:WNMN98sJMoHDmHT5IgEz8tJVVYVZmHpH7wQ6TK/EjiM=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
Expand Down

0 comments on commit 52bcbbe

Please sign in to comment.