Skip to content

Commit

Permalink
Initial commit for scale to zero
Browse files Browse the repository at this point in the history
Signed-off-by: Prudhvi Godithi <[email protected]>
  • Loading branch information
prudhvigodithi committed Nov 13, 2024
1 parent 9f790ee commit 3cc1b16
Show file tree
Hide file tree
Showing 8 changed files with 244 additions and 6 deletions.
11 changes: 11 additions & 0 deletions gradle/run.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,17 @@ testClusters {
plugin('plugins:'.concat(p))
}
}
setting 'opensearch.experimental.feature.read.write.split.enabled', 'true'
setting 'path.repo', '["/tmp/my-repo"]'
setting 'node.attr.remote_store', 'true'
setting 'cluster.remote_store.state.enabled', 'true'
setting 'node.attr.remote_store.segment.repository', 'my-repository'
setting 'node.attr.remote_store.translog.repository', 'my-repository'
setting 'node.attr.remote_store.repository.my-repository.type', 'fs'
setting 'node.attr.remote_store.state.repository', 'my-repository'
setting 'node.attr.remote_store.repository.my-repository.settings.location', '/tmp/my-repo'


}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,15 @@ public static APIBlock readFrom(StreamInput input) throws IOException {
Setting.Property.Final
);

/**
 * Key of the index setting that requests removal of an index's indexing shards.
 */
public static final String SETTING_REMOVE_INDEXING_SHARDS = "index.remove_indexing_shards.enabled";
/**
 * Boolean setting registered under {@link #SETTING_REMOVE_INDEXING_SHARDS}. It defaults to
 * {@code false}, is index-scoped and can be updated dynamically.
 * NOTE(review): presumably {@code true} means the index is served by search replicas only
 * ("scale to zero") - confirm against the code that consumes this setting.
 */
public static final Setting<Boolean> INDEX_REMOVE_INDEXING_SHARDS_SETTING = Setting.boolSetting(
SETTING_REMOVE_INDEXING_SHARDS,
false,
Property.Dynamic,
Property.IndexScope
);

private final boolean isRemoveIndexingShards;
public static final String KEY_IN_SYNC_ALLOCATIONS = "in_sync_allocations";
static final String KEY_VERSION = "version";
static final String KEY_MAPPING_VERSION = "mapping_version";
Expand Down Expand Up @@ -742,7 +751,8 @@ private IndexMetadata(
final Map<String, RolloverInfo> rolloverInfos,
final boolean isSystem,
final int indexTotalShardsPerNodeLimit,
final Context context
final Context context,
final boolean isRemoveIndexingShards
) {

this.index = index;
Expand All @@ -759,7 +769,12 @@ private IndexMetadata(
this.numberOfShards = numberOfShards;
this.numberOfReplicas = numberOfReplicas;
this.numberOfSearchOnlyReplicas = numberOfSearchOnlyReplicas;
this.totalNumberOfShards = numberOfShards * (numberOfReplicas + numberOfSearchOnlyReplicas + 1);
this.isRemoveIndexingShards = isRemoveIndexingShards;
if (this.isRemoveIndexingShards) {
this.totalNumberOfShards = numberOfShards * numberOfSearchOnlyReplicas;
} else {
this.totalNumberOfShards = numberOfShards * (numberOfReplicas + numberOfSearchOnlyReplicas + 1);
}
this.settings = settings;
this.mappings = Collections.unmodifiableMap(mappings);
this.customData = Collections.unmodifiableMap(customData);
Expand All @@ -783,6 +798,10 @@ private IndexMetadata(
assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards;
}

/**
 * Reports the remove-indexing-shards flag this metadata instance was constructed with.
 *
 * @return {@code true} if the flag is set, {@code false} otherwise
 */
public boolean isRemoveIndexingShards() {
    return this.isRemoveIndexingShards;
}

public Index getIndex() {
return index;
}
Expand Down Expand Up @@ -1376,6 +1395,7 @@ public static class Builder {
private Integer routingNumShards;
private boolean isSystem;
private Context context;
private boolean isRemoveIndexingShards = false;

public Builder(String index) {
this.index = index;
Expand All @@ -1387,6 +1407,11 @@ public Builder(String index) {
this.isSystem = false;
}

/**
 * Records the remove-indexing-shards flag on this builder.
 *
 * @param isRemoveIndexingShards the flag value to store
 * @return this builder, to allow call chaining
 */
public Builder removeIndexingShards(final boolean isRemoveIndexingShards) {
    this.isRemoveIndexingShards = isRemoveIndexingShards;
    return this;
}

public Builder(IndexMetadata indexMetadata) {
this.index = indexMetadata.getIndex().getName();
this.state = indexMetadata.state;
Expand Down Expand Up @@ -1653,6 +1678,18 @@ public IndexMetadata build() {
final int numberOfReplicas = INDEX_NUMBER_OF_REPLICAS_SETTING.get(settings);
final int numberOfSearchReplicas = INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.get(settings);

// NOTE: the validation below is disabled for now; it could instead be enforced at the Metadata level.
/*if (isScaledToZero) {
if (numberOfSearchReplicas == 0) {
throw new IllegalArgumentException("Cannot scale to zero without search replicas");
}
if (!INDEX_REMOTE_STORE_ENABLED_SETTING.get(settings)) {
throw new IllegalArgumentException("Remote store must be enabled to scale to zero");
}
if (!INDEX_REPLICATION_TYPE_SETTING.get(settings).equals(ReplicationType.SEGMENT)) {
throw new IllegalArgumentException("Segment replication must be enabled to scale to zero");
}
}*/
int routingPartitionSize = INDEX_ROUTING_PARTITION_SIZE_SETTING.get(settings);
if (routingPartitionSize != 1 && routingPartitionSize >= getRoutingNumShards()) {
throw new IllegalArgumentException(
Expand Down Expand Up @@ -1765,7 +1802,8 @@ public IndexMetadata build() {
rolloverInfos,
isSystem,
indexTotalShardsPerNodeLimit,
context
context,
isRemoveIndexingShards
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,93 @@ public ClusterState execute(ClusterState currentState) {

}

if (validationErrors.size() > 0) {
// Scale-to-zero handling: only runs when the request explicitly carries
// index.remove_indexing_shards.enabled. true removes primaries/replicas (scale down),
// false restores them (scale up).
if (IndexMetadata.INDEX_REMOVE_INDEXING_SHARDS_SETTING.exists(openSettings)) {
    final boolean removeIndexingShards = IndexMetadata.INDEX_REMOVE_INDEXING_SHARDS_SETTING.get(openSettings);
    // Compared constant-first below: the replication type may be absent from an index's
    // settings, and a missing value must fail validation instead of raising an NPE.
    final String segmentReplication = ReplicationType.SEGMENT.toString();
    if (removeIndexingShards) {
        // Scale down: validate every target index before any builder is modified.
        for (Index index : request.indices()) {
            IndexMetadata indexMetadata = currentState.metadata().getIndexSafe(index);
            if (indexMetadata.getNumberOfSearchOnlyReplicas() == 0) {
                throw new IllegalArgumentException(
                    "Cannot scale to zero without search replicas for index: " + index.getName()
                );
            }
            if (!indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false)) {
                throw new IllegalArgumentException(
                    "Remote store must be enabled to scale to zero for index: " + index.getName()
                );
            }
            if (!segmentReplication.equals(indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE))) {
                throw new IllegalArgumentException(
                    "Segment replication must be enabled to scale to zero for index: " + index.getName()
                );
            }
        }
        // Process scale down: flag each index's metadata.
        for (Index index : request.indices()) {
            IndexMetadata indexMetadata = metadataBuilder.getSafe(index);
            IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexMetadata)
                .removeIndexingShards(removeIndexingShards);
            metadataBuilder.put(indexMetadataBuilder);
        }
        // The routing update already walks every name in actualIndices, so it is applied
        // once here rather than once per index inside the loop above.
        routingTableBuilder.updateRemoveIndexShards(removeIndexingShards, actualIndices);
    } else {
        // Scale up validation
        for (Index index : request.indices()) {
            IndexMetadata indexMetadata = currentState.metadata().getIndexSafe(index);

            // First check if index is actually scaled down
            if (!indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOVE_INDEXING_SHARDS, false)) {
                throw new IllegalArgumentException(
                    "Cannot restore indexing shards for index that is not scaled down: " + index.getName()
                );
            }

            // Then verify prerequisites are still met
            if (!indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false)) {
                throw new IllegalArgumentException(
                    "Remote store must be enabled to restore indexing shards for index: " + index.getName()
                );
            }
            if (!segmentReplication.equals(indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE))) {
                throw new IllegalArgumentException(
                    "Segment replication must be enabled to restore indexing shards for index: " + index.getName()
                );
            }
            if (indexMetadata.getNumberOfSearchOnlyReplicas() == 0) {
                throw new IllegalArgumentException(
                    "Cannot restore indexing shards without search replicas for index: " + index.getName()
                );
            }
        }

        // Process scale up: clear the flag on each index's metadata.
        for (Index index : request.indices()) {
            IndexMetadata indexMetadata = metadataBuilder.getSafe(index);
            IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexMetadata)
                .removeIndexingShards(removeIndexingShards);
            metadataBuilder.put(indexMetadataBuilder);
        }

        // Update routing table to restore primary and replica shards (single pass over actualIndices).
        routingTableBuilder.updateRemoveIndexShards(removeIndexingShards, actualIndices);

        // Force a reroute to allocate the new shards. build() consumes the routing table
        // builder, so a fresh one is created from the rerouted state.
        ClusterState tempState = ClusterState.builder(currentState)
            .metadata(metadataBuilder)
            .routingTable(routingTableBuilder.build())
            .build();

        routingTableBuilder = RoutingTable.builder(
            allocationService.reroute(tempState, "restore indexing shards").routingTable()
        );
    }
}
if (!validationErrors.isEmpty()) {
ValidationException exception = new ValidationException();
exception.addValidationErrors(validationErrors);
throw exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public RoutingNodes(ClusterState clusterState, boolean readOnly) {
// also fill replicaSet information
for (final IndexRoutingTable indexRoutingTable : routingTable.indicesRouting().values()) {
for (IndexShardRoutingTable indexShard : indexRoutingTable) {
assert indexShard.primary != null;
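// NOTE(review): assertion disabled - presumably because a scaled-down index keeps only search
// replicas, so a shard table may have no primary. Confirm before merging.
// assert indexShard.primary != null;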
for (ShardRouting shard : indexShard) {
// to get all the shards belonging to an index, including the replicas,
// we define a replica set and keep track of it. A replica set is identified
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.cluster.routing;

import org.opensearch.Version;
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.Diffable;
import org.opensearch.cluster.DiffableUtils;
Expand All @@ -50,6 +51,7 @@
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.shard.ShardNotFoundException;
import org.opensearch.repositories.IndexId;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -59,6 +61,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Predicate;

import static org.opensearch.cluster.metadata.MetadataIndexStateService.isIndexVerifiedBeforeClosed;
Expand Down Expand Up @@ -602,6 +605,80 @@ public Builder updateNumberOfSearchReplicas(final int numberOfSearchReplicas, fi
return this;
}

/**
 * Rebuilds the routing table of each named index around its search-only shard copies.
 * <p>
 * For every shard of every listed index, the existing search-only routings are carried over
 * unchanged and all other routings are dropped. When {@code remove} is {@code false}, one
 * unassigned primary (recovering from the remote store) and one unassigned replica (peer
 * recovery) are then appended, in that order. Index names that have no routing table in this
 * builder are skipped.
 * <p>
 * NOTE(review): the scale-up path always adds exactly one replica per shard; it does not look
 * at the index's configured replica count - confirm this is intended.
 *
 * @param remove  {@code true} to keep only search-only shards, {@code false} to additionally
 *                re-add an unassigned primary and replica
 * @param indices names of the indices whose routing tables are rebuilt
 * @return this builder
 * @throws IllegalStateException if this builder has already been built
 */
public Builder updateRemoveIndexShards(boolean remove, final String[] indices) {
    if (indicesRouting == null) {
        throw new IllegalStateException("once build is called the builder cannot be reused");
    }
    for (final String indexName : indices) {
        final IndexRoutingTable current = indicesRouting.get(indexName);
        if (current == null) {
            continue;
        }

        final IndexRoutingTable.Builder rebuilt = new IndexRoutingTable.Builder(current.getIndex());
        for (final IndexShardRoutingTable shardTable : current) {
            final ShardId shardId = shardTable.shardId();
            final IndexShardRoutingTable.Builder shardBuilder = new IndexShardRoutingTable.Builder(shardId);

            // Search replicas survive in both directions.
            for (final ShardRouting routing : shardTable) {
                if (routing.isSearchOnly()) {
                    shardBuilder.addShard(routing);
                }
            }

            if (remove == false) {
                // Scaling up: add unassigned copies and let the allocation service place them.
                final RecoverySource.RemoteStoreRecoverySource primarySource = new RecoverySource.RemoteStoreRecoverySource(
                    UUID.randomUUID().toString(),
                    Version.CURRENT,
                    new IndexId(shardId.getIndex().getName(), shardId.getIndex().getUUID())
                );
                shardBuilder.addShard(
                    ShardRouting.newUnassigned(
                        shardId,
                        true,
                        primarySource,
                        new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "Restoring primary shard")
                    )
                );
                shardBuilder.addShard(
                    ShardRouting.newUnassigned(
                        shardId,
                        false,
                        RecoverySource.PeerRecoverySource.INSTANCE,
                        new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "Restoring replica shard")
                    )
                );
            }

            rebuilt.addIndexShard(shardBuilder.build());
        }
        indicesRouting.put(indexName, rebuilt.build());
    }
    return this;
}

public Builder addAsNew(IndexMetadata indexMetadata) {
if (indexMetadata.getState() == IndexMetadata.State.OPEN) {
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetadata.getIndex()).initializeAsNew(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
FieldMapper.COERCE_SETTING,
Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING,
MapperService.INDEX_MAPPER_DYNAMIC_SETTING,
IndexSettings.INDEX_REMOVE_INDEXING_SHARDS_SETTING,
MapperService.INDEX_MAPPING_NESTED_FIELDS_LIMIT_SETTING,
MapperService.INDEX_MAPPING_NESTED_DOCS_LIMIT_SETTING,
MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1566,5 +1566,4 @@ public boolean clearCaches(boolean queryCache, boolean fieldDataCache, String...
}
return clearedAtLeastOne;
}

}
Loading

0 comments on commit 3cc1b16

Please sign in to comment.