From b87122dc571eb382f6470196f001bd5aa13fdb26 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Mon, 4 Sep 2023 06:09:31 +0530 Subject: [PATCH] Minor changes --- .../gateway/PrimaryShardAllocator.java | 32 ++++++--- .../gateway/PrimaryShardBatchAllocator.java | 72 ------------------- 2 files changed, 22 insertions(+), 82 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java index 597add1d35570..1bb149910b268 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java @@ -90,20 +90,20 @@ protected static boolean isResponsibleFor(final ShardRouting shard) { || shard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT); } - - @Override - public AllocateUnassignedDecision makeAllocationDecision( - final ShardRouting unassignedShard, - final RoutingAllocation allocation, - final Logger logger - ) { + /** + * Skip doing fetchData call for a shard if recovery mode is snapshot. Also do not take decision if allocator is + * not responsible for this particular shard. + * + * @param unassignedShard unassigned shard routing + * @param allocation routing allocation object + * @return allocation decision taken for this shard + */ + protected AllocateUnassignedDecision skipSnapshotRestore(ShardRouting unassignedShard, RoutingAllocation allocation) { if (isResponsibleFor(unassignedShard) == false) { // this allocator is not responsible for allocating this shard return AllocateUnassignedDecision.NOT_TAKEN; } - final boolean explain = allocation.debugDecision(); - if (unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT && allocation.snapshotShardSizeInfo().getShardSize(unassignedShard) == null) { List nodeDecisions = null; @@ -112,12 +112,24 @@ public AllocateUnassignedDecision makeAllocationDecision( } return AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions); } + return null; + } + @Override + public AllocateUnassignedDecision makeAllocationDecision( + final ShardRouting unassignedShard, + final RoutingAllocation allocation, + final Logger logger + ) { + AllocateUnassignedDecision decision = skipSnapshotRestore(unassignedShard, allocation); + if (decision != null) { + return decision; + } final FetchResult shardState = fetchData(unassignedShard, allocation); if (shardState.hasData() == false) { allocation.setHasPendingAsyncFetch(); List nodeDecisions = null; - if (explain) { + if (allocation.debugDecision()) { nodeDecisions = buildDecisionsForAllNodes(unassignedShard, allocation); } return AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions); diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java index b3559741fee4f..08ab3212dfde9 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java @@ -187,78 +187,6 @@ public int compare(INodeShardState o1, INodeShardState o2) { return nodeShardStates; } - /** - * Skip doing fetchData call for a shard if recovery mode is snapshot. Also do not take decision if allocator is - * not responsible for this particular shard. - * - * @param unassignedShard unassigned shard routing - * @param allocation routing allocation object - * @return allocation decision taken for this shard - */ - private AllocateUnassignedDecision skipSnapshotRestore(ShardRouting unassignedShard, RoutingAllocation allocation) { - if (isResponsibleFor(unassignedShard) == false) { - // this allocator is not responsible for allocating this shard - return AllocateUnassignedDecision.NOT_TAKEN; - } - final boolean explain = allocation.debugDecision(); - if (unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT - && allocation.snapshotShardSizeInfo().getShardSize(unassignedShard) == null) { - List nodeDecisions = null; - if (explain) { - nodeDecisions = buildDecisionsForAllNodes(unassignedShard, allocation); - } - return AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions); - } - return null; - } - - /** - * Builds a map of nodes to the corresponding allocation decisions for those nodes. - */ - private static List buildNodeDecisions( - NodesToAllocate nodesToAllocate, - Map fetchedShardData, - Set inSyncAllocationIds - ) { - List nodeResults = new ArrayList<>(); - Map ineligibleShards; - if (nodesToAllocate != null) { - final Set discoNodes = new HashSet<>(); - nodeResults.addAll( - Stream.of(nodesToAllocate.yesNodeShards, nodesToAllocate.throttleNodeShards, nodesToAllocate.noNodeShards) - .flatMap(Collection::stream) - .map(node -> (DecidedNode) node) - .map(dnode -> { - discoNodes.add(dnode.nodeShardState.getNode()); - return new NodeAllocationResult( - dnode.nodeShardState.getNode(), - shardStoreInfo(dnode.nodeShardState.getShardState(), inSyncAllocationIds), - dnode.decision - ); - }) - .collect(Collectors.toList()) - ); - - ineligibleShards = fetchedShardData.entrySet() - .stream() - .filter(shardData -> discoNodes.contains(shardData.getKey()) == false) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - } else { - // there were no shard copies that were eligible for being assigned the allocation, - // so all fetched shard data are ineligible shards - ineligibleShards = fetchedShardData; - } - - nodeResults.addAll( - ineligibleShards.entrySet().stream() - .map(shardData -> new NodeAllocationResult(shardData.getKey(), shardStoreInfo(shardData.getValue(), - inSyncAllocationIds), null)) - .collect(Collectors.toList()) - ); - - return nodeResults; - } - /** * Builds a list of nodes. If matchAnyShard is set to false, only nodes that have an allocation id matching * inSyncAllocationIds are added to the list. Otherwise, any node that has a shard is added to the list, but