From 12ac2a8bea9d61c1f000a903f96483bc52f9b552 Mon Sep 17 00:00:00 2001 From: rishavz_sagar Date: Wed, 6 Sep 2023 18:32:05 +0530 Subject: [PATCH] Cleanup Unreferenced file on segment merge failure (#9503) (#9808) (cherry picked from commit 37bdb6bb6536b31392f1dcfaa0eb0f0ab283c2c9) Signed-off-by: Rishav Sagar --- CHANGELOG.md | 1 + .../common/settings/IndexScopedSettings.java | 1 + .../org/opensearch/index/IndexSettings.java | 27 ++ .../org/opensearch/index/engine/Engine.java | 43 +++ .../index/engine/InternalEngine.java | 4 +- .../index/engine/InternalEngineTests.java | 323 ++++++++++++++++++ .../index/engine/EngineTestCase.java | 33 +- .../test/OpenSearchIntegTestCase.java | 12 + 8 files changed, 440 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1bcd4416329ff..b0e0b35cb90d4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -116,6 +116,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Decouple replication lag from logic to fail stale replicas ([#9507](https://github.com/opensearch-project/OpenSearch/pull/9507)) - [Remote Store] Add support for Remote Translog Store stats in `_remotestore/stats/` API ([#9263](https://github.com/opensearch-project/OpenSearch/pull/9263)) - Removing the vec file extension from INDEX_STORE_HYBRID_NIO_EXTENSIONS, to ensure the no performance degradation for vector search via Lucene Engine.([#9528](https://github.com/opensearch-project/OpenSearch/pull/9528))) +- Cleanup Unreferenced file on segment merge failure ([#9503](https://github.com/opensearch-project/OpenSearch/pull/9503)) ### Deprecated diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 68d02151b50f5..30a3550d62c8a 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -171,6 +171,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING, IndexSettings.INDEX_SEARCH_IDLE_AFTER, IndexSettings.INDEX_SEARCH_THROTTLED, + IndexSettings.INDEX_UNREFERENCED_FILE_CLEANUP, IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY, FieldMapper.IGNORE_MALFORMED_SETTING, FieldMapper.COERCE_SETTING, diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index f57161619f3e1..768262dd8e980 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -514,6 +514,18 @@ public final class IndexSettings { Property.Dynamic ); + /** + * This setting controls if unreferenced files will be cleaned up in case segment merge fails due to disk full. + * + * Defaults to true which means unreferenced files will be cleaned up in case segment merge fails. + */ + public static final Setting INDEX_UNREFERENCED_FILE_CLEANUP = Setting.boolSetting( + "index.unreferenced_file_cleanup.enabled", + true, + Property.IndexScope, + Property.Dynamic + ); + /** * Determines a balance between file-based and operations-based peer recoveries. The number of operations that will be used in an * operations-based peer recovery is limited to this proportion of the total number of documents in the shard (including deleted @@ -678,6 +690,7 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) { private volatile String defaultPipeline; private volatile String requiredPipeline; private volatile boolean searchThrottled; + private volatile boolean shouldCleanupUnreferencedFiles; private volatile long mappingNestedFieldsLimit; private volatile long mappingNestedDocsLimit; private volatile long mappingTotalFieldsLimit; @@ -794,6 +807,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti extendedCompatibilitySnapshotVersion = Version.CURRENT.minimumIndexCompatibilityVersion(); } this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings); + this.shouldCleanupUnreferencedFiles = INDEX_UNREFERENCED_FILE_CLEANUP.get(settings); this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings); this.queryStringAnalyzeWildcard = QUERY_STRING_ANALYZE_WILDCARD.get(nodeSettings); this.queryStringAllowLeadingWildcard = QUERY_STRING_ALLOW_LEADING_WILDCARD.get(nodeSettings); @@ -906,6 +920,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti scopedSettings.addSettingsUpdateConsumer(FINAL_PIPELINE, this::setRequiredPipeline); scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, this::setSoftDeleteRetentionOperations); scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_THROTTLED, this::setSearchThrottled); + scopedSettings.addSettingsUpdateConsumer(INDEX_UNREFERENCED_FILE_CLEANUP, this::setShouldCleanupUnreferencedFiles); scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING, this::setRetentionLeaseMillis); scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_NESTED_FIELDS_LIMIT_SETTING, this::setMappingNestedFieldsLimit); scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_NESTED_DOCS_LIMIT_SETTING, this::setMappingNestedDocsLimit); @@ -1539,6 +1554,18 @@ private void setSearchThrottled(boolean searchThrottled) { this.searchThrottled = searchThrottled; } + /** + * Returns true if unreferenced files should be cleaned up on merge failure for this index. + * + */ + public boolean shouldCleanupUnreferencedFiles() { + return shouldCleanupUnreferencedFiles; + } + + private void setShouldCleanupUnreferencedFiles(boolean shouldCleanupUnreferencedFiles) { + this.shouldCleanupUnreferencedFiles = shouldCleanupUnreferencedFiles; + } + public long getMappingNestedFieldsLimit() { return mappingNestedFieldsLimit; } diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 08db92af23d01..dd74265990054 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -38,7 +38,10 @@ import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SegmentReader; @@ -135,6 +138,8 @@ public abstract class Engine implements Closeable { public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp"; public static final String SEARCH_SOURCE = "search"; // TODO: Make source of search enum? public static final String CAN_MATCH_SEARCH_SOURCE = "can_match"; + public static final String FORCE_MERGE = "force merge"; + public static final String MERGE_FAILED = "merge failed"; protected final ShardId shardId; protected final Logger logger; @@ -983,6 +988,10 @@ protected void fillSegmentStats(SegmentReader segmentReader, boolean includeSegm } } + boolean shouldCleanupUnreferencedFiles() { + return engineConfig.getIndexSettings().shouldCleanupUnreferencedFiles(); + } + private Map getSegmentFileSizes(SegmentReader segmentReader) { Directory directory = null; SegmentCommitInfo segmentCommitInfo = segmentReader.getSegmentInfo(); @@ -1343,6 +1352,14 @@ public void failEngine(String reason, @Nullable Exception failure) { ); } } + + // If cleanup of unreferenced flag is enabled and force merge or regular merge failed due to IOException, + // clean all unreferenced files on best effort basis created during failed merge and reset the + // shard state back to last Lucene Commit. + if (shouldCleanupUnreferencedFiles() && isMergeFailureDueToIOException(failure, reason)) { + cleanUpUnreferencedFiles(); + } + eventListener.onFailedEngine(reason, failure); } } catch (Exception inner) { @@ -1361,6 +1378,32 @@ public void failEngine(String reason, @Nullable Exception failure) { } } + /** + * Cleanup all unreferenced files generated during failed segment merge. This resets shard state to last Lucene + * commit. + */ + private void cleanUpUnreferencedFiles() { + try ( + IndexWriter writer = new IndexWriter( + store.directory(), + new IndexWriterConfig(Lucene.STANDARD_ANALYZER).setSoftDeletesField(Lucene.SOFT_DELETES_FIELD) + .setCommitOnClose(false) + .setMergePolicy(NoMergePolicy.INSTANCE) + .setOpenMode(IndexWriterConfig.OpenMode.APPEND) + ) + ) { + // do nothing and close this will kick off IndexFileDeleter which will remove all unreferenced files. + } catch (Exception ex) { + logger.error("Error while deleting unreferenced file ", ex); + } + } + + /** Check whether the merge failure happened due to IOException. */ + private boolean isMergeFailureDueToIOException(Exception failure, String reason) { + return (reason.equals(FORCE_MERGE) || reason.equals(MERGE_FAILED)) + && ExceptionsHelper.unwrap(failure, IOException.class) instanceof IOException; + } + /** Check whether the engine should be failed */ protected boolean maybeFailEngine(String source, Exception e) { if (Lucene.isCorruptionException(e)) { diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 08dbb029398ce..a401d27318fb8 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -2189,7 +2189,7 @@ public void forceMerge( throw ex; } catch (Exception e) { try { - maybeFailEngine("force merge", e); + maybeFailEngine(FORCE_MERGE, e); } catch (Exception inner) { e.addSuppressed(inner); } @@ -2639,7 +2639,7 @@ protected void doRun() throws Exception { * confidence that the call stack does not contain catch statements that would cause the error that might be thrown * here from being caught and never reaching the uncaught exception handler. */ - failEngine("merge failed", new MergePolicy.MergeException(exc)); + failEngine(MERGE_FAILED, new MergePolicy.MergeException(exc)); } }); } diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index 239500b48ccf1..533572e0e879b 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -40,6 +40,7 @@ import org.apache.logging.log4j.core.LogEvent; import org.apache.logging.log4j.core.appender.AbstractAppender; import org.apache.logging.log4j.core.filter.RegexFilter; +import org.apache.lucene.codecs.LiveDocsFormat; import org.apache.lucene.document.Field; import org.apache.lucene.document.KeywordField; import org.apache.lucene.document.LongPoint; @@ -78,6 +79,7 @@ import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.store.Lock; +import org.apache.lucene.tests.mockfile.ExtrasFS; import org.apache.lucene.tests.store.MockDirectoryWrapper; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; @@ -152,6 +154,7 @@ import org.opensearch.index.translog.TranslogException; import org.opensearch.index.translog.listener.TranslogEventListener; import org.opensearch.test.IndexSettingsModule; +import org.opensearch.test.MockLogAppender; import org.opensearch.test.VersionUtils; import org.opensearch.threadpool.ThreadPool; import org.hamcrest.MatcherAssert; @@ -3231,6 +3234,326 @@ public void testFailStart() throws IOException { } } + public void testUnreferencedFileCleanUpOnSegmentMergeFailureWithCleanUpEnabled() throws Exception { + MockDirectoryWrapper wrapper = newMockDirectory(); + final CountDownLatch cleanupCompleted = new CountDownLatch(1); + MockDirectoryWrapper.Failure fail = new MockDirectoryWrapper.Failure() { + public boolean didFail1; + public boolean didFail2; + + @Override + public void eval(MockDirectoryWrapper dir) throws IOException { + if (!doFail) { + return; + } + + // Fail segment merge with diskfull during merging terms. + if (callStackContainsAnyOf("mergeTerms") && !didFail1) { + didFail1 = true; + throw new IOException("No space left on device"); + } + if (callStackContains(LiveDocsFormat.class, "writeLiveDocs") && !didFail2) { + didFail2 = true; + throw new IOException("No space left on device"); + } + } + }; + + wrapper.failOn(fail); + try { + Store store = createStore(wrapper); + final Engine.EventListener eventListener = new Engine.EventListener() { + @Override + public void onFailedEngine(String reason, Exception e) { + try { + // extra0 file is added as a part of + // https://lucene.apache.org/core/7_2_1/test-framework/org/apache/lucene/mockfile/ExtrasFS.html + // Safe to remove from file count along with write.lock without impacting the test. + long fileCount = Arrays.stream(store.directory().listAll()) + .filter(file -> file.equals("write.lock") == false && ExtrasFS.isExtra(file) == false) + .count(); + + // Since only one document is committed and unreferenced files are cleaned up, + // there are 4 files (*cfs, *cfe, *si and segments_*). + assertThat(fileCount, equalTo(4L)); + wrapper.close(); + store.close(); + engine.close(); + cleanupCompleted.countDown(); + } catch (IOException ex) { + throw new AssertionError(ex); + } + } + }; + + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + final long primaryTerm = randomLongBetween(1, Long.MAX_VALUE); + final AtomicLong retentionLeasesVersion = new AtomicLong(); + final AtomicReference retentionLeasesHolder = new AtomicReference<>( + new RetentionLeases(primaryTerm, retentionLeasesVersion.get(), Collections.emptyList()) + ); + InternalEngine engine = createEngine( + config( + defaultSettings, + store, + createTempDir(), + newMergePolicy(), + null, + null, + null, + globalCheckpoint::get, + retentionLeasesHolder::get, + new NoneCircuitBreakerService(), + eventListener + ) + ); + + List segments = engine.segments(true); + assertThat(segments.isEmpty(), equalTo(true)); + + ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null); + engine.index(indexForDoc(doc)); + engine.refresh("test"); + engine.flush(); + + segments = engine.segments(false); + assertThat(segments.size(), equalTo(1)); + + ParsedDocument doc2 = testParsedDocument("2", null, testDocumentWithTextField(), B_2, null); + engine.index(indexForDoc(doc2)); + engine.refresh("test"); + + segments = engine.segments(false); + assertThat(segments.size(), equalTo(2)); + + fail.setDoFail(); + // IndexWriter can throw either IOException or IllegalStateException depending on whether tragedy is set or not. + expectThrowsAnyOf( + Arrays.asList(IOException.class, IllegalStateException.class), + () -> engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID()) + ); + + assertTrue(cleanupCompleted.await(10, TimeUnit.SECONDS)); + } catch (Exception ex) { + throw new AssertionError(ex); + } + } + + public void testUnreferencedFileCleanUpOnSegmentMergeFailureWithCleanUpDisabled() throws Exception { + MockDirectoryWrapper wrapper = newMockDirectory(); + final CountDownLatch cleanupCompleted = new CountDownLatch(1); + MockDirectoryWrapper.Failure fail = new MockDirectoryWrapper.Failure() { + public boolean didFail1; + public boolean didFail2; + + @Override + public void eval(MockDirectoryWrapper dir) throws IOException { + if (!doFail) { + return; + } + if (callStackContainsAnyOf("mergeTerms") && !didFail1) { + didFail1 = true; + throw new IOException("No space left on device"); + } + if (callStackContains(LiveDocsFormat.class, "writeLiveDocs") && !didFail2) { + didFail2 = true; + throw new IOException("No space left on device"); + } + } + }; + + wrapper.failOn(fail); + try { + Store store = createStore(wrapper); + final Engine.EventListener eventListener = new Engine.EventListener() { + @Override + public void onFailedEngine(String reason, Exception e) { + try { + // extra0 file is added as a part of + // https://lucene.apache.org/core/7_2_1/test-framework/org/apache/lucene/mockfile/ExtrasFS.html + // Safe to remove from file count along with write.lock without impacting the test + long fileCount = Arrays.stream(store.directory().listAll()) + .filter(file -> file.equals("write.lock") == false && ExtrasFS.isExtra(file) == false) + .count(); + + // Since now cleanup is not happening now, all unrefrenced files now be present as well. + assertThat(fileCount, equalTo(13L)); + wrapper.close(); + store.close(); + engine.close(); + cleanupCompleted.countDown(); + } catch (IOException ex) { + throw new AssertionError(ex); + } + } + }; + + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + final long primaryTerm = randomLongBetween(1, Long.MAX_VALUE); + final AtomicLong retentionLeasesVersion = new AtomicLong(); + final AtomicReference retentionLeasesHolder = new AtomicReference<>( + new RetentionLeases(primaryTerm, retentionLeasesVersion.get(), Collections.emptyList()) + ); + InternalEngine engine = createEngine( + config( + defaultSettings, + store, + createTempDir(), + newMergePolicy(), + null, + null, + null, + globalCheckpoint::get, + retentionLeasesHolder::get, + new NoneCircuitBreakerService(), + eventListener + ) + ); + + // Disable cleanup + final IndexSettings indexSettings = engine.config().getIndexSettings(); + final IndexMetadata indexMetadata = IndexMetadata.builder(indexSettings.getIndexMetadata()) + .settings( + Settings.builder().put(indexSettings.getSettings()).put(IndexSettings.INDEX_UNREFERENCED_FILE_CLEANUP.getKey(), false) + ) + .build(); + indexSettings.updateIndexMetadata(indexMetadata); + + List segments = engine.segments(true); + assertThat(segments.isEmpty(), equalTo(true)); + + ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null); + engine.index(indexForDoc(doc)); + engine.refresh("test"); + engine.flush(); + + segments = engine.segments(false); + assertThat(segments.size(), equalTo(1)); + + ParsedDocument doc2 = testParsedDocument("2", null, testDocumentWithTextField(), B_2, null); + engine.index(indexForDoc(doc2)); + engine.refresh("test"); + + segments = engine.segments(false); + assertThat(segments.size(), equalTo(2)); + + fail.setDoFail(); + // IndexWriter can throw either IOException or IllegalStateException depending on whether tragedy is set or not. + expectThrowsAnyOf( + Arrays.asList(IOException.class, IllegalStateException.class), + () -> engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID()) + ); + + assertTrue(cleanupCompleted.await(10, TimeUnit.SECONDS)); + } catch (Exception ex) { + throw new AssertionError(ex); + } + } + + public void testUnreferencedFileCleanUpFailsOnSegmentMergeFailureWhenDirectoryClosed() throws Exception { + MockDirectoryWrapper wrapper = newMockDirectory(); + final CountDownLatch cleanupCompleted = new CountDownLatch(1); + MockDirectoryWrapper.Failure fail = new MockDirectoryWrapper.Failure() { + public boolean didFail1; + public boolean didFail2; + + @Override + public void eval(MockDirectoryWrapper dir) throws IOException { + if (!doFail) { + return; + } + if (callStackContainsAnyOf("mergeTerms") && !didFail1) { + didFail1 = true; + throw new IOException("No space left on device"); + } + if (callStackContains(LiveDocsFormat.class, "writeLiveDocs") && !didFail2) { + didFail2 = true; + throw new IOException("No space left on device"); + } + } + }; + + wrapper.failOn(fail); + MockLogAppender mockAppender = MockLogAppender.createForLoggers(Loggers.getLogger(Engine.class, shardId)); + try { + Store store = createStore(wrapper); + final Engine.EventListener eventListener = new Engine.EventListener() { + @Override + public void onFailedEngine(String reason, Exception e) { + try { + store.close(); + engine.close(); + mockAppender.assertAllExpectationsMatched(); + mockAppender.close(); + cleanupCompleted.countDown(); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + }; + + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + final long primaryTerm = randomLongBetween(1, Long.MAX_VALUE); + final AtomicLong retentionLeasesVersion = new AtomicLong(); + final AtomicReference retentionLeasesHolder = new AtomicReference<>( + new RetentionLeases(primaryTerm, retentionLeasesVersion.get(), Collections.emptyList()) + ); + InternalEngine engine = createEngine( + config( + defaultSettings, + store, + createTempDir(), + newMergePolicy(), + null, + null, + null, + globalCheckpoint::get, + retentionLeasesHolder::get, + new NoneCircuitBreakerService(), + eventListener + ) + ); + + List segments = engine.segments(true); + assertThat(segments.isEmpty(), equalTo(true)); + + ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null); + engine.index(indexForDoc(doc)); + engine.refresh("test"); + engine.flush(); + + segments = engine.segments(false); + assertThat(segments.size(), equalTo(1)); + + ParsedDocument doc2 = testParsedDocument("2", null, testDocumentWithTextField(), B_2, null); + engine.index(indexForDoc(doc2)); + engine.refresh("test"); + + segments = engine.segments(false); + assertThat(segments.size(), equalTo(2)); + + fail.setDoFail(); + // Close the store so that unreferenced file cleanup will fail. + store.close(); + + final String message = "Error while deleting unreferenced file *"; + mockAppender.start(); + mockAppender.addExpectation( + new MockLogAppender.SeenEventExpectation("expected message", Engine.class.getCanonicalName(), Level.ERROR, message) + ); + + // IndexWriter can throw either IOException or IllegalStateException depending on whether tragedy is set or not. + expectThrowsAnyOf( + Arrays.asList(IOException.class, IllegalStateException.class), + () -> engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID()) + ); + + assertTrue(cleanupCompleted.await(10, TimeUnit.SECONDS)); + } catch (Exception ex) { + throw new AssertionError(ex); + } + } + public void testSettings() { CodecService codecService = new CodecService(null, engine.config().getIndexSettings(), logger); LiveIndexWriterConfig currentIndexWriterConfig = engine.getCurrentIndexWriterConfig(); diff --git a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java index 018bc675c069b..5376d0b45da76 100644 --- a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java @@ -840,10 +840,39 @@ public EngineConfig config( final @Nullable Supplier maybeRetentionLeasesSupplier, final CircuitBreakerService breakerService ) { - final IndexWriterConfig iwc = newIndexWriterConfig(); - final TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); final Engine.EventListener eventListener = new Engine.EventListener() { }; // we don't need to notify anybody in this test + + return config( + indexSettings, + store, + translogPath, + mergePolicy, + externalRefreshListener, + internalRefreshListener, + indexSort, + maybeGlobalCheckpointSupplier, + maybeGlobalCheckpointSupplier == null ? null : () -> RetentionLeases.EMPTY, + breakerService, + eventListener + ); + } + + public EngineConfig config( + final IndexSettings indexSettings, + final Store store, + final Path translogPath, + final MergePolicy mergePolicy, + final ReferenceManager.RefreshListener externalRefreshListener, + final ReferenceManager.RefreshListener internalRefreshListener, + final Sort indexSort, + final @Nullable LongSupplier maybeGlobalCheckpointSupplier, + final @Nullable Supplier maybeRetentionLeasesSupplier, + final CircuitBreakerService breakerService, + final Engine.EventListener eventListener + ) { + final IndexWriterConfig iwc = newIndexWriterConfig(); + final TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); final List extRefreshListenerList = externalRefreshListener == null ? emptyList() : Collections.singletonList(externalRefreshListener); diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index 665dcb013490a..c75a153b39b85 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -1496,6 +1496,18 @@ protected ForceMergeResponse forceMerge() { return actionGet; } + protected ForceMergeResponse forceMerge(int maxNumSegments) { + waitForRelocation(); + ForceMergeResponse actionGet = client().admin() + .indices() + .prepareForceMerge() + .setMaxNumSegments(maxNumSegments) + .execute() + .actionGet(); + assertNoFailures(actionGet); + return actionGet; + } + /** * Returns true iff the given index exists otherwise false */