From faa0bd4a900c2b454b6ba51dbaf8490730328647 Mon Sep 17 00:00:00 2001 From: kkewwei Date: Sat, 18 Jan 2025 18:53:00 +0800 Subject: [PATCH] test --- .../org/opensearch/action/DocWriteResponse.java | 13 +++++++++++++ .../action/bulk/BulkPrimaryExecutionContext.java | 3 ++- .../opensearch/action/index/IndexResponse.java | 14 ++++++++++---- .../java/org/opensearch/index/engine/Engine.java | 5 +++++ .../opensearch/index/engine/InternalEngine.java | 15 +++++++++++---- .../engine/VersionConflictEngineException.java | 2 ++ 6 files changed, 43 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/DocWriteResponse.java b/server/src/main/java/org/opensearch/action/DocWriteResponse.java index aada56ed93fd3..763a10b3ab5d6 100644 --- a/server/src/main/java/org/opensearch/action/DocWriteResponse.java +++ b/server/src/main/java/org/opensearch/action/DocWriteResponse.java @@ -49,6 +49,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.seqno.SequenceNumbers; @@ -140,14 +141,20 @@ public void writeTo(StreamOutput out) throws IOException { private final long primaryTerm; private boolean forcedRefresh; protected final Result result; + private final InternalEngine.IndexingStrategy indexingStrategy; public DocWriteResponse(ShardId shardId, String id, long seqNo, long primaryTerm, long version, Result result) { + this(shardId, id, seqNo, primaryTerm, version, result, null); + } + + public DocWriteResponse(ShardId shardId, String id, long seqNo, long primaryTerm, long version, Result result, InternalEngine.IndexingStrategy indexingStrategy) { this.shardId = Objects.requireNonNull(shardId); this.id = Objects.requireNonNull(id); this.seqNo = seqNo; this.primaryTerm = primaryTerm; this.version = version; this.result = Objects.requireNonNull(result); + this.indexingStrategy = indexingStrategy; } // needed for deserialization @@ -164,6 +171,7 @@ protected DocWriteResponse(ShardId shardId, StreamInput in) throws IOException { primaryTerm = in.readVLong(); forcedRefresh = in.readBoolean(); result = Result.readFrom(in); + indexingStrategy = new InternalEngine.IndexingStrategy(in); } /** @@ -183,6 +191,7 @@ protected DocWriteResponse(StreamInput in) throws IOException { primaryTerm = in.readVLong(); forcedRefresh = in.readBoolean(); result = Result.readFrom(in); + indexingStrategy = new InternalEngine.IndexingStrategy(in); } /** @@ -237,6 +246,9 @@ public long getPrimaryTerm() { return primaryTerm; } + public InternalEngine.IndexingStrategy indexingStrategy() { + return indexingStrategy; + } /** * Did this request force a refresh? Requests that set {@link WriteRequest#setRefreshPolicy(RefreshPolicy)} to * {@link RefreshPolicy#IMMEDIATE} will always return true for this. Requests that set it to {@link RefreshPolicy#WAIT_UNTIL} will @@ -319,6 +331,7 @@ private void writeWithoutShardId(StreamOutput out) throws IOException { out.writeVLong(primaryTerm); out.writeBoolean(forcedRefresh); result.writeTo(out); + indexingStrategy.writeTo(out); } @Override diff --git a/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java b/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java index 4e770f5851bc6..6ba69ac4599fc 100644 --- a/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java +++ b/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java @@ -275,7 +275,8 @@ public void markOperationAsExecuted(Engine.Result result) { result.getSeqNo(), result.getTerm(), indexResult.getVersion(), - indexResult.isCreated() + indexResult.isCreated(), + indexResult.indexingStrategy() ); } else if (result.getOperationType() == Engine.Operation.TYPE.DELETE) { Engine.DeleteResult deleteResult = (Engine.DeleteResult) result; diff --git a/server/src/main/java/org/opensearch/action/index/IndexResponse.java b/server/src/main/java/org/opensearch/action/index/IndexResponse.java index 53f832fc12c43..a374be95a84b1 100644 --- a/server/src/main/java/org/opensearch/action/index/IndexResponse.java +++ b/server/src/main/java/org/opensearch/action/index/IndexResponse.java @@ -40,6 +40,7 @@ import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.index.engine.InternalEngine; import java.io.IOException; @@ -65,11 +66,15 @@ public IndexResponse(StreamInput in) throws IOException { } public IndexResponse(ShardId shardId, String id, long seqNo, long primaryTerm, long version, boolean created) { - this(shardId, id, seqNo, primaryTerm, version, created ? Result.CREATED : Result.UPDATED); + this(shardId, id, seqNo, primaryTerm, version, created ? Result.CREATED : Result.UPDATED, null); } - private IndexResponse(ShardId shardId, String id, long seqNo, long primaryTerm, long version, Result result) { - super(shardId, id, seqNo, primaryTerm, version, assertCreatedOrUpdated(result)); + public IndexResponse(ShardId shardId, String id, long seqNo, long primaryTerm, long version, boolean created, InternalEngine.IndexingStrategy indexingStrategy) { + this(shardId, id, seqNo, primaryTerm, version, created ? Result.CREATED : Result.UPDATED, indexingStrategy); + } + + private IndexResponse(ShardId shardId, String id, long seqNo, long primaryTerm, long version, Result result, InternalEngine.IndexingStrategy indexingStrategy) { + super(shardId, id, seqNo, primaryTerm, version, assertCreatedOrUpdated(result), indexingStrategy); } private static Result assertCreatedOrUpdated(Result result) { @@ -92,6 +97,7 @@ public String toString() { builder.append(",result=").append(getResult().getLowercase()); builder.append(",seqNo=").append(getSeqNo()); builder.append(",primaryTerm=").append(getPrimaryTerm()); + builder.append(",indexingStrategy=" + indexingStrategy()); builder.append(",shards=").append(Strings.toString(MediaTypeRegistry.JSON, getShardInfo())); return builder.append("]").toString(); } @@ -124,7 +130,7 @@ public static void parseXContentFields(XContentParser parser, Builder context) t public static class Builder extends DocWriteResponse.Builder { @Override public IndexResponse build() { - IndexResponse indexResponse = new IndexResponse(shardId, id, seqNo, primaryTerm, version, result); + IndexResponse indexResponse = new IndexResponse(shardId, id, seqNo, primaryTerm, version, result, null); indexResponse.setForcedRefresh(forcedRefresh); if (shardInfo != null) { indexResponse.setShardInfo(shardInfo); diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index f11e581e7c725..1970c9f65dac0 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -504,6 +504,7 @@ protected Result(Operation.TYPE operationType, long version, long term, long seq this.failure = null; this.requiredMappingUpdate = null; this.resultType = Type.SUCCESS; + this.indexingStrategy = indexingStrategy; } protected Result(Operation.TYPE operationType, Mapping requiredMappingUpdate) { @@ -566,6 +567,10 @@ public Operation.TYPE getOperationType() { return operationType; } + public InternalEngine.IndexingStrategy indexingStrategy() { + return indexingStrategy; + } + void setTranslogLocation(Translog.Location translogLocation) { if (freeze.get() == null) { this.translogLocation = translogLocation; diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 607ebff44d239..c5a2a259ab2df 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -71,6 +71,7 @@ import org.opensearch.common.Booleans; import org.opensearch.common.Nullable; import org.opensearch.common.SuppressForbidden; +import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lucene.LoggerInfoStream; @@ -1213,7 +1214,8 @@ private void addStaleDocs(final List docs, final IndexWri * * @opensearch.internal */ - protected static final class IndexingStrategy implements Writeable { + @PublicApi(since = "3.0.0") + public static final class IndexingStrategy implements Writeable { final boolean currentNotFoundOrDeleted; final boolean useLuceneUpdateDocument; final long versionForIndexing; @@ -1222,7 +1224,7 @@ protected static final class IndexingStrategy implements Writeable { final int reservedDocs; final Optional earlyResultOnPreFlightError; - private IndexingStrategy( + public IndexingStrategy( boolean currentNotFoundOrDeleted, boolean useLuceneUpdateDocument, boolean indexIntoLucene, @@ -1251,7 +1253,7 @@ private IndexingStrategy( : Optional.of(earlyResultOnPreFlightError); } - private IndexingStrategy(StreamInput in) throws IOException { + public IndexingStrategy(StreamInput in) throws IOException { this.currentNotFoundOrDeleted = in.readBoolean(); this.useLuceneUpdateDocument = in.readBoolean(); this.versionForIndexing = in.readVLong(); @@ -1303,7 +1305,12 @@ static IndexingStrategy failAsTooManyDocs(Exception e) { @Override public void writeTo(StreamOutput out) throws IOException { - + out.writeBoolean(currentNotFoundOrDeleted); + out.writeBoolean(useLuceneUpdateDocument); + out.writeVLong(versionForIndexing); + out.writeBoolean(indexIntoLucene); + out.writeBoolean(addStaleOpToLucene); + out.writeVInt(reservedDocs); } } diff --git a/server/src/main/java/org/opensearch/index/engine/VersionConflictEngineException.java b/server/src/main/java/org/opensearch/index/engine/VersionConflictEngineException.java index 7804b8985e94d..98e173f77ebd6 100644 --- a/server/src/main/java/org/opensearch/index/engine/VersionConflictEngineException.java +++ b/server/src/main/java/org/opensearch/index/engine/VersionConflictEngineException.java @@ -31,6 +31,7 @@ package org.opensearch.index.engine; +import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.rest.RestStatus; @@ -43,6 +44,7 @@ * * @opensearch.internal */ +@PublicApi(since = "3.0.0") public class VersionConflictEngineException extends EngineException { public VersionConflictEngineException(ShardId shardId, Engine.Operation op, long currentVersion, boolean deleted) {