Skip to content

Commit

Permalink
Cancellation support for cat/nodes and optimize it
Browse files Browse the repository at this point in the history
Signed-off-by: kkewwei <[email protected]>
Signed-off-by: kkewwei <[email protected]>
  • Loading branch information
kkewwei committed Dec 5, 2024
1 parent 42dc22e commit 572cf7c
Show file tree
Hide file tree
Showing 13 changed files with 605 additions and 47 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### Changed
- Indexed IP field supports `terms_query` with more than 1025 IP masks [#16391](https://github.com/opensearch-project/OpenSearch/pull/16391)
- Cancellation support for cat/nodes and optimize it ([#14853](https://github.com/opensearch-project/OpenSearch/pull/14853))

### Deprecated
- Performing update operation with default pipeline or final pipeline is deprecated ([#16712](https://github.com/opensearch-project/OpenSearch/pull/16712))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.http;

import org.apache.hc.core5.http.ParseException;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.RestClient;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.test.OpenSearchIntegTestCase.Scope;

import java.io.IOException;

import static org.apache.hc.core5.http.HttpStatus.SC_OK;
import static org.hamcrest.Matchers.containsString;

@ClusterScope(scope = Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 5, numClientNodes = 0)
public class HttpCatIT extends HttpSmokeTestCase {

public void testdoCatRequest() throws IOException {
try (RestClient restClient = getRestClient()) {
int nodesCount = restClient.getNodes().size();
assertEquals(5, nodesCount);

// to make sure the timeout is working
for (int i = 0; i < 5; i++) {
sendRequest(restClient, 30, nodesCount);
}

// no timeout
for (int i = 0; i < 5; i++) {
sendRequest(restClient, -1, nodesCount);
}

for (int i = 1; i < 5; i++) {
long timeout = randomInt(300);
sendRequest(restClient, timeout, nodesCount);
}
}
}

private void sendRequest(RestClient restClient, long timeout, int nodesCount) {
Request nodesRequest;
if (timeout < 0) {
nodesRequest = new Request("GET", "/_cat/nodes");
} else {
nodesRequest = new Request("GET", "/_cat/nodes?timeout=" + timeout + "ms");
}
try {
Response response = restClient.performRequest(nodesRequest);
assertEquals(SC_OK, response.getStatusLine().getStatusCode());
String result = EntityUtils.toString(response.getEntity());
String[] NodeInfos = result.split("\n");
assertEquals(nodesCount, NodeInfos.length);
} catch (IOException | ParseException e) {
// it means that it costs too long to get ClusterState from the master.
assertThat(e.getMessage(), containsString("There is not enough time to obtain nodesInfo metric from the cluster manager"));
}
}

}
3 changes: 3 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import org.opensearch.action.admin.cluster.decommission.awareness.put.TransportDecommissionAction;
import org.opensearch.action.admin.cluster.health.ClusterHealthAction;
import org.opensearch.action.admin.cluster.health.TransportClusterHealthAction;
import org.opensearch.action.admin.cluster.node.Nodes.CatNodesAction;
import org.opensearch.action.admin.cluster.node.Nodes.TransportCatNodesAction;
import org.opensearch.action.admin.cluster.node.hotthreads.NodesHotThreadsAction;
import org.opensearch.action.admin.cluster.node.hotthreads.TransportNodesHotThreadsAction;
import org.opensearch.action.admin.cluster.node.info.NodesInfoAction;
Expand Down Expand Up @@ -664,6 +666,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(ClusterDeleteWeightedRoutingAction.INSTANCE, TransportDeleteWeightedRoutingAction.class);
actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
actions.register(CatShardsAction.INSTANCE, TransportCatShardsAction.class);
actions.register(CatNodesAction.INSTANCE, TransportCatNodesAction.class);
actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class);
actions.register(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class);
actions.register(CreateIndexAction.INSTANCE, TransportCreateIndexAction.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.node.Nodes;

import org.opensearch.action.ActionType;

/**
* Transport action for cat nodes
*
* @opensearch.internal
*/
public class CatNodesAction extends ActionType<CatNodesResponse> {
public static final CatNodesAction INSTANCE = new CatNodesAction();
public static final String NAME = "cluster:monitor/nodes/cat";

public CatNodesAction() {
super(NAME, CatNodesResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.node.Nodes;

import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeReadRequest;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.rest.action.admin.cluster.ClusterAdminTask;

import java.io.IOException;
import java.util.Map;

/**
* A request of _cat/nodes.
*
* @opensearch.api
*/
public class CatNodesRequest extends ClusterManagerNodeReadRequest<CatNodesRequest> {

private TimeValue cancelAfterTimeInterval;
private long timeout = -1;

public CatNodesRequest() {}

public CatNodesRequest(StreamInput in) throws IOException {
super(in);
}

public void setCancelAfterTimeInterval(TimeValue timeout) {
this.cancelAfterTimeInterval = timeout;
}

public TimeValue getCancelAfterTimeInterval() {
return cancelAfterTimeInterval;
}

public void setTimeout(long timeout) {
this.timeout = timeout;
}

public long getTimeout() {
return timeout;
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public ClusterAdminTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new ClusterAdminTask(id, type, action, parentTaskId, headers, this.cancelAfterTimeInterval);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.node.Nodes;

import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* A response of a cat shards request.
*
* @opensearch.api
*/
public class CatNodesResponse extends ActionResponse {

private ClusterStateResponse clusterStateResponse;
private NodesInfoResponse nodesInfoResponse;
private NodesStatsResponse nodesStatsResponse;

public CatNodesResponse(
ClusterStateResponse clusterStateResponse,
NodesInfoResponse nodesInfoResponse,
NodesStatsResponse nodesStatsResponse
) {
this.clusterStateResponse = clusterStateResponse;
this.nodesInfoResponse = nodesInfoResponse;
this.nodesStatsResponse = nodesStatsResponse;
}

public CatNodesResponse(StreamInput in) throws IOException {
super(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
clusterStateResponse.writeTo(out);
nodesInfoResponse.writeTo(out);
nodesStatsResponse.writeTo(out);
}

public NodesStatsResponse getNodesStatsResponse() {
return nodesStatsResponse;
}

public NodesInfoResponse getNodesInfoResponse() {
return nodesInfoResponse;
}

public ClusterStateResponse getClusterStateResponse() {
return clusterStateResponse;
}
}
Loading

0 comments on commit 572cf7c

Please sign in to comment.