Skip to content

Commit

Permalink
allow RemotePublication configured nodes to join remote cluster in mi…
Browse files Browse the repository at this point in the history
…xed mode
  • Loading branch information
rajiv-kv committed Sep 19, 2024
1 parent 620db0a commit 844b9d5
Show file tree
Hide file tree
Showing 3 changed files with 288 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.common.settings.Settings;
import org.opensearch.plugins.Plugin;
import org.opensearch.remotemigration.MigrationBaseTestCase;
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.InternalSettingsPlugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.junit.Before;

import java.util.Collection;
import java.util.Locale;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

/**
* Tests the compatibility between types of nodes based on the configured repositories
* Non Remote node [No Repositories configured]
* Remote Publish Configured Node [Cluster State + Routing Table]
* Remote Node [Cluster State + Segment + Translog]
* Remote Node With Routing Table [Cluster State + Segment + Translog + Routing Table]
*/
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemotePublicationConfigurationIT extends MigrationBaseTestCase {
private final String REMOTE_PRI_DOCREP_REP = "remote-primary-docrep-replica";

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
/* Adding the following mock plugins:
- InternalSettingsPlugin : To override default intervals of retention lease and global ckp sync
- MockFsRepositoryPlugin and MockTransportService.TestPlugin: To ensure remote interactions are not no-op and retention leases are properly propagated
*/
return Stream.concat(
super.nodePlugins().stream(),
Stream.of(InternalSettingsPlugin.class, MockFsRepositoryPlugin.class, MockTransportService.TestPlugin.class)
).collect(Collectors.toList());
}

@Before
public void setUp() throws Exception {
if (segmentRepoPath == null || translogRepoPath == null) {
segmentRepoPath = randomRepoPath().toAbsolutePath();
translogRepoPath = randomRepoPath().toAbsolutePath();
}
super.setUp();
}

public Settings.Builder remotePublishConfiguredNodeSetting() {
String stateRepoSettingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
REPOSITORY_NAME
);
String prefixModeVerificationSuffix = BlobStoreRepository.PREFIX_MODE_VERIFICATION_SETTING.getKey();
String stateRepoTypeAttributeKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
REPOSITORY_NAME
);
String routingTableRepoTypeAttributeKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
ROUTING_TABLE_REPO_NAME
);
String routingTableRepoSettingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
ROUTING_TABLE_REPO_NAME
);

Settings.Builder builder = Settings.builder()
.put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, REPOSITORY_NAME)
.put(stateRepoTypeAttributeKey, ReloadableFsRepository.TYPE)
.put(stateRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath)
.put(stateRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, true)
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, ROUTING_TABLE_REPO_NAME)
.put(routingTableRepoTypeAttributeKey, ReloadableFsRepository.TYPE)
.put(routingTableRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath);
return builder;
}

public Settings.Builder remoteWithRoutingTableNodeSetting() {
// Remote Cluster with Routing table
return Settings.builder()
.put(
buildRemoteStoreNodeAttributes(
REPOSITORY_NAME,
segmentRepoPath,
REPOSITORY_2_NAME,
translogRepoPath,
REPOSITORY_NAME,
segmentRepoPath,
false
)
)
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true);
}

public void testRemotePublishConfigNodeJoinNonRemoteCluster() throws Exception {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNodes(2);

Settings.Builder build = remotePublishConfiguredNodeSetting();
internalCluster().startClusterManagerOnlyNode(build.build());
internalCluster().startDataOnlyNodes(2, build.build());

ensureStableCluster(6);
ensureGreen();
}

public void testRemotePublishConfigNodeJoinRemoteCluster() throws Exception {
// Remote Cluster without Routing table
setAddRemote(true);
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNodes(2);
setAddRemote(false);

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(
Settings.builder()
.put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store")
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")
);
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
Settings.Builder build = remotePublishConfiguredNodeSetting();
internalCluster().startClusterManagerOnlyNode(build.build());
ensureStableCluster(4);
ensureGreen();
}

public void testRemoteNodeWithRoutingTableJoinRemoteCluster() throws Exception {
setAddRemote(true);
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNodes(2);
setAddRemote(false);

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(
Settings.builder()
.put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store")
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")
);
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

// Remote Repo with Routing table
Settings settings = remoteWithRoutingTableNodeSetting().build();
internalCluster().startClusterManagerOnlyNode(settings);
ensureStableCluster(4);
ensureGreen();
}

public void testNonRemoteNodeJoinRemoteWithRoutingCluster() throws Exception {
Settings settings = remoteWithRoutingTableNodeSetting().build();
internalCluster().startClusterManagerOnlyNode(settings);
internalCluster().startDataOnlyNodes(2, settings);

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(
Settings.builder()
.put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store")
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")
);
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

internalCluster().startClusterManagerOnlyNode();
ensureStableCluster(4);
ensureGreen();
}

public void testRemotePublishConfigNodeJoinRemoteWithRoutingCluster() throws Exception {
Settings settings = remoteWithRoutingTableNodeSetting().build();
internalCluster().startClusterManagerOnlyNode(settings);
internalCluster().startDataOnlyNodes(2, settings);

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(
Settings.builder()
.put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store")
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")
);
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

internalCluster().startClusterManagerOnlyNode(remotePublishConfiguredNodeSetting().build());

ensureStableCluster(4);
ensureGreen();
}

public void testNonRemoteNodeJoiningPublishConfigCluster() throws Exception {
Settings.Builder build = remotePublishConfiguredNodeSetting();
internalCluster().startClusterManagerOnlyNode(build.build());
internalCluster().startDataOnlyNodes(2, build.build());

internalCluster().startClusterManagerOnlyNode();

ensureStableCluster(4);
ensureGreen();
}

public void testRemoteNodeJoiningPublishConfigCluster() throws Exception {
Settings.Builder build = remotePublishConfiguredNodeSetting();
internalCluster().startClusterManagerOnlyNode(build.build());
internalCluster().startDataOnlyNodes(2, build.build());

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(
Settings.builder()
.put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store")
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")
);
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

setAddRemote(true);
internalCluster().startClusterManagerOnlyNode();
ensureStableCluster(4);
ensureGreen();
}

public void testRemoteNodeWithRoutingTableJoiningPublishConfigCluster() throws Exception {
Settings.Builder build = remotePublishConfiguredNodeSetting();
internalCluster().startClusterManagerOnlyNode(build.build());
internalCluster().startDataOnlyNodes(2, build.build());

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(
Settings.builder()
.put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store")
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")
);

Settings settings = Settings.builder()
.put(
buildRemoteStoreNodeAttributes(
REPOSITORY_NAME,
segmentRepoPath,
REPOSITORY_2_NAME,
translogRepoPath,
ROUTING_TABLE_REPO_NAME,
segmentRepoPath,
false
)
)
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
.build();
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
internalCluster().startClusterManagerOnlyNode(settings);

ensureStableCluster(4);
ensureGreen();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@

public class MigrationBaseTestCase extends OpenSearchIntegTestCase {
protected static final String REPOSITORY_NAME = "test-remote-store-repo";
protected static final String ROUTING_TABLE_REPO_NAME = "remote-routing-repo";

protected static final String REPOSITORY_2_NAME = "test-remote-store-repo-2";

protected Path segmentRepoPath;
Expand All @@ -72,7 +74,7 @@ public class MigrationBaseTestCase extends OpenSearchIntegTestCase {
randomAlphaOfLength(5)
);

void setAddRemote(boolean addRemote) {
public void setAddRemote(boolean addRemote) {
this.addRemote = addRemote;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,9 +420,13 @@ public static void ensureIndexCompatibility(final Version nodeVersion, Metadata
* ensures that the joining node has a version that's compatible with all current nodes
*/
public static void ensureNodesCompatibility(final DiscoveryNode joiningNode, DiscoveryNodes currentNodes, Metadata metadata) {
final Version minNodeVersion = currentNodes.getMinNodeVersion();
final Version maxNodeVersion = currentNodes.getMaxNodeVersion();
ensureNodesCompatibility(joiningNode, currentNodes, metadata, minNodeVersion, maxNodeVersion);
try {
final Version minNodeVersion = currentNodes.getMinNodeVersion();
final Version maxNodeVersion = currentNodes.getMaxNodeVersion();
ensureNodesCompatibility(joiningNode, currentNodes, metadata, minNodeVersion, maxNodeVersion);
} catch (Exception e) {
logger.error("Exception in NodesCompatibility validation", e);
}
}

/**
Expand Down Expand Up @@ -539,9 +543,11 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod
CompatibilityMode remoteStoreCompatibilityMode = REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(metadata.settings());

List<String> reposToSkip = new ArrayList<>(1);
// find a remote node which has routing table configured
Optional<DiscoveryNode> remoteRoutingTableNode = existingNodes.stream()
.filter(
node -> node.getAttributes().get(RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY) != null
&& node.getAttributes().get(RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY) != null
)
.findFirst();
// If none of the existing nodes have routing table repo, then we skip this repo check if present in joining node.
Expand Down Expand Up @@ -579,10 +585,9 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod
throw new IllegalStateException(reason);
}
if (joiningNode.isRemoteStoreNode()) {
Optional<DiscoveryNode> remoteDN = remoteRoutingTableNode.isPresent()
? remoteRoutingTableNode
: existingNodes.stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst();
remoteDN.ifPresent(discoveryNode -> ensureRemoteStoreNodesCompatibility(joiningNode, discoveryNode, reposToSkip));
remoteRoutingTableNode.ifPresent(
discoveryNode -> ensureRemoteStoreNodesCompatibility(joiningNode, discoveryNode, reposToSkip)
);
}
}
}
Expand Down

0 comments on commit 844b9d5

Please sign in to comment.