Skip to content

Commit

Permalink
Incorporated changes from dependent PRs
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Chandani <[email protected]>
  • Loading branch information
Gaurav614 committed Sep 6, 2023
1 parent 61f8c61 commit b7e2119
Showing 1 changed file with 32 additions and 25 deletions.
57 changes: 32 additions & 25 deletions server/src/main/java/org/opensearch/gateway/GatewayAllocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -313,19 +313,20 @@ protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.Nod
}
}


class InternalPrimaryBatchShardAllocator extends PrimaryShardBatchAllocator {

@Override
@SuppressWarnings("unchecked")
protected AsyncBatchShardFetch.FetchResult<TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch> fetchData(Set<ShardRouting> shardsEligibleForFetch,
Set<ShardRouting> inEligibleShards,
RoutingAllocation allocation) {
protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch> fetchData(Set<ShardRouting> shardsEligibleForFetch,
Set<ShardRouting> inEligibleShards,
RoutingAllocation allocation) {
ShardRouting shardRouting = shardsEligibleForFetch.iterator().hasNext() ? shardsEligibleForFetch.iterator().next() : null;
shardRouting = shardRouting == null && inEligibleShards.iterator().hasNext() ? inEligibleShards.iterator().next() : shardRouting;
if (shardRouting == null) {
return new AsyncBatchShardFetch.FetchResult<>(null, Collections.emptyMap());
return new AsyncShardFetch.FetchResult<>(null, Collections.emptyMap());
}

String batchId = startedShardBatchLookup.getOrDefault(shardRouting.shardId(), null);
String batchId = getBatchId(shardRouting, shardRouting.primary());
if (batchId == null) {
logger.debug("Shard {} has no batch id", shardRouting);
throw new IllegalStateException("Shard " + shardRouting + " has no batch id. Shard should batched before fetching");
Expand All @@ -343,25 +344,26 @@ protected AsyncBatchShardFetch.FetchResult<TransportNodesListGatewayStartedShard

if (shardsBatch.getBatchedShards().isEmpty() && shardsEligibleForFetch.isEmpty()) {
logger.debug("Batch {} is empty", batchId);
return new AsyncBatchShardFetch.FetchResult<>(null, Collections.emptyMap());
return new AsyncShardFetch.FetchResult<>(null, Collections.emptyMap());
}

Map<ShardId, Set<String>> shardToIgnoreNodes = new HashMap<>();

for (ShardId shardId : shardsBatch.asyncBatch.shardsToCustomDataPathMap.keySet()) {
for (ShardId shardId : shardsBatch.asyncBatch.shardToCustomDataPath.keySet()) {
shardToIgnoreNodes.put(shardId, allocation.getIgnoreNodes(shardId));
}
AsyncBatchShardFetch<? extends BaseNodeResponse> asyncFetcher = shardsBatch.getAsyncFetcher();
AsyncBatchShardFetch.FetchResult<? extends BaseNodeResponse> shardBatchState = asyncFetcher.fetchData(
AsyncShardFetch<? extends BaseNodeResponse> asyncFetcher = shardsBatch.getAsyncFetcher();
AsyncShardFetch.FetchResult<? extends BaseNodeResponse> shardBatchState = asyncFetcher.fetchData(
allocation.nodes(),
shardToIgnoreNodes
);

if (shardBatchState.hasData()) {
shardBatchState.processAllocation(allocation);
}
return (AsyncBatchShardFetch.FetchResult<TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch>) shardBatchState;
return (AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch>) shardBatchState;
}

}

class InternalReplicaShardAllocator extends ReplicaShardAllocator {
Expand Down Expand Up @@ -402,17 +404,16 @@ class InternalReplicaBatchShardAllocator extends ReplicaShardBatchAllocator {

@Override
@SuppressWarnings("unchecked")
protected AsyncBatchShardFetch.FetchResult<NodeStoreFilesMetadataBatch> fetchData(Set<ShardRouting> shardsEligibleForFetch,
Set<ShardRouting> inEligibleShards,
RoutingAllocation allocation) {

protected AsyncShardFetch.FetchResult<NodeStoreFilesMetadataBatch> fetchData(Set<ShardRouting> shardsEligibleForFetch,
Set<ShardRouting> inEligibleShards,
RoutingAllocation allocation) {
// get batch id for anyone given shard. We are assuming all shards will have same batch Id
ShardRouting shardRouting = shardsEligibleForFetch.iterator().hasNext() ? shardsEligibleForFetch.iterator().next() : null;
shardRouting = shardRouting == null && inEligibleShards.iterator().hasNext() ? inEligibleShards.iterator().next() : shardRouting;
if (shardRouting == null) {
return new AsyncBatchShardFetch<>.FetchResult<>(null, Collections.emptyMap());
return new AsyncShardFetch.FetchResult<>(null, Collections.emptyMap());
}

String batchId = storeShardBatchLookup.getOrDefault(shardRouting.shardId(), null);
String batchId = getBatchId(shardRouting, shardRouting.primary());
if (batchId == null) {
logger.debug("Shard {} has no batch id", shardRouting);
throw new IllegalStateException("Shard " + shardRouting + " has no batch id. Shard should batched before fetching");
Expand All @@ -429,21 +430,27 @@ protected AsyncBatchShardFetch.FetchResult<NodeStoreFilesMetadataBatch> fetchDat

if (shardsBatch.getBatchedShards().isEmpty() && shardsEligibleForFetch.isEmpty()) {
logger.debug("Batch {} is empty", batchId);
return new AsyncBatchShardFetch<>.FetchResult<>(null, Collections.emptyMap());
return new AsyncShardFetch.FetchResult<>(null, Collections.emptyMap());
}
Map<ShardId, Set<String>> shardToIgnoreNodes = new HashMap<>();
for (ShardId shardId : shardsBatch.asyncBatch.shardsToCustomDataPathMap.keySet()) {
for (ShardId shardId : shardsBatch.asyncBatch.shardToCustomDataPath.keySet()) {
shardToIgnoreNodes.put(shardId, allocation.getIgnoreNodes(shardId));
}
AsyncBatchShardFetch<? extends BaseNodeResponse> asyncFetcher = shardsBatch.getAsyncFetcher();
AsyncBatchShardFetch.FetchResult<? extends BaseNodeResponse> shardBatchState = asyncFetcher.fetchData(
AsyncShardFetch<? extends BaseNodeResponse> asyncFetcher = shardsBatch.getAsyncFetcher();
AsyncShardFetch.FetchResult<? extends BaseNodeResponse> shardBatchStores = asyncFetcher.fetchData(
allocation.nodes(),
shardToIgnoreNodes
);
if (shardBatchState.hasData()) {
shardBatchState.processAllocation(allocation);
if (shardBatchStores.hasData()) {
shardBatchStores.processAllocation(allocation);
}
return (AsyncBatchShardFetch.FetchResult<NodeStoreFilesMetadataBatch>) shardBatchState;
return (AsyncShardFetch.FetchResult<NodeStoreFilesMetadataBatch>) shardBatchStores;
}

@Override
protected boolean hasInitiatedFetching(ShardRouting shard) {
String batchId = getBatchId(shard, shard.primary());
return batchId!=null;
}
}
}

0 comments on commit b7e2119

Please sign in to comment.