Skip to content

Commit

Permalink
de-duping shards in ShardsBatchGatewayAllocator based on ShardId inst…
Browse files Browse the repository at this point in the history
…ead of ShardRouting#equals
  • Loading branch information
rajiv-kv committed May 16, 2024
1 parent f217270 commit ad7a4aa
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,17 @@ public void cleanCaches() {

// for tests
protected ShardsBatchGatewayAllocator() {
this(DEFAULT_SHARD_BATCH_SIZE);
}

protected ShardsBatchGatewayAllocator(long batchSize) {
this.rerouteService = null;
this.batchStartedAction = null;
this.primaryShardBatchAllocator = null;
this.batchStoreAction = null;
this.replicaShardBatchAllocator = null;
this.maxBatchSize = DEFAULT_SHARD_BATCH_SIZE;
this.maxBatchSize = batchSize;
}

// for tests

@Override
Expand Down Expand Up @@ -228,13 +231,13 @@ protected Set<String> createAndUpdateBatches(RoutingAllocation allocation, boole
batchEntry.getValue().getBatchedShards().forEach(shardId -> currentBatchedShards.put(shardId, batchEntry.getKey()));
}

Set<ShardRouting> newShardsToBatch = Sets.newHashSet();
Map<ShardId, ShardRouting> newShardsToBatch = new HashMap<>();
Set<ShardId> batchedShardsToAssign = Sets.newHashSet();
// add all unassigned shards to the batch if they are not already in a batch
unassigned.forEach(shardRouting -> {
if ((currentBatchedShards.containsKey(shardRouting.shardId()) == false) && (shardRouting.primary() == primary)) {
assert shardRouting.unassigned();
newShardsToBatch.add(shardRouting);
newShardsToBatch.put(shardRouting.shardId(), shardRouting);
}
// if shard is already batched update to latest shardRouting information in the batches
// Replica shard assignment can be cancelled if we get a better match. These ShardRouting objects also
Expand Down Expand Up @@ -262,7 +265,7 @@ else if (shardRouting.primary() == primary) {

refreshShardBatches(currentBatches, batchedShardsToAssign, primary);

Iterator<ShardRouting> iterator = newShardsToBatch.iterator();
Iterator<ShardRouting> iterator = newShardsToBatch.values().iterator();
assert maxBatchSize > 0 : "Shards batch size must be greater than 0";

long batchSize = maxBatchSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,21 @@
import org.opensearch.cluster.OpenSearchAllocationTestCase;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.TestShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.set.Sets;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.snapshots.SnapshotShardSizeInfo;
import org.opensearch.test.gateway.TestShardBatchGatewayAllocator;
Expand Down Expand Up @@ -222,6 +229,21 @@ public void testSafelyRemoveShardFromBothBatch() {
assertEquals(0, testShardsBatchGatewayAllocator.getBatchIdToStoreShardBatch().size());
}

public void testDeDuplicationOfReplicaShardsAcrossBatch() {
final ShardId shardId = new ShardId("test", "_na_", 0);
final DiscoveryNode node = newNode("node1");
// number of replicas is greater than batch size - to ensure shardRouting gets de-duped across batch
createRoutingWithDifferentUnAssignedInfo(shardId, node, 50);
testShardsBatchGatewayAllocator = new TestShardBatchGatewayAllocator(10);

// only replica shard should be in the batch
Set<String> replicaBatches = testShardsBatchGatewayAllocator.createAndUpdateBatches(testAllocation, false);
assertEquals(1, replicaBatches.size());
ShardsBatchGatewayAllocator.ShardsBatch shardsBatch = testShardsBatchGatewayAllocator.getBatchIdToStoreShardBatch()
.get(replicaBatches.iterator().next());
assertEquals(1, shardsBatch.getBatchedShards().size());
}

public void testGetBatchIdExisting() {
createIndexAndUpdateClusterState(2, 1020, 1);
// get all shardsRoutings for test index
Expand Down Expand Up @@ -345,6 +367,59 @@ private void createIndexAndUpdateClusterState(int count, int numberOfShards, int
);
}

private void createRoutingWithDifferentUnAssignedInfo(ShardId primaryShardId, DiscoveryNode node, int numberOfReplicas) {

ShardRouting primaryShard = TestShardRouting.newShardRouting(primaryShardId, node.getId(), true, ShardRoutingState.STARTED);
Metadata metadata = Metadata.builder()
.put(
IndexMetadata.builder(primaryShardId.getIndexName())
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(numberOfReplicas)
.putInSyncAllocationIds(0, Sets.newHashSet(primaryShard.allocationId().getId()))
)
.build();

IndexRoutingTable.Builder isd = IndexRoutingTable.builder(primaryShardId.getIndex())
.addIndexShard(new IndexShardRoutingTable.Builder(primaryShardId).addShard(primaryShard).build());

for (int i = 0; i < numberOfReplicas; i++) {
isd.addShard(
ShardRouting.newUnassigned(
primaryShardId,
false,
RecoverySource.PeerRecoverySource.INSTANCE,
new UnassignedInfo(
UnassignedInfo.Reason.REPLICA_ADDED,
"message for replica-copy " + i,
null,
0,
System.nanoTime(),
System.currentTimeMillis(),
false,
UnassignedInfo.AllocationStatus.NO_ATTEMPT,
Collections.emptySet()
)
)
);
}

RoutingTable routingTable = RoutingTable.builder().add(isd).build();
clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata)
.routingTable(routingTable)
.build();
testAllocation = new RoutingAllocation(
new AllocationDeciders(Collections.emptyList()),
new RoutingNodes(clusterState, false),
clusterState,
ClusterInfo.EMPTY,
SnapshotShardSizeInfo.EMPTY,
System.nanoTime()
);

}

// call this after index creation and update cluster state
private Tuple<Set<String>, Set<String>> createBatchesAndAssert(int expectedBatchSize) {
Set<String> primaryBatches = testShardsBatchGatewayAllocator.createAndUpdateBatches(testAllocation, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@

public class TestShardBatchGatewayAllocator extends ShardsBatchGatewayAllocator {

public TestShardBatchGatewayAllocator() {

}

public TestShardBatchGatewayAllocator(long maxBatchSize) {
super(maxBatchSize);
}

Map<String /* node id */, Map<ShardId, ShardRouting>> knownAllocations = new HashMap<>();
DiscoveryNodes currentNodes = DiscoveryNodes.EMPTY_NODES;
Map<String, ReplicationCheckpoint> shardIdNodeToReplicationCheckPointMap = new HashMap<>();
Expand Down

0 comments on commit ad7a4aa

Please sign in to comment.