forked from opensearch-project/OpenSearch
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Initial commit to support a search only replica for RW separation. (o…
…pensearch-project#15410) * Initial commit for search only replica. This PR contains the following: 1. Introduce searchOnly flag on ShardRouting. 2. Added feature flag to enable/disable the feature. 3. supports both create and update APIs to toggle search replica count. 4. Changes to exclude search replicas from primary eligibility. 5. Changes to prevent replicationOperations from routing to search replicas. Signed-off-by: Marc Handalian <[email protected]> * add some missing feature flag checks Signed-off-by: Marc Handalian <[email protected]> * Pr feedback from @andrross Signed-off-by: Marc Handalian <[email protected]> * Add more unit tests for settings create and update Signed-off-by: Marc Handalian <[email protected]> * Fix broken tests from setting rename Signed-off-by: Marc Handalian <[email protected]> * Fix broken tests and add changelog entry Signed-off-by: Marc Handalian <[email protected]> * More PR feedback. Signed-off-by: Marc Handalian <[email protected]> * add missing searchOnly property to initializeTargetRelocatingShard. Without this search replicas will become regular replicas on relocation. Signed-off-by: Marc Handalian <[email protected]> * test fixes Signed-off-by: Marc Handalian <[email protected]> * spotless Signed-off-by: Marc Handalian <[email protected]> --------- Signed-off-by: Marc Handalian <[email protected]>
- Loading branch information
1 parent
5540a00
commit b46aa10
Showing
26 changed files
with
1,286 additions
and
28 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
56 changes: 56 additions & 0 deletions
56
...ernalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaFeatureFlagIT.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.indices.settings; | ||
|
||
import org.opensearch.cluster.metadata.IndexMetadata; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.common.settings.SettingsException; | ||
import org.opensearch.common.util.FeatureFlags; | ||
import org.opensearch.indices.replication.common.ReplicationType; | ||
import org.opensearch.test.OpenSearchIntegTestCase; | ||
|
||
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS; | ||
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; | ||
|
||
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 1) | ||
public class SearchOnlyReplicaFeatureFlagIT extends OpenSearchIntegTestCase { | ||
|
||
private static final String TEST_INDEX = "test_index"; | ||
|
||
@Override | ||
protected Settings featureFlagSettings() { | ||
return Settings.builder() | ||
.put(super.featureFlagSettings()) | ||
.put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.FALSE) | ||
.build(); | ||
} | ||
|
||
public void testCreateFeatureFlagDisabled() { | ||
Settings settings = Settings.builder().put(indexSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, false).build(); | ||
SettingsException settingsException = expectThrows(SettingsException.class, () -> createIndex(TEST_INDEX, settings)); | ||
assertTrue(settingsException.getMessage().contains("unknown setting")); | ||
} | ||
|
||
public void testUpdateFeatureFlagDisabled() { | ||
Settings settings = Settings.builder() | ||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) | ||
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) | ||
.build(); | ||
|
||
createIndex(TEST_INDEX, settings); | ||
SettingsException settingsException = expectThrows(SettingsException.class, () -> { | ||
client().admin() | ||
.indices() | ||
.prepareUpdateSettings(TEST_INDEX) | ||
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)) | ||
.get(); | ||
}); | ||
assertTrue(settingsException.getMessage().contains("unknown setting")); | ||
} | ||
} |
210 changes: 210 additions & 0 deletions
210
server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaIT.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,210 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.indices.settings; | ||
|
||
import org.opensearch.action.support.WriteRequest; | ||
import org.opensearch.cluster.ClusterState; | ||
import org.opensearch.cluster.metadata.IndexMetadata; | ||
import org.opensearch.cluster.metadata.Metadata; | ||
import org.opensearch.cluster.routing.IndexShardRoutingTable; | ||
import org.opensearch.cluster.routing.ShardRouting; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.common.util.FeatureFlags; | ||
import org.opensearch.indices.replication.common.ReplicationType; | ||
import org.opensearch.test.InternalTestCluster; | ||
import org.opensearch.test.OpenSearchIntegTestCase; | ||
|
||
import java.io.IOException; | ||
|
||
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS; | ||
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; | ||
import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING; | ||
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; | ||
|
||
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) | ||
public class SearchOnlyReplicaIT extends OpenSearchIntegTestCase { | ||
|
||
private static final String TEST_INDEX = "test_index"; | ||
|
||
@Override | ||
protected Settings featureFlagSettings() { | ||
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build(); | ||
} | ||
|
||
private final String expectedFailureMessage = | ||
"To set index.number_of_search_only_replicas, index.replication.type must be set to SEGMENT"; | ||
|
||
@Override | ||
public Settings indexSettings() { | ||
return Settings.builder() | ||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) | ||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) | ||
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) | ||
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "0ms") // so that after we punt a node we can immediately try to | ||
// reallocate after node left. | ||
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) | ||
.build(); | ||
} | ||
|
||
public void testCreateDocRepFails() { | ||
Settings settings = Settings.builder().put(indexSettings()).put(SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build(); | ||
|
||
IllegalArgumentException illegalArgumentException = expectThrows( | ||
IllegalArgumentException.class, | ||
() -> createIndex(TEST_INDEX, settings) | ||
); | ||
assertEquals(expectedFailureMessage, illegalArgumentException.getMessage()); | ||
} | ||
|
||
public void testUpdateDocRepFails() { | ||
Settings settings = Settings.builder() | ||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) | ||
.put(SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT) | ||
.build(); | ||
// create succeeds | ||
createIndex(TEST_INDEX, settings); | ||
|
||
// update fails | ||
IllegalArgumentException illegalArgumentException = expectThrows(IllegalArgumentException.class, () -> { | ||
client().admin() | ||
.indices() | ||
.prepareUpdateSettings(TEST_INDEX) | ||
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)) | ||
.get(); | ||
}); | ||
assertEquals(expectedFailureMessage, illegalArgumentException.getMessage()); | ||
} | ||
|
||
public void testFailoverWithSearchReplica_WithWriterReplicas() throws IOException { | ||
int numSearchReplicas = 1; | ||
int numWriterReplicas = 1; | ||
internalCluster().startClusterManagerOnlyNode(); | ||
String primaryNodeName = internalCluster().startDataOnlyNode(); | ||
createIndex( | ||
TEST_INDEX, | ||
Settings.builder() | ||
.put(indexSettings()) | ||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numWriterReplicas) | ||
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, numSearchReplicas) | ||
.build() | ||
); | ||
ensureYellow(TEST_INDEX); | ||
// add 2 nodes for the replicas | ||
internalCluster().startDataOnlyNodes(2); | ||
ensureGreen(TEST_INDEX); | ||
|
||
// assert shards are on separate nodes & all active | ||
assertActiveShardCounts(numSearchReplicas, numWriterReplicas); | ||
|
||
// stop the primary and ensure search shard is not promoted: | ||
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName)); | ||
ensureYellowAndNoInitializingShards(TEST_INDEX); | ||
|
||
assertActiveShardCounts(numSearchReplicas, 0); // 1 repl is inactive that was promoted to primary | ||
// add back a node | ||
internalCluster().startDataOnlyNode(); | ||
ensureGreen(TEST_INDEX); | ||
|
||
} | ||
|
||
public void testFailoverWithSearchReplica_WithoutWriterReplicas() throws IOException { | ||
int numSearchReplicas = 1; | ||
int numWriterReplicas = 0; | ||
internalCluster().startClusterManagerOnlyNode(); | ||
String primaryNodeName = internalCluster().startDataOnlyNode(); | ||
createIndex( | ||
TEST_INDEX, | ||
Settings.builder() | ||
.put(indexSettings()) | ||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numWriterReplicas) | ||
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, numSearchReplicas) | ||
.build() | ||
); | ||
ensureYellow(TEST_INDEX); | ||
client().prepareIndex(TEST_INDEX).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); | ||
// start a node for our search replica | ||
String replica = internalCluster().startDataOnlyNode(); | ||
ensureGreen(TEST_INDEX); | ||
assertActiveSearchShards(numSearchReplicas); | ||
assertHitCount(client(replica).prepareSearch(TEST_INDEX).setSize(0).setPreference("_only_local").get(), 1); | ||
|
||
// stop the primary and ensure search shard is not promoted: | ||
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName)); | ||
ensureRed(TEST_INDEX); | ||
assertActiveSearchShards(numSearchReplicas); | ||
// while red our search shard is still searchable | ||
assertHitCount(client(replica).prepareSearch(TEST_INDEX).setSize(0).setPreference("_only_local").get(), 1); | ||
} | ||
|
||
public void testSearchReplicaScaling() { | ||
internalCluster().startNodes(2); | ||
createIndex(TEST_INDEX); | ||
ensureGreen(TEST_INDEX); | ||
// assert settings | ||
Metadata metadata = client().admin().cluster().prepareState().get().getState().metadata(); | ||
int numSearchReplicas = Integer.parseInt(metadata.index(TEST_INDEX).getSettings().get(SETTING_NUMBER_OF_SEARCH_REPLICAS)); | ||
assertEquals(1, numSearchReplicas); | ||
|
||
// assert cluster state & routing table | ||
assertActiveSearchShards(1); | ||
|
||
// Add another node and search replica | ||
internalCluster().startDataOnlyNode(); | ||
client().admin() | ||
.indices() | ||
.prepareUpdateSettings(TEST_INDEX) | ||
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 2)) | ||
.get(); | ||
|
||
ensureGreen(TEST_INDEX); | ||
assertActiveSearchShards(2); | ||
|
||
// remove all search shards | ||
client().admin() | ||
.indices() | ||
.prepareUpdateSettings(TEST_INDEX) | ||
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0)) | ||
.get(); | ||
ensureGreen(TEST_INDEX); | ||
assertActiveSearchShards(0); | ||
} | ||
|
||
/** | ||
* Helper to assert counts of active shards for each type. | ||
*/ | ||
private void assertActiveShardCounts(int expectedSearchReplicaCount, int expectedWriteReplicaCount) { | ||
// assert routing table | ||
IndexShardRoutingTable indexShardRoutingTable = getIndexShardRoutingTable(); | ||
// assert search replica count | ||
int activeCount = expectedSearchReplicaCount + expectedWriteReplicaCount; | ||
assertEquals(expectedSearchReplicaCount, indexShardRoutingTable.searchOnlyReplicas().stream().filter(ShardRouting::active).count()); | ||
assertEquals(expectedWriteReplicaCount, indexShardRoutingTable.writerReplicas().stream().filter(ShardRouting::active).count()); | ||
assertEquals( | ||
expectedWriteReplicaCount + expectedSearchReplicaCount, | ||
indexShardRoutingTable.replicaShards().stream().filter(ShardRouting::active).count() | ||
); | ||
|
||
// assert routing nodes | ||
ClusterState clusterState = getClusterState(); | ||
assertEquals(activeCount, clusterState.getRoutingNodes().shards(r -> r.active() && !r.primary()).size()); | ||
assertEquals(expectedSearchReplicaCount, clusterState.getRoutingNodes().shards(r -> r.active() && r.isSearchOnly()).size()); | ||
assertEquals( | ||
expectedWriteReplicaCount, | ||
clusterState.getRoutingNodes().shards(r -> r.active() && !r.primary() && !r.isSearchOnly()).size() | ||
); | ||
} | ||
|
||
private void assertActiveSearchShards(int expectedSearchReplicaCount) { | ||
assertActiveShardCounts(expectedSearchReplicaCount, 0); | ||
} | ||
|
||
private IndexShardRoutingTable getIndexShardRoutingTable() { | ||
return getClusterState().routingTable().index(TEST_INDEX).shards().values().stream().findFirst().get(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.