From 6ed72907f4c6be5f17224284dcd3e439638c55ae Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Mon, 7 Oct 2024 13:01:26 +0300 Subject: [PATCH 01/16] fixing setup and adding debug logs --- wakuv2/nwaku.go | 81 +++++++++++++++++++++++++++----------------- wakuv2/nwaku_test.go | 25 +++++++++++--- 2 files changed, 71 insertions(+), 35 deletions(-) diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index 0fcb0b236f..4596cae6e2 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -4,8 +4,8 @@ package wakuv2 /* - #cgo LDFLAGS: -L../third_party/nwaku/vendor/negentropy/cpp/ -lnegentropy -L../third_party/nwaku/build/ -lwaku -lm -ldl -pthread -lminiupnpc -L../third_party/nwaku/vendor/nim-nat-traversal/vendor/miniupnp/miniupnpc/build/ -lnatpmp -L../third_party/nwaku/vendor/nim-nat-traversal/vendor/libnatpmp-upstream/ -L../third_party/nwaku/vendor/nim-libbacktrace/install/usr/lib/ -lbacktrace -Wl,--allow-multiple-definition - #cgo LDFLAGS: -Wl,-rpath,../third_party/nwaku/build/ + #cgo LDFLAGS: -L../third_party/nwaku/build/ -lnegentropy -lwaku + #cgo LDFLAGS: -L../third_party/nwaku -Wl,-rpath,../third_party/nwaku/build/ #include "../third_party/nwaku/library/libwaku.h" #include @@ -19,17 +19,17 @@ package wakuv2 size_t len; } Resp; - void* allocResp() { + static void* allocResp() { return calloc(1, sizeof(Resp)); } - void freeResp(void* resp) { + static void freeResp(void* resp) { if (resp != NULL) { free(resp); } } - char* getMyCharPtr(void* resp) { + static char* getMyCharPtr(void* resp) { if (resp == NULL) { return NULL; } @@ -37,7 +37,7 @@ package wakuv2 return m->msg; } - size_t getMyCharLen(void* resp) { + static size_t getMyCharLen(void* resp) { if (resp == NULL) { return 0; } @@ -45,7 +45,7 @@ package wakuv2 return m->len; } - int getRet(void* resp) { + static int getRet(void* resp) { if (resp == NULL) { return 0; } @@ -54,8 +54,10 @@ package wakuv2 } // resp must be set != NULL in case interest on retrieving data from the callback - void callback(int ret, char* msg, size_t len, void* resp) { + static void callback(int ret, char* msg, size_t len, void* resp) { + printf("---------- GABRIEL calling callback 1 ----\n"); if (resp != NULL) { + printf("---------- GABRIEL calling callback 2 ----\n"); Resp* m = (Resp*) resp; m->ret = ret; m->msg = msg; @@ -65,6 +67,7 @@ package wakuv2 #define WAKU_CALL(call) \ do { \ + printf("---------- GABRIEL calling WAKU_CALL 1 ----\n"); \ int ret = call; \ if (ret != 0) { \ printf("Failed the call to: %s. Returned code: %d\n", #call, ret); \ @@ -72,37 +75,37 @@ package wakuv2 } \ } while (0) - void* cGoWakuNew(const char* configJson, void* resp) { + static void* cGoWakuNew(const char* configJson, void* resp) { // We pass NULL because we are not interested in retrieving data from this callback void* ret = waku_new(configJson, (WakuCallBack) callback, resp); return ret; } - void cGoWakuStart(void* wakuCtx, void* resp) { + static void cGoWakuStart(void* wakuCtx, void* resp) { WAKU_CALL(waku_start(wakuCtx, (WakuCallBack) callback, resp)); } - void cGoWakuStop(void* wakuCtx, void* resp) { + static void cGoWakuStop(void* wakuCtx, void* resp) { WAKU_CALL(waku_stop(wakuCtx, (WakuCallBack) callback, resp)); } - void cGoWakuDestroy(void* wakuCtx, void* resp) { + static void cGoWakuDestroy(void* wakuCtx, void* resp) { WAKU_CALL(waku_destroy(wakuCtx, (WakuCallBack) callback, resp)); } - void cGoWakuStartDiscV5(void* wakuCtx, void* resp) { + static void cGoWakuStartDiscV5(void* wakuCtx, void* resp) { WAKU_CALL(waku_start_discv5(wakuCtx, (WakuCallBack) callback, resp)); } - void cGoWakuStopDiscV5(void* wakuCtx, void* resp) { + static void cGoWakuStopDiscV5(void* wakuCtx, void* resp) { WAKU_CALL(waku_stop_discv5(wakuCtx, (WakuCallBack) callback, resp)); } - void cGoWakuVersion(void* wakuCtx, void* resp) { + static void cGoWakuVersion(void* wakuCtx, void* resp) { WAKU_CALL(waku_version(wakuCtx, (WakuCallBack) callback, resp)); } - void cGoWakuSetEventCallback(void* wakuCtx) { + static void cGoWakuSetEventCallback(void* wakuCtx) { // The 'globalEventCallback' Go function is shared amongst all possible Waku instances. // Given that the 'globalEventCallback' is shared, we pass again the @@ -118,7 +121,7 @@ package wakuv2 waku_set_event_callback(wakuCtx, (WakuCallBack) globalEventCallback, wakuCtx); } - void cGoWakuContentTopic(void* wakuCtx, + static void cGoWakuContentTopic(void* wakuCtx, char* appName, int appVersion, char* contentTopicName, @@ -134,15 +137,15 @@ package wakuv2 resp) ); } - void cGoWakuPubsubTopic(void* wakuCtx, char* topicName, void* resp) { + static void cGoWakuPubsubTopic(void* wakuCtx, char* topicName, void* resp) { WAKU_CALL( waku_pubsub_topic(wakuCtx, topicName, (WakuCallBack) callback, resp) ); } - void cGoWakuDefaultPubsubTopic(void* wakuCtx, void* resp) { + static void cGoWakuDefaultPubsubTopic(void* wakuCtx, void* resp) { WAKU_CALL (waku_default_pubsub_topic(wakuCtx, (WakuCallBack) callback, resp)); } - void cGoWakuRelayPublish(void* wakuCtx, + static void cGoWakuRelayPublish(void* wakuCtx, const char* pubSubTopic, const char* jsonWakuMessage, int timeoutMs, @@ -156,14 +159,14 @@ package wakuv2 resp)); } - void cGoWakuRelaySubscribe(void* wakuCtx, char* pubSubTopic, void* resp) { + static void cGoWakuRelaySubscribe(void* wakuCtx, char* pubSubTopic, void* resp) { WAKU_CALL ( waku_relay_subscribe(wakuCtx, pubSubTopic, (WakuCallBack) callback, resp) ); } - void cGoWakuRelayUnsubscribe(void* wakuCtx, char* pubSubTopic, void* resp) { + static void cGoWakuRelayUnsubscribe(void* wakuCtx, char* pubSubTopic, void* resp) { WAKU_CALL ( waku_relay_unsubscribe(wakuCtx, pubSubTopic, @@ -171,7 +174,7 @@ package wakuv2 resp) ); } - void cGoWakuConnect(void* wakuCtx, char* peerMultiAddr, int timeoutMs, void* resp) { + static void cGoWakuConnect(void* wakuCtx, char* peerMultiAddr, int timeoutMs, void* resp) { WAKU_CALL( waku_connect(wakuCtx, peerMultiAddr, timeoutMs, @@ -179,23 +182,23 @@ package wakuv2 resp) ); } - void cGoWakuListenAddresses(void* wakuCtx, void* resp) { + static void cGoWakuListenAddresses(void* wakuCtx, void* resp) { WAKU_CALL (waku_listen_addresses(wakuCtx, (WakuCallBack) callback, resp) ); } - void cGoWakuGetMyENR(void* ctx, void* resp) { + static void cGoWakuGetMyENR(void* ctx, void* resp) { WAKU_CALL (waku_get_my_enr(ctx, (WakuCallBack) callback, resp) ); } - void cGoWakuListPeersInMesh(void* ctx, char* pubSubTopic, void* resp) { + static void cGoWakuListPeersInMesh(void* ctx, char* pubSubTopic, void* resp) { WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) callback, resp) ); } - void cGoWakuGetNumConnectedPeers(void* ctx, char* pubSubTopic, void* resp) { + static void cGoWakuGetNumConnectedPeers(void* ctx, char* pubSubTopic, void* resp) { WAKU_CALL (waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) callback, resp) ); } - void cGoWakuLightpushPublish(void* wakuCtx, + static void cGoWakuLightpushPublish(void* wakuCtx, const char* pubSubTopic, const char* jsonWakuMessage, void* resp) { @@ -207,7 +210,7 @@ package wakuv2 resp)); } - void cGoWakuStoreQuery(void* wakuCtx, + static void cGoWakuStoreQuery(void* wakuCtx, const char* jsonQuery, const char* peerAddr, int timeoutMs, @@ -221,7 +224,7 @@ package wakuv2 resp)); } - void cGoWakuPeerExchangeQuery(void* wakuCtx, + static void cGoWakuPeerExchangeQuery(void* wakuCtx, uint64_t numPeers, void* resp) { @@ -231,7 +234,7 @@ package wakuv2 resp)); } - void cGoWakuGetPeerIdsByProtocol(void* wakuCtx, + static void cGoWakuGetPeerIdsByProtocol(void* wakuCtx, const char* protocol, void* resp) { @@ -1853,12 +1856,19 @@ func (self *Waku) WakuStart() error { var resp = C.allocResp() defer C.freeResp(resp) + + + fmt.Println("------------ GABRIEL called wakuStart") C.cGoWakuStart(self.wakuCtx, resp) + fmt.Println("------------ GABRIEL wakuStart 2") if C.getRet(resp) == C.RET_OK { + fmt.Println("------------ GABRIEL wakuStart received RET_OK") return nil } + fmt.Println("------------ GABRIEL wakuStart 3") errMsg := "error WakuStart: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + fmt.Println("------------ GABRIEL error in wakuStart ", errMsg) return errors.New(errMsg) } @@ -2393,11 +2403,16 @@ func New(nodeKey *ecdsa.PrivateKey, onHistoricMessagesRequestFailed func([]byte, peer.ID, error), onPeerStats func(types.ConnStatus)) (*Waku, error) { + fmt.Println("-------- GABRIEL func New 1 ---------") + // Lock the main goroutine to its current OS thread runtime.LockOSThread() + fmt.Println("-------- GABRIEL func New 2 ---------") + WakuSetup() // This should only be called once in the whole app's life + fmt.Println("-------- GABRIEL func New 3 ---------") node, err := wakuNew(nodeKey, fleet, cfg, logger, appDB, ts, onHistoricMessagesRequestFailed, @@ -2406,17 +2421,21 @@ func New(nodeKey *ecdsa.PrivateKey, return nil, err } + fmt.Println("-------- GABRIEL func New 4 ---------") defaultPubsubTopic, err := node.WakuDefaultPubsubTopic() if err != nil { fmt.Println("Error happened:", err.Error()) } + fmt.Println("-------- GABRIEL func New 5 ---------") err = node.WakuRelaySubscribe(defaultPubsubTopic) if err != nil { fmt.Println("Error happened:", err.Error()) } + fmt.Println("-------- GABRIEL func New 6 ---------") node.WakuSetEventCallback() + fmt.Println("-------- GABRIEL func New 7 ---------") return node, nil diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index 2c1490cb24..9ef5a64227 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -7,6 +7,7 @@ import ( "context" "crypto/rand" "errors" + "fmt" "math/big" "os" "testing" @@ -162,22 +163,27 @@ func parseNodes(rec []string) []*enode.Node { // // IP_ADDRESS=$(hostname -I | awk '{print $1}'); // docker run \ -// -p 60000:60000/tcp -p 9000:9000/udp -p 8645:8645/tcp harbor.status.im/wakuorg/nwaku:v0.31.0 \ -// --tcp-port=60000 --discv5-discovery=true --cluster-id=16 --pubsub-topic=/waku/2/rs/16/32 --pubsub-topic=/waku/2/rs/16/64 \ -// --nat=extip:${IP_ADDRESS} --discv5-discovery --discv5-udp-port=9000 --rest-address=0.0.0.0 --store +// -p 61000:61000/tcp -p 9000:9000/udp -p 8645:8645/tcp harbor.status.im/wakuorg/nwaku:v0.31.0 \ +// --tcp-port=61000 --discv5-discovery=true --cluster-id=16 --pubsub-topic=/waku/2/rs/16/32 --pubsub-topic=/waku/2/rs/16/64 \ +// --nat=extip:${IP_ADDRESS} --discv5-discovery --discv5-udp-port=9000 --rest-address=0.0.0.0 --rest-port=8646 --store func TestBasicWakuV2(t *testing.T) { - nwakuInfo, err := GetNwakuInfo(nil, nil) + fmt.Println("---------- GABRIEL 1 ----------") + extNodeRestPort := 8646 + nwakuInfo, err := GetNwakuInfo(nil, &extNodeRestPort) require.NoError(t, err) + fmt.Println("---------- GABRIEL 2 ----------") // Creating a fake DNS Discovery ENRTree tree, url := makeTestTree("n", parseNodes([]string{nwakuInfo.EnrUri}), nil) + fmt.Println("---------- GABRIEL 3 ----------") enrTreeAddress := url envEnrTreeAddress := os.Getenv("ENRTREE_ADDRESS") if envEnrTreeAddress != "" { enrTreeAddress = envEnrTreeAddress } + fmt.Println("---------- GABRIEL 4 ----------") config := &Config{} setDefaultConfig(config, false) config.Port = 0 @@ -186,30 +192,39 @@ func TestBasicWakuV2(t *testing.T) { config.DiscoveryLimit = 20 config.WakuNodes = []string{enrTreeAddress} w, err := New(nil, "", config, nil, nil, nil, nil, nil) + fmt.Println("---------- GABRIEL 5 ----------") require.NoError(t, err) require.NoError(t, w.Start()) + fmt.Println("---------- GABRIEL 6 ----------") enr, err := w.ENR() require.NoError(t, err) require.NotNil(t, enr) + fmt.Println("---------- GABRIEL 7 ----------") + // DNSDiscovery ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second) defer cancel() + fmt.Println("---------- GABRIEL 8 ----------") + discoveredNodes, err := dnsdisc.RetrieveNodes(ctx, enrTreeAddress, dnsdisc.WithResolver(config.Resolver)) require.NoError(t, err) + fmt.Println("---------- GABRIEL 9 ----------") // Peer used for retrieving history r, err := rand.Int(rand.Reader, big.NewInt(int64(len(discoveredNodes)))) require.NoError(t, err) storeNode := discoveredNodes[int(r.Int64())] + fmt.Println("---------- GABRIEL 10 ----------") options := func(b *backoff.ExponentialBackOff) { b.MaxElapsedTime = 30 * time.Second } + fmt.Println("---------- GABRIEL 11 ----------") // Sanity check, not great, but it's probably helpful err = tt.RetryWithBackOff(func() error { if len(w.Peers()) < 1 { @@ -219,6 +234,8 @@ func TestBasicWakuV2(t *testing.T) { }, options) require.NoError(t, err) + fmt.Println("---------- GABRIEL 12 ----------") + // Dropping Peer err = w.DropPeer(storeNode.PeerID) require.NoError(t, err) From 2c0eb59d9f45e3effde594ea9c1675e5123e2a91 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Mon, 7 Oct 2024 15:18:46 +0300 Subject: [PATCH 02/16] removing unnecessary channel --- wakuv2/nwaku.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index 4596cae6e2..c5234ee785 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -256,13 +256,10 @@ import ( "encoding/json" "errors" "fmt" - "os" - "os/signal" "runtime" "strconv" "strings" "sync" - "syscall" "time" "unsafe" @@ -1027,9 +1024,9 @@ func (w *Waku) Start() error { return err } - ch := make(chan os.Signal, 1) - signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) - <-ch + // ch := make(chan os.Signal, 1) + // signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) + //<-ch // if err = w.node.Start(w.ctx); err != nil { // return fmt.Errorf("failed to start go-waku node: %v", err) From 4fa26147e8fc7f41f3818f88510d08a51a7724f5 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 8 Oct 2024 14:25:21 +0300 Subject: [PATCH 03/16] commenting a lot of things and some config changes to start testing --- wakuv2/api.go | 24 +------- wakuv2/api_test.go | 12 +--- wakuv2/message_publishing.go | 18 +----- wakuv2/nwaku.go | 109 +++++++++++++++++------------------ wakuv2/nwaku_test.go | 44 ++++++++------ 5 files changed, 86 insertions(+), 121 deletions(-) diff --git a/wakuv2/api.go b/wakuv2/api.go index f106b32f52..53f9f2151d 100644 --- a/wakuv2/api.go +++ b/wakuv2/api.go @@ -18,27 +18,7 @@ package wakuv2 -import ( - "context" - "crypto/ecdsa" - "errors" - "fmt" - "sync" - "time" - - "github.com/waku-org/go-waku/waku/v2/payload" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" - - "github.com/status-im/status-go/wakuv2/common" - - "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/rpc" - - "google.golang.org/protobuf/proto" -) - +/* // List of errors var ( ErrSymAsym = errors.New("specify either a symmetric or an asymmetric key") @@ -513,4 +493,4 @@ func (api *PublicWakuAPI) NewMessageFilter(req Criteria) (string, error) { api.mu.Unlock() return id, nil -} +} */ diff --git a/wakuv2/api_test.go b/wakuv2/api_test.go index 7a060bf5fd..7119650baa 100644 --- a/wakuv2/api_test.go +++ b/wakuv2/api_test.go @@ -18,16 +18,7 @@ package wakuv2 -import ( - "testing" - "time" - - "golang.org/x/exp/maps" - - "github.com/status-im/status-go/wakuv2/common" -) - -func TestMultipleTopicCopyInNewMessageFilter(t *testing.T) { +/* func TestMultipleTopicCopyInNewMessageFilter(t *testing.T) { w, err := New(nil, "", nil, nil, nil, nil, nil, nil) if err != nil { t.Fatalf("Error creating WakuV2 client: %v", err) @@ -68,3 +59,4 @@ func TestMultipleTopicCopyInNewMessageFilter(t *testing.T) { t.Fatalf("Could not find filter with both topics") } } +*/ \ No newline at end of file diff --git a/wakuv2/message_publishing.go b/wakuv2/message_publishing.go index 25f8f57d83..49051e55ce 100644 --- a/wakuv2/message_publishing.go +++ b/wakuv2/message_publishing.go @@ -1,19 +1,5 @@ package wakuv2 -import ( - "errors" - - "go.uber.org/zap" - - "github.com/waku-org/go-waku/waku/v2/api/publish" - "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/go-waku/waku/v2/protocol/relay" - - gethcommon "github.com/ethereum/go-ethereum/common" - "github.com/status-im/status-go/wakuv2/common" -) - type PublishMethod int const ( @@ -34,7 +20,7 @@ func (pm PublishMethod) String() string { // Send injects a message into the waku send queue, to be distributed in the // network in the coming cycles. -func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage, priority *int) ([]byte, error) { +/* func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage, priority *int) ([]byte, error) { pubsubTopic = w.GetPubsubTopic(pubsubTopic) if w.protectedTopicStore != nil { privKey, err := w.protectedTopicStore.FetchPrivateKey(pubsubTopic) @@ -160,4 +146,4 @@ func (w *Waku) publishEnvelope(envelope *protocol.Envelope, publishFn publish.Pu }) } } -} +} */ diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index c5234ee785..ac6d82601c 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -269,7 +269,6 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/ethereum/go-ethereum/rpc" "github.com/jellydator/ttlcache/v3" "github.com/libp2p/go-libp2p/core/metrics" "github.com/libp2p/go-libp2p/core/peer" @@ -350,7 +349,7 @@ func (w *Waku) SubscribeToConnStatusChanges() *types.ConnStatusSubscription { return subscription } -func (w *Waku) getDiscV5BootstrapNodes(ctx context.Context, addresses []string) ([]*enode.Node, error) { +/* func (w *Waku) getDiscV5BootstrapNodes(ctx context.Context, addresses []string) ([]*enode.Node, error) { wg := sync.WaitGroup{} mu := sync.Mutex{} var result []*enode.Node @@ -394,11 +393,12 @@ func (w *Waku) getDiscV5BootstrapNodes(ctx context.Context, addresses []string) wg.Wait() return result, nil -} +} */ type fnApplyToEachPeer func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) -func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnApplyToEachPeer) error { +// This should be handled by nwaku? +/* func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnApplyToEachPeer) error { w.logger.Info("retrieving nodes", zap.String("enr", enrtreeAddress)) ctx, cancel := context.WithTimeout(ctx, requestTimeout) defer cancel() @@ -439,9 +439,10 @@ func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnA wg.Wait() return nil -} +} */ -func (w *Waku) discoverAndConnectPeers() { +// This too? nwaku? +/* func (w *Waku) discoverAndConnectPeers() { fnApply := func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) { defer wg.Done() if len(d.PeerInfo.Addrs) != 0 { @@ -475,7 +476,7 @@ func (w *Waku) discoverAndConnectPeers() { go w.connect(*peerInfo, nil, wps.Static) } } -} +} */ func (w *Waku) connect(peerInfo peer.AddrInfo, enr *enode.Node, origin wps.Origin) { // Connection will be prunned eventually by the connection manager if needed @@ -484,7 +485,7 @@ func (w *Waku) connect(peerInfo peer.AddrInfo, enr *enode.Node, origin wps.Origi w.WakuConnect(addr.String(), 1000) } -func (w *Waku) telemetryBandwidthStats(telemetryServerURL string) { +/* func (w *Waku) telemetryBandwidthStats(telemetryServerURL string) { w.wg.Add(1) defer w.wg.Done() @@ -571,22 +572,22 @@ func (w *Waku) runPeerExchangeLoop() { } } } -} +} */ -func (w *Waku) GetPubsubTopic(topic string) string { +/* func (w *Waku) GetPubsubTopic(topic string) string { if topic == "" { topic = w.cfg.DefaultShardPubsubTopic } return topic -} +} */ // CurrentTime returns current time. func (w *Waku) CurrentTime() time.Time { return w.timesource.Now() } -// APIs returns the RPC descriptors the Waku implementation offers +/* // APIs returns the RPC descriptors the Waku implementation offers func (w *Waku) APIs() []rpc.API { return []rpc.API{ { @@ -596,7 +597,7 @@ func (w *Waku) APIs() []rpc.API { Public: false, }, } -} +} */ // Protocols returns the waku sub-protocols ran by this particular client. func (w *Waku) Protocols() []p2p.Protocol { @@ -851,7 +852,7 @@ func (w *Waku) GetSymKey(id string) ([]byte, error) { // Subscribe installs a new message handler used for filtering, decrypting // and subsequent storing of incoming messages. -func (w *Waku) Subscribe(f *common.Filter) (string, error) { +/* func (w *Waku) Subscribe(f *common.Filter) (string, error) { f.PubsubTopic = w.GetPubsubTopic(f.PubsubTopic) id, err := w.filters.Install(f) if err != nil { @@ -878,7 +879,7 @@ func (w *Waku) Unsubscribe(ctx context.Context, id string) error { } return nil -} +} */ // GetFilter returns the filter by id. func (w *Waku) GetFilter(id string) *common.Filter { @@ -897,7 +898,7 @@ func (w *Waku) UnsubscribeMany(ids []string) error { return nil } -func (w *Waku) SkipPublishToTopic(value bool) { +/* func (w *Waku) SkipPublishToTopic(value bool) { w.cfg.SkipPublishToTopic = value } @@ -906,7 +907,7 @@ func (w *Waku) ConfirmMessageDelivered(hashes []gethcommon.Hash) { return } w.messageSentCheck.DeleteByMessageIDs(hashes) -} +} */ func (w *Waku) SetStorePeerID(peerID peer.ID) { if w.messageSentCheck != nil { @@ -999,10 +1000,10 @@ func (w *Waku) Query(ctx context.Context, return nil, 0, nil } -// OnNewEnvelope is an interface from Waku FilterManager API that gets invoked when any new message is received by Filter. +/* // OnNewEnvelope is an interface from Waku FilterManager API that gets invoked when any new message is received by Filter. func (w *Waku) OnNewEnvelope(env *protocol.Envelope) error { return w.OnNewEnvelopes(env, common.RelayedMessageType, false) -} +} */ // Start implements node.Service, starting the background data propagation thread // of the Waku protocol. @@ -1188,7 +1189,7 @@ func (w *Waku) checkForConnectionChanges() { // }() // } -func (w *Waku) MessageExists(mh pb.MessageHash) (bool, error) { +/* func (w *Waku) MessageExists(mh pb.MessageHash) (bool, error) { w.poolMu.Lock() defer w.poolMu.Unlock() return w.envelopeCache.Has(gethcommon.Hash(mh)), nil @@ -1200,9 +1201,9 @@ func (w *Waku) SetTopicsToVerifyForMissingMessages(peerID peer.ID, pubsubTopic s } w.missingMsgVerifier.SetCriteriaInterest(peerID, protocol.NewContentFilter(pubsubTopic, contentTopics...)) -} +} */ -func (w *Waku) setupRelaySubscriptions() error { +/* func (w *Waku) setupRelaySubscriptions() error { if w.cfg.LightClient { return nil } @@ -1234,9 +1235,9 @@ func (w *Waku) setupRelaySubscriptions() error { } return nil -} +} */ -func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.MessageType, processImmediately bool) error { +/* func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.MessageType, processImmediately bool) error { if envelope == nil { return nil } @@ -1323,9 +1324,9 @@ func (w *Waku) add(recvMessage *common.ReceivedMessage, processImmediately bool) // postEvent queues the message for further processing. func (w *Waku) postEvent(envelope *common.ReceivedMessage) { w.msgQueue <- envelope -} +} */ -// processQueueLoop delivers the messages to the watchers during the lifetime of the waku node. +/* // processQueueLoop delivers the messages to the watchers during the lifetime of the waku node. func (w *Waku) processQueueLoop() { if w.ctx == nil { return @@ -1379,7 +1380,7 @@ func (w *Waku) processMessage(e *common.ReceivedMessage) { Hash: e.Hash(), Event: common.EventEnvelopeAvailable, }) -} +} */ // GetEnvelope retrieves an envelope from the message queue by its hash. // It returns nil if the envelope can not be found. @@ -1421,7 +1422,7 @@ func (w *Waku) Peers() types.PeerStats { // return FormatPeerStats(w.node) } -func (w *Waku) RelayPeersByTopic(topic string) (*types.PeerList, error) { +/* func (w *Waku) RelayPeersByTopic(topic string) (*types.PeerList, error) { if w.cfg.LightClient { return nil, errors.New("only available in relay mode") } @@ -1483,7 +1484,7 @@ func (w *Waku) RemovePubsubTopicKey(topic string) error { } return w.protectedTopicStore.Delete(topic) -} +} */ func (w *Waku) handleNetworkChangeFromApp(state connection.State) { //If connection state is reported by something other than peerCount becoming 0 e.g from mobile app, disconnect all peers @@ -1497,7 +1498,7 @@ func (w *Waku) handleNetworkChangeFromApp(state connection.State) { // } } -func (w *Waku) ConnectionChanged(state connection.State) { +/* func (w *Waku) ConnectionChanged(state connection.State) { isOnline := !state.Offline if w.cfg.LightClient { //TODO: Update this as per https://github.com/waku-org/go-waku/issues/1114 @@ -1519,7 +1520,7 @@ func (w *Waku) ConnectionChanged(state connection.State) { w.onlineChecker.SetOnline(isOnline) } w.state = state -} +} */ func (w *Waku) AddStorePeer(address multiaddr.Multiaddr) (peer.ID, error) { // peerID, err := w.node.AddPeer(address, wps.Static, w.cfg.DefaultShardedPubsubTopics, store.StoreQueryID_v300) @@ -1667,15 +1668,17 @@ type WakuPubsubTopic = string type WakuContentTopic = string type WakuConfig struct { - Host string `json:"host,omitempty"` - Port int `json:"port,omitempty"` - NodeKey string `json:"key,omitempty"` - EnableRelay bool `json:"relay"` - LogLevel string `json:"logLevel"` + Host string `json:"host,omitempty"` + Port int `json:"port,omitempty"` + NodeKey string `json:"key,omitempty"` + EnableRelay bool `json:"relay"` + LogLevel string `json:"logLevel"` + DnsDiscovery string `json:"dnsDiscovery,omitempty"` + DnsDiscoveryUrl string `json:"dnsDiscoveryUrl,omitempty"` + MaxMessageSize string `json:"maxMessageSize,omitempty"` + DiscV5BootstrapNodes []string `json:"discv5BootstrapNodes,omitempty"` } -var jamon unsafe.Pointer - type Waku struct { wakuCtx unsafe.Pointer @@ -1710,7 +1713,8 @@ type Waku struct { cancel context.CancelFunc wg sync.WaitGroup - cfg *Config + // cfg *Config + cfg *WakuConfig options []node.WakuNodeOption envelopeFeed event.Feed @@ -1769,20 +1773,12 @@ func printStackTrace() { func wakuNew(nodeKey *ecdsa.PrivateKey, fleet string, - cfg *Config, + cfg *WakuConfig, logger *zap.Logger, appDB *sql.DB, ts *timesource.NTPTimeSource, onHistoricMessagesRequestFailed func([]byte, peer.ID, error), onPeerStats func(types.ConnStatus)) (*Waku, error) { - nwakuConfig := WakuConfig{ - Host: cfg.Host, - Port: 30303, - NodeKey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710", - EnableRelay: true, - LogLevel: "DEBUG", - } - var err error if logger == nil { logger, err = zap.NewDevelopment() @@ -1794,18 +1790,20 @@ func wakuNew(nodeKey *ecdsa.PrivateKey, ts = timesource.Default() } - cfg = setDefaults(cfg) + /* cfg = setDefaults(cfg) if err = cfg.Validate(logger); err != nil { return nil, err - } + } */ ctx, cancel := context.WithCancel(context.Background()) - jsonConfig, err := json.Marshal(nwakuConfig) + jsonConfig, err := json.Marshal(cfg) if err != nil { return nil, err } + fmt.Println("-------- CREATING CONFIG, jsonConfig: ", string(jsonConfig)) + var cJsonConfig = C.CString(string(jsonConfig)) var resp = C.allocResp() @@ -1813,7 +1811,6 @@ func wakuNew(nodeKey *ecdsa.PrivateKey, defer C.freeResp(resp) wakuCtx := C.cGoWakuNew(cJsonConfig, resp) - jamon = wakuCtx // Notice that the events for self node are handled by the 'MyEventCallback' method if C.getRet(resp) == C.RET_OK { @@ -1841,7 +1838,7 @@ func wakuNew(nodeKey *ecdsa.PrivateKey, onHistoricMessagesRequestFailed: onHistoricMessagesRequestFailed, onPeerStats: onPeerStats, onlineChecker: onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker), - sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish), + //sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish), }, nil } @@ -2386,14 +2383,14 @@ func (self *Waku) GetPeerIdsByProtocol(protocol string) (peer.IDSlice, error) { // } // MaxMessageSize returns the maximum accepted message size. -func (w *Waku) MaxMessageSize() uint32 { +/* func (w *Waku) MaxMessageSize() uint32 { return w.cfg.MaxMessageSize -} +} */ // New creates a WakuV2 client ready to communicate through the LibP2P network. func New(nodeKey *ecdsa.PrivateKey, fleet string, - cfg *Config, + cfg *WakuConfig, logger *zap.Logger, appDB *sql.DB, ts *timesource.NTPTimeSource, diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index 9ef5a64227..be33fa8903 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -5,11 +5,8 @@ package wakuv2 import ( "context" - "crypto/rand" "errors" "fmt" - "math/big" - "os" "testing" "time" @@ -21,16 +18,8 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/stretchr/testify/require" - "golang.org/x/exp/maps" - "google.golang.org/protobuf/proto" - - "github.com/waku-org/go-waku/waku/v2/dnsdisc" - "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/status-im/status-go/protocol/tt" - "github.com/status-im/status-go/wakuv2/common" ) var testStoreENRBootstrap = "enrtree://AI4W5N5IFEUIHF5LESUAOSMV6TKWF2MB6GU2YK7PU4TYUGUNOCEPW@store.staging.status.nodes.status.im" @@ -174,7 +163,7 @@ func TestBasicWakuV2(t *testing.T) { require.NoError(t, err) fmt.Println("---------- GABRIEL 2 ----------") - // Creating a fake DNS Discovery ENRTree + /* // Creating a fake DNS Discovery ENRTree tree, url := makeTestTree("n", parseNodes([]string{nwakuInfo.EnrUri}), nil) fmt.Println("---------- GABRIEL 3 ----------") enrTreeAddress := url @@ -184,14 +173,27 @@ func TestBasicWakuV2(t *testing.T) { } fmt.Println("---------- GABRIEL 4 ----------") - config := &Config{} + fmt.Println("---------- enrTreeAddress: ", enrTreeAddress) */ + + // Instead of the Config type, use WakuConfig + /* config := &Config{} setDefaultConfig(config, false) config.Port = 0 config.Resolver = mapResolver(tree.ToTXT("n")) config.DiscV5BootstrapNodes = []string{enrTreeAddress} config.DiscoveryLimit = 20 - config.WakuNodes = []string{enrTreeAddress} - w, err := New(nil, "", config, nil, nil, nil, nil, nil) + config.WakuNodes = []string{enrTreeAddress} */ + + nwakuConfig := WakuConfig{ + // Host: cfg.Host, + Port: 30303, + NodeKey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710", + EnableRelay: true, + LogLevel: "DEBUG", + DiscV5BootstrapNodes: []string{nwakuInfo.EnrUri}, + } + + w, err := New(nil, "", &nwakuConfig, nil, nil, nil, nil, nil) fmt.Println("---------- GABRIEL 5 ----------") require.NoError(t, err) require.NoError(t, w.Start()) @@ -203,6 +205,7 @@ func TestBasicWakuV2(t *testing.T) { fmt.Println("---------- GABRIEL 7 ----------") + /* // DNSDiscovery ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second) defer cancel() @@ -212,6 +215,8 @@ func TestBasicWakuV2(t *testing.T) { discoveredNodes, err := dnsdisc.RetrieveNodes(ctx, enrTreeAddress, dnsdisc.WithResolver(config.Resolver)) require.NoError(t, err) + fmt.Println("---------- discoveredNodes: ", discoveredNodes) + fmt.Println("---------- GABRIEL 9 ----------") // Peer used for retrieving history r, err := rand.Int(rand.Reader, big.NewInt(int64(len(discoveredNodes)))) @@ -219,6 +224,11 @@ func TestBasicWakuV2(t *testing.T) { storeNode := discoveredNodes[int(r.Int64())] + fmt.Println("---------- storeNode: ", storeNode) + */ + + // storeNode := discoveredNodes[int(r.Int64())] + fmt.Println("---------- GABRIEL 10 ----------") options := func(b *backoff.ExponentialBackOff) { b.MaxElapsedTime = 30 * time.Second @@ -236,7 +246,7 @@ func TestBasicWakuV2(t *testing.T) { fmt.Println("---------- GABRIEL 12 ----------") - // Dropping Peer + /* // Dropping Peer err = w.DropPeer(storeNode.PeerID) require.NoError(t, err) @@ -309,7 +319,7 @@ func TestBasicWakuV2(t *testing.T) { } return nil }, options) - require.NoError(t, err) + require.NoError(t, err) */ require.NoError(t, w.Stop()) } From a4ac3278f0d9130fdcec377990cb866f8a4589ac Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 8 Oct 2024 16:29:15 +0300 Subject: [PATCH 04/16] adding discv5Discovery param --- wakuv2/nwaku.go | 5 +++-- wakuv2/nwaku_test.go | 3 ++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index ac6d82601c..96de43ad8b 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -1676,7 +1676,8 @@ type WakuConfig struct { DnsDiscovery string `json:"dnsDiscovery,omitempty"` DnsDiscoveryUrl string `json:"dnsDiscoveryUrl,omitempty"` MaxMessageSize string `json:"maxMessageSize,omitempty"` - DiscV5BootstrapNodes []string `json:"discv5BootstrapNodes,omitempty"` + Discv5BootstrapNodes []string `json:"discv5BootstrapNodes,omitempty"` + Discv5Discovery bool `json:"discv5Discovery,omitempty"` } type Waku struct { @@ -1834,7 +1835,7 @@ func wakuNew(nodeKey *ecdsa.PrivateKey, timesource: ts, storeMsgIDsMu: sync.RWMutex{}, logger: logger, - discV5BootstrapNodes: cfg.DiscV5BootstrapNodes, + discV5BootstrapNodes: cfg.Discv5BootstrapNodes, onHistoricMessagesRequestFailed: onHistoricMessagesRequestFailed, onPeerStats: onPeerStats, onlineChecker: onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker), diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index be33fa8903..7bccfc2aec 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -190,7 +190,8 @@ func TestBasicWakuV2(t *testing.T) { NodeKey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710", EnableRelay: true, LogLevel: "DEBUG", - DiscV5BootstrapNodes: []string{nwakuInfo.EnrUri}, + Discv5BootstrapNodes: []string{nwakuInfo.EnrUri}, + Discv5Discovery: true, } w, err := New(nil, "", &nwakuConfig, nil, nil, nil, nil, nil) From 59f45c0178b6f8df634739f5874a2187caee6a66 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 8 Oct 2024 18:07:11 +0300 Subject: [PATCH 05/16] adding clusterId and shards --- wakuv2/nwaku.go | 2 ++ wakuv2/nwaku_test.go | 25 +++++++++++++++++-------- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index 96de43ad8b..5612451919 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -1678,6 +1678,8 @@ type WakuConfig struct { MaxMessageSize string `json:"maxMessageSize,omitempty"` Discv5BootstrapNodes []string `json:"discv5BootstrapNodes,omitempty"` Discv5Discovery bool `json:"discv5Discovery,omitempty"` + ClusterID uint16 `json:"clusterId,omitempty"` + Shards []uint16 `json:"shards,omitempty"` } type Waku struct { diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index 7bccfc2aec..49f7bf564a 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -150,11 +150,12 @@ func parseNodes(rec []string) []*enode.Node { // // Using Docker: // -// IP_ADDRESS=$(hostname -I | awk '{print $1}'); -// docker run \ -// -p 61000:61000/tcp -p 9000:9000/udp -p 8645:8645/tcp harbor.status.im/wakuorg/nwaku:v0.31.0 \ -// --tcp-port=61000 --discv5-discovery=true --cluster-id=16 --pubsub-topic=/waku/2/rs/16/32 --pubsub-topic=/waku/2/rs/16/64 \ -// --nat=extip:${IP_ADDRESS} --discv5-discovery --discv5-udp-port=9000 --rest-address=0.0.0.0 --rest-port=8646 --store +// IP_ADDRESS=$(ipconfig getifaddr en0) +// docker run \ +// -p 61000:61000/tcp -p 8000:8000/udp -p 8646:8646/tcp harbor.status.im/wakuorg/nwaku:v0.33.0 \ +// --discv5-discovery=true --cluster-id=16 --log-level=DEBUG \ +// --nat=extip:${IP_ADDRESS} --discv5-discovery --discv5-udp-port=8000 --rest-address=0.0.0.0 --store --rest-port=8646 \ +// --tcp-port=61000 --rest-admin=true --shard=64 --dns-discovery=true --dns-discovery-url="/dns4/boot-01.do-ams3.status.prod.status.im/tcp/30303/p2p/16Uiu2HAmAR24Mbb6VuzoyUiGx42UenDkshENVDj4qnmmbabLvo31" func TestBasicWakuV2(t *testing.T) { fmt.Println("---------- GABRIEL 1 ----------") @@ -192,6 +193,8 @@ func TestBasicWakuV2(t *testing.T) { LogLevel: "DEBUG", Discv5BootstrapNodes: []string{nwakuInfo.EnrUri}, Discv5Discovery: true, + ClusterID: 16, + Shards: []uint16{64}, } w, err := New(nil, "", &nwakuConfig, nil, nil, nil, nil, nil) @@ -238,10 +241,16 @@ func TestBasicWakuV2(t *testing.T) { fmt.Println("---------- GABRIEL 11 ----------") // Sanity check, not great, but it's probably helpful err = tt.RetryWithBackOff(func() error { - if len(w.Peers()) < 1 { - return errors.New("no peers discovered") + + numConnected, err := w.GetNumConnectedPeers() + fmt.Println("numConnected: ", numConnected) + if err != nil { + return err } - return nil + if numConnected > 1 { + return nil + } + return errors.New("no peers discovered") }, options) require.NoError(t, err) From 675bb28b8c5795a41392b2c05da147d32e7511b3 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 9 Oct 2024 12:08:58 +0300 Subject: [PATCH 06/16] adding dnsdiscovery, staticnodes and some cleanup --- wakuv2/nwaku.go | 3 +- wakuv2/nwaku_test.go | 65 ++++---------------------------------------- 2 files changed, 8 insertions(+), 60 deletions(-) diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index 5612451919..d849688cb3 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -1673,9 +1673,10 @@ type WakuConfig struct { NodeKey string `json:"key,omitempty"` EnableRelay bool `json:"relay"` LogLevel string `json:"logLevel"` - DnsDiscovery string `json:"dnsDiscovery,omitempty"` + DnsDiscovery bool `json:"dnsDiscovery,omitempty"` DnsDiscoveryUrl string `json:"dnsDiscoveryUrl,omitempty"` MaxMessageSize string `json:"maxMessageSize,omitempty"` + Staticnodes []string `json:"staticnodes,omitempty"` Discv5BootstrapNodes []string `json:"discv5BootstrapNodes,omitempty"` Discv5Discovery bool `json:"discv5Discovery,omitempty"` ClusterID uint16 `json:"clusterId,omitempty"` diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index 49f7bf564a..43ba4bdc4f 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -158,87 +158,35 @@ func parseNodes(rec []string) []*enode.Node { // --tcp-port=61000 --rest-admin=true --shard=64 --dns-discovery=true --dns-discovery-url="/dns4/boot-01.do-ams3.status.prod.status.im/tcp/30303/p2p/16Uiu2HAmAR24Mbb6VuzoyUiGx42UenDkshENVDj4qnmmbabLvo31" func TestBasicWakuV2(t *testing.T) { - fmt.Println("---------- GABRIEL 1 ----------") extNodeRestPort := 8646 - nwakuInfo, err := GetNwakuInfo(nil, &extNodeRestPort) + storeNodeInfo, err := GetNwakuInfo(nil, &extNodeRestPort) require.NoError(t, err) - fmt.Println("---------- GABRIEL 2 ----------") - - /* // Creating a fake DNS Discovery ENRTree - tree, url := makeTestTree("n", parseNodes([]string{nwakuInfo.EnrUri}), nil) - fmt.Println("---------- GABRIEL 3 ----------") - enrTreeAddress := url - envEnrTreeAddress := os.Getenv("ENRTREE_ADDRESS") - if envEnrTreeAddress != "" { - enrTreeAddress = envEnrTreeAddress - } - - fmt.Println("---------- GABRIEL 4 ----------") - fmt.Println("---------- enrTreeAddress: ", enrTreeAddress) */ - // Instead of the Config type, use WakuConfig - /* config := &Config{} - setDefaultConfig(config, false) - config.Port = 0 - config.Resolver = mapResolver(tree.ToTXT("n")) - config.DiscV5BootstrapNodes = []string{enrTreeAddress} - config.DiscoveryLimit = 20 - config.WakuNodes = []string{enrTreeAddress} */ - nwakuConfig := WakuConfig{ - // Host: cfg.Host, Port: 30303, NodeKey: "11d0dcea28e86f81937a3bd1163473c7fbc0a0db54fd72914849bc47bdf78710", EnableRelay: true, LogLevel: "DEBUG", - Discv5BootstrapNodes: []string{nwakuInfo.EnrUri}, + DnsDiscoveryUrl: "enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.prod.status.nodes.status.im", + DnsDiscovery: true, Discv5Discovery: true, + Staticnodes: []string{storeNodeInfo.ListenAddresses[0]}, ClusterID: 16, Shards: []uint16{64}, } w, err := New(nil, "", &nwakuConfig, nil, nil, nil, nil, nil) - fmt.Println("---------- GABRIEL 5 ----------") require.NoError(t, err) require.NoError(t, w.Start()) - fmt.Println("---------- GABRIEL 6 ----------") enr, err := w.ENR() require.NoError(t, err) require.NotNil(t, enr) - fmt.Println("---------- GABRIEL 7 ----------") - - /* - // DNSDiscovery - ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second) - defer cancel() - - fmt.Println("---------- GABRIEL 8 ----------") - - discoveredNodes, err := dnsdisc.RetrieveNodes(ctx, enrTreeAddress, dnsdisc.WithResolver(config.Resolver)) - require.NoError(t, err) - - fmt.Println("---------- discoveredNodes: ", discoveredNodes) - - fmt.Println("---------- GABRIEL 9 ----------") - // Peer used for retrieving history - r, err := rand.Int(rand.Reader, big.NewInt(int64(len(discoveredNodes)))) - require.NoError(t, err) - - storeNode := discoveredNodes[int(r.Int64())] - - fmt.Println("---------- storeNode: ", storeNode) - */ - - // storeNode := discoveredNodes[int(r.Int64())] - - fmt.Println("---------- GABRIEL 10 ----------") options := func(b *backoff.ExponentialBackOff) { b.MaxElapsedTime = 30 * time.Second } - fmt.Println("---------- GABRIEL 11 ----------") // Sanity check, not great, but it's probably helpful err = tt.RetryWithBackOff(func() error { @@ -247,15 +195,14 @@ func TestBasicWakuV2(t *testing.T) { if err != nil { return err } - if numConnected > 1 { + // Have to be connected to at least 3 nodes: the static node, the bootstrap node, and one discovered node + if numConnected > 2 { return nil } return errors.New("no peers discovered") }, options) require.NoError(t, err) - fmt.Println("---------- GABRIEL 12 ----------") - /* // Dropping Peer err = w.DropPeer(storeNode.PeerID) require.NoError(t, err) From 56e51d79d96c62d5df2b1029a23ee32ede34f17f Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Fri, 11 Oct 2024 12:15:34 +0300 Subject: [PATCH 07/16] check for connection --- wakuv2/nwaku_test.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index 43ba4bdc4f..806e149d6e 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -7,10 +7,13 @@ import ( "context" "errors" "fmt" + "slices" "testing" "time" "github.com/cenkalti/backoff/v3" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" @@ -203,6 +206,16 @@ func TestBasicWakuV2(t *testing.T) { }, options) require.NoError(t, err) + storeNode, err :=peer.AddrInfoFromString(storeNodeInfo.ListenAddresses[0]) + require.NoError(t, err) + require.NoError(t, err) + + connectedStoreNodes, err := w.GetPeerIdsByProtocol(string(store.StoreQueryID_v300)) + require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node") + + err = w.DropPeer(storeNode.ID) + require.NoError(t, err) + /* // Dropping Peer err = w.DropPeer(storeNode.PeerID) require.NoError(t, err) From ba45cbc8cc6456590349c489cc355a7325692de8 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Fri, 11 Oct 2024 15:51:13 +0300 Subject: [PATCH 08/16] adding disconnect functionality --- wakuv2/nwaku.go | 32 +++++++++++++++++++++++++++++--- wakuv2/nwaku_test.go | 7 ++++++- 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index d849688cb3..336255ce48 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -182,6 +182,13 @@ package wakuv2 resp) ); } + static void cGoWakuDisconnectPeerById(void* wakuCtx, char* peerId, void* resp) { + WAKU_CALL( waku_disconnect_peer_by_id(wakuCtx, + peerId, + (WakuCallBack) callback, + resp) ); + } + static void cGoWakuListenAddresses(void* wakuCtx, void* resp) { WAKU_CALL (waku_listen_addresses(wakuCtx, (WakuCallBack) callback, resp) ); } @@ -2253,12 +2260,12 @@ func (self *Waku) GetNumConnectedPeers(paramPubsubTopic ...string) (int, error) numPeers, err := strconv.Atoi(numPeersStr) if err != nil { fmt.Println(":", err) - errMsg := "ListPeersInMesh - error converting string to int: " + err.Error() + errMsg := "GetNumConnectedPeers - error converting string to int: " + err.Error() return 0, errors.New(errMsg) } return numPeers, nil } - errMsg := "error ListPeersInMesh: " + + errMsg := "error GetNumConnectedPeers: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) return 0, errors.New(errMsg) } @@ -2273,6 +2280,9 @@ func (self *Waku) GetPeerIdsByProtocol(protocol string) (peer.IDSlice, error) { if C.getRet(resp) == C.RET_OK { peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + if peersStr == "" { + return peer.IDSlice{}, nil + } // peersStr contains a comma-separated list of peer ids itemsPeerIds := strings.Split(peersStr, ",") @@ -2280,7 +2290,7 @@ func (self *Waku) GetPeerIdsByProtocol(protocol string) (peer.IDSlice, error) { for _, peer := range itemsPeerIds { id, err := peermod.Decode(peer) if err != nil { - errMsg := "ListPeersInMesh - error converting string to int: " + err.Error() + errMsg := "GetPeerIdsByProtocol - error converting string to int: " + err.Error() return nil, errors.New(errMsg) } peers = append(peers, id) @@ -2293,6 +2303,22 @@ func (self *Waku) GetPeerIdsByProtocol(protocol string) (peer.IDSlice, error) { return nil, errors.New(errMsg) } +func (self *Waku) DisconnectPeerById(peerId peer.ID) error { + var resp = C.allocResp() + var cPeerId = C.CString(peerId.String()) + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cPeerId)) + + C.cGoWakuDisconnectPeerById(self.wakuCtx, cPeerId, resp) + + if C.getRet(resp) == C.RET_OK { + return nil + } + errMsg := "error DisconnectPeerById: " + + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +} + // func main() { // config := WakuConfig{ diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index 806e149d6e..14b17284ca 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -213,9 +213,14 @@ func TestBasicWakuV2(t *testing.T) { connectedStoreNodes, err := w.GetPeerIdsByProtocol(string(store.StoreQueryID_v300)) require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node") - err = w.DropPeer(storeNode.ID) + err = w.DisconnectPeerById(storeNode.ID) require.NoError(t, err) + connectedStoreNodes, err = w.GetPeerIdsByProtocol(string(store.StoreQueryID_v300)) + require.NoError(t, err) + isDisconnected := !slices.Contains(connectedStoreNodes, storeNode.ID) + require.True(t, isDisconnected, "nwaku should be disconnected from the store node") + /* // Dropping Peer err = w.DropPeer(storeNode.PeerID) require.NoError(t, err) From 0b5e20b00af31021815f6917e49e0e26044b57dd Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Fri, 11 Oct 2024 17:40:45 +0300 Subject: [PATCH 09/16] adding dialing to test --- wakuv2/nwaku.go | 40 +++++++++++++++++++++++++++++++++++----- wakuv2/nwaku_test.go | 19 +++++++++++++------ 2 files changed, 48 insertions(+), 11 deletions(-) diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index 336255ce48..05739ede0d 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -182,6 +182,20 @@ package wakuv2 resp) ); } + static void cGoWakuDialPeerById(void* wakuCtx, + char* peerId, + char* protocol, + int timeoutMs, + void* resp) { + + WAKU_CALL( waku_dial_peer_by_id(wakuCtx, + peerId, + protocol, + timeoutMs, + (WakuCallBack) callback, + resp) ); + } + static void cGoWakuDisconnectPeerById(void* wakuCtx, char* peerId, void* resp) { WAKU_CALL( waku_disconnect_peer_by_id(wakuCtx, peerId, @@ -300,6 +314,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/store" storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" "github.com/waku-org/go-waku/waku/v2/utils" @@ -1558,11 +1573,8 @@ func (w *Waku) DialPeer(address multiaddr.Multiaddr) error { return nil } -func (w *Waku) DialPeerByID(peerID peer.ID) error { - // ctx, cancel := context.WithTimeout(w.ctx, requestTimeout) - // defer cancel() - // return w.node.DialPeerByID(ctx, peerID) - return nil +func (w *Waku) DialPeerByID(peerId peer.ID) error { + return w.WakuDialPeerById(peerId, string(relay.WakuRelayID_v200), 1000) } func (w *Waku) DropPeer(peerID peer.ID) error { @@ -2170,6 +2182,24 @@ func (self *Waku) WakuConnect(peerMultiAddr string, timeoutMs int) error { return errors.New(errMsg) } +func (self *Waku) WakuDialPeerById(peerId peer.ID, protocol string, timeoutMs int) error { + var resp = C.allocResp() + var cPeerId = C.CString(peerId.String()) + var cProtocol = C.CString(protocol) + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cPeerId)) + defer C.free(unsafe.Pointer(cProtocol)) + + C.cGoWakuDialPeerById(self.wakuCtx, cPeerId, cProtocol, C.int(timeoutMs), resp) + + if C.getRet(resp) == C.RET_OK { + return nil + } + errMsg := "error DialPeerById: " + + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return errors.New(errMsg) +} + func (self *Waku) ListenAddresses() ([]multiaddr.Multiaddr, error) { var resp = C.allocResp() defer C.freeResp(resp) diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index 14b17284ca..71138fe9d9 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -153,12 +153,11 @@ func parseNodes(rec []string) []*enode.Node { // // Using Docker: // -// IP_ADDRESS=$(ipconfig getifaddr en0) -// docker run \ -// -p 61000:61000/tcp -p 8000:8000/udp -p 8646:8646/tcp harbor.status.im/wakuorg/nwaku:v0.33.0 \ -// --discv5-discovery=true --cluster-id=16 --log-level=DEBUG \ -// --nat=extip:${IP_ADDRESS} --discv5-discovery --discv5-udp-port=8000 --rest-address=0.0.0.0 --store --rest-port=8646 \ -// --tcp-port=61000 --rest-admin=true --shard=64 --dns-discovery=true --dns-discovery-url="/dns4/boot-01.do-ams3.status.prod.status.im/tcp/30303/p2p/16Uiu2HAmAR24Mbb6VuzoyUiGx42UenDkshENVDj4qnmmbabLvo31" +// IP_ADDRESS=$(hostname -I | awk '{print $1}'); +// docker run \ +// -p 61000:61000/tcp -p 8000:8000/udp -p 8646:8646/tcp harbor.status.im/wakuorg/nwaku:v0.33.0 \ +// --discv5-discovery=true --cluster-id=16 --log-level=DEBUG \ +// --nat=extip:${IP_ADDRESS} --discv5-discovery --discv5-udp-port=8000 --rest-address=0.0.0.0 --store --rest-port=8646 \ func TestBasicWakuV2(t *testing.T) { extNodeRestPort := 8646 @@ -211,6 +210,7 @@ func TestBasicWakuV2(t *testing.T) { require.NoError(t, err) connectedStoreNodes, err := w.GetPeerIdsByProtocol(string(store.StoreQueryID_v300)) + require.NoError(t, err) require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node") err = w.DisconnectPeerById(storeNode.ID) @@ -221,6 +221,13 @@ func TestBasicWakuV2(t *testing.T) { isDisconnected := !slices.Contains(connectedStoreNodes, storeNode.ID) require.True(t, isDisconnected, "nwaku should be disconnected from the store node") + err = w.DialPeerByID(storeNode.ID) + require.NoError(t, err) + + connectedStoreNodes, err = w.GetPeerIdsByProtocol(string(store.StoreQueryID_v300)) + require.NoError(t, err) + require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node") + /* // Dropping Peer err = w.DropPeer(storeNode.PeerID) require.NoError(t, err) From d1f09c1c60c2d8237cadbe956e42a3b9a7abfdaa Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Mon, 14 Oct 2024 10:36:49 +0300 Subject: [PATCH 10/16] cleanup --- wakuv2/nwaku.go | 17 ----------------- wakuv2/nwaku_test.go | 17 +---------------- 2 files changed, 1 insertion(+), 33 deletions(-) diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index 05739ede0d..227640e3e7 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -55,9 +55,7 @@ package wakuv2 // resp must be set != NULL in case interest on retrieving data from the callback static void callback(int ret, char* msg, size_t len, void* resp) { - printf("---------- GABRIEL calling callback 1 ----\n"); if (resp != NULL) { - printf("---------- GABRIEL calling callback 2 ----\n"); Resp* m = (Resp*) resp; m->ret = ret; m->msg = msg; @@ -67,7 +65,6 @@ package wakuv2 #define WAKU_CALL(call) \ do { \ - printf("---------- GABRIEL calling WAKU_CALL 1 ----\n"); \ int ret = call; \ if (ret != 0) { \ printf("Failed the call to: %s. Returned code: %d\n", #call, ret); \ @@ -1875,17 +1872,12 @@ func (self *Waku) WakuStart() error { defer C.freeResp(resp) - fmt.Println("------------ GABRIEL called wakuStart") C.cGoWakuStart(self.wakuCtx, resp) - fmt.Println("------------ GABRIEL wakuStart 2") if C.getRet(resp) == C.RET_OK { - fmt.Println("------------ GABRIEL wakuStart received RET_OK") return nil } - fmt.Println("------------ GABRIEL wakuStart 3") errMsg := "error WakuStart: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - fmt.Println("------------ GABRIEL error in wakuStart ", errMsg) return errors.New(errMsg) } @@ -2457,16 +2449,11 @@ func New(nodeKey *ecdsa.PrivateKey, onHistoricMessagesRequestFailed func([]byte, peer.ID, error), onPeerStats func(types.ConnStatus)) (*Waku, error) { - fmt.Println("-------- GABRIEL func New 1 ---------") - // Lock the main goroutine to its current OS thread runtime.LockOSThread() - fmt.Println("-------- GABRIEL func New 2 ---------") - WakuSetup() // This should only be called once in the whole app's life - fmt.Println("-------- GABRIEL func New 3 ---------") node, err := wakuNew(nodeKey, fleet, cfg, logger, appDB, ts, onHistoricMessagesRequestFailed, @@ -2475,21 +2462,17 @@ func New(nodeKey *ecdsa.PrivateKey, return nil, err } - fmt.Println("-------- GABRIEL func New 4 ---------") defaultPubsubTopic, err := node.WakuDefaultPubsubTopic() if err != nil { fmt.Println("Error happened:", err.Error()) } - fmt.Println("-------- GABRIEL func New 5 ---------") err = node.WakuRelaySubscribe(defaultPubsubTopic) if err != nil { fmt.Println("Error happened:", err.Error()) } - fmt.Println("-------- GABRIEL func New 6 ---------") node.WakuSetEventCallback() - fmt.Println("-------- GABRIEL func New 7 ---------") return node, nil diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index 71138fe9d9..e834f6fada 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -228,22 +228,7 @@ func TestBasicWakuV2(t *testing.T) { require.NoError(t, err) require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node") - /* // Dropping Peer - err = w.DropPeer(storeNode.PeerID) - require.NoError(t, err) - - // Dialing with peerID - err = w.DialPeerByID(storeNode.PeerID) - require.NoError(t, err) - - err = tt.RetryWithBackOff(func() error { - if len(w.Peers()) < 1 { - return errors.New("no peers discovered") - } - return nil - }, options) - require.NoError(t, err) - + /* filter := &common.Filter{ PubsubTopic: config.DefaultShardPubsubTopic, Messages: common.NewMemoryMessageStore(), From 4b9bee467b5b89a4946d53bafad09a173332e39f Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Mon, 14 Oct 2024 11:39:43 +0300 Subject: [PATCH 11/16] commenting compilation errors --- wakuv2/api.go | 24 ++++++++++++++-- wakuv2/api_test.go | 14 ++++++++-- wakuv2/message_publishing.go | 18 ++++++++++-- wakuv2/nwaku.go | 53 ++++++++++++++---------------------- 4 files changed, 70 insertions(+), 39 deletions(-) diff --git a/wakuv2/api.go b/wakuv2/api.go index 53f9f2151d..519a01ee4f 100644 --- a/wakuv2/api.go +++ b/wakuv2/api.go @@ -18,7 +18,27 @@ package wakuv2 -/* +/* import ( + "context" + "crypto/ecdsa" + "errors" + "fmt" + "sync" + "time" + + "github.com/waku-org/go-waku/waku/v2/payload" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + + "github.com/status-im/status-go/wakuv2/common" + + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rpc" + + "google.golang.org/protobuf/proto" +) + // List of errors var ( ErrSymAsym = errors.New("specify either a symmetric or an asymmetric key") @@ -493,4 +513,4 @@ func (api *PublicWakuAPI) NewMessageFilter(req Criteria) (string, error) { api.mu.Unlock() return id, nil -} */ +} */ \ No newline at end of file diff --git a/wakuv2/api_test.go b/wakuv2/api_test.go index 7119650baa..86d6a10515 100644 --- a/wakuv2/api_test.go +++ b/wakuv2/api_test.go @@ -18,7 +18,16 @@ package wakuv2 -/* func TestMultipleTopicCopyInNewMessageFilter(t *testing.T) { +/* import ( + "testing" + "time" + + "golang.org/x/exp/maps" + + "github.com/status-im/status-go/wakuv2/common" +) + +func TestMultipleTopicCopyInNewMessageFilter(t *testing.T) { w, err := New(nil, "", nil, nil, nil, nil, nil, nil) if err != nil { t.Fatalf("Error creating WakuV2 client: %v", err) @@ -58,5 +67,4 @@ package wakuv2 if !found { t.Fatalf("Could not find filter with both topics") } -} -*/ \ No newline at end of file +} */ \ No newline at end of file diff --git a/wakuv2/message_publishing.go b/wakuv2/message_publishing.go index 49051e55ce..a4a2b21472 100644 --- a/wakuv2/message_publishing.go +++ b/wakuv2/message_publishing.go @@ -1,5 +1,19 @@ package wakuv2 +/* import ( + "errors" + + "go.uber.org/zap" + + "github.com/waku-org/go-waku/waku/v2/api/publish" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" + + gethcommon "github.com/ethereum/go-ethereum/common" + "github.com/status-im/status-go/wakuv2/common" +) + type PublishMethod int const ( @@ -20,7 +34,7 @@ func (pm PublishMethod) String() string { // Send injects a message into the waku send queue, to be distributed in the // network in the coming cycles. -/* func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage, priority *int) ([]byte, error) { +func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage, priority *int) ([]byte, error) { pubsubTopic = w.GetPubsubTopic(pubsubTopic) if w.protectedTopicStore != nil { privKey, err := w.protectedTopicStore.FetchPrivateKey(pubsubTopic) @@ -146,4 +160,4 @@ func (w *Waku) publishEnvelope(envelope *protocol.Envelope, publishFn publish.Pu }) } } -} */ +} */ \ No newline at end of file diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index 227640e3e7..343b1ce87c 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -332,7 +332,7 @@ const peersToPublishForLightpush = 2 const publishingLimiterRate = rate.Limit(2) const publishingLimitBurst = 4 -type SentEnvelope struct { +/* type SentEnvelope struct { Envelope *protocol.Envelope PublishMethod PublishMethod } @@ -352,7 +352,7 @@ type ITelemetryClient interface { func (w *Waku) SetStatusTelemetryClient(client ITelemetryClient) { w.statusTelemetryClient = client -} +} */ func newTTLCache() *ttlcache.Cache[gethcommon.Hash, *common.ReceivedMessage] { cache := ttlcache.New[gethcommon.Hash, *common.ReceivedMessage](ttlcache.WithTTL[gethcommon.Hash, *common.ReceivedMessage](cacheTTL)) @@ -412,12 +412,11 @@ func (w *Waku) SubscribeToConnStatusChanges() *types.ConnStatusSubscription { wg.Wait() return result, nil -} */ +} type fnApplyToEachPeer func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) -// This should be handled by nwaku? -/* func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnApplyToEachPeer) error { +func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnApplyToEachPeer) error { w.logger.Info("retrieving nodes", zap.String("enr", enrtreeAddress)) ctx, cancel := context.WithTimeout(ctx, requestTimeout) defer cancel() @@ -458,10 +457,9 @@ type fnApplyToEachPeer func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) wg.Wait() return nil -} */ +} -// This too? nwaku? -/* func (w *Waku) discoverAndConnectPeers() { +func (w *Waku) discoverAndConnectPeers() { fnApply := func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) { defer wg.Done() if len(d.PeerInfo.Addrs) != 0 { @@ -591,22 +589,22 @@ func (w *Waku) runPeerExchangeLoop() { } } } -} */ +} -/* func (w *Waku) GetPubsubTopic(topic string) string { +func (w *Waku) GetPubsubTopic(topic string) string { if topic == "" { topic = w.cfg.DefaultShardPubsubTopic } return topic -} */ +} // CurrentTime returns current time. func (w *Waku) CurrentTime() time.Time { return w.timesource.Now() } -/* // APIs returns the RPC descriptors the Waku implementation offers +// APIs returns the RPC descriptors the Waku implementation offers func (w *Waku) APIs() []rpc.API { return []rpc.API{ { @@ -869,9 +867,9 @@ func (w *Waku) GetSymKey(id string) ([]byte, error) { return nil, fmt.Errorf("non-existent key ID") } -// Subscribe installs a new message handler used for filtering, decrypting +/* // Subscribe installs a new message handler used for filtering, decrypting // and subsequent storing of incoming messages. -/* func (w *Waku) Subscribe(f *common.Filter) (string, error) { +func (w *Waku) Subscribe(f *common.Filter) (string, error) { f.PubsubTopic = w.GetPubsubTopic(f.PubsubTopic) id, err := w.filters.Install(f) if err != nil { @@ -898,7 +896,7 @@ func (w *Waku) Unsubscribe(ctx context.Context, id string) error { } return nil -} */ +} // GetFilter returns the filter by id. func (w *Waku) GetFilter(id string) *common.Filter { @@ -917,7 +915,7 @@ func (w *Waku) UnsubscribeMany(ids []string) error { return nil } -/* func (w *Waku) SkipPublishToTopic(value bool) { +func (w *Waku) SkipPublishToTopic(value bool) { w.cfg.SkipPublishToTopic = value } @@ -1044,10 +1042,6 @@ func (w *Waku) Start() error { return err } - // ch := make(chan os.Signal, 1) - // signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) - //<-ch - // if err = w.node.Start(w.ctx); err != nil { // return fmt.Errorf("failed to start go-waku node: %v", err) // } @@ -1208,21 +1202,21 @@ func (w *Waku) checkForConnectionChanges() { // }() // } -/* func (w *Waku) MessageExists(mh pb.MessageHash) (bool, error) { +func (w *Waku) MessageExists(mh pb.MessageHash) (bool, error) { w.poolMu.Lock() defer w.poolMu.Unlock() return w.envelopeCache.Has(gethcommon.Hash(mh)), nil } -func (w *Waku) SetTopicsToVerifyForMissingMessages(peerID peer.ID, pubsubTopic string, contentTopics []string) { +/* func (w *Waku) SetTopicsToVerifyForMissingMessages(peerID peer.ID, pubsubTopic string, contentTopics []string) { if !w.cfg.EnableMissingMessageVerification { return } w.missingMsgVerifier.SetCriteriaInterest(peerID, protocol.NewContentFilter(pubsubTopic, contentTopics...)) -} */ +} -/* func (w *Waku) setupRelaySubscriptions() error { +func (w *Waku) setupRelaySubscriptions() error { if w.cfg.LightClient { return nil } @@ -1338,12 +1332,12 @@ func (w *Waku) add(recvMessage *common.ReceivedMessage, processImmediately bool) } return true, nil -} +} */ // postEvent queues the message for further processing. func (w *Waku) postEvent(envelope *common.ReceivedMessage) { w.msgQueue <- envelope -} */ +} /* // processQueueLoop delivers the messages to the watchers during the lifetime of the waku node. func (w *Waku) processQueueLoop() { @@ -1733,7 +1727,6 @@ type Waku struct { cancel context.CancelFunc wg sync.WaitGroup - // cfg *Config cfg *WakuConfig options []node.WakuNodeOption @@ -1769,7 +1762,7 @@ type Waku struct { onHistoricMessagesRequestFailed func([]byte, peer.ID, error) onPeerStats func(types.ConnStatus) - statusTelemetryClient ITelemetryClient + // statusTelemetryClient ITelemetryClient defaultShardInfo protocol.RelayShards } @@ -1822,8 +1815,6 @@ func wakuNew(nodeKey *ecdsa.PrivateKey, return nil, err } - fmt.Println("-------- CREATING CONFIG, jsonConfig: ", string(jsonConfig)) - var cJsonConfig = C.CString(string(jsonConfig)) var resp = C.allocResp() @@ -1870,8 +1861,6 @@ func (self *Waku) WakuStart() error { var resp = C.allocResp() defer C.freeResp(resp) - - C.cGoWakuStart(self.wakuCtx, resp) if C.getRet(resp) == C.RET_OK { From cd41b1367f9838f9b2d2460af8f787c82f89f2ff Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Mon, 14 Oct 2024 11:41:53 +0300 Subject: [PATCH 12/16] deleting some tabs --- wakuv2/nwaku_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index e834f6fada..952e2700b9 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -153,11 +153,11 @@ func parseNodes(rec []string) []*enode.Node { // // Using Docker: // -// IP_ADDRESS=$(hostname -I | awk '{print $1}'); -// docker run \ -// -p 61000:61000/tcp -p 8000:8000/udp -p 8646:8646/tcp harbor.status.im/wakuorg/nwaku:v0.33.0 \ -// --discv5-discovery=true --cluster-id=16 --log-level=DEBUG \ -// --nat=extip:${IP_ADDRESS} --discv5-discovery --discv5-udp-port=8000 --rest-address=0.0.0.0 --store --rest-port=8646 \ +// IP_ADDRESS=$(hostname -I | awk '{print $1}'); +// docker run \ +// -p 61000:61000/tcp -p 8000:8000/udp -p 8646:8646/tcp harbor.status.im/wakuorg/nwaku:v0.33.0 \ +// --discv5-discovery=true --cluster-id=16 --log-level=DEBUG \ +// --nat=extip:${IP_ADDRESS} --discv5-discovery --discv5-udp-port=8000 --rest-address=0.0.0.0 --store --rest-port=8646 \ func TestBasicWakuV2(t *testing.T) { extNodeRestPort := 8646 From 9373407a9c2ab76f9b26a238d5eebac5b3aeab88 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Mon, 14 Oct 2024 11:55:28 +0300 Subject: [PATCH 13/16] improving comments --- wakuv2/nwaku_test.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index 952e2700b9..ce7ef15035 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -157,7 +157,7 @@ func parseNodes(rec []string) []*enode.Node { // docker run \ // -p 61000:61000/tcp -p 8000:8000/udp -p 8646:8646/tcp harbor.status.im/wakuorg/nwaku:v0.33.0 \ // --discv5-discovery=true --cluster-id=16 --log-level=DEBUG \ -// --nat=extip:${IP_ADDRESS} --discv5-discovery --discv5-udp-port=8000 --rest-address=0.0.0.0 --store --rest-port=8646 \ +// --nat=extip:${IP_ADDRESS} --discv5-udp-port=8000 --rest-address=0.0.0.0 --store --rest-port=8646 \ func TestBasicWakuV2(t *testing.T) { extNodeRestPort := 8646 @@ -205,25 +205,31 @@ func TestBasicWakuV2(t *testing.T) { }, options) require.NoError(t, err) + // Get local store node address storeNode, err :=peer.AddrInfoFromString(storeNodeInfo.ListenAddresses[0]) require.NoError(t, err) require.NoError(t, err) + // Check that we are indeed connected to the store node connectedStoreNodes, err := w.GetPeerIdsByProtocol(string(store.StoreQueryID_v300)) require.NoError(t, err) require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node") + // Disconnect from the store node err = w.DisconnectPeerById(storeNode.ID) require.NoError(t, err) + // Check that we are indeed disconnected connectedStoreNodes, err = w.GetPeerIdsByProtocol(string(store.StoreQueryID_v300)) require.NoError(t, err) isDisconnected := !slices.Contains(connectedStoreNodes, storeNode.ID) require.True(t, isDisconnected, "nwaku should be disconnected from the store node") + // Re-connect err = w.DialPeerByID(storeNode.ID) require.NoError(t, err) + // Check that we are connected again connectedStoreNodes, err = w.GetPeerIdsByProtocol(string(store.StoreQueryID_v300)) require.NoError(t, err) require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node") From 524b566165eb9c892aa97bc019aafe9873a0d98a Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 15 Oct 2024 14:13:00 +0300 Subject: [PATCH 14/16] adding TODO comments --- wakuv2/api.go | 3 ++- wakuv2/api_test.go | 3 ++- wakuv2/message_publishing.go | 3 ++- wakuv2/nwaku.go | 35 +++++++++++++++++++++++------------ 4 files changed, 29 insertions(+), 15 deletions(-) diff --git a/wakuv2/api.go b/wakuv2/api.go index 519a01ee4f..db03bbd95a 100644 --- a/wakuv2/api.go +++ b/wakuv2/api.go @@ -18,7 +18,8 @@ package wakuv2 -/* import ( +/* TODO-nwaku +import ( "context" "crypto/ecdsa" "errors" diff --git a/wakuv2/api_test.go b/wakuv2/api_test.go index 86d6a10515..10d16a4c33 100644 --- a/wakuv2/api_test.go +++ b/wakuv2/api_test.go @@ -18,7 +18,8 @@ package wakuv2 -/* import ( +/* TODO-nwaku +import ( "testing" "time" diff --git a/wakuv2/message_publishing.go b/wakuv2/message_publishing.go index a4a2b21472..4b9d0f667e 100644 --- a/wakuv2/message_publishing.go +++ b/wakuv2/message_publishing.go @@ -1,6 +1,7 @@ package wakuv2 -/* import ( +/* TODO-nwaku +import ( "errors" "go.uber.org/zap" diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index 343b1ce87c..40450c4afc 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -332,7 +332,8 @@ const peersToPublishForLightpush = 2 const publishingLimiterRate = rate.Limit(2) const publishingLimitBurst = 4 -/* type SentEnvelope struct { +/* TODO-nwaku +type SentEnvelope struct { Envelope *protocol.Envelope PublishMethod PublishMethod } @@ -368,7 +369,8 @@ func (w *Waku) SubscribeToConnStatusChanges() *types.ConnStatusSubscription { return subscription } -/* func (w *Waku) getDiscV5BootstrapNodes(ctx context.Context, addresses []string) ([]*enode.Node, error) { +/* TODO-nwaku +func (w *Waku) getDiscV5BootstrapNodes(ctx context.Context, addresses []string) ([]*enode.Node, error) { wg := sync.WaitGroup{} mu := sync.Mutex{} var result []*enode.Node @@ -502,7 +504,8 @@ func (w *Waku) connect(peerInfo peer.AddrInfo, enr *enode.Node, origin wps.Origi w.WakuConnect(addr.String(), 1000) } -/* func (w *Waku) telemetryBandwidthStats(telemetryServerURL string) { +/* TODO-nwaku +func (w *Waku) telemetryBandwidthStats(telemetryServerURL string) { w.wg.Add(1) defer w.wg.Done() @@ -867,7 +870,8 @@ func (w *Waku) GetSymKey(id string) ([]byte, error) { return nil, fmt.Errorf("non-existent key ID") } -/* // Subscribe installs a new message handler used for filtering, decrypting +/* TODO-nwaku +// Subscribe installs a new message handler used for filtering, decrypting // and subsequent storing of incoming messages. func (w *Waku) Subscribe(f *common.Filter) (string, error) { f.PubsubTopic = w.GetPubsubTopic(f.PubsubTopic) @@ -1017,7 +1021,8 @@ func (w *Waku) Query(ctx context.Context, return nil, 0, nil } -/* // OnNewEnvelope is an interface from Waku FilterManager API that gets invoked when any new message is received by Filter. +/* TODO-nwaku +// OnNewEnvelope is an interface from Waku FilterManager API that gets invoked when any new message is received by Filter. func (w *Waku) OnNewEnvelope(env *protocol.Envelope) error { return w.OnNewEnvelopes(env, common.RelayedMessageType, false) } */ @@ -1208,7 +1213,8 @@ func (w *Waku) MessageExists(mh pb.MessageHash) (bool, error) { return w.envelopeCache.Has(gethcommon.Hash(mh)), nil } -/* func (w *Waku) SetTopicsToVerifyForMissingMessages(peerID peer.ID, pubsubTopic string, contentTopics []string) { +/* TODO-nwaku +func (w *Waku) SetTopicsToVerifyForMissingMessages(peerID peer.ID, pubsubTopic string, contentTopics []string) { if !w.cfg.EnableMissingMessageVerification { return } @@ -1250,7 +1256,8 @@ func (w *Waku) setupRelaySubscriptions() error { return nil } */ -/* func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.MessageType, processImmediately bool) error { +/* TODO-nwaku +func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.MessageType, processImmediately bool) error { if envelope == nil { return nil } @@ -1339,7 +1346,8 @@ func (w *Waku) postEvent(envelope *common.ReceivedMessage) { w.msgQueue <- envelope } -/* // processQueueLoop delivers the messages to the watchers during the lifetime of the waku node. +/* TODO-nwaku +// processQueueLoop delivers the messages to the watchers during the lifetime of the waku node. func (w *Waku) processQueueLoop() { if w.ctx == nil { return @@ -1435,7 +1443,8 @@ func (w *Waku) Peers() types.PeerStats { // return FormatPeerStats(w.node) } -/* func (w *Waku) RelayPeersByTopic(topic string) (*types.PeerList, error) { +/* TODO-nwaku +func (w *Waku) RelayPeersByTopic(topic string) (*types.PeerList, error) { if w.cfg.LightClient { return nil, errors.New("only available in relay mode") } @@ -1511,7 +1520,8 @@ func (w *Waku) handleNetworkChangeFromApp(state connection.State) { // } } -/* func (w *Waku) ConnectionChanged(state connection.State) { +/* TODO-nwaku +func (w *Waku) ConnectionChanged(state connection.State) { isOnline := !state.Offline if w.cfg.LightClient { //TODO: Update this as per https://github.com/waku-org/go-waku/issues/1114 @@ -1762,7 +1772,7 @@ type Waku struct { onHistoricMessagesRequestFailed func([]byte, peer.ID, error) onPeerStats func(types.ConnStatus) - // statusTelemetryClient ITelemetryClient + // statusTelemetryClient ITelemetryClient // TODO-nwaku defaultShardInfo protocol.RelayShards } @@ -2424,7 +2434,8 @@ func (self *Waku) DisconnectPeerById(peerId peer.ID) error { // } // MaxMessageSize returns the maximum accepted message size. -/* func (w *Waku) MaxMessageSize() uint32 { +/* TODO-nwaku +func (w *Waku) MaxMessageSize() uint32 { return w.cfg.MaxMessageSize } */ From 37d7fd4486f831f0f80042ed9fa351fd15091b67 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 15 Oct 2024 14:16:18 +0300 Subject: [PATCH 15/16] more TODO comments --- wakuv2/nwaku.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index 40450c4afc..a1d7037981 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -1813,7 +1813,8 @@ func wakuNew(nodeKey *ecdsa.PrivateKey, ts = timesource.Default() } - /* cfg = setDefaults(cfg) + /* TODO-nwaku + cfg = setDefaults(cfg) if err = cfg.Validate(logger); err != nil { return nil, err } */ @@ -1859,7 +1860,7 @@ func wakuNew(nodeKey *ecdsa.PrivateKey, onHistoricMessagesRequestFailed: onHistoricMessagesRequestFailed, onPeerStats: onPeerStats, onlineChecker: onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker), - //sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish), + //sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish), // TODO-nwaku }, nil } From 5ec906558c470d023a6e8ad36c898f102a10ee28 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 15 Oct 2024 17:31:46 +0300 Subject: [PATCH 16/16] removing spammy log --- wakuv2/nwaku_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index ce7ef15035..8f2ead7bb0 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -6,7 +6,6 @@ package wakuv2 import ( "context" "errors" - "fmt" "slices" "testing" "time" @@ -193,7 +192,6 @@ func TestBasicWakuV2(t *testing.T) { err = tt.RetryWithBackOff(func() error { numConnected, err := w.GetNumConnectedPeers() - fmt.Println("numConnected: ", numConnected) if err != nil { return err }