Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
kkewwei committed Jan 18, 2025
1 parent 895c5cb commit faa0bd4
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 9 deletions.
13 changes: 13 additions & 0 deletions server/src/main/java/org/opensearch/action/DocWriteResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.seqno.SequenceNumbers;

Expand Down Expand Up @@ -140,14 +141,20 @@ public void writeTo(StreamOutput out) throws IOException {
private final long primaryTerm;
private boolean forcedRefresh;
protected final Result result;
private final InternalEngine.IndexingStrategy indexingStrategy;

public DocWriteResponse(ShardId shardId, String id, long seqNo, long primaryTerm, long version, Result result) {
this(shardId, id, seqNo, primaryTerm, version, result, null);
}

public DocWriteResponse(ShardId shardId, String id, long seqNo, long primaryTerm, long version, Result result, InternalEngine.IndexingStrategy indexingStrategy) {
this.shardId = Objects.requireNonNull(shardId);
this.id = Objects.requireNonNull(id);
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
this.version = version;
this.result = Objects.requireNonNull(result);
this.indexingStrategy = indexingStrategy;
}

// needed for deserialization
Expand All @@ -164,6 +171,7 @@ protected DocWriteResponse(ShardId shardId, StreamInput in) throws IOException {
primaryTerm = in.readVLong();
forcedRefresh = in.readBoolean();
result = Result.readFrom(in);
indexingStrategy = new InternalEngine.IndexingStrategy(in);
}

/**
Expand All @@ -183,6 +191,7 @@ protected DocWriteResponse(StreamInput in) throws IOException {
primaryTerm = in.readVLong();
forcedRefresh = in.readBoolean();
result = Result.readFrom(in);
indexingStrategy = new InternalEngine.IndexingStrategy(in);
}

/**
Expand Down Expand Up @@ -237,6 +246,9 @@ public long getPrimaryTerm() {
return primaryTerm;
}

public InternalEngine.IndexingStrategy indexingStrategy() {
return indexingStrategy;
}
/**
* Did this request force a refresh? Requests that set {@link WriteRequest#setRefreshPolicy(RefreshPolicy)} to
* {@link RefreshPolicy#IMMEDIATE} will always return true for this. Requests that set it to {@link RefreshPolicy#WAIT_UNTIL} will
Expand Down Expand Up @@ -319,6 +331,7 @@ private void writeWithoutShardId(StreamOutput out) throws IOException {
out.writeVLong(primaryTerm);
out.writeBoolean(forcedRefresh);
result.writeTo(out);
indexingStrategy.writeTo(out);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,8 @@ public void markOperationAsExecuted(Engine.Result result) {
result.getSeqNo(),
result.getTerm(),
indexResult.getVersion(),
indexResult.isCreated()
indexResult.isCreated(),
indexResult.indexingStrategy()
);
} else if (result.getOperationType() == Engine.Operation.TYPE.DELETE) {
Engine.DeleteResult deleteResult = (Engine.DeleteResult) result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.engine.InternalEngine;

import java.io.IOException;

Expand All @@ -65,11 +66,15 @@ public IndexResponse(StreamInput in) throws IOException {
}

public IndexResponse(ShardId shardId, String id, long seqNo, long primaryTerm, long version, boolean created) {
this(shardId, id, seqNo, primaryTerm, version, created ? Result.CREATED : Result.UPDATED);
this(shardId, id, seqNo, primaryTerm, version, created ? Result.CREATED : Result.UPDATED, null);
}

private IndexResponse(ShardId shardId, String id, long seqNo, long primaryTerm, long version, Result result) {
super(shardId, id, seqNo, primaryTerm, version, assertCreatedOrUpdated(result));
public IndexResponse(ShardId shardId, String id, long seqNo, long primaryTerm, long version, boolean created, InternalEngine.IndexingStrategy indexingStrategy) {
this(shardId, id, seqNo, primaryTerm, version, created ? Result.CREATED : Result.UPDATED, indexingStrategy);
}

private IndexResponse(ShardId shardId, String id, long seqNo, long primaryTerm, long version, Result result, InternalEngine.IndexingStrategy indexingStrategy) {
super(shardId, id, seqNo, primaryTerm, version, assertCreatedOrUpdated(result), indexingStrategy);
}

private static Result assertCreatedOrUpdated(Result result) {
Expand All @@ -92,6 +97,7 @@ public String toString() {
builder.append(",result=").append(getResult().getLowercase());
builder.append(",seqNo=").append(getSeqNo());
builder.append(",primaryTerm=").append(getPrimaryTerm());
builder.append(",indexingStrategy=" + indexingStrategy());
builder.append(",shards=").append(Strings.toString(MediaTypeRegistry.JSON, getShardInfo()));
return builder.append("]").toString();
}
Expand Down Expand Up @@ -124,7 +130,7 @@ public static void parseXContentFields(XContentParser parser, Builder context) t
public static class Builder extends DocWriteResponse.Builder {
@Override
public IndexResponse build() {
IndexResponse indexResponse = new IndexResponse(shardId, id, seqNo, primaryTerm, version, result);
IndexResponse indexResponse = new IndexResponse(shardId, id, seqNo, primaryTerm, version, result, null);
indexResponse.setForcedRefresh(forcedRefresh);
if (shardInfo != null) {
indexResponse.setShardInfo(shardInfo);
Expand Down
5 changes: 5 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,7 @@ protected Result(Operation.TYPE operationType, long version, long term, long seq
this.failure = null;
this.requiredMappingUpdate = null;
this.resultType = Type.SUCCESS;
this.indexingStrategy = indexingStrategy;
}

protected Result(Operation.TYPE operationType, Mapping requiredMappingUpdate) {
Expand Down Expand Up @@ -566,6 +567,10 @@ public Operation.TYPE getOperationType() {
return operationType;
}

public InternalEngine.IndexingStrategy indexingStrategy() {
return indexingStrategy;
}

void setTranslogLocation(Translog.Location translogLocation) {
if (freeze.get() == null) {
this.translogLocation = translogLocation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.opensearch.common.Booleans;
import org.opensearch.common.Nullable;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lucene.LoggerInfoStream;
Expand Down Expand Up @@ -1213,7 +1214,8 @@ private void addStaleDocs(final List<ParseContext.Document> docs, final IndexWri
*
* @opensearch.internal
*/
protected static final class IndexingStrategy implements Writeable {
@PublicApi(since = "3.0.0")
public static final class IndexingStrategy implements Writeable {
final boolean currentNotFoundOrDeleted;
final boolean useLuceneUpdateDocument;
final long versionForIndexing;
Expand All @@ -1222,7 +1224,7 @@ protected static final class IndexingStrategy implements Writeable {
final int reservedDocs;
final Optional<IndexResult> earlyResultOnPreFlightError;

private IndexingStrategy(
public IndexingStrategy(
boolean currentNotFoundOrDeleted,
boolean useLuceneUpdateDocument,
boolean indexIntoLucene,
Expand Down Expand Up @@ -1251,7 +1253,7 @@ private IndexingStrategy(
: Optional.of(earlyResultOnPreFlightError);
}

private IndexingStrategy(StreamInput in) throws IOException {
public IndexingStrategy(StreamInput in) throws IOException {
this.currentNotFoundOrDeleted = in.readBoolean();
this.useLuceneUpdateDocument = in.readBoolean();
this.versionForIndexing = in.readVLong();
Expand Down Expand Up @@ -1303,7 +1305,12 @@ static IndexingStrategy failAsTooManyDocs(Exception e) {

@Override
public void writeTo(StreamOutput out) throws IOException {

out.writeBoolean(currentNotFoundOrDeleted);
out.writeBoolean(useLuceneUpdateDocument);
out.writeVLong(versionForIndexing);
out.writeBoolean(indexIntoLucene);
out.writeBoolean(addStaleOpToLucene);
out.writeVInt(reservedDocs);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

package org.opensearch.index.engine;

import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.rest.RestStatus;
Expand All @@ -43,6 +44,7 @@
*
* @opensearch.internal
*/
@PublicApi(since = "3.0.0")
public class VersionConflictEngineException extends EngineException {

public VersionConflictEngineException(ShardId shardId, Engine.Operation op, long currentVersion, boolean deleted) {
Expand Down

0 comments on commit faa0bd4

Please sign in to comment.