Handle history archive magnetlink messages #2585
This needs tests and some fine-tuning but should be in a reviewable shape.
protocol/communities/manager.go
Outdated
}
m.torrentTasks[id] = ml.InfoHash

<-torrent.GotInfo()
So here we're waiting for the magnetlink to emit its metadata (this is needed for us to know what files the torrent will have, so having this info is crucial).
During testing, I've experienced that it sometimes takes a long time to receive a signal, at which point nothing happens here.
What happens if you are offline? We should probably add a timeout here if it could hang forever.
protocol/communities/manager.go
Outdated
<-torrent.GotInfo()
files := torrent.Files()

indexFile := files[1]
We always expect exactly two files, `data` and `index`, so accessing `[1]` is safe. We should probably still determine the index file dynamically, in case we add more files to these torrents in the future (maybe due to status-im/specs#167).
Can it be that `files` is of a different length (say 1 or 0), even in cases where the torrent is malformed?
If that's the case, it's best to check and return an error, since a panic would just crash the app. If someone manages to push a malformed torrent, for example, it would crash all clients.
Yea, good point. I'll update it so it'll search for an `index` file, and if it doesn't exist it'll return early as the torrent is malformed.
Updated
return m.communitiesManager.UpdateMagnetlinkMessageClock(id, clock)
}
}
return nil
Originally I had most of this logic inside `communitiesManager`; however, because we need access to `handleRetrievedMessages()`, I've moved it here. Lemme know if there's a different way to get access to `messenger` APIs inside `communitiesManager`.
You can use pub/sub, but we can have a look at it later
protocol/message_persistence.go
Outdated
@@ -1853,7 +1853,7 @@ func (db sqlitePersistence) GetDeletes(messageID string, from string) ([]*Delete
}

func (db sqlitePersistence) SaveEdit(editMessage EditMessage) error {
- _, err := db.db.Exec(`INSERT INTO user_messages_edits (clock, chat_id, message_id, text, source, id) VALUES(?,?,?,?,?,?)`, editMessage.Clock, editMessage.ChatId, editMessage.MessageId, editMessage.Text, editMessage.From, editMessage.ID)
+ _, err := db.db.Exec(`INSERT OR REPLACE INTO user_messages_edits (clock, chat_id, message_id, text, source, id) VALUES(?,?,?,?,?,?)`, editMessage.Clock, editMessage.ChatId, editMessage.MessageId, editMessage.Text, editMessage.From, editMessage.ID)
This is probably not what we want. I noticed that, when replaying history messages into the DB, message edits were causing SQL errors, saying that the entries have to be unique. So I've added this here to satisfy it.
@cammellos Would be cool if you could confirm whether this here is okay
As far as I can see, it should not matter, since none of the fields are modified after insertion, so it would just be replacing the same message.
Though if you make it not return an error, the code will process the edit multiple times. I'm not sure whether that is an issue (in theory it should be fine), but we need to test edits before merging, just to make sure.
As per offline discussion, I wanted to change this to leave out the `OR REPLACE` and check for the error, which can then be interpreted as "don't process this message".
While doing this, I realized I couldn't reproduce the error anymore. So I'll just remove this change for now.
Please review the last commit of this PR
@@ -5,6 +5,7 @@ import (
"database/sql"
"fmt"
"os"
+ "sort"
This comment here is just to ensure this file is being reviewed, as github collapses it due to the amount of changes.
multiaccounts/accounts/database.go
Outdated
@@ -219,5 +219,5 @@ func (db *Database) AddressExists(address types.Address) (exists bool, err error
}

func (db *Database) GetNodeConfig() (*params.NodeConfig, error) {
- return nodecfg.GetNodeConfig(db.db)
+ return nodecfg.GetNodeConfigFromDB(db.db)
I needed to make this change because `GetNodeConfig` no longer exists. @richard-ramos is that related to your changes?
Yes, it's surprising that this wasn't caught by the CI.
protocol/communities/manager.go
Outdated
return nil, err
}

m.logger.Debug("Adding torrent via magnetlink for community", zap.Any("id", id), zap.Any("magnetlink", magnetlink))
You can use strings for zap, `zap.String("id", id.String())` or `zap.String("magnet", magnetlink.String())`, so you get the hex-encoded version (otherwise it's going to be base encoded).
protocol/communities/manager.go
Outdated
m.logger.Debug("Downloading history archive index")
indexFile.Download()
for {
if indexFile.BytesCompleted() == indexFile.Length() {
That's a busy loop. If you want to wait, you should use a different pattern (a channel, select, polling, etc.).
protocol/communities/manager.go
Outdated
startIndex := int(metadata.Offset) / pieceLength
endIndex := startIndex + int(metadata.Size)/pieceLength

m.logger.Debug("Downloading data for message archive", zap.Any("hash", hash))
we use lowercase for log entries
}

for _, message := range archive.Messages {
filter := m.transport.FilterByTopic(message.Topic)
What exactly are you checking here? You are checking that the topic matches, but generally that's not enough to ensure the message is for you and that you should process it.
Which messages should we process, and in which cases should they not be processed?
This function (`ExtractMessagesFromHistoryArchive`) aims to create a `map[transport.Filter][]*types.Message` from a downloaded archive, so the messages can be passed to `handleRetrievedMessages`.
Because we're only getting the topic from each archive message, we need to figure out which filter they belong to, so we can create the correct return type.
All messages extracted from an archive (and passed to `handleRetrievedMessages`) are considered to be processed.
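The grouping described above can be sketched roughly like this. The `filter` and `message` types are simplified, hypothetical stand-ins for `transport.Filter` and `types.Message`, and skipping messages without a known filter is an assumption made for the sketch, not something stated in the PR.

```go
package main

// filter and message are hypothetical, simplified stand-ins for
// transport.Filter and types.Message.
type filter struct {
	ChatID string
	Topic  string
}

type message struct {
	Topic   string
	Payload []byte
}

// groupByFilter resolves each message's topic to a filter via filterByTopic
// and collects the messages per filter. Messages whose topic has no known
// filter are skipped (an assumption of this sketch).
func groupByFilter(messages []message, filterByTopic func(topic string) *filter) map[filter][]*message {
	grouped := make(map[filter][]*message)
	for i := range messages {
		f := filterByTopic(messages[i].Topic)
		if f == nil {
			continue
		}
		grouped[*f] = append(grouped[*f], &messages[i])
	}
	return grouped
}
```

The resulting map has the same shape as the argument the thread says is handed to `handleRetrievedMessages`.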
}

if m.config.torrentConfig != nil && m.config.torrentConfig.Enabled && settings.HistoryArchiveSupportEnabled {
signedByOwnedCommunity, err := m.communitiesManager.IsAdminCommunity(communityPubKey)
At first glance, I think we should be using the key of the community to send these messages, rather than the admin's? (There are arguments for both, but it's worth considering.)
So the message with the magnet link is signed by the community key.
However, this part of the code is handling the magnet link messages. So what we're checking here is: am I the admin/owner of this community? If yes, then I'm not interested in handling the magnet link, because I'm expected to be the node that created the magnet link message from a history archive in the first place.
In other words: only members of the community are interested in this, not admins/owners.
// We are only interested in a community archive magnet link
// if it originates from a community that the current account is
// part of and doesn't own the private key at the same time
if !signedByOwnedCommunity && joinedCommunity && clock >= lastClock {
Wondering why we are not interested if we are the admin. Is the assumption that we would already have the same messages, so downloading them is redundant?
As mentioned above, the admin is the one that publishes the history archives via magnetlinks, so that node already has the history. Only members download torrent data.
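The decision explained in this thread boils down to a small predicate. The sketch below restates the condition from the diff as a standalone function; `shouldDownloadArchive` and its parameter names are hypothetical, and the `uint64` clock type is an assumption.

```go
package main

// shouldDownloadArchive reports whether a node should act on a history
// archive magnetlink message: it must be a joined member, must not hold the
// community's private key (the owner already has the history), and the
// message clock must not be older than the last one seen.
func shouldDownloadArchive(ownsCommunityKey, joined bool, clock, lastClock uint64) bool {
	return !ownsCommunityKey && joined && clock >= lastClock
}
```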
protocol/messenger_handler.go
Outdated
return
}

_, err = m.handleRetrievedMessages(messagesToHandle, false)
This looks a bit dangerous: we currently don't run `handleRetrievedMessages` in parallel, so weird things might happen.
Hm.. I wonder what such weird things could be, given that `handleRetrievedMessages` has various checks for incoming messages, handling of out-of-order messages, etc.
Any suggestions on how else this should be done?
The first thing that comes to mind is to have a channel and call `handleRetrievedMessages` on what it delivers, so you have two producers (polling from the network, and torrent) and a single consumer. It's a bit more complex, but we don't have to think about the code running in parallel, which might lead to difficult-to-find bugs.
Okay I'll see if I can make that happen. Thanks for your input!
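As a rough sketch of the two-producers, single-consumer idea suggested above: the names `batch`, `consume`, and `runPipeline` are hypothetical, and the `handle` callback merely stands in for a call to `handleRetrievedMessages`. This is one possible shape under those assumptions, not the design that was merged.

```go
package main

import "sync"

// batch is a hypothetical unit of work: a set of messages from one source.
type batch struct {
	Source   string
	Messages []string
}

// consume drains batches one at a time, so handle never runs concurrently
// with itself.
func consume(batches <-chan batch, handle func(batch)) {
	for b := range batches {
		handle(b)
	}
}

// runPipeline starts one goroutine per producer, funnels everything into a
// single channel, and handles all batches sequentially on the caller's
// goroutine. It returns once every producer is done and every batch handled.
func runPipeline(producers []func(out chan<- batch), handle func(batch)) {
	batches := make(chan batch)

	var wg sync.WaitGroup
	for _, produce := range producers {
		wg.Add(1)
		go func(p func(out chan<- batch)) {
			defer wg.Done()
			p(batches)
		}(produce)
	}

	// Close the channel only after all producers have finished sending.
	go func() {
		wg.Wait()
		close(batches)
	}()

	consume(batches, handle)
}
```

With this shape, network polling and torrent extraction would each be a producer, and the message handling stays strictly sequential.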
You also need to publish the response, otherwise the client will be in an inconsistent state:
status-go/services/ext/service.go
Line 210 in f66ad3d
publishMessengerResponse(response)
In desktop, what we do is listen for the `HistoryDownloaded` signal, and once that has happened we refetch messages from status-go for the current active channel.
protocol/communities/manager.go
Outdated
archiveLogger, err := lc.Build()
if err != nil {
return nil, errors.Wrap(err, "failed to create a archive logger")
}
I've added a new logger so we get some output in stdout when archives are created/downloaded (easier for debugging, kinda hard when stuff only gets written to geth.log)
select {
case <-time.After(20 * time.Second):
return nil, errors.New("torrent has timed out")
case <-torrent.GotInfo():
@cammellos I've added a timeout of 20 seconds here in case `torrent.GotInfo()` doesn't emit anything.
protocol/communities/manager.go
Outdated
defer ticker.Stop()
for {
select {
case <-ticker.C:
To make the busy loop less busy, we're now using a 200 ms ticker, which should be more than enough.
cmd/statusd/main.go
Outdated
if err != nil {
logger.Error("failed to write history archive messages to database", err)
continue
}
@cammellos lemme know if this is what you had in mind
Left some comments, but in general LGTM! 🎖️
protocol/communities/persistence.go
Outdated
@@ -443,6 +443,11 @@ func (p *Persistence) GetWakuMessagesByFilterTopic(topics []types.TopicType, fro
return messages, nil
}

func (p *Persistence) HasCommunityArchiveInfo(communityID types.HexBytes) (exists bool, err error) {
err = p.db.QueryRow(`SELECT count(1) FROM communities_archive_info WHERE community_id = ?`, communityID.String()).Scan(&exists)
Not sure if it's worth it, but depending on the number of rows in `communities_archive_info`, `EXISTS` is more efficient:
- err = p.db.QueryRow(`SELECT count(1) FROM communities_archive_info WHERE community_id = ?`, communityID.String()).Scan(&exists)
+ err = p.db.QueryRow(`SELECT EXISTS (SELECT 1 FROM communities_archive_info WHERE community_id = ?)`, communityID.String()).Scan(&exists)
Thanks, this is cool!
protocol/messenger_handler.go
Outdated
return err
}

if m.config.torrentConfig != nil && m.config.torrentConfig.Enabled && settings.HistoryArchiveSupportEnabled {
`settings` can be null if `communities_settings` does not contain the id.
Good catch
if !signedByOwnedCommunity && joinedCommunity && clock >= lastClock {

m.communitiesManager.UnseedHistoryArchiveTorrent(id)
go func() {
Is this a 'slow' operation? What happens if a user logs out while the goroutine is still executing? If it's important for the goroutine to finish before logout, maybe it's a good idea to add a `sync.WaitGroup` in messenger and wait for it in `func (m *Messenger) Shutdown() (err error)` (or maybe this is already handled somewhere else?).
This is a very good point. Adding this now
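A minimal sketch of the `sync.WaitGroup` pattern suggested above. The `worker` type is a hypothetical stand-in for `Messenger`, and the `quit` channel is an extra assumption added so long-running tasks can stop early; the PR may wire this differently.

```go
package main

import "sync"

// worker is a hypothetical stand-in for Messenger: it tracks background
// goroutines so shutdown can wait for them.
type worker struct {
	wg   sync.WaitGroup
	quit chan struct{}
}

func newWorker() *worker {
	return &worker{quit: make(chan struct{})}
}

// spawn runs task in a goroutine tracked by the WaitGroup. The task receives
// the quit channel so it can stop early when shutdown is requested.
func (w *worker) spawn(task func(quit <-chan struct{})) {
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		task(w.quit)
	}()
}

// shutdown signals all tasks to stop and blocks until they have returned.
// It must be called at most once, since it closes the quit channel.
func (w *worker) shutdown() {
	close(w.quit)
	w.wg.Wait()
}
```

In this shape, the archive download goroutine would be started through `spawn`, and logout would call `shutdown` before tearing down the database.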
Please update the VERSION file
@cammellos @richard-ramos thanks for all your comments.
If we follow the process (which is still TBD, but better to check twice), we need to check it briefly on the mobile client side and on the desktop side as well.
@churik you may test it on the mobile side and provide feedback. It was not yet tested on desktop, @elina2015 please check this today.
Just to let you know, this feature is completely turned off on mobile, so the only thing to test/check there is that it indeed neither creates archives nor downloads them.
Hey, thank you for the clarification! But for context: in such cases, this is not really testing of the feature itself. It was decided (at least until there's a better solution; suggestions are still welcome and appreciated) to create and test clones of Go pull requests in client repositories (Mobile and Desktop) in order to avoid issues that sometimes arise after PRs are merged without any testing on the Go side (discussion here). Regarding this particular PR: looks good from the Mobile side, no issues found.
This introduces the ability for status nodes to handle community history archive magnetlinks. To make this work, a few things are needed:

1. A new database table has been introduced to store message archive hashes. This is necessary so status nodes can determine whether or not they need to download a certain archive.
2. The messenger's `handleRetrievedMessages()` has been extended to take magnetlink messages into account.
3. New APIs were added to download torrent data given a magnetlink and also to extract messages from downloaded archives, which are then later fed to `handleRetrievedMessages`.

Closes #2568