diff --git a/appdatabase/migrations/bindata.go b/appdatabase/migrations/bindata.go index 51da24e9a74..049b9c29025 100644 --- a/appdatabase/migrations/bindata.go +++ b/appdatabase/migrations/bindata.go @@ -11,6 +11,7 @@ // 1647862837_add_communities_settings_table.up.sql (206B) // 1647956635_add_waku_messages_table.up.sql (266B) // 1647957715_add_community_archives_info_table.up.sql (194B) +// 1647957841_add_community_message_archive_hashes_table.up.sql (115B) // doc.go (74B) package migrations @@ -300,6 +301,26 @@ func _1647957715_add_community_archives_info_tableUpSql() (*asset, error) { return a, nil } +var __1647957841_add_community_message_archive_hashes_tableUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x0e\x72\x75\x0c\x71\x55\x08\x71\x74\xf2\x71\x55\x48\xce\xcf\xcd\x2d\xcd\xcb\x2c\xa9\x8c\xcf\x4d\x2d\x2e\x4e\x4c\x4f\x8d\x4f\x2c\x4a\xce\xc8\x2c\x4b\x8d\xcf\x48\x2c\xce\x48\x2d\x56\xd0\xe0\x52\x40\x52\x94\x99\xa2\x10\xe2\x1a\x11\xa2\xe0\xe7\x1f\xa2\xe0\x17\xea\xe3\xa3\xc3\xa5\xa0\x00\x52\x08\x11\x0d\x08\xf2\xf4\x75\x0c\x8a\x54\xf0\x76\x8d\x84\xab\xe0\xd2\xb4\xe6\xe2\x02\x04\x00\x00\xff\xff\xcf\xb4\xb6\x9e\x73\x00\x00\x00") + +func _1647957841_add_community_message_archive_hashes_tableUpSqlBytes() ([]byte, error) { + return bindataRead( + __1647957841_add_community_message_archive_hashes_tableUpSql, + "1647957841_add_community_message_archive_hashes_table.up.sql", + ) +} + +func _1647957841_add_community_message_archive_hashes_tableUpSql() (*asset, error) { + bytes, err := _1647957841_add_community_message_archive_hashes_tableUpSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "1647957841_add_community_message_archive_hashes_table.up.sql", size: 115, mode: os.FileMode(0664), modTime: time.Unix(1647957899, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x2f, 0xfe, 0xbf, 0x8, 0xc0, 0x6b, 0x76, 0xc7, 0xd7, 0x70, 0xc0, 0x57, 0xa2, 0x6e, 0x91, 0x9e, 0x46, 0x72, 0x64, 0x51, 0x91, 0xbe, 0x1a, 0x21, 0x3d, 0x3c, 0xc3, 0x44, 0xa0, 0xf, 0xb7, 0xa5}} + return a, nil +} + var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\xb1\x0d\xc4\x20\x0c\x05\xd0\x9e\x29\xfe\x02\xd8\xfd\x6d\xe3\x4b\xac\x2f\x44\x82\x09\x78\x7f\xa5\x49\xfd\xa6\x1d\xdd\xe8\xd8\xcf\x55\x8a\x2a\xe3\x47\x1f\xbe\x2c\x1d\x8c\xfa\x6f\xe3\xb4\x34\xd4\xd9\x89\xbb\x71\x59\xb6\x18\x1b\x35\x20\xa2\x9f\x0a\x03\xa2\xe5\x0d\x00\x00\xff\xff\x60\xcd\x06\xbe\x4a\x00\x00\x00") func docGoBytes() ([]byte, error) { @@ -433,6 +454,8 @@ var _bindata = map[string]func() (*asset, error){ "1647957715_add_community_archives_info_table.up.sql": _1647957715_add_community_archives_info_tableUpSql, + "1647957841_add_community_message_archive_hashes_table.up.sql": _1647957841_add_community_message_archive_hashes_tableUpSql, + "doc.go": docGo, } @@ -477,17 +500,18 @@ type bintree struct { } var _bintree = &bintree{nil, map[string]*bintree{ - "1640111208_dummy.up.sql": &bintree{_1640111208_dummyUpSql, map[string]*bintree{}}, - "1642666031_add_removed_clock_to_bookmarks.up.sql": &bintree{_1642666031_add_removed_clock_to_bookmarksUpSql, map[string]*bintree{}}, - "1643644541_gif_api_key_setting.up.sql": &bintree{_1643644541_gif_api_key_settingUpSql, map[string]*bintree{}}, - "1644188994_recent_stickers.up.sql": &bintree{_1644188994_recent_stickersUpSql, map[string]*bintree{}}, - "1646659233_add_address_to_dapp_permisssion.up.sql": &bintree{_1646659233_add_address_to_dapp_permisssionUpSql, map[string]*bintree{}}, - "1646841105_add_emoji_account.up.sql": &bintree{_1646841105_add_emoji_accountUpSql, map[string]*bintree{}}, - "1647278782_display_name.up.sql": &bintree{_1647278782_display_nameUpSql, map[string]*bintree{}}, - "1647860168_add_torrent_config.up.sql": &bintree{_1647860168_add_torrent_configUpSql, map[string]*bintree{}}, - "1647862837_add_communities_settings_table.up.sql": &bintree{_1647862837_add_communities_settings_tableUpSql, map[string]*bintree{}}, - "1647956635_add_waku_messages_table.up.sql": &bintree{_1647956635_add_waku_messages_tableUpSql, map[string]*bintree{}}, - "1647957715_add_community_archives_info_table.up.sql": &bintree{_1647957715_add_community_archives_info_tableUpSql, map[string]*bintree{}}, + "1640111208_dummy.up.sql": &bintree{_1640111208_dummyUpSql, map[string]*bintree{}}, + "1642666031_add_removed_clock_to_bookmarks.up.sql": &bintree{_1642666031_add_removed_clock_to_bookmarksUpSql, map[string]*bintree{}}, + "1643644541_gif_api_key_setting.up.sql": &bintree{_1643644541_gif_api_key_settingUpSql, map[string]*bintree{}}, + "1644188994_recent_stickers.up.sql": &bintree{_1644188994_recent_stickersUpSql, map[string]*bintree{}}, + "1646659233_add_address_to_dapp_permisssion.up.sql": &bintree{_1646659233_add_address_to_dapp_permisssionUpSql, map[string]*bintree{}}, + "1646841105_add_emoji_account.up.sql": &bintree{_1646841105_add_emoji_accountUpSql, map[string]*bintree{}}, + "1647278782_display_name.up.sql": &bintree{_1647278782_display_nameUpSql, map[string]*bintree{}}, + "1647860168_add_torrent_config.up.sql": &bintree{_1647860168_add_torrent_configUpSql, map[string]*bintree{}}, + "1647862837_add_communities_settings_table.up.sql": &bintree{_1647862837_add_communities_settings_tableUpSql, map[string]*bintree{}}, + "1647956635_add_waku_messages_table.up.sql": &bintree{_1647956635_add_waku_messages_tableUpSql, map[string]*bintree{}}, + "1647957715_add_community_archives_info_table.up.sql": &bintree{_1647957715_add_community_archives_info_tableUpSql, map[string]*bintree{}}, + "1647957841_add_community_message_archive_hashes_table.up.sql": &bintree{_1647957841_add_community_message_archive_hashes_tableUpSql, map[string]*bintree{}}, "doc.go": &bintree{docGo, map[string]*bintree{}}, }} diff --git a/appdatabase/migrations/sql/1647957841_add_community_message_archive_hashes_table.up.sql b/appdatabase/migrations/sql/1647957841_add_community_message_archive_hashes_table.up.sql new file mode 100644 index 00000000000..29382655636 --- /dev/null +++ b/appdatabase/migrations/sql/1647957841_add_community_message_archive_hashes_table.up.sql @@ -0,0 +1,5 @@ +CREATE TABLE community_message_archive_hashes ( + community_id TEXT NOT NULL, + hash TEXT PRIMARY KEY NOT NULL +); + diff --git a/protocol/communities/manager.go b/protocol/communities/manager.go index c79eeff6cf5..16da628b4d6 100644 --- a/protocol/communities/manager.go +++ b/protocol/communities/manager.go @@ -805,6 +805,10 @@ func (m *Manager) JoinCommunity(id types.HexBytes) (*Community, error) { return community, nil } +func (m *Manager) GetMagnetlinkMessageClock(communityID types.HexBytes) (uint64, error) { + return m.persistence.GetMagnetlinkMessageClock(communityID) +} + func (m *Manager) UpdateMagnetlinkMessageClock(communityID types.HexBytes, clock uint64) error { return m.persistence.UpdateMagnetlinkMessageClock(communityID, clock) } @@ -1049,6 +1053,29 @@ func (m *Manager) GetAdminCommunitiesChatIDs() (map[string]bool, error) { return chatIDs, nil } +func (m *Manager) IsAdminCommunity(pubKey *ecdsa.PublicKey) (bool, error) { + adminCommunities, err := m.Created() + if err != nil { + return false, err + } + + for _, c := range adminCommunities { + if c.PrivateKey().PublicKey.Equal(pubKey) { + return true, nil + } + } + return false, nil +} + +func (m *Manager) IsJoinedCommunity(pubKey *ecdsa.PublicKey) (bool, error) { + community, err := m.GetByID(crypto.CompressPubkey(pubKey)) + if err != nil { + return false, err + } + + return community != nil && community.Joined(), nil +} + func (m *Manager) GetCommunityChatsFilters(communityID types.HexBytes) ([]*transport.Filter, error) { chatIDs, err := m.persistence.GetCommunityChatIDs(communityID) if err != nil { @@ -1140,7 +1167,7 @@ func (m *Manager) GetHistoryArchivePartitionStartTimestamp(communityID types.Hex func (m *Manager) CreateAndSeedHistoryArchive(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration) error { m.UnseedHistoryArchiveTorrent(communityID) - err := m.CreateHistoryArchiveTorrent(communityID, topics, startDate, endDate, partition) + _, err := m.CreateHistoryArchiveTorrent(communityID, topics, startDate, endDate, partition) if err != nil { return err } @@ -1229,7 +1256,7 @@ type EncodedArchiveData struct { bytes []byte } -func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration) error { +func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration) ([]string, error) { from := startDate to := from.Add(partition) @@ -1244,25 +1271,26 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics wakuMessageArchiveIndexProto := &protobuf.WakuMessageArchiveIndex{} wakuMessageArchiveIndex := make(map[string]*protobuf.WakuMessageArchiveIndexMetadata) + archiveIDs := make([]string, 0) if _, err := os.Stat(archiveDir); os.IsNotExist(err) { err := os.MkdirAll(archiveDir, 0700) if err != nil { - return err + return archiveIDs, err } } if _, err := os.Stat(torrentDir); os.IsNotExist(err) { err := os.MkdirAll(torrentDir, 0700) if err != nil { - return err + return archiveIDs, err } } _, err := os.Stat(indexPath) if err == nil { - wakuMessageArchiveIndexProto, err = m.loadHistoryArchiveIndexFromFile(communityID) + wakuMessageArchiveIndexProto, err = m.LoadHistoryArchiveIndexFromFile(communityID) if err != nil { - return err + return archiveIDs, err } } @@ -1288,7 +1316,7 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics log.Println("Creating message archive for partition (", partition, "). From: ", from, " To: ", to) messages, err := m.persistence.GetWakuMessagesByFilterTopic(topics, uint64(from.Unix()), uint64(to.Unix())) if err != nil { - return err + return archiveIDs, err } if len(messages) == 0 { @@ -1305,7 +1333,7 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics wakuMessageArchive := m.createWakuMessageArchive(from, to, messages, topicsAsByteArrays) encodedArchive, err := proto.Marshal(wakuMessageArchive) if err != nil { - return err + return archiveIDs, err } rawSize := len(encodedArchive) @@ -1329,10 +1357,12 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics wakuMessageArchiveIndexMetadataBytes, err := proto.Marshal(wakuMessageArchiveIndexMetadata) if err != nil { - return err + return archiveIDs, err } - wakuMessageArchiveIndex[crypto.Keccak256Hash(wakuMessageArchiveIndexMetadataBytes).String()] = wakuMessageArchiveIndexMetadata + archiveID := crypto.Keccak256Hash(wakuMessageArchiveIndexMetadataBytes).String() + archiveIDs = append(archiveIDs, archiveID) + wakuMessageArchiveIndex[archiveID] = wakuMessageArchiveIndexMetadata encodedArchives = append(encodedArchives, &EncodedArchiveData{bytes: encodedArchive, padding: padding}) from = to to = to.Add(partition) @@ -1348,7 +1378,7 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics if _, err := os.Stat(dataPath); err == nil { dataBytes, err = os.ReadFile(dataPath) if err != nil { - return err + return archiveIDs, err } } @@ -1360,17 +1390,17 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics wakuMessageArchiveIndexProto.Archives = wakuMessageArchiveIndex indexBytes, err := proto.Marshal(wakuMessageArchiveIndexProto) if err != nil { - return err + return archiveIDs, err } err = os.WriteFile(indexPath, indexBytes, 0644) if err != nil { - return err + return archiveIDs, err } err = os.WriteFile(dataPath, dataBytes, 0644) if err != nil { - return err + return archiveIDs, err } metaInfo := metainfo.MetaInfo{ @@ -1385,22 +1415,22 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics err = info.BuildFromFilePath(archiveDir) if err != nil { - return err + return archiveIDs, err } metaInfo.InfoBytes, err = bencode.Marshal(info) if err != nil { - return err + return archiveIDs, err } metaInfoBytes, err := bencode.Marshal(metaInfo) if err != nil { - return err + return archiveIDs, err } err = os.WriteFile(m.torrentFile(communityID.String()), metaInfoBytes, 0644) if err != nil { - return err + return archiveIDs, err } m.publish(&Subscription{ @@ -1423,7 +1453,7 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics lastMessageArchiveEndDate, err := m.persistence.GetLastMessageArchiveEndDate(communityID) if err != nil { - return err + return archiveIDs, err } if lastMessageArchiveEndDate > 0 { @@ -1432,11 +1462,12 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics err = m.persistence.SaveLastMessageArchiveEndDate(communityID, uint64(from.Unix())) } if err != nil { - return err + return archiveIDs, err } log.Println("History archive created/updated for community: ", communityID.String()) - return nil + + return archiveIDs, nil } func (m *Manager) SeedHistoryArchiveTorrent(communityID types.HexBytes) error { @@ -1506,6 +1537,136 @@ func (m *Manager) IsSeedingHistoryArchiveTorrent(communityID types.HexBytes) boo return ok && torrent.Seeding() } +func (m *Manager) DownloadHistoryArchivesByMagnetlink(communityID types.HexBytes, magnetlink string) ([]string, error) { + + id := communityID.String() + ml, _ := metainfo.ParseMagnetUri(magnetlink) + + log.Println("Adding torrent from magnetlink for community: ", id) + torrent, err := m.torrentClient.AddMagnet(magnetlink) + if err != nil { + return nil, err + } + m.torrentTasks[id] = ml.InfoHash + + <-torrent.GotInfo() + files := torrent.Files() + + i, ok := findIndexFile(files) + if !ok { + // We're dealing with a malformed torrent, so don't do anything + return nil, nil + } + + indexFile := files[i] + + log.Println("Downloading history archive index") + indexFile.Download() + for { + if indexFile.BytesCompleted() == indexFile.Length() { + break + } + } + log.Println("Done.") + + index, err := m.LoadHistoryArchiveIndexFromFile(communityID) + if err != nil { + return nil, err + } + + var archiveIDs []string + + for hash, metadata := range index.Archives { + hasArchive, err := m.persistence.HasMessageArchiveID(communityID, hash) + if err != nil { + m.logger.Debug("Failed to check if has message archive id", zap.Error(err)) + continue + } + if hasArchive { + continue + } + + startIndex := int(metadata.Offset) / pieceLength + endIndex := startIndex + int(metadata.Size)/pieceLength + + log.Println("Downloading data for message archive: ", hash) + log.Println("Pieces: ", startIndex, "-", endIndex) + torrent.DownloadPieces(startIndex, endIndex) + + psc := torrent.SubscribePieceStateChanges() + for { + i := startIndex + done := false + for { + if i > endIndex-1 { + break + } + done = torrent.PieceState(i).Complete + i++ + } + if done { + psc.Close() + break + } + <-psc.Values + } + log.Println("Done") + + archiveIDs = append(archiveIDs, hash) + err = m.persistence.SaveMessageArchiveID(communityID, hash) + if err != nil { + m.logger.Debug("Couldn't save message archive ID", zap.Error(err)) + continue + } + } + return archiveIDs, nil +} + +func (m *Manager) ExtractMessagesFromHistoryArchives(communityID types.HexBytes, archiveIDs []string) (map[transport.Filter][]*types.Message, error) { + id := communityID.String() + + index, err := m.LoadHistoryArchiveIndexFromFile(communityID) + if err != nil { + return nil, err + } + totalData, err := os.ReadFile(m.archiveDataFile(id)) + if err != nil { + return nil, err + } + + messages := make(map[transport.Filter][]*types.Message) + + for _, hash := range archiveIDs { + metadata := index.Archives[hash] + + archive := &protobuf.WakuMessageArchive{} + data := totalData[metadata.Offset : metadata.Offset+metadata.Size-metadata.Padding] + + err := proto.Unmarshal(data, archive) + if err != nil { + log.Println("Failed to unmarshal WakuMessageArchive", err) + m.logger.Debug("Failed to unmarshal WakuMessageArchive", zap.Error(err)) + continue + } + + for _, message := range archive.Messages { + filter := m.transport.FilterByTopic(message.Topic) + if filter != nil { + shhMessage := &types.Message{ + Sig: message.Sig, + Timestamp: uint32(message.Timestamp), + Topic: types.BytesToTopic(message.Topic), + Payload: message.Payload, + Padding: message.Padding, + Hash: message.Hash, + } + messages[*filter] = append(messages[*filter], shhMessage) + } + } + } + return messages, nil +} + func (m *Manager) GetHistoryArchiveMagnetlink(communityID types.HexBytes) (string, error) { id := communityID.String() torrentFile := m.torrentFile(id) @@ -1552,7 +1713,7 @@ func (m *Manager) createWakuMessageArchive(from time.Time, to time.Time, message return wakuMessageArchive } -func (m *Manager) loadHistoryArchiveIndexFromFile(communityID types.HexBytes) (*protobuf.WakuMessageArchiveIndex, error) { +func (m *Manager) LoadHistoryArchiveIndexFromFile(communityID types.HexBytes) (*protobuf.WakuMessageArchiveIndex, error) { wakuMessageArchiveIndexProto := &protobuf.WakuMessageArchiveIndex{} indexPath := m.archiveIndexFile(communityID.String()) @@ -1588,3 +1749,12 @@ func topicsAsByteArrays(topics []types.TopicType) [][]byte { } return topicsAsByteArrays } + +func findIndexFile(files []*torrent.File) (index int, ok bool) { + for i, f := range files { + if f.DisplayPath() == "index" { + return i, true + } + } + return 0, false +} diff --git a/protocol/communities/manager_test.go b/protocol/communities/manager_test.go index 1aaa50bae79..dc36bdf9957 100644 --- a/protocol/communities/manager_test.go +++ b/protocol/communities/manager_test.go @@ -230,7 +230,7 @@ func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_WithoutMessages() { // Partition of 7 days partition := 7 * 24 * time.Hour - err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition) + _, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition) s.Require().NoError(err) // There are no waku messages in the database so we don't expect @@ -272,7 +272,7 @@ func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_ShouldCreateArchive() { err = s.manager.StoreWakuMessage(&message3) s.Require().NoError(err) - err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition) + _, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition) s.Require().NoError(err) _, err = os.Stat(s.manager.archiveDataFile(community.IDString())) @@ -282,7 +282,7 @@ func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_ShouldCreateArchive() { _, err = os.Stat(s.manager.torrentFile(community.IDString())) s.Require().NoError(err) - index, err := s.manager.loadHistoryArchiveIndexFromFile(community.ID()) + index, err := s.manager.LoadHistoryArchiveIndexFromFile(community.ID()) s.Require().NoError(err) s.Require().Len(index.Archives, 1) @@ -333,10 +333,10 @@ func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_ShouldCreateMultipleArchi err = s.manager.StoreWakuMessage(&message4) s.Require().NoError(err) - err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition) + _, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition) s.Require().NoError(err) - index, err := s.manager.loadHistoryArchiveIndexFromFile(community.ID()) + index, err := s.manager.LoadHistoryArchiveIndexFromFile(community.ID()) s.Require().NoError(err) s.Require().Len(index.Archives, 3) @@ -382,10 +382,10 @@ func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_ShouldAppendArchives() { err = s.manager.StoreWakuMessage(&message1) s.Require().NoError(err) - err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition) + _, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition) s.Require().NoError(err) - index, err := s.manager.loadHistoryArchiveIndexFromFile(community.ID()) + index, err := s.manager.LoadHistoryArchiveIndexFromFile(community.ID()) s.Require().NoError(err) s.Require().Len(index.Archives, 1) @@ -397,10 +397,10 @@ func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_ShouldAppendArchives() { err = s.manager.StoreWakuMessage(&message2) s.Require().NoError(err) - err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition) + _, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition) s.Require().NoError(err) - index, err = s.manager.loadHistoryArchiveIndexFromFile(community.ID()) + index, err = s.manager.LoadHistoryArchiveIndexFromFile(community.ID()) s.Require().NoError(err) s.Require().Len(index.Archives, 2) } @@ -426,7 +426,7 @@ func (s *ManagerSuite) TestSeedHistoryArchiveTorrent() { err = s.manager.StoreWakuMessage(&message1) s.Require().NoError(err) - err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition) + _, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition) s.Require().NoError(err) err = s.manager.SeedHistoryArchiveTorrent(community.ID()) @@ -462,7 +462,7 @@ func (s *ManagerSuite) TestUnseedHistoryArchiveTorrent() { err = s.manager.StoreWakuMessage(&message1) s.Require().NoError(err) - err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition) + _, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition) s.Require().NoError(err) err = s.manager.SeedHistoryArchiveTorrent(community.ID()) diff --git a/protocol/communities/persistence.go b/protocol/communities/persistence.go index 5b7cdf6b1d1..e253341b92c 100644 --- a/protocol/communities/persistence.go +++ b/protocol/communities/persistence.go @@ -423,6 +423,15 @@ func (p *Persistence) GetWakuMessagesByFilterTopic(topics []types.TopicType, fro return messages, nil } +func (p *Persistence) GetMagnetlinkMessageClock(communityID types.HexBytes) (uint64, error) { + var magnetlinkClock uint64 + err := p.db.QueryRow(`SELECT magnetlink_clock FROM communities_archive_info WHERE community_id = ?`, communityID.String()).Scan(&magnetlinkClock) + if err == sql.ErrNoRows { + return 0, nil + } + return magnetlinkClock, err +} + func (p *Persistence) UpdateMagnetlinkMessageClock(communityID types.HexBytes, clock uint64) error { _, err := p.db.Exec(`UPDATE communities_archive_info SET magnetlink_clock = ? @@ -460,6 +469,22 @@ func (p *Persistence) GetLastMessageArchiveEndDate(communityID types.HexBytes) ( return lastMessageArchiveEndDate, nil } +func (p *Persistence) HasMessageArchiveID(communityID types.HexBytes, hash string) (exists bool, err error) { + err = p.db.QueryRow(`SELECT EXISTS (SELECT 1 FROM community_message_archive_hashes WHERE community_id = ? AND hash = ?)`, + communityID.String(), + hash, + ).Scan(&exists) + return exists, err +} + +func (p *Persistence) SaveMessageArchiveID(communityID types.HexBytes, hash string) error { + _, err := p.db.Exec(`INSERT INTO community_message_archive_hashes (community_id, hash) VALUES (?, ?)`, + communityID.String(), + hash, + ) + return err +} + func (p *Persistence) GetCommunitiesSettings() ([]CommunitySettings, error) { rows, err := p.db.Query("SELECT community_id, message_archive_seeding_enabled, message_archive_fetching_enabled FROM communities_settings") if err != nil { diff --git a/protocol/message_persistence.go b/protocol/message_persistence.go index e1d72ee29c7..a5d902ddc61 100644 --- a/protocol/message_persistence.go +++ b/protocol/message_persistence.go @@ -1866,7 +1866,7 @@ func (db sqlitePersistence) GetDeletes(messageID string, from string) ([]*Delete } func (db sqlitePersistence) SaveEdit(editMessage EditMessage) error { - _, err := db.db.Exec(`INSERT INTO user_messages_edits (clock, chat_id, message_id, text, source, id) VALUES(?,?,?,?,?,?)`, editMessage.Clock, editMessage.ChatId, editMessage.MessageId, editMessage.Text, editMessage.From, editMessage.ID) + _, err := db.db.Exec(`INSERT OR REPLACE INTO user_messages_edits (clock, chat_id, message_id, text, source, id) VALUES(?,?,?,?,?,?)`, editMessage.Clock, editMessage.ChatId, editMessage.MessageId, editMessage.Text, editMessage.From, editMessage.ID) return err } diff --git a/protocol/messenger.go b/protocol/messenger.go index f4692f45306..7388973d834 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -2906,7 +2906,7 @@ func (m *Messenger) RetrieveAll() (*MessengerResponse, error) { return nil, err } - return m.handleRetrievedMessages(chatWithMessages) + return m.handleRetrievedMessages(chatWithMessages, true) } func (m *Messenger) GetStats() types.StatsSummary { @@ -3055,7 +3055,7 @@ func (r *ReceivedMessageState) addNewActivityCenterNotification(publicKey ecdsa. return nil } -func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filter][]*types.Message) (*MessengerResponse, error) { +func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filter][]*types.Message, storeWakuMessages bool) (*MessengerResponse, error) { response := &MessengerResponse{} messageState := &ReceivedMessageState{ AllChats: m.allChats, @@ -3085,7 +3085,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte // Indicates tha all messages in the batch have been processed correctly allMessagesProcessed := true - if adminCommunitiesChatIDs[filter.ChatID] { + if adminCommunitiesChatIDs[filter.ChatID] && storeWakuMessages { logger.Debug("storing waku message") err := m.communitiesManager.StoreWakuMessage(shhMessage) if err != nil { @@ -3645,6 +3645,15 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte continue } + case protobuf.CommunityMessageArchiveMagnetlink: + logger.Debug("Handling CommunityMessageArchiveMagnetlink") + magnetlinkMessage := msg.ParsedMessage.Interface().(protobuf.CommunityMessageArchiveMagnetlink) + err = m.HandleHistoryArchiveMagnetlinkMessage(messageState, publicKey, magnetlinkMessage.MagnetUri, magnetlinkMessage.Clock) + if err != nil { + logger.Warn("failed to handle CommunityMessageArchiveMagnetlink", zap.Error(err)) + continue + } + case protobuf.AnonymousMetricBatch: logger.Debug("Handling AnonymousMetricBatch") if m.anonMetricsServer == nil { diff --git a/protocol/messenger_handler.go b/protocol/messenger_handler.go index 18907b127f7..ee85b8575b6 100644 --- a/protocol/messenger_handler.go +++ b/protocol/messenger_handler.go @@ -5,6 +5,7 @@ import ( "crypto/ecdsa" "encoding/hex" "fmt" + "log" "github.com/pkg/errors" "go.uber.org/zap" @@ -635,6 +636,63 @@ func (m *Messenger) HandleCommunityInvitation(state *ReceivedMessageState, signe return nil } +func (m *Messenger) HandleHistoryArchiveMagnetlinkMessage(state *ReceivedMessageState, communityPubKey *ecdsa.PublicKey, magnetlink string, clock uint64) error { + + id := types.HexBytes(crypto.CompressPubkey(communityPubKey)) + settings, err := m.communitiesManager.GetCommunitySettingsByID(id) + if err != nil { + m.logger.Debug("Couldn't get community settings for community with id: ", zap.Any("id", id)) + return err + } + + if m.config.torrentConfig.Enabled && settings.HistoryArchiveSupportEnabled { + signedByOwnedCommunity, err := m.communitiesManager.IsAdminCommunity(communityPubKey) + if err != nil { + return err + } + joinedCommunity, err := m.communitiesManager.IsJoinedCommunity(communityPubKey) + if err != nil { + return err + } + lastClock, err := m.communitiesManager.GetMagnetlinkMessageClock(id) + if err != nil { + return err + } + // We are only interested in a community archive magnet link + // if it originates from a community that the current account is + // part of and doesn't own the private key at the same time + if !signedByOwnedCommunity && joinedCommunity && clock > lastClock { + + m.communitiesManager.UnseedHistoryArchiveTorrent(id) + go func() { + downloadedArchiveIDs, err := m.communitiesManager.DownloadHistoryArchivesByMagnetlink(id, magnetlink) + if err != nil { + log.Println("failed to download history archive data", err) + m.logger.Debug("failed to download history archive data", zap.Error(err)) + return + } + + messagesToHandle, err := m.communitiesManager.ExtractMessagesFromHistoryArchives(id, downloadedArchiveIDs) + if err != nil { + log.Println("failed to extract history archive messages", err) + m.logger.Debug("failed to extract history archive messages", zap.Error(err)) + return + } + + _, err = m.handleRetrievedMessages(messagesToHandle, false) + if err != nil { + log.Println("failed to write history archive messages to database", err) + m.logger.Debug("failed to write history archive messages to database", zap.Error(err)) + return + } + }() + + return m.communitiesManager.UpdateMagnetlinkMessageClock(id, clock) + } + } + return nil +} + // HandleCommunityRequestToJoin handles an community request to join func (m *Messenger) HandleCommunityRequestToJoin(state *ReceivedMessageState, signer *ecdsa.PublicKey, requestToJoinProto protobuf.CommunityRequestToJoin) error { if requestToJoinProto.CommunityId == nil { diff --git a/protocol/transport/filters_manager.go b/protocol/transport/filters_manager.go index 923cbedcb46..5fa694aa19a 100644 --- a/protocol/transport/filters_manager.go +++ b/protocol/transport/filters_manager.go @@ -1,6 +1,7 @@ package transport import ( + "bytes" "crypto/ecdsa" "encoding/hex" "sync" @@ -225,6 +226,17 @@ func (f *FiltersManager) FilterByFilterID(filterID string) *Filter { return nil } +func (f *FiltersManager) FilterByTopic(topic []byte) *Filter { + f.mutex.Lock() + defer f.mutex.Unlock() + for _, f := range f.filters { + if bytes.Equal(types.TopicTypeToByteArray(f.Topic), topic) { + return f + } + } + return nil +} + // FiltersByIdentities returns an array of filters for given list of public keys func (f *FiltersManager) FiltersByIdentities(identities []string) []*Filter { f.mutex.Lock() diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index 7f668fed64a..97d65b99a92 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -149,6 +149,10 @@ func (t *Transport) FilterByChatID(chatID string) *Filter { return t.filters.FilterByChatID(chatID) } +func (t *Transport) FilterByTopic(topic []byte) *Filter { + return t.filters.FilterByTopic(topic) +} + func (t *Transport) FiltersByIdentities(identities []string) []*Filter { return t.filters.FiltersByIdentities(identities) } diff --git a/protocol/v1/status_message.go b/protocol/v1/status_message.go index 2b3cd096275..01489226e90 100644 --- a/protocol/v1/status_message.go +++ b/protocol/v1/status_message.go @@ -267,6 +267,8 @@ func (m *StatusMessage) HandleApplication() error { return m.unmarshalProtobufData(new(protobuf.SyncBookmark)) case protobuf.ApplicationMetadataMessage_SYNC_CLEAR_HISTORY: return m.unmarshalProtobufData(new(protobuf.SyncClearHistory)) + case protobuf.ApplicationMetadataMessage_COMMUNITY_ARCHIVE_MAGNETLINK: + return m.unmarshalProtobufData(new(protobuf.CommunityMessageArchiveMagnetlink)) } return nil }