From 81d88dcd18874cb5e14e5272c630ec105a5f9810 Mon Sep 17 00:00:00 2001 From: Pascal Precht <445106+PascalPrecht@users.noreply.github.com> Date: Mon, 14 Mar 2022 12:35:09 +0100 Subject: [PATCH] Handle history archive magnetlink messages This introduces the ability for status notes to handle community history archive magnetlinks. To make this work, a few things are needed: 1. A new database table has been introduced to store message archive hashes. This is necessary so status nodes can determine whether or not they need to download a certain archive 2. The messenger's `handleRetrievedMessages()` has been exteded to take magnetlink messages into account 3. New APIs were added to download torrent data given a magnetlink and also to extract messages from downloaded archives, which are then later fed to `handleRetrievedMessages` Closes #2568 --- appdatabase/migrations/bindata.go | 2 +- protocol/communities/manager.go | 155 +++++++++++++++++- protocol/communities/persistence.go | 25 +++ protocol/message_persistence.go | 2 +- protocol/messenger.go | 15 +- protocol/messenger_handler.go | 58 +++++++ protocol/migrations/migrations.go | 24 +++ ...munity_message_archive_hashes_table.up.sql | 5 + protocol/transport/filters_manager.go | 12 ++ protocol/transport/transport.go | 4 + protocol/v1/status_message.go | 2 + 11 files changed, 297 insertions(+), 7 deletions(-) create mode 100644 protocol/migrations/sqlite/1645608605_add_community_message_archive_hashes_table.up.sql diff --git a/appdatabase/migrations/bindata.go b/appdatabase/migrations/bindata.go index e7276257576..bdcb3be25f0 100644 --- a/appdatabase/migrations/bindata.go +++ b/appdatabase/migrations/bindata.go @@ -232,7 +232,7 @@ func _1646828466_add_communities_archive_infoUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "1646828466_add_communities_archive_info.up.sql", size: 195, mode: os.FileMode(0664), modTime: time.Unix(1646903872, 0)} + info := bindataFileInfo{name: "1646828466_add_communities_archive_info.up.sql", size: 195, mode: os.FileMode(0664), modTime: time.Unix(1646908277, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xfd, 0x3d, 0x27, 0x93, 0x55, 0x14, 0x82, 0xe1, 0x23, 0x2a, 0xd, 0x22, 0xbb, 0x32, 0xe7, 0xf6, 0xd, 0xf7, 0x7a, 0x38, 0x54, 0x91, 0xe2, 0x97, 0xd2, 0xe, 0x3a, 0xc9, 0xb, 0x6a, 0xd9, 0x1a}} return a, nil } diff --git a/protocol/communities/manager.go b/protocol/communities/manager.go index fe50ce5c6d9..606b578f231 100644 --- a/protocol/communities/manager.go +++ b/protocol/communities/manager.go @@ -805,6 +805,10 @@ func (m *Manager) JoinCommunity(id types.HexBytes) (*Community, error) { return community, nil } +func (m *Manager) GetMagnetlinkMessageClock(communityID types.HexBytes) (uint64, error) { + return m.persistence.GetMagnetlinkMessageClock(communityID) +} + func (m *Manager) UpdateMagnetlinkMessageClock(communityID types.HexBytes, clock uint64) error { return m.persistence.UpdateMagnetlinkMessageClock(communityID, clock) } @@ -1045,6 +1049,29 @@ func (m *Manager) GetAdminCommunitiesChatIDs() (map[string]bool, error) { return chatIDs, nil } +func (m *Manager) IsAdminCommunity(pubKey *ecdsa.PublicKey) (bool, error) { + adminCommunities, err := m.Created() + if err != nil { + return false, err + } + + for _, c := range adminCommunities { + if c.PrivateKey().PublicKey.Equal(pubKey) { + return true, nil + } + } + return false, nil +} + +func (m *Manager) IsJoinedCommunity(pubKey *ecdsa.PublicKey) (bool, error) { + community, err := m.GetByID(crypto.CompressPubkey(pubKey)) + if err != nil { + return false, err + } + + return community != nil && community.Joined(), nil +} + func (m *Manager) GetCommunityChatsFilters(communityID types.HexBytes) ([]*transport.Filter, error) { chatIDs, err := m.persistence.GetCommunityChatIDs(communityID) if err != nil { @@ -1250,7 +1277,7 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics _, err := os.Stat(indexPath) if err == nil { - wakuMessageArchiveIndexProto, err = m.loadHistoryArchiveIndexFromFile(communityID) + wakuMessageArchiveIndexProto, err = m.LoadHistoryArchiveIndexFromFile(communityID) if err != nil { return err } @@ -1455,6 +1482,130 @@ func (m *Manager) IsSeedingHistoryArchiveTorrent(communityID types.HexBytes) boo return exists && ok } +func (m *Manager) DownloadHistoryArchivesByMagnetlink(communityID types.HexBytes, magnetlink string) ([]string, error) { + + id := communityID.String() + ml, _ := metainfo.ParseMagnetUri(magnetlink) + + log.Println("Adding torrent from magnetlink for community: ", id) + torrent, err := m.torrentClient.AddMagnet(magnetlink) + if err != nil { + return nil, err + } + m.torrentTasks[id] = ml.InfoHash + + <-torrent.GotInfo() + files := torrent.Files() + + indexFile := files[1] + + log.Println("Downloading history archive index") + indexFile.Download() + for { + if indexFile.BytesCompleted() == indexFile.Length() { + break + } + } + log.Println("Done.") + + index, err := m.LoadHistoryArchiveIndexFromFile(communityID) + if err != nil { + return nil, err + } + + var archiveIDs []string + + for hash, metadata := range index.Archives { + hasArchive, err := m.persistence.HasMessageArchiveID(communityID, hash) + if err != nil { + m.logger.Debug("Failed to check if has message archive id", zap.Error(err)) + continue + } + if hasArchive { + continue + } + + startIndex := int(metadata.Offset) / pieceLength + endIndex := startIndex + int(metadata.Size)/pieceLength + + log.Println("Downloading data for message archive: ", hash) + log.Println("Pieces: ", startIndex, "-", endIndex) + torrent.DownloadPieces(startIndex, endIndex) + + psc := torrent.SubscribePieceStateChanges() + for { + i := startIndex + done := false + for { + if i > endIndex-1 { + break + } + done = torrent.PieceState(i).Complete + i++ + } + if done { + psc.Close() + break + } + <-psc.Values + } + log.Println("Done") + + archiveIDs = append(archiveIDs, hash) + err = m.persistence.SaveMessageArchiveID(communityID, hash) + if err != nil { + m.logger.Debug("Couldn't save message archive ID", zap.Error(err)) + continue + } + } + return archiveIDs, nil +} + +func (m *Manager) ExtractMessagesFromHistoryArchives(communityID types.HexBytes, archiveIDs []string) (map[transport.Filter][]*types.Message, error) { + id := communityID.String() + + index, err := m.LoadHistoryArchiveIndexFromFile(communityID) + if err != nil { + return nil, err + } + totalData, err := os.ReadFile(m.archiveDataFile(id)) + if err != nil { + return nil, err + } + + messages := make(map[transport.Filter][]*types.Message) + + for _, hash := range archiveIDs { + metadata := index.Archives[hash] + + archive := &protobuf.WakuMessageArchive{} + data := totalData[metadata.Offset : metadata.Offset+metadata.Size-metadata.Padding] + + err := proto.Unmarshal(data, archive) + if err != nil { + log.Println("Failed to unmarshal WakuMessageArchive", err) + m.logger.Debug("Failed to unmarshal WakuMessageArchive", zap.Error(err)) + continue + } + + for _, message := range archive.Messages { + filter := m.transport.FilterByTopic(message.Topic) + if filter != nil { + shhMessage := &types.Message{ + Sig: message.Sig, + Timestamp: uint32(message.Timestamp), + Topic: types.BytesToTopic(message.Topic), + Payload: message.Payload, + Padding: message.Padding, + Hash: message.Hash, + } + messages[*filter] = append(messages[*filter], shhMessage) + } + } + } + return messages, nil +} + func (m *Manager) GetHistoryArchiveMagnetlink(communityID types.HexBytes) (string, error) { id := communityID.String() torrentFile := m.torrentFile(id) @@ -1501,7 +1652,7 @@ func (m *Manager) createWakuMessageArchive(from time.Time, to time.Time, message return wakuMessageArchive } -func (m *Manager) loadHistoryArchiveIndexFromFile(communityID types.HexBytes) (*protobuf.WakuMessageArchiveIndex, error) { +func (m *Manager) LoadHistoryArchiveIndexFromFile(communityID types.HexBytes) (*protobuf.WakuMessageArchiveIndex, error) { wakuMessageArchiveIndexProto := &protobuf.WakuMessageArchiveIndex{} indexPath := m.archiveIndexFile(communityID.String()) diff --git a/protocol/communities/persistence.go b/protocol/communities/persistence.go index c737f7e1a39..c6145b961c8 100644 --- a/protocol/communities/persistence.go +++ b/protocol/communities/persistence.go @@ -423,6 +423,15 @@ func (p *Persistence) GetWakuMessagesByFilterTopic(topics []types.TopicType, fro return messages, nil } +func (p *Persistence) GetMagnetlinkMessageClock(communityID types.HexBytes) (uint64, error) { + var magnetlinkClock uint64 + err := p.db.QueryRow(`SELECT magnetlink_clock FROM communities_archive_info WHERE community_id = ?`, communityID.String()).Scan(&magnetlinkClock) + if err == sql.ErrNoRows { + return 0, nil + } + return magnetlinkClock, err +} + func (p *Persistence) UpdateMagnetlinkMessageClock(communityID types.HexBytes, clock uint64) error { _, err := p.db.Exec(`UPDATE communities_archive_info SET magnetlink_clock = ? @@ -460,6 +469,22 @@ func (p *Persistence) GetLastMessageArchiveEndDate(communityID types.HexBytes) ( return lastMessageArchiveEndDate, nil } +func (p *Persistence) HasMessageArchiveID(communityID types.HexBytes, hash string) (exists bool, err error) { + err = p.db.QueryRow(`SELECT EXISTS (SELECT 1 FROM community_message_archive_hashes WHERE community_id = ? AND hash = ?)`, + communityID.String(), + hash, + ).Scan(&exists) + return exists, err +} + +func (p *Persistence) SaveMessageArchiveID(communityID types.HexBytes, hash string) error { + _, err := p.db.Exec(`INSERT INTO community_message_archive_hashes (community_id, hash) VALUES (?, ?)`, + communityID.String(), + hash, + ) + return err +} + func (p *Persistence) GetCommunitiesSettings() ([]CommunitySettings, error) { rows, err := p.db.Query("SELECT community_id, message_archive_seeding_enabled, message_archive_fetching_enabled FROM communities_settings") if err != nil { diff --git a/protocol/message_persistence.go b/protocol/message_persistence.go index 9c1bc195c46..c8201db6cda 100644 --- a/protocol/message_persistence.go +++ b/protocol/message_persistence.go @@ -1853,7 +1853,7 @@ func (db sqlitePersistence) GetDeletes(messageID string, from string) ([]*Delete } func (db sqlitePersistence) SaveEdit(editMessage EditMessage) error { - _, err := db.db.Exec(`INSERT INTO user_messages_edits (clock, chat_id, message_id, text, source, id) VALUES(?,?,?,?,?,?)`, editMessage.Clock, editMessage.ChatId, editMessage.MessageId, editMessage.Text, editMessage.From, editMessage.ID) + _, err := db.db.Exec(`INSERT OR REPLACE INTO user_messages_edits (clock, chat_id, message_id, text, source, id) VALUES(?,?,?,?,?,?)`, editMessage.Clock, editMessage.ChatId, editMessage.MessageId, editMessage.Text, editMessage.From, editMessage.ID) return err } diff --git a/protocol/messenger.go b/protocol/messenger.go index 9ebe3e295cf..81bbb039574 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -2811,7 +2811,7 @@ func (m *Messenger) RetrieveAll() (*MessengerResponse, error) { return nil, err } - return m.handleRetrievedMessages(chatWithMessages) + return m.handleRetrievedMessages(chatWithMessages, true) } func (m *Messenger) GetStats() types.StatsSummary { @@ -2959,7 +2959,7 @@ func (r *ReceivedMessageState) addNewActivityCenterNotification(publicKey ecdsa. return nil } -func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filter][]*types.Message) (*MessengerResponse, error) { +func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filter][]*types.Message, storeWakuMessages bool) (*MessengerResponse, error) { response := &MessengerResponse{} messageState := &ReceivedMessageState{ AllChats: m.allChats, @@ -2989,7 +2989,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte // Indicates tha all messages in the batch have been processed correctly allMessagesProcessed := true - if adminCommunitiesChatIDs[filter.ChatID] { + if adminCommunitiesChatIDs[filter.ChatID] && !storeWakuMessages { logger.Debug("storing waku message") err := m.communitiesManager.StoreWakuMessage(shhMessage) if err != nil { @@ -3549,6 +3549,15 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte continue } + case protobuf.CommunityMessageArchiveMagnetlink: + logger.Debug("Handling CommunityMessageArchiveMagnetlink") + magnetlinkMessage := msg.ParsedMessage.Interface().(protobuf.CommunityMessageArchiveMagnetlink) + err = m.HandleHistoryArchiveMagnetlinkMessage(messageState, publicKey, magnetlinkMessage.MagnetUri, magnetlinkMessage.Clock) + if err != nil { + logger.Warn("failed to handle CommunityMessageArchiveMagnetlink", zap.Error(err)) + continue + } + case protobuf.AnonymousMetricBatch: logger.Debug("Handling AnonymousMetricBatch") if m.anonMetricsServer == nil { diff --git a/protocol/messenger_handler.go b/protocol/messenger_handler.go index de3cd5f90af..416fcaac349 100644 --- a/protocol/messenger_handler.go +++ b/protocol/messenger_handler.go @@ -5,6 +5,7 @@ import ( "crypto/ecdsa" "encoding/hex" "fmt" + "log" "github.com/pkg/errors" "go.uber.org/zap" @@ -626,6 +627,63 @@ func (m *Messenger) HandleCommunityInvitation(state *ReceivedMessageState, signe return nil } +func (m *Messenger) HandleHistoryArchiveMagnetlinkMessage(state *ReceivedMessageState, communityPubKey *ecdsa.PublicKey, magnetlink string, clock uint64) error { + + id := types.HexBytes(crypto.CompressPubkey(communityPubKey)) + settings, err := m.communitiesManager.GetCommunitySettingsByID(id) + if err != nil { + m.logger.Debug("Couldn't get community settings for community with id: ", zap.Any("id", id)) + return err + } + + if m.config.torrentConfig.Enabled && settings.HistoryArchiveSupportEnabled { + signedByOwnedCommunity, err := m.communitiesManager.IsAdminCommunity(communityPubKey) + if err != nil { + return err + } + joinedCommunity, err := m.communitiesManager.IsJoinedCommunity(communityPubKey) + if err != nil { + return err + } + lastClock, err := m.communitiesManager.GetMagnetlinkMessageClock(id) + if err != nil { + return err + } + // We are only interested in a community archive magnet link + // if it originates from a community that the current account is + // part of and doesn't own the private key at the same time + if !signedByOwnedCommunity && joinedCommunity && clock > lastClock { + + m.communitiesManager.UnseedHistoryArchiveTorrent(id) + go func() { + downloadedArchiveIDs, err := m.communitiesManager.DownloadHistoryArchivesByMagnetlink(id, magnetlink) + if err != nil { + log.Println("failed to download history archive data", err) + m.logger.Debug("failed to download history archive data", zap.Error(err)) + return + } + + messagesToHandle, err := m.communitiesManager.ExtractMessagesFromHistoryArchives(id, downloadedArchiveIDs) + if err != nil { + log.Println("failed to extract history archive messages", err) + m.logger.Debug("failed to extract history archive messages", zap.Error(err)) + return + } + + _, err = m.handleRetrievedMessages(messagesToHandle, false) + if err != nil { + log.Println("failed to write history archive messages to database", err) + m.logger.Debug("failed to write history archive messages to database", zap.Error(err)) + return + } + }() + + return m.communitiesManager.UpdateMagnetlinkMessageClock(id, clock) + } + } + return nil +} + // HandleCommunityRequestToJoin handles an community request to join func (m *Messenger) HandleCommunityRequestToJoin(state *ReceivedMessageState, signer *ecdsa.PublicKey, requestToJoinProto protobuf.CommunityRequestToJoin) error { if requestToJoinProto.CommunityId == nil { diff --git a/protocol/migrations/migrations.go b/protocol/migrations/migrations.go index 5bf8cbb0c25..39195359d1a 100644 --- a/protocol/migrations/migrations.go +++ b/protocol/migrations/migrations.go @@ -49,6 +49,7 @@ // 1634896007_add_last_updated_locally_and_removed.up.sql (131B) // 1635840039_add_clock_read_at_column_in_chats.up.sql (245B) // 1637852321_add_received_invitation_admin_column_in_chats.up.sql (72B) +// 1645608605_add_community_message_archive_hashes_table.up.sql (115B) // README.md (554B) // doc.go (850B) @@ -1099,6 +1100,26 @@ func _1637852321_add_received_invitation_admin_column_in_chatsUpSql() (*asset, e return a, nil } +var __1645608605_add_community_message_archive_hashes_tableUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x0e\x72\x75\x0c\x71\x55\x08\x71\x74\xf2\x71\x55\x48\xce\xcf\xcd\x2d\xcd\xcb\x2c\xa9\x8c\xcf\x4d\x2d\x2e\x4e\x4c\x4f\x8d\x4f\x2c\x4a\xce\xc8\x2c\x4b\x8d\xcf\x48\x2c\xce\x48\x2d\x56\xd0\xe0\x52\x40\x52\x94\x99\xa2\x10\xe2\x1a\x11\xa2\xe0\xe7\x1f\xa2\xe0\x17\xea\xe3\xa3\xc3\xa5\xa0\x00\x52\x08\x11\x0d\x08\xf2\xf4\x75\x0c\x8a\x54\xf0\x76\x8d\x84\xab\xe0\xd2\xb4\xe6\xe2\x02\x04\x00\x00\xff\xff\xcf\xb4\xb6\x9e\x73\x00\x00\x00") + +func _1645608605_add_community_message_archive_hashes_tableUpSqlBytes() ([]byte, error) { + return bindataRead( + __1645608605_add_community_message_archive_hashes_tableUpSql, + "1645608605_add_community_message_archive_hashes_table.up.sql", + ) +} + +func _1645608605_add_community_message_archive_hashes_tableUpSql() (*asset, error) { + bytes, err := _1645608605_add_community_message_archive_hashes_tableUpSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "1645608605_add_community_message_archive_hashes_table.up.sql", size: 115, mode: os.FileMode(0664), modTime: time.Unix(1647259091, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x2f, 0xfe, 0xbf, 0x8, 0xc0, 0x6b, 0x76, 0xc7, 0xd7, 0x70, 0xc0, 0x57, 0xa2, 0x6e, 0x91, 0x9e, 0x46, 0x72, 0x64, 0x51, 0x91, 0xbe, 0x1a, 0x21, 0x3d, 0x3c, 0xc3, 0x44, 0xa0, 0xf, 0xb7, 0xa5}} + return a, nil +} + var _readmeMd = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x54\x91\xc1\xce\xd3\x30\x10\x84\xef\x7e\x8a\x91\x7a\x01\xa9\x2a\x8f\xc0\x0d\x71\x82\x03\x48\x1c\xc9\x36\x9e\x36\x96\x1c\x6f\xf0\xae\x93\xe6\xed\x91\xa3\xc2\xdf\xff\x66\xed\xd8\x33\xdf\x78\x4f\xa7\x13\xbe\xea\x06\x57\x6c\x35\x39\x31\xa7\x7b\x15\x4f\x5a\xec\x73\x08\xbf\x08\x2d\x79\x7f\x4a\x43\x5b\x86\x17\xfd\x8c\x21\xea\x56\x5e\x47\x90\x4a\x14\x75\x48\xde\x64\x37\x2c\x6a\x96\xae\x99\x48\x05\xf6\x27\x77\x13\xad\x08\xae\x8a\x51\xe7\x25\xf3\xf1\xa9\x9f\xf9\x58\x58\x2c\xad\xbc\xe0\x8b\x56\xf0\x21\x5d\xeb\x4c\x95\xb3\xae\x84\x60\xd4\xdc\xe6\x82\x5d\x1b\x36\x6d\x39\x62\x92\xf5\xb8\x11\xdb\x92\xd3\x28\xce\xe0\x13\xe1\x72\xcd\x3c\x63\xd4\x65\x87\xae\xac\xe8\xc3\x28\x2e\x67\x44\x66\x3a\x21\x25\xa2\x72\xac\x14\x67\xbc\x84\x9f\x53\x32\x8c\x52\x70\x25\x56\xd6\xfd\x8d\x05\x37\xad\x30\x9d\x9f\xa6\x86\x0f\xcd\x58\x7f\xcf\x34\x93\x3b\xed\x90\x9f\xa4\x1f\xcf\x30\x85\x4d\x07\x58\xaf\x7f\x25\xc4\x9d\xf3\x72\x64\x84\xd0\x7f\xf9\x9b\x3a\x2d\x84\xef\x85\x48\x66\x8d\xd8\x88\x9b\x8c\x8c\x98\x5b\xf6\x74\x14\x4e\x33\x0d\xc9\xe0\x93\x38\xda\x12\xc5\x69\xbd\xe4\xf0\x2e\x7a\x78\x07\x1c\xfe\x13\x9f\x91\x29\x31\x95\x7b\x7f\x62\x59\x37\xb4\xe5\x5e\x25\xfe\x33\xee\xd5\x53\x71\xd6\xda\x3a\xd8\xcb\xde\x2e\xf8\xa1\x90\x55\x53\x0c\xc7\xaa\x0d\xe9\x76\x14\x29\x1c\x7b\x68\xdd\x2f\xe1\x6f\x00\x00\x00\xff\xff\x3c\x0a\xc2\xfe\x2a\x02\x00\x00") func readmeMdBytes() ([]byte, error) { @@ -1328,6 +1349,8 @@ var _bindata = map[string]func() (*asset, error){ "1637852321_add_received_invitation_admin_column_in_chats.up.sql": _1637852321_add_received_invitation_admin_column_in_chatsUpSql, + "1645608605_add_community_message_archive_hashes_table.up.sql": _1645608605_add_community_message_archive_hashes_tableUpSql, + "README.md": readmeMd, "doc.go": docGo, @@ -1423,6 +1446,7 @@ var _bintree = &bintree{nil, map[string]*bintree{ "1634896007_add_last_updated_locally_and_removed.up.sql": &bintree{_1634896007_add_last_updated_locally_and_removedUpSql, map[string]*bintree{}}, "1635840039_add_clock_read_at_column_in_chats.up.sql": &bintree{_1635840039_add_clock_read_at_column_in_chatsUpSql, map[string]*bintree{}}, "1637852321_add_received_invitation_admin_column_in_chats.up.sql": &bintree{_1637852321_add_received_invitation_admin_column_in_chatsUpSql, map[string]*bintree{}}, + "1645608605_add_community_message_archive_hashes_table.up.sql": &bintree{_1645608605_add_community_message_archive_hashes_tableUpSql, map[string]*bintree{}}, "README.md": &bintree{readmeMd, map[string]*bintree{}}, "doc.go": &bintree{docGo, map[string]*bintree{}}, }} diff --git a/protocol/migrations/sqlite/1645608605_add_community_message_archive_hashes_table.up.sql b/protocol/migrations/sqlite/1645608605_add_community_message_archive_hashes_table.up.sql new file mode 100644 index 00000000000..29382655636 --- /dev/null +++ b/protocol/migrations/sqlite/1645608605_add_community_message_archive_hashes_table.up.sql @@ -0,0 +1,5 @@ +CREATE TABLE community_message_archive_hashes ( + community_id TEXT NOT NULL, + hash TEXT PRIMARY KEY NOT NULL +); + diff --git a/protocol/transport/filters_manager.go b/protocol/transport/filters_manager.go index 923cbedcb46..5fa694aa19a 100644 --- a/protocol/transport/filters_manager.go +++ b/protocol/transport/filters_manager.go @@ -1,6 +1,7 @@ package transport import ( + "bytes" "crypto/ecdsa" "encoding/hex" "sync" @@ -225,6 +226,17 @@ func (f *FiltersManager) FilterByFilterID(filterID string) *Filter { return nil } +func (f *FiltersManager) FilterByTopic(topic []byte) *Filter { + f.mutex.Lock() + defer f.mutex.Unlock() + for _, f := range f.filters { + if bytes.Equal(types.TopicTypeToByteArray(f.Topic), topic) { + return f + } + } + return nil +} + // FiltersByIdentities returns an array of filters for given list of public keys func (f *FiltersManager) FiltersByIdentities(identities []string) []*Filter { f.mutex.Lock() diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index 7f668fed64a..97d65b99a92 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -149,6 +149,10 @@ func (t *Transport) FilterByChatID(chatID string) *Filter { return t.filters.FilterByChatID(chatID) } +func (t *Transport) FilterByTopic(topic []byte) *Filter { + return t.filters.FilterByTopic(topic) +} + func (t *Transport) FiltersByIdentities(identities []string) []*Filter { return t.filters.FiltersByIdentities(identities) } diff --git a/protocol/v1/status_message.go b/protocol/v1/status_message.go index 2b3cd096275..01489226e90 100644 --- a/protocol/v1/status_message.go +++ b/protocol/v1/status_message.go @@ -267,6 +267,8 @@ func (m *StatusMessage) HandleApplication() error { return m.unmarshalProtobufData(new(protobuf.SyncBookmark)) case protobuf.ApplicationMetadataMessage_SYNC_CLEAR_HISTORY: return m.unmarshalProtobufData(new(protobuf.SyncClearHistory)) + case protobuf.ApplicationMetadataMessage_COMMUNITY_ARCHIVE_MAGNETLINK: + return m.unmarshalProtobufData(new(protobuf.CommunityMessageArchiveMagnetlink)) } return nil }