Skip to content

Commit

Permalink
Add RSA Async batch shard fetch transport integ test
Browse files Browse the repository at this point in the history
Signed-off-by: sudarshan baliga <[email protected]>
  • Loading branch information
sudarshan-baliga committed Aug 3, 2023
1 parent 07ba88a commit ed52eb5
Show file tree
Hide file tree
Showing 3 changed files with 230 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway;


import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.shard.ShardPath;

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import static org.opensearch.test.OpenSearchIntegTestCase.client;
import static org.opensearch.test.OpenSearchIntegTestCase.internalCluster;
import static org.opensearch.test.OpenSearchIntegTestCase.resolveIndex;


public class AsyncShardFetchBatchTestUtils {

public static DiscoveryNode[] getDiscoveryNodes() throws ExecutionException, InterruptedException {
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.local(false);
clusterStateRequest.clear().nodes(true).routingTable(true).indices("*");
ClusterStateResponse clusterStateResponse = client().admin().cluster().state(clusterStateRequest).get();
final List<DiscoveryNode> nodes = new LinkedList<>(clusterStateResponse.getState().nodes().getDataNodes().values());
DiscoveryNode[] disNodesArr = new DiscoveryNode[nodes.size()];
nodes.toArray(disNodesArr);
return disNodesArr;
}

public static Map<ShardId, String> prepareRequestMap(String[] indices, int shardCount) {
Map<ShardId, String> shardIdCustomDataPathMap = new HashMap<>();
for (String indexName : indices) {
final Index index = resolveIndex(indexName);
final String customDataPath = IndexMetadata.INDEX_DATA_PATH_SETTING.get(
client().admin().indices().prepareGetSettings(indexName).get().getIndexToSettings().get(indexName)
);
for (int shardIdNum = 0; shardIdNum < shardCount; shardIdNum++) {
final ShardId shardId = new ShardId(index, shardIdNum);
shardIdCustomDataPathMap.put(shardId, customDataPath);
}
}
return shardIdCustomDataPathMap;
}

public static void corruptShard(String nodeName, ShardId shardId) throws IOException, InterruptedException {
for (Path path : internalCluster().getInstance(NodeEnvironment.class, nodeName).availableShardPaths(shardId)) {
final Path indexPath = path.resolve(ShardPath.INDEX_FOLDER_NAME);
if (Files.exists(indexPath)) { // multi data path might only have one path in use
try (DirectoryStream<Path> stream = Files.newDirectoryStream(indexPath)) {
for (Path item : stream) {
if (item.getFileName().toString().startsWith("segments_")) {
Files.delete(item);
}
}
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.store;

import org.opensearch.Version;
import org.opensearch.action.admin.cluster.reroute.ClusterRerouteResponse;
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.opensearch.action.support.ActionTestUtils;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Map;
import java.util.concurrent.ExecutionException;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.gateway.AsyncShardFetchBatchTestUtils.corruptShard;
import static org.opensearch.gateway.AsyncShardFetchBatchTestUtils.getDiscoveryNodes;
import static org.opensearch.gateway.AsyncShardFetchBatchTestUtils.prepareRequestMap;

public class TransportNodesListShardStoreMetadataBatchIT extends OpenSearchIntegTestCase {

public void testSingleShardStoreFetch() throws ExecutionException, InterruptedException {
String indexName = "test";
DiscoveryNode[] nodes = getDiscoveryNodes();
TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch response = prepareAndSendRequest(new String[]{indexName}, nodes);
Index index = resolveIndex(indexName);
ShardId shardId = new ShardId(index, 0);
TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata nodeStoreFilesMetadata = response.getNodesMap()
.get(nodes[0].getId())
.getNodeStoreFilesMetadataBatch()
.get(shardId);
assertNodeStoreFilesMetadataSuccessCase(nodeStoreFilesMetadata, shardId);
}

public void testShardStoreFetchMultiNodeMultiIndexes() throws Exception {
// start second node
internalCluster().startNode();
String indexName1 = "test1";
String indexName2 = "test2";
DiscoveryNode[] nodes = getDiscoveryNodes();
TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch response = prepareAndSendRequest(new String[]{indexName1, indexName2}, nodes);
ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(indexName1, indexName2).get();
for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) {
ShardId shardId = clusterSearchShardsGroup.getShardId();
ShardRouting[] shardRoutings = clusterSearchShardsGroup.getShards();
assertEquals(2, shardRoutings.length);
for (ShardRouting shardRouting : shardRoutings) {
TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata nodeStoreFilesMetadata = response.getNodesMap()
.get(shardRouting.currentNodeId())
.getNodeStoreFilesMetadataBatch()
.get(shardId);
assertNodeStoreFilesMetadataSuccessCase(nodeStoreFilesMetadata, shardId);
}
}
}

public void testShardStoreFetchNodeNotConnected() {
DiscoveryNode nonExistingNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
String indexName = "test";
TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch response = prepareAndSendRequest(new String[]{indexName}, new DiscoveryNode[]{nonExistingNode});
assertTrue(response.hasFailures());
assertEquals(1, response.failures().size());
assertEquals(nonExistingNode.getId(), response.failures().get(0).nodeId());
}

public void testShardStoreFetchCorruptedIndex() throws Exception {
// start second node
internalCluster().startNode();
String indexName = "test";
prepareIndices(new String[]{indexName}, 1, 1);
Map<ShardId, String> shardIdCustomDataPathMap = prepareRequestMap(new String[]{indexName}, 1);
Index index = resolveIndex(indexName);
ShardId shardId = new ShardId(index, 0);
ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(indexName).get();
assertEquals(2, searchShardsResponse.getNodes().length);
corruptShard(searchShardsResponse.getNodes()[0].getName(), shardId);
corruptShard(searchShardsResponse.getNodes()[1].getName(), shardId);
ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(false).get();
DiscoveryNode[] discoveryNodes = getDiscoveryNodes();
TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch response;
response = ActionTestUtils.executeBlocking(
internalCluster().getInstance(TransportNodesListShardStoreMetadataBatch.class),
new TransportNodesListShardStoreMetadataBatch.Request(shardIdCustomDataPathMap, discoveryNodes)
);
TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata nodeStoreFilesMetadata = response.getNodesMap()
.get(discoveryNodes[0].getId())
.getNodeStoreFilesMetadataBatch()
.get(shardId);
assertNodeStoreFilesMetadataFailureCase(nodeStoreFilesMetadata, shardId);
}

private void prepareIndices(String[] indices, int numberOfPrimaryShards, int numberOfReplicaShards) {
for (String index : indices) {
createIndex(
index,
Settings.builder().put(SETTING_NUMBER_OF_SHARDS, numberOfPrimaryShards).put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicaShards).build()
);
index(index, "type", "1");
flush(index);
}
}

private TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch prepareAndSendRequest(String[] indices, DiscoveryNode[] nodes) {
Map<ShardId, String> shardIdCustomDataPathMap = null;
prepareIndices(indices, 1, 1);
shardIdCustomDataPathMap = prepareRequestMap(indices, 1);
TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch response;
return ActionTestUtils.executeBlocking(
internalCluster().getInstance(TransportNodesListShardStoreMetadataBatch.class),
new TransportNodesListShardStoreMetadataBatch.Request(shardIdCustomDataPathMap, nodes)
);
}

private void assertNodeStoreFilesMetadataFailureCase(
TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata nodeStoreFilesMetadata,
ShardId shardId
) {
assertNotNull(nodeStoreFilesMetadata.getStoreFileFetchException());
TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata storeFileMetadata = nodeStoreFilesMetadata.storeFilesMetadata();
assertEquals(shardId, storeFileMetadata.shardId());
assertTrue(storeFileMetadata.peerRecoveryRetentionLeases().isEmpty());
}

private void assertNodeStoreFilesMetadataSuccessCase(
TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata nodeStoreFilesMetadata,
ShardId shardId
) {
assertNull(nodeStoreFilesMetadata.getStoreFileFetchException());
TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata storeFileMetadata = nodeStoreFilesMetadata.storeFilesMetadata();
assertFalse(storeFileMetadata.isEmpty());
assertEquals(shardId, storeFileMetadata.shardId());
assertNotNull(storeFileMetadata.peerRecoveryRetentionLeases());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
import org.opensearch.action.admin.indices.rollover.MaxDocsCondition;
import org.opensearch.action.admin.indices.rollover.MaxSizeCondition;
import org.opensearch.action.resync.TransportResyncReplicationAction;
import org.opensearch.common.inject.AbstractModule;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.ParseField;
import org.opensearch.common.inject.AbstractModule;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry.Entry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -71,16 +71,17 @@
import org.opensearch.index.mapper.TextFieldMapper;
import org.opensearch.index.mapper.VersionFieldMapper;
import org.opensearch.index.remote.RemoteRefreshSegmentPressureService;
import org.opensearch.index.seqno.GlobalCheckpointSyncAction;
import org.opensearch.index.seqno.RetentionLeaseBackgroundSyncAction;
import org.opensearch.index.seqno.RetentionLeaseSyncAction;
import org.opensearch.index.seqno.RetentionLeaseSyncer;
import org.opensearch.index.seqno.GlobalCheckpointSyncAction;
import org.opensearch.index.shard.PrimaryReplicaSyncer;
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.store.IndicesStore;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch;
import org.opensearch.plugins.MapperPlugin;

import java.util.ArrayList;
Expand Down Expand Up @@ -280,6 +281,7 @@ protected void configure() {
bind(IndicesStore.class).asEagerSingleton();
bind(IndicesClusterStateService.class).asEagerSingleton();
bind(TransportNodesListShardStoreMetadata.class).asEagerSingleton();
bind(TransportNodesListShardStoreMetadataBatch.class).asEagerSingleton();
bind(GlobalCheckpointSyncAction.class).asEagerSingleton();
bind(TransportResyncReplicationAction.class).asEagerSingleton();
bind(PrimaryReplicaSyncer.class).asEagerSingleton();
Expand Down

0 comments on commit ed52eb5

Please sign in to comment.