From 8a9a84ad58fa5930208ec8da63c3f0f25f858d98 Mon Sep 17 00:00:00 2001 From: Stefan Pingel <16143240+pinges@users.noreply.github.com> Date: Wed, 3 Jul 2024 15:17:13 +1000 Subject: [PATCH] Check for snap server (#6609) * EthPeer add isServingSnap to be able to make sure that we have enough snap servers connected when we are snap syncing Signed-off-by: stefan.pingel@consensys.net Signed-off-by: Sally MacFarlane Co-authored-by: Sally MacFarlane --- .../org/hyperledger/besu/RunnerBuilder.java | 10 +- .../controller/BesuControllerBuilder.java | 36 +- ...onsensusScheduleBesuControllerBuilder.java | 7 +- .../MergeBesuControllerBuilder.java | 7 +- .../TransitionBesuControllerBuilder.java | 27 +- .../bft/protocol/BftProtocolManager.java | 6 - .../besu/ethereum/forkid/ForkIdManager.java | 6 +- .../besu/ethereum/eth/manager/EthPeer.java | 20 +- .../besu/ethereum/eth/manager/EthPeers.java | 375 +++++++++++++----- .../eth/manager/EthProtocolManager.java | 63 +-- .../ethereum/eth/manager/PeerRequest.java | 4 + .../eth/manager/PendingPeerRequest.java | 1 + .../snap/GetAccountRangeFromPeerTask.java | 39 +- .../manager/snap/GetBytecodeFromPeerTask.java | 32 +- .../snap/GetStorageRangeFromPeerTask.java | 41 +- .../manager/snap/GetTrieNodeFromPeerTask.java | 31 +- .../RetryingGetAccountRangeFromPeerTask.java | 6 + .../snap/RetryingGetBytecodeFromPeerTask.java | 5 + .../RetryingGetStorageRangeFromPeerTask.java | 5 + .../snap/RetryingGetTrieNodeFromPeerTask.java | 5 + .../eth/manager/snap/SnapProtocolManager.java | 6 - .../task/AbstractRetryingPeerTask.java | 20 +- .../AbstractRetryingSwitchingPeerTask.java | 17 +- .../ethereum/eth/sync/ChainHeadTracker.java | 75 ++-- .../eth/sync/DefaultSynchronizer.java | 7 +- .../ethereum/eth/sync/SnapServerChecker.java | 86 ++++ .../besu/ethereum/eth/sync/SyncMode.java | 4 + .../eth/sync/fastsync/SyncTargetManager.java | 9 +- .../ethereum/eth/manager/EthPeersTest.java | 89 ++++- .../eth/manager/EthProtocolManagerTest.java | 9 +- .../manager/EthProtocolManagerTestUtil.java | 69 +++- .../eth/manager/RespondingEthPeer.java | 36 +- .../ethtaskutils/AbstractMessageTaskTest.java | 7 +- .../AbstractBlockPropagationManagerTest.java | 11 +- .../eth/sync/ChainHeadTrackerTest.java | 6 +- .../fastsync/PivotBlockRetrieverTest.java | 6 +- .../ethereum/eth/transactions/TestNode.java | 28 +- .../TransactionPoolFactoryTest.java | 9 +- .../ethereum/p2p/network/NetworkRunner.java | 19 +- .../ethereum/p2p/network/ProtocolManager.java | 10 - .../rlpx/wire/messages/DisconnectMessage.java | 4 +- .../ethereum/retesteth/RetestethContext.java | 7 +- 42 files changed, 942 insertions(+), 318 deletions(-) create mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/SnapServerChecker.java diff --git a/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java b/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java index e9f9ef6268c..7ed627cfc17 100644 --- a/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java @@ -688,13 +688,14 @@ public Runner build() { .map(nodePerms -> PeerPermissions.combine(nodePerms, defaultPeerPermissions)) .orElse(defaultPeerPermissions); + final EthPeers ethPeers = besuController.getEthPeers(); + LOG.info("Detecting NAT service."); final boolean fallbackEnabled = natMethod == NatMethod.AUTO || natMethodFallbackEnabled; final NatService natService = new NatService(buildNatManager(natMethod), fallbackEnabled); final NetworkBuilder inactiveNetwork = caps -> new NoopP2PNetwork(); final NetworkBuilder activeNetwork = caps -> { - final EthPeers ethPeers = besuController.getEthPeers(); return DefaultP2PNetwork.builder() .vertx(vertx) .nodeKey(nodeKey) @@ -709,8 +710,8 @@ public Runner build() { .blockchain(context.getBlockchain()) .blockNumberForks(besuController.getGenesisConfigOptions().getForkBlockNumbers()) .timestampForks(besuController.getGenesisConfigOptions().getForkBlockTimestamps()) - .allConnectionsSupplier(ethPeers::getAllConnections) - .allActiveConnectionsSupplier(ethPeers::getAllActiveConnections) + .allConnectionsSupplier(ethPeers::streamAllConnections) + .allActiveConnectionsSupplier(ethPeers::streamAllActiveConnections) .maxPeers(ethPeers.getMaxPeers()) .build(); }; @@ -721,9 +722,10 @@ public Runner build() { .subProtocols(subProtocols) .network(p2pEnabled ? activeNetwork : inactiveNetwork) .metricsSystem(metricsSystem) + .ethPeersShouldConnect(ethPeers::shouldTryToConnect) .build(); - besuController.getEthPeers().setRlpxAgent(networkRunner.getRlpxAgent()); + ethPeers.setRlpxAgent(networkRunner.getRlpxAgent()); final P2PNetwork network = networkRunner.getNetwork(); // ForkId in Ethereum Node Record needs updating when we transition to a new protocol spec diff --git a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java index ac0fa25fd30..6123535f637 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java @@ -46,7 +46,6 @@ import org.hyperledger.besu.ethereum.core.Difficulty; import org.hyperledger.besu.ethereum.core.MiningParameters; import org.hyperledger.besu.ethereum.core.PrivacyParameters; -import org.hyperledger.besu.ethereum.core.Synchronizer; import org.hyperledger.besu.ethereum.eth.EthProtocol; import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration; import org.hyperledger.besu.ethereum.eth.SnapProtocol; @@ -77,6 +76,7 @@ import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolFactory; +import org.hyperledger.besu.ethereum.forkid.ForkIdManager; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; import org.hyperledger.besu.ethereum.p2p.config.NetworkingConfiguration; @@ -604,6 +604,12 @@ public BesuController build() { final int maxMessageSize = ethereumWireProtocolConfiguration.getMaxMessageSize(); final Supplier currentProtocolSpecSupplier = () -> protocolSchedule.getByBlockHeader(blockchain.getChainHeadHeader()); + final ForkIdManager forkIdManager = + new ForkIdManager( + blockchain, + genesisConfigOptions.getForkBlockNumbers(), + genesisConfigOptions.getForkBlockTimestamps(), + ethereumWireProtocolConfiguration.isLegacyEth64ForkIdEnabled()); final EthPeers ethPeers = new EthPeers( getSupportedProtocol(), @@ -615,7 +621,9 @@ public BesuController build() { nodeKey.getPublicKey().getEncodedBytes(), maxPeers, maxRemotelyInitiatedPeers, - randomPeerPriority); + randomPeerPriority, + syncConfig.getSyncMode(), + forkIdManager); final EthMessages ethMessages = new EthMessages(); final EthMessages snapMessages = new EthMessages(); @@ -681,13 +689,14 @@ public BesuController build() { ethMessages, scheduler, peerValidators, - Optional.empty()); + Optional.empty(), + forkIdManager); final PivotBlockSelector pivotBlockSelector = createPivotSelector( protocolSchedule, protocolContext, ethContext, syncState, metricsSystem, blockchain); - final Synchronizer synchronizer = + final DefaultSynchronizer synchronizer = createSynchronizer( protocolSchedule, worldStateStorageCoordinator, @@ -697,6 +706,16 @@ public BesuController build() { ethProtocolManager, pivotBlockSelector); + ethPeers.setTrailingPeerRequirementsSupplier(synchronizer::calculateTrailingPeerRequirements); + + if (SyncMode.isSnapSync(syncConfig.getSyncMode()) + || SyncMode.isCheckpointSync(syncConfig.getSyncMode())) { + synchronizer.subscribeInSync((b) -> ethPeers.snapServerPeersNeeded(!b)); + ethPeers.snapServerPeersNeeded(true); + } else { + ethPeers.snapServerPeersNeeded(false); + } + protocolContext.setSynchronizer(Optional.of(synchronizer)); final Optional maybeSnapProtocolManager = @@ -809,7 +828,7 @@ private TrieLogPruner createTrieLogPruner( * @param pivotBlockSelector the pivot block selector * @return the synchronizer */ - protected Synchronizer createSynchronizer( + protected DefaultSynchronizer createSynchronizer( final ProtocolSchedule protocolSchedule, final WorldStateStorageCoordinator worldStateStorageCoordinator, final ProtocolContext protocolContext, @@ -1000,6 +1019,7 @@ protected String getSupportedProtocol() { * @param scheduler the scheduler * @param peerValidators the peer validators * @param mergePeerFilter the merge peer filter + * @param forkIdManager the fork id manager * @return the eth protocol manager */ protected EthProtocolManager createEthProtocolManager( @@ -1012,7 +1032,8 @@ protected EthProtocolManager createEthProtocolManager( final EthMessages ethMessages, final EthScheduler scheduler, final List peerValidators, - final Optional mergePeerFilter) { + final Optional mergePeerFilter, + final ForkIdManager forkIdManager) { return new EthProtocolManager( protocolContext.getBlockchain(), networkId, @@ -1026,8 +1047,7 @@ protected EthProtocolManager createEthProtocolManager( mergePeerFilter, synchronizerConfiguration, scheduler, - genesisConfigOptions.getForkBlockNumbers(), - genesisConfigOptions.getForkBlockTimestamps()); + forkIdManager); } /** diff --git a/besu/src/main/java/org/hyperledger/besu/controller/ConsensusScheduleBesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/ConsensusScheduleBesuControllerBuilder.java index d138c96a24e..af59a7625d3 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/ConsensusScheduleBesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/ConsensusScheduleBesuControllerBuilder.java @@ -49,6 +49,7 @@ import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration; +import org.hyperledger.besu.ethereum.forkid.ForkIdManager; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.p2p.config.SubProtocolConfiguration; import org.hyperledger.besu.ethereum.storage.StorageProvider; @@ -242,7 +243,8 @@ protected EthProtocolManager createEthProtocolManager( final EthMessages ethMessages, final EthScheduler scheduler, final List peerValidators, - final Optional mergePeerFilter) { + final Optional mergePeerFilter, + final ForkIdManager forkIdManager) { return besuControllerBuilderSchedule .get(0L) .createEthProtocolManager( @@ -255,7 +257,8 @@ protected EthProtocolManager createEthProtocolManager( ethMessages, scheduler, peerValidators, - mergePeerFilter); + mergePeerFilter, + forkIdManager); } @Override diff --git a/besu/src/main/java/org/hyperledger/besu/controller/MergeBesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/MergeBesuControllerBuilder.java index 1e8da674a50..cdd46ea5b98 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/MergeBesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/MergeBesuControllerBuilder.java @@ -41,6 +41,7 @@ import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncContext; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; +import org.hyperledger.besu.ethereum.forkid.ForkIdManager; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.mainnet.ScheduleBasedBlockHeaderFunctions; import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive; @@ -97,7 +98,8 @@ protected EthProtocolManager createEthProtocolManager( final EthMessages ethMessages, final EthScheduler scheduler, final List peerValidators, - final Optional mergePeerFilter) { + final Optional mergePeerFilter, + final ForkIdManager forkIdManager) { var mergeContext = protocolContext.getConsensusContext(MergeContext.class); @@ -126,7 +128,8 @@ protected EthProtocolManager createEthProtocolManager( ethMessages, scheduler, peerValidators, - filterToUse); + filterToUse, + forkIdManager); return ethProtocolManager; } diff --git a/besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java index a2f52c90ee4..4f084f3c138 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java @@ -33,7 +33,6 @@ import org.hyperledger.besu.ethereum.chain.MutableBlockchain; import org.hyperledger.besu.ethereum.core.MiningParameters; import org.hyperledger.besu.ethereum.core.PrivacyParameters; -import org.hyperledger.besu.ethereum.core.Synchronizer; import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthMessages; @@ -49,6 +48,7 @@ import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration; +import org.hyperledger.besu.ethereum.forkid.ForkIdManager; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.storage.StorageProvider; import org.hyperledger.besu.ethereum.worldstate.DataStorageConfiguration; @@ -156,7 +156,8 @@ protected EthProtocolManager createEthProtocolManager( final EthMessages ethMessages, final EthScheduler scheduler, final List peerValidators, - final Optional mergePeerFilter) { + final Optional mergePeerFilter, + final ForkIdManager forkIdManager) { return mergeBesuControllerBuilder.createEthProtocolManager( protocolContext, synchronizerConfiguration, @@ -167,7 +168,8 @@ protected EthProtocolManager createEthProtocolManager( ethMessages, scheduler, peerValidators, - mergePeerFilter); + mergePeerFilter, + forkIdManager); } @Override @@ -212,7 +214,7 @@ protected PluginServiceFactory createAdditionalPluginServices( } @Override - protected Synchronizer createSynchronizer( + protected DefaultSynchronizer createSynchronizer( final ProtocolSchedule protocolSchedule, final WorldStateStorageCoordinator worldStateStorageCoordinator, final ProtocolContext protocolContext, @@ -222,15 +224,14 @@ protected Synchronizer createSynchronizer( final PivotBlockSelector pivotBlockSelector) { DefaultSynchronizer sync = - (DefaultSynchronizer) - super.createSynchronizer( - protocolSchedule, - worldStateStorageCoordinator, - protocolContext, - ethContext, - syncState, - ethProtocolManager, - pivotBlockSelector); + super.createSynchronizer( + protocolSchedule, + worldStateStorageCoordinator, + protocolContext, + ethContext, + syncState, + ethProtocolManager, + pivotBlockSelector); if (genesisConfigOptions.getTerminalTotalDifficulty().isPresent()) { LOG.info( diff --git a/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/protocol/BftProtocolManager.java b/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/protocol/BftProtocolManager.java index 0ef72f95edf..0c0dd5f67df 100644 --- a/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/protocol/BftProtocolManager.java +++ b/consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/protocol/BftProtocolManager.java @@ -20,7 +20,6 @@ import org.hyperledger.besu.consensus.common.bft.network.PeerConnectionTracker; import org.hyperledger.besu.datatypes.Address; import org.hyperledger.besu.ethereum.p2p.network.ProtocolManager; -import org.hyperledger.besu.ethereum.p2p.peers.Peer; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Message; @@ -108,11 +107,6 @@ public void handleNewConnection(final PeerConnection peerConnection) { peers.add(peerConnection); } - @Override - public boolean shouldConnect(final Peer peer, final boolean incoming) { - return false; // for now the EthProtocolManager takes care of this - } - @Override public void handleDisconnect( final PeerConnection peerConnection, diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/forkid/ForkIdManager.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/forkid/ForkIdManager.java index bae8cfc339e..859fb918a4d 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/forkid/ForkIdManager.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/forkid/ForkIdManager.java @@ -58,7 +58,11 @@ public ForkIdManager( checkNotNull(blockchain); checkNotNull(blockNumberForks); this.chainHeadSupplier = blockchain::getChainHeadHeader; - this.genesisHash = blockchain.getGenesisBlock().getHash(); + try { + this.genesisHash = blockchain.getGenesisBlock().getHash(); + } catch (Exception e) { + throw new RuntimeException(e); + } this.blockNumbersForkIds = new ArrayList<>(); this.timestampsForkIds = new ArrayList<>(); this.legacyEth64 = legacyEth64; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java index e0f0517f1e6..8cc907d6866 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java @@ -103,6 +103,7 @@ protected boolean removeEldestEntry(final Map.Entry eldest) { private final PeerReputation reputation = new PeerReputation(); private final Map validationStatus = new ConcurrentHashMap<>(); private final Bytes id; + private boolean isServingSnap = false; private static final Map roundMessages; @@ -393,6 +394,14 @@ public RequestManager.ResponseStream getSnapTrieNode( requestManagers.get(SnapProtocol.NAME).get(SnapV1.GET_TRIE_NODES), getTrieNodes); } + public void setIsServingSnap(final boolean isServingSnap) { + this.isServingSnap = isServingSnap; + } + + public boolean isServingSnap() { + return isServingSnap; + } + private RequestManager.ResponseStream sendRequest( final RequestManager requestManager, final MessageData messageData) throws PeerNotConnected { lastRequestTimestamp = clock.millis(); @@ -582,9 +591,9 @@ public String getProtocolName() { } /** - * Return A read-only snapshot of this peer's current {@code chainState} } + * Return A read-only snapshot of this peer's current {@code chainState} * - * @return A read-only snapshot of this peer's current {@code chainState} } + * @return A read-only snapshot of this peer's current {@code chainState} */ public ChainHeadEstimate chainStateSnapshot() { return chainHeadState.getSnapshot(); @@ -629,14 +638,17 @@ public boolean hasSupportForMessage(final int messageCode) { @Override public String toString() { return String.format( - "PeerId: %s %s, validated? %s, disconnected? %s, client: %s, %s, %s", + "PeerId: %s %s, validated? %s, disconnected? %s, client: %s, %s, %s, isServingSnap %s, has height %s, connected for %s ms", getLoggableId(), reputation, isFullyValidated(), isDisconnected(), connection.getPeerInfo().getClientId(), connection, - connection.getPeer().getEnodeURLString()); + connection.getPeer().getEnodeURLString(), + isServingSnap, + chainHeadState.getEstimatedHeight(), + System.currentTimeMillis() - connection.getInitiatedAt()); } @Nonnull diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java index bef7a1a038f..851c075290e 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java @@ -14,8 +14,16 @@ */ package org.hyperledger.besu.ethereum.eth.manager; +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.eth.SnapProtocol; import org.hyperledger.besu.ethereum.eth.manager.EthPeer.DisconnectCallback; import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; +import org.hyperledger.besu.ethereum.eth.sync.ChainHeadTracker; +import org.hyperledger.besu.ethereum.eth.sync.SnapServerChecker; +import org.hyperledger.besu.ethereum.eth.sync.SyncMode; +import org.hyperledger.besu.ethereum.eth.sync.TrailingPeerRequirements; +import org.hyperledger.besu.ethereum.forkid.ForkId; +import org.hyperledger.besu.ethereum.forkid.ForkIdManager; import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; import org.hyperledger.besu.ethereum.p2p.peers.Peer; import org.hyperledger.besu.ethereum.p2p.rlpx.RlpxAgent; @@ -35,8 +43,10 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -72,7 +82,7 @@ public class EthPeers { public static final int NODE_ID_LENGTH = 64; public static final int USEFULL_PEER_SCORE_THRESHOLD = 102; - private final Map completeConnections = new ConcurrentHashMap<>(); + private final Map activeConnections = new ConcurrentHashMap<>(); private final Cache incompleteConnections = CacheBuilder.newBuilder() @@ -92,12 +102,22 @@ public class EthPeers { private final Boolean randomPeerPriority; private final Bytes nodeIdMask = Bytes.random(NODE_ID_LENGTH); private final Supplier currentProtocolSpecSupplier; + private final SyncMode syncMode; + private final ForkIdManager forkIdManager; + private final int snapServerTargetNumber; + private final boolean shouldLimitRemoteConnections; private Comparator bestPeerComparator; private final Bytes localNodeId; private RlpxAgent rlpxAgent; private final Counter connectedPeersCounter; + // private List protocolManagers; + private ChainHeadTracker tracker; + private SnapServerChecker snapServerChecker; + private boolean snapServerPeersNeeded = false; + private Supplier trailingPeerRequirementsSupplier = + () -> TrailingPeerRequirements.UNRESTRICTED; public EthPeers( final String protocolName, @@ -109,7 +129,9 @@ public EthPeers( final Bytes localNodeId, final int peerUpperBound, final int maxRemotelyInitiatedConnections, - final Boolean randomPeerPriority) { + final Boolean randomPeerPriority, + final SyncMode syncMode, + final ForkIdManager forkIdManager) { this.protocolName = protocolName; this.currentProtocolSpecSupplier = currentProtocolSpecSupplier; this.clock = clock; @@ -121,11 +143,22 @@ public EthPeers( this.maxRemotelyInitiatedConnections = maxRemotelyInitiatedConnections; this.randomPeerPriority = randomPeerPriority; LOG.trace("MaxPeers: {}, Max Remote: {}", peerUpperBound, maxRemotelyInitiatedConnections); + this.syncMode = syncMode; + this.forkIdManager = forkIdManager; + this.snapServerTargetNumber = + peerUpperBound / 2; // 50% of peers should be snap servers while snap syncing + this.shouldLimitRemoteConnections = maxRemotelyInitiatedConnections < peerUpperBound; + metricsSystem.createIntegerGauge( BesuMetricCategory.ETHEREUM, "peer_count", "The current number of peers connected", - () -> (int) streamAvailablePeers().filter(p -> p.readyForRequests()).count()); + activeConnections::size); + metricsSystem.createIntegerGauge( + BesuMetricCategory.ETHEREUM, + "peer_count_snap_server", + "The current number of peers connected that serve snap data", + () -> (int) streamAvailablePeers().filter(EthPeer::isServingSnap).count()); metricsSystem.createIntegerGauge( BesuMetricCategory.PEERS, "pending_peer_requests_current", @@ -146,7 +179,7 @@ public void registerNewConnection( final PeerConnection newConnection, final List peerValidators) { final Bytes id = newConnection.getPeer().getId(); synchronized (this) { - EthPeer ethPeer = completeConnections.get(id); + EthPeer ethPeer = activeConnections.get(id); if (ethPeer == null) { final Optional peerInList = incompleteConnections.asMap().values().stream() @@ -193,12 +226,12 @@ private boolean registerDisconnect(final EthPeer peer, final PeerConnection conn if (peer.getConnection().equals(connection)) { final Bytes id = peer.getId(); if (!peerHasIncompleteConnection(id)) { - removed = completeConnections.remove(id, peer); + removed = activeConnections.remove(id, peer); disconnectCallbacks.forEach(callback -> callback.onDisconnect(peer)); peer.handleDisconnect(); abortPendingRequestsAssignedToDisconnectedPeers(); if (peer.getReputation().getScore() > USEFULL_PEER_SCORE_THRESHOLD) { - LOG.atDebug().setMessage("Disconnected USEFULL peer {}").addArgument(peer).log(); + LOG.atDebug().setMessage("Disconnected USEFUL peer {}").addArgument(peer).log(); } else { LOG.atDebug() .setMessage("Disconnected EthPeer {}") @@ -227,7 +260,7 @@ private void abortPendingRequestsAssignedToDisconnectedPeers() { public EthPeer peer(final PeerConnection connection) { final EthPeer ethPeer = incompleteConnections.getIfPresent(connection); - return ethPeer != null ? ethPeer : completeConnections.get(connection.getPeer().getId()); + return ethPeer != null ? ethPeer : activeConnections.get(connection.getPeer().getId()); } public PendingPeerRequest executePeerRequest( @@ -265,7 +298,7 @@ public void dispatchMessage(final EthPeer peer, final EthMessage ethMessage) { @VisibleForTesting void reattemptPendingPeerRequests() { synchronized (this) { - final List peers = streamAvailablePeers().collect(Collectors.toList()); + final List peers = streamAvailablePeers().toList(); final Iterator iterator = pendingRequests.iterator(); while (iterator.hasNext() && peers.stream().anyMatch(EthPeer::hasAvailableRequestCapacity)) { final PendingPeerRequest request = iterator.next(); @@ -290,7 +323,7 @@ public void subscribeDisconnect(final DisconnectCallback callback) { public int peerCount() { removeDisconnectedPeers(); - return completeConnections.size(); + return activeConnections.size(); } public int getMaxPeers() { @@ -298,11 +331,11 @@ public int getMaxPeers() { } public Stream streamAllPeers() { - return completeConnections.values().stream(); + return activeConnections.values().stream(); } private void removeDisconnectedPeers() { - completeConnections + activeConnections .values() .forEach( ep -> { @@ -313,9 +346,7 @@ private void removeDisconnectedPeers() { } public Stream streamAvailablePeers() { - return streamAllPeers() - .filter(EthPeer::readyForRequests) - .filter(peer -> !peer.isDisconnected()); + return streamAllPeers().filter(peer -> !peer.isDisconnected()); } public Stream streamBestPeers() { @@ -350,53 +381,59 @@ public void setRlpxAgent(final RlpxAgent rlpxAgent) { this.rlpxAgent = rlpxAgent; } - public Stream getAllActiveConnections() { - return completeConnections.values().stream() + public Stream streamAllActiveConnections() { + return activeConnections.values().stream() .map(EthPeer::getConnection) .filter(c -> !c.isDisconnected()); } - public Stream getAllConnections() { + public Stream streamAllConnections() { return Stream.concat( - completeConnections.values().stream().map(EthPeer::getConnection), + activeConnections.values().stream().map(EthPeer::getConnection), incompleteConnections.asMap().keySet().stream()) .distinct() .filter(c -> !c.isDisconnected()); } - public boolean shouldConnect(final Peer peer, final boolean inbound) { + public boolean shouldTryToConnect(final Peer peer, final boolean inbound) { + + if (peer.getForkId().isPresent()) { + final ForkId forkId = peer.getForkId().get(); + if (!forkIdManager.peerCheck(forkId)) { + LOG.atDebug() + .setMessage("Wrong fork id, not trying to connect to peer {}") + .addArgument(peer::getId) + .log(); + + return false; + } + } + final Bytes id = peer.getId(); - if (peerCount() >= peerUpperBound && !canExceedPeerLimits(id)) { + if (alreadyConnectedOrConnecting(inbound, id)) { LOG.atTrace() - .setMessage("not connecting to peer {} - too many peers") + .setMessage("not connecting to peer {} - already connected") .addArgument(peer.getLoggableId()) .log(); return false; } - final EthPeer ethPeer = completeConnections.get(id); + + return peerCount() < getMaxPeers() || needMoreSnapServers() || canExceedPeerLimits(id); + } + + private boolean alreadyConnectedOrConnecting(final boolean inbound, final Bytes id) { + final EthPeer ethPeer = activeConnections.get(id); if (ethPeer != null && !ethPeer.isDisconnected()) { - LOG.atTrace() - .setMessage("not connecting to peer {} - already disconnected") - .addArgument(ethPeer.getLoggableId()) - .log(); - return false; + return true; } final List incompleteConnections = getIncompleteConnections(id); - if (!incompleteConnections.isEmpty()) { - if (incompleteConnections.stream() - .anyMatch(c -> !c.isDisconnected() && (!inbound || (inbound && c.inboundInitiated())))) { - LOG.atTrace() - .setMessage("not connecting to peer {} - new connection already in process") - .addArgument(peer.getLoggableId()) - .log(); - return false; - } - } - return true; + return incompleteConnections.stream() + .anyMatch(c -> !c.isDisconnected() && (!inbound || (inbound && c.inboundInitiated()))); } public void disconnectWorstUselessPeer() { streamAvailablePeers() + .filter(p -> !canExceedPeerLimits(p.getId())) .min(getBestPeerComparator()) .ifPresent( peer -> { @@ -411,6 +448,23 @@ public void disconnectWorstUselessPeer() { }); } + public void setChainHeadTracker(final ChainHeadTracker tracker) { + this.tracker = tracker; + } + + public void setSnapServerChecker(final SnapServerChecker checker) { + this.snapServerChecker = checker; + } + + public void snapServerPeersNeeded(final boolean b) { + this.snapServerPeersNeeded = b; + } + + public void setTrailingPeerRequirementsSupplier( + final Supplier tprSupplier) { + this.trailingPeerRequirementsSupplier = tprSupplier; + } + @FunctionalInterface public interface ConnectCallback { void onPeerConnected(EthPeer newPeer); @@ -418,21 +472,108 @@ public interface ConnectCallback { @Override public String toString() { - if (completeConnections.isEmpty()) { + if (activeConnections.isEmpty()) { return "0 EthPeers {}"; } final String connectionsList = - completeConnections.values().stream() + activeConnections.values().stream() .sorted() .map(EthPeer::toString) .collect(Collectors.joining(", \n")); - return completeConnections.size() + " EthPeers {\n" + connectionsList + '}'; + return activeConnections.size() + " EthPeers {\n" + connectionsList + '}'; } private void ethPeerStatusExchanged(final EthPeer peer) { - if (addPeerToEthPeers(peer)) { - connectedPeersCounter.inc(); - connectCallbacks.forEach(cb -> cb.onPeerConnected(peer)); + // We have a connection to a peer that is on the right chain and is willing to connect to us. + // Find out what the EthPeer block height is and whether it can serve snap data (if we are doing + // snap sync) + LOG.debug("Peer {} status exchanged", peer); + assert tracker != null : "ChainHeadTracker must be set before EthPeers can be used"; + CompletableFuture future = tracker.getBestHeaderFromPeer(peer); + + future.whenComplete( + (peerHeadBlockHeader, error) -> { + if (peerHeadBlockHeader == null) { + LOG.debug( + "Failed to retrieve chain head info. Disconnecting {}... {}", + peer.getLoggableId(), + error); + peer.disconnect( + DisconnectMessage.DisconnectReason.USELESS_PEER_FAILED_TO_RETRIEVE_CHAIN_HEAD); + } else { + + // we can check trailing peers now + final TrailingPeerRequirements trailingPeerRequirements = + trailingPeerRequirementsSupplier.get(); + if (trailingPeerRequirements != null) { + if (peer.chainState().getEstimatedHeight() + < trailingPeerRequirements.getMinimumHeightToBeUpToDate()) { + if (!(getNumTrailingPeers(trailingPeerRequirements.getMinimumHeightToBeUpToDate()) + < trailingPeerRequirements.getMaxTrailingPeers())) { + LOG.atTrace() + .setMessage( + "Adding trailing peer {} would exceed max trailing peers {}. Disconnecting...") + .addArgument(peer.getLoggableId()) + .addArgument(trailingPeerRequirements.getMaxTrailingPeers()) + .log(); + peer.disconnect( + DisconnectMessage.DisconnectReason.USELESS_PEER_EXCEEDS_TRAILING_PEERS); + return; + } + } + } + + peer.chainState().updateHeightEstimate(peerHeadBlockHeader.getNumber()); + CompletableFuture isServingSnapFuture; + if (SyncMode.isCheckpointSync(syncMode) || SyncMode.isSnapSync(syncMode)) { + // even if we have finished the snap sync, we still want to know if the peer is a snap + // server + isServingSnapFuture = + CompletableFuture.runAsync( + () -> { + try { + checkIsSnapServer(peer, peerHeadBlockHeader); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } else { + isServingSnapFuture = CompletableFuture.completedFuture(null); + } + isServingSnapFuture.thenRun( + () -> { + if (!peer.getConnection().isDisconnected() && addPeerToEthPeers(peer)) { + connectedPeersCounter.inc(); + connectCallbacks.forEach(cb -> cb.onPeerConnected(peer)); + } + }); + } + }); + } + + private void checkIsSnapServer(final EthPeer peer, final BlockHeader peersHeadBlockHeader) { + if (peer.getAgreedCapabilities().contains(SnapProtocol.SNAP1)) { + if (snapServerChecker != null) { + // set that peer is a snap server for doing the test + peer.setIsServingSnap(true); + Boolean isServer; + try { + isServer = snapServerChecker.check(peer, peersHeadBlockHeader).get(6L, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.atTrace() + .setMessage("Error checking if peer {} is a snap server. Setting to false.") + .addArgument(peer.getLoggableId()) + .log(); + peer.setIsServingSnap(false); + return; + } + peer.setIsServingSnap(isServer); + LOG.atTrace() + .setMessage("{}: peer {}") + .addArgument(isServer ? "Is a snap server" : "Is NOT a snap server") + .addArgument(peer.getLoggableId()) + .log(); + } } } @@ -468,7 +609,7 @@ private int compareByMaskedNodeId(final PeerConnection a, final PeerConnection b } private void enforceRemoteConnectionLimits() { - if (!shouldLimitRemoteConnections() || peerCount() < maxRemotelyInitiatedConnections) { + if (!shouldLimitRemoteConnections || peerCount() < maxRemotelyInitiatedConnections) { // Nothing to do return; } @@ -488,7 +629,7 @@ private void enforceRemoteConnectionLimits() { } private Stream getActivePrioritizedPeers() { - return completeConnections.values().stream() + return activeConnections.values().stream() .filter(p -> !p.isDisconnected()) .sorted(this::comparePeerPriorities); } @@ -512,19 +653,15 @@ private void enforceConnectionLimits() { }); } - private boolean remoteConnectionLimitReached() { - return shouldLimitRemoteConnections() - && countUntrustedRemotelyInitiatedConnections() >= maxRemotelyInitiatedConnections; - } - - private boolean shouldLimitRemoteConnections() { - return maxRemotelyInitiatedConnections < peerUpperBound; + private boolean inboundInitiatedConnectionLimitExceeded() { + return shouldLimitRemoteConnections + && countUntrustedRemotelyInitiatedConnections() > maxRemotelyInitiatedConnections; } private long countUntrustedRemotelyInitiatedConnections() { - return completeConnections.values().stream() - .map(ep -> ep.getConnection()) - .filter(c -> c.inboundInitiated()) + return activeConnections.values().stream() + .map(EthPeer::getConnection) + .filter(PeerConnection::inboundInitiated) .filter(c -> !c.isDisconnected()) .filter(conn -> !canExceedPeerLimits(conn.getPeer().getId())) .count(); @@ -534,67 +671,123 @@ private void onCacheRemoval( final RemovalNotification removalNotification) { if (removalNotification.wasEvicted()) { final PeerConnection peerConnectionRemoved = removalNotification.getKey(); - final PeerConnection peerConnectionOfEthPeer = removalNotification.getValue().getConnection(); - if (!peerConnectionRemoved.equals(peerConnectionOfEthPeer)) { - // If this connection is not the connection of the EthPeer by now we can disconnect - peerConnectionRemoved.disconnect(DisconnectMessage.DisconnectReason.ALREADY_CONNECTED); + final EthPeer peer = removalNotification.getValue(); + if (peer == null) { + return; + } + final PeerConnection peerConnectionOfEthPeer = peer.getConnection(); + if (peerConnectionRemoved != null) { + if (!peerConnectionRemoved.equals(peerConnectionOfEthPeer)) { + // If this connection is not the connection of the EthPeer by now we can disconnect + peerConnectionRemoved.disconnect(DisconnectMessage.DisconnectReason.ALREADY_CONNECTED); + } } } } - private boolean addPeerToEthPeers(final EthPeer peer) { + boolean addPeerToEthPeers(final EthPeer peer) { // We have a connection to a peer that is on the right chain and is willing to connect to us. - // Figure out whether we want to keep this peer and add it to the EthPeers connections. - if (completeConnections.containsValue(peer)) { + // Figure out whether we want to add it to the active connections. + final PeerConnection connection = peer.getConnection(); + if (activeConnections.containsValue(peer)) { return false; } - final PeerConnection connection = peer.getConnection(); + final Bytes id = peer.getId(); if (!randomPeerPriority) { - // Disconnect if too many peers - if (!canExceedPeerLimits(id) && peerCount() >= peerUpperBound) { - LOG.atTrace() - .setMessage("Too many peers. Disconnect connection: {}, max connections {}") - .addArgument(connection) - .addArgument(peerUpperBound) - .log(); - connection.disconnect(DisconnectMessage.DisconnectReason.TOO_MANY_PEERS); - return false; - } - // Disconnect if too many remotely-initiated connections - if (connection.inboundInitiated() - && !canExceedPeerLimits(id) - && remoteConnectionLimitReached()) { - LOG.atTrace() - .setMessage( - "Too many remotely-initiated connections. Disconnect incoming connection: {}, maxRemote={}") - .addArgument(connection) - .addArgument(maxRemotelyInitiatedConnections) - .log(); - connection.disconnect(DisconnectMessage.DisconnectReason.TOO_MANY_PEERS); - return false; + + if (peerCount() >= peerUpperBound) { + final long numSnapServers = numberOfSnapServers(); + final boolean inboundLimitExceeded = inboundInitiatedConnectionLimitExceeded(); + // three reasons why we would disconnect an existing peer to accommodate the new peer + if (canExceedPeerLimits(id) + || (snapServerPeersNeeded + && numSnapServers < snapServerTargetNumber + && peer.isServingSnap()) + || (inboundLimitExceeded && !peer.getConnection().inboundInitiated())) { + + final boolean filterOutSnapServers = + snapServerPeersNeeded && (numSnapServers <= snapServerTargetNumber); + + // find and disconnect the least useful peer we can disconnect + activeConnections.values().stream() + .filter(p -> !canExceedPeerLimits(p.getId())) + .filter(filterOutSnapServers ? p -> !p.isServingSnap() : p -> true) + .filter(inboundLimitExceeded ? p -> p.getConnection().inboundInitiated() : p -> true) + .min(MOST_USEFUL_PEER) + .ifPresentOrElse( + pe -> { + pe.disconnect(DisconnectMessage.DisconnectReason.TOO_MANY_PEERS); + LOG.atTrace() + .setMessage("Disconnecting peer {} to be replaced by prioritised peer {}") + .addArgument(pe.getLoggableId()) + .addArgument(peer.getLoggableId()) + .log(); + }, + () -> // disconnect the least useful peer + activeConnections.values().stream() + .filter(p -> !canExceedPeerLimits(p.getId())) + .min(MOST_USEFUL_PEER) + .ifPresent( + p -> { + p.disconnect(DisconnectMessage.DisconnectReason.TOO_MANY_PEERS); + LOG.atTrace() + .setMessage( + "Disconnecting peer {} to be replaced by prioritised peer {}") + .addArgument(p.getLoggableId()) + .addArgument(peer.getLoggableId()) + .log(); + })); + } else { + LOG.atTrace() + .setMessage( + "Too many peers. Disconnect peer {} with connection: {}, max connections {}") + .addArgument(peer.getLoggableId()) + .addArgument(connection) + .addArgument(peerUpperBound) + .log(); + connection.disconnect(DisconnectMessage.DisconnectReason.TOO_MANY_PEERS); + return false; + } } - final boolean added = (completeConnections.putIfAbsent(id, peer) == null); + + final boolean added = (activeConnections.putIfAbsent(id, peer) == null); if (added) { LOG.atTrace() - .setMessage("Added peer {} with connection {} to completeConnections") + .setMessage("Added peer {} with connection {} to activeConnections") .addArgument(id) .addArgument(connection) .log(); } else { LOG.atTrace() - .setMessage("Did not add peer {} with connection {} to completeConnections") + .setMessage("Did not add peer {} with connection {} to activeConnections") .addArgument(id) .addArgument(connection) .log(); } return added; + } else { // randomPeerPriority! Add the peer and if there are too many connections fix it - completeConnections.putIfAbsent(id, peer); + // TODO: random peer priority does not care yet about snap server peers -> check later + activeConnections.putIfAbsent(id, peer); enforceRemoteConnectionLimits(); enforceConnectionLimits(); - return completeConnections.containsKey(id); + return activeConnections.containsKey(id); } } + + private long getNumTrailingPeers(final long minimumHeightToBeUpToDate) { + return streamAvailablePeers() + .filter(p -> p.chainState().getEstimatedHeight() < minimumHeightToBeUpToDate) + .count(); + } + + private boolean needMoreSnapServers() { + return snapServerPeersNeeded && numberOfSnapServers() < snapServerTargetNumber; + } + + private long numberOfSnapServers() { + return activeConnections.values().stream().filter(EthPeer::isServingSnap).count(); + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java index c8781e73e3c..5532bed865b 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java @@ -34,7 +34,6 @@ import org.hyperledger.besu.ethereum.forkid.ForkId; import org.hyperledger.besu.ethereum.forkid.ForkIdManager; import org.hyperledger.besu.ethereum.p2p.network.ProtocolManager; -import org.hyperledger.besu.ethereum.p2p.peers.Peer; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability; @@ -161,41 +160,6 @@ public EthProtocolManager( ethereumWireProtocolConfiguration.isLegacyEth64ForkIdEnabled())); } - public EthProtocolManager( - final Blockchain blockchain, - final BigInteger networkId, - final WorldStateArchive worldStateArchive, - final TransactionPool transactionPool, - final EthProtocolConfiguration ethereumWireProtocolConfiguration, - final EthPeers ethPeers, - final EthMessages ethMessages, - final EthContext ethContext, - final List peerValidators, - final Optional mergePeerFilter, - final SynchronizerConfiguration synchronizerConfiguration, - final EthScheduler scheduler, - final List blockNumberForks, - final List timestampForks) { - this( - blockchain, - networkId, - worldStateArchive, - transactionPool, - ethereumWireProtocolConfiguration, - ethPeers, - ethMessages, - ethContext, - peerValidators, - mergePeerFilter, - synchronizerConfiguration, - scheduler, - new ForkIdManager( - blockchain, - blockNumberForks, - timestampForks, - ethereumWireProtocolConfiguration.isLegacyEth64ForkIdEnabled())); - } - public EthContext ethContext() { return ethContext; } @@ -398,28 +362,6 @@ public void handleNewConnection(final PeerConnection connection) { LOG.atTrace().setMessage("{}").addArgument(ethPeers::toString).log(); } - @Override - public boolean shouldConnect(final Peer peer, final boolean incoming) { - if (peer.getForkId().map(forkIdManager::peerCheck).orElse(true)) { - LOG.atDebug() - .setMessage("ForkId OK or not available for peer {}") - .addArgument(peer::getLoggableId) - .log(); - if (ethPeers.shouldConnect(peer, incoming)) { - return true; - } - } else { - LOG.atDebug() - .setMessage("ForkId check failed for peer {} our fork id {} theirs {}") - .addArgument(peer::getLoggableId) - .addArgument(forkIdManager.getForkIdForChainHead()) - .addArgument(peer.getForkId()) - .log(); - return false; - } - return false; - } - @Override public void handleDisconnect( final PeerConnection connection, @@ -427,11 +369,10 @@ public void handleDisconnect( final boolean initiatedByPeer) { final boolean wasActiveConnection = ethPeers.registerDisconnect(connection); LOG.atDebug() - .setMessage("Disconnect - active Connection? {} - {} - {} {} - {} {} - {} peers left") + .setMessage("Disconnect - active Connection? {} - {} - {} - {} {} - {} peers left") .addArgument(wasActiveConnection) .addArgument(initiatedByPeer ? "Inbound" : "Outbound") - .addArgument(reason::getValue) - .addArgument(reason::name) + .addArgument(reason::toString) .addArgument(() -> connection.getPeer().getLoggableId()) .addArgument(() -> connection.getPeerInfo().getClientId()) .addArgument(ethPeers::peerCount) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/PeerRequest.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/PeerRequest.java index 09cbdb969d2..5daad56b9be 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/PeerRequest.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/PeerRequest.java @@ -19,4 +19,8 @@ public interface PeerRequest { ResponseStream sendRequest(EthPeer peer) throws PeerNotConnected; + + default boolean isEthPeerSuitable(final EthPeer ethPeer) { + return true; + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/PendingPeerRequest.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/PendingPeerRequest.java index 120b32bdfe3..71da1e4d2d9 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/PendingPeerRequest.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/PendingPeerRequest.java @@ -86,6 +86,7 @@ private Optional getPeerToUse() { : ethPeers .streamAvailablePeers() .filter(peer -> peer.chainState().getEstimatedHeight() >= minimumBlockNumber) + .filter(request::isEthPeerSuitable) .min(EthPeers.LEAST_TO_MOST_BUSY); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/GetAccountRangeFromPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/GetAccountRangeFromPeerTask.java index 75a6183eb4e..8e67ff4aeb2 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/GetAccountRangeFromPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/GetAccountRangeFromPeerTask.java @@ -17,10 +17,13 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.PeerRequest; import org.hyperledger.besu.ethereum.eth.manager.PendingPeerRequest; +import org.hyperledger.besu.ethereum.eth.manager.RequestManager; import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerRequestTask; import org.hyperledger.besu.ethereum.eth.messages.snap.AccountRangeMessage; import org.hyperledger.besu.ethereum.eth.messages.snap.SnapV1; +import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.plugin.services.MetricsSystem; @@ -64,14 +67,34 @@ public static GetAccountRangeFromPeerTask forAccountRange( @Override protected PendingPeerRequest sendRequest() { return sendRequestToPeer( - peer -> { - LOG.trace( - "Requesting account range [{} ,{}] for state root {} from peer {} .", - startKeyHash, - endKeyHash, - blockHeader.getStateRoot(), - peer); - return peer.getSnapAccountRange(blockHeader.getStateRoot(), startKeyHash, endKeyHash); + new PeerRequest() { + @Override + public RequestManager.ResponseStream sendRequest(final EthPeer peer) + throws PeerConnection.PeerNotConnected { + LOG.atTrace() + .setMessage("Requesting account range [{} ,{}] for state root {} from peer {} .") + .addArgument(startKeyHash) + .addArgument(endKeyHash) + .addArgument(blockHeader) + .addArgument(peer) + .log(); + if (!peer.isServingSnap()) { + LOG.atDebug() + .setMessage("EthPeer that is not serving snap called in {}, peer: {}") + .addArgument(GetAccountRangeFromPeerTask.class) + .addArgument(peer) + .log(); + throw new RuntimeException( + "EthPeer that is not serving snap called in " + + GetAccountRangeFromPeerTask.class); + } + return peer.getSnapAccountRange(blockHeader.getStateRoot(), startKeyHash, endKeyHash); + } + + @Override + public boolean isEthPeerSuitable(final EthPeer ethPeer) { + return ethPeer.isServingSnap(); + } }, blockHeader.getNumber()); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/GetBytecodeFromPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/GetBytecodeFromPeerTask.java index d57acc590c9..1c8c1d12d74 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/GetBytecodeFromPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/GetBytecodeFromPeerTask.java @@ -21,10 +21,13 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.PeerRequest; import org.hyperledger.besu.ethereum.eth.manager.PendingPeerRequest; +import org.hyperledger.besu.ethereum.eth.manager.RequestManager; import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerRequestTask; import org.hyperledger.besu.ethereum.eth.messages.snap.ByteCodesMessage; import org.hyperledger.besu.ethereum.eth.messages.snap.SnapV1; +import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.plugin.services.MetricsSystem; @@ -66,9 +69,32 @@ public static GetBytecodeFromPeerTask forBytecode( @Override protected PendingPeerRequest sendRequest() { return sendRequestToPeer( - peer -> { - LOG.trace("Requesting {} Bytecodes from {} .", codeHashes.size(), peer); - return peer.getSnapBytecode(blockHeader.getStateRoot(), codeHashes); + new PeerRequest() { + @Override + public RequestManager.ResponseStream sendRequest(final EthPeer peer) + throws PeerConnection.PeerNotConnected { + LOG.atTrace() + .setMessage("Requesting {} Bytecodes from {} .") + .addArgument(codeHashes.size()) + .addArgument(peer) + .log(); + if (!peer.isServingSnap()) { + LOG.atDebug() + .setMessage("EthPeer that is not serving snap called in {}, peer: {}") + .addArgument(GetAccountRangeFromPeerTask.class) + .addArgument(peer) + .log(); + throw new RuntimeException( + "EthPeer that is not serving snap called in " + + GetAccountRangeFromPeerTask.class); + } + return peer.getSnapBytecode(blockHeader.getStateRoot(), codeHashes); + } + + @Override + public boolean isEthPeerSuitable(final EthPeer ethPeer) { + return ethPeer.isServingSnap(); + } }, blockHeader.getNumber()); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/GetStorageRangeFromPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/GetStorageRangeFromPeerTask.java index bcc8f23ebdd..cb3e4c83ea9 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/GetStorageRangeFromPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/GetStorageRangeFromPeerTask.java @@ -17,10 +17,13 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.PeerRequest; import org.hyperledger.besu.ethereum.eth.manager.PendingPeerRequest; +import org.hyperledger.besu.ethereum.eth.manager.RequestManager; import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerRequestTask; import org.hyperledger.besu.ethereum.eth.messages.snap.SnapV1; import org.hyperledger.besu.ethereum.eth.messages.snap.StorageRangeMessage; +import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.plugin.services.MetricsSystem; @@ -69,15 +72,35 @@ public static GetStorageRangeFromPeerTask forStorageRange( @Override protected PendingPeerRequest sendRequest() { return sendRequestToPeer( - peer -> { - LOG.trace( - "Requesting storage range [{} ,{}] for {} accounts from peer {} .", - startKeyHash, - endKeyHash, - accountHashes.size(), - peer); - return peer.getSnapStorageRange( - blockHeader.getStateRoot(), accountHashes, startKeyHash, endKeyHash); + new PeerRequest() { + @Override + public RequestManager.ResponseStream sendRequest(final EthPeer peer) + throws PeerConnection.PeerNotConnected { + LOG.atTrace() + .setMessage("Requesting storage range [{} ,{}] for {} accounts from peer {} .") + .addArgument(startKeyHash) + .addArgument(endKeyHash) + .addArgument(accountHashes.size()) + .addArgument(peer) + .log(); + if (!peer.isServingSnap()) { + LOG.atDebug() + .setMessage("EthPeer that is not serving snap called in {}, peer: {}") + .addArgument(GetAccountRangeFromPeerTask.class) + .addArgument(peer) + .log(); + throw new RuntimeException( + "EthPeer that is not serving snap called in " + + GetAccountRangeFromPeerTask.class); + } + return peer.getSnapStorageRange( + blockHeader.getStateRoot(), accountHashes, startKeyHash, endKeyHash); + } + + @Override + public boolean isEthPeerSuitable(final EthPeer ethPeer) { + return ethPeer.isServingSnap(); + } }, blockHeader.getNumber()); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/GetTrieNodeFromPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/GetTrieNodeFromPeerTask.java index 6b2e8cc6b00..a060ca79d0b 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/GetTrieNodeFromPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/GetTrieNodeFromPeerTask.java @@ -20,10 +20,13 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.PeerRequest; import org.hyperledger.besu.ethereum.eth.manager.PendingPeerRequest; +import org.hyperledger.besu.ethereum.eth.manager.RequestManager; import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerRequestTask; import org.hyperledger.besu.ethereum.eth.messages.snap.SnapV1; import org.hyperledger.besu.ethereum.eth.messages.snap.TrieNodesMessage; +import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.plugin.services.MetricsSystem; @@ -72,9 +75,31 @@ public static GetTrieNodeFromPeerTask forTrieNodes( @Override protected PendingPeerRequest sendRequest() { return sendRequestToPeer( - peer -> { - LOG.trace("Requesting {} trie nodes from peer {}", paths.size(), peer); - return peer.getSnapTrieNode(blockHeader.getStateRoot(), paths); + new PeerRequest() { + @Override + public RequestManager.ResponseStream sendRequest(final EthPeer peer) + throws PeerConnection.PeerNotConnected { + LOG.atTrace() + .setMessage("Requesting {} trie nodes from peer {}") + .addArgument(paths.size()) + .addArgument(peer) + .log(); + if (!peer.isServingSnap()) { + LOG.debug( + "EthPeer that is not serving snap called in {}, {}", + GetAccountRangeFromPeerTask.class, + peer); + throw new RuntimeException( + "EthPeer that is not serving snap called in " + + GetAccountRangeFromPeerTask.class); + } + return peer.getSnapTrieNode(blockHeader.getStateRoot(), paths); + } + + @Override + public boolean isEthPeerSuitable(final EthPeer ethPeer) { + return ethPeer.isServingSnap(); + } }, blockHeader.getNumber()); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetAccountRangeFromPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetAccountRangeFromPeerTask.java index 36d6b75e6ee..0624a90589c 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetAccountRangeFromPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetAccountRangeFromPeerTask.java @@ -31,6 +31,7 @@ public class RetryingGetAccountRangeFromPeerTask extends AbstractRetryingPeerTask { public static final int MAX_RETRIES = 4; + private final EthContext ethContext; private final Bytes32 startKeyHash; private final Bytes32 endKeyHash; @@ -79,4 +80,9 @@ protected CompletableFuture executePeerTas return peerResult.getResult(); }); } + + @Override + protected boolean isSuitablePeer(final EthPeer peer) { + return peer.isServingSnap(); + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetBytecodeFromPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetBytecodeFromPeerTask.java index 9b340c8cdd4..3258298f2c7 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetBytecodeFromPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetBytecodeFromPeerTask.java @@ -69,4 +69,9 @@ protected CompletableFuture> executePeerTask( return peerResult.getResult(); }); } + + @Override + protected boolean isSuitablePeer(final EthPeer peer) { + return peer.isServingSnap(); + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetStorageRangeFromPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetStorageRangeFromPeerTask.java index cbda1a5fb07..731b0b7623b 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetStorageRangeFromPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetStorageRangeFromPeerTask.java @@ -79,4 +79,9 @@ protected CompletableFuture executePeerTask( return peerResult.getResult(); }); } + + @Override + protected boolean isSuitablePeer(final EthPeer peer) { + return peer.isServingSnap(); + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetTrieNodeFromPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetTrieNodeFromPeerTask.java index ca1c70f59b5..1abf0d72302 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetTrieNodeFromPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetTrieNodeFromPeerTask.java @@ -68,4 +68,9 @@ protected CompletableFuture> executePeerTask( return peerResult.getResult(); }); } + + @Override + protected boolean isSuitablePeer(final EthPeer peer) { + return peer.isServingSnap(); + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/SnapProtocolManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/SnapProtocolManager.java index 57b4d2c71a3..2c717d77a16 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/SnapProtocolManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/SnapProtocolManager.java @@ -22,7 +22,6 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapSyncConfiguration; import org.hyperledger.besu.ethereum.p2p.network.ProtocolManager; -import org.hyperledger.besu.ethereum.p2p.peers.Peer; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.AbstractSnapMessageData; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability; @@ -143,11 +142,6 @@ public void processMessage(final Capability cap, final Message message) { @Override public void handleNewConnection(final PeerConnection connection) {} - @Override - public boolean shouldConnect(final Peer peer, final boolean incoming) { - return false; // EthManager is taking care of this for now - } - @Override public void handleDisconnect( final PeerConnection connection, diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTask.java index 46c3cf1b226..e7f1556b5a2 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTask.java @@ -67,8 +67,20 @@ protected AbstractRetryingPeerTask( this.metricsSystem = metricsSystem; } - public void assignPeer(final EthPeer peer) { - assignedPeer = Optional.of(peer); + /** + * Assign the peer to be used for the task. + * + * @param peer The peer to assign to the task. + * @return True if the peer was assigned, false otherwise. + */ + public boolean assignPeer(final EthPeer peer) { + if (isSuitablePeer(peer)) { + assignedPeer = Optional.of(peer); + return true; + } else { + assignedPeer = Optional.empty(); + return false; + } } public Optional getAssignedPeer() { @@ -167,4 +179,8 @@ public int getRetryCount() { public int getMaxRetries() { return maxRetries; } + + protected boolean isSuitablePeer(final EthPeer peer) { + return true; + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java index 88fe3bb6a73..a1021289779 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java @@ -49,9 +49,12 @@ protected AbstractRetryingSwitchingPeerTask( } @Override - public void assignPeer(final EthPeer peer) { - super.assignPeer(peer); - triedPeers.add(peer); + public boolean assignPeer(final EthPeer peer) { + if (super.assignPeer(peer)) { + triedPeers.add(peer); + return true; + } + return false; } protected abstract CompletableFuture executeTaskOnCurrentPeer(final EthPeer peer); @@ -62,8 +65,7 @@ protected CompletableFuture executePeerTask(final Optional assignedP final Optional maybePeer = assignedPeer .filter(u -> getRetryCount() == 1) // first try with the assigned peer if present - .map(Optional::of) - .orElseGet(this::selectNextPeer); // otherwise, select a new one from the pool + .or(this::selectNextPeer); // otherwise select a new one from the pool if (maybePeer.isEmpty()) { LOG.atTrace() @@ -101,7 +103,7 @@ protected CompletableFuture executePeerTask(final Optional assignedP @Override protected void handleTaskError(final Throwable error) { if (isPeerFailure(error)) { - getAssignedPeer().ifPresent(peer -> failedPeers.add(peer)); + getAssignedPeer().ifPresent(failedPeers::add); } super.handleTaskError(error); } @@ -124,10 +126,11 @@ private Optional selectNextPeer() { return maybeNextPeer; } - private Stream remainingPeersToTry() { + protected Stream remainingPeersToTry() { return getEthContext() .getEthPeers() .streamBestPeers() + .filter(this::isSuitablePeer) .filter(peer -> !triedPeers.contains(peer)); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/ChainHeadTracker.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/ChainHeadTracker.java index 5d563e012d7..67681acf6a7 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/ChainHeadTracker.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/ChainHeadTracker.java @@ -19,24 +19,25 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; -import org.hyperledger.besu.ethereum.eth.manager.EthPeers.ConnectCallback; +import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask; import org.hyperledger.besu.ethereum.eth.manager.task.GetHeadersFromPeerByHashTask; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; -import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage; import org.hyperledger.besu.plugin.services.MetricsSystem; +import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ChainHeadTracker implements ConnectCallback { +public class ChainHeadTracker { private static final Logger LOG = LoggerFactory.getLogger(ChainHeadTracker.class); private final EthContext ethContext; private final ProtocolSchedule protocolSchedule; - private final TrailingPeerLimiter trailingPeerLimiter; private final MetricsSystem metricsSystem; public ChainHeadTracker( @@ -46,7 +47,6 @@ public ChainHeadTracker( final MetricsSystem metricsSystem) { this.ethContext = ethContext; this.protocolSchedule = protocolSchedule; - this.trailingPeerLimiter = trailingPeerLimiter; this.metricsSystem = metricsSystem; } @@ -60,48 +60,53 @@ public static void trackChainHeadForPeers( new TrailingPeerLimiter(ethContext.getEthPeers(), trailingPeerRequirementsCalculator); final ChainHeadTracker tracker = new ChainHeadTracker(ethContext, protocolSchedule, trailingPeerLimiter, metricsSystem); - ethContext.getEthPeers().subscribeConnect(tracker); + ethContext.getEthPeers().setChainHeadTracker(tracker); blockchain.observeBlockAdded(trailingPeerLimiter); } - @Override - public void onPeerConnected(final EthPeer peer) { + public CompletableFuture getBestHeaderFromPeer(final EthPeer peer) { LOG.atDebug() .setMessage("Requesting chain head info from {}...") .addArgument(peer::getLoggableId) .log(); - GetHeadersFromPeerByHashTask.forSingleHash( + final CompletableFuture>> + bestHeaderFromPeerCompletableFuture = getBestHeaderFromPeerCompletableFuture(peer); + final CompletableFuture future = new CompletableFuture<>(); + bestHeaderFromPeerCompletableFuture.whenComplete( + (peerResult, error) -> { + if (peerResult != null && !peerResult.getResult().isEmpty()) { + final BlockHeader chainHeadHeader = peerResult.getResult().get(0); + peer.chainState().update(chainHeadHeader); + future.complete(chainHeadHeader); + LOG.atDebug() + .setMessage("Retrieved chain head info {} from {}...") + .addArgument( + () -> chainHeadHeader.getNumber() + " (" + chainHeadHeader.getBlockHash() + ")") + .addArgument(peer::getLoggableId) + .log(); + } else { + LOG.atDebug() + .setMessage("Failed to retrieve chain head info. Disconnecting {}... {}") + .addArgument(peer::getLoggableId) + .addArgument(error != null ? error : "Empty Response") + .log(); + peer.disconnect( + DisconnectMessage.DisconnectReason.USELESS_PEER_FAILED_TO_RETRIEVE_CHAIN_HEAD); + future.complete(null); + } + }); + return future; + } + + public CompletableFuture>> + getBestHeaderFromPeerCompletableFuture(final EthPeer peer) { + return GetHeadersFromPeerByHashTask.forSingleHash( protocolSchedule, ethContext, Hash.wrap(peer.chainState().getBestBlock().getHash()), 0, metricsSystem) .assignPeer(peer) - .run() - .whenComplete( - (peerResult, error) -> { - if (peerResult != null && !peerResult.getResult().isEmpty()) { - final BlockHeader chainHeadHeader = peerResult.getResult().get(0); - peer.chainState().update(chainHeadHeader); - trailingPeerLimiter.enforceTrailingPeerLimit(); - LOG.atDebug() - .setMessage("Retrieved chain head info {} from {}...") - .addArgument( - () -> - chainHeadHeader.getNumber() - + " (" - + chainHeadHeader.getBlockHash() - + ")") - .addArgument(peer::getLoggableId) - .log(); - } else { - LOG.atDebug() - .setMessage("Failed to retrieve chain head info. Disconnecting {}... {}") - .addArgument(peer::getLoggableId) - .addArgument(error) - .log(); - peer.disconnect(DisconnectReason.USELESS_PEER_FAILED_TO_RETRIEVE_CHAIN_STATE); - } - }); + .run(); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java index 5e596576aa3..7605754a65c 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java @@ -99,6 +99,11 @@ public DefaultSynchronizer( this::calculateTrailingPeerRequirements, metricsSystem); + if (SyncMode.isSnapSync(syncConfig.getSyncMode()) + || SyncMode.isCheckpointSync(syncConfig.getSyncMode())) { + SnapServerChecker.createAndSetSnapServerChecker(ethContext, metricsSystem); + } + this.blockPropagationManager = terminationCondition.shouldStopDownload() ? Optional.empty() @@ -187,7 +192,7 @@ public DefaultSynchronizer( () -> getSyncStatus().isPresent() ? 0 : 1); } - private TrailingPeerRequirements calculateTrailingPeerRequirements() { + public TrailingPeerRequirements calculateTrailingPeerRequirements() { return fastSyncDownloader .flatMap(FastSyncDownloader::calculateTrailingPeerRequirements) .orElse( diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/SnapServerChecker.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/SnapServerChecker.java new file mode 100644 index 00000000000..c7fa141837b --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/SnapServerChecker.java @@ -0,0 +1,86 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync; + +import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.snap.GetAccountRangeFromPeerTask; +import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask; +import org.hyperledger.besu.ethereum.eth.messages.snap.AccountRangeMessage; +import org.hyperledger.besu.plugin.services.MetricsSystem; + +import java.util.concurrent.CompletableFuture; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SnapServerChecker { + + private static final Logger LOG = LoggerFactory.getLogger(SnapServerChecker.class); + + private final EthContext ethContext; + private final MetricsSystem metricsSystem; + + public SnapServerChecker(final EthContext ethContext, final MetricsSystem metricsSystem) { + this.ethContext = ethContext; + this.metricsSystem = metricsSystem; + } + + public static void createAndSetSnapServerChecker( + final EthContext ethContext, final MetricsSystem metricsSystem) { + final SnapServerChecker checker = new SnapServerChecker(ethContext, metricsSystem); + ethContext.getEthPeers().setSnapServerChecker(checker); + } + + public CompletableFuture check(final EthPeer peer, final BlockHeader peersHeadHeader) { + LOG.atTrace() + .setMessage("Checking whether peer {} is a snap server ...") + .addArgument(peer::getLoggableId) + .log(); + final CompletableFuture> + snapServerCheckCompletableFuture = getAccountRangeFromPeer(peer, peersHeadHeader); + final CompletableFuture future = new CompletableFuture<>(); + snapServerCheckCompletableFuture.whenComplete( + (peerResult, error) -> { + if (peerResult != null) { + if (!peerResult.getResult().accounts().isEmpty() + || !peerResult.getResult().proofs().isEmpty()) { + LOG.atTrace() + .setMessage("Peer {} is a snap server.") + .addArgument(peer::getLoggableId) + .log(); + future.complete(true); + } else { + LOG.atTrace() + .setMessage("Peer {} is not a snap server.") + .addArgument(peer::getLoggableId) + .log(); + future.complete(false); + } + } + }); + return future; + } + + public CompletableFuture> + getAccountRangeFromPeer(final EthPeer peer, final BlockHeader header) { + return GetAccountRangeFromPeerTask.forAccountRange( + ethContext, Hash.ZERO, Hash.ZERO, header, metricsSystem) + .assignPeer(peer) + .run(); + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/SyncMode.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/SyncMode.java index b5d0c2570bb..4f3affbe555 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/SyncMode.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/SyncMode.java @@ -54,4 +54,8 @@ public static boolean isFullSync(final SyncMode syncMode) { public static boolean isCheckpointSync(final SyncMode syncMode) { return X_CHECKPOINT.equals(syncMode) || CHECKPOINT.equals(syncMode); } + + public static boolean isSnapSync(final SyncMode syncMode) { + return X_SNAP.equals(syncMode) || SNAP.equals(syncMode); + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/SyncTargetManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/SyncTargetManager.java index b587b18f264..2acfa54d04d 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/SyncTargetManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/SyncTargetManager.java @@ -42,6 +42,9 @@ public class SyncTargetManager extends AbstractSyncTargetManager { private static final Logger LOG = LoggerFactory.getLogger(SyncTargetManager.class); + + private static final int LOG_DEBUG_REPEAT_DELAY = 15; + private static final int LOG_INFO_REPEAT_DELAY = 120; private static final int SECONDS_PER_REQUEST = 6; // 5s per request + 1s wait between retries private final WorldStateStorageCoordinator worldStateStorageCoordinator; @@ -52,8 +55,6 @@ public class SyncTargetManager extends AbstractSyncTargetManager { private final FastSyncState fastSyncState; private final AtomicBoolean logDebug = new AtomicBoolean(true); private final AtomicBoolean logInfo = new AtomicBoolean(true); - private final int logDebugRepeatDelay = 15; - private final int logInfoRepeatDelay = 120; public SyncTargetManager( final SynchronizerConfiguration config, @@ -84,14 +85,14 @@ protected CompletableFuture> selectBestAvailableSyncTarget() { "Unable to find sync target. Currently checking %d peers for usefulness. Pivot block: %d", ethContext.getEthPeers().peerCount(), pivotBlockHeader.getNumber()), logDebug, - logDebugRepeatDelay); + LOG_DEBUG_REPEAT_DELAY); throttledLog( LOG::info, String.format( "Unable to find sync target. Currently checking %d peers for usefulness.", ethContext.getEthPeers().peerCount()), logInfo, - logInfoRepeatDelay); + LOG_INFO_REPEAT_DELAY); return completedFuture(Optional.empty()); } else { final EthPeer bestPeer = maybeBestPeer.get(); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeersTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeersTest.java index 59c02941428..f6c635aa51f 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeersTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeersTest.java @@ -21,15 +21,18 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.Difficulty; import org.hyperledger.besu.ethereum.eth.manager.exceptions.NoAvailablePeersException; import org.hyperledger.besu.ethereum.eth.manager.exceptions.PeerDisconnectedException; import org.hyperledger.besu.ethereum.eth.messages.NodeDataMessage; +import org.hyperledger.besu.ethereum.eth.sync.ChainHeadTracker; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason; @@ -37,6 +40,7 @@ import java.util.Optional; import java.util.OptionalLong; import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import org.junit.jupiter.api.BeforeEach; @@ -56,6 +60,11 @@ public void setup() throws Exception { when(peerRequest.sendRequest(any())).thenReturn(responseStream); ethProtocolManager = EthProtocolManagerTestUtil.create(); ethPeers = ethProtocolManager.ethContext().getEthPeers(); + final ChainHeadTracker mock = mock(ChainHeadTracker.class); + final BlockHeader blockHeader = mock(BlockHeader.class); + when(mock.getBestHeaderFromPeer(any())) + .thenReturn(CompletableFuture.completedFuture(blockHeader)); + ethPeers.setChainHeadTracker(mock); } @Test @@ -112,6 +121,9 @@ public void comparesPeersWithTdAndNoHeight() { @Test public void shouldExecutePeerRequestImmediatelyWhenPeerIsAvailable() throws Exception { final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + + when(peerRequest.isEthPeerSuitable(peer.getEthPeer())).thenReturn(true); + final PendingPeerRequest pendingRequest = ethPeers.executePeerRequest(peerRequest, 10, Optional.empty()); @@ -127,6 +139,8 @@ public void shouldUseLeastBusyPeerForRequest() throws Exception { EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); useRequestSlot(workingPeer.getEthPeer()); + when(peerRequest.isEthPeerSuitable(any())).thenReturn(true); + final PendingPeerRequest pendingRequest = ethPeers.executePeerRequest(peerRequest, 10, Optional.empty()); @@ -147,6 +161,8 @@ public void shouldUseLeastRecentlyUsedPeerWhenBothHaveSameNumberOfOutstandingReq assertThat(leastRecentlyUsedPeer.getEthPeer().outstandingRequests()) .isEqualTo(mostRecentlyUsedPeer.getEthPeer().outstandingRequests()); + when(peerRequest.isEthPeerSuitable(any())).thenReturn(true); + final PendingPeerRequest pendingRequest = ethPeers.executePeerRequest(peerRequest, 10, Optional.empty()); @@ -180,10 +196,13 @@ public void shouldFailWhenAllPeersWithSufficientHeightHaveDisconnected() throws EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); useAllAvailableCapacity(suitablePeer.getEthPeer()); + when(peerRequest.isEthPeerSuitable(suitablePeer.getEthPeer())).thenReturn(true); + final PendingPeerRequest pendingRequest = ethPeers.executePeerRequest(peerRequest, 200, Optional.empty()); - verifyNoInteractions(peerRequest); + verify(peerRequest, times(0)).sendRequest(suitablePeer.getEthPeer()); + assertNotDone(pendingRequest); suitablePeer.disconnect(DisconnectReason.TOO_MANY_PEERS); @@ -194,6 +213,8 @@ public void shouldFailWhenAllPeersWithSufficientHeightHaveDisconnected() throws public void shouldFailWithPeerNotConnectedIfPeerRequestThrows() throws Exception { final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); when(peerRequest.sendRequest(peer.getEthPeer())).thenThrow(new PeerNotConnected("Oh dear")); + when(peerRequest.isEthPeerSuitable(any())).thenReturn(true); + final PendingPeerRequest pendingRequest = ethPeers.executePeerRequest(peerRequest, 100, Optional.empty()); @@ -205,9 +226,11 @@ public void shouldDelayExecutionUntilPeerHasCapacity() throws Exception { final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); useAllAvailableCapacity(peer.getEthPeer()); + when(peerRequest.isEthPeerSuitable(any())).thenReturn(true); + final PendingPeerRequest pendingRequest = ethPeers.executePeerRequest(peerRequest, 100, Optional.empty()); - verifyNoInteractions(peerRequest); + verify(peerRequest, times(0)).sendRequest(peer.getEthPeer()); freeUpCapacity(peer.getEthPeer()); @@ -221,11 +244,12 @@ public void shouldDelayExecutionUntilPeerWithSufficientHeightHasCapacity() throw EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 10); final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + when(peerRequest.isEthPeerSuitable(peer.getEthPeer())).thenReturn(true); useAllAvailableCapacity(peer.getEthPeer()); final PendingPeerRequest pendingRequest = ethPeers.executePeerRequest(peerRequest, 100, Optional.empty()); - verifyNoInteractions(peerRequest); + verify(peerRequest, times(0)).sendRequest(peer.getEthPeer()); freeUpCapacity(peer.getEthPeer()); @@ -238,15 +262,17 @@ public void shouldNotExecuteAbortedRequest() throws Exception { final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); useAllAvailableCapacity(peer.getEthPeer()); + when(peerRequest.isEthPeerSuitable(peer.getEthPeer())).thenReturn(true); + final PendingPeerRequest pendingRequest = ethPeers.executePeerRequest(peerRequest, 100, Optional.empty()); - verifyNoInteractions(peerRequest); + verify(peerRequest, times(0)).sendRequest(peer.getEthPeer()); pendingRequest.abort(); freeUpCapacity(peer.getEthPeer()); - verifyNoInteractions(peerRequest); + verify(peerRequest, times(0)).sendRequest(peer.getEthPeer()); assertRequestFailure(pendingRequest, CancellationException.class); } @@ -349,6 +375,59 @@ public void toString_hasExpectedInfo() { assertThat(ethPeers.toString()).contains(peerA.getLoggableId()); } + @Test + public void snapServersPreferredWhileSyncing() { + + ethPeers.snapServerPeersNeeded(true); + + while (ethPeers.peerCount() < ethPeers.getMaxPeers()) { + final EthPeer ethPeer = + EthProtocolManagerTestUtil.createPeer( + ethProtocolManager, Difficulty.of(50), 20, false, false) + .getEthPeer(); + assertThat(ethPeers.addPeerToEthPeers(ethPeer)).isTrue(); + } + + final EthPeer nonSnapServingPeer = + EthProtocolManagerTestUtil.createPeer( + ethProtocolManager, Difficulty.of(50), 20, false, false) + .getEthPeer(); + + assertThat(ethPeers.addPeerToEthPeers(nonSnapServingPeer)).isFalse(); + assertThat(nonSnapServingPeer.getConnection().isDisconnected()).isTrue(); + + final EthPeer snapServingPeer = + EthProtocolManagerTestUtil.createPeer( + ethProtocolManager, Difficulty.of(50), 20, true, false) + .getEthPeer(); + + assertThat(ethPeers.addPeerToEthPeers(snapServingPeer)).isTrue(); + assertThat(ethPeers.peerCount()).isEqualTo(ethPeers.getMaxPeers()); + } + + @Test + public void snapServersNotPreferredWhenInSync() { + + ethPeers.snapServerPeersNeeded(false); + + while (ethPeers.peerCount() < ethPeers.getMaxPeers()) { + final EthPeer ethPeer = + EthProtocolManagerTestUtil.createPeer( + ethProtocolManager, Difficulty.of(50), 20, false, false) + .getEthPeer(); + assertThat(ethPeers.addPeerToEthPeers(ethPeer)).isTrue(); + } + + final EthPeer snapServingPeer = + EthProtocolManagerTestUtil.createPeer( + ethProtocolManager, Difficulty.of(50), 20, true, false) + .getEthPeer(); + + assertThat(ethPeers.addPeerToEthPeers(snapServingPeer)).isFalse(); + assertThat(snapServingPeer.getConnection().isDisconnected()).isTrue(); + assertThat(ethPeers.peerCount()).isEqualTo(ethPeers.getMaxPeers()); + } + private void freeUpCapacity(final EthPeer ethPeer) { ethPeers.dispatchMessage(ethPeer, new EthMessage(ethPeer, NodeDataMessage.create(emptyList()))); } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTest.java index ff5d655cc61..93a80be2fd2 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTest.java @@ -528,7 +528,7 @@ public void respondToGetHeadersReversedWithSkip() private MockPeerConnection setupPeer( final EthProtocolManager ethManager, final PeerSendHandler onSend) { - final MockPeerConnection peer = setupPeerWithoutStatusExchange(ethManager, onSend); + final MockPeerConnection peerConnection = setupPeerWithoutStatusExchange(ethManager, onSend); final StatusMessage statusMessage = StatusMessage.create( EthProtocolVersion.V63, @@ -536,8 +536,11 @@ private MockPeerConnection setupPeer( blockchain.getChainHead().getTotalDifficulty(), blockchain.getChainHeadHash(), blockchain.getBlockHeader(BlockHeader.GENESIS_BLOCK_NUMBER).get().getHash()); - ethManager.processMessage(EthProtocol.ETH63, new DefaultMessage(peer, statusMessage)); - return peer; + ethManager.processMessage(EthProtocol.ETH63, new DefaultMessage(peerConnection, statusMessage)); + final EthPeers ethPeers = ethManager.ethContext().getEthPeers(); + final EthPeer ethPeer = ethPeers.peer(peerConnection); + ethPeers.addPeerToEthPeers(ethPeer); + return peerConnection; } private MockPeerConnection setupPeerWithoutStatusExchange( diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTestUtil.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTestUtil.java index 63ada130408..0b0bd1e3eb7 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTestUtil.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTestUtil.java @@ -16,12 +16,15 @@ import static com.google.common.base.Preconditions.checkArgument; import static org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider.createInMemoryBlockchain; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import org.hyperledger.besu.config.GenesisConfigFile; +import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.chain.ChainHead; import org.hyperledger.besu.ethereum.chain.GenesisState; +import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockchainSetupUtil; import org.hyperledger.besu.ethereum.core.Difficulty; import org.hyperledger.besu.ethereum.core.ProtocolScheduleFixture; @@ -29,6 +32,8 @@ import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration; import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager; import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; +import org.hyperledger.besu.ethereum.eth.sync.ChainHeadTracker; +import org.hyperledger.besu.ethereum.eth.sync.SyncMode; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; import org.hyperledger.besu.ethereum.forkid.ForkIdManager; @@ -46,8 +51,10 @@ import java.util.Collections; import java.util.Optional; import java.util.OptionalLong; +import java.util.concurrent.CompletableFuture; import org.apache.tuweni.bytes.Bytes; +import org.mockito.Mockito; public class EthProtocolManagerTestUtil { @@ -86,7 +93,13 @@ public static EthProtocolManager create( Bytes.random(64), 25, 25, - false); + false, + SyncMode.FAST, + new ForkIdManager(blockchain, Collections.emptyList(), Collections.emptyList(), false)); + + final ChainHeadTracker chainHeadTrackerMock = getChainHeadTrackerMock(); + peers.setChainHeadTracker(chainHeadTrackerMock); + final EthMessages messages = new EthMessages(); final EthScheduler ethScheduler = new DeterministicEthScheduler(TimeoutPolicy.NEVER_TIMEOUT); final EthContext ethContext = new EthContext(peers, messages, ethScheduler); @@ -139,6 +152,8 @@ public static EthProtocolManager create( final EthContext ethContext, final ForkIdManager forkIdManager) { + ethPeers.setChainHeadTracker(getChainHeadTrackerMock()); + final BigInteger networkId = BigInteger.ONE; return new EthProtocolManager( blockchain, @@ -205,9 +220,15 @@ public static EthProtocolManager create( Bytes.random(64), 25, 25, - false); + false, + SyncMode.FAST, + new ForkIdManager(blockchain, Collections.emptyList(), Collections.emptyList(), false)); final EthMessages messages = new EthMessages(); + final ChainHeadTracker chtMock = getChainHeadTrackerMock(); + + peers.setChainHeadTracker(chtMock); + return create( blockchain, ethScheduler, @@ -219,6 +240,17 @@ public static EthProtocolManager create( new EthContext(peers, messages, ethScheduler)); } + public static ChainHeadTracker getChainHeadTrackerMock() { + final ChainHeadTracker chtMock = mock(ChainHeadTracker.class); + final BlockHeader blockHeaderMock = mock(BlockHeader.class); + Mockito.lenient() + .when(chtMock.getBestHeaderFromPeer(any())) + .thenReturn(CompletableFuture.completedFuture(blockHeaderMock)); + Mockito.lenient().when(blockHeaderMock.getNumber()).thenReturn(0L); + Mockito.lenient().when(blockHeaderMock.getStateRoot()).thenReturn(Hash.ZERO); + return chtMock; + } + public static EthProtocolManager create( final ProtocolSchedule protocolSchedule, final Blockchain blockchain, @@ -239,7 +271,9 @@ public static EthProtocolManager create( Bytes.random(64), 25, 25, - false); + false, + SyncMode.FAST, + new ForkIdManager(blockchain, Collections.emptyList(), Collections.emptyList(), false)); final EthMessages messages = new EthMessages(); return create( @@ -258,7 +292,7 @@ public static EthProtocolManager create( final ProtocolSchedule protocolSchedule, final Blockchain blockchain, final EthScheduler ethScheduler) { - final EthPeers peers = + final EthPeers ethPeers = new EthPeers( EthProtocol.NAME, () -> protocolSchedule.getByBlockHeader(blockchain.getChainHeadHeader()), @@ -269,7 +303,13 @@ public static EthProtocolManager create( Bytes.random(64), 25, 25, - false); + false, + SyncMode.FAST, + new ForkIdManager(blockchain, Collections.emptyList(), Collections.emptyList(), false)); + + final ChainHeadTracker chainHeadTrackerMock = getChainHeadTrackerMock(); + ethPeers.setChainHeadTracker(chainHeadTrackerMock); + final EthMessages messages = new EthMessages(); return create( @@ -278,9 +318,9 @@ public static EthProtocolManager create( BlockchainSetupUtil.forTesting(DataStorageFormat.FOREST).getWorldArchive(), mock(TransactionPool.class), EthProtocolConfiguration.defaultConfig(), - peers, + ethPeers, messages, - new EthContext(peers, messages, ethScheduler)); + new EthContext(ethPeers, messages, ethScheduler)); } public static EthProtocolManager create() { @@ -446,4 +486,19 @@ public static RespondingEthPeer createPeer( .estimatedHeight(blockchain.getChainHeadBlockNumber()) .build(); } + + public static RespondingEthPeer createPeer( + final EthProtocolManager ethProtocolManager, + final Difficulty td, + final int estimatedHeight, + final boolean isServingSnap, + final boolean addToEthPeers) { + return RespondingEthPeer.builder() + .ethProtocolManager(ethProtocolManager) + .totalDifficulty(td) + .estimatedHeight(estimatedHeight) + .isServingSnap(isServingSnap) + .addToEthPeers(addToEthPeers) + .build(); + } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/RespondingEthPeer.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/RespondingEthPeer.java index 7427545984b..58f4893432c 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/RespondingEthPeer.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/RespondingEthPeer.java @@ -121,7 +121,9 @@ private static RespondingEthPeer create( final Hash chainHeadHash, final Difficulty totalDifficulty, final OptionalLong estimatedHeight, - final List peerValidators) { + final List peerValidators, + final boolean isServingSnap, + final boolean addToEthPeers) { final EthPeers ethPeers = ethProtocolManager.ethContext().getEthPeers(); final Set caps = new HashSet<>(Collections.singletonList(EthProtocol.ETH63)); @@ -130,10 +132,24 @@ private static RespondingEthPeer create( new MockPeerConnection( caps, (cap, msg, conn) -> outgoingMessages.add(new OutgoingMessage(cap, msg))); ethPeers.registerNewConnection(peerConnection, peerValidators); + final int before = ethPeers.peerCount(); final EthPeer peer = ethPeers.peer(peerConnection); peer.registerStatusReceived(chainHeadHash, totalDifficulty, 63, peerConnection); estimatedHeight.ifPresent(height -> peer.chainState().update(chainHeadHash, height)); - peer.registerStatusSent(peerConnection); + if (addToEthPeers) { + peer.registerStatusSent(peerConnection); + ethPeers.addPeerToEthPeers(peer); + while (ethPeers.peerCount() + <= before) { // this is needed to make sure that the peer is added to the active + // connections + try { + Thread.sleep(100L); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + peer.setIsServingSnap(isServingSnap); return new RespondingEthPeer( ethProtocolManager, snapProtocolManager, peerConnection, peer, outgoingMessages); @@ -396,6 +412,8 @@ public static class Builder { private Difficulty totalDifficulty = Difficulty.of(1000L); private OptionalLong estimatedHeight = OptionalLong.of(1000L); private final List peerValidators = new ArrayList<>(); + private boolean isServingSnap = false; + private boolean addToEthPeers = true; public RespondingEthPeer build() { checkNotNull(ethProtocolManager, "Must configure EthProtocolManager"); @@ -406,7 +424,9 @@ public RespondingEthPeer build() { chainHeadHash, totalDifficulty, estimatedHeight, - peerValidators); + peerValidators, + isServingSnap, + addToEthPeers); } public Builder ethProtocolManager(final EthProtocolManager ethProtocolManager) { @@ -444,6 +464,11 @@ public Builder estimatedHeight(final long estimatedHeight) { return this; } + public Builder isServingSnap(final boolean isServingSnap) { + this.isServingSnap = isServingSnap; + return this; + } + public Builder peerValidators(final List peerValidators) { checkNotNull(peerValidators); this.peerValidators.addAll(peerValidators); @@ -454,6 +479,11 @@ public Builder peerValidators(final PeerValidator... peerValidators) { peerValidators(Arrays.asList(peerValidators)); return this; } + + public Builder addToEthPeers(final boolean addToEthPeers) { + this.addToEthPeers = addToEthPeers; + return this; + } } static class OutgoingMessage { diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java index 2f0657d23d7..6dbea259cc2 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java @@ -38,11 +38,13 @@ import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; import org.hyperledger.besu.ethereum.eth.manager.task.EthTask; +import org.hyperledger.besu.ethereum.eth.sync.SyncMode; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.transactions.BlobCache; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolFactory; +import org.hyperledger.besu.ethereum.forkid.ForkIdManager; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; import org.hyperledger.besu.plugin.services.MetricsSystem; @@ -121,7 +123,10 @@ public void setupTest() { Bytes.random(64), MAX_PEERS, MAX_PEERS, - false)); + false, + SyncMode.FAST, + new ForkIdManager( + blockchain, Collections.emptyList(), Collections.emptyList(), false))); final EthMessages ethMessages = new EthMessages(); final EthScheduler ethScheduler = diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java index a91b19c8e9e..dcc0239f616 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java @@ -55,6 +55,7 @@ import org.hyperledger.besu.ethereum.eth.sync.BlockPropagationManager.ProcessingBlocksManager; import org.hyperledger.besu.ethereum.eth.sync.state.PendingBlocksManager; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; +import org.hyperledger.besu.ethereum.forkid.ForkIdManager; import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; @@ -629,7 +630,10 @@ public void shouldNotImportBlocksThatAreAlreadyBeingImported() { Bytes.random(64), 25, 25, - false), + false, + SyncMode.SNAP, + new ForkIdManager( + blockchain, Collections.emptyList(), Collections.emptyList(), false)), new EthMessages(), ethScheduler); final BlockPropagationManager blockPropagationManager = @@ -767,7 +771,10 @@ public Object answer(final InvocationOnMock invocation) throws Throwable { Bytes.random(64), 25, 25, - false), + false, + SyncMode.SNAP, + new ForkIdManager( + blockchain, Collections.emptyList(), Collections.emptyList(), false)), new EthMessages(), ethScheduler); final BlockPropagationManager blockPropagationManager = diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/ChainHeadTrackerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/ChainHeadTrackerTest.java index 393824af755..b46a109c122 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/ChainHeadTrackerTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/ChainHeadTrackerTest.java @@ -98,7 +98,7 @@ public void shouldRequestHeaderChainHeadWhenNewPeerConnects( blockchainSetupUtil.getBlockchain(), blockchainSetupUtil.getWorldArchive(), blockchainSetupUtil.getTransactionPool()); - chainHeadTracker.onPeerConnected(respondingPeer.getEthPeer()); + chainHeadTracker.getBestHeaderFromPeer(respondingPeer.getEthPeer()); Assertions.assertThat(chainHeadState().getEstimatedHeight()).isZero(); @@ -118,7 +118,7 @@ public void shouldIgnoreHeadersIfChainHeadHasAlreadyBeenUpdatedWhileWaiting( blockchainSetupUtil.getBlockchain(), blockchainSetupUtil.getWorldArchive(), blockchainSetupUtil.getTransactionPool()); - chainHeadTracker.onPeerConnected(respondingPeer.getEthPeer()); + chainHeadTracker.getBestHeaderFromPeer(respondingPeer.getEthPeer()); // Change the hash of the current known head respondingPeer.getEthPeer().chainState().statusReceived(Hash.EMPTY_TRIE_HASH, Difficulty.ONE); @@ -137,7 +137,7 @@ public void shouldCheckTrialingPeerLimits(final DataStorageFormat storageFormat) blockchainSetupUtil.getBlockchain(), blockchainSetupUtil.getWorldArchive(), blockchainSetupUtil.getTransactionPool()); - chainHeadTracker.onPeerConnected(respondingPeer.getEthPeer()); + chainHeadTracker.getBestHeaderFromPeer(respondingPeer.getEthPeer()); Assertions.assertThat(chainHeadState().getEstimatedHeight()).isZero(); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotBlockRetrieverTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotBlockRetrieverTest.java index ae96bbf7681..41317fdab38 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotBlockRetrieverTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotBlockRetrieverTest.java @@ -288,15 +288,15 @@ public void shouldRecoverFromUnresponsivePeer(final DataStorageFormat storageFor final CompletableFuture future = pivotBlockRetriever.downloadPivotBlockHeader(); peerA.respond(responder); - peerB.respondTimes(emptyResponder, 3); + peerB.respondTimes(emptyResponder, 4); // PeerA should have responded, while peerB is being retried, peerC shouldn't have been queried // yet assertThat(future).isNotCompleted(); assertThat(peerC.hasOutstandingRequests()).isFalse(); - // After exhausting retries for peerB, we should try peerC - peerB.respondTimes(emptyResponder, 2); + // After exhausting retries (max retries is 5) for peerB, we should try peerC + peerB.respondTimes(emptyResponder, 1); peerC.respond(responder); assertThat(future) diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TestNode.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TestNode.java index 743f0b47e2c..8e9bda89c05 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TestNode.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TestNode.java @@ -18,6 +18,7 @@ import static java.util.Objects.requireNonNull; import static org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider.createInMemoryBlockchain; import static org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider.createInMemoryWorldStateArchive; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -30,6 +31,7 @@ import org.hyperledger.besu.ethereum.chain.BadBlockManager; import org.hyperledger.besu.ethereum.chain.GenesisState; import org.hyperledger.besu.ethereum.chain.MutableBlockchain; +import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockHeaderFunctions; import org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider; import org.hyperledger.besu.ethereum.core.MiningParameters; @@ -42,8 +44,11 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; +import org.hyperledger.besu.ethereum.eth.sync.ChainHeadTracker; +import org.hyperledger.besu.ethereum.eth.sync.SyncMode; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; +import org.hyperledger.besu.ethereum.forkid.ForkIdManager; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.mainnet.ScheduleBasedBlockHeaderFunctions; import org.hyperledger.besu.ethereum.p2p.config.DiscoveryConfiguration; @@ -77,6 +82,7 @@ import io.vertx.core.Vertx; import org.apache.tuweni.bytes.Bytes; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -153,7 +159,12 @@ public boolean isMessagePermitted(final EnodeURL destinationEnode, final int cod Bytes.random(64), 25, 25, - false); + false, + SyncMode.SNAP, + new ForkIdManager(blockchain, Collections.emptyList(), Collections.emptyList(), false)); + + final ChainHeadTracker mockCHT = getChainHeadTracker(); + ethPeers.setChainHeadTracker(mockCHT); final EthScheduler scheduler = new EthScheduler(1, 1, 1, metricsSystem); final EthContext ethContext = new EthContext(ethPeers, ethMessages, scheduler); @@ -189,6 +200,7 @@ public boolean isMessagePermitted(final EnodeURL destinationEnode, final int cod NetworkRunner.builder() .subProtocols(EthProtocol.get()) .protocolManagers(singletonList(ethProtocolManager)) + .ethPeersShouldConnect((p, d) -> true) .network( capabilities -> DefaultP2PNetwork.builder() @@ -201,8 +213,8 @@ public boolean isMessagePermitted(final EnodeURL destinationEnode, final int cod .blockchain(blockchain) .blockNumberForks(Collections.emptyList()) .timestampForks(Collections.emptyList()) - .allConnectionsSupplier(ethPeers::getAllConnections) - .allActiveConnectionsSupplier(ethPeers::getAllActiveConnections) + .allConnectionsSupplier(ethPeers::streamAllConnections) + .allActiveConnectionsSupplier(ethPeers::streamAllActiveConnections) .build()) .metricsSystem(new NoOpMetricsSystem()) .build(); @@ -217,6 +229,16 @@ public boolean isMessagePermitted(final EnodeURL destinationEnode, final int cod selfPeer = DefaultPeer.fromEnodeURL(network.getLocalEnode().get()); } + private static ChainHeadTracker getChainHeadTracker() { + final ChainHeadTracker mockCHT = mock(ChainHeadTracker.class); + final BlockHeader mockBlockHeader = mock(BlockHeader.class); + Mockito.lenient().when(mockBlockHeader.getNumber()).thenReturn(0L); + Mockito.lenient() + .when(mockCHT.getBestHeaderFromPeer(any())) + .thenReturn(CompletableFuture.completedFuture(mockBlockHeader)); + return mockCHT; + } + public Bytes id() { return nodeKey.getPublicKey().getEncodedBytes(); } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java index d6b1b65381f..ef8d0701782 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java @@ -34,6 +34,7 @@ import org.hyperledger.besu.ethereum.chain.BadBlockManager; import org.hyperledger.besu.ethereum.chain.BlockAddedObserver; import org.hyperledger.besu.ethereum.chain.MutableBlockchain; +import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture; import org.hyperledger.besu.ethereum.core.MiningParameters; import org.hyperledger.besu.ethereum.core.PrivacyParameters; @@ -44,6 +45,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; +import org.hyperledger.besu.ethereum.eth.sync.SyncMode; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredPendingTransactions; @@ -102,6 +104,9 @@ public class TransactionPoolFactoryTest { @BeforeEach public void setup() { when(blockchain.getBlockHashByNumber(anyLong())).thenReturn(Optional.of(mock(Hash.class))); + final Block mockBlock = mock(Block.class); + when(mockBlock.getHash()).thenReturn(Hash.ZERO); + when(blockchain.getGenesisBlock()).thenReturn(mockBlock); when(context.getBlockchain()).thenReturn(blockchain); final NodeMessagePermissioningProvider nmpp = (destinationEnode, code) -> true; @@ -116,7 +121,9 @@ public void setup() { Bytes.random(64), 25, 25, - false); + false, + SyncMode.SNAP, + new ForkIdManager(blockchain, Collections.emptyList(), Collections.emptyList(), false)); when(ethContext.getEthMessages()).thenReturn(ethMessages); when(ethContext.getEthPeers()).thenReturn(ethPeers); diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/network/NetworkRunner.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/network/NetworkRunner.java index 028d5acd89c..77e9903bb89 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/network/NetworkRunner.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/network/NetworkRunner.java @@ -14,6 +14,7 @@ */ package org.hyperledger.besu.ethereum.p2p.network; +import org.hyperledger.besu.ethereum.p2p.peers.Peer; import org.hyperledger.besu.ethereum.p2p.rlpx.RlpxAgent; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol; @@ -31,6 +32,7 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiFunction; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -47,15 +49,18 @@ public class NetworkRunner implements AutoCloseable { private final Map subProtocols; private final List protocolManagers; private final LabelledMetric inboundMessageCounter; + private final BiFunction ethPeersShouldConnect; private NetworkRunner( final P2PNetwork network, final Map subProtocols, final List protocolManagers, - final MetricsSystem metricsSystem) { + final MetricsSystem metricsSystem, + final BiFunction ethPeersShouldConnect) { this.network = network; this.protocolManagers = protocolManagers; this.subProtocols = subProtocols; + this.ethPeersShouldConnect = ethPeersShouldConnect; inboundMessageCounter = metricsSystem.createLabelledCounter( BesuMetricCategory.NETWORK, @@ -158,8 +163,7 @@ private void setupHandlers() { protocolManager.handleNewConnection(connection); }); - network.subscribeConnectRequest( - (peer, incoming) -> protocolManager.shouldConnect(peer, incoming)); + network.subscribeConnectRequest(ethPeersShouldConnect::apply); network.subscribeDisconnect( (connection, disconnectReason, initiatedByPeer) -> { @@ -186,6 +190,7 @@ public static class Builder { List protocolManagers = new ArrayList<>(); List subProtocols = new ArrayList<>(); MetricsSystem metricsSystem; + private BiFunction ethPeersShouldConnect; public NetworkRunner build() { final Map subProtocolMap = new HashMap<>(); @@ -203,7 +208,8 @@ public NetworkRunner build() { } } final P2PNetwork network = networkProvider.build(caps); - return new NetworkRunner(network, subProtocolMap, protocolManagers, metricsSystem); + return new NetworkRunner( + network, subProtocolMap, protocolManagers, metricsSystem, ethPeersShouldConnect); } public Builder protocolManagers(final List protocolManagers) { @@ -230,6 +236,11 @@ public Builder metricsSystem(final MetricsSystem metricsSystem) { this.metricsSystem = metricsSystem; return this; } + + public Builder ethPeersShouldConnect(final BiFunction shouldConnect) { + this.ethPeersShouldConnect = shouldConnect; + return this; + } } @FunctionalInterface diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/network/ProtocolManager.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/network/ProtocolManager.java index 603464d6900..c61e6d907cd 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/network/ProtocolManager.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/network/ProtocolManager.java @@ -14,7 +14,6 @@ */ package org.hyperledger.besu.ethereum.p2p.network; -import org.hyperledger.besu.ethereum.p2p.peers.Peer; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Message; @@ -59,15 +58,6 @@ public interface ProtocolManager extends AutoCloseable { */ void handleNewConnection(PeerConnection peerConnection); - /** - * Call this to find out whether we should try to connect to a certain peer - * - * @param peer the peer that we are trying to connect to - * @param incoming true if the connection is incoming - * @return true, if the ProtocolManager wants to connect to the peer, false otherwise - */ - boolean shouldConnect(Peer peer, final boolean incoming); - /** * Handles peer disconnects. * diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/wire/messages/DisconnectMessage.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/wire/messages/DisconnectMessage.java index 87df88c40ea..d951ecc5d9e 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/wire/messages/DisconnectMessage.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/wire/messages/DisconnectMessage.java @@ -129,11 +129,11 @@ public enum DisconnectReason { USELESS_PEER_NO_SHARED_CAPABILITIES((byte) 0x03, "No shared capabilities"), USELESS_PEER_WORLD_STATE_NOT_AVAILABLE((byte) 0x03, "World state not available"), USELESS_PEER_MISMATCHED_PIVOT_BLOCK((byte) 0x03, "Mismatched pivot block"), - USELESS_PEER_FAILED_TO_RETRIEVE_CHAIN_STATE( - (byte) 0x03, "Failed to retrieve header for chain state"), + USELESS_PEER_FAILED_TO_RETRIEVE_CHAIN_HEAD((byte) 0x03, "Failed to retrieve chain head header"), USELESS_PEER_CANNOT_CONFIRM_PIVOT_BLOCK((byte) 0x03, "Peer failed to confirm pivot block"), USELESS_PEER_BY_REPUTATION((byte) 0x03, "Lowest reputation score"), USELESS_PEER_BY_CHAIN_COMPARATOR((byte) 0x03, "Lowest by chain height comparator"), + USELESS_PEER_EXCEEDS_TRAILING_PEERS((byte) 0x03, "Adding peer would exceed max trailing peers"), TOO_MANY_PEERS((byte) 0x04), ALREADY_CONNECTED((byte) 0x05), INCOMPATIBLE_P2P_PROTOCOL_VERSION((byte) 0x06), diff --git a/ethereum/retesteth/src/main/java/org/hyperledger/besu/ethereum/retesteth/RetestethContext.java b/ethereum/retesteth/src/main/java/org/hyperledger/besu/ethereum/retesteth/RetestethContext.java index eb43f96dfb9..fd39ef7f44c 100644 --- a/ethereum/retesteth/src/main/java/org/hyperledger/besu/ethereum/retesteth/RetestethContext.java +++ b/ethereum/retesteth/src/main/java/org/hyperledger/besu/ethereum/retesteth/RetestethContext.java @@ -42,12 +42,14 @@ import org.hyperledger.besu.ethereum.eth.manager.EthMessages; import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; +import org.hyperledger.besu.ethereum.eth.sync.SyncMode; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.transactions.BlobCache; import org.hyperledger.besu.ethereum.eth.transactions.ImmutableTransactionPoolConfiguration; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolFactory; +import org.hyperledger.besu.ethereum.forkid.ForkIdManager; import org.hyperledger.besu.ethereum.mainnet.EpochCalculator; import org.hyperledger.besu.ethereum.mainnet.HeaderValidationMode; import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions; @@ -73,6 +75,7 @@ import org.hyperledger.besu.util.number.Fraction; import java.util.Collections; +import java.util.List; import java.util.Optional; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; @@ -239,7 +242,9 @@ private boolean buildContext( localNodeKey, MAX_PEERS, MAX_PEERS, - false); + false, + SyncMode.FAST, + new ForkIdManager(blockchain, List.of(), List.of(), false)); final SyncState syncState = new SyncState(blockchain, ethPeers); ethScheduler = new EthScheduler(1, 1, 1, 1, metricsSystem);