Skip to content

Commit

Permalink
refactor: improve db structure in storage/controller (#2604)
Browse files Browse the repository at this point in the history
* refactor: refactor workflows contents.

* add tool workflows.

* update field.

* fix: remove chat error.

* Fix err.

* fix error.

* remove cn comment.

* update workflows files.

* update infra config.

* move workflows.

* feat: update bot.

* fix: solve uncorrect outdated msg get.

* update get docIDs logic.

* update

* update skip logic.

* fix

* update.

* fix: delay deleteObject func.

* remove unused content.

* update log type.

* feat: implement request batch count limit.

* update

* update

* refactor: improve db structure in `storage/controller`
  • Loading branch information
mo3et authored Sep 10, 2024
1 parent eea2627 commit c581d43
Show file tree
Hide file tree
Showing 5 changed files with 317 additions and 165 deletions.
21 changes: 12 additions & 9 deletions internal/msgtransfer/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@ import (
"context"
"errors"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"

"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/utils/datautil"
"net/http"
"os"
"os/signal"
"syscall"

"github.com/openimsdk/open-im-server/v3/pkg/common/config"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
discRegister "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/tools/errs"
Expand Down Expand Up @@ -65,6 +66,7 @@ type Config struct {
func Start(ctx context.Context, index int, config *Config) error {
log.CInfo(ctx, "MSG-TRANSFER server is initializing", "prometheusPorts",
config.MsgTransfer.Prometheus.Ports, "index", index)

mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
if err != nil {
return err
Expand All @@ -73,12 +75,13 @@ func Start(ctx context.Context, index int, config *Config) error {
if err != nil {
return err
}
client, err := kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share)
client, err := discRegister.NewDiscoveryRegister(&config.Discovery, &config.Share)
if err != nil {
return err
}
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))

msgModel := redis.NewMsgCache(rdb)
msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB())
if err != nil {
Expand All @@ -94,17 +97,17 @@ func Start(ctx context.Context, index int, config *Config) error {
return err
}
seqUserCache := redis.NewSeqUserCacheRedis(rdb, seqUser)
msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqUserCache, seqConversationCache, &config.KafkaConfig)
msgTransferDatabase, err := controller.NewMsgTransferDatabase(msgDocModel, msgModel, seqUserCache, seqConversationCache, &config.KafkaConfig)
if err != nil {
return err
}
conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation)
groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group)
historyCH, err := NewOnlineHistoryRedisConsumerHandler(&config.KafkaConfig, msgDatabase, &conversationRpcClient, &groupRpcClient)
historyCH, err := NewOnlineHistoryRedisConsumerHandler(&config.KafkaConfig, msgTransferDatabase, &conversationRpcClient, &groupRpcClient)
if err != nil {
return err
}
historyMongoCH, err := NewOnlineHistoryMongoConsumerHandler(&config.KafkaConfig, msgDatabase)
historyMongoCH, err := NewOnlineHistoryMongoConsumerHandler(&config.KafkaConfig, msgTransferDatabase)
if err != nil {
return err
}
Expand Down
25 changes: 13 additions & 12 deletions internal/msgtransfer/online_history_msg_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import (
"context"
"encoding/json"
"errors"
"strconv"
"strings"
"time"

"github.com/IBM/sarama"
"github.com/go-redis/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
Expand All @@ -33,9 +37,6 @@ import (
"github.com/openimsdk/tools/mq/kafka"
"github.com/openimsdk/tools/utils/stringutil"
"google.golang.org/protobuf/proto"
"strconv"
"strings"
"time"
)

const (
Expand All @@ -56,19 +57,19 @@ type OnlineHistoryRedisConsumerHandler struct {

redisMessageBatches *batcher.Batcher[sarama.ConsumerMessage]

msgDatabase controller.CommonMsgDatabase
msgTransferDatabase controller.MsgTransferDatabase
conversationRpcClient *rpcclient.ConversationRpcClient
groupRpcClient *rpcclient.GroupRpcClient
}

func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase,
func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.MsgTransferDatabase,
conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*OnlineHistoryRedisConsumerHandler, error) {
historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToRedisGroupID, []string{kafkaConf.ToRedisTopic}, false)
if err != nil {
return nil, err
}
var och OnlineHistoryRedisConsumerHandler
och.msgDatabase = database
och.msgTransferDatabase = database

b := batcher.New[sarama.ConsumerMessage](
batcher.WithSize(size),
Expand Down Expand Up @@ -161,7 +162,7 @@ func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context,
return
}
for key, seq := range readSeq {
if err := och.msgDatabase.SetHasReadSeqToDB(ctx, key.userID, key.conversationID, seq); err != nil {
if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, key.userID, key.conversationID, seq); err != nil {
log.ZError(ctx, "set read seq to db error", err, "userID", key.userID, "conversationID", key.conversationID, "seq", seq)
}
}
Expand Down Expand Up @@ -248,7 +249,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key
}
if len(storageMessageList) > 0 {
msg := storageMessageList[0]
lastSeq, isNewConversation, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList)
lastSeq, isNewConversation, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList)
if err != nil && !errors.Is(errs.Unwrap(err), redis.Nil) {
log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageMessageList)
return
Expand Down Expand Up @@ -282,7 +283,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key
}

log.ZDebug(ctx, "success incr to next topic")
err = och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageMessageList, lastSeq)
err = och.msgTransferDatabase.MsgToMongoMQ(ctx, key, conversationID, storageMessageList, lastSeq)
if err != nil {
log.ZError(ctx, "Msg To MongoDB MQ error", err, "conversationID",
conversationID, "storageList", storageMessageList, "lastSeq", lastSeq)
Expand All @@ -299,14 +300,14 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con
storageMessageList = append(storageMessageList, msg.message)
}
if len(storageMessageList) > 0 {
lastSeq, _, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList)
lastSeq, _, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList)
if err != nil {
log.ZError(ctx, "notification batch insert to redis error", err, "conversationID", conversationID,
"storageList", storageMessageList)
return
}
log.ZDebug(ctx, "success to next topic", "conversationID", conversationID)
err = och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageMessageList, lastSeq)
err = och.msgTransferDatabase.MsgToMongoMQ(ctx, key, conversationID, storageMessageList, lastSeq)
if err != nil {
log.ZError(ctx, "Msg To MongoDB MQ error", err, "conversationID",
conversationID, "storageList", storageMessageList, "lastSeq", lastSeq)
Expand All @@ -318,7 +319,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*ContextMsg) {
for _, v := range msgs {
log.ZDebug(ctx, "push msg to topic", "msg", v.message.String())
och.msgDatabase.MsgToPushMQ(v.ctx, key, conversationID, v.message)
och.msgTransferDatabase.MsgToPushMQ(v.ctx, key, conversationID, v.message)
}
}

Expand Down
11 changes: 6 additions & 5 deletions internal/msgtransfer/online_msg_to_mongo_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package msgtransfer

import (
"context"

"github.com/IBM/sarama"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
Expand All @@ -28,18 +29,18 @@ import (

type OnlineHistoryMongoConsumerHandler struct {
historyConsumerGroup *kafka.MConsumerGroup
msgDatabase controller.CommonMsgDatabase
msgTransferDatabase controller.MsgTransferDatabase
}

func NewOnlineHistoryMongoConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) {
func NewOnlineHistoryMongoConsumerHandler(kafkaConf *config.Kafka, database controller.MsgTransferDatabase) (*OnlineHistoryMongoConsumerHandler, error) {
historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToMongoGroupID, []string{kafkaConf.ToMongoTopic}, true)
if err != nil {
return nil, err
}

mc := &OnlineHistoryMongoConsumerHandler{
historyConsumerGroup: historyConsumerGroup,
msgDatabase: database,
msgTransferDatabase: database,
}
return mc, nil
}
Expand All @@ -57,7 +58,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont
return
}
log.ZInfo(ctx, "mongo consumer recv msg", "msgs", msgFromMQ.String())
err = mc.msgDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData, msgFromMQ.LastSeq)
err = mc.msgTransferDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData, msgFromMQ.LastSeq)
if err != nil {
log.ZError(
ctx,
Expand All @@ -76,7 +77,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont
for _, msg := range msgFromMQ.MsgData {
seqs = append(seqs, msg.Seq)
}
err = mc.msgDatabase.DeleteMessagesFromCache(ctx, msgFromMQ.ConversationID, seqs)
err = mc.msgTransferDatabase.DeleteMessagesFromCache(ctx, msgFromMQ.ConversationID, seqs)
if err != nil {
log.ZError(
ctx,
Expand Down
Loading

0 comments on commit c581d43

Please sign in to comment.