diff --git a/p2p/CHANGELOG.md b/p2p/CHANGELOG.md index 41993167e..1d9ab2593 100644 --- a/p2p/CHANGELOG.md +++ b/p2p/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.0.0.45] - 2023-04-24 + +- Refactored P2P bootstrapping to its own go routine +- Always bootstrap peerstore to discover non-staked P2P participants + ## [0.0.0.44] - 2023-04-20 - Refactor `mockdns` test helpers diff --git a/p2p/bootstrap.go b/p2p/bootstrap.go index bd49ea2fd..e515e74c4 100644 --- a/p2p/bootstrap.go +++ b/p2p/bootstrap.go @@ -10,10 +10,10 @@ import ( "strings" rpcCHP "github.com/pokt-network/pocket/p2p/providers/current_height_provider/rpc" - rpcABP "github.com/pokt-network/pocket/p2p/providers/peerstore_provider/rpc" - typesP2P "github.com/pokt-network/pocket/p2p/types" + rpcPSP "github.com/pokt-network/pocket/p2p/providers/peerstore_provider/rpc" "github.com/pokt-network/pocket/rpc" "github.com/pokt-network/pocket/runtime/defaults" + "github.com/pokt-network/pocket/shared/utils" ) // configureBootstrapNodes parses the bootstrap nodes from the config and validates them @@ -43,37 +43,54 @@ func (m *p2pModule) configureBootstrapNodes() error { // bootstrap attempts to bootstrap from a bootstrap node func (m *p2pModule) bootstrap() error { - var pstore typesP2P.Peerstore + limiter := utils.NewLimiter(int(m.cfg.MaxBootstrapConcurrency)) - for _, bootstrapNode := range m.bootstrapNodes { - m.logger.Info().Str("endpoint", bootstrapNode).Msg("Attempting to bootstrap from bootstrap node") + for _, serviceURL := range m.bootstrapNodes { + // concurrent bootstrapping + // TECHDEBT(#595): add ctx to interface methods and propagate down. + limiter.Go(context.TODO(), func() { + m.bootstrapFromRPC(strings.Clone(serviceURL)) + }) + } - client, err := rpc.NewClientWithResponses(bootstrapNode) - if err != nil { - continue - } - healthCheck, err := client.GetV1Health(context.TODO()) - if err != nil || healthCheck == nil || healthCheck.StatusCode != http.StatusOK { - m.logger.Warn().Str("bootstrapNode", bootstrapNode).Msg("Error getting a green health check from bootstrap node") - continue - } + limiter.Close() - pstoreProvider := rpcABP.NewRPCPeerstoreProvider( - rpcABP.WithP2PConfig( - m.GetBus().GetRuntimeMgr().GetConfig().P2P, - ), - rpcABP.WithCustomRPCURL(bootstrapNode), - ) + return nil +} - currentHeightProvider := rpcCHP.NewRPCCurrentHeightProvider(rpcCHP.WithCustomRPCURL(bootstrapNode)) +// bootstrapFromRPC fetches the peerstore of the peer at `serviceURL` via RPC +// and adds it to this host's peerstore after performing a health check. +// TECHDEBT(SOLID): refactor; this method has more than one reason to change +func (m *p2pModule) bootstrapFromRPC(serviceURL string) { + m.logger.Info().Str("endpoint", serviceURL).Msg("Attempting to bootstrap from bootstrap node") - pstore, err = pstoreProvider.GetStakedPeerstoreAtHeight(currentHeightProvider.CurrentHeight()) - if err != nil { - m.logger.Warn().Err(err).Str("endpoint", bootstrapNode).Msg("Error getting address book from bootstrap node") - continue - } + client, err := rpc.NewClientWithResponses(serviceURL) + if err != nil { + return + } + healthCheck, err := client.GetV1Health(context.TODO()) + if err != nil || healthCheck == nil || healthCheck.StatusCode != http.StatusOK { + m.logger.Warn().Str("serviceURL", serviceURL).Msg("Error getting a green health check from bootstrap node") + return } + // fetch `serviceURL`'s peerstore + pstoreProvider := rpcPSP.NewRPCPeerstoreProvider( + rpcPSP.WithP2PConfig( + m.GetBus().GetRuntimeMgr().GetConfig().P2P, + ), + rpcPSP.WithCustomRPCURL(serviceURL), + ) + + currentHeightProvider := rpcCHP.NewRPCCurrentHeightProvider(rpcCHP.WithCustomRPCURL(serviceURL)) + + pstore, err := pstoreProvider.GetStakedPeerstoreAtHeight(currentHeightProvider.CurrentHeight()) + if err != nil { + m.logger.Warn().Err(err).Str("endpoint", serviceURL).Msg("Error getting address book from bootstrap node") + return + } + + // add `serviceURL`'s peers to this node's peerstore for _, peer := range pstore.GetPeerList() { m.logger.Debug().Str("address", peer.GetAddress().String()).Msg("Adding peer to network") if err := m.network.AddPeer(peer); err != nil { @@ -82,11 +99,6 @@ func (m *p2pModule) bootstrap() error { Msg("adding peer") } } - - if m.network.GetPeerstore().Size() == 0 { - return fmt.Errorf("bootstrap failed") - } - return nil } func isValidHostnamePort(str string) bool { diff --git a/p2p/event_handler.go b/p2p/event_handler.go index 7ea596eef..91af7c222 100644 --- a/p2p/event_handler.go +++ b/p2p/event_handler.go @@ -2,7 +2,6 @@ package p2p import ( "fmt" - "github.com/pokt-network/pocket/shared/codec" coreTypes "github.com/pokt-network/pocket/shared/core/types" "github.com/pokt-network/pocket/shared/messaging" @@ -50,13 +49,12 @@ func (m *p2pModule) HandleEvent(event *anypb.Any) error { m.logger.Debug().Fields(messaging.TransitionEventToMap(stateMachineTransitionEvent)).Msg("Received state machine transition event") if stateMachineTransitionEvent.NewState == string(coreTypes.StateMachineState_P2P_Bootstrapping) { - if m.network.GetPeerstore().Size() == 0 { - m.logger.Warn().Msg("No peers in addrbook, bootstrapping") + m.logger.Info().Msg("Bootstrapping...") - if err := m.bootstrap(); err != nil { - return err - } + if err := m.bootstrap(); err != nil { + return err } + m.logger.Info().Bool("TODO", true).Msg("Advertise self to network") if err := m.GetBus().GetStateMachineModule().SendEvent(coreTypes.StateMachineEvent_P2P_IsBootstrapped); err != nil { return err diff --git a/p2p/module.go b/p2p/module.go index 7f97a418e..590ecad2a 100644 --- a/p2p/module.go +++ b/p2p/module.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "io" + "strconv" "time" "github.com/libp2p/go-libp2p" @@ -305,7 +306,7 @@ func (m *p2pModule) setupNetwork() (err error) { return err } -// setupHost creates a new libp2p host and assignes it to `m.host`. Libp2p host +// setupHost creates a new libp2p host and assigns it to `m.host`. Libp2p host // starts listening upon instantiation. func (m *p2pModule) setupHost() (err error) { m.logger.Debug().Msg("creating new libp2p host") @@ -336,9 +337,9 @@ func (m *p2pModule) setupHost() (err error) { } // TECHDEBT(#609): use `StringArrayLogMarshaler` post test-utilities refactor. - addrStrs := make(map[int]string) + addrStrs := make(map[string]string) for i, addr := range libp2pHost.InfoFromHost(m.host).Addrs { - addrStrs[i] = addr.String() + addrStrs[strconv.Itoa(i)] = addr.String() } m.logger.Info().Fields(addrStrs).Msg("Listening for incoming connections...") return nil diff --git a/p2p/module_test.go b/p2p/module_test.go index 9a8cc1e0c..e6f6045a5 100644 --- a/p2p/module_test.go +++ b/p2p/module_test.go @@ -37,13 +37,13 @@ func Test_Create_configureBootstrapNodes(t *testing.T) { wantErr bool }{ { - name: "unset boostrap nodes should yield no error and return DefaultP2PBootstrapNodes", + name: "unset bootstrap nodes should yield no error and return DefaultP2PBootstrapNodes", args: args{}, wantErr: false, wantBootstrapNodes: defaultBootstrapNodes, }, { - name: "empty string boostrap nodes should yield no error and return DefaultP2PBootstrapNodes", + name: "empty string bootstrap nodes should yield no error and return DefaultP2PBootstrapNodes", args: args{ initialBootstrapNodesCsv: "", }, @@ -51,7 +51,7 @@ func Test_Create_configureBootstrapNodes(t *testing.T) { wantBootstrapNodes: defaultBootstrapNodes, }, { - name: "untrimmed empty string boostrap nodes should yield no error and return DefaultP2PBootstrapNodes", + name: "untrimmed empty string bootstrap nodes should yield no error and return DefaultP2PBootstrapNodes", args: args{ initialBootstrapNodesCsv: " ", }, @@ -59,7 +59,7 @@ func Test_Create_configureBootstrapNodes(t *testing.T) { wantBootstrapNodes: defaultBootstrapNodes, }, { - name: "untrimmed string boostrap nodes should yield no error and return the trimmed urls", + name: "untrimmed string bootstrap nodes should yield no error and return the trimmed urls", args: args{ initialBootstrapNodesCsv: " http://somenode:50832 , http://someothernode:50832 ", }, diff --git a/runtime/configs/config.go b/runtime/configs/config.go index a4f35ac4e..0da9ec05e 100644 --- a/runtime/configs/config.go +++ b/runtime/configs/config.go @@ -127,10 +127,11 @@ func NewDefaultConfig(options ...func(*Config)) *Config { BlockStorePath: defaults.DefaultPersistenceBlockStorePath, }, P2P: &P2PConfig{ - Port: defaults.DefaultP2PPort, - UseRainTree: defaults.DefaultP2PUseRainTree, - ConnectionType: defaults.DefaultP2PConnectionType, - MaxMempoolCount: defaults.DefaultP2PMaxMempoolCount, + Port: defaults.DefaultP2PPort, + UseRainTree: defaults.DefaultP2PUseRainTree, + ConnectionType: defaults.DefaultP2PConnectionType, + MaxMempoolCount: defaults.DefaultP2PMaxMempoolCount, + MaxBootstrapConcurrency: defaults.DefaultP2PMaxBootstrapConcurrency, }, Telemetry: &TelemetryConfig{ Enabled: defaults.DefaultTelemetryEnabled, diff --git a/runtime/configs/proto/p2p_config.proto b/runtime/configs/proto/p2p_config.proto index d679e74b5..64de70f27 100644 --- a/runtime/configs/proto/p2p_config.proto +++ b/runtime/configs/proto/p2p_config.proto @@ -15,4 +15,5 @@ message P2PConfig { uint64 max_mempool_count = 6; // this is used to limit the number of nonces that can be stored in the mempool, after which a FIFO mechanism is used to remove the oldest nonces and make space for the new ones bool is_client_only = 7; string bootstrap_nodes_csv = 8; // string in the format "http://somenode:50832,http://someothernode:50832". Refer to `p2p/module_test.go` for additional details. + uint32 max_bootstrap_concurrency = 9; } diff --git a/runtime/defaults/defaults.go b/runtime/defaults/defaults.go index 46bd9e4c1..e17f8a9dd 100644 --- a/runtime/defaults/defaults.go +++ b/runtime/defaults/defaults.go @@ -47,11 +47,11 @@ var ( DefaultPersistencePostgresURL = "postgres://postgres:postgres@pocket-db:5432/postgres" DefaultPersistenceBlockStorePath = "/var/blockstore" // p2p - DefaultUseLibp2p = false - DefaultP2PPort = uint32(42069) - DefaultP2PUseRainTree = true - DefaultP2PConnectionType = types.ConnectionType_TCPConnection - DefaultP2PMaxMempoolCount = uint64(1e5) + DefaultP2PPort = uint32(42069) + DefaultP2PUseRainTree = true + DefaultP2PConnectionType = types.ConnectionType_TCPConnection + DefaultP2PMaxMempoolCount = uint64(1e5) + DefaultP2PMaxBootstrapConcurrency = uint32(4) // telemetry DefaultTelemetryEnabled = true DefaultTelemetryAddress = "0.0.0.0:9000" @@ -75,7 +75,7 @@ var ( DefaultRemoteCLIURL = fmt.Sprintf("http://%s:%s", DefaultRPCHost, DefaultRPCPort) // DefaultP2PBootstrapNodesCsv is a list of nodes to bootstrap the network with. By convention, for now, the first validator will provide bootstrapping facilities. // - // In LocalNet, the developer will have only one of the two stack online, therefore this is also a poor's man way to simulate the scenario in which a boostrap node is offline. + // In LocalNet, the developer will have only one of the two stack online, therefore this is also a poor's man way to simulate the scenario in which a bootstrap node is offline. DefaultP2PBootstrapNodesCsv = fmt.Sprintf("%s,%s", fmt.Sprintf("http://%s:%s", Validator1EndpointDockerCompose, DefaultRPCPort), fmt.Sprintf("http://%s:%s", Validator1EndpointK8S, DefaultRPCPort), diff --git a/runtime/manager_test.go b/runtime/manager_test.go index 0774fbd64..60ab5ea0c 100644 --- a/runtime/manager_test.go +++ b/runtime/manager_test.go @@ -4207,12 +4207,13 @@ func TestNewManagerFromReaders(t *testing.T) { HealthCheckPeriod: "30s", }, P2P: &configs.P2PConfig{ - PrivateKey: "0ca1a40ddecdab4f5b04fa0bfed1d235beaa2b8082e7554425607516f0862075dfe357de55649e6d2ce889acf15eb77e94ab3c5756fe46d3c7538d37f27f115e", - Hostname: "node1.consensus", - Port: defaults.DefaultP2PPort, - UseRainTree: true, - ConnectionType: configTypes.ConnectionType_TCPConnection, - MaxMempoolCount: 1e5, + PrivateKey: "0ca1a40ddecdab4f5b04fa0bfed1d235beaa2b8082e7554425607516f0862075dfe357de55649e6d2ce889acf15eb77e94ab3c5756fe46d3c7538d37f27f115e", + Hostname: "node1.consensus", + Port: defaults.DefaultP2PPort, + UseRainTree: true, + ConnectionType: configTypes.ConnectionType_TCPConnection, + MaxMempoolCount: 1e5, + MaxBootstrapConcurrency: 4, }, Telemetry: &configs.TelemetryConfig{ Enabled: true, @@ -4256,12 +4257,13 @@ func TestNewManagerFromReaders(t *testing.T) { want: &Manager{ config: &configs.Config{ P2P: &configs.P2PConfig{ - PrivateKey: "4ff3292ff14213149446f8208942b35439cb4b2c5e819f41fb612e880b5614bdd6cea8706f6ee6672c1e013e667ec8c46231e0e7abcf97ba35d89fceb8edae45", - Hostname: "node1.consensus", - Port: 42069, - UseRainTree: true, - ConnectionType: configTypes.ConnectionType_TCPConnection, - MaxMempoolCount: defaults.DefaultP2PMaxMempoolCount, + PrivateKey: "4ff3292ff14213149446f8208942b35439cb4b2c5e819f41fb612e880b5614bdd6cea8706f6ee6672c1e013e667ec8c46231e0e7abcf97ba35d89fceb8edae45", + Hostname: "node1.consensus", + Port: 42069, + UseRainTree: true, + ConnectionType: configTypes.ConnectionType_TCPConnection, + MaxMempoolCount: defaults.DefaultP2PMaxMempoolCount, + MaxBootstrapConcurrency: 4, }, Keybase: defaultCfg.Keybase, }, diff --git a/shared/utils/limiter.go b/shared/utils/limiter.go new file mode 100644 index 000000000..dce33af54 --- /dev/null +++ b/shared/utils/limiter.go @@ -0,0 +1,87 @@ +/*Copyright (c) 2020 Storj Labs, Inc. +* +* Permission is hereby granted, free of charge, to any person obtaining a copy +* of this software and associated documentation files (the "Software"), to deal +* in the Software without restriction, including without limitation the rights +* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +* copies of the Software, and to permit persons to whom the Software is +* furnished to do so, subject to the following conditions: +* +* The above copyright notice and this permission notice shall be included in all +* copies or substantial portions of the Software. +* +* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +* SOFTWARE. + */ + +package utils + +import ( + "context" + "sync" +) + +// Limiter implements concurrent goroutine limiting. +// +// After calling Wait or Close, no new goroutines are allowed +// to start. +type Limiter struct { + noCopy noCopy //nolint:structcheck // see noCopy definition + + limit chan struct{} + close sync.Once + closed chan struct{} +} + +// NewLimiter creates a new limiter with limit set to n. +func NewLimiter(n int) *Limiter { + return &Limiter{ + limit: make(chan struct{}, n), + closed: make(chan struct{}), + } +} + +// Go tries to start fn as a goroutine. +// When the limit is reached it will wait until it can run it +// or the context is canceled. +func (limiter *Limiter) Go(ctx context.Context, fn func()) bool { + if ctx.Err() != nil { + return false + } + + select { + case limiter.limit <- struct{}{}: + case <-limiter.closed: + return false + case <-ctx.Done(): + return false + } + + go func() { + defer func() { <-limiter.limit }() + fn() + }() + + return true +} + +// Wait waits for all running goroutines to finish and +// disallows new goroutines to start. +func (limiter *Limiter) Wait() { limiter.Close() } + +// Close waits for all running goroutines to finish and +// disallows new goroutines to start. +func (limiter *Limiter) Close() { + limiter.close.Do(func() { + close(limiter.closed) + // ensure all goroutines are finished + for i := 0; i < cap(limiter.limit); i++ { + limiter.limit <- struct{}{} + } + }) +} diff --git a/shared/utils/limiter_test.go b/shared/utils/limiter_test.go new file mode 100644 index 000000000..48b0654f0 --- /dev/null +++ b/shared/utils/limiter_test.go @@ -0,0 +1,98 @@ +/*Copyright (c) 2020 Storj Labs, Inc. +* +* Permission is hereby granted, free of charge, to any person obtaining a copy +* of this software and associated documentation files (the "Software"), to deal +* in the Software without restriction, including without limitation the rights +* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +* copies of the Software, and to permit persons to whom the Software is +* furnished to do so, subject to the following conditions: +* +* The above copyright notice and this permission notice shall be included in all +* copies or substantial portions of the Software. +* +* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +* SOFTWARE. + */ + +package utils_test + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/pokt-network/pocket/shared/utils" +) + +func TestLimiterLimiting(t *testing.T) { + t.Parallel() + + const N, Limit = 1000, 10 + ctx := context.Background() + limiter := utils.NewLimiter(Limit) + counter := int32(0) + for i := 0; i < N; i++ { + limiter.Go(ctx, func() { + if atomic.AddInt32(&counter, 1) > Limit { + panic("limit exceeded") + } + time.Sleep(time.Millisecond) + atomic.AddInt32(&counter, -1) + }) + } + limiter.Close() +} + +func TestLimiterCanceling(t *testing.T) { + t.Parallel() + + const N, Limit = 1000, 10 + limiter := utils.NewLimiter(Limit) + + ctx, cancel := context.WithCancel(context.Background()) + + counter := int32(0) + + waitForCancel := make(chan struct{}, N) + block := make(chan struct{}) + allreturned := make(chan struct{}) + + go func() { + for i := 0; i < N; i++ { + limiter.Go(ctx, func() { + if atomic.AddInt32(&counter, 1) > Limit { + panic("limit exceeded") + } + + waitForCancel <- struct{}{} + <-block + }) + } + close(allreturned) + }() + + for i := 0; i < Limit; i++ { + <-waitForCancel + } + cancel() + <-allreturned + close(block) + + limiter.Close() + if counter > Limit { + t.Fatal("too many times run") + } + + started := limiter.Go(context.Background(), func() { + panic("should not start") + }) + if started { + t.Fatal("should not start") + } +} diff --git a/shared/utils/nocopy.go b/shared/utils/nocopy.go new file mode 100644 index 000000000..c216b9c15 --- /dev/null +++ b/shared/utils/nocopy.go @@ -0,0 +1,33 @@ +/*Copyright (c) 2020 Storj Labs, Inc. +* +* Permission is hereby granted, free of charge, to any person obtaining a copy +* of this software and associated documentation files (the "Software"), to deal +* in the Software without restriction, including without limitation the rights +* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +* copies of the Software, and to permit persons to whom the Software is +* furnished to do so, subject to the following conditions: +* +* The above copyright notice and this permission notice shall be included in all +* copies or substantial portions of the Software. +* +* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +* SOFTWARE. + */ + +package utils + +// noCopy is used to ensure that we don't copy things that shouldn't +// be copied. +// +// See https://golang.org/issues/8005#issuecomment-190753527. +// +// Currently users of noCopy must use "//nolint:structcheck", +// because golint-ci does not handle this correctly. +type noCopy struct{} + +func (noCopy) Lock() {}