diff --git a/appdatabase/migrations/bindata.go b/appdatabase/migrations/bindata.go index 5afa6660579..78762bc69fc 100644 --- a/appdatabase/migrations/bindata.go +++ b/appdatabase/migrations/bindata.go @@ -13,6 +13,7 @@ // 1647882837_add_communities_settings_table.up.sql (206B) // 1647956635_add_waku_messages_table.up.sql (266B) // 1647957715_add_community_archives_info_table.up.sql (194B) +// 1647957841_add_community_message_archive_hashes_table.up.sql (115B) // doc.go (74B) package migrations @@ -342,6 +343,26 @@ func _1647957715_add_community_archives_info_tableUpSql() (*asset, error) { return a, nil } +var __1647957841_add_community_message_archive_hashes_tableUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x0e\x72\x75\x0c\x71\x55\x08\x71\x74\xf2\x71\x55\x48\xce\xcf\xcd\x2d\xcd\xcb\x2c\xa9\x8c\xcf\x4d\x2d\x2e\x4e\x4c\x4f\x8d\x4f\x2c\x4a\xce\xc8\x2c\x4b\x8d\xcf\x48\x2c\xce\x48\x2d\x56\xd0\xe0\x52\x40\x52\x94\x99\xa2\x10\xe2\x1a\x11\xa2\xe0\xe7\x1f\xa2\xe0\x17\xea\xe3\xa3\xc3\xa5\xa0\x00\x52\x08\x11\x0d\x08\xf2\xf4\x75\x0c\x8a\x54\xf0\x76\x8d\x84\xab\xe0\xd2\xb4\xe6\xe2\x02\x04\x00\x00\xff\xff\xcf\xb4\xb6\x9e\x73\x00\x00\x00") + +func _1647957841_add_community_message_archive_hashes_tableUpSqlBytes() ([]byte, error) { + return bindataRead( + __1647957841_add_community_message_archive_hashes_tableUpSql, + "1647957841_add_community_message_archive_hashes_table.up.sql", + ) +} + +func _1647957841_add_community_message_archive_hashes_tableUpSql() (*asset, error) { + bytes, err := _1647957841_add_community_message_archive_hashes_tableUpSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "1647957841_add_community_message_archive_hashes_table.up.sql", size: 115, mode: os.FileMode(0664), modTime: time.Unix(1648217719, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x2f, 0xfe, 0xbf, 0x8, 0xc0, 0x6b, 0x76, 0xc7, 0xd7, 0x70, 0xc0, 0x57, 0xa2, 0x6e, 0x91, 0x9e, 0x46, 0x72, 0x64, 0x51, 0x91, 0xbe, 0x1a, 0x21, 0x3d, 0x3c, 0xc3, 0x44, 0xa0, 0xf, 0xb7, 0xa5}} + return a, nil +} + var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\xb1\x0d\xc4\x20\x0c\x05\xd0\x9e\x29\xfe\x02\xd8\xfd\x6d\xe3\x4b\xac\x2f\x44\x82\x09\x78\x7f\xa5\x49\xfd\xa6\x1d\xdd\xe8\xd8\xcf\x55\x8a\x2a\xe3\x47\x1f\xbe\x2c\x1d\x8c\xfa\x6f\xe3\xb4\x34\xd4\xd9\x89\xbb\x71\x59\xb6\x18\x1b\x35\x20\xa2\x9f\x0a\x03\xa2\xe5\x0d\x00\x00\xff\xff\x60\xcd\x06\xbe\x4a\x00\x00\x00") func docGoBytes() ([]byte, error) { @@ -479,6 +500,8 @@ var _bindata = map[string]func() (*asset, error){ "1647957715_add_community_archives_info_table.up.sql": _1647957715_add_community_archives_info_tableUpSql, + "1647957841_add_community_message_archive_hashes_table.up.sql": _1647957841_add_community_message_archive_hashes_tableUpSql, + "doc.go": docGo, } @@ -523,19 +546,20 @@ type bintree struct { } var _bintree = &bintree{nil, map[string]*bintree{ - "1640111208_dummy.up.sql": &bintree{_1640111208_dummyUpSql, map[string]*bintree{}}, - "1642666031_add_removed_clock_to_bookmarks.up.sql": &bintree{_1642666031_add_removed_clock_to_bookmarksUpSql, map[string]*bintree{}}, - "1643644541_gif_api_key_setting.up.sql": &bintree{_1643644541_gif_api_key_settingUpSql, map[string]*bintree{}}, - "1644188994_recent_stickers.up.sql": &bintree{_1644188994_recent_stickersUpSql, map[string]*bintree{}}, - "1646659233_add_address_to_dapp_permisssion.up.sql": &bintree{_1646659233_add_address_to_dapp_permisssionUpSql, map[string]*bintree{}}, - "1646841105_add_emoji_account.up.sql": &bintree{_1646841105_add_emoji_accountUpSql, map[string]*bintree{}}, - "1647278782_display_name.up.sql": &bintree{_1647278782_display_nameUpSql, map[string]*bintree{}}, - "1647862838_reset_last_backup.up.sql": &bintree{_1647862838_reset_last_backupUpSql, map[string]*bintree{}}, - "1647871652_add_settings_sync_clock_table.up.sql": &bintree{_1647871652_add_settings_sync_clock_tableUpSql, map[string]*bintree{}}, - "1647880168_add_torrent_config.up.sql": &bintree{_1647880168_add_torrent_configUpSql, map[string]*bintree{}}, - "1647882837_add_communities_settings_table.up.sql": &bintree{_1647882837_add_communities_settings_tableUpSql, map[string]*bintree{}}, - "1647956635_add_waku_messages_table.up.sql": &bintree{_1647956635_add_waku_messages_tableUpSql, map[string]*bintree{}}, - "1647957715_add_community_archives_info_table.up.sql": &bintree{_1647957715_add_community_archives_info_tableUpSql, map[string]*bintree{}}, + "1640111208_dummy.up.sql": &bintree{_1640111208_dummyUpSql, map[string]*bintree{}}, + "1642666031_add_removed_clock_to_bookmarks.up.sql": &bintree{_1642666031_add_removed_clock_to_bookmarksUpSql, map[string]*bintree{}}, + "1643644541_gif_api_key_setting.up.sql": &bintree{_1643644541_gif_api_key_settingUpSql, map[string]*bintree{}}, + "1644188994_recent_stickers.up.sql": &bintree{_1644188994_recent_stickersUpSql, map[string]*bintree{}}, + "1646659233_add_address_to_dapp_permisssion.up.sql": &bintree{_1646659233_add_address_to_dapp_permisssionUpSql, map[string]*bintree{}}, + "1646841105_add_emoji_account.up.sql": &bintree{_1646841105_add_emoji_accountUpSql, map[string]*bintree{}}, + "1647278782_display_name.up.sql": &bintree{_1647278782_display_nameUpSql, map[string]*bintree{}}, + "1647862838_reset_last_backup.up.sql": &bintree{_1647862838_reset_last_backupUpSql, map[string]*bintree{}}, + "1647871652_add_settings_sync_clock_table.up.sql": &bintree{_1647871652_add_settings_sync_clock_tableUpSql, map[string]*bintree{}}, + "1647880168_add_torrent_config.up.sql": &bintree{_1647880168_add_torrent_configUpSql, map[string]*bintree{}}, + "1647882837_add_communities_settings_table.up.sql": &bintree{_1647882837_add_communities_settings_tableUpSql, map[string]*bintree{}}, + "1647956635_add_waku_messages_table.up.sql": &bintree{_1647956635_add_waku_messages_tableUpSql, map[string]*bintree{}}, + "1647957715_add_community_archives_info_table.up.sql": &bintree{_1647957715_add_community_archives_info_tableUpSql, map[string]*bintree{}}, + "1647957841_add_community_message_archive_hashes_table.up.sql": &bintree{_1647957841_add_community_message_archive_hashes_tableUpSql, map[string]*bintree{}}, "doc.go": &bintree{docGo, map[string]*bintree{}}, }} diff --git a/appdatabase/migrations/sql/1647957841_add_community_message_archive_hashes_table.up.sql b/appdatabase/migrations/sql/1647957841_add_community_message_archive_hashes_table.up.sql new file mode 100644 index 00000000000..29382655636 --- /dev/null +++ b/appdatabase/migrations/sql/1647957841_add_community_message_archive_hashes_table.up.sql @@ -0,0 +1,5 @@ +CREATE TABLE community_message_archive_hashes ( + community_id TEXT NOT NULL, + hash TEXT PRIMARY KEY NOT NULL +); + diff --git a/protocol/communities/manager.go b/protocol/communities/manager.go index c5459e69aba..2d28fc87baf 100644 --- a/protocol/communities/manager.go +++ b/protocol/communities/manager.go @@ -633,6 +633,30 @@ func (m *Manager) HandleCommunityDescriptionMessage(signer *ecdsa.PublicKey, des return nil, err } + hasCommunityArchiveInfo, err := m.persistence.HasCommunityArchiveInfo(community.ID()) + if err != nil { + return nil, err + } + + cdMagnetlinkClock := community.config.CommunityDescription.MagnetlinkClock + if !hasCommunityArchiveInfo { + err = m.persistence.SaveCommunityArchiveInfo(community.ID(), cdMagnetlinkClock, 0) + if err != nil { + return nil, err + } + } else { + magnetlinkClock, err := m.persistence.GetMagnetlinkMessageClock(community.ID()) + if err != nil { + return nil, err + } + if cdMagnetlinkClock > magnetlinkClock { + err = m.persistence.UpdateMagnetlinkMessageClock(community.ID(), cdMagnetlinkClock) + if err != nil { + return nil, err + } + } + } + pkString := common.PubkeyToHex(m.identity) // If the community require membership, we set whether we should leave/join the community after a state change @@ -1062,6 +1086,29 @@ func (m *Manager) GetAdminCommunitiesChatIDs() (map[string]bool, error) { return chatIDs, nil } +func (m *Manager) IsAdminCommunity(pubKey *ecdsa.PublicKey) (bool, error) { + adminCommunities, err := m.Created() + if err != nil { + return false, err + } + + for _, c := range adminCommunities { + if c.PrivateKey().PublicKey.Equal(pubKey) { + return true, nil + } + } + return false, nil +} + +func (m *Manager) IsJoinedCommunity(pubKey *ecdsa.PublicKey) (bool, error) { + community, err := m.GetByID(crypto.CompressPubkey(pubKey)) + if err != nil { + return false, err + } + + return community != nil && community.Joined(), nil +} + func (m *Manager) GetCommunityChatsFilters(communityID types.HexBytes) ([]*transport.Filter, error) { chatIDs, err := m.persistence.GetCommunityChatIDs(communityID) if err != nil { @@ -1153,7 +1200,7 @@ func (m *Manager) GetHistoryArchivePartitionStartTimestamp(communityID types.Hex func (m *Manager) CreateAndSeedHistoryArchive(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration) error { m.UnseedHistoryArchiveTorrent(communityID) - err := m.CreateHistoryArchiveTorrent(communityID, topics, startDate, endDate, partition) + _, err := m.CreateHistoryArchiveTorrent(communityID, topics, startDate, endDate, partition) if err != nil { return err } @@ -1242,7 +1289,7 @@ type EncodedArchiveData struct { bytes []byte } -func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration) error { +func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration) ([]string, error) { from := startDate to := from.Add(partition) @@ -1257,25 +1304,26 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics wakuMessageArchiveIndexProto := &protobuf.WakuMessageArchiveIndex{} wakuMessageArchiveIndex := make(map[string]*protobuf.WakuMessageArchiveIndexMetadata) + archiveIDs := make([]string, 0) if _, err := os.Stat(archiveDir); os.IsNotExist(err) { err := os.MkdirAll(archiveDir, 0700) if err != nil { - return err + return archiveIDs, err } } if _, err := os.Stat(torrentDir); os.IsNotExist(err) { err := os.MkdirAll(torrentDir, 0700) if err != nil { - return err + return archiveIDs, err } } _, err := os.Stat(indexPath) if err == nil { - wakuMessageArchiveIndexProto, err = m.loadHistoryArchiveIndexFromFile(communityID) + wakuMessageArchiveIndexProto, err = m.LoadHistoryArchiveIndexFromFile(communityID) if err != nil { - return err + return archiveIDs, err } } @@ -1301,7 +1349,7 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics log.Println("Creating message archive for partition (", partition, "). From: ", from, " To: ", to) messages, err := m.persistence.GetWakuMessagesByFilterTopic(topics, uint64(from.Unix()), uint64(to.Unix())) if err != nil { - return err + return archiveIDs, err } if len(messages) == 0 { @@ -1318,7 +1366,7 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics wakuMessageArchive := m.createWakuMessageArchive(from, to, messages, topicsAsByteArrays) encodedArchive, err := proto.Marshal(wakuMessageArchive) if err != nil { - return err + return archiveIDs, err } rawSize := len(encodedArchive) @@ -1342,10 +1390,12 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics wakuMessageArchiveIndexMetadataBytes, err := proto.Marshal(wakuMessageArchiveIndexMetadata) if err != nil { - return err + return archiveIDs, err } - wakuMessageArchiveIndex[crypto.Keccak256Hash(wakuMessageArchiveIndexMetadataBytes).String()] = wakuMessageArchiveIndexMetadata + archiveID := crypto.Keccak256Hash(wakuMessageArchiveIndexMetadataBytes).String() + archiveIDs = append(archiveIDs, archiveID) + wakuMessageArchiveIndex[archiveID] = wakuMessageArchiveIndexMetadata encodedArchives = append(encodedArchives, &EncodedArchiveData{bytes: encodedArchive, padding: padding}) from = to to = to.Add(partition) @@ -1361,7 +1411,7 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics if _, err := os.Stat(dataPath); err == nil { dataBytes, err = os.ReadFile(dataPath) if err != nil { - return err + return archiveIDs, err } } @@ -1373,17 +1423,17 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics wakuMessageArchiveIndexProto.Archives = wakuMessageArchiveIndex indexBytes, err := proto.Marshal(wakuMessageArchiveIndexProto) if err != nil { - return err + return archiveIDs, err } err = os.WriteFile(indexPath, indexBytes, 0644) // nolint: gosec if err != nil { - return err + return archiveIDs, err } err = os.WriteFile(dataPath, dataBytes, 0644) // nolint: gosec if err != nil { - return err + return archiveIDs, err } metaInfo := metainfo.MetaInfo{ @@ -1398,22 +1448,22 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics err = info.BuildFromFilePath(archiveDir) if err != nil { - return err + return archiveIDs, err } metaInfo.InfoBytes, err = bencode.Marshal(info) if err != nil { - return err + return archiveIDs, err } metaInfoBytes, err := bencode.Marshal(metaInfo) if err != nil { - return err + return archiveIDs, err } err = os.WriteFile(m.torrentFile(communityID.String()), metaInfoBytes, 0644) // nolint: gosec if err != nil { - return err + return archiveIDs, err } m.publish(&Subscription{ @@ -1436,7 +1486,7 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics lastMessageArchiveEndDate, err := m.persistence.GetLastMessageArchiveEndDate(communityID) if err != nil { - return err + return archiveIDs, err } if lastMessageArchiveEndDate > 0 { @@ -1445,11 +1495,12 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics err = m.persistence.SaveLastMessageArchiveEndDate(communityID, uint64(from.Unix())) } if err != nil { - return err + return archiveIDs, err } log.Println("History archive created/updated for community: ", communityID.String()) - return nil + + return archiveIDs, nil } func (m *Manager) SeedHistoryArchiveTorrent(communityID types.HexBytes) error { @@ -1519,6 +1570,140 @@ func (m *Manager) IsSeedingHistoryArchiveTorrent(communityID types.HexBytes) boo return ok && torrent.Seeding() } +func (m *Manager) DownloadHistoryArchivesByMagnetlink(communityID types.HexBytes, magnetlink string) ([]string, error) { + + id := communityID.String() + ml, _ := metainfo.ParseMagnetUri(magnetlink) + + log.Println("Adding torrent from magnetlink for community: ", id) + torrent, err := m.torrentClient.AddMagnet(magnetlink) + if err != nil { + return nil, err + } + m.torrentTasks[id] = ml.InfoHash + + <-torrent.GotInfo() + files := torrent.Files() + + i, ok := findIndexFile(files) + if !ok { + // We're dealing with a malformed torrent, so don't do anything + return nil, nil + } + + indexFile := files[i] + + log.Println("Downloading history archive index") + indexFile.Download() + for { + if indexFile.BytesCompleted() == indexFile.Length() { + break + } + } + log.Println("Done.") + + index, err := m.LoadHistoryArchiveIndexFromFile(communityID) + if err != nil { + return nil, err + } + + var archiveIDs []string + + for hash, metadata := range index.Archives { + hasArchive, err := m.persistence.HasMessageArchiveID(communityID, hash) + if err != nil { + m.logger.Debug("Failed to check if has message archive id", zap.Error(err)) + continue + } + if hasArchive { + continue + } + + startIndex := int(metadata.Offset) / pieceLength + endIndex := startIndex + int(metadata.Size)/pieceLength + + log.Println("Downloading data for message archive: ", hash) + log.Println("Pieces (start, end): ", startIndex, "-", endIndex-1) + torrent.DownloadPieces(startIndex, endIndex) + + psc := torrent.SubscribePieceStateChanges() + for { + i := startIndex + done := false + for { + if i > endIndex-1 { + break + } + done = torrent.PieceState(i).Complete + i++ + } + if done { + psc.Close() + break + } + <-psc.Values + } + log.Println("Done") + archiveIDs = append(archiveIDs, hash) + err = m.persistence.SaveMessageArchiveID(communityID, hash) + if err != nil { + m.logger.Debug("Couldn't save message archive ID", zap.Error(err)) + continue + } + } + m.publish(&Subscription{ + HistoryArchivesSeedingSignal: &signal.HistoryArchivesSeedingSignal{ + CommunityID: communityID.String(), + }, + }) + return archiveIDs, nil +} + +func (m *Manager) ExtractMessagesFromHistoryArchives(communityID types.HexBytes, archiveIDs []string) (map[transport.Filter][]*types.Message, error) { + id := communityID.String() + + index, err := m.LoadHistoryArchiveIndexFromFile(communityID) + if err != nil { + return nil, err + } + totalData, err := os.ReadFile(m.archiveDataFile(id)) + if err != nil { + return nil, err + } + + messages := make(map[transport.Filter][]*types.Message) + + for _, hash := range archiveIDs { + metadata := index.Archives[hash] + + archive := &protobuf.WakuMessageArchive{} + data := totalData[metadata.Offset : metadata.Offset+metadata.Size-metadata.Padding] + + err := proto.Unmarshal(data, archive) + if err != nil { + log.Println("Failed to unmarshal WakuMessageArchive", err) + m.logger.Debug("Failed to unmarshal WakuMessageArchive", zap.Error(err)) + continue + } + + for _, message := range archive.Messages { + filter := m.transport.FilterByTopic(message.Topic) + if filter != nil { + shhMessage := &types.Message{ + Sig: message.Sig, + Timestamp: uint32(message.Timestamp), + Topic: types.BytesToTopic(message.Topic), + Payload: message.Payload, + Padding: message.Padding, + Hash: message.Hash, + } + messages[*filter] = append(messages[*filter], shhMessage) + } + } + } + return messages, nil +} + func (m *Manager) GetHistoryArchiveMagnetlink(communityID types.HexBytes) (string, error) { id := communityID.String() torrentFile := m.torrentFile(id) @@ -1565,7 +1750,7 @@ func (m *Manager) createWakuMessageArchive(from time.Time, to time.Time, message return wakuMessageArchive } -func (m *Manager) loadHistoryArchiveIndexFromFile(communityID types.HexBytes) (*protobuf.WakuMessageArchiveIndex, error) { +func (m *Manager) LoadHistoryArchiveIndexFromFile(communityID types.HexBytes) (*protobuf.WakuMessageArchiveIndex, error) { wakuMessageArchiveIndexProto := &protobuf.WakuMessageArchiveIndex{} indexPath := m.archiveIndexFile(communityID.String()) @@ -1601,3 +1786,12 @@ func topicsAsByteArrays(topics []types.TopicType) [][]byte { } return topicsAsByteArrays } + +func findIndexFile(files []*torrent.File) (index int, ok bool) { + for i, f := range files { + if f.DisplayPath() == "index" { + return i, true + } + } + return 0, false +} diff --git a/protocol/communities/manager_test.go b/protocol/communities/manager_test.go index 8c0ddcea5ad..ff0ae81bfb2 100644 --- a/protocol/communities/manager_test.go +++ b/protocol/communities/manager_test.go @@ -233,7 +233,7 @@ func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_WithoutMessages() { // Partition of 7 days partition := 7 * 24 * time.Hour - err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition) + _, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition) s.Require().NoError(err) // There are no waku messages in the database so we don't expect @@ -275,7 +275,7 @@ func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_ShouldCreateArchive() { err = s.manager.StoreWakuMessage(&message3) s.Require().NoError(err) - err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition) + _, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition) s.Require().NoError(err) _, err = os.Stat(s.manager.archiveDataFile(community.IDString())) @@ -285,7 +285,7 @@ func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_ShouldCreateArchive() { _, err = os.Stat(s.manager.torrentFile(community.IDString())) s.Require().NoError(err) - index, err := s.manager.loadHistoryArchiveIndexFromFile(community.ID()) + index, err := s.manager.LoadHistoryArchiveIndexFromFile(community.ID()) s.Require().NoError(err) s.Require().Len(index.Archives, 1) @@ -336,10 +336,10 @@ func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_ShouldCreateMultipleArchi err = s.manager.StoreWakuMessage(&message4) s.Require().NoError(err) - err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition) + _, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition) s.Require().NoError(err) - index, err := s.manager.loadHistoryArchiveIndexFromFile(community.ID()) + index, err := s.manager.LoadHistoryArchiveIndexFromFile(community.ID()) s.Require().NoError(err) s.Require().Len(index.Archives, 3) @@ -385,10 +385,10 @@ func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_ShouldAppendArchives() { err = s.manager.StoreWakuMessage(&message1) s.Require().NoError(err) - err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition) + _, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition) s.Require().NoError(err) - index, err := s.manager.loadHistoryArchiveIndexFromFile(community.ID()) + index, err := s.manager.LoadHistoryArchiveIndexFromFile(community.ID()) s.Require().NoError(err) s.Require().Len(index.Archives, 1) @@ -400,10 +400,10 @@ func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_ShouldAppendArchives() { err = s.manager.StoreWakuMessage(&message2) s.Require().NoError(err) - err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition) + _, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition) s.Require().NoError(err) - index, err = s.manager.loadHistoryArchiveIndexFromFile(community.ID()) + index, err = s.manager.LoadHistoryArchiveIndexFromFile(community.ID()) s.Require().NoError(err) s.Require().Len(index.Archives, 2) } @@ -430,7 +430,7 @@ func (s *ManagerSuite) TestSeedHistoryArchiveTorrent() { err = s.manager.StoreWakuMessage(&message1) s.Require().NoError(err) - err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition) + _, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition) s.Require().NoError(err) err = s.manager.SeedHistoryArchiveTorrent(community.ID()) @@ -467,7 +467,7 @@ func (s *ManagerSuite) TestUnseedHistoryArchiveTorrent() { err = s.manager.StoreWakuMessage(&message1) s.Require().NoError(err) - err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition) + _, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition) s.Require().NoError(err) err = s.manager.SeedHistoryArchiveTorrent(community.ID()) diff --git a/protocol/communities/persistence.go b/protocol/communities/persistence.go index 821d5e3edf2..dfabe91439c 100644 --- a/protocol/communities/persistence.go +++ b/protocol/communities/persistence.go @@ -423,6 +423,11 @@ func (p *Persistence) GetWakuMessagesByFilterTopic(topics []types.TopicType, fro return messages, nil } +func (p *Persistence) HasCommunityArchiveInfo(communityID types.HexBytes) (exists bool, err error) { + err = p.db.QueryRow(`SELECT count(1) FROM communities_archive_info WHERE community_id = ?`, communityID.String()).Scan(&exists) + return exists, err +} + func (p *Persistence) GetMagnetlinkMessageClock(communityID types.HexBytes) (uint64, error) { var magnetlinkClock uint64 err := p.db.QueryRow(`SELECT magnetlink_clock FROM communities_archive_info WHERE community_id = ?`, communityID.String()).Scan(&magnetlinkClock) @@ -432,6 +437,14 @@ func (p *Persistence) GetMagnetlinkMessageClock(communityID types.HexBytes) (uin return magnetlinkClock, err } +func (p *Persistence) SaveCommunityArchiveInfo(communityID types.HexBytes, clock uint64, lastArchiveEndDate uint64) error { + _, err := p.db.Exec(`INSERT INTO communities_archive_info (magnetlink_clock, last_message_archive_end_date, community_id) VALUES (?, ?, ?)`, + clock, + lastArchiveEndDate, + communityID.String()) + return err +} + func (p *Persistence) UpdateMagnetlinkMessageClock(communityID types.HexBytes, clock uint64) error { _, err := p.db.Exec(`UPDATE communities_archive_info SET magnetlink_clock = ? @@ -469,6 +482,22 @@ func (p *Persistence) GetLastMessageArchiveEndDate(communityID types.HexBytes) ( return lastMessageArchiveEndDate, nil } +func (p *Persistence) HasMessageArchiveID(communityID types.HexBytes, hash string) (exists bool, err error) { + err = p.db.QueryRow(`SELECT EXISTS (SELECT 1 FROM community_message_archive_hashes WHERE community_id = ? AND hash = ?)`, + communityID.String(), + hash, + ).Scan(&exists) + return exists, err +} + +func (p *Persistence) SaveMessageArchiveID(communityID types.HexBytes, hash string) error { + _, err := p.db.Exec(`INSERT INTO community_message_archive_hashes (community_id, hash) VALUES (?, ?)`, + communityID.String(), + hash, + ) + return err +} + func (p *Persistence) GetCommunitiesSettings() ([]CommunitySettings, error) { rows, err := p.db.Query("SELECT community_id, message_archive_seeding_enabled, message_archive_fetching_enabled FROM communities_settings") if err != nil { diff --git a/protocol/message_persistence.go b/protocol/message_persistence.go index 60d6ca8e97b..bcd7c89f887 100644 --- a/protocol/message_persistence.go +++ b/protocol/message_persistence.go @@ -1867,7 +1867,7 @@ func (db sqlitePersistence) GetDeletes(messageID string, from string) ([]*Delete } func (db sqlitePersistence) SaveEdit(editMessage EditMessage) error { - _, err := db.db.Exec(`INSERT INTO user_messages_edits (clock, chat_id, message_id, text, source, id) VALUES(?,?,?,?,?,?)`, editMessage.Clock, editMessage.ChatId, editMessage.MessageId, editMessage.Text, editMessage.From, editMessage.ID) + _, err := db.db.Exec(`INSERT OR REPLACE INTO user_messages_edits (clock, chat_id, message_id, text, source, id) VALUES(?,?,?,?,?,?)`, editMessage.Clock, editMessage.ChatId, editMessage.MessageId, editMessage.Text, editMessage.From, editMessage.ID) return err } diff --git a/protocol/messenger.go b/protocol/messenger.go index 06686322230..5ac08d0f3ce 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -2917,7 +2917,7 @@ func (m *Messenger) RetrieveAll() (*MessengerResponse, error) { return nil, err } - return m.handleRetrievedMessages(chatWithMessages) + return m.handleRetrievedMessages(chatWithMessages, true) } func (m *Messenger) GetStats() types.StatsSummary { @@ -3066,7 +3066,7 @@ func (r *ReceivedMessageState) addNewActivityCenterNotification(publicKey ecdsa. return nil } -func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filter][]*types.Message) (*MessengerResponse, error) { +func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filter][]*types.Message, storeWakuMessages bool) (*MessengerResponse, error) { response := &MessengerResponse{} messageState := &ReceivedMessageState{ AllChats: m.allChats, @@ -3096,7 +3096,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte // Indicates tha all messages in the batch have been processed correctly allMessagesProcessed := true - if adminCommunitiesChatIDs[filter.ChatID] { + if adminCommunitiesChatIDs[filter.ChatID] && storeWakuMessages { logger.Debug("storing waku message") err := m.communitiesManager.StoreWakuMessage(shhMessage) if err != nil { @@ -3672,6 +3672,15 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte continue } + case protobuf.CommunityMessageArchiveMagnetlink: + logger.Debug("Handling CommunityMessageArchiveMagnetlink") + magnetlinkMessage := msg.ParsedMessage.Interface().(protobuf.CommunityMessageArchiveMagnetlink) + err = m.HandleHistoryArchiveMagnetlinkMessage(messageState, publicKey, magnetlinkMessage.MagnetUri, magnetlinkMessage.Clock) + if err != nil { + logger.Warn("failed to handle CommunityMessageArchiveMagnetlink", zap.Error(err)) + continue + } + case protobuf.AnonymousMetricBatch: logger.Debug("Handling AnonymousMetricBatch") if m.anonMetricsServer == nil { diff --git a/protocol/messenger_handler.go b/protocol/messenger_handler.go index 92b42258191..d301dca26ab 100644 --- a/protocol/messenger_handler.go +++ b/protocol/messenger_handler.go @@ -5,6 +5,7 @@ import ( "crypto/ecdsa" "encoding/hex" "fmt" + "log" "github.com/pkg/errors" "go.uber.org/zap" @@ -642,6 +643,63 @@ func (m *Messenger) HandleCommunityInvitation(state *ReceivedMessageState, signe return nil } +func (m *Messenger) HandleHistoryArchiveMagnetlinkMessage(state *ReceivedMessageState, communityPubKey *ecdsa.PublicKey, magnetlink string, clock uint64) error { + + id := types.HexBytes(crypto.CompressPubkey(communityPubKey)) + settings, err := m.communitiesManager.GetCommunitySettingsByID(id) + if err != nil { + m.logger.Debug("Couldn't get community settings for community with id: ", zap.Any("id", id)) + return err + } + + if m.config.torrentConfig.Enabled && settings.HistoryArchiveSupportEnabled { + signedByOwnedCommunity, err := m.communitiesManager.IsAdminCommunity(communityPubKey) + if err != nil { + return err + } + joinedCommunity, err := m.communitiesManager.IsJoinedCommunity(communityPubKey) + if err != nil { + return err + } + lastClock, err := m.communitiesManager.GetMagnetlinkMessageClock(id) + if err != nil { + return err + } + // We are only interested in a community archive magnet link + // if it originates from a community that the current account is + // part of and doesn't own the private key at the same time + if !signedByOwnedCommunity && joinedCommunity && clock >= lastClock { + + m.communitiesManager.UnseedHistoryArchiveTorrent(id) + go func() { + downloadedArchiveIDs, err := m.communitiesManager.DownloadHistoryArchivesByMagnetlink(id, magnetlink) + if err != nil { + log.Println("failed to download history archive data", err) + m.logger.Debug("failed to download history archive data", zap.Error(err)) + return + } + + messagesToHandle, err := m.communitiesManager.ExtractMessagesFromHistoryArchives(id, downloadedArchiveIDs) + if err != nil { + log.Println("failed to extract history archive messages", err) + m.logger.Debug("failed to extract history archive messages", zap.Error(err)) + return + } + + _, err = m.handleRetrievedMessages(messagesToHandle, false) + if err != nil { + log.Println("failed to write history archive messages to database", err) + m.logger.Debug("failed to write history archive messages to database", zap.Error(err)) + return + } + }() + + return m.communitiesManager.UpdateMagnetlinkMessageClock(id, clock) + } + } + return nil +} + // HandleCommunityRequestToJoin handles an community request to join func (m *Messenger) HandleCommunityRequestToJoin(state *ReceivedMessageState, signer *ecdsa.PublicKey, requestToJoinProto protobuf.CommunityRequestToJoin) error { if requestToJoinProto.CommunityId == nil { diff --git a/protocol/transport/filters_manager.go b/protocol/transport/filters_manager.go index 923cbedcb46..5fa694aa19a 100644 --- a/protocol/transport/filters_manager.go +++ b/protocol/transport/filters_manager.go @@ -1,6 +1,7 @@ package transport import ( + "bytes" "crypto/ecdsa" "encoding/hex" "sync" @@ -225,6 +226,17 @@ func (f *FiltersManager) FilterByFilterID(filterID string) *Filter { return nil } +func (f *FiltersManager) FilterByTopic(topic []byte) *Filter { + f.mutex.Lock() + defer f.mutex.Unlock() + for _, f := range f.filters { + if bytes.Equal(types.TopicTypeToByteArray(f.Topic), topic) { + return f + } + } + return nil +} + // FiltersByIdentities returns an array of filters for given list of public keys func (f *FiltersManager) FiltersByIdentities(identities []string) []*Filter { f.mutex.Lock() diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index 7f668fed64a..97d65b99a92 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -149,6 +149,10 @@ func (t *Transport) FilterByChatID(chatID string) *Filter { return t.filters.FilterByChatID(chatID) } +func (t *Transport) FilterByTopic(topic []byte) *Filter { + return t.filters.FilterByTopic(topic) +} + func (t *Transport) FiltersByIdentities(identities []string) []*Filter { return t.filters.FiltersByIdentities(identities) } diff --git a/protocol/v1/status_message.go b/protocol/v1/status_message.go index d22e9268085..24b3389b92c 100644 --- a/protocol/v1/status_message.go +++ b/protocol/v1/status_message.go @@ -269,6 +269,8 @@ func (m *StatusMessage) HandleApplication() error { return m.unmarshalProtobufData(new(protobuf.SyncClearHistory)) case protobuf.ApplicationMetadataMessage_SYNC_SETTING: return m.unmarshalProtobufData(new(protobuf.SyncSetting)) + case protobuf.ApplicationMetadataMessage_COMMUNITY_ARCHIVE_MAGNETLINK: + return m.unmarshalProtobufData(new(protobuf.CommunityMessageArchiveMagnetlink)) } return nil }