Skip to content

Commit

Permalink
GO-4727 protocol improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
requilence committed Jan 30, 2025
1 parent f35c052 commit a1c97db
Show file tree
Hide file tree
Showing 9 changed files with 1,853 additions and 1,869 deletions.
6 changes: 3 additions & 3 deletions core/block/chats/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ type Service interface {
GetMessages(ctx context.Context, chatObjectId string, req chatobject.GetMessagesRequest) ([]*model.ChatMessage, *model.ChatState, error)
GetMessagesByIds(ctx context.Context, chatObjectId string, messageIds []string) ([]*model.ChatMessage, error)
SubscribeLastMessages(ctx context.Context, chatObjectId string, limit int) ([]*model.ChatMessage, int, error)
ReadMessages(ctx context.Context, chatObjectId string, afterOrderId, beforeOrderId string, lastDbState int64) error
Unsubscribe(chatObjectId string) error
ChatReadMessages(chatObjectId string, beforeOrderId string, lastDbState int64) error

app.Component
}
Expand Down Expand Up @@ -127,8 +127,8 @@ func (s *service) Unsubscribe(chatObjectId string) error {
})
}

func (s *service) ChatReadMessages(chatObjectId string, beforeOrderId string, lastDbState int64) error {
func (s *service) ReadMessages(ctx context.Context, chatObjectId string, afterOrderId, beforeOrderId string, lastAddedMessageTimestamp int64) error {
return cache.Do(s.objectGetter, chatObjectId, func(sb chatobject.StoreObject) error {
return sb.MarkReadMessages(context.Background(), beforeOrderId, lastDbState)
return sb.MarkReadMessages(ctx, afterOrderId, beforeOrderId, lastAddedMessageTimestamp)
})
}
46 changes: 36 additions & 10 deletions core/block/editor/chatobject/chatobject.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type StoreObject interface {
ToggleMessageReaction(ctx context.Context, messageId string, emoji string) error
DeleteMessage(ctx context.Context, messageId string) error
SubscribeLastMessages(ctx context.Context, limit int) ([]*model.ChatMessage, int, error)
MarkReadMessages(ctx context.Context, beforeOrderId string, lastDbState int64) error
MarkReadMessages(ctx context.Context, afterOrderId string, beforeOrderId string, lastAddedMessageTimestamp int64) error
Unsubscribe() error
}

Expand Down Expand Up @@ -184,7 +184,7 @@ func (s *storeObject) initialChatState() (*model.ChatState, error) {
Counter: int32(count),

Check failure on line 184 in core/block/editor/chatobject/chatobject.go

View workflow job for this annotation

GitHub Actions / lint

G115: integer overflow conversion int -> int32 (gosec)
},
// todo: add replies counter
DbState: int64(lastAdded),
DbTimestamp: int64(lastAdded),
}, nil
}
func (s *storeObject) markReadMessages(ids []string) {
Expand All @@ -204,7 +204,7 @@ func (s *storeObject) markReadMessages(ids []string) {
ctx := txn.Context()
idsModified := make([]string, 0, len(ids))
for _, id := range ids {
res, err := coll.Find(query.Key{Path: []string{"id"}, Filter: query.NewComp(query.CompOpEq, id)}).Limit(1).Update(ctx, `{"$set":{"`+readKey+`":true}}`)
res, err := coll.UpdateId(ctx, id, query.MustParseModifier(`{"$set":{"`+readKey+`":true}}`))
if err != nil {
log.Warnf("markReadMessages: update messages: %v", err)
continue
Expand All @@ -213,25 +213,50 @@ func (s *storeObject) markReadMessages(ids []string) {
idsModified = append(idsModified, id)
}
}
if len(idsModified) > 0 {

}

err = txn.Commit()
if err != nil {
log.Errorf("markReadMessages: commit: %v", err)
return
}
log.Warnf("markReadMessages: %d/%d messages marked as read", len(idsModified), len(ids))
if len(idsModified) > 0 {

Check failure on line 226 in core/block/editor/chatobject/chatobject.go

View workflow job for this annotation

GitHub Actions / lint

`if len(idsModified) > 0` has complex nested blocks (complexity: 6) (nestif)
// it doesn't work within the same transaction
// query the new oldest unread message's orderId
iter, err := coll.Find(
query.Key{Path: []string{readKey}, Filter: query.NewComp(query.CompOpEq, false)},
).Sort(ascOrder).
Limit(1).
Iter(s.componentCtx)
if err != nil {
log.Errorf("markReadMessages: find oldest read message: %v", err)
}
defer iter.Close()
if iter.Next() {
val, err := iter.Doc()
if err != nil {
log.Errorf("markReadMessages: get oldest read message: %v", err)
}
if val != nil {
newOldestOrderId := val.Value().GetObject(orderKey).Get("id").GetString()
s.subscription.chatState.Messages.OldestOrderId = newOldestOrderId
}
}
s.subscription.updateReadStatus(idsModified, true)
s.onUpdate()
}
}

func (s *storeObject) MarkReadMessages(ctx context.Context, beforeOrderId string, lastDbState int64) error {
func (s *storeObject) MarkReadMessages(ctx context.Context, afterOrderId, beforeOrderId string, lastAddedMessageTimestamp int64) error {
// 1. select all messages with orderId < beforeOrderId and addedTime < lastDbState
// 2. use the last(by orderId) message id as lastHead
// 3. update the MarkSeenHeads
// 2. mark messages as read in the DB

msg, err := s.GetLastAddedMessageBeforeOrderIdAndAddedTime(ctx, beforeOrderId, lastDbState)
msg, err := s.GetLastAddedMessageInOrderRange(ctx, afterOrderId, beforeOrderId, lastAddedMessageTimestamp)
if err != nil {
return fmt.Errorf("get message: %w", err)
}
Expand All @@ -241,18 +266,19 @@ func (s *storeObject) MarkReadMessages(ctx context.Context, beforeOrderId string
return nil
}

func (s *storeObject) GetLastAddedMessageBeforeOrderIdAndAddedTime(ctx context.Context, orderId string, addedTime int64) (*model.ChatMessage, error) {
func (s *storeObject) GetLastAddedMessageInOrderRange(ctx context.Context, afterOrderId, beforeOrderId string, lastAddedMessageTimestamp int64) (*model.ChatMessage, error) {
coll, err := s.store.Collection(ctx, collectionName)
if err != nil {
return nil, fmt.Errorf("get collection: %w", err)
}

iter, err := coll.Find(
query.And{
query.Key{Path: []string{orderKey, "id"}, Filter: query.NewComp(query.CompOpLte, orderId)},
query.Key{Path: []string{addedKey}, Filter: query.NewComp(query.CompOpLte, addedTime)},
query.Key{Path: []string{orderKey, "id"}, Filter: query.NewComp(query.CompOpGte, afterOrderId)},
query.Key{Path: []string{orderKey, "id"}, Filter: query.NewComp(query.CompOpLte, beforeOrderId)},
query.Key{Path: []string{addedKey}, Filter: query.NewComp(query.CompOpLte, lastAddedMessageTimestamp)},
},
).
).Sort(descAdded).
Limit(1).
Iter(ctx)
if err != nil {
Expand Down Expand Up @@ -509,7 +535,7 @@ func (s *storeObject) TryClose(objectTTL time.Duration) (res bool, err error) {
if !s.locker.TryLock() {
return false, nil
}
isActive := s.subscription.enabled.Load()
isActive := s.subscription.enabled
s.Unlock()

if isActive {
Expand Down
29 changes: 11 additions & 18 deletions core/block/editor/chatobject/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package chatobject

import (
"slices"
"sync"
"sync/atomic"

"github.com/anyproto/anytype-heart/core/event"
"github.com/anyproto/anytype-heart/core/session"
Expand All @@ -20,9 +18,8 @@ type subscription struct {

eventsBuffer []*pb.EventMessage

enabled atomic.Bool
enabled bool
chatState *model.ChatState
sync.Mutex
}

func newSubscription(spaceId string, chatId string, eventSender event.Sender) *subscription {
Expand All @@ -33,12 +30,14 @@ func newSubscription(spaceId string, chatId string, eventSender event.Sender) *s
}
}

func (s *subscription) enable() (wasEnabled bool) {
return s.enabled.Swap(true) == false
func (s *subscription) enable() (wasDisabled bool) {
wasDisabled = !s.enabled
s.enabled = true
return
}

func (s *subscription) close() {
s.enabled.Store(false)
s.enabled = false
}

// setSessionContext sets the session context for the current operation
Expand All @@ -47,15 +46,9 @@ func (s *subscription) setSessionContext(ctx session.Context) {
}

func (s *subscription) flush() *model.ChatState {
s.Lock()
// if len(s.eventsBuffer) == 0 {
// s.Unlock()
// return
// }
events := slices.Clone(s.eventsBuffer)
s.eventsBuffer = s.eventsBuffer[:0]
chatState := copyChatState(s.chatState)
s.Unlock()

events = append(events, event.NewMessage(s.spaceId, &pb.EventMessageValueOfChatStateUpdate{ChatStateUpdate: &pb.EventChatUpdateState{
State: chatState,
Expand All @@ -71,7 +64,7 @@ func (s *subscription) flush() *model.ChatState {
s.sessionContext.SetMessages(s.chatId, events)
s.eventSender.BroadcastToOtherSessions(s.sessionContext.ID(), ev)
s.sessionContext = nil
} else if s.enabled.Load() {
} else if s.enabled {
s.eventSender.Broadcast(ev)
}
return chatState
Expand Down Expand Up @@ -157,7 +150,7 @@ func (s *subscription) canSend() bool {
if s.sessionContext != nil {
return true
}
if !s.enabled.Load() {
if !s.enabled {
return false
}
return true
Expand All @@ -168,9 +161,9 @@ func copyChatState(state *model.ChatState) *model.ChatState {
return nil
}
return &model.ChatState{
Messages: copyReadState(state.Messages),
Mentions: copyReadState(state.Mentions),
DbState: state.DbState,
Messages: copyReadState(state.Messages),
Mentions: copyReadState(state.Mentions),
DbTimestamp: state.DbTimestamp,
}
}

Expand Down
4 changes: 2 additions & 2 deletions core/chats.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ func (mw *Middleware) ChatUnsubscribe(cctx context.Context, req *pb.RpcChatUnsub
}
}

func (mw *Middleware) ChatReadMessages(ctx context.Context, request *pb.RpcChatReadRequest) *pb.RpcChatReadResponse {
func (mw *Middleware) ChatReadMessages(cctx context.Context, request *pb.RpcChatReadRequest) *pb.RpcChatReadResponse {
chatService := mustService[chats.Service](mw)
err := chatService.ChatReadMessages(request.ChatObjectId, request.BeforeOrderId, request.LastDbState)
err := chatService.ReadMessages(cctx, request.ChatObjectId, request.AfterOrderId, request.BeforeOrderId, request.LastDbTimestamp)
code := mapErrorCode(err,
errToCode(anystore.ErrDocNotFound, pb.RpcChatReadResponseError_MESSAGES_NOT_FOUND),
)
Expand Down
13 changes: 6 additions & 7 deletions docs/proto.md
Original file line number Diff line number Diff line change
Expand Up @@ -10391,7 +10391,7 @@ Get marks list in the selected range in text block.
| ----- | ---- | ----- | ----------- |
| error | [Rpc.Chat.GetMessages.Response.Error](#anytype-Rpc-Chat-GetMessages-Response-Error) | | |
| messages | [model.ChatMessage](#anytype-model-ChatMessage) | repeated | |
| chatState | [model.ChatState](#anytype-model-ChatState) | | dbState from the state should be used in the next request to Chat.SubscribeLastMessages and Chat.ReadMessage |
| chatState | [model.ChatState](#anytype-model-ChatState) | | |



Expand Down Expand Up @@ -10491,10 +10491,10 @@ Get marks list in the selected range in text block.
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| type | [Rpc.Chat.Read.ReadType](#anytype-Rpc-Chat-Read-ReadType) | | |
| chatObjectId | [string](#string) | | Identifier for the chat |
| afterOrderId | [string](#string) | | first orderId in the current viewport |
| beforeOrderId | [string](#string) | | last orderId in the current viewport |
| lastDbState | [int64](#int64) | | last dbState from the last processed Chat.Add event or ChatGetMessages response (in case no events received after it). Used to prevent race conditions |
| chatObjectId | [string](#string) | | id of the chat object |
| afterOrderId | [string](#string) | | read from this orderId; if empty - read from the beginning of the chat |
| beforeOrderId | [string](#string) | | read til this orderId |
| lastDbTimestamp | [int64](#int64) | | dbTimestamp from the last processed ChatState event(or GetMessages). Used to prevent race conditions |



Expand Down Expand Up @@ -10553,7 +10553,6 @@ Get marks list in the selected range in text block.
| ----- | ---- | ----- | ----------- |
| chatObjectId | [string](#string) | | Identifier for the chat |
| limit | [int32](#int32) | | Number of max last messages to return and subscribe |
| lastDbState | [int64](#int64) | | lastDbState from the ChatGetMessages response. In case some messages where added between ChatGetMessages and SubscribeLastMessages you will immediately receive Chat.Add events. |



Expand Down Expand Up @@ -29684,7 +29683,7 @@ Used to decode block meta only, without the content itself
| ----- | ---- | ----- | ----------- |
| messages | [ChatState.UnreadState](#anytype-model-ChatState-UnreadState) | | unread messages |
| mentions | [ChatState.UnreadState](#anytype-model-ChatState-UnreadState) | | unread mentions |
| dbState | [int64](#int64) | | reflects the state of the chat db at the moment of sending response/event that includes this state |
| dbTimestamp | [int64](#int64) | | reflects the state of the chat db at the moment of sending response/event that includes this state |



Expand Down
Loading

0 comments on commit a1c97db

Please sign in to comment.