diff --git a/golang/client.go b/golang/client.go index 0a7defaaa..5b59376ca 100644 --- a/golang/client.go +++ b/golang/client.go @@ -504,6 +504,7 @@ func (cli *defaultClient) startUp() error { if err != nil { cli.log.Errorf("scheduled queryRoute err=%v", err) } + clearOldRouteSizeCache(oldRoute.([]*v2.MessageQueue)) if newRoute == nil && oldRoute != nil { cli.log.Info("newRoute is nil, but oldRoute is not. do not update") return true @@ -529,6 +530,12 @@ func (cli *defaultClient) startUp() error { ticker.Tick(f, time.Second*30, cli.done) return nil } + +func clearOldRouteSizeCache(oldRoutes []*v2.MessageQueue) { + for _, route := range oldRoutes { + route.ClearSizeCache() + } +} func (cli *defaultClient) notifyClientTermination() { cli.log.Info("start notifyClientTermination") ctx := cli.Sign(context.Background()) diff --git a/golang/protocol/v2/definition.pb.go b/golang/protocol/v2/definition.pb.go index 63cdbe3f6..3c434592b 100644 --- a/golang/protocol/v2/definition.pb.go +++ b/golang/protocol/v2/definition.pb.go @@ -1389,6 +1389,13 @@ type MessageQueue struct { AcceptMessageTypes []MessageType `protobuf:"varint,5,rep,packed,name=accept_message_types,json=acceptMessageTypes,proto3,enum=apache.rocketmq.v2.MessageType" json:"accept_message_types,omitempty"` } +func (x *MessageQueue) ClearSizeCache() { + x.sizeCache = 0 + x.Topic.sizeCache = 0 + x.Broker.sizeCache = 0 + x.Broker.Endpoints.sizeCache = 0 +} + func (x *MessageQueue) Reset() { *x = MessageQueue{} if protoimpl.UnsafeEnabled {