Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat_: use a single content-topic for all community chats #5864

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions eth-node/types/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@ import (

// NewMessage represents a new whisper message that is posted through the RPC.
type NewMessage struct {
SymKeyID string `json:"symKeyID"`
PublicKey []byte `json:"pubKey"`
SigID string `json:"sig"`
TTL uint32 `json:"ttl"`
PubsubTopic string `json:"pubsubTopic"`
Topic TopicType `json:"topic"`
Payload []byte `json:"payload"`
Padding []byte `json:"padding"`
PowTime uint32 `json:"powTime"`
PowTarget float64 `json:"powTarget"`
TargetPeer string `json:"targetPeer"`
Ephemeral bool `json:"ephemeral"`
Priority *int `json:"priority"`
SymKeyID string `json:"symKeyID"`
PublicKey []byte `json:"pubKey"`
SigID string `json:"sig"`
TTL uint32 `json:"ttl"`
PubsubTopic string `json:"pubsubTopic"`
Topic TopicType `json:"topic"`
Payload []byte `json:"payload"`
Padding []byte `json:"padding"`
PowTime uint32 `json:"powTime"`
PowTarget float64 `json:"powTarget"`
TargetPeer string `json:"targetPeer"`
Ephemeral bool `json:"ephemeral"`
Priority *int `json:"priority"`
ContentTopicOverride string `json:"contentTopicOverride"`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks quite counter-intuitive. Why do we want some special ContentTopicOverride field, and not just use Topic?

Copy link
Contributor Author

@chaitanyaprem chaitanyaprem Nov 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, let me think of why i did not do that. maybe it is possible without breaking anything.

but for some reference as to why i had to use a separate override field

Copy link
Contributor Author

@chaitanyaprem chaitanyaprem Nov 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to do this as

rawMessage.ContentTopicOverride = community.MemberUpdateChannelID()
doesn't have access for logic to derive content-topic from chatID and similarly
ContentTopicOverride: rawMessage.ContentTopicOverride,

Happy to follow any suggestions you have to get around this.

}

// Message is the RPC representation of a whisper message.
Expand Down
12 changes: 7 additions & 5 deletions protocol/common/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,11 +660,12 @@ func (s *MessageSender) dispatchCommunityChatMessage(ctx context.Context, rawMes
}

newMessage := &types.NewMessage{
TTL: whisperTTL,
Payload: payload,
PowTarget: calculatePoW(payload),
PowTime: whisperPoWTime,
PubsubTopic: rawMessage.PubsubTopic,
TTL: whisperTTL,
Payload: payload,
PowTarget: calculatePoW(payload),
PowTime: whisperPoWTime,
PubsubTopic: rawMessage.PubsubTopic,
ContentTopicOverride: rawMessage.ContentTopicOverride,
}

if rawMessage.BeforeDispatch != nil {
Expand Down Expand Up @@ -765,6 +766,7 @@ func (s *MessageSender) SendPublic(
newMessage.Ephemeral = rawMessage.Ephemeral
newMessage.PubsubTopic = rawMessage.PubsubTopic
newMessage.Priority = rawMessage.Priority
newMessage.ContentTopicOverride = rawMessage.ContentTopicOverride

messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)

Expand Down
1 change: 1 addition & 0 deletions protocol/common/raw_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,5 @@ type RawMessage struct {
ResendType ResendType
ResendMethod ResendMethod
Priority *MessagePriority
ContentTopicOverride string
}
6 changes: 6 additions & 0 deletions protocol/communities/community.go
Original file line number Diff line number Diff line change
Expand Up @@ -1567,6 +1567,12 @@ func (o *Community) setPrivateKey(pk *ecdsa.PrivateKey) {
o.config.PrivateKey = pk
}
}
func (o *Community) UniversalChatID() string {
// Using Member updates channelID as chatID to act as a universal content-topic for all chats in the community as explained here https://forum.vac.dev/t/status-communities-review-and-proposed-usage-of-waku-content-topics/335
// This is to match filter criteria of community with the content-topic usage.
// This specific topic is chosen as existing users before the change are already subscribed to this and will not get affected by it.
return o.MemberUpdateChannelID()
}

func (o *Community) SetResendAccountsClock(clock uint64) {
o.config.CommunityDescription.ResendAccountsClock = clock
Expand Down
16 changes: 16 additions & 0 deletions protocol/communities/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4194,6 +4194,22 @@
return chatIDs, nil
}

func (m *Manager) GetOwnedCommunitiesUniversalChatIDs() (map[string]bool, error) {
ownedCommunities, err := m.Controlled()
if err != nil {
return nil, err

Check warning on line 4200 in protocol/communities/manager.go

View check run for this annotation

Codecov / codecov/patch

protocol/communities/manager.go#L4200

Added line #L4200 was not covered by tests
}

chatIDs := make(map[string]bool)
for _, c := range ownedCommunities {
if c.Joined() {
chatIDs[c.UniversalChatID()] = true
}
}
return chatIDs, nil

}

func (m *Manager) StoreWakuMessage(message *types.Message) error {
return m.persistence.SaveWakuMessage(message)
}
Expand Down
4 changes: 4 additions & 0 deletions protocol/communities/manager_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,10 @@
m.logger.Error("failed to get community chat topics ", zap.Error(err))
continue
}
// adding the content-topic used for all community chats as all chat messages will be published on this topic.
// since member updates would not be too frequent i.e only addition/deletion would add a new message,
// this shouldn't cause too much increase in size of archive generated.
topics = append(topics, m.transport.FilterByChatID(community.UniversalChatID()).ContentTopic)

Check warning on line 360 in protocol/communities/manager_archive.go

View check run for this annotation

Codecov / codecov/patch

protocol/communities/manager_archive.go#L360

Added line #L360 was not covered by tests

ts := time.Now().Unix()
to := time.Unix(ts, 0)
Expand Down
3 changes: 2 additions & 1 deletion protocol/communities_messenger_token_permissions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2152,7 +2152,8 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestImportDecryptedArchiveMe
startDate := messageDate.Add(-time.Minute)
endDate := messageDate.Add(time.Minute)
topic := types.BytesToTopic(transport.ToTopic(chat.ID))
topics := []types.TopicType{topic}
communityCommonTopic := types.BytesToTopic(transport.ToTopic(community.UniversalChatID()))
topics := []types.TopicType{topic, communityCommonTopic}

torrentConfig := params.TorrentConfig{
Enabled: true,
Expand Down
12 changes: 11 additions & 1 deletion protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2268,7 +2268,8 @@
)
return rawMessage, fmt.Errorf("can't post message type '%d' on chat '%s'", rawMessage.MessageType, chat.ID)
}

//setting content-topic over-ride for community messages to use memberUpdatesChannelID
rawMessage.ContentTopicOverride = community.UniversalChatID()
logger.Debug("sending community chat message", zap.String("chatName", chat.Name))
isCommunityEncrypted, err := m.communitiesManager.IsEncrypted(chat.CommunityID)
if err != nil {
Expand Down Expand Up @@ -3589,6 +3590,15 @@
if err != nil {
logger.Info("failed to retrieve admin communities", zap.Error(err))
}
//fetch universal chatIDs as well.
controlledCommunitiesUniversalChatIDs, err := m.communitiesManager.GetOwnedCommunitiesUniversalChatIDs()
if err != nil {
logger.Info("failed to retrieve admin communities", zap.Error(err))

Check warning on line 3596 in protocol/messenger.go

View check run for this annotation

Codecov / codecov/patch

protocol/messenger.go#L3596

Added line #L3596 was not covered by tests
}

for chatID, flag := range controlledCommunitiesUniversalChatIDs {
controlledCommunitiesChatIDs[chatID] = flag
}

iterator := m.retrievedMessagesIteratorFactory(chatWithMessages)
for iterator.HasNext() {
Expand Down
11 changes: 11 additions & 0 deletions protocol/messenger_communities.go
Original file line number Diff line number Diff line change
Expand Up @@ -2714,6 +2714,13 @@

publicFiltersToInit = append(publicFiltersToInit, defaultFilters...)

for _, filter := range defaultFilters {
_, err := m.transport.RemoveFilterByChatID(filter.ChatID)
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err

Check warning on line 2720 in protocol/messenger_communities.go

View check run for this annotation

Codecov / codecov/patch

protocol/messenger_communities.go#L2720

Added line #L2720 was not covered by tests
}
}

for chatID := range community.Chats() {
communityChatID := community.IDString() + chatID
_, err := m.transport.RemoveFilterByChatID(communityChatID)
Expand Down Expand Up @@ -3949,6 +3956,10 @@
for _, filter := range filters {
topics = append(topics, filter.ContentTopic)
}
// adding the content-topic used for member updates.
// since member updates would not be too frequent i.e only addition/deletion would add a new message,
// this shouldn't cause too much increase in size of archive generated.
filters = append(filters, m.transport.FilterByChatID(c.UniversalChatID()))

Check warning on line 3962 in protocol/messenger_communities.go

View check run for this annotation

Codecov / codecov/patch

protocol/messenger_communities.go#L3962

Added line #L3962 was not covered by tests

// First we need to know the timestamp of the latest waku message
// we've received for this community, so we can request messages we've
Expand Down
34 changes: 23 additions & 11 deletions protocol/transport/filters_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (f *FiltersManager) Init(

// Add public, one-to-one and negotiated filters.
for _, fi := range filtersToInit {
_, err := f.LoadPublic(fi.ChatID, fi.PubsubTopic)
_, err := f.LoadPublic(fi.ChatID, fi.PubsubTopic, fi.ContentTopicOverrideID)
if err != nil {
return nil, err
}
Expand All @@ -123,15 +123,16 @@ func (f *FiltersManager) Init(
}

type FiltersToInitialize struct {
ChatID string
PubsubTopic string
ChatID string
PubsubTopic string
ContentTopicOverrideID string //litte hacky but this is used to override content-topic in filtersManager.
}

func (f *FiltersManager) InitPublicFilters(publicFiltersToInit []FiltersToInitialize) ([]*Filter, error) {
var filters []*Filter
// Add public, one-to-one and negotiated filters.
for _, pf := range publicFiltersToInit {
f, err := f.LoadPublic(pf.ChatID, pf.PubsubTopic)
f, err := f.LoadPublic(pf.ChatID, pf.PubsubTopic, pf.ContentTopicOverrideID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -455,7 +456,7 @@ func (f *FiltersManager) LoadNegotiated(secret types.NegotiatedSecret) (*Filter,
}

keyString := hex.EncodeToString(secret.Key)
filter, err := f.addSymmetric(keyString, "")
filter, err := f.addSymmetric(keyString, "", "")
if err != nil {
f.logger.Debug("could not register negotiated topic", zap.Error(err))
return nil, err
Expand Down Expand Up @@ -534,11 +535,16 @@ func (f *FiltersManager) PersonalTopicFilter() *Filter {
}

// LoadPublic adds a filter for a public chat.
func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter, error) {
func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string, contentTopicID string) (*Filter, error) {
f.mutex.Lock()
defer f.mutex.Unlock()

if chat, ok := f.filters[chatID]; ok {
chatIDToLoad := chatID
if contentTopicID != "" {
chatIDToLoad = contentTopicID
}

if chat, ok := f.filters[chatIDToLoad]; ok {
if chat.PubsubTopic != pubsubTopic {
f.logger.Debug("updating pubsub topic for filter",
zap.String("chatID", chatID),
Expand All @@ -547,13 +553,13 @@ func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter,
zap.String("newTopic", pubsubTopic),
)
chat.PubsubTopic = pubsubTopic
f.filters[chatID] = chat
f.filters[chatIDToLoad] = chat //TODO: Do we need to update watchers as well on modification?
}

return chat, nil
}

filterAndTopic, err := f.addSymmetric(chatID, pubsubTopic)
filterAndTopic, err := f.addSymmetric(chatID, pubsubTopic, contentTopicID)
if err != nil {
f.logger.Debug("could not register public chat topic", zap.String("chatID", chatID), zap.Error(err))
return nil, err
Expand Down Expand Up @@ -592,7 +598,7 @@ func (f *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, erro
return f.filters[chatID], nil
}

contactCodeFilter, err := f.addSymmetric(chatID, "")
contactCodeFilter, err := f.addSymmetric(chatID, "", "")
if err != nil {
f.logger.Debug("could not register contact code topic", zap.String("chatID", chatID), zap.Error(err))
return nil, err
Expand All @@ -615,7 +621,7 @@ func (f *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, erro
}

// addSymmetric adds a symmetric key filter
func (f *FiltersManager) addSymmetric(chatID string, pubsubTopic string) (*RawFilter, error) {
func (f *FiltersManager) addSymmetric(chatID string, pubsubTopic string, contentTopicID string) (*RawFilter, error) {
var symKeyID string
var err error

Expand Down Expand Up @@ -644,6 +650,12 @@ func (f *FiltersManager) addSymmetric(chatID string, pubsubTopic string) (*RawFi
}
}

if contentTopicID != "" {
//add receive filter for the single default contentTopic for all community chats
topic = ToTopic(contentTopicID)
topics = append(topics, topic)
}

id, err := f.service.Subscribe(&types.SubscriptionOptions{
SymKeyID: symKeyID,
PoW: minPow,
Expand Down
13 changes: 8 additions & 5 deletions protocol/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (t *Transport) ProcessNegotiatedSecret(secret types.NegotiatedSecret) (*Fil
}

func (t *Transport) JoinPublic(chatID string) (*Filter, error) {
return t.filters.LoadPublic(chatID, "")
return t.filters.LoadPublic(chatID, "", "")
}

func (t *Transport) LeavePublic(chatID string) error {
Expand Down Expand Up @@ -223,6 +223,8 @@ func (t *Transport) GetStats() types.StatsSummary {
return t.waku.GetStats()
}

// With change in filter used for communities, messages are indexed here with common filter and not their own chatID filter.
// The caller should not use chatID from the filter to determine chatID of the message, rather should dervice it from messaage itself.
func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) {
result := make(map[Filter][]*types.Message)
logger := t.logger.With(zap.String("site", "retrieveRawAll"))
Expand Down Expand Up @@ -276,16 +278,16 @@ func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) {
// SendPublic sends a new message using the Whisper service.
// For public filters, chat name is used as an ID as well as
// a topic.
// In case of communities a single topic is used to send all messages.
func (t *Transport) SendPublic(ctx context.Context, newMessage *types.NewMessage, chatName string) ([]byte, error) {
if err := t.addSig(newMessage); err != nil {
return nil, err
}

filter, err := t.filters.LoadPublic(chatName, newMessage.PubsubTopic)
//passing content-topic override, it will be used if set. otherwise chatName will be used to load filter.
filter, err := t.filters.LoadPublic(chatName, newMessage.PubsubTopic, newMessage.ContentTopicOverride)
if err != nil {
return nil, err
}

newMessage.SymKeyID = filter.SymKeyID
newMessage.Topic = filter.ContentTopic
newMessage.PubsubTopic = filter.PubsubTopic
Expand Down Expand Up @@ -362,7 +364,8 @@ func (t *Transport) SendCommunityMessage(ctx context.Context, newMessage *types.
}

// We load the filter to make sure we can post on it
filter, err := t.filters.LoadPublic(PubkeyToHex(publicKey)[2:], newMessage.PubsubTopic)
//passing content-topic override, it will be used if set. otherwise chatName will be used to load filter.
filter, err := t.filters.LoadPublic(PubkeyToHex(publicKey)[2:], newMessage.PubsubTopic, newMessage.ContentTopicOverride)
if err != nil {
return nil, err
}
Expand Down