Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[P2P] chore: concurrent bootstrapping #694

Draft
wants to merge 15 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions p2p/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.0.0.45] - 2023-04-24

- Refactored P2P bootstrapping to its own go routine
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚀

- Always bootstrap peerstore to discover non-staked P2P participants

## [0.0.0.44] - 2023-04-20

- Refactor `mockdns` test helpers
Expand Down
74 changes: 43 additions & 31 deletions p2p/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
"strings"

rpcCHP "github.com/pokt-network/pocket/p2p/providers/current_height_provider/rpc"
rpcABP "github.com/pokt-network/pocket/p2p/providers/peerstore_provider/rpc"
typesP2P "github.com/pokt-network/pocket/p2p/types"
rpcPSP "github.com/pokt-network/pocket/p2p/providers/peerstore_provider/rpc"
"github.com/pokt-network/pocket/rpc"
"github.com/pokt-network/pocket/runtime/defaults"
"github.com/pokt-network/pocket/shared/utils"
)

// configureBootstrapNodes parses the bootstrap nodes from the config and validates them
Expand Down Expand Up @@ -43,37 +43,54 @@ func (m *p2pModule) configureBootstrapNodes() error {

// bootstrap attempts to bootstrap from a bootstrap node
func (m *p2pModule) bootstrap() error {
var pstore typesP2P.Peerstore
limiter := utils.NewLimiter(int(m.cfg.MaxBootstrapConcurrency))

for _, bootstrapNode := range m.bootstrapNodes {
m.logger.Info().Str("endpoint", bootstrapNode).Msg("Attempting to bootstrap from bootstrap node")
for _, serviceURL := range m.bootstrapNodes {
// concurrent bootstrapping
// TECHDEBT(#595): add ctx to interface methods and propagate down.
limiter.Go(context.TODO(), func() {
m.bootstrapFromRPC(strings.Clone(serviceURL))
})
}

client, err := rpc.NewClientWithResponses(bootstrapNode)
if err != nil {
continue
}
healthCheck, err := client.GetV1Health(context.TODO())
if err != nil || healthCheck == nil || healthCheck.StatusCode != http.StatusOK {
m.logger.Warn().Str("bootstrapNode", bootstrapNode).Msg("Error getting a green health check from bootstrap node")
continue
}
limiter.Close()

pstoreProvider := rpcABP.NewRPCPeerstoreProvider(
rpcABP.WithP2PConfig(
m.GetBus().GetRuntimeMgr().GetConfig().P2P,
),
rpcABP.WithCustomRPCURL(bootstrapNode),
)
return nil
}

currentHeightProvider := rpcCHP.NewRPCCurrentHeightProvider(rpcCHP.WithCustomRPCURL(bootstrapNode))
// bootstrapFromRPC fetches the peerstore of the peer at `serviceURL` via RPC
// and adds it to this host's peerstore after performing a health check.
// TECHDEBT(SOLID): refactor; this method has more than one reason to change
func (m *p2pModule) bootstrapFromRPC(serviceURL string) {
m.logger.Info().Str("endpoint", serviceURL).Msg("Attempting to bootstrap from bootstrap node")

pstore, err = pstoreProvider.GetStakedPeerstoreAtHeight(currentHeightProvider.CurrentHeight())
if err != nil {
m.logger.Warn().Err(err).Str("endpoint", bootstrapNode).Msg("Error getting address book from bootstrap node")
continue
}
client, err := rpc.NewClientWithResponses(serviceURL)
if err != nil {
return
}
healthCheck, err := client.GetV1Health(context.TODO())
if err != nil || healthCheck == nil || healthCheck.StatusCode != http.StatusOK {
m.logger.Warn().Str("serviceURL", serviceURL).Msg("Error getting a green health check from bootstrap node")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
m.logger.Warn().Str("serviceURL", serviceURL).Msg("Error getting a green health check from bootstrap node")
m.logger.Warn().Str("serviceURL", serviceURL).Int("code", healthCheck.StatusCode).Msg("Error getting a green health check from bootstrap node")

return
}

// fetch `serviceURL`'s peerstore
pstoreProvider := rpcPSP.NewRPCPeerstoreProvider(
rpcPSP.WithP2PConfig(
m.GetBus().GetRuntimeMgr().GetConfig().P2P,
),
rpcPSP.WithCustomRPCURL(serviceURL),
)

currentHeightProvider := rpcCHP.NewRPCCurrentHeightProvider(rpcCHP.WithCustomRPCURL(serviceURL))

pstore, err := pstoreProvider.GetStakedPeerstoreAtHeight(currentHeightProvider.CurrentHeight())
if err != nil {
m.logger.Warn().Err(err).Str("endpoint", serviceURL).Msg("Error getting address book from bootstrap node")
return
}

// add `serviceURL`'s peers to this node's peerstore
for _, peer := range pstore.GetPeerList() {
m.logger.Debug().Str("address", peer.GetAddress().String()).Msg("Adding peer to network")
if err := m.network.AddPeer(peer); err != nil {
Expand All @@ -82,11 +99,6 @@ func (m *p2pModule) bootstrap() error {
Msg("adding peer")
}
}

if m.network.GetPeerstore().Size() == 0 {
return fmt.Errorf("bootstrap failed")
}
return nil
}

func isValidHostnamePort(str string) bool {
Expand Down
10 changes: 4 additions & 6 deletions p2p/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package p2p

import (
"fmt"

"github.com/pokt-network/pocket/shared/codec"
coreTypes "github.com/pokt-network/pocket/shared/core/types"
"github.com/pokt-network/pocket/shared/messaging"
Expand Down Expand Up @@ -50,13 +49,12 @@ func (m *p2pModule) HandleEvent(event *anypb.Any) error {
m.logger.Debug().Fields(messaging.TransitionEventToMap(stateMachineTransitionEvent)).Msg("Received state machine transition event")

if stateMachineTransitionEvent.NewState == string(coreTypes.StateMachineState_P2P_Bootstrapping) {
if m.network.GetPeerstore().Size() == 0 {
m.logger.Warn().Msg("No peers in addrbook, bootstrapping")
m.logger.Info().Msg("Bootstrapping...")

if err := m.bootstrap(); err != nil {
return err
}
if err := m.bootstrap(); err != nil {
return err
}

m.logger.Info().Bool("TODO", true).Msg("Advertise self to network")
if err := m.GetBus().GetStateMachineModule().SendEvent(coreTypes.StateMachineEvent_P2P_IsBootstrapped); err != nil {
return err
Expand Down
7 changes: 4 additions & 3 deletions p2p/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"io"
"strconv"
"time"

"github.com/libp2p/go-libp2p"
Expand Down Expand Up @@ -305,7 +306,7 @@ func (m *p2pModule) setupNetwork() (err error) {
return err
}

// setupHost creates a new libp2p host and assignes it to `m.host`. Libp2p host
// setupHost creates a new libp2p host and assigns it to `m.host`. Libp2p host
// starts listening upon instantiation.
func (m *p2pModule) setupHost() (err error) {
m.logger.Debug().Msg("creating new libp2p host")
Expand Down Expand Up @@ -336,9 +337,9 @@ func (m *p2pModule) setupHost() (err error) {
}

// TECHDEBT(#609): use `StringArrayLogMarshaler` post test-utilities refactor.
addrStrs := make(map[int]string)
addrStrs := make(map[string]string)
for i, addr := range libp2pHost.InfoFromHost(m.host).Addrs {
addrStrs[i] = addr.String()
addrStrs[strconv.Itoa(i)] = addr.String()
}
m.logger.Info().Fields(addrStrs).Msg("Listening for incoming connections...")
return nil
Expand Down
8 changes: 4 additions & 4 deletions p2p/module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,29 +37,29 @@ func Test_Create_configureBootstrapNodes(t *testing.T) {
wantErr bool
}{
{
name: "unset boostrap nodes should yield no error and return DefaultP2PBootstrapNodes",
name: "unset bootstrap nodes should yield no error and return DefaultP2PBootstrapNodes",
args: args{},
wantErr: false,
wantBootstrapNodes: defaultBootstrapNodes,
},
{
name: "empty string boostrap nodes should yield no error and return DefaultP2PBootstrapNodes",
name: "empty string bootstrap nodes should yield no error and return DefaultP2PBootstrapNodes",
args: args{
initialBootstrapNodesCsv: "",
},
wantErr: false,
wantBootstrapNodes: defaultBootstrapNodes,
},
{
name: "untrimmed empty string boostrap nodes should yield no error and return DefaultP2PBootstrapNodes",
name: "untrimmed empty string bootstrap nodes should yield no error and return DefaultP2PBootstrapNodes",
args: args{
initialBootstrapNodesCsv: " ",
},
wantErr: false,
wantBootstrapNodes: defaultBootstrapNodes,
},
{
name: "untrimmed string boostrap nodes should yield no error and return the trimmed urls",
name: "untrimmed string bootstrap nodes should yield no error and return the trimmed urls",
args: args{
initialBootstrapNodesCsv: " http://somenode:50832 , http://someothernode:50832 ",
},
Expand Down
9 changes: 5 additions & 4 deletions runtime/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,11 @@ func NewDefaultConfig(options ...func(*Config)) *Config {
BlockStorePath: defaults.DefaultPersistenceBlockStorePath,
},
P2P: &P2PConfig{
Port: defaults.DefaultP2PPort,
UseRainTree: defaults.DefaultP2PUseRainTree,
ConnectionType: defaults.DefaultP2PConnectionType,
MaxMempoolCount: defaults.DefaultP2PMaxMempoolCount,
Port: defaults.DefaultP2PPort,
UseRainTree: defaults.DefaultP2PUseRainTree,
ConnectionType: defaults.DefaultP2PConnectionType,
MaxMempoolCount: defaults.DefaultP2PMaxMempoolCount,
MaxBootstrapConcurrency: defaults.DefaultP2PMaxBootstrapConcurrency,
},
Telemetry: &TelemetryConfig{
Enabled: defaults.DefaultTelemetryEnabled,
Expand Down
1 change: 1 addition & 0 deletions runtime/configs/proto/p2p_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ message P2PConfig {
uint64 max_mempool_count = 6; // this is used to limit the number of nonces that can be stored in the mempool, after which a FIFO mechanism is used to remove the oldest nonces and make space for the new ones
bool is_client_only = 7;
string bootstrap_nodes_csv = 8; // string in the format "http://somenode:50832,http://someothernode:50832". Refer to `p2p/module_test.go` for additional details.
uint32 max_bootstrap_concurrency = 9;
}
12 changes: 6 additions & 6 deletions runtime/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ var (
DefaultPersistencePostgresURL = "postgres://postgres:postgres@pocket-db:5432/postgres"
DefaultPersistenceBlockStorePath = "/var/blockstore"
// p2p
DefaultUseLibp2p = false
DefaultP2PPort = uint32(42069)
DefaultP2PUseRainTree = true
DefaultP2PConnectionType = types.ConnectionType_TCPConnection
DefaultP2PMaxMempoolCount = uint64(1e5)
DefaultP2PPort = uint32(42069)
DefaultP2PUseRainTree = true
DefaultP2PConnectionType = types.ConnectionType_TCPConnection
DefaultP2PMaxMempoolCount = uint64(1e5)
DefaultP2PMaxBootstrapConcurrency = uint32(4)
// telemetry
DefaultTelemetryEnabled = true
DefaultTelemetryAddress = "0.0.0.0:9000"
Expand All @@ -75,7 +75,7 @@ var (
DefaultRemoteCLIURL = fmt.Sprintf("http://%s:%s", DefaultRPCHost, DefaultRPCPort)
// DefaultP2PBootstrapNodesCsv is a list of nodes to bootstrap the network with. By convention, for now, the first validator will provide bootstrapping facilities.
//
// In LocalNet, the developer will have only one of the two stack online, therefore this is also a poor's man way to simulate the scenario in which a boostrap node is offline.
// In LocalNet, the developer will have only one of the two stacks online, therefore this is also a poor man's way to simulate the scenario in which a bootstrap node is offline.
DefaultP2PBootstrapNodesCsv = fmt.Sprintf("%s,%s",
fmt.Sprintf("http://%s:%s", Validator1EndpointDockerCompose, DefaultRPCPort),
fmt.Sprintf("http://%s:%s", Validator1EndpointK8S, DefaultRPCPort),
Expand Down
26 changes: 14 additions & 12 deletions runtime/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4207,12 +4207,13 @@ func TestNewManagerFromReaders(t *testing.T) {
HealthCheckPeriod: "30s",
},
P2P: &configs.P2PConfig{
PrivateKey: "0ca1a40ddecdab4f5b04fa0bfed1d235beaa2b8082e7554425607516f0862075dfe357de55649e6d2ce889acf15eb77e94ab3c5756fe46d3c7538d37f27f115e",
Hostname: "node1.consensus",
Port: defaults.DefaultP2PPort,
UseRainTree: true,
ConnectionType: configTypes.ConnectionType_TCPConnection,
MaxMempoolCount: 1e5,
PrivateKey: "0ca1a40ddecdab4f5b04fa0bfed1d235beaa2b8082e7554425607516f0862075dfe357de55649e6d2ce889acf15eb77e94ab3c5756fe46d3c7538d37f27f115e",
Hostname: "node1.consensus",
Port: defaults.DefaultP2PPort,
UseRainTree: true,
ConnectionType: configTypes.ConnectionType_TCPConnection,
MaxMempoolCount: 1e5,
MaxBootstrapConcurrency: 4,
},
Telemetry: &configs.TelemetryConfig{
Enabled: true,
Expand Down Expand Up @@ -4256,12 +4257,13 @@ func TestNewManagerFromReaders(t *testing.T) {
want: &Manager{
config: &configs.Config{
P2P: &configs.P2PConfig{
PrivateKey: "4ff3292ff14213149446f8208942b35439cb4b2c5e819f41fb612e880b5614bdd6cea8706f6ee6672c1e013e667ec8c46231e0e7abcf97ba35d89fceb8edae45",
Hostname: "node1.consensus",
Port: 42069,
UseRainTree: true,
ConnectionType: configTypes.ConnectionType_TCPConnection,
MaxMempoolCount: defaults.DefaultP2PMaxMempoolCount,
PrivateKey: "4ff3292ff14213149446f8208942b35439cb4b2c5e819f41fb612e880b5614bdd6cea8706f6ee6672c1e013e667ec8c46231e0e7abcf97ba35d89fceb8edae45",
Hostname: "node1.consensus",
Port: 42069,
UseRainTree: true,
ConnectionType: configTypes.ConnectionType_TCPConnection,
MaxMempoolCount: defaults.DefaultP2PMaxMempoolCount,
MaxBootstrapConcurrency: 4,
},
Keybase: defaultCfg.Keybase,
},
Expand Down
87 changes: 87 additions & 0 deletions shared/utils/limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*Copyright (c) 2020 Storj Labs, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package utils

import (
"context"
"sync"
)

// Limiter implements concurrent goroutine limiting.
//
// After calling Wait or Close, no new goroutines are allowed
// to start.
type Limiter struct {
	noCopy noCopy //nolint:structcheck // see noCopy definition

	// limit acts as a counting semaphore: each running goroutine
	// holds one slot (a struct{} sent into the channel) and releases
	// it when the goroutine's fn returns.
	limit chan struct{}
	// close ensures the shutdown logic in Close runs exactly once.
	close sync.Once
	// closed is closed (never written to) to signal that the limiter
	// is shut down and no new goroutines may start.
	closed chan struct{}
}

// NewLimiter creates a new limiter with limit set to n.
func NewLimiter(n int) *Limiter {
	limiter := new(Limiter)
	limiter.limit = make(chan struct{}, n)
	limiter.closed = make(chan struct{})
	return limiter
}

// Go tries to start fn as a goroutine.
// When the limit is reached it will wait until it can run it
// or the context is canceled.
func (limiter *Limiter) Go(ctx context.Context, fn func()) bool {
	// Fast path: refuse immediately when the context is already done.
	if err := ctx.Err(); err != nil {
		return false
	}

	// Block until a semaphore slot frees up, the limiter is closed,
	// or the context is canceled. select chooses uniformly among
	// ready cases, so case ordering does not affect behavior.
	select {
	case <-limiter.closed:
		return false
	case <-ctx.Done():
		return false
	case limiter.limit <- struct{}{}:
		// slot acquired; fall through and start the goroutine
	}

	go func() {
		// Release the slot when fn finishes, even if it panics.
		defer func() { <-limiter.limit }()
		fn()
	}()

	return true
}

// Wait waits for all running goroutines to finish and
// disallows new goroutines to start.
//
// It is equivalent to calling Close.
func (limiter *Limiter) Wait() {
	limiter.Close()
}

// Close waits for all running goroutines to finish and
// disallows new goroutines to start.
//
// Safe to call multiple times; only the first call performs the
// shutdown (guarded by limiter.close).
func (limiter *Limiter) Close() {
	limiter.close.Do(func() {
		close(limiter.closed)
		// ensure all goroutines are finished:
		// acquiring every semaphore slot is only possible once all
		// running goroutines have released theirs, so this loop
		// blocks until the last one completes.
		for i := 0; i < cap(limiter.limit); i++ {
			limiter.limit <- struct{}{}
		}
	})
}
Loading