From 30b1961bbc78171bae71066d6bc8e51a14a18e48 Mon Sep 17 00:00:00 2001 From: kkewwei Date: Sat, 18 Jan 2025 16:43:20 +0800 Subject: [PATCH] test --- .../org/opensearch/index/engine/Engine.java | 35 ++++++++-- .../index/engine/InternalEngine.java | 69 +++++++++++++------ 2 files changed, 79 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index c945d082c9a35..1d163f65313ca 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -1518,8 +1518,9 @@ public String getLowercase() { private final VersionType versionType; private final Origin origin; private final long startTime; + private final InternalEngine.IndexingStrategy indexingStrategy; - public Operation(Term uid, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin, long startTime) { + public Operation(Term uid, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin, long startTime, InternalEngine.IndexingStrategy indexingStrategy) { this.uid = uid; this.seqNo = seqNo; this.primaryTerm = primaryTerm; @@ -1527,6 +1528,7 @@ public Operation(Term uid, long seqNo, long primaryTerm, long version, VersionTy this.versionType = versionType; this.origin = origin; this.startTime = startTime; + this.indexingStrategy = indexingStrategy; } /** @@ -1587,6 +1589,10 @@ public long startTime() { abstract String id(); public abstract TYPE operationType(); + + public InternalEngine.IndexingStrategy indexingStrategy() { + return indexingStrategy; + }; } /** @@ -1617,7 +1623,25 @@ public Index( long ifSeqNo, long ifPrimaryTerm ) { - super(uid, seqNo, primaryTerm, version, versionType, origin, startTime); + this(uid, doc, seqNo, primaryTerm, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry, ifSeqNo, ifPrimaryTerm, null); + } + + public Index( + Term uid, + ParsedDocument doc, + long seqNo, + long primaryTerm, + long version, + VersionType versionType, + Origin origin, + long startTime, + long autoGeneratedIdTimestamp, + boolean isRetry, + long ifSeqNo, + long ifPrimaryTerm, + InternalEngine.IndexingStrategy indexingStrategy + ) { + super(uid, seqNo, primaryTerm, version, versionType, origin, startTime, indexingStrategy); assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin; assert ifPrimaryTerm >= 0 : "ifPrimaryTerm [" + ifPrimaryTerm + "] must be non negative"; assert ifSeqNo == UNASSIGNED_SEQ_NO || ifSeqNo >= 0 : "ifSeqNo [" + ifSeqNo + "] must be non negative or unset"; @@ -1630,6 +1654,7 @@ public Index( this.ifPrimaryTerm = ifPrimaryTerm; } + public Index(Term uid, long primaryTerm, ParsedDocument doc) { this(uid, primaryTerm, doc, Versions.MATCH_ANY); } // TEST ONLY @@ -1706,6 +1731,8 @@ public long getIfSeqNo() { public long getIfPrimaryTerm() { return ifPrimaryTerm; } + + } /** @@ -1732,7 +1759,7 @@ public Delete( long ifSeqNo, long ifPrimaryTerm ) { - super(uid, seqNo, primaryTerm, version, versionType, origin, startTime); + super(uid, seqNo, primaryTerm, version, versionType, origin, startTime, null); assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin; assert ifPrimaryTerm >= 0 : "ifPrimaryTerm [" + ifPrimaryTerm + "] must be non negative"; assert ifSeqNo == UNASSIGNED_SEQ_NO || ifSeqNo >= 0 : "ifSeqNo [" + ifSeqNo + "] must be non negative or unset"; @@ -1812,7 +1839,7 @@ public String reason() { } public NoOp(final long seqNo, final long primaryTerm, final Origin origin, final long startTime, final String reason) { - super(null, seqNo, primaryTerm, Versions.NOT_FOUND, null, origin, startTime); + super(null, seqNo, primaryTerm, Versions.NOT_FOUND, null, origin, startTime, null); this.reason = reason; } diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index c6fc07629ae4e..b44ad12becf2a 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -684,7 +684,7 @@ enum OpVsLuceneDocStatus { /** the op is more recent than the one that last modified the doc found in lucene*/ OP_NEWER, /** the op is older or the same as the one that last modified the doc found in lucene*/ - OP_STALE_OR_EQUAL, + OP_STALE_OR_EQUAL,// 一样,或者更旧的 /** no doc was found in lucene */ LUCENE_DOC_NOT_FOUND } @@ -692,8 +692,8 @@ enum OpVsLuceneDocStatus { private static OpVsLuceneDocStatus compareOpToVersionMapOnSeqNo(String id, long seqNo, long primaryTerm, VersionValue versionValue) { Objects.requireNonNull(versionValue); if (seqNo > versionValue.seqNo) { - return OpVsLuceneDocStatus.OP_NEWER; - } else if (seqNo == versionValue.seqNo) { + return OpVsLuceneDocStatus.OP_NEWER;// 新的 + } else if (seqNo == versionValue.seqNo) {// 一样 assert versionValue.term == primaryTerm : "primary term not matched; id=" + id + " seq_no=" @@ -711,12 +711,13 @@ private static OpVsLuceneDocStatus compareOpToVersionMapOnSeqNo(String id, long private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException { assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found"; final OpVsLuceneDocStatus status; - VersionValue versionValue = getVersionFromMap(op.uid().bytes()); + + VersionValue versionValue = getVersionFromMap(op.uid().bytes());// 从maps中找下,看可以找到吗 assert incrementVersionLookup(); boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabledOrRemoteNode(); - if (versionValue != null) { + if (versionValue != null) {// maps中找到了 status = compareOpToVersionMapOnSeqNo(op.id(), op.seqNo(), op.primaryTerm(), versionValue); - } else { + } else {// 没找到 // load from index assert incrementIndexVersionLookup(); try (Searcher searcher = acquireSearcher("load_seq_no", SearcherScope.INTERNAL)) { @@ -906,10 +907,11 @@ public IndexResult index(Index index) throws IOException { index.getAutoGeneratedIdTimestamp(), index.isRetry(), index.getIfSeqNo(), - index.getIfPrimaryTerm() + index.getIfPrimaryTerm(), + plan ); - final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false; + final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;// 不带主键的写入 if (toAppend == false) { advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(index.seqNo()); } @@ -922,6 +924,9 @@ public IndexResult index(Index index) throws IOException { if (plan.indexIntoLucene || plan.addStaleOpToLucene) { indexResult = indexIntoLucene(index, plan); } else { + if (plan.versionForIndexing != index.indexingStrategy().versionForIndexing || plan.currentNotFoundOrDeleted != index.indexingStrategy().currentNotFoundOrDeleted) { + throw new RuntimeException("plan.versionForIndexing != index.indexingStrategy().versionForIndexing || plan.currentNotFoundOrDeleted != index.indexingStrategy().currentNotFoundOrDeleted"); + } indexResult = new IndexResult( plan.versionForIndexing, index.primaryTerm(), @@ -956,6 +961,9 @@ public IndexResult index(Index index) throws IOException { index.uid().bytes(), new IndexVersionValue(translogLocation, plan.versionForIndexing, index.seqNo(), index.primaryTerm()) ); + if (plan.versionForIndexing != index.indexingStrategy().versionForIndexing) { + System.out.println("wrong"); + } } localCheckpointTracker.markSeqNoAsProcessed(indexResult.getSeqNo()); if (indexResult.getTranslogLocation() == null) { @@ -987,7 +995,7 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO assert assertNonPrimaryOrigin(index); // needs to maintain the auto_id timestamp in case this replica becomes primary if (canOptimizeAddDocument(index)) {// 如果不带主键的写入,肯定可以优化 - mayHaveBeenIndexedBefore(index); + mayHaveBeenIndexedBefore(index);// 可能更新maxUnsafeAutoIdTimestamp和必须更新maxSeenAutoIdTimestamp } final IndexingStrategy plan; // unlike the primary, replicas don't really care to about creation status of documents @@ -1010,19 +1018,24 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO } else { boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabledOrRemoteNode(); versionMap.enforceSafeAccess(); - final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); - if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { - if (segRepEnabled) { - // For segrep based indices, we can't completely rely on localCheckpointTracker - // as the preserved checkpoint may not have all the operations present in lucene - // we don't need to index it again as stale op as it would create multiple documents for same seq no - plan = IndexingStrategy.processButSkipLucene(false, index.version()); - } else { - plan = IndexingStrategy.processAsStaleOp(index.version()); + +// if (index.IndexingStrategy() != null && segRepEnabled == false) { +// return index.IndexingStrategy(); +// } else { + final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); + if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {// seqNo小于等于当前已经写入的 + if (segRepEnabled) { + // For segrep based indices, we can't completely rely on localCheckpointTracker + // as the preserved checkpoint may not have all the operations present in lucene + // we don't need to index it again as stale op as it would create multiple documents for same seq no + plan = IndexingStrategy.processButSkipLucene(false, index.version()); + } else { + plan = IndexingStrategy.processAsStaleOp(index.version()); + } + } else {// 更新的,或者没找到 + plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.version(), 0); } - } else { - plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.version(), 0); - } +// } } return plan; } @@ -1111,6 +1124,11 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) throws I assert index.seqNo() >= 0 : "ops should have an assigned seq no.; origin: " + index.origin(); assert plan.versionForIndexing >= 0 : "version must be set. got " + plan.versionForIndexing; assert plan.indexIntoLucene || plan.addStaleOpToLucene; + InternalEngine.IndexingStrategy indexingStrategy = index.indexingStrategy(); + if (indexingStrategy.versionForIndexing != plan.versionForIndexing || (indexingStrategy.indexIntoLucene || indexingStrategy.addStaleOpToLucene) == false) { + throw new RuntimeException("dddddd 0"); + } + /* Update the document's sequence number and primary term; the sequence number here is derived here from either the sequence * number service if this is on the primary, or the existing document's sequence number if this is on the replica. The * primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created. @@ -1119,8 +1137,14 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) throws I index.parsedDoc().version().setLongValue(plan.versionForIndexing); try { if (plan.addStaleOpToLucene) { + if (indexingStrategy.addStaleOpToLucene == false) { + throw new RuntimeException("indexingStrategy.addStaleOpToLucene == false"); + } addStaleDocs(index.docs(), indexWriter); } else if (plan.useLuceneUpdateDocument) { + if (indexingStrategy.useLuceneUpdateDocument == false) { + throw new RuntimeException("indexingStrategy.useLuceneUpdateDocument == false"); + } assert assertMaxSeqNoOfUpdatesIsAdvanced(index.uid(), index.seqNo(), true, true); updateDocs(index.uid(), index.docs(), indexWriter); } else { @@ -1128,6 +1152,9 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) throws I assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false); addDocs(index.docs(), indexWriter); } + if (plan.versionForIndexing != indexingStrategy.versionForIndexing || plan.currentNotFoundOrDeleted != indexingStrategy.currentNotFoundOrDeleted) { + throw new RuntimeException("plan.versionForIndexing != indexingStrategy.versionForIndexing || plan.currentNotFoundOrDeleted != indexingStrategy.currentNotFoundOrDeleted"); + } return new IndexResult(plan.versionForIndexing, index.primaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted); } catch (Exception ex) { if (ex instanceof AlreadyClosedException == false