diff --git a/libs/common/src/main/java/org/opensearch/common/IndexingRetryException.java b/libs/common/src/main/java/org/opensearch/common/IndexingRetryException.java new file mode 100644 index 0000000000000..27f1337cb18a3 --- /dev/null +++ b/libs/common/src/main/java/org/opensearch/common/IndexingRetryException.java @@ -0,0 +1,21 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common; + +/** + * Signals that an indexing request against an append-only index was a retry of an operation that has already been + * applied. When the result of an indexing operation carries this exception, no translog entry is added for the request. + * + * @opensearch.internal + */ +public class IndexingRetryException extends RuntimeException { + public IndexingRetryException(String message) { + super(message); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/action/bulk/AppendOnlyIndicesIT.java b/server/src/internalClusterTest/java/org/opensearch/action/bulk/AppendOnlyIndicesIT.java new file mode 100644 index 0000000000000..971f80cf033e9 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/action/bulk/AppendOnlyIndicesIT.java @@ -0,0 +1,170 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.action.bulk; + +import org.opensearch.action.admin.cluster.node.stats.NodeStats; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.client.Client; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.ingest.IngestTestPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.transport.MockTransportService; +import org.opensearch.transport.ConnectTransportException; +import org.opensearch.transport.TransportService; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.containsString; + +public class AppendOnlyIndicesIT extends OpenSearchIntegTestCase { + + @Override + protected Collection<Class<? extends Plugin>> nodePlugins() { + return Arrays.asList(IngestTestPlugin.class, MockTransportService.TestPlugin.class); + } + + public void testIndexDocumentWithACustomDocIdForAppendOnlyIndices() throws Exception { + Client client = internalCluster().coordOnlyNodeClient(); + assertAcked( + client().admin() + .indices() + .prepareCreate("index") + .setSettings( + Settings.builder() + .put("index.number_of_replicas", 0) + .put("index.number_of_shards", 1) + .put("index.append_only.enabled", true) + ) + ); + ensureGreen("index"); + + BulkRequestBuilder bulkBuilder = client.prepareBulk(); + + XContentBuilder doc = null; + doc = jsonBuilder().startObject().field("foo", "bar").endObject(); + bulkBuilder.add(client.prepareIndex("index").setId(Integer.toString(0)).setSource(doc)); + + BulkResponse response = bulkBuilder.get();
assertThat(response.getItems()[0].getFailureMessage(), containsString("Operation [INDEX] is not allowed with a custom doc id 0")); + } + + public void testUpdateDeleteDocumentForAppendOnlyIndices() throws Exception { + Client client = internalCluster().coordOnlyNodeClient(); + assertAcked( + client().admin() + .indices() + .prepareCreate("index") + .setSettings( + Settings.builder() + .put("index.number_of_replicas", 0) + .put("index.number_of_shards", 1) + .put("index.append_only.enabled", true) + ) + ); + ensureGreen("index"); + + BulkRequestBuilder bulkBuilder = client.prepareBulk(); + + XContentBuilder doc = null; + doc = jsonBuilder().startObject().field("foo", "bar").endObject(); + bulkBuilder.add(client.prepareIndex("index").setSource(doc)); + + bulkBuilder.get(); + BulkResponse response = client().prepareBulk().add(client().prepareUpdate("index", "0").setDoc("foo", "updated")).get(); + assertThat( + response.getItems()[0].getFailureMessage(), + containsString("Operation [UPDATE] is not allowed for append only index index;") + ); + + response = client().prepareBulk().add(client().prepareDelete("index", "0")).get(); + assertThat( + response.getItems()[0].getFailureMessage(), + containsString("Operation [DELETE] is not allowed for append only index index;") + ); + } + + public void testRetryForAppendOnlyIndices() throws Exception { + final AtomicBoolean exceptionThrown = new AtomicBoolean(false); + int numDocs = scaledRandomIntBetween(100, 1000); + Client client = internalCluster().coordOnlyNodeClient(); + NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().get(); + NodeStats unluckyNode = randomFrom( + nodeStats.getNodes().stream().filter((s) -> s.getNode().isDataNode()).collect(Collectors.toList()) + ); + assertAcked( + client().admin() + .indices() + .prepareCreate("index") + .setSettings( + Settings.builder() + .put("index.number_of_replicas", 0) + .put("index.number_of_shards", 1) + .put("index.append_only.enabled", true) + ) + 
); + ensureGreen("index"); + logger.info("unlucky node: {}", unluckyNode.getNode()); + // create a transport service that throws a ConnectTransportException for one bulk request and therefore triggers a retry. + for (NodeStats dataNode : nodeStats.getNodes()) { + if (exceptionThrown.get()) { + break; + } + + MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance( + TransportService.class, + dataNode.getNode().getName() + )); + mockTransportService.addSendBehavior( + internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()), + (connection, requestId, action, request, options) -> { + connection.sendRequest(requestId, action, request, options); + if (action.equals(TransportShardBulkAction.ACTION_NAME) && exceptionThrown.compareAndSet(false, true)) { + logger.debug("Throw ConnectTransportException"); + throw new ConnectTransportException(connection.getNode(), action); + } + } + ); + } + + BulkRequestBuilder bulkBuilder = client.prepareBulk(); + + for (int i = 0; i < numDocs; i++) { + XContentBuilder doc = null; + doc = jsonBuilder().startObject().field("foo", "bar").endObject(); + bulkBuilder.add(client.prepareIndex("index").setSource(doc)); + } + + BulkResponse response = bulkBuilder.get(); + for (BulkItemResponse singleIndexResponse : response.getItems()) { + // Retry will not create a new version. 
+ assertThat(singleIndexResponse.getVersion(), equalTo(1L)); + } + + response = client().prepareBulk().add(client().prepareUpdate("index", "0").setDoc("foo", "updated")).get(); + assertThat( + response.getItems()[0].getFailureMessage(), + containsString("Operation [UPDATE] is not allowed for append only index index;") + ); + + response = client().prepareBulk().add(client().prepareDelete("index", "0")).get(); + assertThat( + response.getItems()[0].getFailureMessage(), + containsString("Operation [DELETE] is not allowed for append only index index;") + ); + } + +} diff --git a/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java b/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java index 4e770f5851bc6..fc49774682c98 100644 --- a/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java +++ b/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java @@ -38,6 +38,7 @@ import org.opensearch.action.index.IndexResponse; import org.opensearch.action.support.replication.ReplicationResponse; import org.opensearch.action.support.replication.TransportWriteAction; +import org.opensearch.common.IndexingRetryException; import org.opensearch.index.engine.Engine; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.translog.Translog; @@ -297,20 +298,36 @@ public void markOperationAsExecuted(Engine.Result result) { locationToSync = TransportWriteAction.locationToSync(locationToSync, result.getTranslogLocation()); break; case FAILURE: - executionResult = new BulkItemResponse( - current.id(), - docWriteRequest.opType(), - // Make sure to use request.index() here, if you - // use docWriteRequest.index() it will use the - // concrete index instead of an alias if used! 
- new BulkItemResponse.Failure( - request.index(), - docWriteRequest.id(), - result.getFailure(), + if (result.getFailure() instanceof IndexingRetryException) { + Engine.IndexResult indexResult = (Engine.IndexResult) result; + DocWriteResponse indexResponse = new IndexResponse( + primary.shardId(), + requestToExecute.id(), result.getSeqNo(), - result.getTerm() - ) - ); + result.getTerm(), + indexResult.getVersion(), + indexResult.isCreated() + ); + + executionResult = new BulkItemResponse(current.id(), current.request().opType(), indexResponse); + // set a blank ShardInfo so we can safely send it to the replicas. We won't use it in the real response though. + executionResult.getResponse().setShardInfo(new ReplicationResponse.ShardInfo()); + } else { + executionResult = new BulkItemResponse( + current.id(), + docWriteRequest.opType(), + // Make sure to use request.index() here, if you + // use docWriteRequest.index() it will use the + // concrete index instead of an alias if used! + new BulkItemResponse.Failure( + request.index(), + docWriteRequest.id(), + result.getFailure(), + result.getSeqNo(), + result.getTerm() + ) + ); + } break; default: throw new AssertionError("unknown result type for " + getCurrentItem() + ": " + result.getResultType()); diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java index 19ffb12859183..1901d7150eebd 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java @@ -66,6 +66,7 @@ import org.opensearch.cluster.metadata.MappingMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.ValidationException; import org.opensearch.common.inject.Inject; import org.opensearch.common.lease.Releasable; import org.opensearch.common.unit.TimeValue; @@ 
-538,6 +539,11 @@ protected void doRun() { if (docWriteRequest == null) { continue; } + + if (addFailureIfIndexAppendOnlyAndOpsDeleteOrUpdate(docWriteRequest, i, concreteIndices, metadata)) { + continue; + } + if (addFailureIfRequiresAliasAndAliasIsMissing(docWriteRequest, i, metadata)) { continue; } @@ -756,6 +762,55 @@ public void onTimeout(TimeValue timeout) { }); } + /** + * Fails a bulk item that an append-only index does not accept: UPDATE and DELETE operations, and INDEX operations that + * carry a caller-supplied document id. An item whose target index cannot be resolved is failed as well. + * + * @param request the bulk item to validate + * @param idx position of the item in the bulk request, used when recording the failure + * @param concreteIndices used to look up, and if necessary resolve, the concrete index of the item + * @param metadata cluster metadata from which the index metadata is read + * @return {@code true} if a failure was recorded for the item, {@code false} if the item may proceed + */ + private boolean addFailureIfIndexAppendOnlyAndOpsDeleteOrUpdate( + DocWriteRequest<?> request, + int idx, + final ConcreteIndices concreteIndices, + Metadata metadata + ) { + Index concreteIndex = concreteIndices.getConcreteIndex(request.index()); + if (concreteIndex == null) { + try { + concreteIndex = concreteIndices.resolveIfAbsent(request); + } catch (IndexClosedException | IndexNotFoundException | IllegalArgumentException ex) { + addFailure(request, idx, ex); + return true; + } + } + + final IndexMetadata indexMetadata = metadata.index(concreteIndex); + // no metadata for the resolved index: nothing to enforce here, so the item is not rejected by this check + if (indexMetadata != null && indexMetadata.isAppendOnlyIndex()) { + if (request.opType() == DocWriteRequest.OpType.UPDATE || request.opType() == DocWriteRequest.OpType.DELETE) { + ValidationException exception = new ValidationException(); + exception.addValidationError( + "Operation [" + request.opType() + "] is not allowed for append only index " + request.index() + ); + addFailure(request, idx, exception); + return true; + } else if (request.id() != null && request.opType() == DocWriteRequest.OpType.INDEX) { + ValidationException exception = new ValidationException(); + exception.addValidationError( + "Operation [" + request.opType() + "] is not allowed with a custom doc id " + request.id() + ); + addFailure(request, idx, exception); + return true; + } + } + + return false; + } + private boolean addFailureIfRequiresAliasAndAliasIsMissing(DocWriteRequest<?> request, int idx, final Metadata metadata) { if (request.isRequireAlias() && (metadata.hasAlias(request.index()) == false)) { Exception exception = new IndexNotFoundException( diff --git
a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java index bb470ea9e4ab8..cf07a02da1b09 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java @@ -342,6 +342,7 @@ public Iterator> settings() { ); public static final String SETTING_REMOTE_STORE_ENABLED = "index.remote_store.enabled"; + public static final String SETTING_INDEX_APPEND_ONLY_ENABLED = "index.append_only.enabled"; public static final String SETTING_REMOTE_SEGMENT_STORE_REPOSITORY = "index.remote_store.segment.repository"; @@ -384,6 +385,16 @@ public Iterator> settings() { Property.Dynamic ); + /** + * Used to specify whether the index is append-only, i.e. rejects updates, deletes and indexing with a custom document id. + */ + public static final Setting<Boolean> INDEX_APPEND_ONLY_ENABLED_SETTING = Setting.boolSetting( + SETTING_INDEX_APPEND_ONLY_ENABLED, + false, + Property.IndexScope, + Property.Final + ); + /** * Used to specify remote store repository to use for this index. 
*/ @@ -710,6 +721,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException { private final boolean isRemoteSnapshot; private final int indexTotalShardsPerNodeLimit; + private final boolean isAppendOnlyIndex; private final Context context; @@ -741,6 +753,7 @@ private IndexMetadata( final Map rolloverInfos, final boolean isSystem, final int indexTotalShardsPerNodeLimit, + boolean isAppendOnlyIndex, final Context context ) { @@ -778,6 +791,7 @@ private IndexMetadata( this.isSystem = isSystem; this.isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings); this.indexTotalShardsPerNodeLimit = indexTotalShardsPerNodeLimit; + this.isAppendOnlyIndex = isAppendOnlyIndex; this.context = context; assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards; } @@ -940,6 +954,10 @@ public int getIndexTotalShardsPerNodeLimit() { return this.indexTotalShardsPerNodeLimit; } + public boolean isAppendOnlyIndex() { + return this.isAppendOnlyIndex; + } + @Nullable public DiscoveryNodeFilters requireFilters() { return requireFilters; @@ -1695,6 +1713,7 @@ public IndexMetadata build() { } final int indexTotalShardsPerNodeLimit = ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING.get(settings); + final boolean isAppendOnlyIndex = INDEX_APPEND_ONLY_ENABLED_SETTING.get(settings); final String uuid = settings.get(SETTING_INDEX_UUID, INDEX_UUID_NA_VALUE); @@ -1726,6 +1745,7 @@ public IndexMetadata build() { rolloverInfos, isSystem, indexTotalShardsPerNodeLimit, + isAppendOnlyIndex, context ); } diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 8d56a942c5d6e..387ed0ed92680 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -109,6 +109,7 @@ 
public final class IndexScopedSettings extends AbstractScopedSettings { IndexMetadata.INDEX_FORMAT_SETTING, IndexMetadata.INDEX_HIDDEN_SETTING, IndexMetadata.INDEX_REPLICATION_TYPE_SETTING, + IndexMetadata.INDEX_APPEND_ONLY_ENABLED_SETTING, SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_DEBUG_SETTING, SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_WARN_SETTING, SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_INFO_SETTING, diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 59165b936aec8..47d09ab14ab79 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -69,8 +69,10 @@ import org.opensearch.ExceptionsHelper; import org.opensearch.action.index.IndexRequest; import org.opensearch.common.Booleans; +import org.opensearch.common.IndexingRetryException; import org.opensearch.common.Nullable; import org.opensearch.common.SuppressForbidden; +import org.opensearch.common.ValidationException; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lucene.LoggerInfoStream; @@ -932,19 +934,21 @@ public IndexResult index(Index index) throws IOException { final Translog.Location location; if 
(indexResult.getResultType() == Result.Type.SUCCESS) { location = translogManager.add(new Translog.Index(index, indexResult)); - } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { - // if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no - final NoOp noOp = new NoOp( - indexResult.getSeqNo(), - index.primaryTerm(), - index.origin(), - index.startTime(), - indexResult.getFailure().toString() - ); - location = innerNoOp(noOp).getTranslogLocation(); - } else { - location = null; - } + } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO + && indexResult.getFailure() != null + && (indexResult.getFailure() instanceof IndexingRetryException) == false) { + // if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no + final NoOp noOp = new NoOp( + indexResult.getSeqNo(), + index.primaryTerm(), + index.origin(), + index.startTime(), + indexResult.getFailure().toString() + ); + location = innerNoOp(noOp).getTranslogLocation(); + } else { + location = null; + } indexResult.setTranslogLocation(location); } if (plan.indexIntoLucene && indexResult.getResultType() == Result.Type.SUCCESS) { @@ -955,7 +959,8 @@ public IndexResult index(Index index) throws IOException { ); } localCheckpointTracker.markSeqNoAsProcessed(indexResult.getSeqNo()); - if (indexResult.getTranslogLocation() == null) { + if (indexResult.getTranslogLocation() == null + && (indexResult.getFailure() instanceof IndexingRetryException) == false) { // the op is coming from the translog (and is hence persisted already) or it does not have a sequence number assert index.origin().isFromTranslog() || indexResult.getSeqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO; localCheckpointTracker.markSeqNoAsPersisted(indexResult.getSeqNo()); @@ -1049,7 +1054,10 @@ private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { } else { 
versionMap.enforceSafeAccess(); // resolves incoming version - final VersionValue versionValue = resolveDocVersion(index, index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO); + // append-only indices always load seq_no and term: both are needed to rebuild the response of a retried operation + final boolean loadSeqNo = index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO + || engineConfig.getIndexSettings().getIndexMetadata().isAppendOnlyIndex(); + final VersionValue versionValue = resolveDocVersion(index, loadSeqNo); final long currentVersion; final boolean currentNotFoundOrDeleted; if (versionValue == null) { @@ -1092,6 +1100,30 @@ private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { final Exception reserveError = tryAcquireInFlightDocs(index, reservingDocs); if (reserveError != null) { plan = IndexingStrategy.failAsTooManyDocs(reserveError); + } else if (currentVersion >= 1 && engineConfig.getIndexSettings().getIndexMetadata().isAppendOnlyIndex()) { + // Retry happens for indexing requests for append only indices, since we are rejecting update requests + // at Transport layer itself. So for any retry, we are reconstructing response from already indexed + // document version for append only index. 
+ // The in-flight docs acquired above are carried on the plan (reservingDocs, not 0) so that they are not left reserved. + if (index.isRetry()) { + IndexingRetryException retryException = new IndexingRetryException( + "Indexing operation retried for append only indices" + ); + final IndexResult result = new IndexResult( + retryException, + currentVersion, + versionValue.term, + versionValue.seqNo + ); + plan = IndexingStrategy.failAsIndexAppendOnly(result, currentVersion, reservingDocs); + } else { + ValidationException validationException = new ValidationException(); + validationException.addValidationError( + "Operation [" + index.operationType() + "] is not allowed for append only indices" + ); + final IndexResult result = new IndexResult(validationException, Versions.NOT_FOUND); + plan = IndexingStrategy.failAsIndexAppendOnly(result, Versions.NOT_FOUND, reservingDocs); + } } else { plan = IndexingStrategy.processNormally( currentNotFoundOrDeleted, @@ -1283,6 +1315,13 @@ static IndexingStrategy failAsTooManyDocs(Exception e) { final IndexResult result = new IndexResult(e, Versions.NOT_FOUND); return new IndexingStrategy(false, false, false, false, 
Versions.NOT_FOUND, 0, result); } + + /** + * Creates a strategy that carries the pre-computed {@code result} of an operation an append-only index does not execute. + */ + static IndexingStrategy failAsIndexAppendOnly(IndexResult result, long versionForIndexing, int reservedDocs) { + return new IndexingStrategy(false, false, false, false, versionForIndexing, reservedDocs, result); + } } /**