Skip to content

Commit

Permalink
Change the msg ts when the min ts is less than the late ts
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <[email protected]>
  • Loading branch information
SimFG committed Mar 13, 2024
1 parent 98a40a8 commit 6459edf
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 17 deletions.
38 changes: 25 additions & 13 deletions core/reader/replicate_channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (r *replicateChannelManager) getCtx() context.Context {
}

func IsCollectionNotFoundError(err error) bool {
return strings.Contains(err.Error(), "collection not found")
return strings.Contains(err.Error(), "collection not found") || strings.Contains(err.Error(), "can't find collection")
}

func IsDatabaseNotFoundError(err error) bool {
Expand Down Expand Up @@ -1046,6 +1046,17 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa
}
beginTS := pack.BeginTs

minTS, resetTS := GetTSManager().GetMinTS(r.pChannelName)
if minTS == 0 {
r.sendErrEvent(errors.Newf("fail to get channel ts, channel: %s", r.pChannelName))
log.Warn("fail to get channel ts", zap.String("channel", r.pChannelName))
return nil
}
endTS := minTS
if beginTS > endTS {
beginTS = endTS
}

needTsMsg := false
pChannel := r.targetPChannel
for _, msg := range pack.Msgs {
Expand Down Expand Up @@ -1096,6 +1107,9 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa
}
realMsg.ShardName = info.VChannel
dataLen = int(realMsg.GetNumRows())
if resetTS {
realMsg.EndTimestamp = endTS
}
case *msgstream.DeleteMsg:
if info.Dropped {
log.Info("skip delete msg because collection has been dropped", zap.Int64("collection_id", sourceCollectionID))
Expand All @@ -1118,12 +1132,18 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa
}
realMsg.ShardName = info.VChannel
dataLen = int(realMsg.GetNumRows())
if resetTS {
realMsg.EndTimestamp = endTS
}
case *msgstream.DropCollectionMsg:
collectionID := realMsg.CollectionID
realMsg.CollectionID = info.CollectionID
info.BarrierChan.Write(msg.EndTs())
needTsMsg = true
r.RemoveCollection(collectionID)
if resetTS {
realMsg.EndTimestamp = endTS
}
case *msgstream.DropPartitionMsg:
if info.Dropped {
log.Info("skip drop partition msg because collection has been dropped",
Expand Down Expand Up @@ -1179,10 +1199,13 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa
partitionBarrierChan.Write(msg.EndTs())
r.RemovePartitionInfo(sourceCollectionID, realMsg.PartitionName, partitionID)
}
if resetTS {
realMsg.EndTimestamp = endTS
}
}
if err != nil {
r.sendErrEvent(err)
log.Warn("fail to get partition info", zap.Any("msg", msg.Type()), zap.Error(err))
log.Warn("fail to process the msg info", zap.Any("msg", msg.Type()), zap.Error(err))
return nil
}
originPosition := msg.Position()
Expand Down Expand Up @@ -1222,17 +1245,6 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa
return util.EmptyMsgPack
}

minTS := GetTSManager().GetMinTS(r.pChannelName)
if minTS == 0 {
r.sendErrEvent(errors.Newf("fail to get channel ts, channel: %s", r.pChannelName))
log.Warn("fail to get channel ts", zap.String("channel", r.pChannelName))
return nil
}
endTS := minTS
if beginTS > endTS {
beginTS = endTS
}

for _, position := range newPack.StartPositions {
position.ChannelName = pChannel
position.Timestamp = beginTS
Expand Down
8 changes: 5 additions & 3 deletions core/reader/ts_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (m *tsManager) getUnsafeTSInfo() (map[string]uint64, map[string]int) {
return a, c
}

func (m *tsManager) GetMinTS(channelName string) uint64 {
func (m *tsManager) GetMinTS(channelName string) (uint64, bool) {
minTS := m.channelTS.LoadWithDefault(channelName, math.MaxUint64)

err := retry.Do(context.Background(), func() error {
Expand All @@ -128,14 +128,16 @@ func (m *tsManager) GetMinTS(channelName string) uint64 {
return nil
}, m.retryOptions...)
if err != nil {
return 0
return 0, false
}

resetTS := false
if m.lastTS.Load() > minTS {
a, b := m.getUnsafeTSInfo()
log.Info("last ts is larger than min ts", zap.Uint64("lastTS", m.lastTS.Load()), zap.Uint64("minTS", minTS),
zap.String("channelName", channelName), zap.Any("channelTS", a), zap.Any("channelRef", b))
minTS = m.lastTS.Load()
resetTS = true
}
m.lastTS.CompareAndSwapWithFunc(func(old uint64) uint64 {
if old <= minTS {
Expand All @@ -145,7 +147,7 @@ func (m *tsManager) GetMinTS(channelName string) uint64 {
})
msgTime, _ := tsoutil.ParseHybridTs(minTS)
TSMetricVec.WithLabelValues(channelName).Set(float64(msgTime))
return minTS
return minTS, resetTS
}

// EmptyTS Only for test
Expand Down
2 changes: 1 addition & 1 deletion core/reader/ts_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestTS(t *testing.T) {
time.Sleep(500 * time.Millisecond)
m.CollectTS("a", 2)
}()
minTS := m.GetMinTS("a")
minTS, _ := m.GetMinTS("a")
assert.EqualValues(t, 1, minTS)
m.EmptyTS()
}

0 comments on commit 6459edf

Please sign in to comment.