diff --git a/.github/workflows/cleanup-after-milestone-prs-merged.yml b/.github/workflows/cleanup-after-milestone-prs-merged.yml new file mode 100644 index 0000000000..8a3e381d60 --- /dev/null +++ b/.github/workflows/cleanup-after-milestone-prs-merged.yml @@ -0,0 +1,65 @@ +name: Cleanup After Milestone PRs Merged + +on: + pull_request: + types: + - closed + +jobs: + handle_pr: + runs-on: ubuntu-latest + + steps: + - name: Checkout repository + uses: actions/checkout@v4.2.0 + + - name: Get the PR title and extract PR numbers + id: extract_pr_numbers + run: | + # Get the PR title + PR_TITLE="${{ github.event.pull_request.title }}" + + echo "PR Title: $PR_TITLE" + + # Extract PR numbers from the title + PR_NUMBERS=$(echo "$PR_TITLE" | grep -oE "#[0-9]+" | tr -d '#' | tr '\n' ' ') + echo "Extracted PR Numbers: $PR_NUMBERS" + + # Save PR numbers to a file + echo "$PR_NUMBERS" > pr_numbers.txt + echo "Saved PR Numbers to pr_numbers.txt" + + # Check if the title matches a specific pattern + if echo "$PR_TITLE" | grep -qE "^deps: Merge( #[0-9]+)+ PRs into .+"; then + echo "proceed=true" >> $GITHUB_OUTPUT + else + echo "proceed=false" >> $GITHUB_OUTPUT + fi + + - name: Use extracted PR numbers and label PRs + if: (steps.extract_pr_numbers.outputs.proceed == 'true' || contains(github.event.pull_request.labels.*.name, 'milestone-merge')) && github.event.pull_request.merged == true + run: | + # Read the previously saved PR numbers + PR_NUMBERS=$(cat pr_numbers.txt) + echo "Using extracted PR Numbers: $PR_NUMBERS" + + # Loop through each PR number and add label + for PR_NUMBER in $PR_NUMBERS; do + echo "Adding 'cherry-picked' label to PR #$PR_NUMBER" + curl -X POST \ + -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \ + -H "Accept: application/vnd.github+json" \ + https://api.github.com/repos/${{ github.repository }}/issues/$PR_NUMBER/labels \ + -d '{"labels":["cherry-picked"]}' + done + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + - name: Delete branch after PR close + if: steps.extract_pr_numbers.outputs.proceed == 'true' || contains(github.event.pull_request.labels.*.name, 'milestone-merge') + run: | + BRANCH_NAME="${{ github.event.pull_request.head.ref }}" + echo "Branch to delete: $BRANCH_NAME" + git push origin --delete "$BRANCH_NAME" + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file diff --git a/.github/workflows/go-build-test.yml b/.github/workflows/go-build-test.yml index 1ed8f03977..5b37bf47db 100644 --- a/.github/workflows/go-build-test.yml +++ b/.github/workflows/go-build-test.yml @@ -2,11 +2,7 @@ name: Go Build Test on: push: - branches: - - main pull_request: - branches: - - main paths-ignore: - '**/*.md' diff --git a/.github/workflows/merge-from-milestone.yml b/.github/workflows/merge-from-milestone.yml new file mode 100644 index 0000000000..44b4f81f47 --- /dev/null +++ b/.github/workflows/merge-from-milestone.yml @@ -0,0 +1,218 @@ +name: Create Pre-Release PR from Milestone + +permissions: + contents: write + pull-requests: write + issues: write + +on: + workflow_dispatch: + inputs: + milestone_name: + description: 'Milestone name to collect closed PRs from' + required: true + default: 'v3.8.2' + target_branch: + description: 'Target branch to merge the consolidated PR' + required: true + default: 'pre-release-v3.8.2' + +env: + MILESTONE_NAME: ${{ github.event.inputs.milestone_name || 'v3.8.2' }} + TARGET_BRANCH: ${{ github.event.inputs.target_branch || 'pre-release-v3.8.2' }} + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + BOT_TOKEN: ${{ secrets.BOT_TOKEN }} + LABEL_NAME: cherry-picked + TEMP_DIR: /tmp # Using /tmp as the temporary directory + +jobs: + cherry_pick_milestone_prs: + runs-on: ubuntu-latest + steps: + - name: Setup temp directory + run: | + # Create the temporary directory and initialize necessary files + mkdir -p ${{ env.TEMP_DIR }} + touch ${{ env.TEMP_DIR }}/pr_numbers.txt + touch ${{ env.TEMP_DIR }}/commit_hashes.txt + touch ${{ env.TEMP_DIR }}/pr_title.txt + touch ${{ env.TEMP_DIR }}/pr_body.txt + touch ${{ env.TEMP_DIR }}/created_pr_number.txt + + - name: Checkout repository + uses: actions/checkout@v4 + with: + fetch-depth: 0 + token: ${{ secrets.BOT_TOKEN }} + + - name: Setup Git User for OpenIM-Robot + run: | + # Set up Git credentials for the bot + git config --global user.email "OpenIM-Robot@users.noreply.github.com" + git config --global user.name "OpenIM-Robot" + + - name: Fetch Milestone ID and Filter PR Numbers + env: + MILESTONE_NAME: ${{ env.MILESTONE_NAME }} + run: | + # Fetch milestone details and extract milestone ID + milestones=$(curl -s -H "Authorization: token $BOT_TOKEN" \ + -H "Accept: application/vnd.github+json" \ + "https://api.github.com/repos/${{ github.repository }}/milestones") + milestone_id=$(echo "$milestones" | grep -B3 "\"title\": \"$MILESTONE_NAME\"" | grep '"number":' | head -n1 | grep -o '[0-9]\+') + if [ -z "$milestone_id" ]; then + echo "Milestone '$MILESTONE_NAME' not found. Exiting." + exit 1 + fi + echo "Milestone ID: $milestone_id" + echo "MILESTONE_ID=$milestone_id" >> $GITHUB_ENV + + # Fetch issues for the milestone + issues=$(curl -s -H "Authorization: token $BOT_TOKEN" \ + -H "Accept: application/vnd.github+json" \ + "https://api.github.com/repos/${{ github.repository }}/issues?milestone=$milestone_id&state=closed&per_page=100") + + > ${{ env.TEMP_DIR }}/pr_numbers.txt + + # Filter PRs that do not have the 'cherry-picked' label + for pr_number in $(echo "$issues" | jq -r '.[] | select(.pull_request != null) | .number'); do + labels=$(curl -s -H "Authorization: token $BOT_TOKEN" \ + -H "Accept: application/vnd.github+json" \ + "https://api.github.com/repos/${{ github.repository }}/issues/$pr_number/labels" | jq -r '.[].name') + + if ! echo "$labels" | grep -q "${LABEL_NAME}"; then + echo "PR #$pr_number does not have the 'cherry-picked' label. Adding to the list." + echo "$pr_number" >> ${{ env.TEMP_DIR }}/pr_numbers.txt + else + echo "PR #$pr_number already has the 'cherry-picked' label. Skipping." + fi + done + + # Sort the filtered PR numbers + sort -n ${{ env.TEMP_DIR }}/pr_numbers.txt -o ${{ env.TEMP_DIR }}/pr_numbers.txt + + echo "Filtered and sorted PR numbers:" + cat ${{ env.TEMP_DIR }}/pr_numbers.txt || echo "No closed PR numbers found for milestone." + + - name: Fetch Merge Commits for PRs and Generate Title and Body + run: | + # Ensure the files are initialized + > ${{ env.TEMP_DIR }}/commit_hashes.txt + > ${{ env.TEMP_DIR }}/pr_title.txt + > ${{ env.TEMP_DIR }}/pr_body.txt + + # Write description to the PR body + echo "### Description:" >> ${{ env.TEMP_DIR }}/pr_body.txt + echo "Merging PRs from milestone \`$MILESTONE_NAME\` into target branch \`$TARGET_BRANCH\`." >> ${{ env.TEMP_DIR }}/pr_body.txt + echo "" >> ${{ env.TEMP_DIR }}/pr_body.txt + echo "### Need Merge PRs:" >> ${{ env.TEMP_DIR }}/pr_body.txt + + pr_numbers_in_title="" + + # Process sorted PR numbers and generate commit hashes + for pr_number in $(cat ${{ env.TEMP_DIR }}/pr_numbers.txt); do + echo "Processing PR #$pr_number" + pr_details=$(curl -s -H "Authorization: token $BOT_TOKEN" \ + -H "Accept: application/vnd.github+json" \ + "https://api.github.com/repos/${{ github.repository }}/pulls/$pr_number") + pr_title=$(echo "$pr_details" | jq -r '.title') + merge_commit=$(echo "$pr_details" | jq -r '.merge_commit_sha') + short_commit_hash=$(echo "$merge_commit" | cut -c 1-7) + + # Append PR details to the body + echo "- $pr_title: (#$pr_number) ($short_commit_hash)" >> ${{ env.TEMP_DIR }}/pr_body.txt + + if [ "$merge_commit" != "null" ];then + echo "$merge_commit" >> ${{ env.TEMP_DIR }}/commit_hashes.txt + echo "#$pr_number" >> ${{ env.TEMP_DIR }}/pr_title.txt + pr_numbers_in_title="$pr_numbers_in_title #$pr_number" + fi + done + + commit_hashes=$(cat ${{ env.TEMP_DIR }}/commit_hashes.txt | tr '\n' ' ') + first_commit_hash=$(head -n 1 ${{ env.TEMP_DIR }}/commit_hashes.txt) + cherry_pick_branch="cherry-pick-${first_commit_hash:0:7}" + echo "COMMIT_HASHES=$commit_hashes" >> $GITHUB_ENV + echo "CHERRY_PICK_BRANCH=$cherry_pick_branch" >> $GITHUB_ENV + echo "pr_numbers_in_title=$pr_numbers_in_title" >> $GITHUB_ENV + + - name: Pull and Cherry-pick Commits, Then Push + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + BOT_TOKEN: ${{ secrets.BOT_TOKEN }} + run: | + # Fetch and pull the latest changes from the target branch + git fetch origin + git checkout $TARGET_BRANCH + git pull origin $TARGET_BRANCH + + # Create a new branch for cherry-picking + git checkout -b $CHERRY_PICK_BRANCH + + # Cherry-pick the commits and handle conflicts + for commit_hash in $COMMIT_HASHES; do + echo "Attempting to cherry-pick commit $commit_hash" + if ! git cherry-pick "$commit_hash" --strategy=recursive -X theirs; then + echo "Conflict detected for $commit_hash. Resolving with incoming changes." + conflict_files=$(git diff --name-only --diff-filter=U) + echo "Conflicting files:" + echo "$conflict_files" + + for file in $conflict_files; do + if [ -f "$file" ]; then + echo "Resolving conflict for $file" + git add "$file" + else + echo "File $file has been deleted. Skipping." + git rm "$file" + fi + done + + echo "Conflicts resolved. Continuing cherry-pick." + git cherry-pick --continue + else + echo "Cherry-pick successful for commit $commit_hash." + fi + done + + # Push the cherry-pick branch to the repository + git remote set-url origin "https://${BOT_TOKEN}@github.com/${{ github.repository }}.git" + git push origin $CHERRY_PICK_BRANCH --force + + - name: Create Pull Request + run: | + # Prepare and create the PR + pr_title="deps: Merge ${{ env.pr_numbers_in_title }} PRs into $TARGET_BRANCH" + pr_body=$(cat ${{ env.TEMP_DIR }}/pr_body.txt) + + echo "Prepared PR title:" + echo "$pr_title" + echo "Prepared PR body:" + echo "$pr_body" + + # Create the PR using the GitHub API + response=$(curl -s -X POST -H "Authorization: token $BOT_TOKEN" \ + -H "Accept: application/vnd.github+json" \ + https://api.github.com/repos/${{ github.repository }}/pulls \ + -d "$(jq -n --arg title "$pr_title" \ + --arg head "$CHERRY_PICK_BRANCH" \ + --arg base "$TARGET_BRANCH" \ + --arg body "$pr_body" \ + '{title: $title, head: $head, base: $base, body: $body}')") + + pr_number=$(echo "$response" | jq -r '.number') + echo "$pr_number" > ${{ env.TEMP_DIR }}/created_pr_number.txt + echo "Created PR #$pr_number" + + - name: Add Label to Created Pull Request + run: | + # Add 'milestone-merge' label to the created PR + pr_number=$(cat ${{ env.TEMP_DIR }}/created_pr_number.txt) + echo "Adding label to PR #$pr_number" + + curl -s -X POST -H "Authorization: token $GITHUB_TOKEN" \ + -H "Accept: application/vnd.github+json" \ + -d '{"labels": ["milestone-merge"]}' \ + "https://api.github.com/repos/${{ github.repository }}/issues/$pr_number/labels" + + echo "Added 'milestone-merge' label to PR #$pr_number." diff --git a/README.md b/README.md index a99559cdb0..6b1779b252 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,7 @@ [![Best Practices](https://img.shields.io/badge/Best%20Practices-purple?style=for-the-badge)](https://www.bestpractices.dev/projects/8045) [![Good First Issues](https://img.shields.io/github/issues/openimsdk/open-im-server/good%20first%20issue?style=for-the-badge&logo=github)](https://github.com/openimsdk/open-im-server/issues?q=is%3Aissue+is%3Aopen+sort%3Aupdated-desc+label%3A%22good+first+issue%22) [![Language](https://img.shields.io/badge/Language-Go-blue.svg?style=for-the-badge&logo=go&logoColor=white)](https://golang.org/) +[![Gurubase](https://img.shields.io/badge/Gurubase-Ask%20OpenIM%20Guru-006BFF?style=for-the-badge)](https://gurubase.io/g/openim)
diff --git a/config/notification.yml b/config/notification.yml index 85ca91af18..ba5ca1c21a 100644 --- a/config/notification.yml +++ b/config/notification.yml @@ -1,20 +1,3 @@ -# Copyright © 2023 OpenIM. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Determines if a message should be sent. If set to false, it triggers a silent sync without a message. If true, it requires triggering a conversation. -# For rpc notification, send twice: once as a message and once as a notification. -# The options field 'isNotification' indicates if it's a notification. groupCreated: isSendMsg: true # Reliability level of the message sending. @@ -309,9 +292,9 @@ userInfoUpdated: unreadCount: false offlinePush: enable: true - title: Remove a blocked user - desc: Remove a blocked user - ext: Remove a blocked user + title: userInfo updated + desc: userInfo updated + ext: userInfo updated userStatusChanged: isSendMsg: false diff --git a/go.mod b/go.mod index 5e5a8b5bea..7cd4e4550f 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.47 + github.com/openimsdk/protocol v0.0.72-alpha.51 github.com/openimsdk/tools v0.0.50-alpha.16 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index 53109c8908..7cc45616f5 100644 --- a/go.sum +++ b/go.sum @@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.47 h1:FGHnEwsA05GxT3vnz7YH3fbVkuoO3P71ZZgkQQ71MjA= -github.com/openimsdk/protocol v0.0.72-alpha.47/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.72-alpha.51 h1:G5Yjndp/FRyOJWhoQcSF2x2GvYiAIlqN0vjkvjUPycU= +github.com/openimsdk/protocol v0.0.72-alpha.51/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.50-alpha.16 h1:bC1AQvJMuOHtZm8LZRvN8L5mH1Ws2VYdL+TLTs1iGSc= github.com/openimsdk/tools v0.0.50-alpha.16/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= diff --git a/internal/api/router.go b/internal/api/router.go index 17c9989120..560516d303 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -198,13 +198,6 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En objectGroup.POST("/initiate_form_data", t.InitiateFormData) objectGroup.POST("/complete_form_data", t.CompleteFormData) objectGroup.GET("/*name", t.ObjectRedirect) - - applicationGroup := r.Group("application") - applicationGroup.POST("/add_version", t.AddApplicationVersion) - applicationGroup.POST("/update_version", t.UpdateApplicationVersion) - applicationGroup.POST("/delete_version", t.DeleteApplicationVersion) - applicationGroup.POST("/latest_version", t.LatestApplicationVersion) - applicationGroup.POST("/page_versions", t.PageApplicationVersion) } // Message msgGroup := r.Group("/msg") @@ -297,6 +290,4 @@ func GinParseToken(authRPC *rpcclient.Auth) gin.HandlerFunc { var Whitelist = []string{ "/auth/get_admin_token", "/auth/parse_token", - "/application/latest_version", - "/application/page_versions", } diff --git a/internal/api/third.go b/internal/api/third.go index 56661ba89d..6baa70ee5d 100644 --- a/internal/api/third.go +++ b/internal/api/third.go @@ -170,23 +170,3 @@ func (o *ThirdApi) SearchLogs(c *gin.Context) { func (o *ThirdApi) GetPrometheus(c *gin.Context) { c.Redirect(http.StatusFound, o.GrafanaUrl) } - -func (o *ThirdApi) LatestApplicationVersion(c *gin.Context) { - a2r.Call(third.ThirdClient.LatestApplicationVersion, o.Client, c) -} - -func (o *ThirdApi) AddApplicationVersion(c *gin.Context) { - a2r.Call(third.ThirdClient.AddApplicationVersion, o.Client, c) -} - -func (o *ThirdApi) UpdateApplicationVersion(c *gin.Context) { - a2r.Call(third.ThirdClient.UpdateApplicationVersion, o.Client, c) -} - -func (o *ThirdApi) DeleteApplicationVersion(c *gin.Context) { - a2r.Call(third.ThirdClient.DeleteApplicationVersion, o.Client, c) -} - -func (o *ThirdApi) PageApplicationVersion(c *gin.Context) { - a2r.Call(third.ThirdClient.PageApplicationVersion, o.Client, c) -} diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 7dc2ebeea0..f11cfde1af 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -128,6 +128,7 @@ func (m *MsgTransfer) Start(index int, config *Config) error { go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyCH) go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyMongoCH) + go m.historyCH.HandleUserHasReadSeqMessages(m.ctx) err := m.historyCH.redisMessageBatches.Start() if err != nil { return err @@ -157,12 +158,14 @@ func (m *MsgTransfer) Start(index int, config *Config) error { // graceful close kafka client. m.cancel() m.historyCH.redisMessageBatches.Close() + m.historyCH.Close() m.historyCH.historyConsumerGroup.Close() m.historyMongoCH.historyConsumerGroup.Close() return nil case <-netDone: m.cancel() m.historyCH.redisMessageBatches.Close() + m.historyCH.Close() m.historyCH.historyConsumerGroup.Close() m.historyMongoCH.historyConsumerGroup.Close() close(netDone) diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index b0078649cb..84453c8df4 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -18,8 +18,10 @@ import ( "context" "encoding/json" "errors" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "strconv" "strings" + "sync" "time" "github.com/IBM/sarama" @@ -40,11 +42,12 @@ import ( ) const ( - size = 500 - mainDataBuffer = 500 - subChanBuffer = 50 - worker = 50 - interval = 100 * time.Millisecond + size = 500 + mainDataBuffer = 500 + subChanBuffer = 50 + worker = 50 + interval = 100 * time.Millisecond + hasReadChanBuffer = 1000 ) type ContextMsg struct { @@ -52,14 +55,23 @@ type ContextMsg struct { ctx context.Context } +// This structure is used for asynchronously writing the sender’s read sequence (seq) regarding a message into MongoDB. +// For example, if the sender sends a message with a seq of 10, then their own read seq for this conversation should be set to 10. +type userHasReadSeq struct { + conversationID string + userHasReadMap map[string]int64 +} + type OnlineHistoryRedisConsumerHandler struct { historyConsumerGroup *kafka.MConsumerGroup redisMessageBatches *batcher.Batcher[sarama.ConsumerMessage] - msgTransferDatabase controller.MsgTransferDatabase - conversationRpcClient *rpcclient.ConversationRpcClient - groupRpcClient *rpcclient.GroupRpcClient + msgTransferDatabase controller.MsgTransferDatabase + conversationRpcClient *rpcclient.ConversationRpcClient + groupRpcClient *rpcclient.GroupRpcClient + conversationUserHasReadChan chan *userHasReadSeq + wg sync.WaitGroup } func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.MsgTransferDatabase, @@ -70,6 +82,8 @@ func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database cont } var och OnlineHistoryRedisConsumerHandler och.msgTransferDatabase = database + och.conversationUserHasReadChan = make(chan *userHasReadSeq, hasReadChanBuffer) + och.wg.Add(1) b := batcher.New[sarama.ConsumerMessage]( batcher.WithSize(size), @@ -115,25 +129,25 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID } func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) { - type seqKey struct { - conversationID string - userID string - } - var readSeq map[seqKey]int64 + + var conversationID string + var userSeqMap map[string]int64 for _, msg := range msgs { if msg.message.ContentType != constant.HasReadReceipt { continue } var elem sdkws.NotificationElem if err := json.Unmarshal(msg.message.Content, &elem); err != nil { - log.ZError(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg) + log.ZWarn(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg) continue } var tips sdkws.MarkAsReadTips if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil { - log.ZError(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg) + log.ZWarn(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg) continue } + //The conversation ID for each batch of messages processed by the batcher is the same. + conversationID = tips.ConversationID if len(tips.Seqs) > 0 { for _, seq := range tips.Seqs { if tips.HasReadSeq < seq { @@ -146,26 +160,25 @@ func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, if tips.HasReadSeq < 0 { continue } - if readSeq == nil { - readSeq = make(map[seqKey]int64) - } - key := seqKey{ - conversationID: tips.ConversationID, - userID: tips.MarkAsReadUserID, + if userSeqMap == nil { + userSeqMap = make(map[string]int64) } - if readSeq[key] > tips.HasReadSeq { + + if userSeqMap[tips.MarkAsReadUserID] > tips.HasReadSeq { continue } - readSeq[key] = tips.HasReadSeq + userSeqMap[tips.MarkAsReadUserID] = tips.HasReadSeq } - if readSeq == nil { + if userSeqMap == nil { return } - for key, seq := range readSeq { - if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, key.userID, key.conversationID, seq); err != nil { - log.ZError(ctx, "set read seq to db error", err, "userID", key.userID, "conversationID", key.conversationID, "seq", seq) - } + if len(conversationID) == 0 { + log.ZWarn(ctx, "conversation err", nil, "conversationID", conversationID) } + if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, conversationID, userSeqMap); err != nil { + log.ZWarn(ctx, "set read seq to db error", err, "conversationID", conversationID, "userSeqMap", userSeqMap) + } + } func (och *OnlineHistoryRedisConsumerHandler) parseConsumerMessages(ctx context.Context, consumerMessages []*sarama.ConsumerMessage) []*ContextMsg { @@ -250,12 +263,21 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key } if len(storageMessageList) > 0 { msg := storageMessageList[0] - lastSeq, isNewConversation, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList) + lastSeq, isNewConversation, userSeqMap, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList) if err != nil && !errors.Is(errs.Unwrap(err), redis.Nil) { - log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageMessageList) + log.ZWarn(ctx, "batch data insert to redis err", err, "storageMsgList", storageMessageList) return } log.ZInfo(ctx, "BatchInsertChat2Cache end") + err = och.msgTransferDatabase.SetHasReadSeqs(ctx, conversationID, userSeqMap) + if err != nil { + log.ZWarn(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID) + prommetrics.SeqSetFailedCounter.Inc() + } + och.conversationUserHasReadChan <- &userHasReadSeq{ + conversationID: conversationID, + userHasReadMap: userSeqMap, + } if isNewConversation { switch msg.SessionType { @@ -308,7 +330,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con storageMessageList = append(storageMessageList, msg.message) } if len(storageMessageList) > 0 { - lastSeq, _, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList) + lastSeq, _, _, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList) if err != nil { log.ZError(ctx, "notification batch insert to redis error", err, "conversationID", conversationID, "storageList", storageMessageList) @@ -323,6 +345,21 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con och.toPushTopic(ctx, key, conversationID, storageList) } } +func (och *OnlineHistoryRedisConsumerHandler) HandleUserHasReadSeqMessages(ctx context.Context) { + defer och.wg.Done() + + for msg := range och.conversationUserHasReadChan { + if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, msg.conversationID, msg.userHasReadMap); err != nil { + log.ZWarn(ctx, "set read seq to db error", err, "conversationID", msg.conversationID, "userSeqMap", msg.userHasReadMap) + } + } + + log.ZInfo(ctx, "Channel closed, exiting handleUserHasReadSeqMessages") +} +func (och *OnlineHistoryRedisConsumerHandler) Close() { + close(och.conversationUserHasReadChan) + och.wg.Wait() +} func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*ContextMsg) { for _, v := range msgs { diff --git a/internal/push/callback.go b/internal/push/callback.go index 8897295827..f8e17bb8c0 100644 --- a/internal/push/callback.go +++ b/internal/push/callback.go @@ -24,7 +24,6 @@ import ( "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/mcontext" - "github.com/openimsdk/tools/utils/datautil" ) func (c *ConsumerHandler) webhookBeforeOfflinePush(ctx context.Context, before *config.BeforeConfig, userIDs []string, msg *sdkws.MsgData, offlinePushUserIDs *[]string) error { @@ -70,7 +69,7 @@ func (c *ConsumerHandler) webhookBeforeOfflinePush(ctx context.Context, before * func (c *ConsumerHandler) webhookBeforeOnlinePush(ctx context.Context, before *config.BeforeConfig, userIDs []string, msg *sdkws.MsgData) error { return webhook.WithCondition(ctx, before, func(ctx context.Context) error { - if datautil.Contain(msg.SendID, userIDs...) || msg.ContentType == constant.Typing { + if msg.ContentType == constant.Typing { return nil } req := callbackstruct.CallbackBeforePushReq{ diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index ef917d5395..b5ab1b2097 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -1026,7 +1026,7 @@ func (g *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf } num := len(update) if req.GroupInfoForSet.Notification != "" { - num-- + num -= 3 func() { conversation := &pbconversation.ConversationReq{ ConversationID: msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupInfoForSet.GroupID), @@ -1133,8 +1133,9 @@ func (g *groupServer) SetGroupInfoEx(ctx context.Context, req *pbgroup.SetGroupI } num := len(updatedData) + if req.Notification != nil { - num-- + num -= 3 if req.Notification.Value != "" { func() { @@ -1219,7 +1220,7 @@ func (g *groupServer) TransferGroupOwner(ctx context.Context, req *pbgroup.Trans } } - if newOwner.MuteEndTime != time.Unix(0, 0) { + if newOwner.MuteEndTime.After(time.Now()) { if _, err := g.CancelMuteGroupMember(ctx, &pbgroup.CancelMuteGroupMemberReq{ GroupID: group.GroupID, UserID: req.NewOwnerUserID}); err != nil { @@ -1810,7 +1811,6 @@ func (g *groupServer) GetSpecifiedUserGroupRequestInfo(ctx context.Context, req } if req.UserID != opUserID { - req.UserID = mcontext.GetOpUserID(ctx) adminIDs, err := g.db.GetGroupRoleLevelMemberIDs(ctx, req.GroupID, constant.GroupAdmin) if err != nil { return nil, err @@ -1819,10 +1819,11 @@ func (g *groupServer) GetSpecifiedUserGroupRequestInfo(ctx context.Context, req adminIDs = append(adminIDs, owners[0].UserID) adminIDs = append(adminIDs, g.config.Share.IMAdminUserID...) - if !datautil.Contain(req.UserID, adminIDs...) { + if !datautil.Contain(opUserID, adminIDs...) { return nil, errs.ErrNoPermission.WrapMsg("opUser no permission") } } + requests, err := g.db.FindGroupRequests(ctx, req.GroupID, []string{req.UserID}) if err != nil { return nil, err diff --git a/internal/rpc/third/application.go b/internal/rpc/third/application.go deleted file mode 100644 index a6556055c1..0000000000 --- a/internal/rpc/third/application.go +++ /dev/null @@ -1,117 +0,0 @@ -package third - -import ( - "context" - "github.com/openimsdk/open-im-server/v3/pkg/authverify" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "github.com/openimsdk/protocol/third" - "github.com/openimsdk/tools/errs" - "github.com/openimsdk/tools/utils/datautil" - "github.com/redis/go-redis/v9" - "go.mongodb.org/mongo-driver/bson/primitive" - "go.mongodb.org/mongo-driver/mongo" - "time" -) - -func IsNotFound(err error) bool { - switch errs.Unwrap(err) { - case redis.Nil, mongo.ErrNoDocuments: - return true - default: - return false - } -} - -func (t *thirdServer) db2pbApplication(val *model.Application) *third.ApplicationVersion { - return &third.ApplicationVersion{ - Id: val.ID.Hex(), - Platform: val.Platform, - Version: val.Version, - Url: val.Url, - Text: val.Text, - Force: val.Force, - Latest: val.Latest, - CreateTime: val.CreateTime.UnixMilli(), - } -} - -func (t *thirdServer) LatestApplicationVersion(ctx context.Context, req *third.LatestApplicationVersionReq) (*third.LatestApplicationVersionResp, error) { - res, err := t.applicationDatabase.LatestVersion(ctx, req.Platform) - if err == nil { - return &third.LatestApplicationVersionResp{Version: t.db2pbApplication(res)}, nil - } else if IsNotFound(err) { - return &third.LatestApplicationVersionResp{}, nil - } else { - return nil, err - } -} - -func (t *thirdServer) AddApplicationVersion(ctx context.Context, req *third.AddApplicationVersionReq) (*third.AddApplicationVersionResp, error) { - if err := authverify.CheckAdmin(ctx, t.config.Share.IMAdminUserID); err != nil { - return nil, err - } - val := &model.Application{ - ID: primitive.NewObjectID(), - Platform: req.Platform, - Version: req.Version, - Url: req.Url, - Text: req.Text, - Force: req.Force, - Latest: req.Latest, - CreateTime: time.Now(), - } - if err := t.applicationDatabase.AddVersion(ctx, val); err != nil { - return nil, err - } - return &third.AddApplicationVersionResp{}, nil -} - -func (t *thirdServer) UpdateApplicationVersion(ctx context.Context, req *third.UpdateApplicationVersionReq) (*third.UpdateApplicationVersionResp, error) { - if err := authverify.CheckAdmin(ctx, t.config.Share.IMAdminUserID); err != nil { - return nil, err - } - oid, err := primitive.ObjectIDFromHex(req.Id) - if err != nil { - return nil, errs.ErrArgs.WrapMsg("invalid id " + err.Error()) - } - update := make(map[string]any) - putUpdate(update, "platform", req.Platform) - putUpdate(update, "version", req.Version) - putUpdate(update, "url", req.Url) - putUpdate(update, "text", req.Text) - putUpdate(update, "force", req.Force) - putUpdate(update, "latest", req.Latest) - if err := t.applicationDatabase.UpdateVersion(ctx, oid, update); err != nil { - return nil, err - } - return &third.UpdateApplicationVersionResp{}, nil -} - -func (t *thirdServer) DeleteApplicationVersion(ctx context.Context, req *third.DeleteApplicationVersionReq) (*third.DeleteApplicationVersionResp, error) { - if err := authverify.CheckAdmin(ctx, t.config.Share.IMAdminUserID); err != nil { - return nil, err - } - ids := make([]primitive.ObjectID, 0, len(req.Id)) - for _, id := range req.Id { - oid, err := primitive.ObjectIDFromHex(id) - if err != nil { - return nil, errs.ErrArgs.WrapMsg("invalid id " + err.Error()) - } - ids = append(ids, oid) - } - if err := t.applicationDatabase.DeleteVersion(ctx, ids); err != nil { - return nil, err - } - return &third.DeleteApplicationVersionResp{}, nil -} - -func (t *thirdServer) PageApplicationVersion(ctx context.Context, req *third.PageApplicationVersionReq) (*third.PageApplicationVersionResp, error) { - total, res, err := t.applicationDatabase.PageVersion(ctx, req.Platform, req.Pagination) - if err != nil { - return nil, err - } - return &third.PageApplicationVersionResp{ - Total: total, - Versions: datautil.Slice(res, t.db2pbApplication), - }, nil -} diff --git a/internal/rpc/third/third.go b/internal/rpc/third/third.go index c6b588d8d4..d37689b313 100644 --- a/internal/rpc/third/third.go +++ b/internal/rpc/third/third.go @@ -38,13 +38,12 @@ import ( ) type thirdServer struct { - thirdDatabase controller.ThirdDatabase - s3dataBase controller.S3Database - userRpcClient rpcclient.UserRpcClient - defaultExpire time.Duration - config *Config - minio *minio.Minio - applicationDatabase controller.ApplicationDatabase + thirdDatabase controller.ThirdDatabase + s3dataBase controller.S3Database + userRpcClient rpcclient.UserRpcClient + defaultExpire time.Duration + config *Config + minio *minio.Minio } type Config struct { @@ -75,10 +74,6 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg if err != nil { return err } - applicationMgo, err := mgo.NewApplicationMgo(mgocli.GetDB()) - if err != nil { - return err - } // Select the oss method according to the profile policy enable := config.RpcConfig.Object.Enable @@ -104,13 +99,12 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg } localcache.InitLocalCache(&config.LocalCacheConfig) third.RegisterThirdServer(server, &thirdServer{ - thirdDatabase: controller.NewThirdDatabase(redis.NewThirdCache(rdb), logdb), - userRpcClient: rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID), - s3dataBase: controller.NewS3Database(rdb, o, s3db), - defaultExpire: time.Hour * 24 * 7, - config: config, - minio: minioCli, - applicationDatabase: controller.NewApplicationDatabase(applicationMgo, redis.NewApplicationRedisCache(applicationMgo, rdb)), + thirdDatabase: controller.NewThirdDatabase(redis.NewThirdCache(rdb), logdb), + userRpcClient: rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID), + s3dataBase: controller.NewS3Database(rdb, o, s3db), + defaultExpire: time.Hour * 24 * 7, + config: config, + minio: minioCli, }) return nil } diff --git a/pkg/common/storage/cache/application.go b/pkg/common/storage/cache/application.go deleted file mode 100644 index 588732ec8d..0000000000 --- a/pkg/common/storage/cache/application.go +++ /dev/null @@ -1,11 +0,0 @@ -package cache - -import ( - "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" -) - -type ApplicationCache interface { - LatestVersion(ctx context.Context, platform string) (*model.Application, error) - DeleteCache(ctx context.Context, platforms []string) error -} diff --git a/pkg/common/storage/cache/cachekey/application.go b/pkg/common/storage/cache/cachekey/application.go deleted file mode 100644 index 032adba3c1..0000000000 --- a/pkg/common/storage/cache/cachekey/application.go +++ /dev/null @@ -1,9 +0,0 @@ -package cachekey - -const ( - ApplicationLatestVersion = "APPLICATION_LATEST_VERSION:" -) - -func GetApplicationLatestVersionKey(platform string) string { - return ApplicationLatestVersion + platform -} diff --git a/pkg/common/storage/cache/redis/application.go b/pkg/common/storage/cache/redis/application.go deleted file mode 100644 index 4a7a4ced67..0000000000 --- a/pkg/common/storage/cache/redis/application.go +++ /dev/null @@ -1,43 +0,0 @@ -package redis - -import ( - "context" - "github.com/dtm-labs/rockscache" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "github.com/openimsdk/tools/utils/datautil" - "github.com/redis/go-redis/v9" - "time" -) - -func NewApplicationRedisCache(db database.Application, rdb redis.UniversalClient) *ApplicationRedisCache { - return &ApplicationRedisCache{ - db: db, - rcClient: rockscache.NewClient(rdb, *GetRocksCacheOptions()), - deleter: NewBatchDeleterRedis(rdb, GetRocksCacheOptions(), nil), - expireTime: time.Hour * 24 * 7, - } -} - -type ApplicationRedisCache struct { - db database.Application - rcClient *rockscache.Client - deleter *BatchDeleterRedis - expireTime time.Duration -} - -func (a *ApplicationRedisCache) LatestVersion(ctx context.Context, platform string) (*model.Application, error) { - return getCache(ctx, a.rcClient, cachekey.GetApplicationLatestVersionKey(platform), a.expireTime, func(ctx context.Context) (*model.Application, error) { - return a.db.LatestVersion(ctx, platform) - }) -} - -func (a *ApplicationRedisCache) DeleteCache(ctx context.Context, platforms []string) error { - if len(platforms) == 0 { - return nil - } - return a.deleter.ExecDelWithKeys(ctx, datautil.Slice(platforms, func(platform string) string { - return cachekey.GetApplicationLatestVersionKey(platform) - })) -} diff --git a/pkg/common/storage/controller/application.go b/pkg/common/storage/controller/application.go deleted file mode 100644 index 72bca07efa..0000000000 --- a/pkg/common/storage/controller/application.go +++ /dev/null @@ -1,69 +0,0 @@ -package controller - -import ( - "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "github.com/openimsdk/tools/db/pagination" - "go.mongodb.org/mongo-driver/bson/primitive" -) - -type ApplicationDatabase interface { - LatestVersion(ctx context.Context, platform string) (*model.Application, error) - AddVersion(ctx context.Context, val *model.Application) error - UpdateVersion(ctx context.Context, id primitive.ObjectID, update map[string]any) error - DeleteVersion(ctx context.Context, id []primitive.ObjectID) error - PageVersion(ctx context.Context, platforms []string, page pagination.Pagination) (int64, []*model.Application, error) -} - -func NewApplicationDatabase(db database.Application, cache cache.ApplicationCache) ApplicationDatabase { - return &applicationDatabase{db: db, cache: cache} -} - -type applicationDatabase struct { - db database.Application - cache cache.ApplicationCache -} - -func (a *applicationDatabase) LatestVersion(ctx context.Context, platform string) (*model.Application, error) { - return a.cache.LatestVersion(ctx, platform) -} - -func (a *applicationDatabase) AddVersion(ctx context.Context, val *model.Application) error { - if err := a.db.AddVersion(ctx, val); err != nil { - return err - } - return a.cache.DeleteCache(ctx, []string{val.Platform}) -} - -func (a *applicationDatabase) UpdateVersion(ctx context.Context, id primitive.ObjectID, update map[string]any) error { - platforms, err := a.db.FindPlatform(ctx, []primitive.ObjectID{id}) - if err != nil { - return err - } - if err := a.db.UpdateVersion(ctx, id, update); err != nil { - return err - } - if p, ok := update["platform"]; ok { - if val, ok := p.(string); ok { - platforms = append(platforms, val) - } - } - return a.cache.DeleteCache(ctx, platforms) -} - -func (a *applicationDatabase) DeleteVersion(ctx context.Context, id []primitive.ObjectID) error { - platforms, err := a.db.FindPlatform(ctx, id) - if err != nil { - return err - } - if err := a.db.DeleteVersion(ctx, id); err != nil { - return err - } - return a.cache.DeleteCache(ctx, platforms) -} - -func (a *applicationDatabase) PageVersion(ctx context.Context, platforms []string, page pagination.Pagination) (int64, []*model.Application, error) { - return a.db.PageVersion(ctx, platforms, page) -} diff --git a/pkg/common/storage/controller/msg_transfer.go b/pkg/common/storage/controller/msg_transfer.go index c6013dbc12..1ecd786aa3 100644 --- a/pkg/common/storage/controller/msg_transfer.go +++ b/pkg/common/storage/controller/msg_transfer.go @@ -24,8 +24,11 @@ type MsgTransferDatabase interface { DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error // BatchInsertChat2Cache increments the sequence number and then batch inserts messages into the cache. - BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, err error) - SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error + BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, userHasReadMap map[string]int64, err error) + + SetHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error + + SetHasReadSeqToDB(ctx context.Context, conversationID string, userSeqMap map[string]int64) error // to mq MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) @@ -219,18 +222,18 @@ func (db *msgTransferDatabase) DeleteMessagesFromCache(ctx context.Context, conv return db.msg.DeleteMessagesFromCache(ctx, conversationID, seqs) } -func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) { +func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, userHasReadMap map[string]int64, err error) { lenList := len(msgs) if int64(lenList) > db.msgTable.GetSingleGocMsgNum() { - return 0, false, errs.New("message count exceeds limit", "limit", db.msgTable.GetSingleGocMsgNum()).Wrap() + return 0, false, nil, errs.New("message count exceeds limit", "limit", db.msgTable.GetSingleGocMsgNum()).Wrap() } if lenList < 1 { - return 0, false, errs.New("no messages to insert", "minCount", 1).Wrap() + return 0, false, nil, errs.New("no messages to insert", "minCount", 1).Wrap() } currentMaxSeq, err := db.seqConversation.Malloc(ctx, conversationID, int64(len(msgs))) if err != nil { log.ZError(ctx, "storage.seq.Malloc", err) - return 0, false, err + return 0, false, nil, err } isNew = currentMaxSeq == 0 lastMaxSeq := currentMaxSeq @@ -248,25 +251,25 @@ func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conver } else { prommetrics.MsgInsertRedisSuccessCounter.Inc() } - err = db.setHasReadSeqs(ctx, conversationID, userSeqMap) - if err != nil { - log.ZError(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID) - prommetrics.SeqSetFailedCounter.Inc() - } - return lastMaxSeq, isNew, errs.Wrap(err) + return lastMaxSeq, isNew, userSeqMap, errs.Wrap(err) } -func (db *msgTransferDatabase) setHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error { +func (db *msgTransferDatabase) SetHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error { for userID, seq := range userSeqMap { - if err := db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, seq); err != nil { + if err := db.seqUser.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil { return err } } return nil } -func (db *msgTransferDatabase) SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error { - return db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, hasReadSeq) +func (db *msgTransferDatabase) SetHasReadSeqToDB(ctx context.Context, conversationID string, userSeqMap map[string]int64) error { + for userID, seq := range userSeqMap { + if err := db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, seq); err != nil { + return err + } + } + return nil } func (db *msgTransferDatabase) MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) { diff --git a/pkg/common/storage/database/application.go b/pkg/common/storage/database/application.go deleted file mode 100644 index c98ae74c82..0000000000 --- a/pkg/common/storage/database/application.go +++ /dev/null @@ -1,17 +0,0 @@ -package database - -import ( - "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "github.com/openimsdk/tools/db/pagination" - "go.mongodb.org/mongo-driver/bson/primitive" -) - -type Application interface { - LatestVersion(ctx context.Context, platform string) (*model.Application, error) - AddVersion(ctx context.Context, val *model.Application) error - UpdateVersion(ctx context.Context, id primitive.ObjectID, update map[string]any) error - DeleteVersion(ctx context.Context, id []primitive.ObjectID) error - PageVersion(ctx context.Context, platforms []string, page pagination.Pagination) (int64, []*model.Application, error) - FindPlatform(ctx context.Context, id []primitive.ObjectID) ([]string, error) -} diff --git a/pkg/common/storage/database/mgo/application.go b/pkg/common/storage/database/mgo/application.go deleted file mode 100644 index e59c0560ab..0000000000 --- a/pkg/common/storage/database/mgo/application.go +++ /dev/null @@ -1,82 +0,0 @@ -package mgo - -import ( - "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "github.com/openimsdk/tools/db/mongoutil" - "github.com/openimsdk/tools/db/pagination" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/bson/primitive" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" -) - -func NewApplicationMgo(db *mongo.Database) (*ApplicationMgo, error) { - coll := db.Collection("application") - _, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ - { - Keys: bson.D{ - {Key: "platform", Value: 1}, - {Key: "version", Value: 1}, - }, - Options: options.Index().SetUnique(true), - }, - { - Keys: bson.D{ - {Key: "latest", Value: -1}, - }, - }, - }) - if err != nil { - return nil, err - } - return &ApplicationMgo{coll: coll}, nil -} - -type ApplicationMgo struct { - coll *mongo.Collection -} - -func (a *ApplicationMgo) sort() any { - return bson.D{{"latest", -1}, {"_id", -1}} -} - -func (a *ApplicationMgo) LatestVersion(ctx context.Context, platform string) (*model.Application, error) { - return mongoutil.FindOne[*model.Application](ctx, a.coll, bson.M{"platform": platform}, options.FindOne().SetSort(a.sort())) -} - -func (a *ApplicationMgo) AddVersion(ctx context.Context, val *model.Application) error { - if val.ID.IsZero() { - val.ID = primitive.NewObjectID() - } - return mongoutil.InsertMany(ctx, a.coll, []*model.Application{val}) -} - -func (a *ApplicationMgo) UpdateVersion(ctx context.Context, id primitive.ObjectID, update map[string]any) error { - if len(update) == 0 { - return nil - } - return mongoutil.UpdateOne(ctx, a.coll, bson.M{"_id": id}, bson.M{"$set": update}, true) -} - -func (a *ApplicationMgo) DeleteVersion(ctx context.Context, id []primitive.ObjectID) error { - if len(id) == 0 { - return nil - } - return mongoutil.DeleteMany(ctx, a.coll, bson.M{"_id": bson.M{"$in": id}}) -} - -func (a *ApplicationMgo) PageVersion(ctx context.Context, platforms []string, page pagination.Pagination) (int64, []*model.Application, error) { - filter := bson.M{} - if len(platforms) > 0 { - filter["platform"] = bson.M{"$in": platforms} - } - return mongoutil.FindPage[*model.Application](ctx, a.coll, filter, page, options.Find().SetSort(a.sort())) -} - -func (a *ApplicationMgo) FindPlatform(ctx context.Context, id []primitive.ObjectID) ([]string, error) { - if len(id) == 0 { - return nil, nil - } - return mongoutil.Find[string](ctx, a.coll, bson.M{"_id": bson.M{"$in": id}}, options.Find().SetProjection(bson.M{"_id": 0, "platform": 1})) -} diff --git a/pkg/common/storage/model/application.go b/pkg/common/storage/model/application.go index f5bae2be65..b09b0e8948 100644 --- a/pkg/common/storage/model/application.go +++ b/pkg/common/storage/model/application.go @@ -8,6 +8,7 @@ import ( type Application struct { ID primitive.ObjectID `bson:"_id"` Platform string `bson:"platform"` + Hot bool `bson:"hot"` Version string `bson:"version"` Url string `bson:"url"` Text string `bson:"text"`