diff --git a/core/reader/replicate_channel_manager.go b/core/reader/replicate_channel_manager.go index 5534144b..9c711b68 100644 --- a/core/reader/replicate_channel_manager.go +++ b/core/reader/replicate_channel_manager.go @@ -133,7 +133,7 @@ func (r *replicateChannelManager) getCtx() context.Context { } func IsCollectionNotFoundError(err error) bool { - return strings.Contains(err.Error(), "collection not found") + return strings.Contains(err.Error(), "collection not found") || strings.Contains(err.Error(), "can't find collection") } func IsDatabaseNotFoundError(err error) bool { @@ -1046,6 +1046,17 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa } beginTS := pack.BeginTs + minTS, resetTS := GetTSManager().GetMinTS(r.pChannelName) + if minTS == 0 { + r.sendErrEvent(errors.Newf("fail to get channel ts, channel: %s", r.pChannelName)) + log.Warn("fail to get channel ts", zap.String("channel", r.pChannelName)) + return nil + } + endTS := minTS + if beginTS > endTS { + beginTS = endTS + } + needTsMsg := false pChannel := r.targetPChannel for _, msg := range pack.Msgs { @@ -1096,6 +1107,9 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa } realMsg.ShardName = info.VChannel dataLen = int(realMsg.GetNumRows()) + if resetTS { + realMsg.EndTimestamp = endTS + } case *msgstream.DeleteMsg: if info.Dropped { log.Info("skip delete msg because collection has been dropped", zap.Int64("collection_id", sourceCollectionID)) @@ -1118,12 +1132,18 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa } realMsg.ShardName = info.VChannel dataLen = int(realMsg.GetNumRows()) + if resetTS { + realMsg.EndTimestamp = endTS + } case *msgstream.DropCollectionMsg: collectionID := realMsg.CollectionID realMsg.CollectionID = info.CollectionID info.BarrierChan.Write(msg.EndTs()) needTsMsg = true r.RemoveCollection(collectionID) + if resetTS { + realMsg.EndTimestamp = endTS + } case *msgstream.DropPartitionMsg: if info.Dropped { log.Info("skip drop partition msg because collection has been dropped", @@ -1179,10 +1199,13 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa partitionBarrierChan.Write(msg.EndTs()) r.RemovePartitionInfo(sourceCollectionID, realMsg.PartitionName, partitionID) } + if resetTS { + realMsg.EndTimestamp = endTS + } } if err != nil { r.sendErrEvent(err) - log.Warn("fail to get partition info", zap.Any("msg", msg.Type()), zap.Error(err)) + log.Warn("fail to process the msg info", zap.Any("msg", msg.Type()), zap.Error(err)) return nil } originPosition := msg.Position() @@ -1222,17 +1245,6 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa return util.EmptyMsgPack } - minTS := GetTSManager().GetMinTS(r.pChannelName) - if minTS == 0 { - r.sendErrEvent(errors.Newf("fail to get channel ts, channel: %s", r.pChannelName)) - log.Warn("fail to get channel ts", zap.String("channel", r.pChannelName)) - return nil - } - endTS := minTS - if beginTS > endTS { - beginTS = endTS - } - for _, position := range newPack.StartPositions { position.ChannelName = pChannel position.Timestamp = beginTS diff --git a/core/reader/ts_manager.go b/core/reader/ts_manager.go index f2df3302..abee979f 100644 --- a/core/reader/ts_manager.go +++ b/core/reader/ts_manager.go @@ -106,7 +106,7 @@ func (m *tsManager) getUnsafeTSInfo() (map[string]uint64, map[string]int) { return a, c } -func (m *tsManager) GetMinTS(channelName string) uint64 { +func (m *tsManager) GetMinTS(channelName string) (uint64, bool) { minTS := m.channelTS.LoadWithDefault(channelName, math.MaxUint64) err := retry.Do(context.Background(), func() error { @@ -128,14 +128,16 @@ func (m *tsManager) GetMinTS(channelName string) uint64 { return nil }, m.retryOptions...) if err != nil { - return 0 + return 0, false } + resetTS := false if m.lastTS.Load() > minTS { a, b := m.getUnsafeTSInfo() log.Info("last ts is larger than min ts", zap.Uint64("lastTS", m.lastTS.Load()), zap.Uint64("minTS", minTS), zap.String("channelName", channelName), zap.Any("channelTS", a), zap.Any("channelRef", b)) minTS = m.lastTS.Load() + resetTS = true } m.lastTS.CompareAndSwapWithFunc(func(old uint64) uint64 { if old <= minTS { @@ -145,7 +147,7 @@ func (m *tsManager) GetMinTS(channelName string) uint64 { }) msgTime, _ := tsoutil.ParseHybridTs(minTS) TSMetricVec.WithLabelValues(channelName).Set(float64(msgTime)) - return minTS + return minTS, resetTS } // EmptyTS Only for test diff --git a/core/reader/ts_manager_test.go b/core/reader/ts_manager_test.go index 44c92e5d..a6c44e14 100644 --- a/core/reader/ts_manager_test.go +++ b/core/reader/ts_manager_test.go @@ -85,7 +85,7 @@ func TestTS(t *testing.T) { time.Sleep(500 * time.Millisecond) m.CollectTS("a", 2) }() - minTS := m.GetMinTS("a") + minTS, _ := m.GetMinTS("a") assert.EqualValues(t, 1, minTS) m.EmptyTS() }