Skip to content

Commit

Permalink
Handle history archive magnetlink messages
Browse files Browse the repository at this point in the history
This introduces the ability for status nodes to handle community
history archive magnetlinks. To make this work, a few things are needed:

1. A new database table has been introduced to store message archive
   hashes. This is necessary so status nodes can determine whether or
   not they need to download a certain archive
2. The messenger's `handleRetrievedMessages()` has been extended to take
   magnetlink messages into account
3. New APIs were added to download torrent data given a magnetlink and
   also to extract messages from downloaded archives, which are then
   later fed to `handleRetrievedMessages`

Closes #2568
  • Loading branch information
0x-r4bbit committed Mar 14, 2022
1 parent 10a51de commit 81d88dc
Show file tree
Hide file tree
Showing 11 changed files with 297 additions and 7 deletions.
2 changes: 1 addition & 1 deletion appdatabase/migrations/bindata.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

155 changes: 153 additions & 2 deletions protocol/communities/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,10 @@ func (m *Manager) JoinCommunity(id types.HexBytes) (*Community, error) {
return community, nil
}

// GetMagnetlinkMessageClock returns the magnetlink message clock that the
// persistence layer reports for the given community.
func (m *Manager) GetMagnetlinkMessageClock(communityID types.HexBytes) (uint64, error) {
	clock, err := m.persistence.GetMagnetlinkMessageClock(communityID)
	return clock, err
}

// UpdateMagnetlinkMessageClock forwards the new magnetlink message clock for
// the given community to the persistence layer and returns its error, if any.
func (m *Manager) UpdateMagnetlinkMessageClock(communityID types.HexBytes, clock uint64) error {
	err := m.persistence.UpdateMagnetlinkMessageClock(communityID, clock)
	return err
}
Expand Down Expand Up @@ -1045,6 +1049,29 @@ func (m *Manager) GetAdminCommunitiesChatIDs() (map[string]bool, error) {
return chatIDs, nil
}

// IsAdminCommunity reports whether pubKey equals the public key of any
// community returned by m.Created(). An error from m.Created() is returned
// unchanged together with false.
func (m *Manager) IsAdminCommunity(pubKey *ecdsa.PublicKey) (bool, error) {
	owned, err := m.Created()
	if err != nil {
		return false, err
	}

	isAdmin := false
	for _, community := range owned {
		if community.PrivateKey().PublicKey.Equal(pubKey) {
			isAdmin = true
			break
		}
	}
	return isAdmin, nil
}

// IsJoinedCommunity looks up the community whose ID is the compressed form of
// pubKey and reports whether it exists and is marked as joined. Lookup errors
// are returned unchanged together with false.
func (m *Manager) IsJoinedCommunity(pubKey *ecdsa.PublicKey) (bool, error) {
	communityID := crypto.CompressPubkey(pubKey)

	c, err := m.GetByID(communityID)
	if err != nil {
		return false, err
	}
	if c == nil {
		// No such community known locally, so it cannot be joined.
		return false, nil
	}
	return c.Joined(), nil
}

func (m *Manager) GetCommunityChatsFilters(communityID types.HexBytes) ([]*transport.Filter, error) {
chatIDs, err := m.persistence.GetCommunityChatIDs(communityID)
if err != nil {
Expand Down Expand Up @@ -1250,7 +1277,7 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics

_, err := os.Stat(indexPath)
if err == nil {
wakuMessageArchiveIndexProto, err = m.loadHistoryArchiveIndexFromFile(communityID)
wakuMessageArchiveIndexProto, err = m.LoadHistoryArchiveIndexFromFile(communityID)
if err != nil {
return err
}
Expand Down Expand Up @@ -1455,6 +1482,130 @@ func (m *Manager) IsSeedingHistoryArchiveTorrent(communityID types.HexBytes) boo
return exists && ok
}

// DownloadHistoryArchivesByMagnetlink adds the torrent described by magnetlink
// to the torrent client, downloads the history archive index file and then
// downloads the data pieces of every archive listed in that index that is not
// yet recorded in persistence for communityID.
//
// It returns the hashes of the archives whose pieces were downloaded during
// this call. An error is returned when the magnet link cannot be parsed, the
// torrent cannot be added, the torrent does not contain an index file, or the
// index cannot be loaded. Failures to query or store an individual archive
// hash are only logged.
//
// The call blocks until the torrent info, the index file and all requested
// pieces are available; there is no timeout or cancellation.
func (m *Manager) DownloadHistoryArchivesByMagnetlink(communityID types.HexBytes, magnetlink string) ([]string, error) {
	id := communityID.String()

	// Reject malformed magnet links up front instead of registering a
	// zero-valued info hash in torrentTasks.
	ml, err := metainfo.ParseMagnetUri(magnetlink)
	if err != nil {
		return nil, err
	}

	log.Println("Adding torrent from magnetlink for community: ", id)
	torrent, err := m.torrentClient.AddMagnet(magnetlink)
	if err != nil {
		return nil, err
	}
	m.torrentTasks[id] = ml.InfoHash

	<-torrent.GotInfo()
	files := torrent.Files()

	// The index is read from the second file of the torrent; a torrent with
	// fewer files cannot be a history archive torrent and must not panic here.
	if len(files) < 2 {
		return nil, &os.PathError{Op: "download", Path: "history archive index", Err: os.ErrNotExist}
	}
	indexFile := files[1]

	log.Println("Downloading history archive index")
	indexFile.Download()
	// Poll with a short sleep rather than spinning, so waiting for the
	// download does not occupy a full CPU core.
	for indexFile.BytesCompleted() < indexFile.Length() {
		time.Sleep(100 * time.Millisecond)
	}
	log.Println("Done.")

	index, err := m.LoadHistoryArchiveIndexFromFile(communityID)
	if err != nil {
		return nil, err
	}

	var archiveIDs []string

	for hash, metadata := range index.Archives {
		hasArchive, err := m.persistence.HasMessageArchiveID(communityID, hash)
		if err != nil {
			m.logger.Debug("Failed to check if has message archive id", zap.Error(err))
			continue
		}
		if hasArchive {
			continue
		}

		// Piece range [startIndex, endIndex) covering the archive. The end is
		// rounded up so a trailing partial piece is requested as well; for
		// piece-aligned archives this equals startIndex + Size/pieceLength.
		startIndex := int(metadata.Offset) / pieceLength
		endIndex := (int(metadata.Offset) + int(metadata.Size) + pieceLength - 1) / pieceLength

		log.Println("Downloading data for message archive: ", hash)
		log.Println("Pieces: ", startIndex, "-", endIndex)
		torrent.DownloadPieces(startIndex, endIndex)

		psc := torrent.SubscribePieceStateChanges()
		for {
			// Every piece in the range has to be complete, not just the last
			// one inspected. An empty range counts as complete immediately.
			allComplete := true
			for i := startIndex; i < endIndex; i++ {
				if !torrent.PieceState(i).Complete {
					allComplete = false
					break
				}
			}
			if allComplete {
				break
			}
			// Block until some piece changes state, then re-check.
			<-psc.Values
		}
		psc.Close()
		log.Println("Done")

		// The data is available locally at this point, so the hash is reported
		// to the caller even if recording it in persistence fails below.
		archiveIDs = append(archiveIDs, hash)
		if err := m.persistence.SaveMessageArchiveID(communityID, hash); err != nil {
			m.logger.Debug("Couldn't save message archive ID", zap.Error(err))
		}
	}
	return archiveIDs, nil
}

// ExtractMessagesFromHistoryArchives reads the community's archive data file
// and decodes the archives identified by archiveIDs, using the offsets stored
// in the history archive index.
//
// Every decoded message whose topic matches a transport filter is converted
// to a *types.Message and grouped under that filter in the returned map;
// messages without a matching filter are dropped. Archive IDs that are absent
// from the index, that describe a byte range outside the data file, or that
// fail to unmarshal are logged and skipped.
//
// An error is returned only when the index or the data file cannot be read.
func (m *Manager) ExtractMessagesFromHistoryArchives(communityID types.HexBytes, archiveIDs []string) (map[transport.Filter][]*types.Message, error) {
	id := communityID.String()

	index, err := m.LoadHistoryArchiveIndexFromFile(communityID)
	if err != nil {
		return nil, err
	}
	totalData, err := os.ReadFile(m.archiveDataFile(id))
	if err != nil {
		return nil, err
	}

	messages := make(map[transport.Filter][]*types.Message)

	for _, hash := range archiveIDs {
		metadata, ok := index.Archives[hash]
		if !ok {
			// Unknown archive: there is no offset information to read from.
			m.logger.Debug("Message archive not found in index", zap.String("hash", hash))
			continue
		}

		// The index comes from a downloaded file, so validate the byte range
		// before slicing instead of panicking on inconsistent values.
		start := int(metadata.Offset)
		end := int(metadata.Offset + metadata.Size - metadata.Padding)
		if start < 0 || end < start || end > len(totalData) {
			m.logger.Debug("Message archive range exceeds archive data", zap.String("hash", hash))
			continue
		}

		archive := &protobuf.WakuMessageArchive{}
		data := totalData[start:end]

		err := proto.Unmarshal(data, archive)
		if err != nil {
			log.Println("Failed to unmarshal WakuMessageArchive", err)
			m.logger.Debug("Failed to unmarshal WakuMessageArchive", zap.Error(err))
			continue
		}

		for _, message := range archive.Messages {
			filter := m.transport.FilterByTopic(message.Topic)
			if filter == nil {
				// No filter is registered for this topic.
				continue
			}
			shhMessage := &types.Message{
				Sig:       message.Sig,
				Timestamp: uint32(message.Timestamp),
				Topic:     types.BytesToTopic(message.Topic),
				Payload:   message.Payload,
				Padding:   message.Padding,
				Hash:      message.Hash,
			}
			messages[*filter] = append(messages[*filter], shhMessage)
		}
	}
	return messages, nil
}

func (m *Manager) GetHistoryArchiveMagnetlink(communityID types.HexBytes) (string, error) {
id := communityID.String()
torrentFile := m.torrentFile(id)
Expand Down Expand Up @@ -1501,7 +1652,7 @@ func (m *Manager) createWakuMessageArchive(from time.Time, to time.Time, message
return wakuMessageArchive
}

func (m *Manager) loadHistoryArchiveIndexFromFile(communityID types.HexBytes) (*protobuf.WakuMessageArchiveIndex, error) {
func (m *Manager) LoadHistoryArchiveIndexFromFile(communityID types.HexBytes) (*protobuf.WakuMessageArchiveIndex, error) {
wakuMessageArchiveIndexProto := &protobuf.WakuMessageArchiveIndex{}

indexPath := m.archiveIndexFile(communityID.String())
Expand Down
25 changes: 25 additions & 0 deletions protocol/communities/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,15 @@ func (p *Persistence) GetWakuMessagesByFilterTopic(topics []types.TopicType, fro
return messages, nil
}

// GetMagnetlinkMessageClock reads magnetlink_clock from
// communities_archive_info for the given community. When no row exists it
// returns 0 and a nil error; any other scan error is returned as is.
func (p *Persistence) GetMagnetlinkMessageClock(communityID types.HexBytes) (uint64, error) {
	const query = `SELECT magnetlink_clock FROM communities_archive_info WHERE community_id = ?`

	var clock uint64
	row := p.db.QueryRow(query, communityID.String())
	switch err := row.Scan(&clock); err {
	case nil:
		return clock, nil
	case sql.ErrNoRows:
		// A missing row is not treated as an error.
		return 0, nil
	default:
		return clock, err
	}
}

func (p *Persistence) UpdateMagnetlinkMessageClock(communityID types.HexBytes, clock uint64) error {
_, err := p.db.Exec(`UPDATE communities_archive_info SET
magnetlink_clock = ?
Expand Down Expand Up @@ -460,6 +469,22 @@ func (p *Persistence) GetLastMessageArchiveEndDate(communityID types.HexBytes) (
return lastMessageArchiveEndDate, nil
}

// HasMessageArchiveID reports whether community_message_archive_hashes holds
// a row for the given community ID and archive hash.
func (p *Persistence) HasMessageArchiveID(communityID types.HexBytes, hash string) (exists bool, err error) {
	const query = `SELECT EXISTS (SELECT 1 FROM community_message_archive_hashes WHERE community_id = ? AND hash = ?)`

	row := p.db.QueryRow(query, communityID.String(), hash)
	err = row.Scan(&exists)
	return exists, err
}

// SaveMessageArchiveID inserts the (community ID, archive hash) pair into
// community_message_archive_hashes and returns the error of that insert.
func (p *Persistence) SaveMessageArchiveID(communityID types.HexBytes, hash string) error {
	const stmt = `INSERT INTO community_message_archive_hashes (community_id, hash) VALUES (?, ?)`

	if _, err := p.db.Exec(stmt, communityID.String(), hash); err != nil {
		return err
	}
	return nil
}

func (p *Persistence) GetCommunitiesSettings() ([]CommunitySettings, error) {
rows, err := p.db.Query("SELECT community_id, message_archive_seeding_enabled, message_archive_fetching_enabled FROM communities_settings")
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion protocol/message_persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -1853,7 +1853,7 @@ func (db sqlitePersistence) GetDeletes(messageID string, from string) ([]*Delete
}

// SaveEdit writes the given edit (clock, chat ID, message ID, text, source and
// ID) into the user_messages_edits table and returns the error of that write.
// NOTE(review): this span is a rendered diff. The first Exec line below is the
// removed plain INSERT and the second is the INSERT OR REPLACE that supersedes
// it; only one of the two exists in the actual file.
func (db sqlitePersistence) SaveEdit(editMessage EditMessage) error {
_, err := db.db.Exec(`INSERT INTO user_messages_edits (clock, chat_id, message_id, text, source, id) VALUES(?,?,?,?,?,?)`, editMessage.Clock, editMessage.ChatId, editMessage.MessageId, editMessage.Text, editMessage.From, editMessage.ID)
_, err := db.db.Exec(`INSERT OR REPLACE INTO user_messages_edits (clock, chat_id, message_id, text, source, id) VALUES(?,?,?,?,?,?)`, editMessage.Clock, editMessage.ChatId, editMessage.MessageId, editMessage.Text, editMessage.From, editMessage.ID)
return err
}

Expand Down
15 changes: 12 additions & 3 deletions protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2811,7 +2811,7 @@ func (m *Messenger) RetrieveAll() (*MessengerResponse, error) {
return nil, err
}

return m.handleRetrievedMessages(chatWithMessages)
return m.handleRetrievedMessages(chatWithMessages, true)
}

func (m *Messenger) GetStats() types.StatsSummary {
Expand Down Expand Up @@ -2959,7 +2959,7 @@ func (r *ReceivedMessageState) addNewActivityCenterNotification(publicKey ecdsa.
return nil
}

func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filter][]*types.Message) (*MessengerResponse, error) {
func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filter][]*types.Message, storeWakuMessages bool) (*MessengerResponse, error) {
response := &MessengerResponse{}
messageState := &ReceivedMessageState{
AllChats: m.allChats,
Expand Down Expand Up @@ -2989,7 +2989,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
// Indicates that all messages in the batch have been processed correctly
allMessagesProcessed := true

if adminCommunitiesChatIDs[filter.ChatID] {
if adminCommunitiesChatIDs[filter.ChatID] && !storeWakuMessages {
logger.Debug("storing waku message")
err := m.communitiesManager.StoreWakuMessage(shhMessage)
if err != nil {
Expand Down Expand Up @@ -3549,6 +3549,15 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
continue
}

case protobuf.CommunityMessageArchiveMagnetlink:
logger.Debug("Handling CommunityMessageArchiveMagnetlink")
magnetlinkMessage := msg.ParsedMessage.Interface().(protobuf.CommunityMessageArchiveMagnetlink)
err = m.HandleHistoryArchiveMagnetlinkMessage(messageState, publicKey, magnetlinkMessage.MagnetUri, magnetlinkMessage.Clock)
if err != nil {
logger.Warn("failed to handle CommunityMessageArchiveMagnetlink", zap.Error(err))
continue
}

case protobuf.AnonymousMetricBatch:
logger.Debug("Handling AnonymousMetricBatch")
if m.anonMetricsServer == nil {
Expand Down
58 changes: 58 additions & 0 deletions protocol/messenger_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/ecdsa"
"encoding/hex"
"fmt"
"log"

"github.com/pkg/errors"
"go.uber.org/zap"
Expand Down Expand Up @@ -626,6 +627,63 @@ func (m *Messenger) HandleCommunityInvitation(state *ReceivedMessageState, signe
return nil
}

// HandleHistoryArchiveMagnetlinkMessage reacts to a history archive magnetlink
// message attributed to the community identified by communityPubKey.
//
// Nothing happens unless torrent support is enabled in the messenger config
// and history archive support is enabled in the community's settings. The
// magnet link is then only acted upon when the community is not one returned
// as created by this account, the community is joined, and clock is greater
// than the last stored magnetlink clock. In that case the current torrent for
// the community is unseeded, a goroutine is started that downloads the
// archives, extracts their messages and passes them to
// handleRetrievedMessages, and the stored magnetlink clock is updated.
//
// Errors raised inside the goroutine are only logged; the returned error comes
// from the settings/ownership/clock lookups or from the clock update.
func (m *Messenger) HandleHistoryArchiveMagnetlinkMessage(state *ReceivedMessageState, communityPubKey *ecdsa.PublicKey, magnetlink string, clock uint64) error {
	id := types.HexBytes(crypto.CompressPubkey(communityPubKey))

	settings, err := m.communitiesManager.GetCommunitySettingsByID(id)
	if err != nil {
		m.logger.Debug("Couldn't get community settings for community with id: ", zap.Any("id", id))
		return err
	}

	if !m.config.torrentConfig.Enabled || !settings.HistoryArchiveSupportEnabled {
		return nil
	}

	isOwner, err := m.communitiesManager.IsAdminCommunity(communityPubKey)
	if err != nil {
		return err
	}
	isMember, err := m.communitiesManager.IsJoinedCommunity(communityPubKey)
	if err != nil {
		return err
	}
	previousClock, err := m.communitiesManager.GetMagnetlinkMessageClock(id)
	if err != nil {
		return err
	}

	// Only magnet links of joined communities whose private key this account
	// does not hold are of interest, and only when they are newer than the
	// last one seen.
	if isOwner || !isMember || clock <= previousClock {
		return nil
	}

	m.communitiesManager.UnseedHistoryArchiveTorrent(id)
	go func() {
		// report writes a failure to both the standard logger and the
		// messenger's zap logger.
		report := func(msg string, cause error) {
			log.Println(msg, cause)
			m.logger.Debug(msg, zap.Error(cause))
		}

		archiveIDs, err := m.communitiesManager.DownloadHistoryArchivesByMagnetlink(id, magnetlink)
		if err != nil {
			report("failed to download history archive data", err)
			return
		}

		archivedMessages, err := m.communitiesManager.ExtractMessagesFromHistoryArchives(id, archiveIDs)
		if err != nil {
			report("failed to extract history archive messages", err)
			return
		}

		if _, err = m.handleRetrievedMessages(archivedMessages, false); err != nil {
			report("failed to write history archive messages to database", err)
		}
	}()

	return m.communitiesManager.UpdateMagnetlinkMessageClock(id, clock)
}

// HandleCommunityRequestToJoin handles a community request to join
func (m *Messenger) HandleCommunityRequestToJoin(state *ReceivedMessageState, signer *ecdsa.PublicKey, requestToJoinProto protobuf.CommunityRequestToJoin) error {
if requestToJoinProto.CommunityId == nil {
Expand Down
Loading

0 comments on commit 81d88dc

Please sign in to comment.