Skip to content

Commit

Permalink
Adding support for append-only indices
Browse files Browse the repository at this point in the history
Signed-off-by: RS146BIJAY <[email protected]>
  • Loading branch information
RS146BIJAY committed Jan 16, 2025
1 parent 1935650 commit 38061f7
Show file tree
Hide file tree
Showing 8 changed files with 334 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common;

/**
 * This exception indicates that a retry has been made during indexing for an AppendOnly index. If the response of any
 * indexing request contains this exception, we do not need to add a translog entry for this request.
 *
 * @opensearch.internal
 */
public class IndexingRetryException extends RuntimeException {

    /**
     * @param message detail message describing the retried indexing operation
     */
    public IndexingRetryException(String message) {
        super(message);
    }

    /**
     * @param message detail message describing the retried indexing operation
     * @param cause underlying cause of the retry, preserved for diagnostics
     */
    public IndexingRetryException(String message, Throwable cause) {
        super(message, cause);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.bulk;

import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.ingest.IngestTestPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.ConnectTransportException;
import org.opensearch.transport.TransportService;

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.containsString;

/**
 * Integration tests for append-only indices ({@code index.append_only.enabled}): verifies that
 * custom doc ids, updates, and deletes are rejected, and that a transport-level retry of a bulk
 * request does not produce duplicate versions.
 */
public class AppendOnlyIndicesIT extends OpenSearchIntegTestCase {

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return Arrays.asList(IngestTestPlugin.class, MockTransportService.TestPlugin.class);
    }

    /**
     * Creates a single-shard, zero-replica append-only index named "index" and waits for green health.
     */
    private void createAppendOnlyIndex() {
        assertAcked(
            client().admin()
                .indices()
                .prepareCreate("index")
                .setSettings(
                    Settings.builder()
                        .put("index.number_of_replicas", 0)
                        .put("index.number_of_shards", 1)
                        .put("index.append_only.enabled", true)
                )
        );
        ensureGreen("index");
    }

    public void testIndexDocumentWithACustomDocIdForAppendOnlyIndices() throws Exception {
        Client client = internalCluster().coordOnlyNodeClient();
        createAppendOnlyIndex();

        BulkRequestBuilder bulkBuilder = client.prepareBulk();

        // Explicit doc id must be rejected for append-only indices.
        XContentBuilder doc = jsonBuilder().startObject().field("foo", "bar").endObject();
        bulkBuilder.add(client.prepareIndex("index").setId(Integer.toString(0)).setSource(doc));

        BulkResponse response = bulkBuilder.get();
        assertThat(response.getItems()[0].getFailureMessage(), containsString("Operation [INDEX] is not allowed with a custom doc id 0"));
    }

    public void testUpdateDeleteDocumentForAppendOnlyIndices() throws Exception {
        Client client = internalCluster().coordOnlyNodeClient();
        createAppendOnlyIndex();

        BulkRequestBuilder bulkBuilder = client.prepareBulk();

        XContentBuilder doc = jsonBuilder().startObject().field("foo", "bar").endObject();
        bulkBuilder.add(client.prepareIndex("index").setSource(doc));

        bulkBuilder.get();
        // UPDATE must be rejected for append-only indices.
        BulkResponse response = client().prepareBulk().add(client().prepareUpdate("index", "0").setDoc("foo", "updated")).get();
        assertThat(
            response.getItems()[0].getFailureMessage(),
            containsString("Operation [UPDATE] is not allowed for append only index index;")
        );

        // DELETE must be rejected for append-only indices.
        response = client().prepareBulk().add(client().prepareDelete("index", "0")).get();
        assertThat(
            response.getItems()[0].getFailureMessage(),
            containsString("Operation [DELETE] is not allowed for append only index index;")
        );
    }

    public void testRetryForAppendOnlyIndices() throws Exception {
        final AtomicBoolean exceptionThrown = new AtomicBoolean(false);
        int numDocs = scaledRandomIntBetween(100, 1000);
        Client client = internalCluster().coordOnlyNodeClient();
        NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().get();
        NodeStats unluckyNode = randomFrom(
            nodeStats.getNodes().stream().filter((s) -> s.getNode().isDataNode()).collect(Collectors.toList())
        );
        createAppendOnlyIndex();
        logger.info("unlucky node: {}", unluckyNode.getNode());
        // create a transport service that throws a ConnectTransportException for one bulk request and therefore triggers a retry.
        for (NodeStats dataNode : nodeStats.getNodes()) {
            if (exceptionThrown.get()) {
                break;
            }

            MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(
                TransportService.class,
                dataNode.getNode().getName()
            ));
            mockTransportService.addSendBehavior(
                internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()),
                (connection, requestId, action, request, options) -> {
                    // Let the request through, then fail the first shard-bulk send to force a client retry.
                    connection.sendRequest(requestId, action, request, options);
                    if (action.equals(TransportShardBulkAction.ACTION_NAME) && exceptionThrown.compareAndSet(false, true)) {
                        logger.debug("Throw ConnectTransportException");
                        throw new ConnectTransportException(connection.getNode(), action);
                    }
                }
            );
        }

        BulkRequestBuilder bulkBuilder = client.prepareBulk();

        for (int i = 0; i < numDocs; i++) {
            XContentBuilder doc = jsonBuilder().startObject().field("foo", "bar").endObject();
            bulkBuilder.add(client.prepareIndex("index").setSource(doc));
        }

        BulkResponse response = bulkBuilder.get();
        for (BulkItemResponse singleIndexResponse : response.getItems()) {
            // Retry will not create a new version.
            assertThat(singleIndexResponse.getVersion(), equalTo(1L));
        }

        response = client().prepareBulk().add(client().prepareUpdate("index", "0").setDoc("foo", "updated")).get();
        assertThat(
            response.getItems()[0].getFailureMessage(),
            containsString("Operation [UPDATE] is not allowed for append only index index;")
        );

        response = client().prepareBulk().add(client().prepareDelete("index", "0")).get();
        assertThat(
            response.getItems()[0].getFailureMessage(),
            containsString("Operation [DELETE] is not allowed for append only index index;")
        );
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.replication.ReplicationResponse;
import org.opensearch.action.support.replication.TransportWriteAction;
import org.opensearch.common.IndexingRetryException;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.translog.Translog;
Expand Down Expand Up @@ -297,20 +298,36 @@ public void markOperationAsExecuted(Engine.Result result) {
locationToSync = TransportWriteAction.locationToSync(locationToSync, result.getTranslogLocation());
break;
case FAILURE:
executionResult = new BulkItemResponse(
current.id(),
docWriteRequest.opType(),
// Make sure to use request.index() here, if you
// use docWriteRequest.index() it will use the
// concrete index instead of an alias if used!
new BulkItemResponse.Failure(
request.index(),
docWriteRequest.id(),
result.getFailure(),
if (result.getFailure() instanceof IndexingRetryException) {
Engine.IndexResult indexResult = (Engine.IndexResult) result;
DocWriteResponse indexResponse = new IndexResponse(
primary.shardId(),
requestToExecute.id(),
result.getSeqNo(),
result.getTerm()
)
);
result.getTerm(),
indexResult.getVersion(),
indexResult.isCreated()
);

executionResult = new BulkItemResponse(current.id(), current.request().opType(), indexResponse);
// set a blank ShardInfo so we can safely send it to the replicas. We won't use it in the real response though.
executionResult.getResponse().setShardInfo(new ReplicationResponse.ShardInfo());
} else {
executionResult = new BulkItemResponse(
current.id(),
docWriteRequest.opType(),
// Make sure to use request.index() here, if you
// use docWriteRequest.index() it will use the
// concrete index instead of an alias if used!
new BulkItemResponse.Failure(
request.index(),
docWriteRequest.id(),
result.getFailure(),
result.getSeqNo(),
result.getTerm()
)
);
}
break;
default:
throw new AssertionError("unknown result type for " + getCurrentItem() + ": " + result.getResultType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.ValidationException;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.unit.TimeValue;
Expand Down Expand Up @@ -538,6 +539,11 @@ protected void doRun() {
if (docWriteRequest == null) {
continue;
}

if (addFailureIfIndexAppendOnlyAndOpsDeleteOrUpdate(docWriteRequest, i, concreteIndices, metadata)) {
continue;
}

if (addFailureIfRequiresAliasAndAliasIsMissing(docWriteRequest, i, metadata)) {
continue;
}
Expand Down Expand Up @@ -756,6 +762,44 @@ public void onTimeout(TimeValue timeout) {
});
}

/**
 * Validates a single bulk item against append-only index restrictions and records a failure for
 * disallowed operations: UPDATE and DELETE are rejected outright, and INDEX is rejected when a
 * custom doc id is supplied.
 *
 * @param request the bulk item being validated
 * @param idx position of the item in the bulk request, used when recording a failure
 * @param concreteIndices resolver/cache for concrete index lookups
 * @param metadata current cluster metadata
 * @return {@code true} if a failure was recorded and the caller should skip this item, {@code false} otherwise
 */
private boolean addFailureIfIndexAppendOnlyAndOpsDeleteOrUpdate(
    DocWriteRequest<?> request,
    int idx,
    final ConcreteIndices concreteIndices,
    Metadata metadata
) {
    Index concreteIndex = concreteIndices.getConcreteIndex(request.index());
    if (concreteIndex == null) {
        try {
            concreteIndex = concreteIndices.resolveIfAbsent(request);
        } catch (IndexClosedException | IndexNotFoundException | IllegalArgumentException ex) {
            // Resolution failure is itself a per-item failure; do not let it abort the whole bulk.
            addFailure(request, idx, ex);
            return true;
        }
    }

    final IndexMetadata indexMetadata = metadata.index(concreteIndex);
    // NOTE(review): guard against metadata.index(...) returning null (e.g. index concurrently
    // removed) — the original dereferenced it unconditionally, risking an NPE.
    if (indexMetadata != null && indexMetadata.isAppendOnlyIndex()) {
        if (request.opType() == DocWriteRequest.OpType.UPDATE || request.opType() == DocWriteRequest.OpType.DELETE) {
            ValidationException exception = new ValidationException();
            exception.addValidationError(
                "Operation [" + request.opType() + "] is not allowed for append only index " + request.index()
            );
            addFailure(request, idx, exception);
            return true;
        } else if (request.id() != null && request.opType() == DocWriteRequest.OpType.INDEX) {
            ValidationException exception = new ValidationException();
            exception.addValidationError(
                "Operation [" + request.opType() + "] is not allowed with a custom doc id " + request.id()
            );
            addFailure(request, idx, exception);
            return true;
        }
    }

    return false;
}

private boolean addFailureIfRequiresAliasAndAliasIsMissing(DocWriteRequest<?> request, int idx, final Metadata metadata) {
if (request.isRequireAlias() && (metadata.hasAlias(request.index()) == false)) {
Exception exception = new IndexNotFoundException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ public Iterator<Setting<?>> settings() {
);

public static final String SETTING_REMOTE_STORE_ENABLED = "index.remote_store.enabled";
public static final String SETTING_INDEX_APPEND_ONLY_ENABLED = "index.append_only.enabled";

public static final String SETTING_REMOTE_SEGMENT_STORE_REPOSITORY = "index.remote_store.segment.repository";

Expand Down Expand Up @@ -384,6 +385,16 @@ public Iterator<Setting<?>> settings() {
Property.Dynamic
);

/**
* Used to specify if the index should be append-only, i.e. only accept indexing operations
* with auto-generated doc ids (no updates, deletes, or custom doc ids).
*/
public static final Setting<Boolean> INDEX_APPEND_ONLY_ENABLED_SETTING = Setting.boolSetting(
SETTING_INDEX_APPEND_ONLY_ENABLED,
false,
Property.IndexScope,
Property.Final
);

/**
* Used to specify remote store repository to use for this index.
*/
Expand Down Expand Up @@ -710,6 +721,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException {
private final boolean isRemoteSnapshot;

private final int indexTotalShardsPerNodeLimit;
private final boolean isAppendOnlyIndex;

private final Context context;

Expand Down Expand Up @@ -741,6 +753,7 @@ private IndexMetadata(
final Map<String, RolloverInfo> rolloverInfos,
final boolean isSystem,
final int indexTotalShardsPerNodeLimit,
boolean isAppendOnlyIndex,
final Context context
) {

Expand Down Expand Up @@ -778,6 +791,7 @@ private IndexMetadata(
this.isSystem = isSystem;
this.isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings);
this.indexTotalShardsPerNodeLimit = indexTotalShardsPerNodeLimit;
this.isAppendOnlyIndex = isAppendOnlyIndex;
this.context = context;
assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards;
}
Expand Down Expand Up @@ -940,6 +954,10 @@ public int getIndexTotalShardsPerNodeLimit() {
return this.indexTotalShardsPerNodeLimit;
}

/**
 * Returns {@code true} if this index was created with {@code index.append_only.enabled} set to
 * {@code true}; the value is resolved from {@code INDEX_APPEND_ONLY_ENABLED_SETTING} at build time.
 */
public boolean isAppendOnlyIndex() {
    return this.isAppendOnlyIndex;
}

@Nullable
public DiscoveryNodeFilters requireFilters() {
return requireFilters;
Expand Down Expand Up @@ -1695,6 +1713,7 @@ public IndexMetadata build() {
}

final int indexTotalShardsPerNodeLimit = ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING.get(settings);
final boolean isAppendOnlyIndex = INDEX_APPEND_ONLY_ENABLED_SETTING.get(settings);

final String uuid = settings.get(SETTING_INDEX_UUID, INDEX_UUID_NA_VALUE);

Expand Down Expand Up @@ -1726,6 +1745,7 @@ public IndexMetadata build() {
rolloverInfos,
isSystem,
indexTotalShardsPerNodeLimit,
isAppendOnlyIndex,
context
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexMetadata.INDEX_FORMAT_SETTING,
IndexMetadata.INDEX_HIDDEN_SETTING,
IndexMetadata.INDEX_REPLICATION_TYPE_SETTING,
IndexMetadata.INDEX_APPEND_ONLY_ENABLED_SETTING,
SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_DEBUG_SETTING,
SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_WARN_SETTING,
SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_INFO_SETTING,
Expand Down
Loading

0 comments on commit 38061f7

Please sign in to comment.