Skip to content

Commit

Permalink
fetching term information from master using light-weight call
Browse files Browse the repository at this point in the history
  • Loading branch information
rajiv-kv committed Feb 8, 2024
1 parent be67c23 commit 16d8152
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.opensearch.action.ActionType;

/**
* Transport action for obtaining cluster term
* Transport action for fetching cluster term
*
* @opensearch.internal
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import java.io.IOException;

/**
* Request object for obtaining cluster termß
* Request object to get cluster term
*
* @opensearch.internal
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import java.io.IOException;

/**
* Response object for obtaining cluster term
* Response object of cluster term
*
* @opensearch.internal
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import java.io.IOException;

/**
* Transport action for obtaining cluster term
* Transport action for obtaining cluster term and version from cluster-manager
*
* @opensearch.internal
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,8 @@ public void handleResponse(ClusterTermVersionResponse response) {
threadPool.executor(executor)
.execute(ActionRunnable.wrap(GetDelegateForLocalExecute(clusterState), l -> clusterManagerOperation(task, request, clusterState, l)));
} else {
sendClusterStateRequest(nodes.getClusterManagerNode(), clusterState);
//cluster-manager has updated state
executeOnClusterManager(nodes.getClusterManagerNode(), clusterState);
}
}

Expand All @@ -298,7 +299,8 @@ public ClusterTermVersionResponse read(StreamInput in) throws IOException {
}
});
} else {
sendClusterStateRequest(nodes.getClusterManagerNode(), clusterState);
//should be always executed on cluster-manager
executeOnClusterManager(nodes.getClusterManagerNode(), clusterState);
}
}
}
Expand All @@ -310,46 +312,11 @@ public ClusterTermVersionResponse read(StreamInput in) throws IOException {
private ClusterTermVersionResponse readClusterTermResponse(StreamInput in) throws IOException {
return new ClusterTermVersionResponse(in);
}
private void sendClusterStateRequest(DiscoveryNode clusterManagerNode, ClusterState clusterState) {
final String actionName = getClusterManagerActionName(clusterManagerNode);

transportService.sendRequest(
clusterManagerNode,
actionName,
request,
new ActionListenerResponseHandler<Response>(listener, TransportClusterManagerNodeAction.this::read) {
@Override
public void handleException(final TransportException exp) {
handleTransportException(clusterManagerNode, clusterState, exp);
}
}
);
}

private void retryOnMasterChange(ClusterState state, Throwable failure) {
retry(state, failure, ClusterManagerNodeChangePredicate.build(state));
}

private void handleTransportException(DiscoveryNode clusterManagerNode, ClusterState clusterState, final TransportException exp) {
Throwable cause = exp.unwrapCause();
if (cause instanceof ConnectTransportException
|| (exp instanceof RemoteTransportException && cause instanceof NodeClosedException)) {
// we want to retry here a bit to see if a new cluster-manager is elected

logger.debug(
"connection exception while trying to forward request with action name [{}] to "
+ "master node [{}], scheduling a retry. Error: [{}]",
actionName,
clusterManagerNode,
exp.getDetailedMessage()
);

retryOnMasterChange(clusterState, cause);
} else {
listener.onFailure(exp);
}
}

private void retry(ClusterState state, final Throwable failure, final Predicate<ClusterState> statePredicate) {
if (observer == null) {
final long remainingTimeoutMS = request.clusterManagerNodeTimeout().millis() - (threadPool.relativeTimeInMillis()
Expand Down Expand Up @@ -406,16 +373,44 @@ private ActionListener<Response> GetDelegateForLocalExecute(ClusterState cluster
}
});
}
}
private void executeOnClusterManager(DiscoveryNode clusterManagerNode, ClusterState clusterState) {
final String actionName = getClusterManagerActionName(clusterManagerNode);

/**
* Override to optimize calls to fetch cluster state with term version check from master
* @return - true if action needs to perform term check
*/
protected boolean checkTermVersion() {
return false;
}
transportService.sendRequest(
clusterManagerNode,
actionName,
request,
new ActionListenerResponseHandler<Response>(listener, TransportClusterManagerNodeAction.this::read) {
@Override
public void handleException(final TransportException exp) {
handleTransportException(clusterManagerNode, clusterState, exp);
}
}
);
}

private void handleTransportException(DiscoveryNode clusterManagerNode, ClusterState clusterState, final TransportException exp) {
Throwable cause = exp.unwrapCause();
if (cause instanceof ConnectTransportException
|| (exp instanceof RemoteTransportException && cause instanceof NodeClosedException)) {
// we want to retry here a bit to see if a new cluster-manager is elected

logger.debug(
"connection exception while trying to forward request with action name [{}] to "
+ "master node [{}], scheduling a retry. Error: [{}]",
actionName,
clusterManagerNode,
exp.getDetailedMessage()
);

retryOnMasterChange(clusterState, cause);
} else {
listener.onFailure(exp);
}
}


}

/**
* Allows to conditionally return a different cluster-manager node action name in the case an action gets renamed.
Expand All @@ -436,7 +431,14 @@ protected String getMasterActionName(DiscoveryNode node) {
return getClusterManagerActionName(node);
}



/**
* Determines if transport action needs to check local cluster-state term with manager before
* executing the action on manager. This is generally true for actions that are read-only and can be executed locally
* on node if the term matches with cluster-manager.
* @return - true to perform term check and then execute the action
*/
protected boolean checkTermVersion() {
return false;
}

}

0 comments on commit 16d8152

Please sign in to comment.