diff --git a/gradle/run.gradle b/gradle/run.gradle
index 34651f1d94964..303f810b84d09 100644
--- a/gradle/run.gradle
+++ b/gradle/run.gradle
@@ -45,6 +45,17 @@ testClusters {
plugin('plugins:'.concat(p))
}
}
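+    // Enable the experimental read/write split feature and back every test-cluster node with a shared
+    // filesystem ("fs") remote store repository located at /tmp/my-repo.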
+ setting 'opensearch.experimental.feature.read.write.split.enabled', 'true'
+ setting 'path.repo', '["/tmp/my-repo"]'
+ setting 'node.attr.remote_store', 'true'
+ setting 'cluster.remote_store.state.enabled', 'true'
+ setting 'node.attr.remote_store.segment.repository', 'my-repository'
+ setting 'node.attr.remote_store.translog.repository', 'my-repository'
+ setting 'node.attr.remote_store.repository.my-repository.type', 'fs'
+ setting 'node.attr.remote_store.state.repository', 'my-repository'
+ setting 'node.attr.remote_store.repository.my-repository.settings.location', '/tmp/my-repo'
}
}
diff --git a/server/src/main/java/org/opensearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java
index 779b136abef5c..ecf17ddbfaf7f 100644
--- a/server/src/main/java/org/opensearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java
+++ b/server/src/main/java/org/opensearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java
@@ -38,14 +38,19 @@
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction;
import org.opensearch.action.support.master.AcknowledgedResponse;
+import org.opensearch.cluster.AckedClusterStateUpdateTask;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ack.ClusterStateUpdateResponse;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.block.ClusterBlocks;
+import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
+import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.MetadataUpdateSettingsService;
+import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.common.Priority;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
@@ -170,6 +175,41 @@ protected void clusterManagerOperation(
updateSettingsService.updateSettings(clusterStateUpdateRequest, new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse response) {
+                if (response.isAcknowledged()
+                    && request.settings().hasValue(IndexMetadata.SETTING_REMOVE_INDEXING_SHARDS)
+                    && request.settings().getAsBoolean(IndexMetadata.SETTING_REMOVE_INDEXING_SHARDS, false)) {
+ UpdateSettingsClusterStateUpdateRequest updateRequest = new UpdateSettingsClusterStateUpdateRequest()
+ .indices(concreteIndices)
+ .ackTimeout(request.timeout())
+ .masterNodeTimeout(request.clusterManagerNodeTimeout());
+
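+                    // The settings change has been acknowledged; submit a follow-up, urgent cluster state update
+                    // that flips the index metadata flag and rewrites the routing table to keep only search replicas.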
+ clusterService.submitStateUpdateTask(
+ "update-routing-table-after-settings",
+                        new AckedClusterStateUpdateTask<>(Priority.URGENT, updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
+ @Override
+ public void onResponse(ClusterStateUpdateResponse response) {
+ listener.onResponse(new AcknowledgedResponse(response.isAcknowledged()));
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ listener.onFailure(e);
+ }
+ }) {
+
+ @Override
+ protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
+ return new ClusterStateUpdateResponse(acknowledged);
+ }
+
+ @Override
+ public ClusterState execute(ClusterState currentState) {
+ return updateSettingsService.updateRoutingTableForRemoveIndexShards(
+ Arrays.stream(concreteIndices).map(Index::getName).toArray(String[]::new),
+ currentState
+ );
+ }
+ }
+ );
+                    // The second task's listener completes the request; skip the duplicate acknowledgement below.
+                    return;
+                }
listener.onResponse(new AcknowledgedResponse(response.isAcknowledged()));
}
diff --git a/server/src/main/java/org/opensearch/cluster/health/ClusterIndexHealth.java b/server/src/main/java/org/opensearch/cluster/health/ClusterIndexHealth.java
index 77d96cb0af792..04e4c0a634e88 100644
--- a/server/src/main/java/org/opensearch/cluster/health/ClusterIndexHealth.java
+++ b/server/src/main/java/org/opensearch/cluster/health/ClusterIndexHealth.java
@@ -77,6 +77,8 @@ public final class ClusterIndexHealth implements Iterable, W
private static final String INITIALIZING_SHARDS = "initializing_shards";
private static final String UNASSIGNED_SHARDS = "unassigned_shards";
private static final String SHARDS = "shards";
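+    // Reports whether the index has had its indexing (primary and write replica) shards removed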
+ private static final String INDEXING_SHARDS_REMOVED = "indexing_shards_removed";
+
private static final ConstructingObjectParser<ClusterIndexHealth, String> PARSER = new ConstructingObjectParser<>(
"cluster_index_health",
@@ -91,6 +93,7 @@ public final class ClusterIndexHealth implements Iterable, W
int unassignedShards = (int) parsedObjects[i++];
int activePrimaryShards = (int) parsedObjects[i++];
String statusStr = (String) parsedObjects[i++];
+ boolean indexingShardsRemoved = (boolean) parsedObjects[i++];
ClusterHealthStatus status = ClusterHealthStatus.fromString(statusStr);
@SuppressWarnings("unchecked")
List<ClusterShardHealth> shardList = (List<ClusterShardHealth>) parsedObjects[i];
@@ -113,6 +116,7 @@ public final class ClusterIndexHealth implements Iterable, W
unassignedShards,
activePrimaryShards,
status,
+ indexingShardsRemoved,
shards
);
}
@@ -132,6 +136,7 @@ public final class ClusterIndexHealth implements Iterable, W
PARSER.declareInt(constructorArg(), new ParseField(UNASSIGNED_SHARDS));
PARSER.declareInt(constructorArg(), new ParseField(ACTIVE_PRIMARY_SHARDS));
PARSER.declareString(constructorArg(), new ParseField(STATUS));
+ PARSER.declareBoolean(constructorArg(), new ParseField(INDEXING_SHARDS_REMOVED));
// Can be absent if LEVEL == 'indices' or 'cluster'
PARSER.declareNamedObjects(optionalConstructorArg(), SHARD_PARSER, new ParseField(SHARDS));
}
@@ -147,6 +152,7 @@ public final class ClusterIndexHealth implements Iterable, W
private final int activePrimaryShards;
private final ClusterHealthStatus status;
private final Map<Integer, ClusterShardHealth> shards;
+ private final boolean indexingShardsRemoved;
public ClusterIndexHealth(final IndexMetadata indexMetadata, final IndexRoutingTable indexRoutingTable) {
this.index = indexMetadata.getIndex().getName();
@@ -154,10 +160,6 @@ public ClusterIndexHealth(final IndexMetadata indexMetadata, final IndexRoutingT
this.numberOfReplicas = indexMetadata.getNumberOfReplicas();
shards = new HashMap<>();
- for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
- int shardId = shardRoutingTable.shardId().id();
- shards.put(shardId, new ClusterShardHealth(shardId, shardRoutingTable));
- }
// update the index status
ClusterHealthStatus computeStatus = ClusterHealthStatus.GREEN;
@@ -167,7 +169,11 @@ public ClusterIndexHealth(final IndexMetadata indexMetadata, final IndexRoutingT
int computeInitializingShards = 0;
int computeUnassignedShards = 0;
int computeDelayedUnassignedShards = 0;
- for (ClusterShardHealth shardHealth : shards.values()) {
+ boolean computeIndexingShardsRemoved = false;
+
+ for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
+ int shardId = shardRoutingTable.shardId().id();
+ ClusterShardHealth shardHealth = new ClusterShardHealth(shardId, shardRoutingTable);
if (shardHealth.isPrimaryActive()) {
computeActivePrimaryShards++;
}
@@ -176,10 +182,18 @@ public ClusterIndexHealth(final IndexMetadata indexMetadata, final IndexRoutingT
computeInitializingShards += shardHealth.getInitializingShards();
computeUnassignedShards += shardHealth.getUnassignedShards();
computeDelayedUnassignedShards += shardHealth.getDelayedUnassignedShards();
-
computeStatus = getIndexHealthStatus(shardHealth.getStatus(), computeStatus);
+
+ // Check if any shard has indexing removed
+ if (shardHealth.isIndexingShardsRemoved()) {
+ computeIndexingShardsRemoved = true;
+ }
+
+ shards.put(shardId, shardHealth);
}
- if (shards.isEmpty()) { // might be since none has been created yet (two phase index creation)
+
+ if (indexRoutingTable.shards() == null || indexRoutingTable.shards().isEmpty()) {
+ // might be since none has been created yet (two phase index creation)
computeStatus = ClusterHealthStatus.RED;
}
@@ -190,6 +204,7 @@ public ClusterIndexHealth(final IndexMetadata indexMetadata, final IndexRoutingT
this.initializingShards = computeInitializingShards;
this.unassignedShards = computeUnassignedShards;
this.delayedUnassignedShards = computeDelayedUnassignedShards;
+ this.indexingShardsRemoved = computeIndexingShardsRemoved;
}
public ClusterIndexHealth(
@@ -211,6 +226,7 @@ public ClusterIndexHealth(
int computeInitializingShards = 0;
int computeUnassignedShards = 0;
int computeDelayedUnassignedShards = 0;
+ boolean computeIndexingShardsRemoved = false;
boolean isShardLevelHealthRequired = healthLevel == ClusterHealthRequest.Level.SHARDS;
if (isShardLevelHealthRequired) {
@@ -226,6 +242,9 @@ public ClusterIndexHealth(
computeUnassignedShards += shardHealth.getUnassignedShards();
computeDelayedUnassignedShards += shardHealth.getDelayedUnassignedShards();
computeStatus = getIndexHealthStatus(shardHealth.getStatus(), computeStatus);
+ if (shardHealth.isIndexingShardsRemoved()) {
+ computeIndexingShardsRemoved = true;
+ }
shards.put(shardId, shardHealth);
}
} else {
@@ -252,9 +271,15 @@ public ClusterIndexHealth(
}
}
ShardRouting primaryShard = indexShardRoutingTable.primaryShard();
- if (primaryShard.active()) {
+ if (primaryShard != null && primaryShard.active()) {
computeActivePrimaryShards++;
}
+
+ // Check if primary is unassigned but has active replicas
+ if (primaryShard == null || (primaryShard.unassigned() && hasActiveReplicas(indexShardRoutingTable))) {
+ computeIndexingShardsRemoved = true;
+ }
+
ClusterHealthStatus shardHealth = ClusterShardHealth.getShardHealth(
primaryShard,
activeShardsPerShardId,
@@ -276,20 +301,28 @@ public ClusterIndexHealth(
this.initializingShards = computeInitializingShards;
this.unassignedShards = computeUnassignedShards;
this.delayedUnassignedShards = computeDelayedUnassignedShards;
+ this.indexingShardsRemoved = computeIndexingShardsRemoved;
+ }
+ private boolean hasActiveReplicas(IndexShardRoutingTable shardRoutingTable) {
+ return shardRoutingTable.shards().stream()
+ .anyMatch(shard -> !shard.primary() && shard.active());
}
public static ClusterHealthStatus getIndexHealthStatus(ClusterHealthStatus shardHealth, ClusterHealthStatus computeStatus) {
switch (shardHealth) {
case RED:
+ // Only go RED if we're not already GREEN or YELLOW
+ if (computeStatus == ClusterHealthStatus.GREEN || computeStatus == ClusterHealthStatus.YELLOW) {
+ return computeStatus;
+ }
return ClusterHealthStatus.RED;
case YELLOW:
- // do not override an existing red
- if (computeStatus != ClusterHealthStatus.RED) {
- return ClusterHealthStatus.YELLOW;
- } else {
- return ClusterHealthStatus.RED;
+ // Don't override GREEN or RED
+ if (computeStatus == ClusterHealthStatus.GREEN || computeStatus == ClusterHealthStatus.RED) {
+ return computeStatus;
}
+ return ClusterHealthStatus.YELLOW;
default:
return computeStatus;
}
@@ -305,6 +338,7 @@ public ClusterIndexHealth(final StreamInput in) throws IOException {
initializingShards = in.readVInt();
unassignedShards = in.readVInt();
status = ClusterHealthStatus.fromValue(in.readByte());
+ indexingShardsRemoved = in.readBoolean();
int size = in.readVInt();
shards = new HashMap<>(size);
@@ -327,6 +361,7 @@ public ClusterIndexHealth(final StreamInput in) throws IOException {
int unassignedShards,
int activePrimaryShards,
ClusterHealthStatus status,
+ boolean indexingShardsRemoved,
Map<Integer, ClusterShardHealth> shards
) {
this.index = index;
@@ -337,6 +372,7 @@ public ClusterIndexHealth(final StreamInput in) throws IOException {
this.initializingShards = initializingShards;
this.unassignedShards = unassignedShards;
this.activePrimaryShards = activePrimaryShards;
+ this.indexingShardsRemoved = indexingShardsRemoved;
this.status = status;
this.shards = shards;
}
@@ -401,6 +437,7 @@ public void writeTo(final StreamOutput out) throws IOException {
out.writeVInt(initializingShards);
out.writeVInt(unassignedShards);
out.writeByte(status.value());
+ out.writeBoolean(indexingShardsRemoved);
out.writeCollection(shards.values());
}
@@ -416,6 +453,10 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
builder.field(INITIALIZING_SHARDS, getInitializingShards());
builder.field(UNASSIGNED_SHARDS, getUnassignedShards());
+        builder.field(INDEXING_SHARDS_REMOVED, indexingShardsRemoved);
+
if ("shards".equals(params.param("level", "indices"))) {
builder.startObject(SHARDS);
for (ClusterShardHealth shardHealth : shards.values()) {
@@ -481,6 +522,7 @@ public boolean equals(Object o) {
&& initializingShards == that.initializingShards
&& unassignedShards == that.unassignedShards
&& activePrimaryShards == that.activePrimaryShards
+ && indexingShardsRemoved == that.indexingShardsRemoved
&& status == that.status
&& Objects.equals(shards, that.shards);
}
@@ -497,7 +539,8 @@ public int hashCode() {
unassignedShards,
activePrimaryShards,
status,
- shards
+ shards,
+ indexingShardsRemoved
);
}
}
diff --git a/server/src/main/java/org/opensearch/cluster/health/ClusterShardHealth.java b/server/src/main/java/org/opensearch/cluster/health/ClusterShardHealth.java
index ace4537a5e291..b0eda7d3e953c 100644
--- a/server/src/main/java/org/opensearch/cluster/health/ClusterShardHealth.java
+++ b/server/src/main/java/org/opensearch/cluster/health/ClusterShardHealth.java
@@ -70,6 +70,7 @@ public final class ClusterShardHealth implements Writeable, ToXContentFragment {
private static final String INITIALIZING_SHARDS = "initializing_shards";
private static final String UNASSIGNED_SHARDS = "unassigned_shards";
private static final String PRIMARY_ACTIVE = "primary_active";
+ private static final String INDEXING_SHARDS_REMOVED = "indexing_shards_removed";
public static final ConstructingObjectParser<ClusterShardHealth, Integer> PARSER = new ConstructingObjectParser<>(
"cluster_shard_health",
@@ -81,6 +82,7 @@ public final class ClusterShardHealth implements Writeable, ToXContentFragment {
int relocatingShards = (int) parsedObjects[i++];
int initializingShards = (int) parsedObjects[i++];
int unassignedShards = (int) parsedObjects[i++];
+ boolean indexingShardsRemoved = (boolean) parsedObjects[i++];
String statusStr = (String) parsedObjects[i];
ClusterHealthStatus status = ClusterHealthStatus.fromString(statusStr);
return new ClusterShardHealth(
@@ -90,7 +92,8 @@ public final class ClusterShardHealth implements Writeable, ToXContentFragment {
relocatingShards,
initializingShards,
unassignedShards,
- primaryActive
+ primaryActive,
+ indexingShardsRemoved
);
}
);
@@ -101,6 +104,7 @@ public final class ClusterShardHealth implements Writeable, ToXContentFragment {
PARSER.declareInt(constructorArg(), new ParseField(RELOCATING_SHARDS));
PARSER.declareInt(constructorArg(), new ParseField(INITIALIZING_SHARDS));
PARSER.declareInt(constructorArg(), new ParseField(UNASSIGNED_SHARDS));
+ PARSER.declareBoolean(constructorArg(), new ParseField(INDEXING_SHARDS_REMOVED));
PARSER.declareString(constructorArg(), new ParseField(STATUS));
}
@@ -112,40 +116,116 @@ public final class ClusterShardHealth implements Writeable, ToXContentFragment {
private final int unassignedShards;
private int delayedUnassignedShards;
private final boolean primaryActive;
+ private final boolean indexingShardsRemoved;
public ClusterShardHealth(final int shardId, final IndexShardRoutingTable shardRoutingTable) {
this.shardId = shardId;
+ ShardRouting primaryShard = shardRoutingTable.primaryShard();
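+        // The shard is considered scaled down when it has no primary routing, or its primary is unassigned
+        // while search-only replicas are still active.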
+        this.indexingShardsRemoved = primaryShard == null || (primaryShard.unassigned() && hasActiveReplicas(shardRoutingTable));
int computeActiveShards = 0;
int computeRelocatingShards = 0;
int computeInitializingShards = 0;
int computeUnassignedShards = 0;
int computeDelayedUnassignedShards = 0;
-        List<ShardRouting> shardRoutings = shardRoutingTable.shards();
- for (int index = 0; index < shardRoutings.size(); index++) {
- ShardRouting shardRouting = shardRoutings.get(index);
- if (shardRouting.active()) {
- computeActiveShards++;
- if (shardRouting.relocating()) {
- // the shard is relocating, the one it is relocating to will be in initializing state, so we don't count it
- computeRelocatingShards++;
- }
- } else if (shardRouting.initializing()) {
- computeInitializingShards++;
- } else if (shardRouting.unassigned()) {
- computeUnassignedShards++;
- if (shardRouting.unassignedInfo() != null && shardRouting.unassignedInfo().isDelayed()) {
- computeDelayedUnassignedShards++;
+ ClusterHealthStatus computeStatus;
+ boolean computePrimaryActive;
+
+ if (indexingShardsRemoved) {
+ // For scaled down indices, only consider search replica health
+ computeStatus = getScaledDownShardHealth(shardRoutingTable);
+ // Don't count primary/replica shards in standard counts when scaled down
+ computeActiveShards = computeSearchReplicaCount(shardRoutingTable);
+ computeUnassignedShards = 0; // Don't count missing primary/replicas as unassigned
+ computePrimaryActive = false; // Primary is intentionally removed
+ computeRelocatingShards = 0;
+ computeInitializingShards = 0;
+ computeDelayedUnassignedShards = 0;
+ } else {
+            List<ShardRouting> shardRoutings = shardRoutingTable.shards();
+ for (int index = 0; index < shardRoutings.size(); index++) {
+ ShardRouting shardRouting = shardRoutings.get(index);
+ if (shardRouting.active()) {
+ computeActiveShards++;
+ if (shardRouting.relocating()) {
+ computeRelocatingShards++;
+ }
+ } else if (shardRouting.initializing()) {
+ computeInitializingShards++;
+ } else if (shardRouting.unassigned()) {
+ computeUnassignedShards++;
+ if (shardRouting.unassignedInfo() != null && shardRouting.unassignedInfo().isDelayed()) {
+ computeDelayedUnassignedShards++;
+ }
}
}
+ computeStatus = getShardHealth(primaryShard, computeActiveShards, shardRoutingTable.size());
+ computePrimaryActive = primaryShard.active();
}
- final ShardRouting primaryRouting = shardRoutingTable.primaryShard();
- this.status = getShardHealth(primaryRouting, computeActiveShards, shardRoutingTable.size());
+
+ this.status = computeStatus;
this.activeShards = computeActiveShards;
this.relocatingShards = computeRelocatingShards;
this.initializingShards = computeInitializingShards;
this.unassignedShards = computeUnassignedShards;
this.delayedUnassignedShards = computeDelayedUnassignedShards;
- this.primaryActive = primaryRouting.active();
+ this.primaryActive = computePrimaryActive;
+ }
+
+ public boolean isIndexingShardsRemoved() {
+ return indexingShardsRemoved;
+ }
+
+ /**
+ * Check if shard routing table has any active replicas
+ */
+ private boolean hasActiveReplicas(IndexShardRoutingTable shardRoutingTable) {
+ return shardRoutingTable.shards().stream()
+ .anyMatch(shard -> !shard.primary() && shard.active());
+ }
+
+ private boolean hasInitializingReplicas(IndexShardRoutingTable shardRoutingTable) {
+ return shardRoutingTable.shards().stream()
+ .anyMatch(shard -> !shard.primary() && shard.initializing());
+ }
+
+ /**
+ * Calculate health status for scaled down shards
+ */
+ private ClusterHealthStatus getScaledDownShardHealth(IndexShardRoutingTable shardRoutingTable) {
+ if (hasActiveReplicas(shardRoutingTable)) {
+ return ClusterHealthStatus.GREEN; // Change to GREEN when we have active search replicas
+ }
+ // If replicas are initializing, show as yellow
+ if (hasInitializingReplicas(shardRoutingTable)) {
+ return ClusterHealthStatus.YELLOW;
+ }
+ // No active or initializing search replicas
+ return ClusterHealthStatus.RED;
+ }
+
+ /**
+ * Count active search replicas
+ */
+ private int computeSearchReplicaCount(IndexShardRoutingTable shardRoutingTable) {
+ return (int) shardRoutingTable.shards().stream()
+ .filter(shard -> !shard.primary() && shard.active())
+ .count();
}
public ClusterShardHealth(final StreamInput in) throws IOException {
@@ -156,6 +236,7 @@ public ClusterShardHealth(final StreamInput in) throws IOException {
initializingShards = in.readVInt();
unassignedShards = in.readVInt();
primaryActive = in.readBoolean();
+ indexingShardsRemoved = in.readBoolean();
}
/**
@@ -168,7 +249,8 @@ public ClusterShardHealth(final StreamInput in) throws IOException {
int relocatingShards,
int initializingShards,
int unassignedShards,
- boolean primaryActive
+ boolean primaryActive,
+ boolean indexingShardsRemoved
) {
this.shardId = shardId;
this.status = status;
@@ -177,6 +259,7 @@ public ClusterShardHealth(final StreamInput in) throws IOException {
this.initializingShards = initializingShards;
this.unassignedShards = unassignedShards;
this.primaryActive = primaryActive;
+ this.indexingShardsRemoved = indexingShardsRemoved;
}
public int getShardId() {
@@ -220,6 +303,7 @@ public void writeTo(final StreamOutput out) throws IOException {
out.writeVInt(initializingShards);
out.writeVInt(unassignedShards);
out.writeBoolean(primaryActive);
+ out.writeBoolean(indexingShardsRemoved);
}
/**
@@ -231,7 +315,15 @@ public void writeTo(final StreamOutput out) throws IOException {
*
*/
public static ClusterHealthStatus getShardHealth(final ShardRouting primaryRouting, final int activeShards, final int totalShards) {
- assert primaryRouting != null : "Primary shard routing can't be null";
+ // If primaryRouting is null or unassigned, check if it's due to remove_indexing_shards
+ if (primaryRouting == null || primaryRouting.unassigned()) {
+ // If we have any active shards (which would be search replicas), it's an intentional state
+ if (activeShards > 0) {
+ return ClusterHealthStatus.GREEN; // Intentionally scaled down with active search replicas
+ }
+ return ClusterHealthStatus.RED; // No active shards at all
+ }
+
if (primaryRouting.active()) {
if (activeShards == totalShards) {
return ClusterHealthStatus.GREEN;
@@ -264,8 +356,8 @@ public static ClusterHealthStatus getInactivePrimaryHealth(final ShardRouting sh
if (unassignedInfo.getLastAllocationStatus() != AllocationStatus.DECIDERS_NO
&& unassignedInfo.getNumFailedAllocations() == 0
&& (recoveryType == RecoverySource.Type.EMPTY_STORE
- || recoveryType == RecoverySource.Type.LOCAL_SHARDS
- || recoveryType == RecoverySource.Type.SNAPSHOT)) {
+ || recoveryType == RecoverySource.Type.LOCAL_SHARDS
+ || recoveryType == RecoverySource.Type.SNAPSHOT)) {
return ClusterHealthStatus.YELLOW;
} else {
return ClusterHealthStatus.RED;
@@ -281,6 +373,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(RELOCATING_SHARDS, getRelocatingShards());
builder.field(INITIALIZING_SHARDS, getInitializingShards());
builder.field(UNASSIGNED_SHARDS, getUnassignedShards());
+        builder.field(INDEXING_SHARDS_REMOVED, indexingShardsRemoved);
builder.endObject();
return builder;
}
@@ -320,6 +415,6 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
- return Objects.hash(shardId, status, activeShards, relocatingShards, initializingShards, unassignedShards, primaryActive);
+ return Objects.hash(shardId, status, activeShards, relocatingShards, initializingShards, unassignedShards, primaryActive, indexingShardsRemoved);
}
}
diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java
index c8ea5442a0dd0..6a79b1c1ec9ee 100644
--- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java
+++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java
@@ -645,6 +645,15 @@ public static APIBlock readFrom(StreamInput input) throws IOException {
Setting.Property.Final
);
+ public static final String SETTING_REMOVE_INDEXING_SHARDS = "index.remove_indexing_shards.enabled";
+    public static final Setting<Boolean> INDEX_REMOVE_INDEXING_SHARDS_SETTING = Setting.boolSetting(
+ SETTING_REMOVE_INDEXING_SHARDS,
+ false,
+ Property.Dynamic,
+ Property.IndexScope
+ );
+
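+    // Whether the index has been scaled down to search-only replicas (index.remove_indexing_shards.enabled)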
+ private final boolean isRemoveIndexingShards;
public static final String KEY_IN_SYNC_ALLOCATIONS = "in_sync_allocations";
static final String KEY_VERSION = "version";
static final String KEY_MAPPING_VERSION = "mapping_version";
@@ -742,7 +751,8 @@ private IndexMetadata(
final Map rolloverInfos,
final boolean isSystem,
final int indexTotalShardsPerNodeLimit,
- final Context context
+ final Context context,
+ final boolean isRemoveIndexingShards
) {
this.index = index;
@@ -759,7 +769,12 @@ private IndexMetadata(
this.numberOfShards = numberOfShards;
this.numberOfReplicas = numberOfReplicas;
this.numberOfSearchOnlyReplicas = numberOfSearchOnlyReplicas;
- this.totalNumberOfShards = numberOfShards * (numberOfReplicas + numberOfSearchOnlyReplicas + 1);
+ this.isRemoveIndexingShards = isRemoveIndexingShards;
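+        // With indexing shards removed, only search-only replicas count towards the total number of shards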
+ if (this.isRemoveIndexingShards) {
+ this.totalNumberOfShards = numberOfShards * numberOfSearchOnlyReplicas;
+ } else {
+ this.totalNumberOfShards = numberOfShards * (numberOfReplicas + numberOfSearchOnlyReplicas + 1);
+ }
this.settings = settings;
this.mappings = Collections.unmodifiableMap(mappings);
this.customData = Collections.unmodifiableMap(customData);
@@ -1344,6 +1359,10 @@ public boolean isRemoteSnapshot() {
return isRemoteSnapshot;
}
+ public boolean isRemoveIndexingShards() {
+ return isRemoveIndexingShards;
+ }
+
public static Builder builder(String index) {
return new Builder(index);
}
@@ -1376,6 +1395,7 @@ public static class Builder {
private Integer routingNumShards;
private boolean isSystem;
private Context context;
+ private boolean isRemoveIndexingShards;
public Builder(String index) {
this.index = index;
@@ -1387,6 +1407,11 @@ public Builder(String index) {
this.isSystem = false;
}
+ public Builder removeIndexingShards(boolean isRemoveIndexingShards) {
+ this.isRemoveIndexingShards = isRemoveIndexingShards;
+ return this;
+ }
+
public Builder(IndexMetadata indexMetadata) {
this.index = indexMetadata.getIndex().getName();
this.state = indexMetadata.state;
+            this.isRemoveIndexingShards = indexMetadata.isRemoveIndexingShards;
@@ -1653,6 +1678,18 @@ public IndexMetadata build() {
final int numberOfReplicas = INDEX_NUMBER_OF_REPLICAS_SETTING.get(settings);
final int numberOfSearchReplicas = INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.get(settings);
+            // The validation could instead be enforced at the Metadata level:
+            /*if (isRemoveIndexingShards) {
+ if (numberOfSearchReplicas == 0) {
+ throw new IllegalArgumentException("Cannot scale to zero without search replicas");
+ }
+ if (!INDEX_REMOTE_STORE_ENABLED_SETTING.get(settings)) {
+ throw new IllegalArgumentException("Remote store must be enabled to scale to zero");
+ }
+ if (!INDEX_REPLICATION_TYPE_SETTING.get(settings).equals(ReplicationType.SEGMENT)) {
+ throw new IllegalArgumentException("Segment replication must be enabled to scale to zero");
+ }
+ }*/
int routingPartitionSize = INDEX_ROUTING_PARTITION_SIZE_SETTING.get(settings);
if (routingPartitionSize != 1 && routingPartitionSize >= getRoutingNumShards()) {
throw new IllegalArgumentException(
@@ -1765,7 +1802,8 @@ public IndexMetadata build() {
rolloverInfos,
isSystem,
indexTotalShardsPerNodeLimit,
- context
+ context,
+ isRemoveIndexingShards
);
}
diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java
index 8c350d6b9cef5..767753eb9eba4 100644
--- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java
+++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java
@@ -43,7 +43,10 @@
import org.opensearch.cluster.ack.ClusterStateUpdateResponse;
import org.opensearch.cluster.block.ClusterBlock;
import org.opensearch.cluster.block.ClusterBlocks;
+import org.opensearch.cluster.routing.IndexRoutingTable;
+import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
+import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance;
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
@@ -60,6 +63,7 @@
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.Index;
+import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.ShardLimitValidator;
+import org.opensearch.indices.replication.common.ReplicationType;
@@ -129,6 +133,51 @@ public MetadataUpdateSettingsService(
updateSettingsTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.UPDATE_SETTINGS_KEY, true);
}
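+
+    /**
+     * Builds a new cluster state in which the given indices retain only their search-only replicas.
+     * Each index must have search replicas, remote store, and segment replication enabled; otherwise an
+     * {@link IllegalArgumentException} is thrown before any metadata or routing table changes are made.
+     */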
+ public ClusterState updateRoutingTableForRemoveIndexShards(
+ String[] actualIndices,
+ ClusterState currentState
+ ) {
+ RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable());
+ Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata());
+
+ for (String indexName : actualIndices) {
+ Index index = currentState.metadata().index(indexName).getIndex();
+ IndexMetadata indexMetadata = currentState.metadata().getIndexSafe(index);
+
+ // Validation for scale-down
+ if (indexMetadata.getNumberOfSearchOnlyReplicas() == 0) {
+ throw new IllegalArgumentException(
+ "Cannot scale to zero without search replicas for index: " + indexName
+ );
+ }
+ if (!indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false)) {
+ throw new IllegalArgumentException(
+ "Remote store must be enabled to scale to zero for index: " + indexName
+ );
+ }
+ if (!ReplicationType.SEGMENT.toString().equals(
+ indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE))) {
+ throw new IllegalArgumentException(
+ "Segment replication must be enabled to scale to zero for index: " + indexName
+ );
+ }
+
+ // Update metadata and routing table for scale-down
+ IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexMetadata)
+ .removeIndexingShards(true);
+ metadataBuilder.put(indexMetadataBuilder);
+ routingTableBuilder.updateRemoveIndexShards(true, new String[]{indexName});
+ }
+
+ // Return the updated cluster state
+ return ClusterState.builder(currentState)
+ .metadata(metadataBuilder)
+ .routingTable(routingTableBuilder.build())
+ .build();
+ }
+
public void updateSettings(
final UpdateSettingsClusterStateUpdateRequest request,
final ActionListener<ClusterStateUpdateResponse> listener
@@ -231,7 +280,63 @@ public ClusterState execute(ClusterState currentState) {
}
- if (validationErrors.size() > 0) {
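+                // Handle explicit changes to index.remove_indexing_shards.enabled: the scale-up path below
+                // re-validates the prerequisites and recreates routing entries for the indexing shards.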
+ if (IndexMetadata.INDEX_REMOVE_INDEXING_SHARDS_SETTING.exists(openSettings)) {
+ boolean removeIndexingShards = IndexMetadata.INDEX_REMOVE_INDEXING_SHARDS_SETTING.get(openSettings);
+ if (!removeIndexingShards) {
+ // Scale up validation
+ for (Index index : request.indices()) {
+ IndexMetadata indexMetadata = currentState.metadata().getIndexSafe(index);
+
+ // First check if index is actually scaled down
+ if (!indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOVE_INDEXING_SHARDS, false)) {
+ throw new IllegalArgumentException(
+ "Cannot restore indexing shards for index that is not scaled down: " + index.getName()
+ );
+ }
+
+ // Then verify prerequisites are still met
+ if (!indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false)) {
+ throw new IllegalArgumentException(
+ "Remote store must be enabled to restore indexing shards for index: " + index.getName()
+ );
+ }
+                        if (!ReplicationType.SEGMENT.toString()
+                            .equals(indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE))) {
+ throw new IllegalArgumentException(
+ "Segment replication must be enabled to restore indexing shards for index: " + index.getName()
+ );
+ }
+ if (indexMetadata.getNumberOfSearchOnlyReplicas() == 0) {
+ throw new IllegalArgumentException(
+ "Cannot restore indexing shards without search replicas for index: " + index.getName()
+ );
+ }
+ }
+
+ // Process scale up
+ for (Index index : request.indices()) {
+ IndexMetadata indexMetadata = metadataBuilder.getSafe(index);
+ IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexMetadata)
+ .removeIndexingShards(removeIndexingShards);
+ metadataBuilder.put(indexMetadataBuilder);
+
+ // Update routing table to restore primary and replica shards
+                        routingTableBuilder.updateRemoveIndexShards(removeIndexingShards, new String[] { index.getName() });
+ }
+
+ // Force a reroute to allocate the new shards
+ ClusterState tempState = ClusterState.builder(currentState)
+ .metadata(metadataBuilder)
+ .routingTable(routingTableBuilder.build())
+ .build();
+
+ routingTableBuilder = RoutingTable.builder(
+ allocationService.reroute(tempState, "restore indexing shards").routingTable()
+ );
+ }
+ }
+ if (!validationErrors.isEmpty()) {
ValidationException exception = new ValidationException();
exception.addValidationErrors(validationErrors);
throw exception;
@@ -341,6 +446,7 @@ public ClusterState execute(ClusterState currentState) {
if (IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.exists(indexSettings) == false) {
indexSettings.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, defaultReplicaCount);
}
+
Settings finalSettings = indexSettings.build();
indexScopedSettings.validate(
finalSettings.filter(k -> indexScopedSettings.isPrivateSetting(k) == false),
diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java
index 76111f623e0a5..ea6ef565465d8 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java
@@ -127,7 +127,7 @@ public RoutingNodes(ClusterState clusterState, boolean readOnly) {
// also fill replicaSet information
for (final IndexRoutingTable indexRoutingTable : routingTable.indicesRouting().values()) {
for (IndexShardRoutingTable indexShard : indexRoutingTable) {
- assert indexShard.primary != null;
+                // indexShard.primary can be null when the index has had its indexing shards removed
+                // (index.remove_indexing_shards.enabled) and only search-only replicas remain.
for (ShardRouting shard : indexShard) {
// to get all the shards belonging to an index, including the replicas,
// we define a replica set and keep track of it. A replica set is identified
diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java
index 7128eb44bfb14..e27a1b4ccf271 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java
@@ -32,6 +32,7 @@
package org.opensearch.cluster.routing;
+import org.opensearch.Version;
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.Diffable;
import org.opensearch.cluster.DiffableUtils;
@@ -50,6 +51,7 @@
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.shard.ShardNotFoundException;
+import org.opensearch.repositories.IndexId;
import java.io.IOException;
import java.util.ArrayList;
@@ -59,6 +61,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.function.Predicate;
import static org.opensearch.cluster.metadata.MetadataIndexStateService.isIndexVerifiedBeforeClosed;
@@ -602,6 +605,80 @@ public Builder updateNumberOfSearchReplicas(final int numberOfSearchReplicas, fi
return this;
}
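+
+        /**
+         * Rewrites the routing entries of the given indices when the remove-indexing-shards flag changes.
+         * With {@code remove} set to true only search-only replicas are kept; with false, an unassigned
+         * primary (remote store recovery) and an unassigned replica (peer recovery) are re-added alongside
+         * the existing search replicas so that the allocation service can bring the indexing shards back.
+         */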
+ public Builder updateRemoveIndexShards(boolean remove, final String[] indices) {
+ if (indicesRouting == null) {
+ throw new IllegalStateException("once build is called the builder cannot be reused");
+ }
+ for (String index : indices) {
+ IndexRoutingTable indexRoutingTable = indicesRouting.get(index);
+ if (indexRoutingTable == null) {
+ continue;
+ }
+
+ if (remove) {
+ // Scaling down - keep only search replicas
+ IndexRoutingTable.Builder builder = new IndexRoutingTable.Builder(indexRoutingTable.getIndex());
+ for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
+ IndexShardRoutingTable.Builder shardBuilder = new IndexShardRoutingTable.Builder(indexShardRoutingTable.shardId());
+ for (ShardRouting shardRouting : indexShardRoutingTable) {
+ if (shardRouting.isSearchOnly()) {
+ shardBuilder.addShard(shardRouting);
+ }
+ }
+ builder.addIndexShard(shardBuilder.build());
+ }
+ indicesRouting.put(index, builder.build());
+ } else {
+ // Scaling up - we'll create a new routing table with unassigned primary and replica shards
+ // Let OpenSearch's allocation service handle the actual allocation
+ IndexRoutingTable.Builder builder = new IndexRoutingTable.Builder(indexRoutingTable.getIndex());
+
+ for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
+ IndexShardRoutingTable.Builder shardBuilder = new IndexShardRoutingTable.Builder(indexShardRoutingTable.shardId());
+
+ // Keep all existing search replicas
+ for (ShardRouting shardRouting : indexShardRoutingTable) {
+ if (shardRouting.isSearchOnly()) {
+ shardBuilder.addShard(shardRouting);
+ }
+ }
+
+ // Create recovery source for primary
+ RecoverySource.RemoteStoreRecoverySource remoteStoreRecoverySource = new RecoverySource.RemoteStoreRecoverySource(
+ UUID.randomUUID().toString(),
+ Version.CURRENT,
+ new IndexId(
+ indexShardRoutingTable.shardId().getIndex().getName(),
+ indexShardRoutingTable.shardId().getIndex().getUUID()
+ )
+ );
+
+ // Add unassigned primary
+ ShardRouting primaryShard = ShardRouting.newUnassigned(
+ indexShardRoutingTable.shardId(),
+ true, // isPrimary
+ remoteStoreRecoverySource,
+ new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "Restoring primary shard")
+ );
+ shardBuilder.addShard(primaryShard);
+
+ // Add unassigned replica
+ ShardRouting replicaShard = ShardRouting.newUnassigned(
+ indexShardRoutingTable.shardId(),
+ false, // not primary
+ RecoverySource.PeerRecoverySource.INSTANCE,
+ new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "Restoring replica shard")
+ );
+ shardBuilder.addShard(replicaShard);
+
+ builder.addIndexShard(shardBuilder.build());
+ }
+ indicesRouting.put(index, builder.build());
+ }
+ }
+ return this;
+ }
+
public Builder addAsNew(IndexMetadata indexMetadata) {
if (indexMetadata.getState() == IndexMetadata.State.OPEN) {
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetadata.getIndex()).initializeAsNew(
diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
index 8d56a942c5d6e..6c37ae2791e6b 100644
--- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
+++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
@@ -183,6 +183,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
FieldMapper.COERCE_SETTING,
Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING,
MapperService.INDEX_MAPPER_DYNAMIC_SETTING,
+ IndexSettings.INDEX_REMOVE_INDEXING_SHARDS_SETTING,
MapperService.INDEX_MAPPING_NESTED_FIELDS_LIMIT_SETTING,
MapperService.INDEX_MAPPING_NESTED_DOCS_LIMIT_SETTING,
MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING,
diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java
index f1b36194bf62d..5e142eea9ad22 100644
--- a/server/src/main/java/org/opensearch/index/IndexService.java
+++ b/server/src/main/java/org/opensearch/index/IndexService.java
@@ -43,6 +43,7 @@
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
+import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.ShardRouting;
@@ -87,6 +88,7 @@
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardClosedException;
+import org.opensearch.index.shard.IndexShardState;
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.index.shard.ShardNotFoundException;
@@ -646,7 +648,7 @@ protected void closeInternal() {
Directory directory = null;
if (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX_SETTING) &&
- // TODO : Need to remove this check after support for hot indices is added in Composite Directory
+ // TODO : Need to remove this check after support for hot indices is added in Composite Directory
this.indexSettings.isStoreLocalityPartial()) {
Directory localDirectory = directoryFactory.newDirectory(this.indexSettings, path);
directory = new CompositeDirectory(localDirectory, remoteDirectory, fileCache);
@@ -778,6 +780,7 @@ public synchronized void removeShard(int shardId, String reason) {
HashMap<Integer, IndexShard> newShards = new HashMap<>(shards);
indexShard = newShards.remove(shardId);
shards = unmodifiableMap(newShards);
closeShard(reason, sId, indexShard, indexShard.store(), indexShard.getIndexEventListener());
logger.debug("[{}] closed (reason: [{}])", shardId, reason);
}
@@ -786,12 +789,47 @@ private void closeShard(String reason, ShardId sId, IndexShard indexShard, Store
final int shardId = sId.id();
final Settings indexSettings = this.getIndexSettings().getSettings();
Store remoteStore = null;
+
if (indexShard != null) {
remoteStore = indexShard.remoteStore();
}
if (store != null) {
store.beforeClose();
}
+ // Check if final sync needed - get setting directly from the shard to ensure latest value
+ if (indexShard != null &&
+ !indexShard.state().equals(IndexShardState.CLOSED) &&
+ indexShard.indexSettings().getIndexMetadata().getSettings().getAsBoolean(IndexMetadata.SETTING_REMOVE_INDEXING_SHARDS, false)) {
+ try {
+ if (remoteStore != null) {
+ try {
+                        logger.info(
+                            "[{}][{}] remove_indexing_shards is enabled; performing a final sync to the remote store before closing",
+                            indexShard.shardId().getIndex().getName(),
+                            indexShard.shardId().id()
+                        );
+                        indexShard.sync();
+                        indexShard.waitForRemoteStoreSync();
+                        logger.info(
+                            "[{}][{}] final sync to the remote store completed",
+                            indexShard.shardId().getIndex().getName(),
+                            indexShard.shardId().id()
+                        );
+ } catch (IOException e) {
+ logger.warn("Failed to perform final sync to remote store before closing shard", e);
+ }
+ }
+ } catch (AlreadyClosedException e) {
+ logger.warn("Failed to perform final sync - shard already closed", e);
+ }
+ }
+
try {
try {
listener.beforeIndexShardClosed(sId, indexShard, indexSettings);
@@ -1566,5 +1604,4 @@ public boolean clearCaches(boolean queryCache, boolean fieldDataCache, String...
}
return clearedAtLeastOne;
}
-
}
diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java
index 554e99764c1a1..91907dfd9d561 100644
--- a/server/src/main/java/org/opensearch/index/IndexSettings.java
+++ b/server/src/main/java/org/opensearch/index/IndexSettings.java
@@ -782,6 +782,15 @@ public static IndexMergePolicy fromString(String text) {
Property.IndexScope
);
+ public static final String SETTING_REMOVE_INDEXING_SHARDS = "index.remove_indexing_shards.enabled";
+    public static final Setting<Boolean> INDEX_REMOVE_INDEXING_SHARDS_SETTING = Setting.boolSetting(
+ SETTING_REMOVE_INDEXING_SHARDS,
+ false,
+ Property.Dynamic,
+ Property.IndexScope
+ );
+ private volatile boolean isRemoveIndexingShards;
+
private final Index index;
private final Version version;
private final Logger logger;
@@ -1136,6 +1145,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING,
mergeSchedulerConfig::setMaxThreadAndMergeCount
);
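+        // Keep this IndexSettings instance in sync with dynamic updates of index.remove_indexing_shards.enabled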
+ scopedSettings.addSettingsUpdateConsumer(INDEX_REMOVE_INDEXING_SHARDS_SETTING, this::setRemoveIndexingShards);
scopedSettings.addSettingsUpdateConsumer(MergeSchedulerConfig.AUTO_THROTTLE_SETTING, mergeSchedulerConfig::setAutoThrottle);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_DURABILITY_SETTING, this::setTranslogDurability);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_SYNC_INTERVAL_SETTING, this::setTranslogSyncInterval);
@@ -1202,6 +1212,22 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
);
}
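+
+    /** Dynamic-setting hook for {@code index.remove_indexing_shards.enabled}. */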
+ private void setRemoveIndexingShards(boolean enabled) {
+ /* The validation can be added at the Settings level
+ if (enabled) {
+ if (settings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 0) == 0) {
+ throw new IllegalArgumentException("Cannot scale to zero without search replicas");
+ }
+ if (!isRemoteStoreEnabled()) {
+ throw new IllegalArgumentException("Remote store must be enabled to scale to zero");
+ }
+ if (!ReplicationType.SEGMENT.equals(replicationType)) {
+ throw new IllegalArgumentException("Segment replication must be enabled to scale to zero");
+ }
+ }*/
+ this.isRemoveIndexingShards = enabled;
+ }
+
private void setSearchIdleAfter(TimeValue searchIdleAfter) {
if (this.isRemoteStoreEnabled) {
logger.warn("Search idle is not supported for remote backed indices");
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index eb3999718ca5b..9f71eaed6f881 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -1722,23 +1722,28 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegmentInfosAndCheckpoint() {
assert indexSettings.isSegRepEnabledOrRemoteNode();
- // do not close the snapshot - caller will close it.
+ // Do not close the snapshot here; the caller will close it.
GatedCloseable<SegmentInfos> snapshot = null;
try {
snapshot = getSegmentInfosSnapshot();
final SegmentInfos segmentInfos = snapshot.get();
return new Tuple<>(snapshot, computeReplicationCheckpoint(segmentInfos));
- } catch (IOException | AlreadyClosedException e) {
- logger.error("Error Fetching SegmentInfos and latest checkpoint", e);
- if (snapshot != null) {
- try {
- snapshot.close();
- } catch (IOException ex) {
- throw new OpenSearchException("Error Closing SegmentInfos Snapshot", e);
- }
+ } catch (AlreadyClosedException e) {
+ // The shard is closed; this is expected during shutdown or reconfiguration.
+ logger.debug("Shard is already closed; cannot fetch SegmentInfos and checkpoint.", e);
+ // Return a null snapshot and the latest replication checkpoint.
+ return new Tuple<>(new GatedCloseable<>(null, () -> {}), getLatestReplicationCheckpoint());
+ } catch (IOException e) {
+            // An I/O error occurred while reading the segment infos; log it and release the snapshot if it was acquired.
+            logger.error("IOException while fetching SegmentInfos and checkpoint", e);
+            if (snapshot != null) {
+                try {
+                    snapshot.close();
+                } catch (IOException ex) {
+                    logger.error("Error closing SegmentInfos snapshot after IOException", ex);
+                }
}
+            // Fall back to the latest known replication checkpoint without a segment infos snapshot.
+ return new Tuple<>(new GatedCloseable<>(null, () -> {}), getLatestReplicationCheckpoint());
}
- return new Tuple<>(new GatedCloseable<>(null, () -> {}), getLatestReplicationCheckpoint());
}
/**