From b412bd39ddc6f8848008b3c1539e0d75f9b0756e Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Mon, 4 Sep 2023 05:23:54 +0530 Subject: [PATCH] Made PSBA child class of PSA --- .../gateway/BaseGatewayShardAllocator.java | 15 + .../gateway/BaseNodeGatewayStartedShards.java | 19 + .../gateway/PrimaryShardAllocator.java | 201 +++++++--- .../gateway/PrimaryShardBatchAllocator.java | 375 ++++-------------- ...ransportNodesListGatewayStartedShards.java | 2 +- ...ortNodesListGatewayStartedShardsBatch.java | 2 +- 6 files changed, 258 insertions(+), 356 deletions(-) create mode 100644 server/src/main/java/org/opensearch/gateway/BaseNodeGatewayStartedShards.java diff --git a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java index 40d3f654ba35f..309852eb155be 100644 --- a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java @@ -34,6 +34,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.RoutingNodes; @@ -47,6 +48,7 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentMap; @@ -192,4 +194,17 @@ protected static List buildDecisionsForAllNodes(ShardRouti } return results; } + + + protected interface INodeShardState { + BaseNodeGatewayStartedShards getShardState(); + DiscoveryNode getNode(); + } + protected interface INodeShardStates { + void add(V value); + void add(T key, V value); + V get(T key); + int size(); + Iterator iterator(); + } } diff --git a/server/src/main/java/org/opensearch/gateway/BaseNodeGatewayStartedShards.java b/server/src/main/java/org/opensearch/gateway/BaseNodeGatewayStartedShards.java new file mode 100644 index 0000000000000..957203361a3dc --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/BaseNodeGatewayStartedShards.java @@ -0,0 +1,19 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway; + +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; + +public interface BaseNodeGatewayStartedShards { + + String allocationId(); + boolean primary(); + ReplicationCheckpoint replicationCheckpoint(); + Exception storeException(); +} diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java index 00680c3ee51f6..597add1d35570 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java @@ -57,6 +57,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -81,7 +82,7 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator { /** * Is the allocator responsible for allocating the given {@link ShardRouting}? */ - private static boolean isResponsibleFor(final ShardRouting shard) { + protected static boolean isResponsibleFor(final ShardRouting shard) { return shard.primary() // must be primary && shard.unassigned() // must be unassigned // only handle either an existing store or a snapshot recovery @@ -121,7 +122,13 @@ public AllocateUnassignedDecision makeAllocationDecision( } return AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions); } + NodeShardStates nodeShardStates = new NodeShardStates((shardState.getData().values().stream().map(NodeShardState::new).collect(Collectors.toList()))); + return getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger); + } + protected AllocateUnassignedDecision getAllocationDecision(ShardRouting unassignedShard, RoutingAllocation allocation, + INodeShardStates shardState, Logger logger) { + final boolean explain = allocation.debugDecision(); // don't create a new IndexSetting object for every shard as this could cause a lot of garbage // on cluster restart if we allocate a boat load of shards final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(unassignedShard.index()); @@ -186,22 +193,23 @@ public AllocateUnassignedDecision makeAllocationDecision( boolean throttled = false; if (nodesToAllocate.yesNodeShards.isEmpty() == false) { DecidedNode decidedNode = nodesToAllocate.yesNodeShards.get(0); + INodeShardState nodeShardState = decidedNode.nodeShardState; logger.debug( "[{}][{}]: allocating [{}] to [{}] on primary allocation", unassignedShard.index(), unassignedShard.id(), unassignedShard, - decidedNode.nodeShardState.getNode() + nodeShardState.getNode() ); - node = decidedNode.nodeShardState.getNode(); - allocationId = decidedNode.nodeShardState.allocationId(); + node = nodeShardState.getNode(); + allocationId = nodeShardState.getShardState().allocationId(); } else if (nodesToAllocate.throttleNodeShards.isEmpty() && !nodesToAllocate.noNodeShards.isEmpty()) { // The deciders returned a NO decision for all nodes with shard copies, so we check if primary shard // can be force-allocated to one of the nodes. nodesToAllocate = buildNodesToAllocate(allocation, nodeShardsResult.orderedAllocationCandidates, unassignedShard, true); if (nodesToAllocate.yesNodeShards.isEmpty() == false) { final DecidedNode decidedNode = nodesToAllocate.yesNodeShards.get(0); - final NodeGatewayStartedShards nodeShardState = decidedNode.nodeShardState; + final INodeShardState nodeShardState = decidedNode.nodeShardState; logger.debug( "[{}][{}]: allocating [{}] to [{}] on forced primary allocation", unassignedShard.index(), @@ -210,7 +218,7 @@ public AllocateUnassignedDecision makeAllocationDecision( nodeShardState.getNode() ); node = nodeShardState.getNode(); - allocationId = nodeShardState.allocationId(); + allocationId = nodeShardState.getShardState().allocationId(); } else if (nodesToAllocate.throttleNodeShards.isEmpty() == false) { logger.debug( "[{}][{}]: throttling allocation [{}] to [{}] on forced primary allocation", @@ -261,11 +269,11 @@ public AllocateUnassignedDecision makeAllocationDecision( */ private static List buildNodeDecisions( NodesToAllocate nodesToAllocate, - FetchResult fetchedShardData, + INodeShardStates fetchedShardData, Set inSyncAllocationIds ) { List nodeResults = new ArrayList<>(); - Collection ineligibleShards; + Collection ineligibleShards = new ArrayList<>(); if (nodesToAllocate != null) { final Set discoNodes = new HashSet<>(); nodeResults.addAll( @@ -275,47 +283,47 @@ private static List buildNodeDecisions( discoNodes.add(dnode.nodeShardState.getNode()); return new NodeAllocationResult( dnode.nodeShardState.getNode(), - shardStoreInfo(dnode.nodeShardState, inSyncAllocationIds), + shardStoreInfo(dnode.nodeShardState.getShardState(), inSyncAllocationIds), dnode.decision ); }) .collect(Collectors.toList()) ); - ineligibleShards = fetchedShardData.getData() - .values() - .stream() - .filter(shardData -> discoNodes.contains(shardData.getNode()) == false) - .collect(Collectors.toList()); + fetchedShardData.iterator().forEachRemaining(shardData -> { + if (discoNodes.contains(shardData.getNode()) == false) { + ineligibleShards.add(shardData); + } + }); } else { // there were no shard copies that were eligible for being assigned the allocation, // so all fetched shard data are ineligible shards - ineligibleShards = fetchedShardData.getData().values(); + fetchedShardData.iterator().forEachRemaining(ineligibleShards::add); } nodeResults.addAll( ineligibleShards.stream() - .map(shardData -> new NodeAllocationResult(shardData.getNode(), shardStoreInfo(shardData, inSyncAllocationIds), null)) + .map(shardData -> new NodeAllocationResult(shardData.getNode(), shardStoreInfo(shardData.getShardState(), inSyncAllocationIds), null)) .collect(Collectors.toList()) ); return nodeResults; } - private static ShardStoreInfo shardStoreInfo(NodeGatewayStartedShards nodeShardState, Set inSyncAllocationIds) { + protected static ShardStoreInfo shardStoreInfo(BaseNodeGatewayStartedShards nodeShardState, Set inSyncAllocationIds) { final Exception storeErr = nodeShardState.storeException(); final boolean inSync = nodeShardState.allocationId() != null && inSyncAllocationIds.contains(nodeShardState.allocationId()); return new ShardStoreInfo(nodeShardState.allocationId(), inSync, storeErr); } - private static final Comparator NO_STORE_EXCEPTION_FIRST_COMPARATOR = Comparator.comparing( - (NodeGatewayStartedShards state) -> state.storeException() == null + private static final Comparator NO_STORE_EXCEPTION_FIRST_COMPARATOR = Comparator.comparing( + (INodeShardState state) -> state.getShardState().storeException() == null ).reversed(); - private static final Comparator PRIMARY_FIRST_COMPARATOR = Comparator.comparing( - NodeGatewayStartedShards::primary + private static final Comparator PRIMARY_FIRST_COMPARATOR = Comparator.comparing( + (INodeShardState state) -> state.getShardState().primary() ).reversed(); - private static final Comparator HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = Comparator.comparing( - NodeGatewayStartedShards::replicationCheckpoint, + private static final Comparator HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = Comparator.comparing( + (INodeShardState state) -> state.getShardState().replicationCheckpoint(), Comparator.nullsLast(Comparator.naturalOrder()) ); @@ -324,25 +332,27 @@ private static ShardStoreInfo shardStoreInfo(NodeGatewayStartedShards nodeShardS * inSyncAllocationIds are added to the list. Otherwise, any node that has a shard is added to the list, but * entries with matching allocation id are always at the front of the list. */ - protected static NodeShardsResult buildNodeShardsResult( + protected NodeShardsResult buildNodeShardsResult( ShardRouting shard, boolean matchAnyShard, Set ignoreNodes, Set inSyncAllocationIds, - FetchResult shardState, + INodeShardStates shardState, Logger logger ) { - List nodeShardStates = new ArrayList<>(); + NodeShardStates nodeShardStates = new NodeShardStates(); int numberOfAllocationsFound = 0; - for (NodeGatewayStartedShards nodeShardState : shardState.getData().values()) { + Iterator iterator = shardState.iterator(); + while (iterator.hasNext()) { + NodeShardState nodeShardState = (NodeShardState) iterator.next(); DiscoveryNode node = nodeShardState.getNode(); - String allocationId = nodeShardState.allocationId(); + String allocationId = nodeShardState.getShardState().allocationId(); if (ignoreNodes.contains(node.getId())) { continue; } - if (nodeShardState.storeException() == null) { + if (nodeShardState.getShardState().storeException() == null) { if (allocationId == null) { logger.trace("[{}] on node [{}] has no shard state information", shard, nodeShardState.getNode()); } else { @@ -350,7 +360,7 @@ protected static NodeShardsResult buildNodeShardsResult( } } else { final String finalAllocationId = allocationId; - if (nodeShardState.storeException() instanceof ShardLockObtainFailedException) { + if (nodeShardState.getShardState().storeException() instanceof ShardLockObtainFailedException) { logger.trace( () -> new ParameterizedMessage( "[{}] on node [{}] has allocation id [{}] but the store can not be " @@ -359,7 +369,7 @@ protected static NodeShardsResult buildNodeShardsResult( nodeShardState.getNode(), finalAllocationId ), - nodeShardState.storeException() + nodeShardState.getShardState().storeException() ); } else { logger.trace( @@ -369,35 +379,48 @@ protected static NodeShardsResult buildNodeShardsResult( nodeShardState.getNode(), finalAllocationId ), - nodeShardState.storeException() + nodeShardState.getShardState().storeException() ); allocationId = null; } } if (allocationId != null) { - assert nodeShardState.storeException() == null || nodeShardState.storeException() instanceof ShardLockObtainFailedException + assert nodeShardState.getShardState().storeException() == null || nodeShardState.getShardState().storeException() instanceof ShardLockObtainFailedException : "only allow store that can be opened or that throws a ShardLockObtainFailedException while being opened but got a " + "store throwing " - + nodeShardState.storeException(); + + nodeShardState.getShardState().storeException(); numberOfAllocationsFound++; - if (matchAnyShard || inSyncAllocationIds.contains(nodeShardState.allocationId())) { + if (matchAnyShard || inSyncAllocationIds.contains(nodeShardState.getShardState().allocationId())) { nodeShardStates.add(nodeShardState); } } } + nodeShardStates.sort(getComparator(matchAnyShard, inSyncAllocationIds)); + + if (logger.isTraceEnabled()) { + logger.trace( + "{} candidates for allocation: {}", + shard, + nodeShardStates.nodeShardStates.stream().map(s -> s.getNode().getName()).collect(Collectors.joining(", ")) + ); + } + return new NodeShardsResult(nodeShardStates, numberOfAllocationsFound); + } + + protected static Comparator getComparator(boolean matchAnyShard, Set inSyncAllocationIds) { /** * Orders the active shards copies based on below comparators * 1. No store exception i.e. shard copy is readable * 2. Prefer previous primary shard * 3. Prefer shard copy with the highest replication checkpoint. It is NO-OP for doc rep enabled indices. */ - final Comparator comparator; // allocation preference + final Comparator comparator; // allocation preference if (matchAnyShard) { // prefer shards with matching allocation ids - Comparator matchingAllocationsFirst = Comparator.comparing( - (NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId()) + Comparator matchingAllocationsFirst = Comparator.comparing( + (INodeShardState state) -> inSyncAllocationIds.contains(state.getShardState().allocationId()) ).reversed(); comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR) .thenComparing(PRIMARY_FIRST_COMPARATOR) @@ -407,31 +430,24 @@ protected static NodeShardsResult buildNodeShardsResult( .thenComparing(HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR); } - nodeShardStates.sort(comparator); - - if (logger.isTraceEnabled()) { - logger.trace( - "{} candidates for allocation: {}", - shard, - nodeShardStates.stream().map(s -> s.getNode().getName()).collect(Collectors.joining(", ")) - ); - } - return new NodeShardsResult(nodeShardStates, numberOfAllocationsFound); + return comparator; } /** * Split the list of node shard states into groups yes/no/throttle based on allocation deciders */ - private static NodesToAllocate buildNodesToAllocate( + protected static NodesToAllocate buildNodesToAllocate( RoutingAllocation allocation, - List nodeShardStates, + INodeShardStates nodeShardStates, ShardRouting shardRouting, boolean forceAllocate ) { List yesNodeShards = new ArrayList<>(); List throttledNodeShards = new ArrayList<>(); List noNodeShards = new ArrayList<>(); - for (NodeGatewayStartedShards nodeShardState : nodeShardStates) { + Iterator iterator = nodeShardStates.iterator(); + while (iterator.hasNext()) { + INodeShardState nodeShardState = iterator.next(); RoutingNode node = allocation.routingNodes().node(nodeShardState.getNode().getId()); if (node == null) { continue; @@ -458,22 +474,83 @@ private static NodesToAllocate buildNodesToAllocate( protected abstract FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation); - private static class NodeShardsResult { - final List orderedAllocationCandidates; + + private static class NodeShardState implements INodeShardState { + NodeGatewayStartedShards shardState; + + public NodeShardState(NodeGatewayStartedShards shardState) { + this.shardState = shardState; + } + + @Override + public BaseNodeGatewayStartedShards getShardState() { + return shardState; + } + + @Override + public DiscoveryNode getNode() { + return shardState.getNode(); + } + } + + private static class NodeShardStates implements INodeShardStates { + List nodeShardStates; + + public NodeShardStates() { + nodeShardStates = new ArrayList<>(); + } + + public NodeShardStates(List nodeShardStates) { + this.nodeShardStates = nodeShardStates; + } + + @Override + public void add(NodeShardState value) { + nodeShardStates.add(value); + } + + @Override + public void add(NodeShardState key, NodeShardState value) { + // ignore the key, just add the value + add(value); + } + + @Override + public NodeShardState get(NodeShardState key) { + return nodeShardStates.contains(key) ? key : null; + } + + @Override + public int size() { + return nodeShardStates.size(); + } + + @Override + public Iterator iterator() { + return nodeShardStates.iterator(); + } + + public void sort(Comparator comparator) { + nodeShardStates.sort(comparator); + } + } + + protected static class NodeShardsResult { + final INodeShardStates orderedAllocationCandidates; final int allocationsFound; - NodeShardsResult(List orderedAllocationCandidates, int allocationsFound) { + NodeShardsResult(INodeShardStates orderedAllocationCandidates, int allocationsFound) { this.orderedAllocationCandidates = orderedAllocationCandidates; this.allocationsFound = allocationsFound; } } - static class NodesToAllocate { - final List yesNodeShards; - final List throttleNodeShards; - final List noNodeShards; + protected static class NodesToAllocate { + final List yesNodeShards; + final List throttleNodeShards; + final List noNodeShards; - NodesToAllocate(List yesNodeShards, List throttleNodeShards, List noNodeShards) { + NodesToAllocate(List yesNodeShards, List throttleNodeShards, List noNodeShards) { this.yesNodeShards = yesNodeShards; this.throttleNodeShards = throttleNodeShards; this.noNodeShards = noNodeShards; @@ -484,11 +561,11 @@ static class NodesToAllocate { * This class encapsulates the shard state retrieved from a node and the decision that was made * by the allocator for allocating to the node that holds the shard copy. */ - private static class DecidedNode { - final NodeGatewayStartedShards nodeShardState; + protected static class DecidedNode { + final INodeShardState nodeShardState; final Decision decision; - private DecidedNode(NodeGatewayStartedShards nodeShardState, Decision decision) { + protected DecidedNode(INodeShardState nodeShardState, Decision decision) { this.nodeShardState = nodeShardState; this.decision = decision; } diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java index 5cc6841d0271a..b3559741fee4f 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java @@ -34,7 +34,6 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.RoutingNode; @@ -44,10 +43,7 @@ import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus; import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.NodeAllocationResult; -import org.opensearch.cluster.routing.allocation.NodeAllocationResult.ShardStoreInfo; import org.opensearch.cluster.routing.allocation.RoutingAllocation; -import org.opensearch.cluster.routing.allocation.decider.Decision; -import org.opensearch.cluster.routing.allocation.decider.Decision.Type; import org.opensearch.env.ShardLockObtainFailedException; import org.opensearch.gateway.AsyncShardFetch.FetchResult; import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch; @@ -59,6 +55,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -84,22 +81,16 @@ * * @opensearch.internal */ -public abstract class PrimaryShardBatchAllocator extends BaseGatewayShardAllocator { - /** - * Is the allocator responsible for allocating the given {@link ShardRouting}? - */ - private static boolean isResponsibleFor(final ShardRouting shard) { - return shard.primary() // must be primary - && shard.unassigned() // must be unassigned - // only handle either an existing store or a snapshot recovery - && (shard.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE - || shard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT); - } +public abstract class PrimaryShardBatchAllocator extends PrimaryShardAllocator { abstract protected FetchResult fetchData(Set shardsEligibleForFetch, Set inEligibleShards, RoutingAllocation allocation); + protected FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation){ + return null; + } + @Override public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassignedShard, RoutingAllocation allocation, @@ -170,170 +161,30 @@ public HashMap makeAllocationDecision( AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions)); } else { - Map nodeResponses = shardsState.getData(); - Map shardData = new HashMap<>(); - // build data for a shard from all the nodes - for (Map.Entry nodeEntry : nodeResponses.entrySet()) { - shardData.put(nodeEntry.getKey(), - nodeEntry.getValue().getNodeGatewayStartedShardsBatch().get(unassignedShard.shardId())); - } + + NodeShardStates nodeShardStates = getNodeShardStates(unassignedShard, shardsState); // get allocation decision for this shard shardAllocationDecisions.put(unassignedShard, getAllocationDecision(unassignedShard, allocation, - shardData, logger)); + nodeShardStates, logger)); } } return shardAllocationDecisions; } - /** - * Below code is very similar to {@link org.opensearch.gateway.PrimaryShardAllocator} class makeAllocationDecision, - * only difference is that NodeGatewayStartedShards object doesn't have the DiscoveryNode object as - * BaseNodeResponse. So, DiscoveryNode reference is passed in Map<DiscoveryNode, NodeGatewayStartedShards> so - * corresponding DiscoveryNode object can be used for rest of the implementation. Also, DiscoveryNode object - * reference is added in DecidedNode class to achieve same use case of accessing corresponding DiscoveryNode object. - * - * @param unassignedShard unassigned shard routing - * @param allocation routing allocation object - * @param shardState shard metadata fetched from all data nodes - * @param logger logger - * @return allocation decision taken for this shard - */ - private AllocateUnassignedDecision getAllocationDecision(ShardRouting unassignedShard, RoutingAllocation allocation, - Map shardState, - Logger logger) { - final boolean explain = allocation.debugDecision(); - // don't create a new IndexSetting object for every shard as this could cause a lot of garbage - // on cluster restart if we allocate a boat load of shards - final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(unassignedShard.index()); - final Set inSyncAllocationIds = indexMetadata.inSyncAllocationIds(unassignedShard.id()); - final boolean snapshotRestore = unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT; - - assert inSyncAllocationIds.isEmpty() == false; - // use in-sync allocation ids to select nodes - final PrimaryShardBatchAllocator.NodeShardsResult nodeShardsResult = buildNodeShardsResult( - unassignedShard, - snapshotRestore, - allocation.getIgnoreNodes(unassignedShard.shardId()), - inSyncAllocationIds, - shardState, - logger - ); - final boolean enoughAllocationsFound = nodeShardsResult.orderedAllocationCandidates.size() > 0; - logger.debug( - "[{}][{}]: found {} allocation candidates of {} based on allocation ids: [{}]", - unassignedShard.index(), - unassignedShard.id(), - nodeShardsResult.orderedAllocationCandidates.size(), - unassignedShard, - inSyncAllocationIds - ); - - if (enoughAllocationsFound == false) { - if (snapshotRestore) { - // let BalancedShardsAllocator take care of allocating this shard - logger.debug( - "[{}][{}]: missing local data, will restore from [{}]", - unassignedShard.index(), - unassignedShard.id(), - unassignedShard.recoverySource() - ); - return AllocateUnassignedDecision.NOT_TAKEN; - } else { - // We have a shard that was previously allocated, but we could not find a valid shard copy to allocate the primary. - // We could just be waiting for the node that holds the primary to start back up, in which case the allocation for - // this shard will be picked up when the node joins and we do another allocation reroute - logger.debug( - "[{}][{}]: not allocating, number_of_allocated_shards_found [{}]", - unassignedShard.index(), - unassignedShard.id(), - nodeShardsResult.allocationsFound - ); - return AllocateUnassignedDecision.no( - AllocationStatus.NO_VALID_SHARD_COPY, - explain ? buildNodeDecisions(null, shardState, inSyncAllocationIds) : null - ); - } - } - - NodesToAllocate nodesToAllocate = buildNodesToAllocate( - allocation, - nodeShardsResult.orderedAllocationCandidates, - unassignedShard, - false - ); - DiscoveryNode node = null; - String allocationId = null; - boolean throttled = false; - if (nodesToAllocate.yesNodeShards.isEmpty() == false) { - DecidedNode decidedNode = nodesToAllocate.yesNodeShards.get(0); - logger.debug( - "[{}][{}]: allocating [{}] to [{}] on primary allocation", - unassignedShard.index(), - unassignedShard.id(), - unassignedShard, - decidedNode.getNode() - ); - node = decidedNode.getNode(); - allocationId = decidedNode.nodeShardState.allocationId(); - } else if (nodesToAllocate.throttleNodeShards.isEmpty() && !nodesToAllocate.noNodeShards.isEmpty()) { - // The deciders returned a NO decision for all nodes with shard copies, so we check if primary shard - // can be force-allocated to one of the nodes. - nodesToAllocate = buildNodesToAllocate(allocation, nodeShardsResult.orderedAllocationCandidates, unassignedShard, true); - if (nodesToAllocate.yesNodeShards.isEmpty() == false) { - final DecidedNode decidedNode = nodesToAllocate.yesNodeShards.get(0); - final NodeGatewayStartedShards nodeShardState = decidedNode.nodeShardState; - logger.debug( - "[{}][{}]: allocating [{}] to [{}] on forced primary allocation", - unassignedShard.index(), - unassignedShard.id(), - unassignedShard, - decidedNode.getNode() - ); - node = decidedNode.getNode(); - allocationId = nodeShardState.allocationId(); - } else if (nodesToAllocate.throttleNodeShards.isEmpty() == false) { - logger.debug( - "[{}][{}]: throttling allocation [{}] to [{}] on forced primary allocation", - unassignedShard.index(), - unassignedShard.id(), - unassignedShard, - nodesToAllocate.throttleNodeShards - ); - throttled = true; - } else { - logger.debug( - "[{}][{}]: forced primary allocation denied [{}]", - unassignedShard.index(), - unassignedShard.id(), - unassignedShard - ); + private static NodeShardStates getNodeShardStates(ShardRouting unassignedShard, FetchResult shardsState) { + NodeShardStates nodeShardStates = new NodeShardStates(new Comparator() { + @Override + public int compare(INodeShardState o1, INodeShardState o2) { + return 1; } - } else { - // we are throttling this, since we are allowed to allocate to this node but there are enough allocations - // taking place on the node currently, ignore it for now - logger.debug( - "[{}][{}]: throttling allocation [{}] to [{}] on primary allocation", - unassignedShard.index(), - unassignedShard.id(), - unassignedShard, - nodesToAllocate.throttleNodeShards - ); - throttled = true; - } - - List nodeResults = null; - if (explain) { - nodeResults = buildNodeDecisions(nodesToAllocate, shardState, inSyncAllocationIds); - } - if (allocation.hasPendingAsyncFetch()) { - return AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeResults); - } else if (node != null) { - return AllocateUnassignedDecision.yes(node, allocationId, nodeResults, false); - } else if (throttled) { - return AllocateUnassignedDecision.throttle(nodeResults); - } else { - return AllocateUnassignedDecision.no(AllocationStatus.DECIDERS_NO, nodeResults, true); - } + }); + Map nodeResponses = shardsState.getData(); + + // build data for a shard from all the nodes + nodeResponses.forEach((node, nodeGatewayStartedShardsBatch) -> { + nodeShardStates.add(new NodeShardState(nodeGatewayStartedShardsBatch.getNodeGatewayStartedShardsBatch().get(unassignedShard.shardId()), node), node); + }); + return nodeShardStates; } /** @@ -376,11 +227,12 @@ private static List buildNodeDecisions( nodeResults.addAll( Stream.of(nodesToAllocate.yesNodeShards, nodesToAllocate.throttleNodeShards, nodesToAllocate.noNodeShards) .flatMap(Collection::stream) + .map(node -> (DecidedNode) node) .map(dnode -> { - discoNodes.add(dnode.getNode()); + discoNodes.add(dnode.nodeShardState.getNode()); return new NodeAllocationResult( - dnode.getNode(), - shardStoreInfo(dnode.nodeShardState, inSyncAllocationIds), + dnode.nodeShardState.getNode(), + shardStoreInfo(dnode.nodeShardState.getShardState(), inSyncAllocationIds), dnode.decision ); }) @@ -407,69 +259,33 @@ private static List buildNodeDecisions( return nodeResults; } - private static ShardStoreInfo shardStoreInfo(NodeGatewayStartedShards nodeShardState, Set inSyncAllocationIds) { - final Exception storeErr = nodeShardState.storeException(); - final boolean inSync = nodeShardState.allocationId() != null && inSyncAllocationIds.contains(nodeShardState.allocationId()); - return new ShardStoreInfo(nodeShardState.allocationId(), inSync, storeErr); - } - - private static final Comparator NO_STORE_EXCEPTION_FIRST_COMPARATOR = Comparator.comparing( - (NodeGatewayStartedShards state) -> state.storeException() == null - ).reversed(); - private static final Comparator PRIMARY_FIRST_COMPARATOR = Comparator.comparing( - NodeGatewayStartedShards::primary - ).reversed(); - - private static final Comparator HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = Comparator.comparing( - NodeGatewayStartedShards::replicationCheckpoint, - Comparator.nullsLast(Comparator.naturalOrder()) - ); - /** * Builds a list of nodes. If matchAnyShard is set to false, only nodes that have an allocation id matching * inSyncAllocationIds are added to the list. Otherwise, any node that has a shard is added to the list, but * entries with matching allocation id are always at the front of the list. */ - protected static NodeShardsResult buildNodeShardsResult( + @Override + protected NodeShardsResult buildNodeShardsResult( ShardRouting shard, boolean matchAnyShard, Set ignoreNodes, Set inSyncAllocationIds, - Map shardState, + INodeShardStates shardState, Logger logger ) { - /** - * Orders the active shards copies based on below comparators - * 1. No store exception i.e. shard copy is readable - * 2. Prefer previous primary shard - * 3. Prefer shard copy with the highest replication checkpoint. It is NO-OP for doc rep enabled indices. - */ - final Comparator comparator; // allocation preference - if (matchAnyShard) { - // prefer shards with matching allocation ids - Comparator matchingAllocationsFirst = Comparator.comparing( - (NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId()) - ).reversed(); - comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR) - .thenComparing(PRIMARY_FIRST_COMPARATOR) - .thenComparing(HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR); - } else { - comparator = NO_STORE_EXCEPTION_FIRST_COMPARATOR.thenComparing(PRIMARY_FIRST_COMPARATOR) - .thenComparing(HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR); - } - // TreeMap will sort the entries based on key, comparator is assigned above - TreeMap shardStatesToNode = new TreeMap<>(comparator); + NodeShardStates shardStatesToNode = new NodeShardStates(getComparator(matchAnyShard, inSyncAllocationIds)); int numberOfAllocationsFound = 0; - for (Map.Entry nodeShardStateEntry : shardState.entrySet()) { - DiscoveryNode node = nodeShardStateEntry.getKey(); - NodeGatewayStartedShards nodeShardState = nodeShardStateEntry.getValue(); - String allocationId = nodeShardState.allocationId(); + Iterator iterator = shardState.iterator(); + while (iterator.hasNext()) { + NodeShardState nodeShardState = (NodeShardState) iterator.next(); + DiscoveryNode node = nodeShardState.getNode(); + String allocationId = nodeShardState.getShardState().allocationId(); if (ignoreNodes.contains(node.getId())) { continue; } - if (nodeShardState.storeException() == null) { + if (nodeShardState.getShardState().storeException() == null) { if (allocationId == null) { logger.trace("[{}] on node [{}] has no shard state information", shard, node); } else { @@ -477,7 +293,7 @@ protected static NodeShardsResult buildNodeShardsResult( } } else { final String finalAllocationId = allocationId; - if (nodeShardState.storeException() instanceof ShardLockObtainFailedException) { + if (nodeShardState.getShardState().storeException() instanceof ShardLockObtainFailedException) { logger.trace( () -> new ParameterizedMessage( "[{}] on node [{}] has allocation id [{}] but the store can not be " @@ -486,7 +302,7 @@ protected static NodeShardsResult buildNodeShardsResult( node, finalAllocationId ), - nodeShardState.storeException() + nodeShardState.getShardState().storeException() ); } else { logger.trace( @@ -496,110 +312,85 @@ protected static NodeShardsResult buildNodeShardsResult( node, finalAllocationId ), - nodeShardState.storeException() + nodeShardState.getShardState().storeException() ); allocationId = null; } } if (allocationId != null) { - assert nodeShardState.storeException() == null || nodeShardState.storeException() instanceof ShardLockObtainFailedException + assert nodeShardState.getShardState().storeException() == null || nodeShardState.getShardState().storeException() instanceof ShardLockObtainFailedException : "only allow store that can be opened or that throws a ShardLockObtainFailedException while being opened but got a " + "store throwing " - + nodeShardState.storeException(); + + nodeShardState.getShardState().storeException(); numberOfAllocationsFound++; - if (matchAnyShard || inSyncAllocationIds.contains(nodeShardState.allocationId())) { - shardStatesToNode.put(nodeShardState, node); + if (matchAnyShard || inSyncAllocationIds.contains(nodeShardState.getShardState().allocationId())) { + shardStatesToNode.add(nodeShardState, node); } } } if (logger.isTraceEnabled()) { + Set nodes = new HashSet<>(); + shardState.iterator().forEachRemaining(nodeShardState -> nodes.add(nodeShardState.getNode())); logger.trace( "{} candidates for allocation: {}", shard, - shardState.keySet().stream().map(DiscoveryNode::getName).collect(Collectors.joining(", ")) + nodes.stream().map(DiscoveryNode::getName).collect(Collectors.joining(", ")) ); } return new NodeShardsResult(shardStatesToNode, numberOfAllocationsFound); } - /** - * Split the list of node shard states into groups yes/no/throttle based on allocation deciders - */ - private static NodesToAllocate buildNodesToAllocate( - RoutingAllocation allocation, - TreeMap shardStateToNode, - ShardRouting shardRouting, - boolean forceAllocate - ) { - List yesNodeShards = new ArrayList<>(); - List throttledNodeShards = new ArrayList<>(); - List noNodeShards = new ArrayList<>(); - for (Map.Entry nodeShardState : shardStateToNode.entrySet()) { - RoutingNode node = allocation.routingNodes().node(nodeShardState.getValue().getId()); - if (node == null) { - continue; - } - Decision decision = forceAllocate - ? allocation.deciders().canForceAllocatePrimary(shardRouting, node, allocation) - : allocation.deciders().canAllocate(shardRouting, node, allocation); - DecidedNode decidedNode = new DecidedNode(nodeShardState.getKey(), decision, nodeShardState.getValue()); - if (decision.type() == Type.THROTTLE) { - throttledNodeShards.add(decidedNode); - } else if (decision.type() == Type.NO) { - noNodeShards.add(decidedNode); - } else { - yesNodeShards.add(decidedNode); - } + private static class NodeShardState implements INodeShardState { + NodeGatewayStartedShards shardState; + DiscoveryNode node; + + public NodeShardState(NodeGatewayStartedShards shardState, DiscoveryNode node) { + this.shardState = shardState; + this.node = node; } - return new NodesToAllocate( - Collections.unmodifiableList(yesNodeShards), - Collections.unmodifiableList(throttledNodeShards), - Collections.unmodifiableList(noNodeShards) - ); - } - private static class NodeShardsResult { - final TreeMap orderedAllocationCandidates; - final int allocationsFound; + @Override + public BaseNodeGatewayStartedShards getShardState() { + return this.shardState; + } - NodeShardsResult(TreeMap orderedAllocationCandidates, int allocationsFound) { - this.orderedAllocationCandidates = orderedAllocationCandidates; - this.allocationsFound = allocationsFound; + @Override + public DiscoveryNode getNode() { + return this.node; } } - static class NodesToAllocate { - final List yesNodeShards; - final List throttleNodeShards; - final List noNodeShards; + private static class NodeShardStates implements INodeShardStates { + TreeMap nodeShardStates; - NodesToAllocate(List yesNodeShards, List throttleNodeShards, List noNodeShards) { - this.yesNodeShards = yesNodeShards; - this.throttleNodeShards = throttleNodeShards; - this.noNodeShards = noNodeShards; + public NodeShardStates(Comparator comparator) { + this.nodeShardStates = new TreeMap<>(comparator); } - } - /** - * This class encapsulates the shard state retrieved from a node and the decision that was made - * by the allocator for allocating to the node that holds the shard copy. - */ - private static class DecidedNode { - final NodeGatewayStartedShards nodeShardState; - final Decision decision; - final DiscoveryNode node; - - private DecidedNode(NodeGatewayStartedShards nodeShardState, Decision decision, DiscoveryNode node) { - this.nodeShardState = nodeShardState; - this.decision = decision; - this.node = node; + @Override + public void add(DiscoveryNode value) {} + + @Override + public void add(NodeShardState key, DiscoveryNode value) { + this.nodeShardStates.put(key, value); } - public DiscoveryNode getNode() { - return node; + @Override + public DiscoveryNode get(NodeShardState key) { + return this.nodeShardStates.get(key); + } + + @Override + public int size() { + return this.nodeShardStates.size(); + } + + @Override + public Iterator iterator() { + return this.nodeShardStates.keySet().iterator(); } } } diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java index 59f01cadff12f..0e815028697c4 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java @@ -350,7 +350,7 @@ public String getCustomDataPath() { * * @opensearch.internal */ - public static class NodeGatewayStartedShards extends BaseNodeResponse { + public static class NodeGatewayStartedShards extends BaseNodeResponse implements BaseNodeGatewayStartedShards { private final String allocationId; private final boolean primary; private final Exception storeException; diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShardsBatch.java b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShardsBatch.java index 5c69e45c71b28..95a6576305c74 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShardsBatch.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShardsBatch.java @@ -312,7 +312,7 @@ public void writeTo(StreamOutput out) throws IOException { * * @opensearch.internal */ - public static class NodeGatewayStartedShards { + public static class NodeGatewayStartedShards implements BaseNodeGatewayStartedShards { private final String allocationId; private final boolean primary; private final Exception storeException;