From 3cc1b165d4cac9969d439d2f4477189f851fab65 Mon Sep 17 00:00:00 2001
From: Prudhvi Godithi
Date: Thu, 7 Nov 2024 08:39:47 -0800
Subject: [PATCH] Initial commit for scale to zero

Signed-off-by: Prudhvi Godithi
---
 gradle/run.gradle                             | 11 +++
 .../cluster/metadata/IndexMetadata.java       | 43 ++++++++-
 .../MetadataUpdateSettingsService.java        | 88 +++++++++++++++++-
 .../cluster/routing/RoutingNodes.java         |  3 +-
 .../cluster/routing/RoutingTable.java         | 90 +++++++++++++++++++
 .../common/settings/IndexScopedSettings.java  |  1 +
 .../org/opensearch/index/IndexService.java    |  1 -
 .../org/opensearch/index/IndexSettings.java   | 26 ++++++
 8 files changed, 257 insertions(+), 6 deletions(-)

diff --git a/gradle/run.gradle b/gradle/run.gradle
index 34651f1d94964..303f810b84d09 100644
--- a/gradle/run.gradle
+++ b/gradle/run.gradle
@@ -45,6 +45,17 @@ testClusters {
         plugin('plugins:'.concat(p))
       }
     }
+    setting 'opensearch.experimental.feature.read.write.split.enabled', 'true'
+    setting 'path.repo', '["/tmp/my-repo"]'
+    setting 'node.attr.remote_store', 'true'
+    setting 'cluster.remote_store.state.enabled', 'true'
+    setting 'node.attr.remote_store.segment.repository', 'my-repository'
+    setting 'node.attr.remote_store.translog.repository', 'my-repository'
+    setting 'node.attr.remote_store.repository.my-repository.type', 'fs'
+    setting 'node.attr.remote_store.state.repository', 'my-repository'
+    setting 'node.attr.remote_store.repository.my-repository.settings.location', '/tmp/my-repo'
+
+
   }
 }
 
diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java
index c8ea5442a0dd0..aa122bc27cb80 100644
--- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java
+++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java
@@ -645,6 +645,15 @@ public static APIBlock readFrom(StreamInput input) throws IOException {
         Setting.Property.Final
     );
 
+    public static final String SETTING_REMOVE_INDEXING_SHARDS = "index.remove_indexing_shards.enabled";
+    public static final Setting<Boolean> INDEX_REMOVE_INDEXING_SHARDS_SETTING = Setting.boolSetting(
+        SETTING_REMOVE_INDEXING_SHARDS,
+        false,
+        Property.Dynamic,
+        Property.IndexScope
+    );
+
+    private final boolean isRemoveIndexingShards;
     public static final String KEY_IN_SYNC_ALLOCATIONS = "in_sync_allocations";
     static final String KEY_VERSION = "version";
     static final String KEY_MAPPING_VERSION = "mapping_version";
@@ -742,7 +751,8 @@ private IndexMetadata(
         final Map<String, RolloverInfo> rolloverInfos,
         final boolean isSystem,
         final int indexTotalShardsPerNodeLimit,
-        final Context context
+        final Context context,
+        final boolean isRemoveIndexingShards
     ) {
 
         this.index = index;
@@ -759,7 +769,12 @@ private IndexMetadata(
         this.numberOfShards = numberOfShards;
         this.numberOfReplicas = numberOfReplicas;
         this.numberOfSearchOnlyReplicas = numberOfSearchOnlyReplicas;
-        this.totalNumberOfShards = numberOfShards * (numberOfReplicas + numberOfSearchOnlyReplicas + 1);
+        this.isRemoveIndexingShards = isRemoveIndexingShards;
+        if (this.isRemoveIndexingShards) {
+            this.totalNumberOfShards = numberOfShards * numberOfSearchOnlyReplicas;
+        } else {
+            this.totalNumberOfShards = numberOfShards * (numberOfReplicas + numberOfSearchOnlyReplicas + 1);
+        }
         this.settings = settings;
         this.mappings = Collections.unmodifiableMap(mappings);
         this.customData = Collections.unmodifiableMap(customData);
@@ -783,6 +798,14 @@ private IndexMetadata(
         assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards;
     }
 
+    /**
+     * Returns {@code true} when this index keeps only its search replicas, i.e. its primary and
+     * write-replica shards are excluded from {@code totalNumberOfShards}.
+     */
+    public boolean isRemoveIndexingShards() {
+        return isRemoveIndexingShards;
+    }
+
     public Index getIndex() {
         return index;
     }
@@ -1376,6 +1399,7 @@ public static class Builder {
         private Integer routingNumShards;
         private boolean isSystem;
         private Context context;
+        private boolean isRemoveIndexingShards = false;
 
         public Builder(String index) {
             this.index = index;
@@ -1387,6 +1411,18 @@ public Builder(String index) {
             this.isSystem = false;
         }
 
+        /**
+         * Sets whether the indexing (primary and write-replica) shards of the index are removed.
+         *
+         * @param isRemoveIndexingShards {@code true} to keep only search replicas
+         * @return this builder
+         */
+        public Builder removeIndexingShards(boolean isRemoveIndexingShards) {
+            this.isRemoveIndexingShards = isRemoveIndexingShards;
+            return this;
+        }
+
         public Builder(IndexMetadata indexMetadata) {
+            this.isRemoveIndexingShards = indexMetadata.isRemoveIndexingShards;
             this.index = indexMetadata.getIndex().getName();
             this.state = indexMetadata.state;
@@ -1765,7 +1801,8 @@ public IndexMetadata build() {
                 rolloverInfos,
                 isSystem,
                 indexTotalShardsPerNodeLimit,
-                context
+                context,
+                isRemoveIndexingShards
             );
         }
 
diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java
index 8c350d6b9cef5..3872adb2ab72d 100644
--- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java
+++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java
@@ -231,7 +231,93 @@ public ClusterState execute(ClusterState currentState) {
 
                     }
 
-                    if (validationErrors.size() > 0) {
+                    if (IndexMetadata.INDEX_REMOVE_INDEXING_SHARDS_SETTING.exists(openSettings)) {
+                        boolean removeIndexingShards = IndexMetadata.INDEX_REMOVE_INDEXING_SHARDS_SETTING.get(openSettings);
+                        if (removeIndexingShards) {
+                            // Scale down validation
+                            for (Index index : request.indices()) {
+                                IndexMetadata indexMetadata = currentState.metadata().getIndexSafe(index);
+                                if (indexMetadata.getNumberOfSearchOnlyReplicas() == 0) {
+                                    throw new IllegalArgumentException(
+                                        "Cannot scale to zero without search replicas for index: " + index.getName()
+                                    );
+                                }
+                                if (!indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false)) {
+                                    throw new IllegalArgumentException(
+                                        "Remote store must be enabled to scale to zero for index: " + index.getName()
+                                    );
+                                }
+                                // Constant-first equals: the replication type setting may be absent (null)
+                                String replicationTypeValue = indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE);
+                                if (!ReplicationType.SEGMENT.toString().equals(replicationTypeValue)) {
+                                    throw new IllegalArgumentException(
+                                        "Segment replication must be enabled to scale to zero for index: " + index.getName()
+                                    );
+                                }
+                            }
+                            // Process scale down
+                            for (Index index : request.indices()) {
+                                IndexMetadata indexMetadata = metadataBuilder.getSafe(index);
+                                IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexMetadata)
+                                    .removeIndexingShards(removeIndexingShards);
+                                metadataBuilder.put(indexMetadataBuilder);
+                            }
+                            routingTableBuilder.updateRemoveIndexShards(removeIndexingShards, actualIndices);
+                        } else {
+                            // Scale up validation
+                            for (Index index : request.indices()) {
+                                IndexMetadata indexMetadata = currentState.metadata().getIndexSafe(index);
+
+                                // First check if index is actually scaled down
+                                if (!indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOVE_INDEXING_SHARDS, false)) {
+                                    throw new IllegalArgumentException(
+                                        "Cannot restore indexing shards for index that is not scaled down: " + index.getName()
+                                    );
+                                }
+
+                                // Then verify prerequisites are still met
+                                if (!indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false)) {
+                                    throw new IllegalArgumentException(
+                                        "Remote store must be enabled to restore indexing shards for index: " + index.getName()
+                                    );
+                                }
+                                // Constant-first equals: the replication type setting may be absent (null)
+                                String replicationTypeValue = indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE);
+                                if (!ReplicationType.SEGMENT.toString().equals(replicationTypeValue)) {
+                                    throw new IllegalArgumentException(
+                                        "Segment replication must be enabled to restore indexing shards for index: " + index.getName()
+                                    );
+                                }
+                                if (indexMetadata.getNumberOfSearchOnlyReplicas() == 0) {
+                                    throw new IllegalArgumentException(
+                                        "Cannot restore indexing shards without search replicas for index: " + index.getName()
+                                    );
+                                }
+                            }
+
+                            // Process scale up
+                            for (Index index : request.indices()) {
+                                IndexMetadata indexMetadata = metadataBuilder.getSafe(index);
+                                IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexMetadata)
+                                    .removeIndexingShards(removeIndexingShards);
+                                metadataBuilder.put(indexMetadataBuilder);
+                            }
+
+                            // Update routing table once for all indices to restore primary and replica shards
+                            routingTableBuilder.updateRemoveIndexShards(removeIndexingShards, actualIndices);
+
+                            // Force a reroute to allocate the new shards
+                            ClusterState tempState = ClusterState.builder(currentState)
+                                .metadata(metadataBuilder)
+                                .routingTable(routingTableBuilder.build())
+                                .build();
+
+                            routingTableBuilder = RoutingTable.builder(
+                                allocationService.reroute(tempState, "restore indexing shards").routingTable()
+                            );
+                        }
+                    }
+                    if (!validationErrors.isEmpty()) {
                         ValidationException exception = new ValidationException();
                         exception.addValidationErrors(validationErrors);
                         throw exception;
diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java
index 76111f623e0a5..ea6ef565465d8 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java
@@ -127,7 +127,8 @@ public RoutingNodes(ClusterState clusterState, boolean readOnly) {
         // also fill replicaSet information
         for (final IndexRoutingTable indexRoutingTable : routingTable.indicesRouting().values()) {
             for (IndexShardRoutingTable indexShard : indexRoutingTable) {
-                assert indexShard.primary != null;
+                // only shard groups reduced to search replicas (index scaled down) may lack a primary
+                assert indexShard.primary != null || indexShard.shards().stream().allMatch(ShardRouting::isSearchOnly);
                 for (ShardRouting shard : indexShard) {
                     // to get all the shards belonging to an index, including the replicas,
                     // we define a replica set and keep track of it. A replica set is identified
diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java
index 7128eb44bfb14..e27a1b4ccf271 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java
@@ -32,6 +32,7 @@
 
 package org.opensearch.cluster.routing;
 
+import org.opensearch.Version;
 import org.opensearch.cluster.Diff;
 import org.opensearch.cluster.Diffable;
 import org.opensearch.cluster.DiffableUtils;
@@ -50,6 +51,7 @@
 import org.opensearch.core.index.shard.ShardId;
 import org.opensearch.index.IndexNotFoundException;
 import org.opensearch.index.shard.ShardNotFoundException;
+import org.opensearch.repositories.IndexId;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -59,6 +61,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.function.Predicate;
 
 import static org.opensearch.cluster.metadata.MetadataIndexStateService.isIndexVerifiedBeforeClosed;
@@ -602,6 +605,93 @@ public Builder updateNumberOfSearchReplicas(final int numberOfSearchReplicas, fi
             return this;
         }
 
+        /**
+         * Removes or restores the indexing (primary and write-replica) shard copies of the given indices.
+         * <p>
+         * When {@code remove} is {@code true}, each shard group keeps only its search-only copies. Otherwise the
+         * search-only copies are kept and one unassigned primary (remote store recovery) plus one unassigned
+         * replica (peer recovery) are added per shard. Indices without a routing table entry are skipped.
+         *
+         * @param remove  {@code true} to drop the indexing shards, {@code false} to restore them
+         * @param indices names of the indices to update
+         * @return this builder
+         * @throws IllegalStateException if this builder has already been built
+         */
+        public Builder updateRemoveIndexShards(boolean remove, final String[] indices) {
+            if (indicesRouting == null) {
+                throw new IllegalStateException("once build is called the builder cannot be reused");
+            }
+            for (String index : indices) {
+                IndexRoutingTable indexRoutingTable = indicesRouting.get(index);
+                if (indexRoutingTable == null) {
+                    continue;
+                }
+
+                if (remove) {
+                    // Scaling down - keep only search replicas
+                    IndexRoutingTable.Builder builder = new IndexRoutingTable.Builder(indexRoutingTable.getIndex());
+                    for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
+                        IndexShardRoutingTable.Builder shardBuilder = new IndexShardRoutingTable.Builder(indexShardRoutingTable.shardId());
+                        for (ShardRouting shardRouting : indexShardRoutingTable) {
+                            if (shardRouting.isSearchOnly()) {
+                                shardBuilder.addShard(shardRouting);
+                            }
+                        }
+                        builder.addIndexShard(shardBuilder.build());
+                    }
+                    indicesRouting.put(index, builder.build());
+                } else {
+                    // Scaling up - we'll create a new routing table with unassigned primary and replica shards
+                    // Let OpenSearch's allocation service handle the actual allocation
+                    IndexRoutingTable.Builder builder = new IndexRoutingTable.Builder(indexRoutingTable.getIndex());
+
+                    for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
+                        IndexShardRoutingTable.Builder shardBuilder = new IndexShardRoutingTable.Builder(indexShardRoutingTable.shardId());
+
+                        // Keep all existing search replicas
+                        for (ShardRouting shardRouting : indexShardRoutingTable) {
+                            if (shardRouting.isSearchOnly()) {
+                                shardBuilder.addShard(shardRouting);
+                            }
+                        }
+
+                        // Create recovery source for primary
+                        RecoverySource.RemoteStoreRecoverySource remoteStoreRecoverySource = new RecoverySource.RemoteStoreRecoverySource(
+                            UUID.randomUUID().toString(),
+                            Version.CURRENT,
+                            new IndexId(
+                                indexShardRoutingTable.shardId().getIndex().getName(),
+                                indexShardRoutingTable.shardId().getIndex().getUUID()
+                            )
+                        );
+
+                        // Add unassigned primary
+                        ShardRouting primaryShard = ShardRouting.newUnassigned(
+                            indexShardRoutingTable.shardId(),
+                            true, // isPrimary
+                            remoteStoreRecoverySource,
+                            new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "Restoring primary shard")
+                        );
+                        shardBuilder.addShard(primaryShard);
+
+                        // NOTE(review): always adds a single replica; index.number_of_replicas is not consulted here
+                        // Add unassigned replica
+                        ShardRouting replicaShard = ShardRouting.newUnassigned(
+                            indexShardRoutingTable.shardId(),
+                            false, // not primary
+                            RecoverySource.PeerRecoverySource.INSTANCE,
+                            new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "Restoring replica shard")
+                        );
+                        shardBuilder.addShard(replicaShard);
+
+                        builder.addIndexShard(shardBuilder.build());
+                    }
+                    indicesRouting.put(index, builder.build());
+                }
+            }
+            return this;
+        }
+
         public Builder addAsNew(IndexMetadata indexMetadata) {
             if (indexMetadata.getState() == IndexMetadata.State.OPEN) {
                 IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetadata.getIndex()).initializeAsNew(
diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
index 8d56a942c5d6e..6c37ae2791e6b 100644
--- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
+++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
@@ -183,6 +183,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
                 FieldMapper.COERCE_SETTING,
                 Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING,
                 MapperService.INDEX_MAPPER_DYNAMIC_SETTING,
+                IndexSettings.INDEX_REMOVE_INDEXING_SHARDS_SETTING,
                 MapperService.INDEX_MAPPING_NESTED_FIELDS_LIMIT_SETTING,
                 MapperService.INDEX_MAPPING_NESTED_DOCS_LIMIT_SETTING,
                 MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING,
diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java
index f1b36194bf62d..f0547d216d48a 100644
--- a/server/src/main/java/org/opensearch/index/IndexService.java
+++ b/server/src/main/java/org/opensearch/index/IndexService.java
@@ -1566,5 +1566,4 @@ public boolean clearCaches(boolean queryCache, boolean fieldDataCache, String...
         }
         return clearedAtLeastOne;
     }
-
 }
diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java
index 554e99764c1a1..91907dfd9d561 100644
--- a/server/src/main/java/org/opensearch/index/IndexSettings.java
+++ b/server/src/main/java/org/opensearch/index/IndexSettings.java
@@ -782,6 +782,15 @@ public static IndexMergePolicy fromString(String text) {
         Property.IndexScope
     );
 
+    public static final String SETTING_REMOVE_INDEXING_SHARDS = "index.remove_indexing_shards.enabled";
+    public static final Setting<Boolean> INDEX_REMOVE_INDEXING_SHARDS_SETTING = Setting.boolSetting(
+        SETTING_REMOVE_INDEXING_SHARDS,
+        false,
+        Property.Dynamic,
+        Property.IndexScope
+    );
+    private volatile boolean isRemoveIndexingShards;
+
     private final Index index;
     private final Version version;
     private final Logger logger;
@@ -1136,6 +1145,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
             MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING,
             mergeSchedulerConfig::setMaxThreadAndMergeCount
         );
+        this.isRemoveIndexingShards = scopedSettings.get(INDEX_REMOVE_INDEXING_SHARDS_SETTING);
+        scopedSettings.addSettingsUpdateConsumer(INDEX_REMOVE_INDEXING_SHARDS_SETTING, this::setRemoveIndexingShards);
         scopedSettings.addSettingsUpdateConsumer(MergeSchedulerConfig.AUTO_THROTTLE_SETTING, mergeSchedulerConfig::setAutoThrottle);
         scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_DURABILITY_SETTING, this::setTranslogDurability);
         scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_SYNC_INTERVAL_SETTING, this::setTranslogSyncInterval);
@@ -1202,6 +1213,21 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
         );
     }
 
+    /**
+     * Returns the current value of {@link #INDEX_REMOVE_INDEXING_SHARDS_SETTING} for this index.
+     */
+    public boolean isRemoveIndexingShards() {
+        return isRemoveIndexingShards;
+    }
+
+    /**
+     * Settings-update consumer for {@link #INDEX_REMOVE_INDEXING_SHARDS_SETTING}. Prerequisites (search
+     * replicas, remote store, segment replication) are validated in {@code MetadataUpdateSettingsService}.
+     */
+    private void setRemoveIndexingShards(boolean enabled) {
+        this.isRemoveIndexingShards = enabled;
+    }
+
     private void setSearchIdleAfter(TimeValue searchIdleAfter) {
         if (this.isRemoteStoreEnabled) {
             logger.warn("Search idle is not supported for remote backed indices");