diff --git a/p2p/background/router_test.go b/p2p/background/router_test.go index 450946f71..0dd33c5b7 100644 --- a/p2p/background/router_test.go +++ b/p2p/background/router_test.go @@ -8,32 +8,41 @@ import ( "time" "github.com/golang/mock/gomock" + pubsub "github.com/libp2p/go-libp2p-pubsub" libp2pCrypto "github.com/libp2p/go-libp2p/core/crypto" libp2pHost "github.com/libp2p/go-libp2p/core/host" libp2pNetwork "github.com/libp2p/go-libp2p/core/network" libp2pPeer "github.com/libp2p/go-libp2p/core/peer" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + "github.com/pokt-network/pocket/internal/testutil" "github.com/pokt-network/pocket/p2p/config" + "github.com/pokt-network/pocket/p2p/protocol" typesP2P "github.com/pokt-network/pocket/p2p/types" mock_types "github.com/pokt-network/pocket/p2p/types/mocks" "github.com/pokt-network/pocket/p2p/utils" "github.com/pokt-network/pocket/runtime/configs" "github.com/pokt-network/pocket/runtime/defaults" cryptoPocket "github.com/pokt-network/pocket/shared/crypto" + "github.com/pokt-network/pocket/shared/messaging" mockModules "github.com/pokt-network/pocket/shared/modules/mocks" - "github.com/stretchr/testify/require" ) // https://www.rfc-editor.org/rfc/rfc3986#section-3.2.2 const testIP6ServiceURL = "[2a00:1450:4005:802::2004]:8080" // TECHDEBT(#609): move & de-dup. -var testLocalServiceURL = fmt.Sprintf("127.0.0.1:%d", defaults.DefaultP2PPort) +var ( + testLocalServiceURL = fmt.Sprintf("127.0.0.1:%d", defaults.DefaultP2PPort) + noopHandler = func(data []byte) error { return nil } +) func TestBackgroundRouter_AddPeer(t *testing.T) { - testRouter := newTestRouter(t, nil) + testRouter := newTestRouter(t, nil, noopHandler) libp2pPStore := testRouter.host.Peerstore() // NB: assert initial state @@ -81,7 +90,7 @@ func TestBackgroundRouter_AddPeer(t *testing.T) { } func TestBackgroundRouter_RemovePeer(t *testing.T) { - testRouter := newTestRouter(t, nil) + testRouter := newTestRouter(t, nil, noopHandler) peerstore := testRouter.host.Peerstore() // NB: assert initial state @@ -114,6 +123,73 @@ func TestBackgroundRouter_RemovePeer(t *testing.T) { require.Len(t, existingPeerstoreAddrs, 1) } +func TestBackgroundRouter_Validation(t *testing.T) { + ctx := context.Background() + libp2pMockNet := mocknet.New() + + invalidWireFormatData := []byte("test message") + invalidPocketEnvelope := &anypb.Any{ + TypeUrl: "/test", + Value: invalidWireFormatData, + } + invalidPocketEnvelopeBz, err := proto.Marshal(invalidPocketEnvelope) + require.NoError(t, err) + + invalidMessages := [][]byte{ + invalidWireFormatData, + invalidPocketEnvelopeBz, + } + + receivedChan := make(chan struct{}) + + receiverPrivKey, receiverPeer := newTestPeer(t) + receiverHost := newTestHost(t, libp2pMockNet, receiverPrivKey) + receiverRouter := newRouterWithSelfPeerAndHost(t, receiverPeer, receiverHost, func(data []byte) error { + receivedChan <- struct{}{} + return nil + }) + + t.Cleanup(func() { + err := receiverRouter.Close() + require.NoError(t, err) + }) + + senderPrivKey, _ := newTestPeer(t) + senderHost := newTestHost(t, libp2pMockNet, senderPrivKey) + gossipPubsub, err := pubsub.NewGossipSub(ctx, senderHost) + require.NoError(t, err) + + err = libp2pMockNet.LinkAll() + require.NoError(t, err) + + receiverAddrInfo, err := utils.Libp2pAddrInfoFromPeer(receiverPeer) + require.NoError(t, err) + + err = senderHost.Connect(ctx, receiverAddrInfo) + require.NoError(t, err) + + topic, err := gossipPubsub.Join(protocol.BackgroundTopicStr) + require.NoError(t, err) + + for _, invalidMessageBz := range invalidMessages { + err = topic.Publish(ctx, invalidMessageBz) + require.NoError(t, err) + } + + select { + case <-time.After(time.Second * 2): + // TECHDEBT: find a better way to prove that pre-propagation validation + // works as expected. Ideally, we should be able to distinguish which + // invalid message was received in the event of failure. + // + // CONSIDERATION: we could use the telemetry module mock to set expectations + // for validation failure telemetry calls, which would probably be useful in + // their own right. + case <-receivedChan: + t.Fatal("expected message to not be received") + } +} + func TestBackgroundRouter_Broadcast(t *testing.T) { const ( numPeers = 4 @@ -138,17 +214,31 @@ func TestBackgroundRouter_Broadcast(t *testing.T) { libp2pMockNet = mocknet.New() ) - // setup 4 libp2p hosts to listen for incoming streams from the test backgroundRouter + testPocketEnvelope, err := messaging.PackMessage(&anypb.Any{ + TypeUrl: "/test", + Value: []byte(testMsg), + }) + require.NoError(t, err) + + testPocketEnvelopeBz, err := proto.Marshal(testPocketEnvelope) + require.NoError(t, err) + + // setup 4 receiver routers to listen for incoming messages from the sender router for i := 0; i < numPeers; i++ { broadcastWaitgroup.Add(1) bootstrapWaitgroup.Add(1) - privKey, selfPeer := newTestPeer(t) + privKey, peer := newTestPeer(t) host := newTestHost(t, libp2pMockNet, privKey) testHosts = append(testHosts, host) expectedPeerIDs[i] = host.ID().String() - rtr := newRouterWithSelfPeerAndHost(t, selfPeer, host) - go readSubscription(t, ctx, &broadcastWaitgroup, rtr, &seenMessagesMutext, seenMessages) + newRouterWithSelfPeerAndHost(t, peer, host, func(data []byte) error { + seenMessagesMutext.Lock() + broadcastWaitgroup.Done() + seenMessages[host.ID().String()] = struct{}{} + seenMessagesMutext.Unlock() + return nil + }) } // bootstrap off of arbitrary testHost @@ -156,12 +246,12 @@ func TestBackgroundRouter_Broadcast(t *testing.T) { // set up a test backgroundRouter testRouterHost := newTestHost(t, libp2pMockNet, privKey) - testRouter := newRouterWithSelfPeerAndHost(t, selfPeer, testRouterHost) + testRouter := newRouterWithSelfPeerAndHost(t, selfPeer, testRouterHost, noopHandler) testHosts = append(testHosts, testRouterHost) // simulate network links between each to every other // (i.e. fully-connected network) - err := libp2pMockNet.LinkAll() + err = libp2pMockNet.LinkAll() require.NoError(t, err) // setup notifee/notify BEFORE bootstrapping @@ -189,7 +279,7 @@ func TestBackgroundRouter_Broadcast(t *testing.T) { // broadcast message t.Log("broadcasting...") - err := testRouter.Broadcast([]byte(testMsg)) + err := testRouter.Broadcast(testPocketEnvelopeBz) require.NoError(t, err) // wait for broadcast to be received by all peers @@ -241,7 +331,7 @@ func bootstrap(t *testing.T, ctx context.Context, testHosts []libp2pHost.Host) { } // TECHDEBT(#609): move & de-duplicate -func newTestRouter(t *testing.T, libp2pMockNet mocknet.Mocknet) *backgroundRouter { +func newTestRouter(t *testing.T, libp2pMockNet mocknet.Mocknet, handler typesP2P.MessageHandler) *backgroundRouter { t.Helper() privKey, selfPeer := newTestPeer(t) @@ -256,10 +346,10 @@ func newTestRouter(t *testing.T, libp2pMockNet mocknet.Mocknet) *backgroundRoute require.NoError(t, err) }) - return newRouterWithSelfPeerAndHost(t, selfPeer, host) + return newRouterWithSelfPeerAndHost(t, selfPeer, host, handler) } -func newRouterWithSelfPeerAndHost(t *testing.T, selfPeer typesP2P.Peer, host libp2pHost.Host) *backgroundRouter { +func newRouterWithSelfPeerAndHost(t *testing.T, selfPeer typesP2P.Peer, host libp2pHost.Host, handler typesP2P.MessageHandler) *backgroundRouter { t.Helper() ctrl := gomock.NewController(t) @@ -268,7 +358,7 @@ func newRouterWithSelfPeerAndHost(t *testing.T, selfPeer typesP2P.Peer, host lib P2P: &configs.P2PConfig{ IsClientOnly: false, }, - }) + }).AnyTimes() consensusMock := mockModules.NewMockConsensusModule(ctrl) consensusMock.EXPECT().CurrentHeight().Return(uint64(1)).AnyTimes() @@ -289,6 +379,7 @@ func newRouterWithSelfPeerAndHost(t *testing.T, selfPeer typesP2P.Peer, host lib PeerstoreProvider: pstoreProviderMock, CurrentHeightProvider: consensusMock, Host: host, + Handler: handler, }) require.NoError(t, err) @@ -345,31 +436,3 @@ func newTestHost(t *testing.T, mockNet mocknet.Mocknet, privKey cryptoPocket.Pri // construct mock host return newMockNetHostFromPeer(t, mockNet, privKey, peer) } - -func readSubscription( - t *testing.T, - ctx context.Context, - broadcastWaitGroup *sync.WaitGroup, - rtr *backgroundRouter, - mu *sync.Mutex, - seenMsgs map[string]struct{}, -) { - t.Helper() - - for { - if err := ctx.Err(); err != nil { - if err != context.Canceled || err != context.DeadlineExceeded { - require.NoError(t, err) - } - return - } - - _, err := rtr.subscription.Next(ctx) - require.NoError(t, err) - - mu.Lock() - broadcastWaitGroup.Done() - seenMsgs[rtr.host.ID().String()] = struct{}{} - mu.Unlock() - } -} diff --git a/p2p/module.go b/p2p/module.go index 1c06e5f78..06ad1dc30 100644 --- a/p2p/module.go +++ b/p2p/module.go @@ -67,20 +67,6 @@ func Create(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, e return new(p2pModule).Create(bus, options...) } -// TODO_THIS_COMMIT: rename (WithHost) & consider moving to testutil file -// WithHostOption associates an existing (i.e. "started") libp2p `host.Host` -// with this module, instead of creating a new one on `#Start()`. -// Primarily intended for testing. -func WithHostOption(host libp2pHost.Host) modules.ModuleOption { - return func(m modules.InitializableModule) { - mod, ok := m.(*p2pModule) - if ok { - mod.host = host - mod.logger.Debug().Msg("using host provided via `WithHostOption`") - } - } -} - func (m *p2pModule) Create(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) { logger.Global.Debug().Msg("Creating P2P module") *m = p2pModule{ @@ -157,7 +143,7 @@ func (m *p2pModule) Start() (err error) { telemetry.P2P_NODE_STARTED_TIMESERIES_METRIC_DESCRIPTION, ) - // Return early if host has already been started (e.g. via `WithHostOption`) + // Return early if host has already been started (e.g. via `WithHost`) if m.host == nil { // Libp2p hosts provided via `WithHost()` option are destroyed when // `#Stop()`ing the module. Therefore, a new one must be created. diff --git a/p2p/module_raintree_test.go b/p2p/module_raintree_test.go index 9bd873913..3dc98a988 100644 --- a/p2p/module_raintree_test.go +++ b/p2p/module_raintree_test.go @@ -9,18 +9,14 @@ import ( "regexp" "sort" "strconv" - "strings" "sync" "testing" - libp2pNetwork "github.com/libp2p/go-libp2p/core/network" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/require" "google.golang.org/protobuf/types/known/anypb" "github.com/pokt-network/pocket/internal/testutil" - "github.com/pokt-network/pocket/p2p/protocol" - "github.com/pokt-network/pocket/p2p/raintree" ) // TODO(#314): Add the tooling and instructions on how to generate unit tests in this file. @@ -220,11 +216,13 @@ func TestRainTreeNetworkCompleteTwentySevenNodes(t *testing.T) { // 1. It creates and configures a "real" P2P module where all the other components of the node are mocked. // 2. It then triggers a single message and waits for all of the expected messages transmission to complete before announcing failure. func testRainTreeCalls(t *testing.T, origNode string, networkSimulationConfig TestNetworkSimulationConfig) { + var readWriteWaitGroup sync.WaitGroup + // Configure & prepare test module numValidators := len(networkSimulationConfig) runtimeConfigs := createMockRuntimeMgrs(t, numValidators) genesisMock := runtimeConfigs[0].GetGenesis() - busMocks := createMockBuses(t, runtimeConfigs) + busMocks := createMockBuses(t, runtimeConfigs, &readWriteWaitGroup) valIds := make([]string, 0, numValidators) for valId := range networkSimulationConfig { @@ -241,7 +239,6 @@ func testRainTreeCalls(t *testing.T, origNode string, networkSimulationConfig Te // Create connection and bus mocks along with a shared WaitGroup to track the number of expected // reads and writes throughout the mocked local network - var wg sync.WaitGroup for i, valId := range valIds { expectedCall := networkSimulationConfig[valId] expectedReads := expectedCall.numNetworkReads @@ -249,50 +246,41 @@ func testRainTreeCalls(t *testing.T, origNode string, networkSimulationConfig Te log.Printf("[valId: %s] expected reads: %d\n", valId, expectedReads) log.Printf("[valId: %s] expected writes: %d\n", valId, expectedWrites) - wg.Add(expectedReads) - wg.Add(expectedWrites) + readWriteWaitGroup.Add(expectedReads) + readWriteWaitGroup.Add(expectedWrites) persistenceMock := preparePersistenceMock(t, busMocks[i], genesisMock) consensusMock := prepareConsensusMock(t, busMocks[i]) - telemetryMock := prepareTelemetryMock(t, busMocks[i], valId, &wg, expectedWrites) + telemetryMock := prepareTelemetryMock(t, busMocks[i], valId, &readWriteWaitGroup, expectedWrites) prepareBusMock(busMocks[i], persistenceMock, consensusMock, telemetryMock) } libp2pMockNet := mocknet.New() - defer func() { - err := libp2pMockNet.Close() - require.NoError(t, err) - }() // Inject the connection and bus mocks into the P2P modules p2pModules := createP2PModules(t, busMocks, libp2pMockNet) - for serviceURL, p2pMod := range p2pModules { + for _, p2pMod := range p2pModules { err := p2pMod.Start() require.NoError(t, err) - - sURL := strings.Clone(serviceURL) - mod := *p2pMod - p2pMod.host.SetStreamHandler(protocol.PoktProtocolID, func(stream libp2pNetwork.Stream) { - log.Printf("[valID: %s] Read\n", sURL) - (&mod).router.(*raintree.RainTreeRouter).HandleStream(stream) - wg.Done() - }) } // Wait for completion - defer waitForNetworkSimulationCompletion(t, &wg) + defer waitForNetworkSimulationCompletion(t, &readWriteWaitGroup) t.Cleanup(func() { // Stop all p2p modules for _, p2pMod := range p2pModules { err := p2pMod.Stop() require.NoError(t, err) } + + err := libp2pMockNet.Close() + require.NoError(t, err) }) // Send the first message (by the originator) to trigger a RainTree broadcast - p := &anypb.Any{} + p := &anypb.Any{TypeUrl: "test"} p2pMod := p2pModules[origNode] require.NoError(t, p2pMod.Broadcast(p)) } diff --git a/p2p/module_test.go b/p2p/module_test.go index 79cd17066..45477bd1d 100644 --- a/p2p/module_test.go +++ b/p2p/module_test.go @@ -111,7 +111,7 @@ func Test_Create_configureBootstrapNodes(t *testing.T) { t.Run(tt.name, func(t *testing.T) { ctrl := gomock.NewController(t) mockRuntimeMgr := mockModules.NewMockRuntimeMgr(ctrl) - mockBus := createMockBus(t, mockRuntimeMgr) + mockBus := createMockBus(t, mockRuntimeMgr, nil) genesisStateMock := createMockGenesisState(keys) persistenceMock := preparePersistenceMock(t, mockBus, genesisStateMock) @@ -137,7 +137,7 @@ func Test_Create_configureBootstrapNodes(t *testing.T) { } host := newLibp2pMockNetHost(t, privKey, peer) - p2pMod, err := Create(mockBus, WithHostOption(host)) + p2pMod, err := Create(mockBus, WithHost(host)) if (err != nil) != tt.wantErr { t.Errorf("p2pModule.Create() error = %v, wantErr %v", err, tt.wantErr) } @@ -155,7 +155,7 @@ func TestP2pModule_WithHostOption_Restart(t *testing.T) { privKey := cryptoPocket.GetPrivKeySeed(1) mockRuntimeMgr := mockModules.NewMockRuntimeMgr(ctrl) - mockBus := createMockBus(t, mockRuntimeMgr) + mockBus := createMockBus(t, mockRuntimeMgr, nil) genesisStateMock := createMockGenesisState(nil) persistenceMock := preparePersistenceMock(t, mockBus, genesisStateMock) @@ -184,7 +184,7 @@ func TestP2pModule_WithHostOption_Restart(t *testing.T) { } mockNetHost := newLibp2pMockNetHost(t, privKey, peer) - p2pMod, err := Create(mockBus, WithHostOption(mockNetHost)) + p2pMod, err := Create(mockBus, WithHost(mockNetHost)) require.NoError(t, err) mod, ok := p2pMod.(*p2pModule) diff --git a/p2p/raintree/peers_manager_test.go b/p2p/raintree/peers_manager_test.go index 7fa01a3d3..e3c80d541 100644 --- a/p2p/raintree/peers_manager_test.go +++ b/p2p/raintree/peers_manager_test.go @@ -26,6 +26,8 @@ const ( addrAlphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ[" ) +var noopHandler = func(_ []byte) error { return nil } + type ExpectedRainTreeRouterConfig struct { numNodes int numExpectedLevels int @@ -101,6 +103,7 @@ func TestRainTree_Peerstore_HandleUpdate(t *testing.T) { Addr: pubKey.Address(), PeerstoreProvider: pstoreProviderMock, CurrentHeightProvider: currentHeightProviderMock, + Handler: noopHandler, } router, err := NewRainTreeRouter(mockBus, rtCfg) @@ -168,6 +171,7 @@ func BenchmarkPeerstoreUpdates(b *testing.B) { Addr: pubKey.Address(), PeerstoreProvider: pstoreProviderMock, CurrentHeightProvider: currentHeightProviderMock, + Handler: noopHandler, } router, err := NewRainTreeRouter(mockBus, rtCfg) @@ -293,6 +297,7 @@ func testRainTreeMessageTargets(t *testing.T, expectedMsgProp *ExpectedRainTreeM Addr: []byte{expectedMsgProp.orig}, PeerstoreProvider: pstoreProviderMock, CurrentHeightProvider: currentHeightProviderMock, + Handler: noopHandler, } router, err := NewRainTreeRouter(busMock, rtCfg) diff --git a/p2p/raintree/router_test.go b/p2p/raintree/router_test.go index 1ef093a20..2865a584b 100644 --- a/p2p/raintree/router_test.go +++ b/p2p/raintree/router_test.go @@ -57,6 +57,7 @@ func TestRainTreeRouter_AddPeer(t *testing.T) { Addr: selfAddr, PeerstoreProvider: peerstoreProviderMock, CurrentHeightProvider: currentHeightProviderMock, + Handler: noopHandler, } router, err := NewRainTreeRouter(busMock, rtCfg) @@ -119,6 +120,7 @@ func TestRainTreeRouter_RemovePeer(t *testing.T) { Addr: selfAddr, PeerstoreProvider: peerstoreProviderMock, CurrentHeightProvider: currentHeightProviderMock, + Handler: noopHandler, } router, err := NewRainTreeRouter(busMock, rtCfg) diff --git a/p2p/testutil.go b/p2p/testutil.go new file mode 100644 index 000000000..ee922c634 --- /dev/null +++ b/p2p/testutil.go @@ -0,0 +1,40 @@ +//go:build test + +package p2p + +import ( + libp2pHost "github.com/libp2p/go-libp2p/core/host" + typesP2P "github.com/pokt-network/pocket/p2p/types" + "github.com/pokt-network/pocket/shared/modules" +) + +// WithHost associates an existing (i.e. "started") libp2p `host.Host` +// with this module, instead of creating a new one on `#Start()`. +// Primarily intended for testing. +func WithHost(host libp2pHost.Host) modules.ModuleOption { + return func(m modules.InitializableModule) { + mod, ok := m.(*p2pModule) + if ok { + mod.host = host + mod.logger.Debug().Msg("using host provided via `WithHost`") + } + } +} + +func WithUnstakedActorRouter(router typesP2P.Router) modules.ModuleOption { + return func(m modules.InitializableModule) { + mod, ok := m.(*p2pModule) + if ok { + mod.unstakedActorRouter = router + } + } +} + +func WithStakedActorRouter(router typesP2P.Router) modules.ModuleOption { + return func(m modules.InitializableModule) { + mod, ok := m.(*p2pModule) + if ok { + mod.stakedActorRouter = router + } + } +} diff --git a/p2p/transport_encryption_test.go b/p2p/transport_encryption_test.go index d95cb6496..799340a17 100644 --- a/p2p/transport_encryption_test.go +++ b/p2p/transport_encryption_test.go @@ -14,6 +14,7 @@ import ( "github.com/pokt-network/pocket/internal/testutil" "github.com/pokt-network/pocket/p2p/protocol" typesP2P "github.com/pokt-network/pocket/p2p/types" + mock_types "github.com/pokt-network/pocket/p2p/types/mocks" "github.com/pokt-network/pocket/p2p/utils" "github.com/pokt-network/pocket/runtime/configs" "github.com/pokt-network/pocket/runtime/configs/types" @@ -23,7 +24,7 @@ import ( mockModules "github.com/pokt-network/pocket/shared/modules/mocks" ) -func TestP2pModule_Insecure_Error(t *testing.T) { +func TestP2pModule_RainTreeRouter_Insecure_Error(t *testing.T) { // TECHDEBT(#609): refactor mock setup with similar test utilities. ctrl := gomock.NewController(t) hostname := "127.0.0.1" @@ -54,7 +55,7 @@ func TestP2pModule_Insecure_Error(t *testing.T) { telemetryMock.EXPECT().GetEventMetricsAgent().Return(eventMetricsAgentMock).AnyTimes() telemetryMock.EXPECT().GetModuleName().Return(modules.TelemetryModuleName).AnyTimes() - busMock := createMockBus(t, runtimeMgrMock) + busMock := createMockBus(t, runtimeMgrMock, nil) busMock.EXPECT().GetConsensusModule().Return(mockConsensusModule).AnyTimes() busMock.EXPECT().GetRuntimeMgr().Return(runtimeMgrMock).AnyTimes() busMock.EXPECT().GetTelemetryModule().Return(telemetryMock).AnyTimes() @@ -73,7 +74,10 @@ func TestP2pModule_Insecure_Error(t *testing.T) { dnsDone := testutil.PrepareDNSMockFromServiceURLs(t, serviceURLs) t.Cleanup(dnsDone) - p2pMod, err := Create(busMock) + routerMock := mock_types.NewMockRouter(ctrl) + routerMock.EXPECT().Close().Times(1) + + p2pMod, err := Create(busMock, WithUnstakedActorRouter(routerMock)) require.NoError(t, err) err = p2pMod.Start() @@ -114,6 +118,6 @@ func TestP2pModule_Insecure_Error(t *testing.T) { require.NoError(t, err) ctx := context.Background() - _, err = clearNode.NewStream(ctx, libp2pPeerInfo.ID, protocol.PoktProtocolID) + _, err = clearNode.NewStream(ctx, libp2pPeerInfo.ID, protocol.RaintreeProtocolID) require.ErrorContains(t, err, "failed to negotiate security protocol: protocols not supported:") } diff --git a/p2p/utils_test.go b/p2p/utils_test.go index 633ea1425..b9f89fcf2 100644 --- a/p2p/utils_test.go +++ b/p2p/utils_test.go @@ -20,6 +20,7 @@ import ( "github.com/pokt-network/pocket/p2p/providers/current_height_provider" "github.com/pokt-network/pocket/p2p/providers/peerstore_provider" typesP2P "github.com/pokt-network/pocket/p2p/types" + mock_types "github.com/pokt-network/pocket/p2p/types/mocks" "github.com/pokt-network/pocket/p2p/utils" "github.com/pokt-network/pocket/runtime" "github.com/pokt-network/pocket/runtime/configs" @@ -29,6 +30,7 @@ import ( "github.com/pokt-network/pocket/runtime/test_artifacts" coreTypes "github.com/pokt-network/pocket/shared/core/types" cryptoPocket "github.com/pokt-network/pocket/shared/crypto" + "github.com/pokt-network/pocket/shared/messaging" "github.com/pokt-network/pocket/shared/modules" mockModules "github.com/pokt-network/pocket/shared/modules/mocks" "github.com/pokt-network/pocket/telemetry" @@ -107,10 +109,20 @@ func waitForNetworkSimulationCompletion(t *testing.T, wg *sync.WaitGroup) { func createP2PModules(t *testing.T, busMocks []*mockModules.MockBus, netMock mocknet.Mocknet) (p2pModules map[string]*p2pModule) { peerIDs := setupMockNetPeers(t, netMock, len(busMocks)) + ctrl := gomock.NewController(t) + backgroundRouterMock := mock_types.NewMockRouter(ctrl) + backgroundRouterMock.EXPECT().Broadcast(gomock.Any()).Times(1) + backgroundRouterMock.EXPECT().Close().Times(len(busMocks)) + p2pModules = make(map[string]*p2pModule, len(busMocks)) for i := range busMocks { host := netMock.Host(peerIDs[i]) - p2pMod, err := Create(busMocks[i], WithHostOption(host)) + p2pMod, err := Create( + busMocks[i], + WithHost(host), + // mock background router to prevent background message propagation. + WithUnstakedActorRouter(backgroundRouterMock), + ) require.NoError(t, err) p2pModules[validatorId(i+1)] = p2pMod.(*p2pModule) } @@ -180,15 +192,23 @@ func createMockRuntimeMgrs(t *testing.T, numValidators int) []modules.RuntimeMgr return mockRuntimeMgrs } -func createMockBuses(t *testing.T, runtimeMgrs []modules.RuntimeMgr) []*mockModules.MockBus { +func createMockBuses( + t *testing.T, + runtimeMgrs []modules.RuntimeMgr, + readWriteWaitGroup *sync.WaitGroup, +) []*mockModules.MockBus { mockBuses := make([]*mockModules.MockBus, len(runtimeMgrs)) for i := range mockBuses { - mockBuses[i] = createMockBus(t, runtimeMgrs[i]) + mockBuses[i] = createMockBus(t, runtimeMgrs[i], readWriteWaitGroup) } return mockBuses } -func createMockBus(t *testing.T, runtimeMgr modules.RuntimeMgr) *mockModules.MockBus { +func createMockBus( + t *testing.T, + runtimeMgr modules.RuntimeMgr, + readWriteWaitGroup *sync.WaitGroup, +) *mockModules.MockBus { ctrl := gomock.NewController(t) mockBus := mockModules.NewMockBus(ctrl) mockBus.EXPECT().GetRuntimeMgr().Return(runtimeMgr).AnyTimes() @@ -199,6 +219,14 @@ func createMockBus(t *testing.T, runtimeMgr modules.RuntimeMgr) *mockModules.Moc mockModulesRegistry.EXPECT().GetModule(peerstore_provider.ModuleName).Return(nil, runtime.ErrModuleNotRegistered(peerstore_provider.ModuleName)).AnyTimes() mockModulesRegistry.EXPECT().GetModule(current_height_provider.ModuleName).Return(nil, runtime.ErrModuleNotRegistered(current_height_provider.ModuleName)).AnyTimes() mockBus.EXPECT().GetModulesRegistry().Return(mockModulesRegistry).AnyTimes() + mockBus.EXPECT().PublishEventToBus(gomock.AssignableToTypeOf(&messaging.PocketEnvelope{})). + Do(func(envelope *messaging.PocketEnvelope) { + fmt.Println("[valId: unknown] Read") + fmt.Printf("content type: %s\n", envelope.Content.GetTypeUrl()) + if readWriteWaitGroup != nil { + readWriteWaitGroup.Done() + } + }).AnyTimes() // TODO: specific times mockBus.EXPECT().PublishEventToBus(gomock.Any()).AnyTimes() return mockBus } @@ -313,11 +341,32 @@ func prepareEventMetricsAgentMock(t *testing.T, valId string, wg *sync.WaitGroup ctrl := gomock.NewController(t) eventMetricsAgentMock := mockModules.NewMockEventMetricsAgent(ctrl) - eventMetricsAgentMock.EXPECT().EmitEvent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - eventMetricsAgentMock.EXPECT().EmitEvent(gomock.Any(), gomock.Any(), gomock.Eq(telemetry.P2P_RAINTREE_MESSAGE_EVENT_METRIC_SEND_LABEL), gomock.Any()).Do(func(n, e any, l ...any) { + // DISCUSS_THIS_COMMIT: The number of times each telemetry event is expected + // (below) is dependent on the number of redundant messages all validators see, + // which is a function of the network size. Until this function is derived and + // implemented, we cannot predict the number of times each event is expected. + _ = expectedNumNetworkWrites + + eventMetricsAgentMock.EXPECT().EmitEvent( + gomock.Any(), + gomock.Any(), + gomock.Eq(telemetry.P2P_RAINTREE_MESSAGE_EVENT_METRIC_SEND_LABEL), + gomock.Any(), + ).Do(func(n, e any, l ...any) { + log.Printf("[valId: %s] Write\n", valId) + wg.Done() + }).AnyTimes() // TECHDEBT: expect specific number of non-redundant writes once known. + eventMetricsAgentMock.EXPECT().EmitEvent( + gomock.Any(), + gomock.Eq(telemetry.P2P_BROADCAST_MESSAGE_REDUNDANCY_PER_BLOCK_EVENT_METRIC_NAME), + gomock.Any(), + gomock.Any(), // nonce + gomock.Any(), + gomock.Any(), // blockHeight + ).Do(func(n, e any, l ...any) { log.Printf("[valId: %s] Write\n", valId) wg.Done() - }).Times(expectedNumNetworkWrites) + }).AnyTimes() // TECHDEBT: expect specific number of redundant writes once known. eventMetricsAgentMock.EXPECT().EmitEvent(gomock.Any(), gomock.Any(), gomock.Not(telemetry.P2P_RAINTREE_MESSAGE_EVENT_METRIC_SEND_LABEL), gomock.Any()).AnyTimes() return eventMetricsAgentMock