Skip to content

Commit

Permalink
Initial commit to support a search only replica for RW separation. (o…
Browse files Browse the repository at this point in the history
…pensearch-project#15410)

* Initial commit for search only replica.
This PR contains the following:
1. Introduce searchOnly flag on ShardRouting.
2. Added feature flag to enable/disable the feature.
3. supports both create and update APIs to toggle search replica count.
4. Changes to exclude search replicas from primary eligibility.
5. Changes to prevent replicationOperations from routing to search replicas.

Signed-off-by: Marc Handalian <[email protected]>

* add some missing feature flag checks

Signed-off-by: Marc Handalian <[email protected]>

* Pr feedback from @andrross

Signed-off-by: Marc Handalian <[email protected]>

* Add more unit tests for settings create and update

Signed-off-by: Marc Handalian <[email protected]>

* Fix broken tests from setting rename

Signed-off-by: Marc Handalian <[email protected]>

* Fix broken tests and add changelog entry

Signed-off-by: Marc Handalian <[email protected]>

* More PR feedback.

Signed-off-by: Marc Handalian <[email protected]>

* add missing searchOnly property to initializeTargetRelocatingShard.

Without this search replicas will become regular replicas on relocation.

Signed-off-by: Marc Handalian <[email protected]>

* test fixes

Signed-off-by: Marc Handalian <[email protected]>

* spotless

Signed-off-by: Marc Handalian <[email protected]>

---------

Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 authored and dk2k committed Oct 16, 2024
1 parent 9099ed3 commit df050f7
Show file tree
Hide file tree
Showing 26 changed files with 1,286 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Workload Management] Add rejection logic for co-ordinator and shard level requests ([#15428](https://github.com/opensearch-project/OpenSearch/pull/15428)))
- Adding translog durability validation in index templates ([#15494](https://github.com/opensearch-project/OpenSearch/pull/15494))
- Add index creation using the context field ([#15290](https://github.com/opensearch-project/OpenSearch/pull/15290))
- [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.settings;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 1)
public class SearchOnlyReplicaFeatureFlagIT extends OpenSearchIntegTestCase {

private static final String TEST_INDEX = "test_index";

@Override
protected Settings featureFlagSettings() {
return Settings.builder()
.put(super.featureFlagSettings())
.put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.FALSE)
.build();
}

public void testCreateFeatureFlagDisabled() {
Settings settings = Settings.builder().put(indexSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, false).build();
SettingsException settingsException = expectThrows(SettingsException.class, () -> createIndex(TEST_INDEX, settings));
assertTrue(settingsException.getMessage().contains("unknown setting"));
}

public void testUpdateFeatureFlagDisabled() {
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();

createIndex(TEST_INDEX, settings);
SettingsException settingsException = expectThrows(SettingsException.class, () -> {
client().admin()
.indices()
.prepareUpdateSettings(TEST_INDEX)
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1))
.get();
});
assertTrue(settingsException.getMessage().contains("unknown setting"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.settings;

import org.opensearch.action.support.WriteRequest;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchOnlyReplicaIT extends OpenSearchIntegTestCase {

private static final String TEST_INDEX = "test_index";

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build();
}

private final String expectedFailureMessage =
"To set index.number_of_search_only_replicas, index.replication.type must be set to SEGMENT";

@Override
public Settings indexSettings() {
return Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "0ms") // so that after we punt a node we can immediately try to
// reallocate after node left.
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
}

public void testCreateDocRepFails() {
Settings settings = Settings.builder().put(indexSettings()).put(SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build();

IllegalArgumentException illegalArgumentException = expectThrows(
IllegalArgumentException.class,
() -> createIndex(TEST_INDEX, settings)
);
assertEquals(expectedFailureMessage, illegalArgumentException.getMessage());
}

public void testUpdateDocRepFails() {
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT)
.build();
// create succeeds
createIndex(TEST_INDEX, settings);

// update fails
IllegalArgumentException illegalArgumentException = expectThrows(IllegalArgumentException.class, () -> {
client().admin()
.indices()
.prepareUpdateSettings(TEST_INDEX)
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1))
.get();
});
assertEquals(expectedFailureMessage, illegalArgumentException.getMessage());
}

public void testFailoverWithSearchReplica_WithWriterReplicas() throws IOException {
int numSearchReplicas = 1;
int numWriterReplicas = 1;
internalCluster().startClusterManagerOnlyNode();
String primaryNodeName = internalCluster().startDataOnlyNode();
createIndex(
TEST_INDEX,
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numWriterReplicas)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, numSearchReplicas)
.build()
);
ensureYellow(TEST_INDEX);
// add 2 nodes for the replicas
internalCluster().startDataOnlyNodes(2);
ensureGreen(TEST_INDEX);

// assert shards are on separate nodes & all active
assertActiveShardCounts(numSearchReplicas, numWriterReplicas);

// stop the primary and ensure search shard is not promoted:
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName));
ensureYellowAndNoInitializingShards(TEST_INDEX);

assertActiveShardCounts(numSearchReplicas, 0); // 1 repl is inactive that was promoted to primary
// add back a node
internalCluster().startDataOnlyNode();
ensureGreen(TEST_INDEX);

}

public void testFailoverWithSearchReplica_WithoutWriterReplicas() throws IOException {
int numSearchReplicas = 1;
int numWriterReplicas = 0;
internalCluster().startClusterManagerOnlyNode();
String primaryNodeName = internalCluster().startDataOnlyNode();
createIndex(
TEST_INDEX,
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numWriterReplicas)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, numSearchReplicas)
.build()
);
ensureYellow(TEST_INDEX);
client().prepareIndex(TEST_INDEX).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
// start a node for our search replica
String replica = internalCluster().startDataOnlyNode();
ensureGreen(TEST_INDEX);
assertActiveSearchShards(numSearchReplicas);
assertHitCount(client(replica).prepareSearch(TEST_INDEX).setSize(0).setPreference("_only_local").get(), 1);

// stop the primary and ensure search shard is not promoted:
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName));
ensureRed(TEST_INDEX);
assertActiveSearchShards(numSearchReplicas);
// while red our search shard is still searchable
assertHitCount(client(replica).prepareSearch(TEST_INDEX).setSize(0).setPreference("_only_local").get(), 1);
}

public void testSearchReplicaScaling() {
internalCluster().startNodes(2);
createIndex(TEST_INDEX);
ensureGreen(TEST_INDEX);
// assert settings
Metadata metadata = client().admin().cluster().prepareState().get().getState().metadata();
int numSearchReplicas = Integer.parseInt(metadata.index(TEST_INDEX).getSettings().get(SETTING_NUMBER_OF_SEARCH_REPLICAS));
assertEquals(1, numSearchReplicas);

// assert cluster state & routing table
assertActiveSearchShards(1);

// Add another node and search replica
internalCluster().startDataOnlyNode();
client().admin()
.indices()
.prepareUpdateSettings(TEST_INDEX)
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 2))
.get();

ensureGreen(TEST_INDEX);
assertActiveSearchShards(2);

// remove all search shards
client().admin()
.indices()
.prepareUpdateSettings(TEST_INDEX)
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0))
.get();
ensureGreen(TEST_INDEX);
assertActiveSearchShards(0);
}

/**
* Helper to assert counts of active shards for each type.
*/
private void assertActiveShardCounts(int expectedSearchReplicaCount, int expectedWriteReplicaCount) {
// assert routing table
IndexShardRoutingTable indexShardRoutingTable = getIndexShardRoutingTable();
// assert search replica count
int activeCount = expectedSearchReplicaCount + expectedWriteReplicaCount;
assertEquals(expectedSearchReplicaCount, indexShardRoutingTable.searchOnlyReplicas().stream().filter(ShardRouting::active).count());
assertEquals(expectedWriteReplicaCount, indexShardRoutingTable.writerReplicas().stream().filter(ShardRouting::active).count());
assertEquals(
expectedWriteReplicaCount + expectedSearchReplicaCount,
indexShardRoutingTable.replicaShards().stream().filter(ShardRouting::active).count()
);

// assert routing nodes
ClusterState clusterState = getClusterState();
assertEquals(activeCount, clusterState.getRoutingNodes().shards(r -> r.active() && !r.primary()).size());
assertEquals(expectedSearchReplicaCount, clusterState.getRoutingNodes().shards(r -> r.active() && r.isSearchOnly()).size());
assertEquals(
expectedWriteReplicaCount,
clusterState.getRoutingNodes().shards(r -> r.active() && !r.primary() && !r.isSearchOnly()).size()
);
}

private void assertActiveSearchShards(int expectedSearchReplicaCount) {
assertActiveShardCounts(expectedSearchReplicaCount, 0);
}

private IndexShardRoutingTable getIndexShardRoutingTable() {
return getClusterState().routingTable().index(TEST_INDEX).shards().values().stream().findFirst().get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.opensearch.index.IndexModule;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.indices.replication.SegmentReplicationSource;
import org.opensearch.indices.replication.common.ReplicationType;

import java.io.IOException;
Expand Down Expand Up @@ -243,6 +244,22 @@ static Setting<Integer> buildNumberOfShardsSetting() {
Property.IndexScope
);

/**
* Setting to control the number of search only replicas for an index.
* A search only replica exists solely to perform read operations for a shard and are designed to achieve
* isolation from writers (primary shards). This means they are not primary eligible and do not have any direct communication
* with their primary. Search replicas require the use of Segment Replication on the index and poll their {@link SegmentReplicationSource} for
* updates. //TODO: Once physical isolation is introduced, reference the setting here.
*/
public static final String SETTING_NUMBER_OF_SEARCH_REPLICAS = "index.number_of_search_only_replicas";
public static final Setting<Integer> INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING = Setting.intSetting(
SETTING_NUMBER_OF_SEARCH_REPLICAS,
0,
0,
Property.Dynamic,
Property.IndexScope
);

public static final String SETTING_ROUTING_PARTITION_SIZE = "index.routing_partition_size";
public static final Setting<Integer> INDEX_ROUTING_PARTITION_SIZE_SETTING = Setting.intSetting(
SETTING_ROUTING_PARTITION_SIZE,
Expand Down Expand Up @@ -649,6 +666,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException {

private final int numberOfShards;
private final int numberOfReplicas;
private final int numberOfSearchOnlyReplicas;

private final Index index;
private final long version;
Expand Down Expand Up @@ -702,6 +720,7 @@ private IndexMetadata(
final State state,
final int numberOfShards,
final int numberOfReplicas,
final int numberOfSearchOnlyReplicas,
final Settings settings,
final Map<String, MappingMetadata> mappings,
final Map<String, AliasMetadata> aliases,
Expand Down Expand Up @@ -735,7 +754,8 @@ private IndexMetadata(
this.state = state;
this.numberOfShards = numberOfShards;
this.numberOfReplicas = numberOfReplicas;
this.totalNumberOfShards = numberOfShards * (numberOfReplicas + 1);
this.numberOfSearchOnlyReplicas = numberOfSearchOnlyReplicas;
this.totalNumberOfShards = numberOfShards * (numberOfReplicas + numberOfSearchOnlyReplicas + 1);
this.settings = settings;
this.mappings = Collections.unmodifiableMap(mappings);
this.customData = Collections.unmodifiableMap(customData);
Expand Down Expand Up @@ -838,6 +858,10 @@ public int getNumberOfReplicas() {
return numberOfReplicas;
}

public int getNumberOfSearchOnlyReplicas() {
return numberOfSearchOnlyReplicas;
}

public int getRoutingPartitionSize() {
return routingPartitionSize;
}
Expand Down Expand Up @@ -1358,6 +1382,11 @@ public Builder numberOfReplicas(int numberOfReplicas) {
return this;
}

public Builder numberOfSearchReplicas(int numberOfSearchReplicas) {
settings = Settings.builder().put(settings).put(SETTING_NUMBER_OF_SEARCH_REPLICAS, numberOfSearchReplicas).build();
return this;
}

public Builder routingPartitionSize(int routingPartitionSize) {
settings = Settings.builder().put(settings).put(SETTING_ROUTING_PARTITION_SIZE, routingPartitionSize).build();
return this;
Expand Down Expand Up @@ -1554,6 +1583,7 @@ public IndexMetadata build() {
throw new IllegalArgumentException("must specify number of replicas for index [" + index + "]");
}
final int numberOfReplicas = INDEX_NUMBER_OF_REPLICAS_SETTING.get(settings);
final int numberOfSearchReplicas = INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.get(settings);

int routingPartitionSize = INDEX_ROUTING_PARTITION_SIZE_SETTING.get(settings);
if (routingPartitionSize != 1 && routingPartitionSize >= getRoutingNumShards()) {
Expand Down Expand Up @@ -1649,6 +1679,7 @@ public IndexMetadata build() {
state,
numberOfShards,
numberOfReplicas,
numberOfSearchReplicas,
tmpSettings,
mappings,
tmpAliases,
Expand Down
18 changes: 18 additions & 0 deletions server/src/main/java/org/opensearch/cluster/metadata/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -1513,6 +1513,24 @@ public Builder updateNumberOfReplicas(final int numberOfReplicas, final String[]
return this;
}

/**
* Update the number of search replicas for the specified indices.
*
* @param numberOfSearchReplicas the number of search replicas
* @param indices the indices to update the number of replicas for
* @return the builder
*/
public Builder updateNumberOfSearchReplicas(final int numberOfSearchReplicas, final String[] indices) {
for (String index : indices) {
IndexMetadata indexMetadata = this.indices.get(index);
if (indexMetadata == null) {
throw new IndexNotFoundException(index);
}
put(IndexMetadata.builder(indexMetadata).numberOfSearchReplicas(numberOfSearchReplicas));
}
return this;
}

public Builder coordinationMetadata(CoordinationMetadata coordinationMetadata) {
this.coordinationMetadata = coordinationMetadata;
return this;
Expand Down
Loading

0 comments on commit df050f7

Please sign in to comment.