diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java index 62c706a18c1e8..567e522482a7d 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java @@ -53,7 +53,7 @@ public void writeAsync(final U entity, final ActionListener listener) { try (InputStream inputStream = entity.serialize()) { BlobPath blobPath = getBlobPathForUpload(entity); entity.setFullBlobName(blobPath); - transferService.uploadBlob(inputStream, getBlobPathForUpload(entity), entity.getBlobFileName(), WritePriority.URGENT, listener); + transferService.uploadBlobAsync(inputStream, getBlobPathForUpload(entity), entity.getBlobFileName(), WritePriority.URGENT, listener); } } catch (Exception e) { listener.onFailure(e); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 0babe8800fd16..318e204daba65 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -20,14 +20,12 @@ import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.blobstore.FetchBlobResult; -import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; import org.opensearch.common.blobstore.transfer.stream.OffsetRangeFileInputStream; import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.core.action.ActionListener; -import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.index.store.exception.ChecksumCombinationException; import org.opensearch.index.translog.ChannelFactory; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; @@ -111,7 +109,7 @@ public void uploadBlobs( } @Override - public void uploadBlob(InputStream inputStream, Iterable remotePath, String blobName, WritePriority writePriority, ActionListener listener) throws IOException { + public void uploadBlobAsync(InputStream inputStream, Iterable remotePath, String blobName, WritePriority writePriority, ActionListener listener) throws IOException { assert remotePath instanceof BlobPath; BlobPath blobPath = (BlobPath) remotePath; final BlobContainer blobContainer = blobStore.blobContainer(blobPath); @@ -135,20 +133,15 @@ public void uploadBlob(InputStream inputStream, Iterable remotePath, Str ); } - try ( - RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( - blobName, - blobName, - bytes.length, - true, - writePriority, - (size, position) -> new OffsetRangeIndexInputStream(input, size, position), - expectedChecksum, - ((AsyncMultiStreamBlobContainer) blobContainer).remoteIntegrityCheckSupported() - ) - ) { - ((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), listener); - } + asyncBlobUpload(blobName, + blobName, + bytes.length, + blobPath, + writePriority, + (size, position) -> new OffsetRangeIndexInputStream(input, size, position), + expectedChecksum, + listener + ); } } @@ -165,36 +158,21 @@ private void uploadBlob( try (FileChannel channel = channelFactory.open(fileSnapshot.getPath(), StandardOpenOption.READ)) { contentLength = channel.size(); } - boolean remoteIntegrityEnabled = false; - BlobContainer blobContainer = blobStore.blobContainer(blobPath); - if (blobContainer instanceof AsyncMultiStreamBlobContainer) { - remoteIntegrityEnabled = ((AsyncMultiStreamBlobContainer) blobContainer).remoteIntegrityCheckSupported(); - } - RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( - fileSnapshot.getName(), - fileSnapshot.getName(), - contentLength, - true, - writePriority, - (size, position) -> new OffsetRangeFileInputStream(fileSnapshot.getPath(), size, position), - Objects.requireNonNull(fileSnapshot.getChecksum()), - remoteIntegrityEnabled - ); ActionListener completionListener = ActionListener.wrap(resp -> listener.onResponse(fileSnapshot), ex -> { logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), ex); listener.onFailure(new FileTransferException(fileSnapshot, ex)); }); - completionListener = ActionListener.runBefore(completionListener, () -> { - try { - remoteTransferContainer.close(); - } catch (Exception e) { - logger.warn("Error occurred while closing streams", e); - } - }); - - WriteContext writeContext = remoteTransferContainer.createWriteContext(); - ((AsyncMultiStreamBlobContainer) blobStore.blobContainer(blobPath)).asyncBlobUpload(writeContext, completionListener); + Objects.requireNonNull(fileSnapshot.getChecksum()); + asyncBlobUpload(fileSnapshot.getName(), + fileSnapshot.getName(), + contentLength, + blobPath, + writePriority, + (size, position) -> new OffsetRangeFileInputStream(fileSnapshot.getPath(), size, position), + fileSnapshot.getChecksum(), + completionListener + ); } catch (Exception e) { logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), e); @@ -209,6 +187,24 @@ private void uploadBlob( } + private void asyncBlobUpload(String fileName, String remoteFileName, long contentLength, BlobPath blobPath, WritePriority writePriority, RemoteTransferContainer.OffsetRangeInputStreamSupplier inputStreamSupplier, long expectedChecksum, ActionListener completionListener) throws IOException { + BlobContainer blobContainer = blobStore.blobContainer(blobPath); + assert blobContainer instanceof AsyncMultiStreamBlobContainer; + boolean remoteIntegrityEnabled = ((AsyncMultiStreamBlobContainer) blobContainer).remoteIntegrityCheckSupported(); + try (RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( + fileName, + remoteFileName, + contentLength, + true, + writePriority, + inputStreamSupplier, + expectedChecksum, + remoteIntegrityEnabled + )) { + ((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), completionListener); + } + } + @Override public InputStream downloadBlob(Iterable path, String fileName) throws IOException { return blobStore.blobContainer((BlobPath) path).readBlob(fileName); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index 9bed1cc822fda..c97690dfa8590 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -14,7 +14,6 @@ import org.opensearch.common.blobstore.FetchBlobResult; import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.core.action.ActionListener; -import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import java.io.IOException; @@ -66,7 +65,7 @@ void uploadBlobs( * @throws IOException the exception while transferring the data */ void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable remotePath, WritePriority writePriority) throws IOException; - void uploadBlob(InputStream inputStream, Iterable remotePath, String blobName, WritePriority writePriority, ActionListener listener) throws IOException; + void uploadBlobAsync(InputStream inputStream, Iterable remotePath, String blobName, WritePriority writePriority, ActionListener listener) throws IOException; void deleteBlobs(Iterable path, List fileNames) throws IOException; diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java index e4f5a454b15f6..ba10d423779cd 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java @@ -8,32 +8,52 @@ package org.opensearch.index.translog.transfer; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.io.InputStream; +import java.io.Serializable; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.opensearch.Version; import org.opensearch.action.LatchedActionListener; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.blobstore.fs.FsBlobContainer; +import org.opensearch.common.blobstore.fs.FsBlobStore; +import org.opensearch.common.blobstore.stream.read.ReadContext; +import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.compress.NoneCompressor; +import org.opensearch.core.index.Index; import org.opensearch.env.Environment; import org.opensearch.env.TestEnvironment; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.repositories.Repository; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.blobstore.BlobStoreTestUtil; +import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; import org.opensearch.repositories.fs.FsRepository; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardOpenOption; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - public class BlobStoreTransferServiceTests extends OpenSearchTestCase { private ThreadPool threadPool; @@ -103,6 +123,64 @@ public void onFailure(Exception e) { assertTrue(succeeded.get()); } + public void testUploadBlobFromInputStreamSyncFSRepo() throws IOException, InterruptedException{ + TransferService transferService = new BlobStoreTransferService(repository.blobStore(), threadPool); + uploadBlobFromInputStream(transferService); + } + + public void testUploadBlobFromInputStreamAsyncFSRepo() throws IOException, InterruptedException{ + BlobStore blobStore = createTestBlobStore(); + MockAsyncFsContainer mockAsyncFsContainer = new MockAsyncFsContainer( + (FsBlobStore) blobStore, + BlobPath.cleanPath(), + null + ); + FsBlobStore fsBlobStore = mock(FsBlobStore.class); + when(fsBlobStore.blobContainer(any())).thenReturn(mockAsyncFsContainer); + + TransferService transferService = new BlobStoreTransferService(fsBlobStore, threadPool); + uploadBlobFromInputStream(transferService); + } + + private IndexMetadata getIndexMetadata() { + final Index index = new Index("test-index", "index-uuid"); + final Settings idxSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()) + .build(); + return new IndexMetadata.Builder(index.getName()).settings(idxSettings) + .version(5L) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + } + + private void uploadBlobFromInputStream(TransferService transferService) throws IOException, InterruptedException { + TestClass testObject = new TestClass("field1", "value1"); + AtomicBoolean succeeded = new AtomicBoolean(false); + ChecksumBlobStoreFormat blobStoreFormat = new ChecksumBlobStoreFormat<>("coordination", "%s", IndexMetadata::fromXContent); + IndexMetadata indexMetadata = getIndexMetadata(); + try (InputStream inputStream = blobStoreFormat.serialize(indexMetadata, "index-metadata", new NoneCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS).streamInput()) { + CountDownLatch latch = new CountDownLatch(1); + ActionListener listener = new LatchedActionListener<>(new ActionListener<>() { + @Override + public void onResponse(TestClass testObject) { + assert succeeded.compareAndSet(false, true); + assert testObject.name.equals("field1"); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("Failed to perform uploadBlobAsync", e); + } + }, latch); + ActionListener completionListener = ActionListener.wrap(resp -> listener.onResponse(testObject), ex -> listener.onFailure(ex)); + transferService.uploadBlobAsync(inputStream, repository.basePath(), "test-object", WritePriority.URGENT, completionListener); + assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); + assertTrue(succeeded.get()); + } + } + @Override public void tearDown() throws Exception { super.tearDown(); @@ -134,6 +212,10 @@ protected void assertSnapshotOrGenericThread() { return repository; } + private BlobStore createTestBlobStore() throws IOException { + return new FsBlobStore(randomIntBetween(1, 8) * 1024, createTempDir(), false); + } + /** Create a {@link Environment} with random path.home and path.repo **/ private Environment createEnvironment() { Path home = createTempDir(); @@ -144,4 +226,50 @@ private Environment createEnvironment() { .build() ); } + + private static class TestClass implements Serializable { + private TestClass(String name, String value) { + this.name = name; + this.value = value; + } + + private final String name; + private final String value; + + @Override + public String toString() { + return "TestClass{ name: " + name + ", value: " + value + " }"; + } + } + + private static class MockAsyncFsContainer extends FsBlobContainer implements AsyncMultiStreamBlobContainer { + + private BlobContainer delegate; + + public MockAsyncFsContainer(FsBlobStore blobStore, BlobPath blobPath, Path path) { + super(blobStore, blobPath, path); + delegate = blobStore.blobContainer(BlobPath.cleanPath()); + } + + @Override + public void asyncBlobUpload(WriteContext writeContext, ActionListener completionListener) throws IOException { + InputStream inputStream = writeContext.getStreamProvider(Integer.MAX_VALUE).provideStream(0).getInputStream(); + delegate.writeBlob(writeContext.getFileName(), inputStream, writeContext.getFileSize(), true); + completionListener.onResponse(null); + } + + @Override + public void readBlobAsync(String blobName, ActionListener listener) { + throw new RuntimeException("read not supported"); + } + + @Override + public boolean remoteIntegrityCheckSupported() { + return false; + } + + public BlobContainer getDelegate() { + return delegate; + } + } }