Skip to content

Commit

Permalink
Minor changes
Browse files Browse the repository at this point in the history
  • Loading branch information
shiv0408 committed Sep 4, 2023
1 parent b412bd3 commit b87122d
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,20 +90,20 @@ protected static boolean isResponsibleFor(final ShardRouting shard) {
|| shard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT);
}


@Override
public AllocateUnassignedDecision makeAllocationDecision(
final ShardRouting unassignedShard,
final RoutingAllocation allocation,
final Logger logger
) {
/**
* Skip doing fetchData call for a shard if recovery mode is snapshot. Also do not take decision if allocator is
* not responsible for this particular shard.
*
* @param unassignedShard unassigned shard routing
* @param allocation routing allocation object
* @return allocation decision taken for this shard
*/
protected AllocateUnassignedDecision skipSnapshotRestore(ShardRouting unassignedShard, RoutingAllocation allocation) {
if (isResponsibleFor(unassignedShard) == false) {
// this allocator is not responsible for allocating this shard
return AllocateUnassignedDecision.NOT_TAKEN;
}

final boolean explain = allocation.debugDecision();

if (unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT
&& allocation.snapshotShardSizeInfo().getShardSize(unassignedShard) == null) {
List<NodeAllocationResult> nodeDecisions = null;
Expand All @@ -112,12 +112,24 @@ public AllocateUnassignedDecision makeAllocationDecision(
}
return AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions);
}
return null;
}

@Override
public AllocateUnassignedDecision makeAllocationDecision(
final ShardRouting unassignedShard,
final RoutingAllocation allocation,
final Logger logger
) {
AllocateUnassignedDecision decision = skipSnapshotRestore(unassignedShard, allocation);
if (decision != null) {
return decision;
}
final FetchResult<NodeGatewayStartedShards> shardState = fetchData(unassignedShard, allocation);
if (shardState.hasData() == false) {
allocation.setHasPendingAsyncFetch();
List<NodeAllocationResult> nodeDecisions = null;
if (explain) {
if (allocation.debugDecision()) {
nodeDecisions = buildDecisionsForAllNodes(unassignedShard, allocation);
}
return AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,78 +187,6 @@ public int compare(INodeShardState o1, INodeShardState o2) {
return nodeShardStates;
}

/**
* Skip doing fetchData call for a shard if recovery mode is snapshot. Also do not take decision if allocator is
* not responsible for this particular shard.
*
* @param unassignedShard unassigned shard routing
* @param allocation routing allocation object
* @return allocation decision taken for this shard
*/
private AllocateUnassignedDecision skipSnapshotRestore(ShardRouting unassignedShard, RoutingAllocation allocation) {
if (isResponsibleFor(unassignedShard) == false) {
// this allocator is not responsible for allocating this shard
return AllocateUnassignedDecision.NOT_TAKEN;
}
final boolean explain = allocation.debugDecision();
if (unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT
&& allocation.snapshotShardSizeInfo().getShardSize(unassignedShard) == null) {
List<NodeAllocationResult> nodeDecisions = null;
if (explain) {
nodeDecisions = buildDecisionsForAllNodes(unassignedShard, allocation);
}
return AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions);
}
return null;
}

/**
* Builds a map of nodes to the corresponding allocation decisions for those nodes.
*/
private static List<NodeAllocationResult> buildNodeDecisions(
NodesToAllocate nodesToAllocate,
Map<DiscoveryNode, NodeGatewayStartedShards> fetchedShardData,
Set<String> inSyncAllocationIds
) {
List<NodeAllocationResult> nodeResults = new ArrayList<>();
Map<DiscoveryNode, NodeGatewayStartedShards> ineligibleShards;
if (nodesToAllocate != null) {
final Set<DiscoveryNode> discoNodes = new HashSet<>();
nodeResults.addAll(
Stream.of(nodesToAllocate.yesNodeShards, nodesToAllocate.throttleNodeShards, nodesToAllocate.noNodeShards)
.flatMap(Collection::stream)
.map(node -> (DecidedNode) node)
.map(dnode -> {
discoNodes.add(dnode.nodeShardState.getNode());
return new NodeAllocationResult(
dnode.nodeShardState.getNode(),
shardStoreInfo(dnode.nodeShardState.getShardState(), inSyncAllocationIds),
dnode.decision
);
})
.collect(Collectors.toList())
);

ineligibleShards = fetchedShardData.entrySet()
.stream()
.filter(shardData -> discoNodes.contains(shardData.getKey()) == false)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
} else {
// there were no shard copies that were eligible for being assigned the allocation,
// so all fetched shard data are ineligible shards
ineligibleShards = fetchedShardData;
}

nodeResults.addAll(
ineligibleShards.entrySet().stream()
.map(shardData -> new NodeAllocationResult(shardData.getKey(), shardStoreInfo(shardData.getValue(),
inSyncAllocationIds), null))
.collect(Collectors.toList())
);

return nodeResults;
}

/**
* Builds a list of nodes. If matchAnyShard is set to false, only nodes that have an allocation id matching
* inSyncAllocationIds are added to the list. Otherwise, any node that has a shard is added to the list, but
Expand Down

0 comments on commit b87122d

Please sign in to comment.