diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index 2f3156c288..2d2c647ac9 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -18,14 +18,9 @@ import ( "context" "encoding/json" "errors" - "github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics" - "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/dummy" - - "github.com/OpenIMSDK/protocol/conversation" - - "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/OpenIMSDK/protocol/constant" + "github.com/OpenIMSDK/protocol/conversation" "github.com/OpenIMSDK/protocol/msggateway" "github.com/OpenIMSDK/protocol/sdkws" "github.com/OpenIMSDK/tools/discoveryregistry" @@ -34,6 +29,7 @@ import ( "github.com/OpenIMSDK/tools/utils" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" + "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/dummy" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/fcm" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/getui" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/jpush" @@ -41,6 +37,8 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache" + "github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics" + "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" ) @@ -53,7 +51,6 @@ type Pusher struct { msgRpcClient *rpcclient.MessageRpcClient conversationRpcClient *rpcclient.ConversationRpcClient groupRpcClient *rpcclient.GroupRpcClient - successCount int } var errNoOfflinePusher = errors.New("no offlinePusher is configured") @@ -104,24 +101,29 @@ func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.Msg if err := callbackOnlinePush(ctx, userIDs, msg); err != nil { return err } + // push wsResults, err := p.GetConnsAndOnlinePush(ctx, msg, userIDs) if err != nil { return err } + isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush) log.ZDebug(ctx, "push_result", "ws push result", wsResults, "sendData", msg, "isOfflinePush", isOfflinePush, "push_to_userID", userIDs) - p.successCount++ - if isOfflinePush { - for _, v := range wsResults { - if msg.SendID != v.UserID && (!v.OnlinePush) { - if err := callbackOfflinePush(ctx, userIDs, msg, &[]string{}); err != nil { - return err - } - err = p.offlinePushMsg(ctx, msg.SendID, msg, []string{v.UserID}) - if err != nil { - return err - } + + if !isOfflinePush { + return nil + } + + for _, v := range wsResults { + if msg.SendID != v.UserID && (!v.OnlinePush) { + if err = callbackOfflinePush(ctx, userIDs, msg, &[]string{}); err != nil { + return err + } + + err = p.offlinePushMsg(ctx, msg.SendID, msg, []string{v.UserID}) + if err != nil { + return err } } } @@ -140,14 +142,16 @@ func (p *Pusher) UnmarshalNotificationElem(bytes []byte, t interface{}) error { func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) { log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID) var pushToUserIDs []string - if err := callbackBeforeSuperGroupOnlinePush(ctx, groupID, msg, &pushToUserIDs); err != nil { + if err = callbackBeforeSuperGroupOnlinePush(ctx, groupID, msg, &pushToUserIDs); err != nil { return err } + if len(pushToUserIDs) == 0 { pushToUserIDs, err = p.groupLocalCache.GetGroupMemberIDs(ctx, groupID) if err != nil { return err } + switch msg.ContentType { case constant.MemberQuitNotification: var tips sdkws.MemberQuitTips @@ -155,7 +159,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws return err } defer func(groupID string, userIDs []string) { - if err := p.DeleteMemberAndSetConversationSeq(ctx, groupID, userIDs); err != nil { + if err = p.DeleteMemberAndSetConversationSeq(ctx, groupID, userIDs); err != nil { log.ZError(ctx, "MemberQuitNotification DeleteMemberAndSetConversationSeq", err, "groupID", groupID, "userIDs", userIDs) } }(groupID, []string{tips.QuitUser.UserID}) @@ -167,7 +171,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws } kickedUsers := utils.Slice(tips.KickedUserList, func(e *sdkws.GroupMemberFullInfo) string { return e.UserID }) defer func(groupID string, userIDs []string) { - if err := p.DeleteMemberAndSetConversationSeq(ctx, groupID, userIDs); err != nil { + if err = p.DeleteMemberAndSetConversationSeq(ctx, groupID, userIDs); err != nil { log.ZError(ctx, "MemberKickedNotification DeleteMemberAndSetConversationSeq", err, "groupID", groupID, "userIDs", userIDs) } }(groupID, kickedUsers) @@ -183,48 +187,61 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws ctx = mcontext.WithOpUserIDContext(ctx, config.Config.Manager.UserID[0]) } defer func(groupID string) { - if err := p.groupRpcClient.DismissGroup(ctx, groupID); err != nil { + if err = p.groupRpcClient.DismissGroup(ctx, groupID); err != nil { log.ZError(ctx, "DismissGroup Notification clear members", err, "groupID", groupID) } }(groupID) } } } + wsResults, err := p.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs) if err != nil { return err } + log.ZDebug(ctx, "get conn and online push success", "result", wsResults, "msg", msg) - p.successCount++ isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush) if isOfflinePush { - var onlineSuccessUserIDs []string - var WebAndPcBackgroundUserIDs []string - onlineSuccessUserIDs = append(onlineSuccessUserIDs, msg.SendID) + var ( + onlineSuccessUserIDs = []string{msg.SendID} + webAndPcBackgroundUserIDs []string + ) + for _, v := range wsResults { if v.OnlinePush && v.UserID != msg.SendID { onlineSuccessUserIDs = append(onlineSuccessUserIDs, v.UserID) } - if !v.OnlinePush { - if len(v.Resp) != 0 { - for _, singleResult := range v.Resp { - if singleResult.ResultCode == -2 { - if constant.PlatformIDToName(int(singleResult.RecvPlatFormID)) == constant.TerminalPC || - singleResult.RecvPlatFormID == constant.WebPlatformID { - WebAndPcBackgroundUserIDs = append(WebAndPcBackgroundUserIDs, v.UserID) - } - } - } + + if v.OnlinePush { + continue + } + + if len(v.Resp) == 0 { + continue + } + + for _, singleResult := range v.Resp { + if singleResult.ResultCode != -2 { + continue + } + + isPC := constant.PlatformIDToName(int(singleResult.RecvPlatFormID)) == constant.TerminalPC + isWebID := singleResult.RecvPlatFormID == constant.WebPlatformID + + if isPC || isWebID { + webAndPcBackgroundUserIDs = append(webAndPcBackgroundUserIDs, v.UserID) } } } + needOfflinePushUserIDs := utils.DifferenceString(onlineSuccessUserIDs, pushToUserIDs) if msg.ContentType != constant.SignalingNotification { notNotificationUserIDs, err := p.conversationLocalCache.GetRecvMsgNotNotifyUserIDs(ctx, groupID) if err != nil { - // log.ZError(ctx, "GetRecvMsgNotNotifyUserIDs failed", err, "groupID", groupID) return err } + needOfflinePushUserIDs = utils.SliceSub(needOfflinePushUserIDs, notNotificationUserIDs) } // Use offline push messaging @@ -234,6 +251,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws if err != nil { return err } + if len(offlinePushUserIDs) > 0 { needOfflinePushUserIDs = offlinePushUserIDs } @@ -250,8 +268,8 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg) return err } - if _, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(resp.UserIDs, WebAndPcBackgroundUserIDs)); err != nil { - log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, WebAndPcBackgroundUserIDs)) + if _, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(resp.UserIDs, webAndPcBackgroundUserIDs)); err != nil { + log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, webAndPcBackgroundUserIDs)) return err } } @@ -319,15 +337,18 @@ func (p *Pusher) getOfflinePushInfos(conversationID string, msg *sdkws.MsgData) err = errNoOfflinePusher return } - type AtContent struct { + + type atContent struct { Text string `json:"text"` AtUserList []string `json:"atUserList"` IsAtSelf bool `json:"isAtSelf"` } + opts, err = p.GetOfflinePushOpts(msg) if err != nil { return } + if msg.OfflinePushInfo != nil { title = msg.OfflinePushInfo.Title content = msg.OfflinePushInfo.Desc @@ -345,9 +366,9 @@ func (p *Pusher) getOfflinePushInfos(conversationID string, msg *sdkws.MsgData) case constant.File: title = constant.ContentType2PushContent[int64(msg.ContentType)] case constant.AtText: - a := AtContent{} - _ = utils.JsonStringToStruct(string(msg.Content), &a) - if utils.IsContain(conversationID, a.AtUserList) { + ac := atContent{} + _ = utils.JsonStringToStruct(string(msg.Content), &ac) + if utils.IsContain(conversationID, ac.AtUserList) { title = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common] } else { title = constant.ContentType2PushContent[constant.GroupMsg]