Skip to content

Commit

Permalink
Fixing _list/shards API for closed indices
Browse files Browse the repository at this point in the history
Signed-off-by: Harsh Garg <[email protected]>
  • Loading branch information
Harsh Garg committed Nov 13, 2024
1 parent 9f790ee commit 691803c
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.pagination.PageParams;
import org.opensearch.client.Requests;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.settings.Settings;
Expand All @@ -22,7 +25,10 @@
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
import static org.opensearch.common.unit.TimeValue.timeValueMillis;
import static org.opensearch.search.SearchService.NO_TIMEOUT;
Expand Down Expand Up @@ -125,4 +131,72 @@ public void onFailure(Exception e) {
latch.await();
}

public void testCatShardsSuccessWithPaginationWithClosedIndices() throws InterruptedException, ExecutionException {
internalCluster().startClusterManagerOnlyNodes(1);
List<String> nodes = internalCluster().startDataOnlyNodes(3);
final int numIndices = 3;
final int numShards = 5;
final int numReplicas = 2;
final int pageSize = numIndices * numReplicas * (numShards + 1);
createIndex(
"test-1",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 5)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "60m")
.build()
);
createIndex(
"test-2",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 5)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "60m")
.build()
);
createIndex(
"test-3",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "60m")
.build()
);
ensureGreen();

// close index test-3
client().admin().indices().close(Requests.closeIndexRequest("test-3")).get();

ClusterStateResponse clusterStateResponse = client().admin()
.cluster()
.prepareState()
.clear()
.setMetadata(true)
.setIndices("test-3")
.get();
assertThat(clusterStateResponse.getState().metadata().index("test-3").getState(), equalTo(IndexMetadata.State.CLOSE));


final CatShardsRequest shardsRequest = new CatShardsRequest();
shardsRequest.setCancelAfterTimeInterval(NO_TIMEOUT);
shardsRequest.setIndices(Strings.EMPTY_ARRAY);
shardsRequest.setPageParams(new PageParams(null, PageParams.PARAM_ASC_SORT_VALUE, pageSize));
CountDownLatch latch = new CountDownLatch(1);
client().execute(CatShardsAction.INSTANCE, shardsRequest, new ActionListener<CatShardsResponse>() {
@Override
public void onResponse(CatShardsResponse catShardsResponse) {
List<ShardRouting> shardRoutings = catShardsResponse.getResponseShards();
assertFalse(shardRoutings.stream().anyMatch(shard -> shard.getIndexName().equals("test-3")));
latch.countDown();
}

@Override
public void onFailure(Exception e) {
fail();
latch.countDown();
}
});
latch.await();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.action.pagination.ShardPaginationStrategy;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.TimeoutTaskCancellationUtility;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.breaker.ResponseLimitBreachedException;
Expand Down Expand Up @@ -148,7 +149,9 @@ public void onFailure(Exception e) {
}

private ShardPaginationStrategy getPaginationStrategy(PageParams pageParams, ClusterStateResponse clusterStateResponse) {
return Objects.isNull(pageParams) ? null : new ShardPaginationStrategy(pageParams, clusterStateResponse.getState());
return Objects.isNull(pageParams)
? null
: new ShardPaginationStrategy(pageParams, clusterStateResponse.getState(), IndicesOptions.strictExpandOpenAndForbidClosed());
}

private void validateRequestLimit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.action.pagination;

import org.opensearch.OpenSearchParseException;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.IndexRoutingTable;
Expand Down Expand Up @@ -37,14 +38,23 @@ public class ShardPaginationStrategy implements PaginationStrategy<ShardRouting>
private PageData pageData;

public ShardPaginationStrategy(PageParams pageParams, ClusterState clusterState) {
this(pageParams, clusterState, null);
}

public ShardPaginationStrategy(PageParams pageParams, ClusterState clusterState, IndicesOptions indicesOptions) {
ShardStrategyToken shardStrategyToken = getShardStrategyToken(pageParams.getRequestedToken());
// Get list of indices metadata sorted by their creation time and filtered by the last sent index
List<IndexMetadata> filteredIndices = getEligibleIndices(
List<IndexMetadata> filteredIndices = PaginationStrategy.getSortedIndexMetadata(
clusterState,
pageParams.getSort(),
Objects.isNull(shardStrategyToken) ? null : shardStrategyToken.lastIndexName,
Objects.isNull(shardStrategyToken) ? null : shardStrategyToken.lastIndexCreationTime
getMetadataFilter(
pageParams.getSort(),
Objects.isNull(shardStrategyToken) ? null : shardStrategyToken.lastIndexName,
Objects.isNull(shardStrategyToken) ? null : shardStrategyToken.lastIndexCreationTime,
indicesOptions
),
PageParams.PARAM_ASC_SORT_VALUE.equals(pageParams.getSort()) ? ASC_COMPARATOR : DESC_COMPARATOR
);

// Get the list of shards and indices belonging to current page.
this.pageData = getPageData(
filteredIndices,
Expand All @@ -54,39 +64,31 @@ public ShardPaginationStrategy(PageParams pageParams, ClusterState clusterState)
);
}

private static List<IndexMetadata> getEligibleIndices(
ClusterState clusterState,
private static Predicate<IndexMetadata> getMetadataFilter(
String sortOrder,
String lastIndexName,
Long lastIndexCreationTime
Long lastIndexCreationTime,
IndicesOptions indicesOptions
) {
if (Objects.isNull(lastIndexName) || Objects.isNull(lastIndexCreationTime)) {
return PaginationStrategy.getSortedIndexMetadata(
clusterState,
PageParams.PARAM_ASC_SORT_VALUE.equals(sortOrder) ? ASC_COMPARATOR : DESC_COMPARATOR
);
} else {
return PaginationStrategy.getSortedIndexMetadata(
clusterState,
getMetadataFilter(sortOrder, lastIndexName, lastIndexCreationTime),
PageParams.PARAM_ASC_SORT_VALUE.equals(sortOrder) ? ASC_COMPARATOR : DESC_COMPARATOR
);
}
}

private static Predicate<IndexMetadata> getMetadataFilter(String sortOrder, String lastIndexName, Long lastIndexCreationTime) {
if (Objects.isNull(lastIndexName) || Objects.isNull(lastIndexCreationTime)) {
return indexMetadata -> true;
return indexStateFilter(indicesOptions);
}
return indexNameFilter(lastIndexName).or(
IndexPaginationStrategy.getIndexCreateTimeFilter(sortOrder, lastIndexName, lastIndexCreationTime)
);
).and(indexStateFilter(indicesOptions));
}

private static Predicate<IndexMetadata> indexNameFilter(String lastIndexName) {
return metadata -> metadata.getIndex().getName().equals(lastIndexName);
}

private static Predicate<IndexMetadata> indexStateFilter(IndicesOptions indicesOptions) {
if (Objects.isNull(indicesOptions) || !indicesOptions.forbidClosedIndices()) {
return metadata -> true;
}
return metadata -> metadata.getState().equals(IndexMetadata.State.OPEN);
}

/**
* Will be used to get the list of shards and respective indices to which they belong,
* which are to be displayed in a page.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.opensearch.OpenSearchParseException;
import org.opensearch.Version;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand Down Expand Up @@ -393,6 +394,33 @@ public void testRetrieveShardsWhenLastIndexGetsDeletedAndReCreated() {
assertNull(strategy.getResponseToken().getNextToken());
}

/**
* Validates strategy filters out CLOSED indices, if forbidClosed() indices options are provided.
*/
public void testNoClosedIndicesReturnedByStrategy() {
final int pageSize = DEFAULT_NUMBER_OF_SHARDS * (DEFAULT_NUMBER_OF_REPLICAS + 1);
ClusterState clusterState = getRandomClusterState(List.of(0, 1, 2, 3, 4, 5));
// Add 2 closed indices to cluster state
clusterState = addIndexToClusterState(clusterState, 6, DEFAULT_NUMBER_OF_SHARDS, DEFAULT_NUMBER_OF_REPLICAS, IndexMetadata.State.CLOSE);
clusterState = addIndexToClusterState(clusterState, 7, DEFAULT_NUMBER_OF_SHARDS, DEFAULT_NUMBER_OF_REPLICAS, IndexMetadata.State.CLOSE);
List<ShardRouting> shardRoutings = new ArrayList<>();
List<String> indices = new ArrayList<>();
String requestedToken = null;
ShardPaginationStrategy strategy;
do {
PageParams pageParams = new PageParams(requestedToken, PARAM_ASC_SORT_VALUE, pageSize);
strategy = new ShardPaginationStrategy(pageParams, clusterState, IndicesOptions.strictExpandOpenAndForbidClosed());
requestedToken = strategy.getResponseToken().getNextToken();
shardRoutings.addAll(strategy.getRequestedEntities());
indices.addAll(strategy.getRequestedIndices());
} while (requestedToken != null);
// assert that the closed indices do not appear in the response
assertFalse(indices.contains(TEST_INDEX_PREFIX + 6));
assertFalse(shardRoutings.stream().anyMatch(shard -> shard.getIndexName().equals(TEST_INDEX_PREFIX + 6)));
assertFalse(indices.contains(TEST_INDEX_PREFIX + 7));
assertFalse(shardRoutings.stream().anyMatch(shard -> shard.getIndexName().equals(TEST_INDEX_PREFIX + 7)));
}

public void testCreatingShardStrategyPageTokenWithRequestedTokenNull() {
try {
new ShardPaginationStrategy.ShardStrategyToken(null);
Expand Down Expand Up @@ -478,11 +506,40 @@ private ClusterState addIndexToClusterState(
final int numShards,
final int numReplicas,
final long creationTime
) {
return addIndexToClusterState(clusterState, indexNumber, numShards, numReplicas, creationTime, IndexMetadata.State.OPEN);
}

private ClusterState addIndexToClusterState(
ClusterState clusterState,
final int indexNumber,
final int numShards,
final int numReplicas,
final IndexMetadata.State state
) {
return addIndexToClusterState(
clusterState,
indexNumber,
numShards,
numReplicas,
Instant.now().plus(indexNumber, ChronoUnit.SECONDS).toEpochMilli(),
state
);
}

private ClusterState addIndexToClusterState(
ClusterState clusterState,
final int indexNumber,
final int numShards,
final int numReplicas,
final long creationTime,
final IndexMetadata.State state
) {
IndexMetadata indexMetadata = IndexMetadata.builder(TEST_INDEX_PREFIX + indexNumber)
.settings(settings(Version.CURRENT).put(SETTING_CREATION_DATE, creationTime))
.numberOfShards(numShards)
.numberOfReplicas(numReplicas)
.state(state)
.build();
IndexRoutingTable.Builder indexRoutingTableBuilder = new IndexRoutingTable.Builder(indexMetadata.getIndex()).initializeAsNew(
indexMetadata
Expand Down

0 comments on commit 691803c

Please sign in to comment.