From 1ff9e98b575d9d4ab94a451708b0d58cf83a10ed Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Thu, 24 Oct 2024 15:27:28 +0530 Subject: [PATCH] feat_: use fleet nodes as one of the preferred nodes for waku filter subscriptions --- go.mod | 2 +- go.sum | 4 +-- params/cluster.go | 6 ++++ .../go-waku/waku/v2/api/filter/filter.go | 18 +++++++++++- .../waku/v2/api/filter/filter_manager.go | 11 ++++++- .../go-waku/waku/v2/protocol/filter/client.go | 15 ++++++++-- vendor/modules.txt | 2 +- wakuv2/waku.go | 29 +++++++++++++++++-- 8 files changed, 77 insertions(+), 10 deletions(-) diff --git a/go.mod b/go.mod index 3859cded336..2148e33440c 100644 --- a/go.mod +++ b/go.mod @@ -95,7 +95,7 @@ require ( github.com/schollz/peerdiscovery v1.7.0 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/urfave/cli/v2 v2.27.2 - github.com/waku-org/go-waku v0.8.1-0.20241018104939-8842d00df1b9 + github.com/waku-org/go-waku v0.8.1-0.20241024095241-be7eb6801c3e github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1 diff --git a/go.sum b/go.sum index 024cfe4ae29..072d603328a 100644 --- a/go.sum +++ b/go.sum @@ -2136,8 +2136,8 @@ github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27 github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27f/go.mod h1:Oi0zw9aw8/Y5GC99zt+Ef2gYAl+0nZlwdJonDyOz/sE= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= -github.com/waku-org/go-waku v0.8.1-0.20241018104939-8842d00df1b9 h1:0idZXdPAB4Xgbj6R9rJoeNWwVRarGioPIPIwapfJbQA= -github.com/waku-org/go-waku v0.8.1-0.20241018104939-8842d00df1b9/go.mod h1:1BRnyg2mQ2aBNLTBaPq6vEvobzywGykPOhGQFbHGf74= +github.com/waku-org/go-waku v0.8.1-0.20241024095241-be7eb6801c3e h1:6jNiSLK8HkMoGP1fZSP+C/0AvGBMV1V5eAvhgLx4vGc= +github.com/waku-org/go-waku v0.8.1-0.20241024095241-be7eb6801c3e/go.mod h1:1BRnyg2mQ2aBNLTBaPq6vEvobzywGykPOhGQFbHGf74= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= diff --git a/params/cluster.go b/params/cluster.go index 7ac6184bdae..6eb7c8acc55 100644 --- a/params/cluster.go +++ b/params/cluster.go @@ -30,6 +30,9 @@ var supportedFleets = map[FleetName]map[NodeType][]string{ FleetStatusStaging: { WakuNodes: { "enrtree://AI4W5N5IFEUIHF5LESUAOSMV6TKWF2MB6GU2YK7PU4TYUGUNOCEPW@boot.staging.status.nodes.status.im", + "/dns4/boot-01.do-ams3.status.staging.status.im/tcp/30303/p2p/16Uiu2HAmQE7FXQc6iZHdBzYfw3qCSDa9dLc1wsBJKoP4aZvztq2d", + "/dns4/boot-01.gc-us-central1-a.status.staging.status.im/tcp/30303/p2p/16Uiu2HAmGAA54bBTE78MYidSy3P7Q9yAWFNTAEReJYD69VRvtL5r", + "/dns4/boot-01.ac-cn-hongkong-c.status.staging.status.im/tcp/30303/p2p/16Uiu2HAmNTpGnyZ8W1BK2sXEmgSCNWiyDKgRU3NBR2DXST2HzxRU", }, DiscV5BootstrapNodes: { "enrtree://AI4W5N5IFEUIHF5LESUAOSMV6TKWF2MB6GU2YK7PU4TYUGUNOCEPW@boot.staging.status.nodes.status.im", @@ -41,6 +44,9 @@ var supportedFleets = map[FleetName]map[NodeType][]string{ FleetStatusProd: { WakuNodes: { "enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.prod.status.nodes.status.im", + "/dns4/boot-01.do-ams3.status.prod.status.im/tcp/30303/p2p/16Uiu2HAmAR24Mbb6VuzoyUiGx42UenDkshENVDj4qnmmbabLvo31", + "/dns4/boot-01.gc-us-central1-a.status.prod.status.im/tcp/30303/p2p/16Uiu2HAm8mUZ18tBWPXDQsaF7PbCKYA35z7WB2xNZH2EVq1qS8LJ", + "/dns4/boot-01.ac-cn-hongkong-c.status.prod.statusim.net/tcp/30303/p2p/16Uiu2HAmGwcE8v7gmJNEWFtZtojYpPMTHy2jBLL6xRk33qgDxFWX", }, DiscV5BootstrapNodes: { "enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.prod.status.nodes.status.im", diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter.go index 020bb23f54e..5e8a35e1c1d 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter.go @@ -52,6 +52,7 @@ type Sub struct { type subscribeParameters struct { batchInterval time.Duration multiplexChannelBuffer int + preferredPeers peer.IDSlice } type SubscribeOptions func(*subscribeParameters) @@ -75,6 +76,12 @@ func defaultOptions() []SubscribeOptions { } } +func WithPreferredServiceNodes(peers peer.IDSlice) SubscribeOptions { + return func(params *subscribeParameters) { + params.preferredPeers = peers + } +} + // Subscribe func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, params *subscribeParameters) (*Sub, error) { sub := new(Sub) @@ -197,7 +204,16 @@ func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int options := make([]filter.FilterSubscribeOption, 0) options = append(options, filter.WithMaxPeersPerContentFilter(int(peerCount))) for _, p := range apiSub.Config.Peers { - options = append(options, filter.WithPeer(p)) + isExcludedPeer := false + for _, px := range peersToExclude { // configured peer can be excluded if sub fails with it. + if p == px { + isExcludedPeer = true + break + } + } + if !isExcludedPeer { + options = append(options, filter.WithPeer(p)) + } } if len(peersToExclude) > 0 { apiSub.log.Debug("subscribing with peers to exclude", zap.Stringers("excluded-peers", peersToExclude)) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go index a43c3c3963c..942ac685289 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go @@ -2,10 +2,12 @@ package filter import ( "context" + "math/rand" "sync" "time" "github.com/google/uuid" + "github.com/libp2p/go-libp2p/core/peer" "go.uber.org/zap" "golang.org/x/exp/maps" @@ -61,7 +63,8 @@ type EnevelopeProcessor interface { OnNewEnvelope(env *protocol.Envelope) error } -func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager { +func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, + envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager { // This fn is being mocked in test mgr := new(FilterManager) mgr.ctx = ctx @@ -162,6 +165,12 @@ func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) { defer utils.LogOnPanic() ctx, cancel := context.WithCancel(mgr.ctx) config := FilterConfig{MaxPeers: mgr.minPeersPerFilter} + if len(mgr.params.preferredPeers) > 0 { + //use one peer which is from preferred peers. + randomIndex := rand.Intn(len(mgr.params.preferredPeers) - 1) + randomPreferredPeer := mgr.params.preferredPeers[randomIndex] + config.Peers = []peer.ID{randomPreferredPeer} + } sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.params) mgr.Lock() mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go index 3b56d4700ee..4bf48b30d05 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go @@ -333,11 +333,17 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context, params.selectedPeers = append(params.selectedPeers, pData.AddrInfo.ID) } reqPeerCount := params.maxPeers - len(params.selectedPeers) + for _, p := range params.selectedPeers { + //exclude peers that are preferredpeers so that they don't get selected again. + if _, ok := params.peersToExclude[p]; !ok { + params.peersToExclude[p] = struct{}{} + } + } if params.pm != nil && reqPeerCount > 0 { wf.log.Debug("handleFilterSubscribeOptions", zap.Int("peerCount", reqPeerCount), zap.Int("excludePeersLen", len(params.peersToExclude))) - params.selectedPeers, err = wf.pm.SelectPeers( + selectedPeers, err := wf.pm.SelectPeers( peermanager.PeerSelectionCriteria{ SelectionType: params.peerSelectionType, Proto: FilterSubscribeID_v20beta1, @@ -350,7 +356,12 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context, ) if err != nil { wf.log.Error("peer selection returned err", zap.Error(err)) - return nil, nil, err + if len(params.selectedPeers) == 0 { + return nil, nil, err + } + } + if len(selectedPeers) > 0 { + params.selectedPeers = append(params.selectedPeers, selectedPeers...) } } wf.log.Debug("handleFilterSubscribeOptions exit", zap.Int("selectedPeerCount", len(params.selectedPeers))) diff --git a/vendor/modules.txt b/vendor/modules.txt index 0d3310a0920..19d637b00bb 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1015,7 +1015,7 @@ github.com/waku-org/go-discover/discover/v5wire github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/pb -# github.com/waku-org/go-waku v0.8.1-0.20241018104939-8842d00df1b9 +# github.com/waku-org/go-waku v0.8.1-0.20241024095241-be7eb6801c3e ## explicit; go 1.21 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/tests diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 43c0626bc91..0b3b9ddb842 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -526,7 +526,6 @@ func (w *Waku) discoverAndConnectPeers() { w.logger.Warn("invalid peer multiaddress", zap.String("ma", addrString), zap.Error(err)) continue } - peerInfo, err := peer.AddrInfoFromP2pAddr(addr) if err != nil { w.logger.Warn("invalid peer multiaddress", zap.Stringer("addr", addr), zap.Error(err)) @@ -1202,7 +1201,8 @@ func (w *Waku) Start() error { w.cfg.MinPeersForFilter, w, w.node.FilterLightnode(), - filterapi.WithBatchInterval(300*time.Millisecond)) + filterapi.WithBatchInterval(300*time.Millisecond), + filterapi.WithPreferredServiceNodes(w.GetPreferredServiceNodes())) } err = w.setupRelaySubscriptions() @@ -1702,6 +1702,31 @@ func (w *Waku) StopDiscV5() error { return nil } +func (w *Waku) GetPreferredServiceNodes() peer.IDSlice { + var bootnodes peer.IDSlice + for _, addrString := range w.cfg.WakuNodes { + addrString := addrString + if strings.HasPrefix(addrString, "enrtree://") { + continue + } else { + // It is a normal multiaddress + addr, err := multiaddr.NewMultiaddr(addrString) + if err != nil { + w.logger.Warn("invalid peer multiaddress", zap.String("ma", addrString), zap.Error(err)) + continue + } + _, id := peer.SplitAddr(addr) + if id == "" { + w.logger.Warn("peer multiaddress doesn't contain peerID", zap.Stringer("addr", addr)) + continue + } + bootnodes = append(bootnodes, id) + w.logger.Debug("adding peer to bootnodes for filter", zap.Stringer("id", id)) + } + } + return bootnodes +} + func (w *Waku) handleNetworkChangeFromApp(state connection.State) { //If connection state is reported by something other than peerCount becoming 0 e.g from mobile app, disconnect all peers if (state.Offline && len(w.node.Host().Network().Peers()) > 0) ||