Skip to content

Commit

Permalink
feat_: phase-1 of use single content-topic for all community chats
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem committed Oct 17, 2024
1 parent 7971fd3 commit 0eb01c3
Show file tree
Hide file tree
Showing 11 changed files with 73 additions and 37 deletions.
27 changes: 14 additions & 13 deletions eth-node/types/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@ import (

// NewMessage represents a new whisper message that is posted through the RPC.
type NewMessage struct {
SymKeyID string `json:"symKeyID"`
PublicKey []byte `json:"pubKey"`
SigID string `json:"sig"`
TTL uint32 `json:"ttl"`
PubsubTopic string `json:"pubsubTopic"`
Topic TopicType `json:"topic"`
Payload []byte `json:"payload"`
Padding []byte `json:"padding"`
PowTime uint32 `json:"powTime"`
PowTarget float64 `json:"powTarget"`
TargetPeer string `json:"targetPeer"`
Ephemeral bool `json:"ephemeral"`
Priority *int `json:"priority"`
SymKeyID string `json:"symKeyID"`
PublicKey []byte `json:"pubKey"`
SigID string `json:"sig"`
TTL uint32 `json:"ttl"`
PubsubTopic string `json:"pubsubTopic"`
Topic TopicType `json:"topic"`
Payload []byte `json:"payload"`
Padding []byte `json:"padding"`
PowTime uint32 `json:"powTime"`
PowTarget float64 `json:"powTarget"`
TargetPeer string `json:"targetPeer"`
Ephemeral bool `json:"ephemeral"`
Priority *int `json:"priority"`
ContentTopicOverride string `json:"contentTopicOverride"`
}

// Message is the RPC representation of a whisper message.
Expand Down
12 changes: 7 additions & 5 deletions protocol/common/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,11 +660,12 @@ func (s *MessageSender) dispatchCommunityChatMessage(ctx context.Context, rawMes
}

newMessage := &types.NewMessage{
TTL: whisperTTL,
Payload: payload,
PowTarget: calculatePoW(payload),
PowTime: whisperPoWTime,
PubsubTopic: rawMessage.PubsubTopic,
TTL: whisperTTL,
Payload: payload,
PowTarget: calculatePoW(payload),
PowTime: whisperPoWTime,
PubsubTopic: rawMessage.PubsubTopic,
ContentTopicOverride: rawMessage.ContentTopicOverride,

Check warning on line 668 in protocol/common/message_sender.go

View check run for this annotation

Codecov / codecov/patch

protocol/common/message_sender.go#L663-L668

Added lines #L663 - L668 were not covered by tests
}

if rawMessage.BeforeDispatch != nil {
Expand Down Expand Up @@ -765,6 +766,7 @@ func (s *MessageSender) SendPublic(
newMessage.Ephemeral = rawMessage.Ephemeral
newMessage.PubsubTopic = rawMessage.PubsubTopic
newMessage.Priority = rawMessage.Priority
newMessage.ContentTopicOverride = rawMessage.ContentTopicOverride

messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)

Expand Down
1 change: 1 addition & 0 deletions protocol/common/raw_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,5 @@ type RawMessage struct {
ResendType ResendType
ResendMethod ResendMethod
Priority *MessagePriority
ContentTopicOverride string
}
1 change: 1 addition & 0 deletions protocol/communities/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4183,6 +4183,7 @@ func (m *Manager) GetOwnedCommunitiesChatIDs() (map[string]bool, error) {
for _, id := range c.ChatIDs() {
chatIDs[id] = true
}
chatIDs[c.MemberUpdateChannelID()] = true // TODO: for now including this chatID as controlled so that archiving works without any issues.

Check warning on line 4186 in protocol/communities/manager.go

View check run for this annotation

Codecov / codecov/patch

protocol/communities/manager.go#L4186

Added line #L4186 was not covered by tests
}
}
return chatIDs, nil
Expand Down
4 changes: 4 additions & 0 deletions protocol/communities/manager_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,10 @@ func (m *ArchiveManager) StartHistoryArchiveTasksInterval(community *Community,
m.logger.Error("failed to get community chat topics ", zap.Error(err))
continue
}
// adding the content-topic used for member updates.
// since member updates would not be too frequent i.e only addition/deletion would add a new message,
// this shouldn't cause too much increase in size of archive generated.
topics = append(topics, m.transport.FilterByChatID(community.MemberUpdateChannelID()).ContentTopic)

Check warning on line 360 in protocol/communities/manager_archive.go

View check run for this annotation

Codecov / codecov/patch

protocol/communities/manager_archive.go#L360

Added line #L360 was not covered by tests

ts := time.Now().Unix()
to := time.Unix(ts, 0)
Expand Down
1 change: 1 addition & 0 deletions protocol/communities_key_distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func (ckd *CommunitiesKeyDistributorImpl) sendKeyExchangeMessage(community *comm
MessageType: protobuf.ApplicationMetadataMessage_CHAT_MESSAGE,
HashRatchetGroupID: hashRatchetGroupID,
PubsubTopic: community.PubsubTopic(), // TODO: confirm if it should be sent in community pubsub topic
//ContentTopicOverride: community.MemberUpdateChannelID(), //TODO: Confirm if this is correct, could not figure out where LocalChatID is set in this flow
}
_, err := ckd.sender.SendCommunityMessage(context.Background(), &rawMessage)

Expand Down
3 changes: 2 additions & 1 deletion protocol/communities_messenger_token_permissions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2152,7 +2152,8 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestImportDecryptedArchiveMe
startDate := messageDate.Add(-time.Minute)
endDate := messageDate.Add(time.Minute)
topic := types.BytesToTopic(transport.ToTopic(chat.ID))
topics := []types.TopicType{topic}
communityCommonTopic := types.BytesToTopic(transport.ToTopic(community.MemberUpdateChannelID()))
topics := []types.TopicType{topic, communityCommonTopic}

torrentConfig := params.TorrentConfig{
Enabled: true,
Expand Down
3 changes: 2 additions & 1 deletion protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2207,7 +2207,8 @@ func (m *Messenger) dispatchMessage(ctx context.Context, rawMessage common.RawMe
)
return rawMessage, fmt.Errorf("can't post message type '%d' on chat '%s'", rawMessage.MessageType, chat.ID)
}

//setting content-topic over-ride for community messages to use memberUpdatesChannelID
rawMessage.ContentTopicOverride = community.MemberUpdateChannelID()

Check warning on line 2211 in protocol/messenger.go

View check run for this annotation

Codecov / codecov/patch

protocol/messenger.go#L2211

Added line #L2211 was not covered by tests
logger.Debug("sending community chat message", zap.String("chatName", chat.Name))
isCommunityEncrypted, err := m.communitiesManager.IsEncrypted(chat.CommunityID)
if err != nil {
Expand Down
11 changes: 10 additions & 1 deletion protocol/messenger_communities.go
Original file line number Diff line number Diff line change
Expand Up @@ -2713,7 +2713,12 @@ func (m *Messenger) UpdateCommunityFilters(community *communities.Community) err
publicFiltersToInit := make([]transport.FiltersToInitialize, 0, len(defaultFilters)+len(community.Chats()))

publicFiltersToInit = append(publicFiltersToInit, defaultFilters...)

for _, filter := range publicFiltersToInit {
_, err := m.transport.RemoveFilterByChatID(filter.ChatID)
if err != nil {
return err

Check warning on line 2719 in protocol/messenger_communities.go

View check run for this annotation

Codecov / codecov/patch

protocol/messenger_communities.go#L2716-L2719

Added lines #L2716 - L2719 were not covered by tests
}
}
for chatID := range community.Chats() {
communityChatID := community.IDString() + chatID
_, err := m.transport.RemoveFilterByChatID(communityChatID)
Expand Down Expand Up @@ -3949,6 +3954,10 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community
for _, filter := range filters {
topics = append(topics, filter.ContentTopic)
}
// adding the content-topic used for member updates.
// since member updates would not be too frequent i.e only addition/deletion would add a new message,
// this shouldn't cause too much increase in size of archive generated.
filters = append(filters, m.transport.FilterByChatID(c.MemberUpdateChannelID()))

Check warning on line 3960 in protocol/messenger_communities.go

View check run for this annotation

Codecov / codecov/patch

protocol/messenger_communities.go#L3960

Added line #L3960 was not covered by tests

// First we need to know the timestamp of the latest waku message
// we've received for this community, so we can request messages we've
Expand Down
34 changes: 23 additions & 11 deletions protocol/transport/filters_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (f *FiltersManager) Init(

// Add public, one-to-one and negotiated filters.
for _, fi := range filtersToInit {
_, err := f.LoadPublic(fi.ChatID, fi.PubsubTopic)
_, err := f.LoadPublic(fi.ChatID, fi.PubsubTopic, fi.ContentTopicOverrideID)

Check warning on line 102 in protocol/transport/filters_manager.go

View check run for this annotation

Codecov / codecov/patch

protocol/transport/filters_manager.go#L102

Added line #L102 was not covered by tests
if err != nil {
return nil, err
}
Expand All @@ -123,15 +123,16 @@ func (f *FiltersManager) Init(
}

type FiltersToInitialize struct {
ChatID string
PubsubTopic string
ChatID string
PubsubTopic string
ContentTopicOverrideID string //litte hacky but this is used to override content-topic in filtersManager.
}

func (f *FiltersManager) InitPublicFilters(publicFiltersToInit []FiltersToInitialize) ([]*Filter, error) {
var filters []*Filter
// Add public, one-to-one and negotiated filters.
for _, pf := range publicFiltersToInit {
f, err := f.LoadPublic(pf.ChatID, pf.PubsubTopic)
f, err := f.LoadPublic(pf.ChatID, pf.PubsubTopic, pf.ContentTopicOverrideID)

Check warning on line 135 in protocol/transport/filters_manager.go

View check run for this annotation

Codecov / codecov/patch

protocol/transport/filters_manager.go#L135

Added line #L135 was not covered by tests
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -455,7 +456,7 @@ func (f *FiltersManager) LoadNegotiated(secret types.NegotiatedSecret) (*Filter,
}

keyString := hex.EncodeToString(secret.Key)
filter, err := f.addSymmetric(keyString, "")
filter, err := f.addSymmetric(keyString, "", "")

Check warning on line 459 in protocol/transport/filters_manager.go

View check run for this annotation

Codecov / codecov/patch

protocol/transport/filters_manager.go#L459

Added line #L459 was not covered by tests
if err != nil {
f.logger.Debug("could not register negotiated topic", zap.Error(err))
return nil, err
Expand Down Expand Up @@ -534,11 +535,16 @@ func (f *FiltersManager) PersonalTopicFilter() *Filter {
}

// LoadPublic adds a filter for a public chat.
func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter, error) {
func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string, contentTopicID string) (*Filter, error) {
f.mutex.Lock()
defer f.mutex.Unlock()

if chat, ok := f.filters[chatID]; ok {
chatIDToLoad := chatID
if contentTopicID != "" {
chatIDToLoad = contentTopicID

Check warning on line 544 in protocol/transport/filters_manager.go

View check run for this annotation

Codecov / codecov/patch

protocol/transport/filters_manager.go#L544

Added line #L544 was not covered by tests
}

if chat, ok := f.filters[chatIDToLoad]; ok {
if chat.PubsubTopic != pubsubTopic {
f.logger.Debug("updating pubsub topic for filter",
zap.String("chatID", chatID),
Expand All @@ -547,13 +553,13 @@ func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter,
zap.String("newTopic", pubsubTopic),
)
chat.PubsubTopic = pubsubTopic
f.filters[chatID] = chat
f.filters[chatIDToLoad] = chat //TODO: Do we need to update watchers as well on modification?

Check warning on line 556 in protocol/transport/filters_manager.go

View check run for this annotation

Codecov / codecov/patch

protocol/transport/filters_manager.go#L556

Added line #L556 was not covered by tests
}

return chat, nil
}

filterAndTopic, err := f.addSymmetric(chatID, pubsubTopic)
filterAndTopic, err := f.addSymmetric(chatID, pubsubTopic, contentTopicID)

Check warning on line 562 in protocol/transport/filters_manager.go

View check run for this annotation

Codecov / codecov/patch

protocol/transport/filters_manager.go#L562

Added line #L562 was not covered by tests
if err != nil {
f.logger.Debug("could not register public chat topic", zap.String("chatID", chatID), zap.Error(err))
return nil, err
Expand Down Expand Up @@ -592,7 +598,7 @@ func (f *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, erro
return f.filters[chatID], nil
}

contactCodeFilter, err := f.addSymmetric(chatID, "")
contactCodeFilter, err := f.addSymmetric(chatID, "", "")
if err != nil {
f.logger.Debug("could not register contact code topic", zap.String("chatID", chatID), zap.Error(err))
return nil, err
Expand All @@ -615,7 +621,7 @@ func (f *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, erro
}

// addSymmetric adds a symmetric key filter
func (f *FiltersManager) addSymmetric(chatID string, pubsubTopic string) (*RawFilter, error) {
func (f *FiltersManager) addSymmetric(chatID string, pubsubTopic string, contentTopicID string) (*RawFilter, error) {
var symKeyID string
var err error

Expand Down Expand Up @@ -644,6 +650,12 @@ func (f *FiltersManager) addSymmetric(chatID string, pubsubTopic string) (*RawFi
}
}

if contentTopicID != "" {
//add receive filter for the single default contentTopic for all community chats
topic = ToTopic(contentTopicID)
topics = append(topics, topic)

Check warning on line 656 in protocol/transport/filters_manager.go

View check run for this annotation

Codecov / codecov/patch

protocol/transport/filters_manager.go#L655-L656

Added lines #L655 - L656 were not covered by tests
}

id, err := f.service.Subscribe(&types.SubscriptionOptions{
SymKeyID: symKeyID,
PoW: minPow,
Expand Down
13 changes: 8 additions & 5 deletions protocol/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (t *Transport) ProcessNegotiatedSecret(secret types.NegotiatedSecret) (*Fil
}

func (t *Transport) JoinPublic(chatID string) (*Filter, error) {
return t.filters.LoadPublic(chatID, "")
return t.filters.LoadPublic(chatID, "", "")

Check warning on line 194 in protocol/transport/transport.go

View check run for this annotation

Codecov / codecov/patch

protocol/transport/transport.go#L194

Added line #L194 was not covered by tests
}

func (t *Transport) LeavePublic(chatID string) error {
Expand Down Expand Up @@ -258,6 +258,8 @@ func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) {
}

for i := range msgs {
// TODO: find the filter for the msg based on chatID in the message and map it properly. or better this is done in filter layer itself?
// something like t.FilterByChatID()
// Exclude anything that is a cache hit
if !hits[types.EncodeHex(msgs[i].Hash)] {
result[*filter] = append(result[*filter], msgs[i])
Expand All @@ -276,16 +278,16 @@ func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) {
// SendPublic sends a new message using the Whisper service.
// For public filters, chat name is used as an ID as well as
// a topic.
// In case of communities a single topic is used to send all messages.
func (t *Transport) SendPublic(ctx context.Context, newMessage *types.NewMessage, chatName string) ([]byte, error) {
if err := t.addSig(newMessage); err != nil {
return nil, err
}

filter, err := t.filters.LoadPublic(chatName, newMessage.PubsubTopic)
//passing content-topic override, it will be used if set. otherwise chatName will be used to load filter.
filter, err := t.filters.LoadPublic(chatName, newMessage.PubsubTopic, newMessage.ContentTopicOverride)
if err != nil {
return nil, err
}

newMessage.SymKeyID = filter.SymKeyID
newMessage.Topic = filter.ContentTopic
newMessage.PubsubTopic = filter.PubsubTopic
Expand Down Expand Up @@ -362,7 +364,8 @@ func (t *Transport) SendCommunityMessage(ctx context.Context, newMessage *types.
}

// We load the filter to make sure we can post on it
filter, err := t.filters.LoadPublic(PubkeyToHex(publicKey)[2:], newMessage.PubsubTopic)
//passing content-topic override, it will be used if set. otherwise chatName will be used to load filter.
filter, err := t.filters.LoadPublic(PubkeyToHex(publicKey)[2:], newMessage.PubsubTopic, newMessage.ContentTopicOverride)

Check warning on line 368 in protocol/transport/transport.go

View check run for this annotation

Codecov / codecov/patch

protocol/transport/transport.go#L368

Added line #L368 was not covered by tests
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 0eb01c3

Please sign in to comment.