Add unit tests for uploadBlobAsync
soosinha committed Jun 5, 2024
1 parent 148eca5 commit 23f49e5
Showing 4 changed files with 176 additions and 53 deletions.
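In brief, the commit renames the stream-based upload on the transfer service from uploadBlob to uploadBlobAsync, routes both asynchronous upload paths in BlobStoreTransferService through a new private asyncBlobUpload helper that builds and closes the RemoteTransferContainer, and adds unit tests that exercise the renamed method against a filesystem repository and against a mocked AsyncMultiStreamBlobContainer. Below is a minimal caller-side sketch of the renamed API; only the uploadBlobAsync signature comes from this diff, while the wrapper class, path, and blob name are illustrative placeholders.

import java.io.IOException;
import java.io.InputStream;

import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.translog.transfer.TransferService;

// Hypothetical caller: only the uploadBlobAsync signature is taken from this commit.
class UploadBlobAsyncExample {
    static void upload(TransferService transferService, BlobPath basePath, InputStream inputStream) throws IOException {
        transferService.uploadBlobAsync(
            inputStream,
            basePath,            // declared as Iterable<String>; the implementation asserts it is a BlobPath
            "example-blob",      // illustrative blob file name
            WritePriority.URGENT,
            ActionListener.wrap(
                unused -> { /* success: the blob was written to the repository */ },
                e -> { /* failure reported by the blob container */ }
            )
        );
    }
}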
@@ -53,7 +53,7 @@ public void writeAsync(final U entity, final ActionListener<Void> listener) {
try (InputStream inputStream = entity.serialize()) {
BlobPath blobPath = getBlobPathForUpload(entity);
entity.setFullBlobName(blobPath);
transferService.uploadBlob(inputStream, getBlobPathForUpload(entity), entity.getBlobFileName(), WritePriority.URGENT, listener);
transferService.uploadBlobAsync(inputStream, getBlobPathForUpload(entity), entity.getBlobFileName(), WritePriority.URGENT, listener);
}
} catch (Exception e) {
listener.onFailure(e);
@@ -20,14 +20,12 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.blobstore.FetchBlobResult;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.transfer.RemoteTransferContainer;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeFileInputStream;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.index.store.exception.ChecksumCombinationException;
import org.opensearch.index.translog.ChannelFactory;
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
@@ -111,7 +109,7 @@ public void uploadBlobs(
}

@Override
public void uploadBlob(InputStream inputStream, Iterable<String> remotePath, String blobName, WritePriority writePriority, ActionListener<Void> listener) throws IOException {
public void uploadBlobAsync(InputStream inputStream, Iterable<String> remotePath, String blobName, WritePriority writePriority, ActionListener<Void> listener) throws IOException {
assert remotePath instanceof BlobPath;
BlobPath blobPath = (BlobPath) remotePath;
final BlobContainer blobContainer = blobStore.blobContainer(blobPath);
@@ -135,20 +133,15 @@ public void uploadBlob(InputStream inputStream, Iterable<String> remotePath, Str
);
}

try (
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
blobName,
blobName,
bytes.length,
true,
writePriority,
(size, position) -> new OffsetRangeIndexInputStream(input, size, position),
expectedChecksum,
((AsyncMultiStreamBlobContainer) blobContainer).remoteIntegrityCheckSupported()
)
) {
((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), listener);
}
asyncBlobUpload(blobName,
blobName,
bytes.length,
blobPath,
writePriority,
(size, position) -> new OffsetRangeIndexInputStream(input, size, position),
expectedChecksum,
listener
);
}
}

@@ -165,36 +158,21 @@ private void uploadBlob(
try (FileChannel channel = channelFactory.open(fileSnapshot.getPath(), StandardOpenOption.READ)) {
contentLength = channel.size();
}
boolean remoteIntegrityEnabled = false;
BlobContainer blobContainer = blobStore.blobContainer(blobPath);
if (blobContainer instanceof AsyncMultiStreamBlobContainer) {
remoteIntegrityEnabled = ((AsyncMultiStreamBlobContainer) blobContainer).remoteIntegrityCheckSupported();
}
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
fileSnapshot.getName(),
fileSnapshot.getName(),
contentLength,
true,
writePriority,
(size, position) -> new OffsetRangeFileInputStream(fileSnapshot.getPath(), size, position),
Objects.requireNonNull(fileSnapshot.getChecksum()),
remoteIntegrityEnabled
);
ActionListener<Void> completionListener = ActionListener.wrap(resp -> listener.onResponse(fileSnapshot), ex -> {
logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), ex);
listener.onFailure(new FileTransferException(fileSnapshot, ex));
});

completionListener = ActionListener.runBefore(completionListener, () -> {
try {
remoteTransferContainer.close();
} catch (Exception e) {
logger.warn("Error occurred while closing streams", e);
}
});

WriteContext writeContext = remoteTransferContainer.createWriteContext();
((AsyncMultiStreamBlobContainer) blobStore.blobContainer(blobPath)).asyncBlobUpload(writeContext, completionListener);
Objects.requireNonNull(fileSnapshot.getChecksum());
asyncBlobUpload(fileSnapshot.getName(),
fileSnapshot.getName(),
contentLength,
blobPath,
writePriority,
(size, position) -> new OffsetRangeFileInputStream(fileSnapshot.getPath(), size, position),
fileSnapshot.getChecksum(),
completionListener
);

} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), e);
@@ -209,6 +187,24 @@ private void uploadBlob(

}

private void asyncBlobUpload(String fileName, String remoteFileName, long contentLength, BlobPath blobPath, WritePriority writePriority, RemoteTransferContainer.OffsetRangeInputStreamSupplier inputStreamSupplier, long expectedChecksum, ActionListener<Void> completionListener) throws IOException {
BlobContainer blobContainer = blobStore.blobContainer(blobPath);
assert blobContainer instanceof AsyncMultiStreamBlobContainer;
boolean remoteIntegrityEnabled = ((AsyncMultiStreamBlobContainer) blobContainer).remoteIntegrityCheckSupported();
try (RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
fileName,
remoteFileName,
contentLength,
true,
writePriority,
inputStreamSupplier,
expectedChecksum,
remoteIntegrityEnabled
)) {
((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), completionListener);
}
}

@Override
public InputStream downloadBlob(Iterable<String> path, String fileName) throws IOException {
return blobStore.blobContainer((BlobPath) path).readBlob(fileName);
@@ -14,7 +14,6 @@
import org.opensearch.common.blobstore.FetchBlobResult;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;

import java.io.IOException;
@@ -66,7 +65,7 @@ void uploadBlobs(
* @throws IOException the exception while transferring the data
*/
void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable<String> remotePath, WritePriority writePriority) throws IOException;
void uploadBlob(InputStream inputStream, Iterable<String> remotePath, String blobName, WritePriority writePriority, ActionListener<Void> listener) throws IOException;
void uploadBlobAsync(InputStream inputStream, Iterable<String> remotePath, String blobName, WritePriority writePriority, ActionListener<Void> listener) throws IOException;

void deleteBlobs(Iterable<String> path, List<String> fileNames) throws IOException;
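
The interface keeps the existing synchronous uploadBlob(TransferFileSnapshot, ...) overload and only renames the stream-based variant to uploadBlobAsync. A caller that needs to block until the upload finishes can adapt the Void listener with a CountDownLatch, the same pattern the new unit tests at the end of this diff use; the sketch below is illustrative, and the timeout and blob name are arbitrary.

import java.io.InputStream;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.opensearch.action.LatchedActionListener;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.translog.transfer.TransferService;

// Hypothetical blocking wrapper; only uploadBlobAsync and LatchedActionListener appear in this commit.
class BlockingUploadExample {
    static boolean uploadAndWait(TransferService transferService, InputStream inputStream, BlobPath path) throws Exception {
        CountDownLatch latch = new CountDownLatch(1);
        ActionListener<Void> listener = new LatchedActionListener<>(
            ActionListener.wrap(
                unused -> { /* upload completed successfully */ },
                e -> { throw new AssertionError("uploadBlobAsync failed", e); }
            ),
            latch
        );
        transferService.uploadBlobAsync(inputStream, path, "example-blob", WritePriority.URGENT, listener);
        return latch.await(30, TimeUnit.SECONDS); // true if the upload signalled completion within the timeout
    }
}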

@@ -8,32 +8,52 @@

package org.opensearch.index.translog.transfer;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opensearch.Version;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.blobstore.fs.FsBlobContainer;
import org.opensearch.common.blobstore.fs.FsBlobStore;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.compress.NoneCompressor;
import org.opensearch.core.index.Index;
import org.opensearch.env.Environment;
import org.opensearch.env.TestEnvironment;
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.blobstore.BlobStoreTestUtil;
import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class BlobStoreTransferServiceTests extends OpenSearchTestCase {

private ThreadPool threadPool;
@@ -103,6 +123,64 @@ public void onFailure(Exception e) {
assertTrue(succeeded.get());
}

public void testUploadBlobFromInputStreamSyncFSRepo() throws IOException, InterruptedException {
TransferService transferService = new BlobStoreTransferService(repository.blobStore(), threadPool);
uploadBlobFromInputStream(transferService);
}

public void testUploadBlobFromInputStreamAsyncFSRepo() throws IOException, InterruptedException {
BlobStore blobStore = createTestBlobStore();
MockAsyncFsContainer mockAsyncFsContainer = new MockAsyncFsContainer(
(FsBlobStore) blobStore,
BlobPath.cleanPath(),
null
);
FsBlobStore fsBlobStore = mock(FsBlobStore.class);
when(fsBlobStore.blobContainer(any())).thenReturn(mockAsyncFsContainer);

TransferService transferService = new BlobStoreTransferService(fsBlobStore, threadPool);
uploadBlobFromInputStream(transferService);
}

private IndexMetadata getIndexMetadata() {
final Index index = new Index("test-index", "index-uuid");
final Settings idxSettings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID())
.build();
return new IndexMetadata.Builder(index.getName()).settings(idxSettings)
.version(5L)
.numberOfShards(1)
.numberOfReplicas(0)
.build();
}

private void uploadBlobFromInputStream(TransferService transferService) throws IOException, InterruptedException {
TestClass testObject = new TestClass("field1", "value1");
AtomicBoolean succeeded = new AtomicBoolean(false);
ChecksumBlobStoreFormat<IndexMetadata> blobStoreFormat = new ChecksumBlobStoreFormat<>("coordination", "%s", IndexMetadata::fromXContent);
IndexMetadata indexMetadata = getIndexMetadata();
try (InputStream inputStream = blobStoreFormat.serialize(indexMetadata, "index-metadata", new NoneCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS).streamInput()) {
CountDownLatch latch = new CountDownLatch(1);
ActionListener<TestClass> listener = new LatchedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(TestClass testObject) {
assert succeeded.compareAndSet(false, true);
assert testObject.name.equals("field1");
}

@Override
public void onFailure(Exception e) {
throw new AssertionError("Failed to perform uploadBlobAsync", e);
}
}, latch);
ActionListener<Void> completionListener = ActionListener.wrap(resp -> listener.onResponse(testObject), ex -> listener.onFailure(ex));
transferService.uploadBlobAsync(inputStream, repository.basePath(), "test-object", WritePriority.URGENT, completionListener);
assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
assertTrue(succeeded.get());
}
}

@Override
public void tearDown() throws Exception {
super.tearDown();
@@ -134,6 +212,10 @@ protected void assertSnapshotOrGenericThread() {
return repository;
}

private BlobStore createTestBlobStore() throws IOException {
return new FsBlobStore(randomIntBetween(1, 8) * 1024, createTempDir(), false);
}

/** Create a {@link Environment} with random path.home and path.repo **/
private Environment createEnvironment() {
Path home = createTempDir();
@@ -144,4 +226,50 @@ private Environment createEnvironment() {
.build()
);
}

private static class TestClass implements Serializable {
private TestClass(String name, String value) {
this.name = name;
this.value = value;
}

private final String name;
private final String value;

@Override
public String toString() {
return "TestClass{ name: " + name + ", value: " + value + " }";
}
}

private static class MockAsyncFsContainer extends FsBlobContainer implements AsyncMultiStreamBlobContainer {

private BlobContainer delegate;

public MockAsyncFsContainer(FsBlobStore blobStore, BlobPath blobPath, Path path) {
super(blobStore, blobPath, path);
delegate = blobStore.blobContainer(BlobPath.cleanPath());
}

@Override
public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener) throws IOException {
InputStream inputStream = writeContext.getStreamProvider(Integer.MAX_VALUE).provideStream(0).getInputStream();
delegate.writeBlob(writeContext.getFileName(), inputStream, writeContext.getFileSize(), true);
completionListener.onResponse(null);
}

@Override
public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {
throw new RuntimeException("read not supported");
}

@Override
public boolean remoteIntegrityCheckSupported() {
return false;
}

public BlobContainer getDelegate() {
return delegate;
}
}
}
