Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

S3 Transfer Manager completes without error, skipping uploads, when provided upload file transformer throws an exception #5710

Open
1 task done
plovell opened this issue Nov 13, 2024 · 0 comments
Labels
bug This issue is a bug. needs-triage This issue or PR still needs to be triaged.

Comments

@plovell
Copy link

plovell commented Nov 13, 2024

Describe the bug

We have observed an issue using transferManager.uploadDirectory with and uploadFileRequestTransformer supplied. In our code, after we have updated beyond version 2.25.18 the directory upload would complete while only actually uploading a small percentage of the files in the directory.

The reason for the failure is that the transformer we were supplying relied on a thread local variable. This variable was not present after the initial buffer of 100 in flight requests was exhausted in AsyncBufferingSubscriber. After the 100 threads were exhausted the subsequent requests are built inside the sdk-async-response which did not have access to the thread local variable and threw an exception. This exception appears to be swallowed without report in code or logging. I have discovered using logging that the currently in flight requests are cancelled in the background with exceptions only logged at the debug level.

We have fixed the transformer so it no longer fails, but would like it such that the same scenario happens again, the SDK actually indicates that the directory upload has failed. As a result of the failure not being thrown our own software reported that the upload was successful, which is undesirable.

Regression Issue

  • Select this option if this issue appears to be a regression.

Expected Behavior

I expected the exception to be recorded on the completable future that is used as the response to the upload directory request as a failed transfer.

Current Behavior

The application instead swallows the original exception generated by the upload file transformer and the completable future returned by the directory upload request indicates no errors have occurred.

Reproduction Steps

The following code reproduces the issue:

`
package com.smartcommunications.tools.aws;

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.*;
import software.amazon.awssdk.transfer.s3.progress.TransferListener;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;

public class S3TestTool {

private static final String S3_PREFIX = "s3_upload_test_file";
private static final String S3_BUCKET = "<valid-s3-bucket-name>";

private static final int FILE_SIZE = 1024 * 100;
private static final int NUMBER_OF_FILES = 105;
private static final long EXPECTED_BYTES_TRANSFERRED = NUMBER_OF_FILES * FILE_SIZE;

private static S3TransferManager transferManager;
private static final AtomicLong numberOfBytesTransferred = new AtomicLong(0);

public static void main(String[] args) throws IOException {
    S3AsyncClient s3Client = S3AsyncClient.crtBuilder()
            .region(Region.EU_WEST_1)
            .forcePathStyle(true)
            .targetThroughputInGbps(1.0)
            .build();

    transferManager = S3TransferManager.builder()
            .s3Client(s3Client)
            .build();

    File uploadFolder = new File("s3_upload_test");

    if (!uploadFolder.exists()) {
        uploadFolder.mkdirs();
        for (int i = 0; i < NUMBER_OF_FILES; i++) {
            try (FileOutputStream fos = new FileOutputStream(new File(uploadFolder, i + ".bin"))) {
                byte[] b = new byte[FILE_SIZE];
                new Random().nextBytes(b);
                fos.write(b);
            }
        }
    }

    uploadFiles(uploadFolder.toPath());
}

private static void uploadFiles(Path localPath) {
    UploadDirectoryRequest.Builder uploadDirectoryRequest;

    uploadDirectoryRequest = buildRequest(localPath);

    DirectoryUpload directoryUpload = transferManager.uploadDirectory(uploadDirectoryRequest.build());
    CompletedDirectoryUpload result = directoryUpload.completionFuture().join();

    System.out.println();
    System.out.println("=== Transfer completed. ===");
    System.out.println();

    if (result.failedTransfers().isEmpty()) {
        System.out.println("Transfer completed with no recorded failures.");
    }
    else {
        System.out.println("Transfer completed with failures: " + result.failedTransfers().size());
        for (FailedFileUpload completedUpload : result.failedTransfers()) {
            System.out.println("\t" + completedUpload);
        }
    }

    if (numberOfBytesTransferred.get() != EXPECTED_BYTES_TRANSFERRED) {
        System.out.println("Test failed - only transferred " + numberOfBytesTransferred.get() + ", of " + EXPECTED_BYTES_TRANSFERRED + " bytes.");
    }
    else {
        System.out.println("Test passed - transferred all " + EXPECTED_BYTES_TRANSFERRED + " bytes of data");
    }
}

private static UploadDirectoryRequest.Builder buildRequest(Path thePath) {
    return UploadDirectoryRequest.builder()
            .bucket(S3_BUCKET)
            .source(thePath)
            .s3Prefix(S3_PREFIX)
            .uploadFileRequestTransformer(S3TestTool::requestTransformer);
}

private static void requestTransformer(UploadFileRequest.Builder uploadFileRequest) {
    System.out.println("Request Transformer Thread: " + Thread.currentThread().getName());
    if (Thread.currentThread().getName().contains("sdk-async-response")) {
        throw new IllegalStateException("Missing thread local variable.");
    }
    uploadFileRequest
            .putObjectRequest(req -> req.bucket(S3_BUCKET)
                    .key(uploadFileRequest.build().putObjectRequest().key()))
            .addTransferListener(new DirectoryUploadProgressListener());
}

public static class DirectoryUploadProgressListener implements TransferListener {
    @Override
    public void bytesTransferred(Context.BytesTransferred context) {
        System.out.println("[" + Thread.currentThread().getName() + "]: Uploaded bytes: " + context.progressSnapshot().transferredBytes());
        numberOfBytesTransferred.addAndGet(context.progressSnapshot().transferredBytes());
    }

    @Override
    public void transferComplete(Context.TransferComplete context) {
        System.out.println("[" + Thread.currentThread().getName() + "]: Completed bytes: " + context.progressSnapshot().transferredBytes());
    }
}

}
`

This test will return the following:

Transfer completed with no recorded failures.
Test failed - only transferred X, of 10752000 bytes.

Where X < 10752000

Possible Solution

No response

Additional Information/Context

No response

AWS Java SDK version used

SDK 2.29.10 CRT 0.33.0

JDK version used

openjdk version "17.0.8" 2023-07-18 OpenJDK Runtime Environment Temurin-17.0.8+7 (build 17.0.8+7) OpenJDK 64-Bit Server VM Temurin-17.0.8+7 (build 17.0.8+7, mixed mode, sharing)

Operating System and version

Windows 11

@plovell plovell added bug This issue is a bug. needs-triage This issue or PR still needs to be triaged. labels Nov 13, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug This issue is a bug. needs-triage This issue or PR still needs to be triaged.
Projects
None yet
Development

No branches or pull requests

1 participant