From accb5e8ef4c292c4bbdabf5f01bfe08930383756 Mon Sep 17 00:00:00 2001 From: Rajiv Kumar Vaidyanathan Date: Thu, 8 Feb 2024 13:55:11 +0530 Subject: [PATCH] fetching term information from master using light-weight call before fetch of cluster-state --- .../org/opensearch/action/ActionModule.java | 3 + .../state/TransportClusterStateAction.java | 4 + .../state/term/ClusterTermVersionAction.java | 50 ++++++ .../state/term/ClusterTermVersionRequest.java | 58 +++++++ .../term/ClusterTermVersionResponse.java | 82 +++++++++ .../TransportClusterTermVersionAction.java | 110 ++++++++++++ .../cluster/state/term/package-info.java | 10 ++ .../TransportClusterManagerNodeAction.java | 162 +++++++++++++----- 8 files changed, 439 insertions(+), 40 deletions(-) create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionAction.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionRequest.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionResponse.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/state/term/TransportClusterTermVersionAction.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/state/term/package-info.java diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 7b0b725c88f64..01bc77c101c7c 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -107,6 +107,8 @@ import org.opensearch.action.admin.cluster.snapshots.status.TransportSnapshotsStatusAction; import org.opensearch.action.admin.cluster.state.ClusterStateAction; import org.opensearch.action.admin.cluster.state.TransportClusterStateAction; +import org.opensearch.action.admin.cluster.state.term.ClusterTermVersionAction; +import 
org.opensearch.action.admin.cluster.state.term.TransportClusterTermVersionAction;
 import org.opensearch.action.admin.cluster.stats.ClusterStatsAction;
 import org.opensearch.action.admin.cluster.stats.TransportClusterStatsAction;
 import org.opensearch.action.admin.cluster.storedscripts.DeleteStoredScriptAction;
@@ -607,6 +609,7 @@ public void reg
         actions.register(ClusterAllocationExplainAction.INSTANCE, TransportClusterAllocationExplainAction.class);
         actions.register(ClusterStatsAction.INSTANCE, TransportClusterStatsAction.class);
         actions.register(ClusterStateAction.INSTANCE, TransportClusterStateAction.class);
+        actions.register(ClusterTermVersionAction.INSTANCE, TransportClusterTermVersionAction.class);
         actions.register(ClusterHealthAction.INSTANCE, TransportClusterHealthAction.class);
         actions.register(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class);
         actions.register(ClusterRerouteAction.INSTANCE, TransportClusterRerouteAction.class);
diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java
index 4aaa7f1950823..18a8ddd522023 100644
--- a/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java
+++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java
@@ -231,4 +231,8 @@ private ClusterStateResponse buildResponse(final ClusterStateRequest request, fi
         return new ClusterStateResponse(currentState.getClusterName(), builder.build(), false);
     }
 
+    @Override
+    protected boolean checkTermVersion() {
+        return true; // opt in: compare local term/version with the cluster-manager before forwarding the request
+    }
 }
diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionAction.java
new file mode 100644
index 0000000000000..25330b090db16
--- /dev/null
+++ 
b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionAction.java @@ -0,0 +1,50 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. 
+ */
+
+package org.opensearch.action.admin.cluster.state.term;
+
+import org.opensearch.action.ActionType;
+
+/**
+ * Action type used to fetch the cluster-state term and version
+ *
+ * @opensearch.internal
+ */
+public class ClusterTermVersionAction extends ActionType<ClusterTermVersionResponse> {
+
+    public static final ClusterTermVersionAction INSTANCE = new ClusterTermVersionAction();
+    public static final String NAME = "cluster:monitor/term";
+
+    private ClusterTermVersionAction() {
+        super(NAME, ClusterTermVersionResponse::new);
+    }
+}
diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionRequest.java
new file mode 100644
index 0000000000000..07518a04425fa
--- /dev/null
+++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionRequest.java
@@ -0,0 +1,58 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Modifications Copyright OpenSearch Contributors. 
See
+ * GitHub history for details.
+ */
+
+package org.opensearch.action.admin.cluster.state.term;
+
+import org.opensearch.action.ActionRequestValidationException;
+import org.opensearch.action.support.clustermanager.ClusterManagerNodeReadRequest;
+import org.opensearch.core.common.io.stream.StreamInput;
+
+import java.io.IOException;
+
+/**
+ * Request to fetch the cluster-state term and version; it carries no parameters of its own
+ *
+ * @opensearch.internal
+ */
+public class ClusterTermVersionRequest extends ClusterManagerNodeReadRequest<ClusterTermVersionRequest> {
+
+    public ClusterTermVersionRequest() {}
+
+    public ClusterTermVersionRequest(StreamInput in) throws IOException {
+        super(in);
+    }
+
+    @Override
+    public ActionRequestValidationException validate() {
+        return null;
+    }
+}
diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionResponse.java
new file mode 100644
index 0000000000000..109ea7ae4b5db
--- /dev/null
+++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionResponse.java
@@ -0,0 +1,82 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Modifications Copyright OpenSearch Contributors. See
+ * GitHub history for details.
+ */
+
+package org.opensearch.action.admin.cluster.state.term;
+
+import org.opensearch.cluster.node.DiscoveryNode;
+import org.opensearch.core.action.ActionResponse;
+import org.opensearch.core.common.io.stream.StreamInput;
+import org.opensearch.core.common.io.stream.StreamOutput;
+
+import java.io.IOException;
+
+/**
+ * Immutable response carrying the source node together with a cluster-state term and version
+ *
+ * @opensearch.internal
+ */
+public class ClusterTermVersionResponse extends ActionResponse {
+
+    protected final DiscoveryNode sourceNode;
+
+    protected final long term;
+    protected final long version;
+
+    public ClusterTermVersionResponse(DiscoveryNode sourceNode, long term, long version) {
+        this.sourceNode = sourceNode;
+        this.term = term;
+        this.version = version;
+    }
+
+    public ClusterTermVersionResponse(StreamInput in) throws IOException {
+        super(in);
+        this.sourceNode = new DiscoveryNode(in);
+        this.term = in.readLong();
+        this.version = in.readLong();
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        sourceNode.writeTo(out);
+        out.writeLong(term);
+        out.writeLong(version);
+    }
+
+    public long getTerm() {
+        return term;
+    }
+
+    public long getVersion() {
+        return version;
+    }
+
+}
diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/TransportClusterTermVersionAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/TransportClusterTermVersionAction.java
new file mode 100644
index 0000000000000..9e57ac9de517e
--- 
/dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/TransportClusterTermVersionAction.java @@ -0,0 +1,110 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. 
+ */
+
+package org.opensearch.action.admin.cluster.state.term;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.opensearch.action.support.ActionFilters;
+import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeReadAction;
+import org.opensearch.cluster.ClusterState;
+import org.opensearch.cluster.block.ClusterBlockException;
+import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
+import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.common.inject.Inject;
+import org.opensearch.core.action.ActionListener;
+import org.opensearch.core.common.io.stream.StreamInput;
+import org.opensearch.threadpool.ThreadPool;
+import org.opensearch.transport.TransportService;
+
+import java.io.IOException;
+
+/**
+ * Transport action that answers with the term and version of the cluster state it is executed against
+ *
+ * @opensearch.internal
+ */
+public class TransportClusterTermVersionAction extends TransportClusterManagerNodeReadAction<
+    ClusterTermVersionRequest,
+    ClusterTermVersionResponse> {
+
+    private static final Logger logger = LogManager.getLogger(TransportClusterTermVersionAction.class);
+
+    @Inject
+    public TransportClusterTermVersionAction(
+        TransportService transportService,
+        ClusterService clusterService,
+        ThreadPool threadPool,
+        ActionFilters actionFilters,
+        IndexNameExpressionResolver indexNameExpressionResolver
+    ) {
+        super(
+            ClusterTermVersionAction.NAME,
+            false,
+            transportService,
+            clusterService,
+            threadPool,
+            actionFilters,
+            ClusterTermVersionRequest::new,
+            indexNameExpressionResolver
+        );
+    }
+
+    @Override
+    protected String executor() {
+        return ThreadPool.Names.SAME;
+    }
+
+    @Override
+    public ClusterTermVersionResponse read(StreamInput in) throws IOException {
+        return new ClusterTermVersionResponse(in);
+    }
+
+    @Override
+    protected ClusterBlockException checkBlock(ClusterTermVersionRequest request, ClusterState state) {
+        return null;
+    }
+
+    @Override
+    protected void clusterManagerOperation(
+        ClusterTermVersionRequest request,
+        ClusterState state,
+        ActionListener<ClusterTermVersionResponse> listener
+    ) throws Exception {
+        ActionListener.completeWith(listener, () -> buildResponse(request, state));
+    }
+
+    private ClusterTermVersionResponse buildResponse(ClusterTermVersionRequest request, ClusterState state) {
+        logger.trace("Serving cluster term version request using term {} and version {}", state.term(), state.version());
+        return new ClusterTermVersionResponse(state.getNodes().getClusterManagerNode(), state.term(), state.version());
+    }
+}
diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/package-info.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/package-info.java
new file mode 100644
index 0000000000000..0ee559c527d7d
--- /dev/null
+++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/package-info.java
@@ -0,0 +1,10 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+/** Cluster Term transport handler. 
*/
+package org.opensearch.action.admin.cluster.state.term;
diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java
index 536ddcdd402e2..a4cdac7c3543a 100644
--- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java
+++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java
@@ -35,8 +35,12 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.opensearch.Version;
 import org.opensearch.action.ActionListenerResponseHandler;
 import org.opensearch.action.ActionRunnable;
+import org.opensearch.action.admin.cluster.state.term.ClusterTermVersionAction;
+import org.opensearch.action.admin.cluster.state.term.ClusterTermVersionRequest;
+import org.opensearch.action.admin.cluster.state.term.ClusterTermVersionResponse;
 import org.opensearch.action.bulk.BackoffPolicy;
 import org.opensearch.action.support.ActionFilters;
 import org.opensearch.action.support.HandledTransportAction;
@@ -66,6 +70,7 @@
 import org.opensearch.transport.ConnectTransportException;
 import org.opensearch.transport.RemoteTransportException;
 import org.opensearch.transport.TransportException;
+import org.opensearch.transport.TransportResponseHandler;
 import org.opensearch.transport.TransportService;
 
 import java.io.IOException;
@@ -252,23 +257,13 @@ protected void doStart(ClusterState clusterState) {
                             });
                         }
                     } else {
-                        ActionListener<Response> delegate = ActionListener.delegateResponse(listener, (delegatedListener, t) -> {
-                            if (t instanceof FailedToCommitClusterStateException || t instanceof NotClusterManagerException) {
-                                logger.debug(
-                                    () -> new ParameterizedMessage(
-                                        "master could not publish cluster state or "
-                                            + "stepped down before publishing action [{}], scheduling a retry",
-                                        actionName
-                                    ),
-                                    t
-                                );
-                                retryOnMasterChange(clusterState, t);
-                            } else {
-                                delegatedListener.onFailure(t);
-                            }
-                        });
                         threadPool.executor(executor)
-                            .execute(ActionRunnable.wrap(delegate, l -> clusterManagerOperation(task, request, clusterState, l)));
+                            .execute(
+                                ActionRunnable.wrap(
+                                    getDelegateForLocalExecute(clusterState),
+                                    l -> clusterManagerOperation(task, request, clusterState, l)
+                                )
+                            );
                     }
                 } else {
                     if (nodes.getClusterManagerNode() == null) {
@@ -276,32 +271,50 @@ protected void doStart(ClusterState clusterState) {
                         retryOnMasterChange(clusterState, null);
                     } else {
                         DiscoveryNode clusterManagerNode = nodes.getClusterManagerNode();
-                        final String actionName = getClusterManagerActionName(clusterManagerNode);
-                        transportService.sendRequest(
-                            clusterManagerNode,
-                            actionName,
-                            request,
-                            new ActionListenerResponseHandler<Response>(listener, TransportClusterManagerNodeAction.this::read) {
-                                @Override
-                                public void handleException(final TransportException exp) {
-                                    Throwable cause = exp.unwrapCause();
-                                    if (cause instanceof ConnectTransportException
-                                        || (exp instanceof RemoteTransportException && cause instanceof NodeClosedException)) {
-                                        // we want to retry here a bit to see if a new cluster-manager is elected
-                                        logger.debug(
-                                            "connection exception while trying to forward request with action name [{}] to "
-                                                + "master node [{}], scheduling a retry. Error: [{}]",
-                                            actionName,
-                                            nodes.getClusterManagerNode(),
-                                            exp.getDetailedMessage()
-                                        );
-                                        retryOnMasterChange(clusterState, cause);
-                                    } else {
-                                        listener.onFailure(exp);
+                        // NOTE(review): confirm V_2_11_0 is the first release that registers cluster:monitor/term on the cluster-manager
+                        if (clusterManagerNode.getVersion().onOrAfter(Version.V_2_11_0) && checkTermVersion()) {
+                            transportService.sendRequest(
+                                clusterManagerNode,
+                                ClusterTermVersionAction.NAME,
+                                new ClusterTermVersionRequest(),
+                                new TransportResponseHandler<ClusterTermVersionResponse>() {
+                                    @Override
+                                    public void handleResponse(ClusterTermVersionResponse response) {
+                                        if (response.getTerm() == clusterState.term() && response.getVersion() == clusterState.version()) {
+                                            // local state is as fresh as the cluster-manager's: take the local-execute path
+                                            threadPool.executor(executor)
+                                                .execute(
+                                                    ActionRunnable.wrap(
+                                                        getDelegateForLocalExecute(clusterState),
+                                                        l -> clusterManagerOperation(task, request, clusterState, l)
+                                                    )
+                                                );
+                                        } else {
+                                            // cluster-manager has a different term or version: forward the request to it
+                                            executeOnClusterManager(clusterManagerNode, clusterState);
+                                        }
+                                    }
+
+                                    @Override
+                                    public void handleException(TransportException exp) {
+                                        handleTransportException(clusterManagerNode, clusterState, exp);
+                                    }
+
+                                    @Override
+                                    public String executor() {
+                                        return ThreadPool.Names.SAME;
+                                    }
+
+                                    @Override
+                                    public ClusterTermVersionResponse read(StreamInput in) throws IOException {
+                                        return readClusterTermResponse(in);
                                     }
                                 }
-                            }
-                        );
+                            );
+                        } else {
+                            // term check not applicable: always execute on the cluster-manager
+                            executeOnClusterManager(clusterManagerNode, clusterState);
+                        }
                     }
                 }
             } catch (Exception e) {
@@ -309,6 +322,10 @@ public void handleException(final TransportException exp) {
             }
         }
 
+        private ClusterTermVersionResponse readClusterTermResponse(StreamInput in) throws IOException {
+            return new ClusterTermVersionResponse(in);
+        }
+
         private void retryOnMasterChange(ClusterState state, Throwable failure) {
             retry(state, failure, ClusterManagerNodeChangePredicate.build(state));
         }
@@ -351,6 +368,61 @@ public void onTimeout(TimeValue timeout) {
                 }
             }, statePredicate);
         }
+
+        private ActionListener<Response> getDelegateForLocalExecute(ClusterState clusterState) {
+            return ActionListener.delegateResponse(listener, (delegatedListener, t) -> {
+                if (t instanceof FailedToCommitClusterStateException || t instanceof NotClusterManagerException) {
+                    logger.debug(
+                        () -> new ParameterizedMessage(
+                            "master could not publish cluster state or " + "stepped down before publishing action [{}], scheduling a retry",
+                            actionName
+                        ),
+                        t
+                    );
+
+                    retryOnMasterChange(clusterState, t);
+                } else {
+                    delegatedListener.onFailure(t);
+                }
+            });
+        }
+
+        private void executeOnClusterManager(DiscoveryNode clusterManagerNode, ClusterState clusterState) {
+            final String actionName = getClusterManagerActionName(clusterManagerNode);
+
+            transportService.sendRequest(
+                clusterManagerNode,
+                actionName,
+                request,
+                new ActionListenerResponseHandler<Response>(listener, TransportClusterManagerNodeAction.this::read) {
+                    @Override
+                    public void handleException(final TransportException exp) {
+                        handleTransportException(clusterManagerNode, clusterState, exp);
+                    }
+                }
+            );
+        }
+
+        private void handleTransportException(DiscoveryNode clusterManagerNode, ClusterState clusterState, final TransportException exp) {
+            Throwable cause = exp.unwrapCause();
+            if (cause instanceof ConnectTransportException
+                || (exp instanceof RemoteTransportException && cause instanceof NodeClosedException)) {
+                // we want to retry here a bit to see if a new cluster-manager is elected
+
+                logger.debug(
+                    "connection exception while trying to forward request with action name [{}] to "
+                        + "master node [{}], scheduling a retry. Error: [{}]",
+                    getClusterManagerActionName(clusterManagerNode),
+                    clusterManagerNode,
+                    exp.getDetailedMessage()
+                );
+
+                retryOnMasterChange(clusterState, cause);
+            } else {
+                listener.onFailure(exp);
+            }
+        }
+
     }
 
     /**
@@ -372,4 +444,14 @@ protected String getMasterActionName(DiscoveryNode node) {
         return getClusterManagerActionName(node);
     }
 
+    /**
+     * Determines whether the local cluster-state term and version are compared with the cluster-manager's
+     * before the action is forwarded. Intended for read-only actions, which are then executed locally on
+     * this node when both values match and are forwarded to the cluster-manager otherwise.
+     * @return true to perform the term/version check before executing the action; the default is false
+     */
+    protected boolean checkTermVersion() {
+        return false;
+    }
+
 }