diff --git a/server/src/main/java/org/opensearch/gateway/remote/IndexMetadataUploadListener.java b/server/src/main/java/org/opensearch/gateway/remote/IndexMetadataUploadListener.java index 64cd27858c6a8..f9158c9260747 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/IndexMetadataUploadListener.java +++ b/server/src/main/java/org/opensearch/gateway/remote/IndexMetadataUploadListener.java @@ -10,8 +10,11 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.core.action.ActionListener; +import org.opensearch.threadpool.ThreadPool; import java.util.List; +import java.util.Objects; +import java.util.concurrent.ExecutorService; /** * Hook for running code that needs to be executed before the upload of index metadata. Here we have introduced a hook @@ -20,7 +23,16 @@ * * @opensearch.internal */ -public interface IndexMetadataUploadListener { +public abstract class IndexMetadataUploadListener { + + private final ExecutorService executorService; + + public IndexMetadataUploadListener(ThreadPool threadPool, String threadPoolName) { + Objects.requireNonNull(threadPool); + Objects.requireNonNull(threadPoolName); + assert ThreadPool.THREAD_POOL_TYPES.containsKey(threadPoolName) && ThreadPool.Names.SAME.equals(threadPoolName) == false; + this.executorService = threadPool.executor(threadPoolName); + } /** * Runs before the new index upload of index metadata (or first time upload). The caller is expected to trigger @@ -29,7 +41,9 @@ public interface IndexMetadataUploadListener { * @param indexMetadataList list of index metadata of new indexes (or first time index metadata upload). * @param actionListener listener to be invoked on success or failure. */ - void beforeNewIndexUpload(List indexMetadataList, ActionListener actionListener); + public final void onNewIndexUpload(List indexMetadataList, ActionListener actionListener) { + executorService.execute(() -> doOnNewIndexUpload(indexMetadataList, actionListener)); + } - String getThreadpoolName(); + protected abstract void doOnNewIndexUpload(List indexMetadataList, ActionListener actionListener); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index a7fbf752108e2..d2f927c827e5b 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -532,17 +532,13 @@ private void invokeIndexMetadataUploadListeners( List exceptionList ) { for (IndexMetadataUploadListener listener : indexMetadataUploadListeners) { - // We are submitting the task for async execution to ensure that we are not blocking the cluster state upload String listenerName = listener.getClass().getSimpleName(); - String threadPoolName = listener.getThreadpoolName(); - assert ThreadPool.THREAD_POOL_TYPES.containsKey(threadPoolName) && ThreadPool.Names.SAME.equals(threadPoolName) == false; - threadpool.executor(threadPoolName).execute(() -> { - listener.beforeNewIndexUpload( - newIndexMetadataList, - getIndexMetadataUploadActionListener(newIndexMetadataList, latch, exceptionList, listenerName) - ); - }); + listener.onNewIndexUpload( + newIndexMetadataList, + getIndexMetadataUploadActionListener(newIndexMetadataList, latch, exceptionList, listenerName) + ); } + } private ActionListener getIndexMetadataUploadActionListener( diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java b/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java index 8165ae594d41e..1ac7e41014d23 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java @@ -27,8 +27,8 @@ import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; -import org.opensearch.repositories.blobstore.BlobStoreFormat; import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.repositories.blobstore.ConfigBlobStoreFormat; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; @@ -57,9 +57,11 @@ * @opensearch.internal */ @ExperimentalApi -public class RemoteIndexPathUploader implements IndexMetadataUploadListener { +public class RemoteIndexPathUploader extends IndexMetadataUploadListener { - public static final BlobStoreFormat REMOTE_INDEX_PATH_FORMAT = new BlobStoreFormat<>(RemoteIndexPath.FILE_NAME_FORMAT); + public static final ConfigBlobStoreFormat REMOTE_INDEX_PATH_FORMAT = new ConfigBlobStoreFormat<>( + RemoteIndexPath.FILE_NAME_FORMAT + ); private static final String TIMEOUT_EXCEPTION_MSG = "Timed out waiting while uploading remote index path file for indexes=%s"; private static final String UPLOAD_EXCEPTION_MSG = "Exception occurred while uploading remote index paths for indexes=%s"; @@ -79,7 +81,13 @@ public class RemoteIndexPathUploader implements IndexMetadataUploadListener { private BlobStoreRepository translogRepository; private BlobStoreRepository segmentRepository; - public RemoteIndexPathUploader(Settings settings, Supplier repositoriesService, ClusterSettings clusterSettings) { + public RemoteIndexPathUploader( + ThreadPool threadPool, + Settings settings, + Supplier repositoriesService, + ClusterSettings clusterSettings + ) { + super(threadPool, ThreadPool.Names.GENERIC); this.settings = Objects.requireNonNull(settings); this.repositoriesService = Objects.requireNonNull(repositoriesService); isRemoteDataAttributePresent = isRemoteDataAttributePresent(settings); @@ -91,12 +99,7 @@ public RemoteIndexPathUploader(Settings settings, Supplier } @Override - public String getThreadpoolName() { - return ThreadPool.Names.GENERIC; - } - - @Override - public void beforeNewIndexUpload(List indexMetadataList, ActionListener actionListener) { + protected void doOnNewIndexUpload(List indexMetadataList, ActionListener actionListener) { if (isRemoteDataAttributePresent == false) { logger.trace("Skipping beforeNewIndexUpload as there are no remote indexes"); actionListener.onResponse(null); @@ -105,24 +108,16 @@ public void beforeNewIndexUpload(List indexMetadataList, ActionLi long startTime = System.nanoTime(); boolean success = false; + List eligibleList = indexMetadataList.stream().filter(this::requiresPathUpload).collect(Collectors.toList()); + String indexNames = eligibleList.stream().map(IndexMetadata::getIndex).map(Index::toString).collect(Collectors.joining(",")); + int latchCount = eligibleList.size() * (isTranslogSegmentRepoSame ? 1 : 2); + CountDownLatch latch = new CountDownLatch(latchCount); + List exceptionList = Collections.synchronizedList(new ArrayList<>(latchCount)); try { - List eligibleList = indexMetadataList.stream().filter(this::requiresPathUpload).collect(Collectors.toList()); - int latchCount = eligibleList.size() * (isTranslogSegmentRepoSame ? 1 : 2); - CountDownLatch latch = new CountDownLatch(latchCount); - List exceptionList = Collections.synchronizedList(new ArrayList<>(latchCount)); for (IndexMetadata indexMetadata : eligibleList) { - try { - writeIndexPathAsync(indexMetadata, latch, exceptionList); - } catch (IOException exception) { - RemoteStateTransferException ex = new RemoteStateTransferException( - String.format(Locale.ROOT, UPLOAD_EXCEPTION_MSG, List.of(indexMetadata.getIndex().getName())) - ); - exceptionList.forEach(ex::addSuppressed); - actionListener.onFailure(ex); - return; - } + writeIndexPathAsync(indexMetadata, latch, exceptionList); } - String indexNames = eligibleList.stream().map(IndexMetadata::getIndex).map(Index::toString).collect(Collectors.joining(",")); + logger.trace(new ParameterizedMessage("Remote index path upload started for {}", indexNames)); try { @@ -153,9 +148,13 @@ public void beforeNewIndexUpload(List indexMetadataList, ActionLi } success = true; actionListener.onResponse(null); - } catch (Exception ex) { + } catch (Exception exception) { + RemoteStateTransferException ex = new RemoteStateTransferException( + String.format(Locale.ROOT, UPLOAD_EXCEPTION_MSG, indexNames), + exception + ); + exceptionList.forEach(ex::addSuppressed); actionListener.onFailure(ex); - throw ex; } finally { long tookTimeNs = System.nanoTime() - startTime; logger.trace(new ParameterizedMessage("executed beforeNewIndexUpload status={} tookTimeNs={}", success, tookTimeNs)); @@ -163,7 +162,7 @@ public void beforeNewIndexUpload(List indexMetadataList, ActionLi } - private void writeIndexPathAsync(IndexMetadata idxMD, CountDownLatch latch, List exceptionList) throws IOException { + private void writeIndexPathAsync(IndexMetadata idxMD, CountDownLatch latch, List exceptionList) { if (isTranslogSegmentRepoSame) { // If the repositories are same, then we need to upload a single file containing paths for both translog and segments. writePathToRemoteStore(idxMD, translogRepository, latch, exceptionList, COMBINED_PATH); @@ -181,7 +180,7 @@ private void writePathToRemoteStore( CountDownLatch latch, List exceptionList, Map> pathCreationMap - ) throws IOException { + ) { Map remoteCustomData = idxMD.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY); RemoteStoreEnums.PathType pathType = RemoteStoreEnums.PathType.valueOf(remoteCustomData.get(RemoteStoreEnums.PathType.NAME)); RemoteStoreEnums.PathHashAlgorithm hashAlgorithm = RemoteStoreEnums.PathHashAlgorithm.valueOf( @@ -191,13 +190,20 @@ private void writePathToRemoteStore( int shardCount = idxMD.getNumberOfShards(); BlobPath basePath = repository.basePath(); BlobContainer blobContainer = repository.blobStore().blobContainer(basePath.add(RemoteIndexPath.DIR)); - REMOTE_INDEX_PATH_FORMAT.writeAsyncWithUrgentPriority( - new RemoteIndexPath(indexUUID, shardCount, basePath, pathType, hashAlgorithm, pathCreationMap), - blobContainer, - indexUUID, - getUploadPathLatchedActionListener(idxMD, latch, exceptionList, pathCreationMap) - ); - + ActionListener actionListener = getUploadPathLatchedActionListener(idxMD, latch, exceptionList, pathCreationMap); + try { + REMOTE_INDEX_PATH_FORMAT.writeAsyncWithUrgentPriority( + new RemoteIndexPath(indexUUID, shardCount, basePath, pathType, hashAlgorithm, pathCreationMap), + blobContainer, + indexUUID, + actionListener + ); + } catch (IOException ioException) { + RemoteStateTransferException ex = new RemoteStateTransferException( + String.format(Locale.ROOT, UPLOAD_EXCEPTION_MSG, List.of(idxMD.getIndex().getName())) + ); + actionListener.onFailure(ioException); + } } private Repository validateAndGetRepository(String repoSetting) { diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 3e3348ae43a0b..47f128af438a6 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -730,6 +730,7 @@ protected Node( final RemoteIndexPathUploader remoteIndexPathUploader; if (isRemoteStoreClusterStateEnabled(settings)) { remoteIndexPathUploader = new RemoteIndexPathUploader( + threadPool, settings, repositoriesServiceReference::get, clusterService.getClusterSettings() diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/AbstractBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/AbstractBlobStoreFormat.java new file mode 100644 index 0000000000000..f49e738040305 --- /dev/null +++ b/server/src/main/java/org/opensearch/repositories/blobstore/AbstractBlobStoreFormat.java @@ -0,0 +1,130 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.repositories.blobstore; + +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.store.OutputStreamIndexOutput; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.lucene.store.IndexOutputOutputStream; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Locale; +import java.util.Objects; + +/** + * Provides common methods, variables that can be used by the implementors. + * + * @opensearch.internal + */ +public class AbstractBlobStoreFormat { + + private static final int BUFFER_SIZE = 4096; + + private final String blobNameFormat; + + private final boolean skipHeaderFooter; + + /** + * @param blobNameFormat format of the blobname in {@link String#format} format + */ + public AbstractBlobStoreFormat(String blobNameFormat, boolean skipHeaderFooter) { + this.blobNameFormat = blobNameFormat; + this.skipHeaderFooter = skipHeaderFooter; + } + + protected String blobName(String name) { + return String.format(Locale.ROOT, blobNameFormat, name); + } + + /** + * Writes blob with resolving the blob name using {@link #blobName} method. + *

+ * The blob will optionally by compressed. + * + * @param obj object to be serialized + * @param blobContainer blob container + * @param name blob name + * @param compressor whether to use compression + * @param params ToXContent params + * @param codec codec used + * @param version version used + */ + protected void write( + final T obj, + final BlobContainer blobContainer, + final String name, + final Compressor compressor, + final ToXContent.Params params, + XContentType xContentType, + String codec, + Integer version + ) throws IOException { + final String blobName = blobName(name); + final BytesReference bytes = serialize(obj, blobName, compressor, params, xContentType, codec, version); + blobContainer.writeBlob(blobName, bytes.streamInput(), bytes.length(), false); + } + + public BytesReference serialize( + final T obj, + final String blobName, + final Compressor compressor, + final ToXContent.Params params, + XContentType xContentType, + String codec, + Integer version + ) throws IOException { + assert skipHeaderFooter || (Objects.nonNull(codec) && Objects.nonNull(version)); + try (BytesStreamOutput outputStream = new BytesStreamOutput()) { + try ( + OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput( + "ChecksumBlobStoreFormat.writeBlob(blob=\"" + blobName + "\")", + blobName, + outputStream, + BUFFER_SIZE + ) + ) { + if (skipHeaderFooter == false) { + CodecUtil.writeHeader(indexOutput, codec, version); + } + try (OutputStream indexOutputOutputStream = new IndexOutputOutputStream(indexOutput) { + @Override + public void close() { + // this is important since some of the XContentBuilders write bytes on close. + // in order to write the footer we need to prevent closing the actual index input. + } + }; + XContentBuilder builder = MediaTypeRegistry.contentBuilder( + xContentType, + compressor.threadLocalOutputStream(indexOutputOutputStream) + ) + ) { + builder.startObject(); + obj.toXContent(builder, params); + builder.endObject(); + } + if (skipHeaderFooter == false) { + CodecUtil.writeFooter(indexOutput); + } + } + return outputStream.bytes(); + } + } + + protected String getBlobNameFormat() { + return blobNameFormat; + } +} diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java index 3e6052a5ef820..de09fe2622ba2 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -38,7 +38,6 @@ import org.apache.lucene.store.ByteBuffersDataInput; import org.apache.lucene.store.ByteBuffersIndexInput; import org.apache.lucene.store.IndexInput; -import org.apache.lucene.store.OutputStreamIndexOutput; import org.apache.lucene.util.BytesRef; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.common.CheckedFunction; @@ -48,26 +47,21 @@ import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; import org.opensearch.common.io.Streams; -import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.store.ByteArrayIndexInput; -import org.opensearch.common.lucene.store.IndexOutputOutputStream; import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.compress.Compressor; -import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.ToXContent; -import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.gateway.CorruptStateException; import org.opensearch.index.store.exception.ChecksumCombinationException; import org.opensearch.snapshots.SnapshotInfo; import java.io.IOException; -import java.io.OutputStream; import java.util.Arrays; import java.util.HashMap; import java.util.Locale; @@ -80,7 +74,7 @@ * * @opensearch.internal */ -public final class ChecksumBlobStoreFormat { +public final class ChecksumBlobStoreFormat extends AbstractBlobStoreFormat { // Serialization parameters to specify correct context for metadata serialization public static final ToXContent.Params SNAPSHOT_ONLY_FORMAT_PARAMS; @@ -98,12 +92,8 @@ public final class ChecksumBlobStoreFormat { // The format version public static final int VERSION = 1; - private static final int BUFFER_SIZE = 4096; - private final String codec; - private final String blobNameFormat; - private final CheckedFunction reader; /** @@ -112,8 +102,8 @@ public final class ChecksumBlobStoreFormat { * @param reader prototype object that can deserialize T from XContent */ public ChecksumBlobStoreFormat(String codec, String blobNameFormat, CheckedFunction reader) { + super(blobNameFormat, false); this.reader = reader; - this.blobNameFormat = blobNameFormat; this.codec = codec; } @@ -130,7 +120,7 @@ public T read(BlobContainer blobContainer, String name, NamedXContentRegistry na } public String blobName(String name) { - return String.format(Locale.ROOT, blobNameFormat, name); + return String.format(Locale.ROOT, getBlobNameFormat(), name); } public T deserialize(String blobName, NamedXContentRegistry namedXContentRegistry, BytesReference bytes) throws IOException { @@ -170,30 +160,7 @@ public T deserialize(String blobName, NamedXContentRegistry namedXContentRegistr * @param compressor whether to use compression */ public void write(final T obj, final BlobContainer blobContainer, final String name, final Compressor compressor) throws IOException { - write(obj, blobContainer, name, compressor, SNAPSHOT_ONLY_FORMAT_PARAMS); - } - - /** - * Writes blob with resolving the blob name using {@link #blobName} method. - *

- * The blob will optionally by compressed. - * - * @param obj object to be serialized - * @param blobContainer blob container - * @param name blob name - * @param compressor whether to use compression - * @param params ToXContent params - */ - public void write( - final T obj, - final BlobContainer blobContainer, - final String name, - final Compressor compressor, - final ToXContent.Params params - ) throws IOException { - final String blobName = blobName(name); - final BytesReference bytes = serialize(obj, blobName, compressor, params); - blobContainer.writeBlob(blobName, bytes.streamInput(), bytes.length(), false); + write(obj, blobContainer, name, compressor, SNAPSHOT_ONLY_FORMAT_PARAMS, XContentType.SMILE, codec, VERSION); } /** @@ -251,7 +218,7 @@ private void writeAsyncWithPriority( final ToXContent.Params params ) throws IOException { if (blobContainer instanceof AsyncMultiStreamBlobContainer == false) { - write(obj, blobContainer, name, compressor, params); + write(obj, blobContainer, name, compressor, params, XContentType.SMILE, codec, VERSION); listener.onResponse(null); return; } @@ -290,35 +257,6 @@ private void writeAsyncWithPriority( public BytesReference serialize(final T obj, final String blobName, final Compressor compressor, final ToXContent.Params params) throws IOException { - try (BytesStreamOutput outputStream = new BytesStreamOutput()) { - try ( - OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput( - "ChecksumBlobStoreFormat.writeBlob(blob=\"" + blobName + "\")", - blobName, - outputStream, - BUFFER_SIZE - ) - ) { - CodecUtil.writeHeader(indexOutput, codec, VERSION); - try (OutputStream indexOutputOutputStream = new IndexOutputOutputStream(indexOutput) { - @Override - public void close() throws IOException { - // this is important since some of the XContentBuilders write bytes on close. - // in order to write the footer we need to prevent closing the actual index input. - } - }; - XContentBuilder builder = MediaTypeRegistry.contentBuilder( - XContentType.SMILE, - compressor.threadLocalOutputStream(indexOutputOutputStream) - ) - ) { - builder.startObject(); - obj.toXContent(builder, params); - builder.endObject(); - } - CodecUtil.writeFooter(indexOutput); - } - return outputStream.bytes(); - } + return serialize(obj, blobName, compressor, params, XContentType.SMILE, codec, VERSION); } } diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/ConfigBlobStoreFormat.java similarity index 66% rename from server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreFormat.java rename to server/src/main/java/org/opensearch/repositories/blobstore/ConfigBlobStoreFormat.java index 0e8a98e1c62f6..98aca64498d44 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreFormat.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/ConfigBlobStoreFormat.java @@ -15,17 +15,16 @@ import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; import org.opensearch.common.lucene.store.ByteArrayIndexInput; -import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.compress.NoneCompressor; import org.opensearch.core.xcontent.ToXContent; -import org.opensearch.core.xcontent.XContentBuilder; import java.io.IOException; -import java.util.Locale; /** - * Standard format that has only content for writes. Read interface does not exist as it not yet required. This format + * Format for writing short configurations to remote. Read interface does not exist as it not yet required. This format * should be used for writing data from in-memory to remote store where there is no need for checksum and the client * library for the remote store has inbuilt checksum capabilities while upload and download both. This format would * serialise the data in Json format and store it on remote store as is. This does not support compression yet (this @@ -35,39 +34,24 @@ * * @opensearch.internal */ -public class BlobStoreFormat { - - private final String blobNameFormat; +public class ConfigBlobStoreFormat extends AbstractBlobStoreFormat { /** * @param blobNameFormat format of the blobname in {@link String#format} format */ - public BlobStoreFormat(String blobNameFormat) { - this.blobNameFormat = blobNameFormat; - } - - private String blobName(String name) { - return String.format(Locale.ROOT, blobNameFormat, name); - } - - private BytesReference serialize(final T obj) throws IOException { - try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) { - xContentBuilder.startObject(); - obj.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS); - xContentBuilder.endObject(); - return BytesReference.bytes(xContentBuilder); - } + public ConfigBlobStoreFormat(String blobNameFormat) { + super(blobNameFormat, true); } public void writeAsyncWithUrgentPriority(T obj, BlobContainer blobContainer, String name, ActionListener listener) throws IOException { if (blobContainer instanceof AsyncMultiStreamBlobContainer == false) { - write(obj, blobContainer, name); + write(obj, blobContainer, name, new NoneCompressor(), ToXContent.EMPTY_PARAMS, XContentType.JSON, null, null); listener.onResponse(null); return; } String blobName = blobName(name); - BytesReference bytes = serialize(obj); + BytesReference bytes = serialize(obj, blobName, new NoneCompressor(), ToXContent.EMPTY_PARAMS, XContentType.JSON, null, null); String resourceDescription = "BlobStoreFormat.writeAsyncWithPriority(blob=\"" + blobName + "\")"; try (IndexInput input = new ByteArrayIndexInput(resourceDescription, BytesReference.toBytes(bytes))) { try ( @@ -79,18 +63,11 @@ public void writeAsyncWithUrgentPriority(T obj, BlobContainer blobContainer, Str WritePriority.URGENT, (size, position) -> new OffsetRangeIndexInputStream(input, size, position), null, - ((AsyncMultiStreamBlobContainer) blobContainer).remoteIntegrityCheckSupported() + false ) ) { ((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), listener); } } } - - private void write(final T obj, final BlobContainer blobContainer, final String name) throws IOException { - String blobName = blobName(name); - BytesReference bytes = serialize(obj); - blobContainer.writeBlob(blobName, bytes.streamInput(), bytes.length(), false); - } - } diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java index 012d2623b45df..3ba98c44f8d3e 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -490,7 +490,7 @@ public void testDataOnlyNodePersistence() throws Exception { clusterSettings, () -> 0L, threadPool, - List.of(new RemoteIndexPathUploader(settings, repositoriesServiceSupplier, clusterSettings)) + List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings)) ); } else { return null; diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 9ddbce3f794e6..9f321cd62847c 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -156,7 +156,7 @@ public void setup() { clusterSettings, () -> 0L, threadPool, - List.of(new RemoteIndexPathUploader(settings, repositoriesServiceSupplier, clusterSettings)) + List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings)) ); } @@ -185,7 +185,7 @@ public void testFailInitializationWhenRemoteStateDisabled() { clusterSettings, () -> 0L, threadPool, - List.of(new RemoteIndexPathUploader(settings, repositoriesServiceSupplier, clusterSettings)) + List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings)) ) ); } diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteIndexPathUploaderTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteIndexPathUploaderTests.java index c70523cf542a3..150ea4765b05f 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteIndexPathUploaderTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteIndexPathUploaderTests.java @@ -29,6 +29,8 @@ import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; import org.junit.Before; import java.io.IOException; @@ -62,6 +64,7 @@ public class RemoteIndexPathUploaderTests extends OpenSearchTestCase { private static final String TRANSLOG_REPO_NAME = "translog-repo"; private static final String SEGMENT_REPO_NAME = "segment-repo"; + private final ThreadPool threadPool = new TestThreadPool(getTestName()); private Settings settings; private ClusterSettings clusterSettings; private RepositoriesService repositoriesService; @@ -112,35 +115,50 @@ public void setup() { indexMetadataList = List.of(indexMetadata); } - public void testInterceptWithNoRemoteDataAttributes() throws IOException { + public void testInterceptWithNoRemoteDataAttributes() { Settings settings = Settings.Builder.EMPTY_SETTINGS; clusterSettings.applySettings(settings); - RemoteIndexPathUploader remoteIndexPathUploader = new RemoteIndexPathUploader(settings, () -> repositoriesService, clusterSettings); + RemoteIndexPathUploader remoteIndexPathUploader = new RemoteIndexPathUploader( + threadPool, + settings, + () -> repositoriesService, + clusterSettings + ); List indexMetadataList = Mockito.mock(List.class); ActionListener actionListener = ActionListener.wrap( res -> successCount.incrementAndGet(), ex -> failureCount.incrementAndGet() ); - remoteIndexPathUploader.beforeNewIndexUpload(indexMetadataList, actionListener); + remoteIndexPathUploader.onNewIndexUpload(indexMetadataList, actionListener); assertEquals(1, successCount.get()); assertEquals(0, failureCount.get()); verify(indexMetadataList, times(0)).stream(); } - public void testInterceptWithEmptyIndexMetadataList() throws IOException { - RemoteIndexPathUploader remoteIndexPathUploader = new RemoteIndexPathUploader(settings, () -> repositoriesService, clusterSettings); + public void testInterceptWithEmptyIndexMetadataList() { + RemoteIndexPathUploader remoteIndexPathUploader = new RemoteIndexPathUploader( + threadPool, + settings, + () -> repositoriesService, + clusterSettings + ); remoteIndexPathUploader.start(); ActionListener actionListener = ActionListener.wrap( res -> successCount.incrementAndGet(), ex -> failureCount.incrementAndGet() ); - remoteIndexPathUploader.beforeNewIndexUpload(Collections.emptyList(), actionListener); + remoteIndexPathUploader.onNewIndexUpload(Collections.emptyList(), actionListener); assertEquals(1, successCount.get()); assertEquals(0, failureCount.get()); } - public void testInterceptWithEmptyEligibleIndexMetadataList() throws IOException { - RemoteIndexPathUploader remoteIndexPathUploader = new RemoteIndexPathUploader(settings, () -> repositoriesService, clusterSettings); + public void testInterceptWithEmptyEligibleIndexMetadataList() { + RemoteIndexPathUploader remoteIndexPathUploader = new RemoteIndexPathUploader( + threadPool, + settings, + () -> repositoriesService, + clusterSettings + ); remoteIndexPathUploader.start(); ActionListener actionListener = ActionListener.wrap( res -> successCount.incrementAndGet(), @@ -151,39 +169,44 @@ public void testInterceptWithEmptyEligibleIndexMetadataList() throws IOException List indexMetadataList = new ArrayList<>(); IndexMetadata indexMetadata = mock(IndexMetadata.class); indexMetadataList.add(indexMetadata); - remoteIndexPathUploader.beforeNewIndexUpload(indexMetadataList, actionListener); + remoteIndexPathUploader.onNewIndexUpload(indexMetadataList, actionListener); assertEquals(1, successCount.get()); assertEquals(0, failureCount.get()); // Case 2 - Empty remoteCustomData when(indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY)).thenReturn(new HashMap<>()); - remoteIndexPathUploader.beforeNewIndexUpload(indexMetadataList, actionListener); + remoteIndexPathUploader.onNewIndexUpload(indexMetadataList, actionListener); assertEquals(2, successCount.get()); assertEquals(0, failureCount.get()); // Case 3 - RemoteStoreEnums.PathType.NAME not in remoteCustomData map Map remoteCustomData = Map.of("test", "test"); when(indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY)).thenReturn(remoteCustomData); - remoteIndexPathUploader.beforeNewIndexUpload(indexMetadataList, actionListener); + remoteIndexPathUploader.onNewIndexUpload(indexMetadataList, actionListener); assertEquals(3, successCount.get()); assertEquals(0, failureCount.get()); // Case 4 - RemoteStoreEnums.PathType.NAME is not HASHED_PREFIX remoteCustomData = Map.of(PathType.NAME, randomFrom(FIXED, HASHED_INFIX).name()); when(indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY)).thenReturn(remoteCustomData); - remoteIndexPathUploader.beforeNewIndexUpload(indexMetadataList, actionListener); + remoteIndexPathUploader.onNewIndexUpload(indexMetadataList, actionListener); assertEquals(4, successCount.get()); assertEquals(0, failureCount.get()); } public void testInterceptWithSameRepo() throws IOException { - RemoteIndexPathUploader remoteIndexPathUploader = new RemoteIndexPathUploader(settings, () -> repositoriesService, clusterSettings); + RemoteIndexPathUploader remoteIndexPathUploader = new RemoteIndexPathUploader( + threadPool, + settings, + () -> repositoriesService, + clusterSettings + ); remoteIndexPathUploader.start(); ActionListener actionListener = ActionListener.wrap( res -> successCount.incrementAndGet(), ex -> failureCount.incrementAndGet() ); - remoteIndexPathUploader.beforeNewIndexUpload(indexMetadataList, actionListener); + remoteIndexPathUploader.onNewIndexUpload(indexMetadataList, actionListener); assertEquals(1, successCount.get()); assertEquals(0, failureCount.get()); verify(blobContainer, times(1)).writeBlob(anyString(), any(InputStream.class), anyLong(), anyBoolean()); @@ -195,13 +218,18 @@ public void testInterceptWithDifferentRepo() throws IOException { .put(RemoteIndexPathUploader.SEGMENT_REPO_NAME_KEY, SEGMENT_REPO_NAME) .build(); when(repositoriesService.repository(SEGMENT_REPO_NAME)).thenReturn(repository); - RemoteIndexPathUploader remoteIndexPathUploader = new RemoteIndexPathUploader(settings, () -> repositoriesService, clusterSettings); + RemoteIndexPathUploader remoteIndexPathUploader = new RemoteIndexPathUploader( + threadPool, + settings, + () -> repositoriesService, + clusterSettings + ); remoteIndexPathUploader.start(); ActionListener actionListener = ActionListener.wrap( res -> successCount.incrementAndGet(), ex -> failureCount.incrementAndGet() ); - remoteIndexPathUploader.beforeNewIndexUpload(indexMetadataList, actionListener); + remoteIndexPathUploader.onNewIndexUpload(indexMetadataList, actionListener); assertEquals(1, successCount.get()); assertEquals(0, failureCount.get()); verify(blobContainer, times(2)).writeBlob(anyString(), any(InputStream.class), anyLong(), anyBoolean()); @@ -210,7 +238,12 @@ public void testInterceptWithDifferentRepo() throws IOException { public void testInterceptWithLatchAwaitTimeout() throws IOException { blobContainer = mock(AsyncMultiStreamBlobContainer.class); when(blobStore.blobContainer(any(BlobPath.class))).thenReturn(blobContainer); - RemoteIndexPathUploader remoteIndexPathUploader = new RemoteIndexPathUploader(settings, () -> repositoriesService, clusterSettings); + RemoteIndexPathUploader remoteIndexPathUploader = new RemoteIndexPathUploader( + threadPool, + settings, + () -> repositoriesService, + clusterSettings + ); remoteIndexPathUploader.start(); Settings settings = Settings.builder() @@ -223,7 +256,7 @@ public void testInterceptWithLatchAwaitTimeout() throws IOException { failureCount.incrementAndGet(); exceptionSetOnce.set(ex); }); - remoteIndexPathUploader.beforeNewIndexUpload(indexMetadataList, actionListener); + remoteIndexPathUploader.onNewIndexUpload(indexMetadataList, actionListener); assertEquals(0, successCount.get()); assertEquals(1, failureCount.get()); assertTrue(exceptionSetOnce.get() instanceof RemoteStateTransferException); @@ -236,7 +269,12 @@ public void testInterceptWithLatchAwaitTimeout() throws IOException { public void testInterceptWithInterruptedExceptionDuringLatchAwait() throws Exception { AsyncMultiStreamBlobContainer asyncMultiStreamBlobContainer = mock(AsyncMultiStreamBlobContainer.class); when(blobStore.blobContainer(any(BlobPath.class))).thenReturn(asyncMultiStreamBlobContainer); - RemoteIndexPathUploader remoteIndexPathUploader = new RemoteIndexPathUploader(settings, () -> repositoriesService, clusterSettings); + RemoteIndexPathUploader remoteIndexPathUploader = new RemoteIndexPathUploader( + threadPool, + settings, + () -> repositoriesService, + clusterSettings + ); remoteIndexPathUploader.start(); Settings settings = Settings.builder() .put(this.settings) @@ -250,7 +288,7 @@ public void testInterceptWithInterruptedExceptionDuringLatchAwait() throws Excep }); Thread thread = new Thread(() -> { try { - remoteIndexPathUploader.beforeNewIndexUpload(indexMetadataList, actionListener); + remoteIndexPathUploader.onNewIndexUpload(indexMetadataList, actionListener); } catch (Exception e) { assertTrue(e instanceof InterruptedException); assertEquals("sleep interrupted", e.getMessage());