From 691803cb2479620a5886ac0e32731ed8bdd5057b Mon Sep 17 00:00:00 2001 From: Harsh Garg Date: Mon, 11 Nov 2024 10:24:59 +0530 Subject: [PATCH] Fixing _list/shards API for closed indices Signed-off-by: Harsh Garg --- .../shards/TransportCatShardsActionIT.java | 74 +++++++++++++++++++ .../shards/TransportCatShardsAction.java | 5 +- .../pagination/ShardPaginationStrategy.java | 50 +++++++------ .../ShardPaginationStrategyTests.java | 57 ++++++++++++++ 4 files changed, 161 insertions(+), 25 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsActionIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsActionIT.java index 32d5b3db85629..25102df145c3f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsActionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsActionIT.java @@ -8,7 +8,10 @@ package org.opensearch.action.admin.cluster.shards; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.action.pagination.PageParams; +import org.opensearch.client.Requests; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.settings.Settings; @@ -22,7 +25,10 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING; import static org.opensearch.common.unit.TimeValue.timeValueMillis; import static org.opensearch.search.SearchService.NO_TIMEOUT; @@ -125,4 +131,72 @@ public void onFailure(Exception e) { latch.await(); } + public void testCatShardsSuccessWithPaginationWithClosedIndices() throws InterruptedException, ExecutionException { + internalCluster().startClusterManagerOnlyNodes(1); + List nodes = internalCluster().startDataOnlyNodes(3); + final int numIndices = 3; + final int numShards = 5; + final int numReplicas = 2; + final int pageSize = numIndices * numReplicas * (numShards + 1); + createIndex( + "test-1", + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 5) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2) + .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "60m") + .build() + ); + createIndex( + "test-2", + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 5) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2) + .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "60m") + .build() + ); + createIndex( + "test-3", + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2) + .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "60m") + .build() + ); + ensureGreen(); + + // close index test-3 + client().admin().indices().close(Requests.closeIndexRequest("test-3")).get(); + + ClusterStateResponse clusterStateResponse = client().admin() + .cluster() + .prepareState() + .clear() + .setMetadata(true) + .setIndices("test-3") + .get(); + assertThat(clusterStateResponse.getState().metadata().index("test-3").getState(), equalTo(IndexMetadata.State.CLOSE)); + + + final CatShardsRequest shardsRequest = new CatShardsRequest(); + shardsRequest.setCancelAfterTimeInterval(NO_TIMEOUT); + shardsRequest.setIndices(Strings.EMPTY_ARRAY); + shardsRequest.setPageParams(new PageParams(null, PageParams.PARAM_ASC_SORT_VALUE, pageSize)); + CountDownLatch latch = new CountDownLatch(1); + client().execute(CatShardsAction.INSTANCE, shardsRequest, new ActionListener() { + @Override + public void onResponse(CatShardsResponse catShardsResponse) { + List shardRoutings = catShardsResponse.getResponseShards(); + assertFalse(shardRoutings.stream().anyMatch(shard -> shard.getIndexName().equals("test-3"))); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + fail(); + latch.countDown(); + } + }); + latch.await(); + } + } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java index 7b36b7a10f4f2..17243a6d5cce2 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java @@ -16,6 +16,7 @@ import org.opensearch.action.pagination.ShardPaginationStrategy; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.action.support.IndicesOptions; import org.opensearch.action.support.TimeoutTaskCancellationUtility; import org.opensearch.client.node.NodeClient; import org.opensearch.common.breaker.ResponseLimitBreachedException; @@ -148,7 +149,9 @@ public void onFailure(Exception e) { } private ShardPaginationStrategy getPaginationStrategy(PageParams pageParams, ClusterStateResponse clusterStateResponse) { - return Objects.isNull(pageParams) ? null : new ShardPaginationStrategy(pageParams, clusterStateResponse.getState()); + return Objects.isNull(pageParams) + ? null + : new ShardPaginationStrategy(pageParams, clusterStateResponse.getState(), IndicesOptions.strictExpandOpenAndForbidClosed()); } private void validateRequestLimit( diff --git a/server/src/main/java/org/opensearch/action/pagination/ShardPaginationStrategy.java b/server/src/main/java/org/opensearch/action/pagination/ShardPaginationStrategy.java index 1eb364c883e60..3fb0821d39224 100644 --- a/server/src/main/java/org/opensearch/action/pagination/ShardPaginationStrategy.java +++ b/server/src/main/java/org/opensearch/action/pagination/ShardPaginationStrategy.java @@ -9,6 +9,7 @@ package org.opensearch.action.pagination; import org.opensearch.OpenSearchParseException; +import org.opensearch.action.support.IndicesOptions; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.IndexRoutingTable; @@ -37,14 +38,23 @@ public class ShardPaginationStrategy implements PaginationStrategy private PageData pageData; public ShardPaginationStrategy(PageParams pageParams, ClusterState clusterState) { + this(pageParams, clusterState, null); + } + + public ShardPaginationStrategy(PageParams pageParams, ClusterState clusterState, IndicesOptions indicesOptions) { ShardStrategyToken shardStrategyToken = getShardStrategyToken(pageParams.getRequestedToken()); // Get list of indices metadata sorted by their creation time and filtered by the last sent index - List filteredIndices = getEligibleIndices( + List filteredIndices = PaginationStrategy.getSortedIndexMetadata( clusterState, - pageParams.getSort(), - Objects.isNull(shardStrategyToken) ? null : shardStrategyToken.lastIndexName, - Objects.isNull(shardStrategyToken) ? null : shardStrategyToken.lastIndexCreationTime + getMetadataFilter( + pageParams.getSort(), + Objects.isNull(shardStrategyToken) ? null : shardStrategyToken.lastIndexName, + Objects.isNull(shardStrategyToken) ? null : shardStrategyToken.lastIndexCreationTime, + indicesOptions + ), + PageParams.PARAM_ASC_SORT_VALUE.equals(pageParams.getSort()) ? ASC_COMPARATOR : DESC_COMPARATOR ); + // Get the list of shards and indices belonging to current page. this.pageData = getPageData( filteredIndices, @@ -54,39 +64,31 @@ public ShardPaginationStrategy(PageParams pageParams, ClusterState clusterState) ); } - private static List getEligibleIndices( - ClusterState clusterState, + private static Predicate getMetadataFilter( String sortOrder, String lastIndexName, - Long lastIndexCreationTime + Long lastIndexCreationTime, + IndicesOptions indicesOptions ) { if (Objects.isNull(lastIndexName) || Objects.isNull(lastIndexCreationTime)) { - return PaginationStrategy.getSortedIndexMetadata( - clusterState, - PageParams.PARAM_ASC_SORT_VALUE.equals(sortOrder) ? ASC_COMPARATOR : DESC_COMPARATOR - ); - } else { - return PaginationStrategy.getSortedIndexMetadata( - clusterState, - getMetadataFilter(sortOrder, lastIndexName, lastIndexCreationTime), - PageParams.PARAM_ASC_SORT_VALUE.equals(sortOrder) ? ASC_COMPARATOR : DESC_COMPARATOR - ); - } - } - - private static Predicate getMetadataFilter(String sortOrder, String lastIndexName, Long lastIndexCreationTime) { - if (Objects.isNull(lastIndexName) || Objects.isNull(lastIndexCreationTime)) { - return indexMetadata -> true; + return indexStateFilter(indicesOptions); } return indexNameFilter(lastIndexName).or( IndexPaginationStrategy.getIndexCreateTimeFilter(sortOrder, lastIndexName, lastIndexCreationTime) - ); + ).and(indexStateFilter(indicesOptions)); } private static Predicate indexNameFilter(String lastIndexName) { return metadata -> metadata.getIndex().getName().equals(lastIndexName); } + private static Predicate indexStateFilter(IndicesOptions indicesOptions) { + if (Objects.isNull(indicesOptions) || !indicesOptions.forbidClosedIndices()) { + return metadata -> true; + } + return metadata -> metadata.getState().equals(IndexMetadata.State.OPEN); + } + /** * Will be used to get the list of shards and respective indices to which they belong, * which are to be displayed in a page. diff --git a/server/src/test/java/org/opensearch/action/pagination/ShardPaginationStrategyTests.java b/server/src/test/java/org/opensearch/action/pagination/ShardPaginationStrategyTests.java index aed7315660378..ff64171e0f9d9 100644 --- a/server/src/test/java/org/opensearch/action/pagination/ShardPaginationStrategyTests.java +++ b/server/src/test/java/org/opensearch/action/pagination/ShardPaginationStrategyTests.java @@ -10,6 +10,7 @@ import org.opensearch.OpenSearchParseException; import org.opensearch.Version; +import org.opensearch.action.support.IndicesOptions; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; @@ -393,6 +394,33 @@ public void testRetrieveShardsWhenLastIndexGetsDeletedAndReCreated() { assertNull(strategy.getResponseToken().getNextToken()); } + /** + * Validates strategy filters out CLOSED indices, if forbidClosed() indices options are provided. + */ + public void testNoClosedIndicesReturnedByStrategy() { + final int pageSize = DEFAULT_NUMBER_OF_SHARDS * (DEFAULT_NUMBER_OF_REPLICAS + 1); + ClusterState clusterState = getRandomClusterState(List.of(0, 1, 2, 3, 4, 5)); + // Add 2 closed indices to cluster state + clusterState = addIndexToClusterState(clusterState, 6, DEFAULT_NUMBER_OF_SHARDS, DEFAULT_NUMBER_OF_REPLICAS, IndexMetadata.State.CLOSE); + clusterState = addIndexToClusterState(clusterState, 7, DEFAULT_NUMBER_OF_SHARDS, DEFAULT_NUMBER_OF_REPLICAS, IndexMetadata.State.CLOSE); + List shardRoutings = new ArrayList<>(); + List indices = new ArrayList<>(); + String requestedToken = null; + ShardPaginationStrategy strategy; + do { + PageParams pageParams = new PageParams(requestedToken, PARAM_ASC_SORT_VALUE, pageSize); + strategy = new ShardPaginationStrategy(pageParams, clusterState, IndicesOptions.strictExpandOpenAndForbidClosed()); + requestedToken = strategy.getResponseToken().getNextToken(); + shardRoutings.addAll(strategy.getRequestedEntities()); + indices.addAll(strategy.getRequestedIndices()); + } while (requestedToken != null); + // assert that the closed indices do not appear in the response + assertFalse(indices.contains(TEST_INDEX_PREFIX + 6)); + assertFalse(shardRoutings.stream().anyMatch(shard -> shard.getIndexName().equals(TEST_INDEX_PREFIX + 6))); + assertFalse(indices.contains(TEST_INDEX_PREFIX + 7)); + assertFalse(shardRoutings.stream().anyMatch(shard -> shard.getIndexName().equals(TEST_INDEX_PREFIX + 7))); + } + public void testCreatingShardStrategyPageTokenWithRequestedTokenNull() { try { new ShardPaginationStrategy.ShardStrategyToken(null); @@ -478,11 +506,40 @@ private ClusterState addIndexToClusterState( final int numShards, final int numReplicas, final long creationTime + ) { + return addIndexToClusterState(clusterState, indexNumber, numShards, numReplicas, creationTime, IndexMetadata.State.OPEN); + } + + private ClusterState addIndexToClusterState( + ClusterState clusterState, + final int indexNumber, + final int numShards, + final int numReplicas, + final IndexMetadata.State state + ) { + return addIndexToClusterState( + clusterState, + indexNumber, + numShards, + numReplicas, + Instant.now().plus(indexNumber, ChronoUnit.SECONDS).toEpochMilli(), + state + ); + } + + private ClusterState addIndexToClusterState( + ClusterState clusterState, + final int indexNumber, + final int numShards, + final int numReplicas, + final long creationTime, + final IndexMetadata.State state ) { IndexMetadata indexMetadata = IndexMetadata.builder(TEST_INDEX_PREFIX + indexNumber) .settings(settings(Version.CURRENT).put(SETTING_CREATION_DATE, creationTime)) .numberOfShards(numShards) .numberOfReplicas(numReplicas) + .state(state) .build(); IndexRoutingTable.Builder indexRoutingTableBuilder = new IndexRoutingTable.Builder(indexMetadata.getIndex()).initializeAsNew( indexMetadata