Skip to content

Commit

Permalink
feat: support stream message (#2824)
Browse files Browse the repository at this point in the history
* fix: GroupApplicationAcceptedNotification

* fix: GroupApplicationAcceptedNotification

* fix: NotificationUserInfoUpdate

* cicd: robot automated Change

* fix: component

* fix: getConversationInfo

* feat: cron task

* feat: cron task

* feat: cron task

* feat: cron task

* feat: cron task

* fix: minio config url recognition error

* update gomake version

* update gomake version

* fix: seq conversion bug

* fix: redis pipe exec

* fix: ImportFriends

* fix: A large number of logs keysAndValues ​​length is not even

* feat: mark read aggregate write

* feat: online status supports redis cluster

* feat: online status supports redis cluster

* feat: online status supports redis cluster

* merge

* merge

* read seq is written to mongo

* read seq is written to mongo

* fix: invitation to join group notification

* fix: friend op_user_id

* feat: optimizing asynchronous context

* feat: optimizing memamq size

* feat: add GetSeqMessage

* feat: GroupApplicationAgreeMemberEnterNotification

* feat: GroupApplicationAgreeMemberEnterNotification

* feat: go.mod

* feat: go.mod

* feat: join group notification and get seq

* feat: join group notification and get seq

* feat: avoid pulling messages from sessions with a large number of max seq values of 0

* feat: API supports gzip

* go.mod

* fix: nil pointer error on close

* fix: listen error

* fix: listen error

* update go.mod

* feat: add log

* fix: token parse token value

* fix: GetMsgBySeqs boundary issues

* fix: sn_ not sort

* fix: sn_ not sort

* fix: sn_ not sort

* fix: jssdk add

* fix: jssdk support

* fix: jssdk support

* fix: jssdk support

* fix: the message I sent is not set to read seq in mongodb

* fix: cannot modify group member avatars

* fix: MemberEnterNotification

* fix: MemberEnterNotification

* fix: MsgData status

* feat: stream msg

* feat: support stream messages

---------

Co-authored-by: withchao <[email protected]>
  • Loading branch information
withchao and withchao authored Nov 4, 2024
1 parent 1516e9c commit 8d308a4
Show file tree
Hide file tree
Showing 15 changed files with 441 additions and 5 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.72-alpha.53
github.com/openimsdk/protocol v0.0.72-alpha.54
github.com/openimsdk/tools v0.0.50-alpha.16
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.72-alpha.53 h1:DMzvDd418GaJJLT2Iw+AX+oNc41DROWErXDkZxB+MMM=
github.com/openimsdk/protocol v0.0.72-alpha.53/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/protocol v0.0.72-alpha.54 h1:opato7N4QjjRq/SHD54bDSVBpOEEDp1VLWVk5Os2A9s=
github.com/openimsdk/protocol v0.0.72-alpha.54/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.50-alpha.16 h1:bC1AQvJMuOHtZm8LZRvN8L5mH1Ws2VYdL+TLTs1iGSc=
github.com/openimsdk/tools v0.0.50-alpha.16/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
Expand Down
10 changes: 10 additions & 0 deletions internal/api/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendM
data = apistruct.AtElem{}
case constant.Custom:
data = apistruct.CustomElem{}
case constant.Stream:
data = apistruct.StreamMsgElem{}
case constant.OANotification:
data = apistruct.OANotificationElem{}
req.SessionType = constant.NotificationChatType
Expand Down Expand Up @@ -373,3 +375,11 @@ func (m *MessageApi) SearchMsg(c *gin.Context) {
func (m *MessageApi) GetServerTime(c *gin.Context) {
a2r.Call(msg.MsgClient.GetServerTime, m.Client, c)
}

func (m *MessageApi) GetStreamMsg(c *gin.Context) {
a2r.Call(msg.MsgClient.GetStreamMsg, m.Client, c)
}

func (m *MessageApi) AppendStreamMsg(c *gin.Context) {
a2r.Call(msg.MsgClient.AppendStreamMsg, m.Client, c)
}
2 changes: 2 additions & 0 deletions internal/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
msgGroup.POST("/batch_send_msg", m.BatchSendMsg)
msgGroup.POST("/check_msg_is_send_success", m.CheckMsgIsSendSuccess)
msgGroup.POST("/get_server_time", m.GetServerTime)
msgGroup.POST("/get_stream_msg", m.GetStreamMsg)
msgGroup.POST("/append_stream_msg", m.AppendStreamMsg)
}
// Conversation
conversationGroup := r.Group("/conversation")
Expand Down
4 changes: 4 additions & 0 deletions internal/rpc/msg/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,7 @@ func (m *MsgNotificationSender) MarkAsReadNotification(ctx context.Context, conv
}
m.NotificationWithSessionType(ctx, sendID, recvID, constant.HasReadReceipt, sessionType, tips)
}

func (m *MsgNotificationSender) StreamMsgNotification(ctx context.Context, sendID string, recvID string, sessionType int32, tips *sdkws.StreamMsgTips) {
m.NotificationWithSessionType(ctx, sendID, recvID, constant.StreamMsgNotification, sessionType, tips)
}
5 changes: 5 additions & 0 deletions internal/rpc/msg/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ import (
func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg.SendMsgResp, error) {
if req.MsgData != nil {
m.encapsulateMsgData(req.MsgData)
if req.MsgData.ContentType == constant.Stream {
if err := m.handlerStreamMsg(ctx, req.MsgData); err != nil {
return nil, err
}
}
switch req.MsgData.SessionType {
case constant.SingleChatType:
return m.sendMsgSingleChat(ctx, req)
Expand Down
10 changes: 8 additions & 2 deletions internal/rpc/msg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ type (

// MsgServer encapsulates dependencies required for message handling.
msgServer struct {
RegisterCenter discovery.SvcDiscoveryRegistry // Service discovery registry for service registration.
MsgDatabase controller.CommonMsgDatabase // Interface for message database operations.
RegisterCenter discovery.SvcDiscoveryRegistry // Service discovery registry for service registration.
MsgDatabase controller.CommonMsgDatabase // Interface for message database operations.
StreamMsgDatabase controller.StreamMsgDatabase
Conversation *rpcclient.ConversationRpcClient // RPC client for conversation service.
UserLocalCache *rpccache.UserLocalCache // Local cache for user data.
FriendLocalCache *rpccache.FriendLocalCache // Local cache for friend data.
Expand Down Expand Up @@ -101,6 +102,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
if err != nil {
return err
}
streamMsg, err := mgo.NewStreamMsgMongo(mgocli.GetDB())
if err != nil {
return err
}
seqUserCache := redis.NewSeqUserCacheRedis(rdb, seqUser)
msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqUserCache, seqConversationCache, &config.KafkaConfig)
if err != nil {
Expand All @@ -109,6 +114,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
s := &msgServer{
Conversation: &conversationClient,
MsgDatabase: msgDatabase,
StreamMsgDatabase: controller.NewStreamMsgDatabase(streamMsg),
RegisterCenter: client,
UserLocalCache: rpccache.NewUserLocalCache(userRpcClient, &config.LocalCacheConfig, rdb),
GroupLocalCache: rpccache.NewGroupLocalCache(groupRpcClient, &config.LocalCacheConfig, rdb),
Expand Down
114 changes: 114 additions & 0 deletions internal/rpc/msg/stream_msg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package msg

import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/errs"
"time"
)

const StreamDeadlineTime = time.Second * 60 * 10

func (m *msgServer) handlerStreamMsg(ctx context.Context, msgData *sdkws.MsgData) error {
now := time.Now()
val := &model.StreamMsg{
ClientMsgID: msgData.ClientMsgID,
ConversationID: msgprocessor.GetConversationIDByMsg(msgData),
UserID: msgData.SendID,
CreateTime: now,
DeadlineTime: now.Add(StreamDeadlineTime),
}
return m.StreamMsgDatabase.CreateStreamMsg(ctx, val)
}

func (m *msgServer) getStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) {
res, err := m.StreamMsgDatabase.GetStreamMsg(ctx, clientMsgID)
if err != nil {
return nil, err
}
now := time.Now()
if !res.End && res.DeadlineTime.Before(now) {
res.End = true
res.DeadlineTime = now
_ = m.StreamMsgDatabase.AppendStreamMsg(ctx, res.ClientMsgID, 0, nil, true, now)
}
return res, nil
}

func (m *msgServer) AppendStreamMsg(ctx context.Context, req *msg.AppendStreamMsgReq) (*msg.AppendStreamMsgResp, error) {
res, err := m.getStreamMsg(ctx, req.ClientMsgID)
if err != nil {
return nil, err
}
if res.End {
return nil, errs.ErrNoPermission.WrapMsg("stream msg is end")
}
if len(res.Packets) < int(req.StartIndex) {
return nil, errs.ErrNoPermission.WrapMsg("start index is invalid")
}
if val := len(res.Packets) - int(req.StartIndex); val > 0 {
exist := res.Packets[int(req.StartIndex):]
for i, s := range exist {
if len(req.Packets) == 0 {
break
}
if s != req.Packets[i] {
return nil, errs.ErrNoPermission.WrapMsg(fmt.Sprintf("packet %d has been written and is inconsistent", i))
}
req.StartIndex++
req.Packets = req.Packets[1:]
}
}
if len(req.Packets) == 0 && res.End == req.End {
return &msg.AppendStreamMsgResp{}, nil
}
deadlineTime := time.Now().Add(StreamDeadlineTime)
if err := m.StreamMsgDatabase.AppendStreamMsg(ctx, req.ClientMsgID, int(req.StartIndex), req.Packets, req.End, deadlineTime); err != nil {
return nil, err
}
conversation, err := m.Conversation.GetConversation(ctx, res.UserID, res.ConversationID)
if err != nil {
return nil, err
}
tips := &sdkws.StreamMsgTips{
ConversationID: res.ConversationID,
ClientMsgID: res.ClientMsgID,
StartIndex: req.StartIndex,
Packets: req.Packets,
End: req.End,
}
var (
recvID string
sessionType int32
)
if conversation.GroupID == "" {
sessionType = constant.SingleChatType
recvID = conversation.UserID
} else {
sessionType = constant.ReadGroupChatType
recvID = conversation.GroupID
}
m.msgNotificationSender.StreamMsgNotification(ctx, res.UserID, recvID, sessionType, tips)
return &msg.AppendStreamMsgResp{}, nil
}

func (m *msgServer) GetStreamMsg(ctx context.Context, req *msg.GetStreamMsgReq) (*msg.GetStreamMsgResp, error) {
res, err := m.getStreamMsg(ctx, req.ClientMsgID)
if err != nil {
return nil, err
}
return &msg.GetStreamMsgResp{
ClientMsgID: res.ClientMsgID,
ConversationID: res.ConversationID,
UserID: res.UserID,
Packets: res.Packets,
End: res.End,
CreateTime: res.CreateTime.UnixMilli(),
DeadlineTime: res.DeadlineTime.UnixMilli(),
}, nil
}
5 changes: 5 additions & 0 deletions pkg/apistruct/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ type TextElem struct {
Content string `json:"content" validate:"required"`
}

type StreamMsgElem struct {
Type string `mapstructure:"type" validate:"required"`
Content string `mapstructure:"content" validate:"required"`
}

type RevokeElem struct {
RevokeMsgClientID string `mapstructure:"revokeMsgClientID" validate:"required"`
}
Expand Down
34 changes: 34 additions & 0 deletions pkg/common/storage/controller/stream_msg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package controller

import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"time"
)

type StreamMsgDatabase interface {
CreateStreamMsg(ctx context.Context, model *model.StreamMsg) error
AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error
GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error)
}

func NewStreamMsgDatabase(db database.StreamMsg) StreamMsgDatabase {
return &streamMsgDatabase{db: db}
}

type streamMsgDatabase struct {
db database.StreamMsg
}

func (m *streamMsgDatabase) CreateStreamMsg(ctx context.Context, model *model.StreamMsg) error {
return m.db.CreateStreamMsg(ctx, model)
}

func (m *streamMsgDatabase) AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error {
return m.db.AppendStreamMsg(ctx, clientMsgID, startIndex, packets, end, deadlineTime)
}

func (m *streamMsgDatabase) GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) {
return m.db.GetStreamMsg(ctx, clientMsgID)
}
60 changes: 60 additions & 0 deletions pkg/common/storage/database/mgo/stream_msg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package mgo

import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/errs"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"time"
)

func NewStreamMsgMongo(db *mongo.Database) (*StreamMsgMongo, error) {
coll := db.Collection(database.StreamMsgName)
_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
Keys: bson.D{
{Key: "client_msg_id", Value: 1},
},
Options: options.Index().SetUnique(true),
})
if err != nil {
return nil, errs.Wrap(err)
}
return &StreamMsgMongo{coll: coll}, nil
}

type StreamMsgMongo struct {
coll *mongo.Collection
}

func (m *StreamMsgMongo) CreateStreamMsg(ctx context.Context, val *model.StreamMsg) error {
if val.Packets == nil {
val.Packets = []string{}
}
return mongoutil.InsertMany(ctx, m.coll, []*model.StreamMsg{val})
}

func (m *StreamMsgMongo) AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error {
update := bson.M{
"$set": bson.M{
"end": end,
"deadline_time": deadlineTime,
},
}
if len(packets) > 0 {
update["$push"] = bson.M{
"packets": bson.M{
"$each": packets,
"$position": startIndex,
},
}
}
return mongoutil.UpdateOne(ctx, m.coll, bson.M{"client_msg_id": clientMsgID, "end": false}, update, true)
}

func (m *StreamMsgMongo) GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) {
return mongoutil.FindOne[*model.StreamMsg](ctx, m.coll, bson.M{"client_msg_id": clientMsgID})
}
1 change: 1 addition & 0 deletions pkg/common/storage/database/name.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ const (
UserName = "user"
SeqConversationName = "seq"
SeqUserName = "seq_user"
StreamMsgName = "stream_msg"
)
13 changes: 13 additions & 0 deletions pkg/common/storage/database/stream_msg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package database

import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"time"
)

type StreamMsg interface {
CreateStreamMsg(ctx context.Context, model *model.StreamMsg) error
AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error
GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error)
}
21 changes: 21 additions & 0 deletions pkg/common/storage/model/stream_msg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package model

import (
"time"
)

const (
StreamMsgStatusWait = 0
StreamMsgStatusDone = 1
StreamMsgStatusFail = 2
)

type StreamMsg struct {
ClientMsgID string `bson:"client_msg_id"`
ConversationID string `bson:"conversation_id"`
UserID string `bson:"user_id"`
Packets []string `bson:"packets"`
End bool `bson:"end"`
CreateTime time.Time `bson:"create_time"`
DeadlineTime time.Time `bson:"deadline_time"`
}
Loading

0 comments on commit 8d308a4

Please sign in to comment.