Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing _list/shards API for closed indices #16606

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Support retrieving doc values of unsigned long field ([#16543](https://github.com/opensearch-project/OpenSearch/pull/16543))
- Fix rollover alias supports restored searchable snapshot index([#16483](https://github.com/opensearch-project/OpenSearch/pull/16483))
- Fix permissions error on scripted query against remote snapshot ([#16544](https://github.com/opensearch-project/OpenSearch/pull/16544))
- Fix _list/shards API failing when closed indices are present in a cluster ([#16606](https://github.com/opensearch-project/OpenSearch/pull/16606))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.pagination.PageParams;
import org.opensearch.client.Requests;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.settings.Settings;
Expand All @@ -22,10 +25,12 @@
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
import static org.opensearch.common.unit.TimeValue.timeValueMillis;
import static org.opensearch.search.SearchService.NO_TIMEOUT;
import static org.hamcrest.Matchers.equalTo;

@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0, scope = OpenSearchIntegTestCase.Scope.TEST)
public class TransportCatShardsActionIT extends OpenSearchIntegTestCase {
Expand Down Expand Up @@ -125,4 +130,71 @@ public void onFailure(Exception e) {
latch.await();
}

public void testCatShardsSuccessWithPaginationWithClosedIndices() throws InterruptedException, ExecutionException {
internalCluster().startClusterManagerOnlyNodes(1);
List<String> nodes = internalCluster().startDataOnlyNodes(3);
final int numIndices = 3;
final int numShards = 5;
final int numReplicas = 2;
final int pageSize = numIndices * numReplicas * (numShards + 1);
createIndex(
"test-1",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 5)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "60m")
.build()
);
createIndex(
"test-2",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 5)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "60m")
.build()
);
createIndex(
"test-3",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "60m")
.build()
);
ensureGreen();

// close index test-3
client().admin().indices().close(Requests.closeIndexRequest("test-3")).get();

ClusterStateResponse clusterStateResponse = client().admin()
.cluster()
.prepareState()
.clear()
.setMetadata(true)
.setIndices("test-3")
.get();
assertThat(clusterStateResponse.getState().metadata().index("test-3").getState(), equalTo(IndexMetadata.State.CLOSE));

final CatShardsRequest shardsRequest = new CatShardsRequest();
shardsRequest.setCancelAfterTimeInterval(NO_TIMEOUT);
shardsRequest.setIndices(Strings.EMPTY_ARRAY);
shardsRequest.setPageParams(new PageParams(null, PageParams.PARAM_ASC_SORT_VALUE, pageSize));
CountDownLatch latch = new CountDownLatch(1);
client().execute(CatShardsAction.INSTANCE, shardsRequest, new ActionListener<CatShardsResponse>() {
@Override
public void onResponse(CatShardsResponse catShardsResponse) {
List<ShardRouting> shardRoutings = catShardsResponse.getResponseShards();
assertFalse(shardRoutings.stream().anyMatch(shard -> shard.getIndexName().equals("test-3")));
latch.countDown();
}

@Override
public void onFailure(Exception e) {
fail();
latch.countDown();
}
});
latch.await();
}
Comment on lines +182 to +198
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can skip the latch by using the variant the returns a future, something like:

ActionFuture<CatShardsResponse> response = client().execute(CatShardsAction.INSTANCE, shardsRequest);
List<ShardRouting> shardRoutings = response.get().getResponseShards();
assertFalse(shardRoutings.stream().anyMatch(shard -> shard.getIndexName().equals("test-3")));


}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.action.pagination.ShardPaginationStrategy;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.TimeoutTaskCancellationUtility;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.breaker.ResponseLimitBreachedException;
Expand Down Expand Up @@ -148,7 +149,9 @@
}

private ShardPaginationStrategy getPaginationStrategy(PageParams pageParams, ClusterStateResponse clusterStateResponse) {
return Objects.isNull(pageParams) ? null : new ShardPaginationStrategy(pageParams, clusterStateResponse.getState());
return Objects.isNull(pageParams)
? null
: new ShardPaginationStrategy(pageParams, clusterStateResponse.getState(), IndicesOptions.strictExpandOpenAndForbidClosed());

Check warning on line 154 in server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java#L153-L154

Added lines #L153 - L154 were not covered by tests
}

private void validateRequestLimit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.action.pagination;

import org.opensearch.OpenSearchParseException;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.IndexRoutingTable;
Expand Down Expand Up @@ -37,14 +38,23 @@ public class ShardPaginationStrategy implements PaginationStrategy<ShardRouting>
private PageData pageData;

public ShardPaginationStrategy(PageParams pageParams, ClusterState clusterState) {
this(pageParams, clusterState, null);
}

public ShardPaginationStrategy(PageParams pageParams, ClusterState clusterState, IndicesOptions indicesOptions) {
ShardStrategyToken shardStrategyToken = getShardStrategyToken(pageParams.getRequestedToken());
// Get list of indices metadata sorted by their creation time and filtered by the last sent index
List<IndexMetadata> filteredIndices = getEligibleIndices(
List<IndexMetadata> filteredIndices = PaginationStrategy.getSortedIndexMetadata(
clusterState,
pageParams.getSort(),
Objects.isNull(shardStrategyToken) ? null : shardStrategyToken.lastIndexName,
Objects.isNull(shardStrategyToken) ? null : shardStrategyToken.lastIndexCreationTime
getMetadataFilter(
pageParams.getSort(),
Objects.isNull(shardStrategyToken) ? null : shardStrategyToken.lastIndexName,
Objects.isNull(shardStrategyToken) ? null : shardStrategyToken.lastIndexCreationTime,
indicesOptions
),
PageParams.PARAM_ASC_SORT_VALUE.equals(pageParams.getSort()) ? ASC_COMPARATOR : DESC_COMPARATOR
);

// Get the list of shards and indices belonging to current page.
this.pageData = getPageData(
filteredIndices,
Expand All @@ -54,39 +64,31 @@ public ShardPaginationStrategy(PageParams pageParams, ClusterState clusterState)
);
}

private static List<IndexMetadata> getEligibleIndices(
ClusterState clusterState,
private static Predicate<IndexMetadata> getMetadataFilter(
String sortOrder,
String lastIndexName,
Long lastIndexCreationTime
Long lastIndexCreationTime,
IndicesOptions indicesOptions
) {
if (Objects.isNull(lastIndexName) || Objects.isNull(lastIndexCreationTime)) {
return PaginationStrategy.getSortedIndexMetadata(
clusterState,
PageParams.PARAM_ASC_SORT_VALUE.equals(sortOrder) ? ASC_COMPARATOR : DESC_COMPARATOR
);
} else {
return PaginationStrategy.getSortedIndexMetadata(
clusterState,
getMetadataFilter(sortOrder, lastIndexName, lastIndexCreationTime),
PageParams.PARAM_ASC_SORT_VALUE.equals(sortOrder) ? ASC_COMPARATOR : DESC_COMPARATOR
);
}
}

private static Predicate<IndexMetadata> getMetadataFilter(String sortOrder, String lastIndexName, Long lastIndexCreationTime) {
if (Objects.isNull(lastIndexName) || Objects.isNull(lastIndexCreationTime)) {
return indexMetadata -> true;
return indexStateFilter(indicesOptions);
}
return indexNameFilter(lastIndexName).or(
IndexPaginationStrategy.getIndexCreateTimeFilter(sortOrder, lastIndexName, lastIndexCreationTime)
);
).and(indexStateFilter(indicesOptions));
}

private static Predicate<IndexMetadata> indexNameFilter(String lastIndexName) {
return metadata -> metadata.getIndex().getName().equals(lastIndexName);
}

private static Predicate<IndexMetadata> indexStateFilter(IndicesOptions indicesOptions) {
if (Objects.isNull(indicesOptions) || !indicesOptions.forbidClosedIndices()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Objects.isNull exists to use as a predicate. The canonical way to do this is just indicesOptions == null

return metadata -> true;
}
return metadata -> metadata.getState().equals(IndexMetadata.State.OPEN);
}

/**
* Will be used to get the list of shards and respective indices to which they belong,
* which are to be displayed in a page.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.opensearch.OpenSearchParseException;
import org.opensearch.Version;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand Down Expand Up @@ -393,6 +394,45 @@ public void testRetrieveShardsWhenLastIndexGetsDeletedAndReCreated() {
assertNull(strategy.getResponseToken().getNextToken());
}

/**
* Validates strategy filters out CLOSED indices, if forbidClosed() indices options are provided.
*/
public void testNoClosedIndicesReturnedByStrategy() {
final int pageSize = DEFAULT_NUMBER_OF_SHARDS * (DEFAULT_NUMBER_OF_REPLICAS + 1);
ClusterState clusterState = getRandomClusterState(List.of(0, 1, 2, 3, 4, 5));
// Add 2 closed indices to cluster state
clusterState = addIndexToClusterState(
clusterState,
6,
DEFAULT_NUMBER_OF_SHARDS,
DEFAULT_NUMBER_OF_REPLICAS,
IndexMetadata.State.CLOSE
);
clusterState = addIndexToClusterState(
clusterState,
7,
DEFAULT_NUMBER_OF_SHARDS,
DEFAULT_NUMBER_OF_REPLICAS,
IndexMetadata.State.CLOSE
);
List<ShardRouting> shardRoutings = new ArrayList<>();
List<String> indices = new ArrayList<>();
String requestedToken = null;
ShardPaginationStrategy strategy;
do {
PageParams pageParams = new PageParams(requestedToken, PARAM_ASC_SORT_VALUE, pageSize);
strategy = new ShardPaginationStrategy(pageParams, clusterState, IndicesOptions.strictExpandOpenAndForbidClosed());
requestedToken = strategy.getResponseToken().getNextToken();
shardRoutings.addAll(strategy.getRequestedEntities());
indices.addAll(strategy.getRequestedIndices());
} while (requestedToken != null);
// assert that the closed indices do not appear in the response
assertFalse(indices.contains(TEST_INDEX_PREFIX + 6));
assertFalse(shardRoutings.stream().anyMatch(shard -> shard.getIndexName().equals(TEST_INDEX_PREFIX + 6)));
assertFalse(indices.contains(TEST_INDEX_PREFIX + 7));
assertFalse(shardRoutings.stream().anyMatch(shard -> shard.getIndexName().equals(TEST_INDEX_PREFIX + 7)));
}

public void testCreatingShardStrategyPageTokenWithRequestedTokenNull() {
try {
new ShardPaginationStrategy.ShardStrategyToken(null);
Expand Down Expand Up @@ -478,11 +518,40 @@ private ClusterState addIndexToClusterState(
final int numShards,
final int numReplicas,
final long creationTime
) {
return addIndexToClusterState(clusterState, indexNumber, numShards, numReplicas, creationTime, IndexMetadata.State.OPEN);
}

private ClusterState addIndexToClusterState(
ClusterState clusterState,
final int indexNumber,
final int numShards,
final int numReplicas,
final IndexMetadata.State state
) {
return addIndexToClusterState(
clusterState,
indexNumber,
numShards,
numReplicas,
Instant.now().plus(indexNumber, ChronoUnit.SECONDS).toEpochMilli(),
state
);
}

private ClusterState addIndexToClusterState(
ClusterState clusterState,
final int indexNumber,
final int numShards,
final int numReplicas,
final long creationTime,
final IndexMetadata.State state
) {
IndexMetadata indexMetadata = IndexMetadata.builder(TEST_INDEX_PREFIX + indexNumber)
.settings(settings(Version.CURRENT).put(SETTING_CREATION_DATE, creationTime))
.numberOfShards(numShards)
.numberOfReplicas(numReplicas)
.state(state)
.build();
IndexRoutingTable.Builder indexRoutingTableBuilder = new IndexRoutingTable.Builder(indexMetadata.getIndex()).initializeAsNew(
indexMetadata
Expand Down
Loading