Skip to content

Commit

Permalink
Set default interval for search replicas on the setting as 5s.
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 committed Sep 3, 2024
1 parent 5bb99a1 commit 45185de
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.indices.settings;

import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand All @@ -21,18 +23,20 @@
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
import java.util.concurrent.ExecutionException;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
import static org.opensearch.index.IndexSettings.INDEX_REPLICATION_INTERVAL_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchOnlyReplicaIT extends OpenSearchIntegTestCase {

private static final String TEST_INDEX = "test_index";

@Override
@Override /**/
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build();
}
Expand Down Expand Up @@ -175,6 +179,46 @@ public void testSearchReplicaScaling() {
assertActiveSearchShards(0);
}

public void testDefaultReplicationSettings() throws ExecutionException, InterruptedException {
internalCluster().startNodes(2);
createIndex(
TEST_INDEX,
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 0)
.build()
);
ensureGreen(TEST_INDEX);
// assert settings
Metadata metadata = client().admin().cluster().prepareState().get().getState().metadata();
IndexMetadata indexMetadata = metadata.index(TEST_INDEX);
Settings settings = indexMetadata.getSettings();
int numSearchReplicas = Integer.parseInt(settings.get(SETTING_NUMBER_OF_SEARCH_REPLICAS));
assertEquals(0, numSearchReplicas);

GetSettingsResponse settingsResponse = client().admin()
.indices()
.getSettings(new GetSettingsRequest().indices(TEST_INDEX).includeDefaults(true))
.get();
assertEquals("0ms", settingsResponse.getSetting(TEST_INDEX, INDEX_REPLICATION_INTERVAL_SETTING.getKey()));

// assert cluster state & routing table
assertActiveSearchShards(0);

// Add a search replica
client().admin()
.indices()
.prepareUpdateSettings(TEST_INDEX)
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1))
.get();
ensureGreen(TEST_INDEX);
assertActiveSearchShards(1);

settingsResponse = client().admin().indices().getSettings(new GetSettingsRequest().indices(TEST_INDEX).includeDefaults(true)).get();
assertEquals("5s", settingsResponse.getSetting(TEST_INDEX, INDEX_REPLICATION_INTERVAL_SETTING.getKey()));
}

/**
* Helper to assert counts of active shards for each type.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@
import static org.opensearch.cluster.metadata.Metadata.DEFAULT_REPLICA_COUNT_SETTING;
import static org.opensearch.cluster.metadata.MetadataIndexTemplateService.findContextTemplateName;
import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;
import static org.opensearch.index.IndexSettings.INDEX_REPLICATION_INTERVAL_SETTING;
import static org.opensearch.index.IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent;
Expand Down Expand Up @@ -1096,10 +1095,6 @@ private static void updateSearchOnlyReplicas(Settings requestSettings, Settings.
);
}
builder.put(SETTING_NUMBER_OF_SEARCH_REPLICAS, INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.get(requestSettings));
if (INDEX_REPLICATION_INTERVAL_SETTING.exists(requestSettings) == false) {
// set the search replica to a default interval
builder.put(INDEX_REPLICATION_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(5));
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -1096,8 +1096,8 @@ private void updateReplicationInterval() {
}

/**
* Gets the replication interval seen by the index service. Index setting overrides takes the highest precedence.
* @return the refresh interval.
* Gets the replication interval seen by the index service.
* @return the replication interval.
*/
private TimeValue getReplicationInterval() {
return getIndexSettings().getReplicationInterval();
Expand Down
19 changes: 10 additions & 9 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import java.util.function.UnaryOperator;

import static org.opensearch.Version.V_2_7_0;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING;
import static org.opensearch.common.util.FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY;
import static org.opensearch.index.codec.fuzzy.FuzzySetParameters.DEFAULT_FALSE_POSITIVE_PROBABILITY;
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING;
Expand Down Expand Up @@ -376,14 +377,14 @@ public static IndexMergePolicy fromString(String text) {
* pull based replication is disabled.
*/
public static final TimeValue DEFAULT_REPLICATION_INTERVAL = TimeValue.ZERO;
public static final TimeValue DEFAULT_SEARCH_REPLICA_INTERVAL = TimeValue.timeValueSeconds(5);
public static final TimeValue MINIMUM_REPLICATION_INTERVAL = TimeValue.ZERO;
public static final Setting<TimeValue> INDEX_REPLICATION_INTERVAL_SETTING = Setting.timeSetting(
"index.replication_interval",
DEFAULT_REPLICATION_INTERVAL,
MINIMUM_REPLICATION_INTERVAL,
Property.Dynamic,
Property.IndexScope
);
public static final Setting<TimeValue> INDEX_REPLICATION_INTERVAL_SETTING = Setting.timeSetting("index.replication_interval", (s) -> {
if (INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.get(s) > 0) {
return DEFAULT_SEARCH_REPLICA_INTERVAL;
}
return DEFAULT_REPLICATION_INTERVAL;
}, MINIMUM_REPLICATION_INTERVAL, Property.Dynamic, Property.IndexScope);

public static final Setting<ByteSizeValue> INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING = Setting.byteSizeSetting(
"index.translog.flush_threshold_size",
Expand Down Expand Up @@ -1198,10 +1199,10 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
if (FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(nodeSettings)) {
// Move setting registration to IndexScopedSettings when feature flag is removed.
scopedSettings.registerSetting(INDEX_REPLICATION_INTERVAL_SETTING);
replicationInterval = scopedSettings.get(INDEX_REPLICATION_INTERVAL_SETTING);
setReplicationInterval(scopedSettings.get(INDEX_REPLICATION_INTERVAL_SETTING));
scopedSettings.addSettingsUpdateConsumer(INDEX_REPLICATION_INTERVAL_SETTING, this::setReplicationInterval);
} else {
replicationInterval = TimeValue.ZERO;
replicationInterval = DEFAULT_REPLICATION_INTERVAL;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,7 @@ public void onReplicationDone(SegmentReplicationState state) {

@Override
public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
logger.error(
() -> new ParameterizedMessage(
"Failed segment replication for {}",
shard.shardId()
),
e
);
logger.error(() -> new ParameterizedMessage("Failed segment replication for {}", shard.shardId()), e);
if (sendShardFailure) {
shard.failShard("unrecoverable replication failure", e);
}
Expand Down

0 comments on commit 45185de

Please sign in to comment.