diff --git a/CHANGELOG.md b/CHANGELOG.md index 461080e40e..27447cdf13 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) ### Dependencies ### Changed +- Changed AwsSdk2Transport to pre-emptively throw an exception when using AWS SDK's ApacheHttpClient to make an unsupported DELETE/GET request with a body ([#1256](https://github.com/opensearch-project/opensearch-java/pull/1256)) ### Deprecated diff --git a/guides/auth.md b/guides/auth.md index 658464642d..060c3d6708 100644 --- a/guides/auth.md +++ b/guides/auth.md @@ -7,14 +7,19 @@ Requests to [OpenSearch Service and OpenSearch Serverless](https://docs.aws.amazon.com/opensearch-service/index.html) must be signed using the AWS signing protocol. Use `AwsSdk2Transport` to send signed requests. +> ⚠️ **Warning** ⚠️ +> Using `software.amazon.awssdk.http.apache.ApacheHttpClient` is discouraged as it does not support request bodies on GET or DELETE requests. +> This leads to incorrect handling of requests such as `OpenSearchClient.clearScroll()` and `OpenSearchClient.deletePit()`. +> As such `AwsSdk2Transport` will throw a `TransportException` if an unsupported request is encountered while using `ApacheHttpClient`. + ```java -SdkHttpClient httpClient = ApacheHttpClient.builder().build(); +SdkHttpClient httpClient = AwsCrtHttpClient.builder().build(); OpenSearchClient client = new OpenSearchClient( new AwsSdk2Transport( httpClient, "search-...us-west-2.es.amazonaws.com", // OpenSearch endpoint, without https:// - "es" // signing service name, use "aoss" for OpenSearch Serverless + "es", // signing service name, use "aoss" for OpenSearch Serverless Region.US_WEST_2, // signing service region AwsSdk2TransportOptions.builder().build() ) diff --git a/java-client/build.gradle.kts b/java-client/build.gradle.kts index 3992078b6e..64fc42e763 100644 --- a/java-client/build.gradle.kts +++ b/java-client/build.gradle.kts @@ -102,7 +102,13 @@ tasks.withType { tasks.withType().configureEach{ options { + this as StandardJavadocDocletOptions encoding = "UTF-8" + addMultilineStringsOption("tag").setValue(listOf( + "apiNote:a:API Note:", + "implSpec:a:Implementation Requirements:", + "implNote:a:Implementation Note:", + )) } } @@ -170,7 +176,6 @@ val integrationTest = task("integrationTest") { val opensearchVersion = "2.12.0" dependencies { - val jacksonVersion = "2.17.0" val jacksonDatabindVersion = "2.17.0" @@ -200,7 +205,6 @@ dependencies { implementation("jakarta.annotation", "jakarta.annotation-api", "1.3.5") // Apache 2.0 - implementation("com.fasterxml.jackson.core", "jackson-core", jacksonVersion) implementation("com.fasterxml.jackson.core", "jackson-databind", jacksonDatabindVersion) testImplementation("com.fasterxml.jackson.datatype", "jackson-datatype-jakarta-jsonp", jacksonVersion) @@ -213,16 +217,21 @@ dependencies { implementation("org.apache.httpcomponents.core5", "httpcore5-h2", "5.3.1") // For AwsSdk2Transport - "awsSdk2SupportCompileOnly"("software.amazon.awssdk","sdk-core","[2.15,3.0)") - "awsSdk2SupportCompileOnly"("software.amazon.awssdk","auth","[2.15,3.0)") - testImplementation("software.amazon.awssdk","sdk-core","[2.15,3.0)") - testImplementation("software.amazon.awssdk","auth","[2.15,3.0)") - testImplementation("software.amazon.awssdk","aws-crt-client","[2.15,3.0)") - testImplementation("software.amazon.awssdk","apache-client","[2.15,3.0)") - testImplementation("software.amazon.awssdk","sts","[2.15,3.0)") + "awsSdk2SupportCompileOnly"("software.amazon.awssdk", "sdk-core", "[2.21,3.0)") + "awsSdk2SupportCompileOnly"("software.amazon.awssdk", "auth", "[2.21,3.0)") + "awsSdk2SupportCompileOnly"("software.amazon.awssdk", "http-auth-aws", "[2.21,3.0)") + testImplementation("software.amazon.awssdk", "sdk-core", "[2.21,3.0)") + testImplementation("software.amazon.awssdk", "auth", "[2.21,3.0)") + testImplementation("software.amazon.awssdk", "http-auth-aws", "[2.21,3.0)") + testImplementation("software.amazon.awssdk", "aws-crt-client", "[2.21,3.0)") + testImplementation("software.amazon.awssdk", "apache-client", "[2.21,3.0)") + testImplementation("software.amazon.awssdk", "netty-nio-client", "[2.21,3.0)") + testImplementation("software.amazon.awssdk", "url-connection-client", "[2.21,3.0)") + testImplementation("software.amazon.awssdk", "sts", "[2.21,3.0)") + testImplementation("org.apache.logging.log4j", "log4j-api","[2.17.1,3.0)") testImplementation("org.apache.logging.log4j", "log4j-core","[2.17.1,3.0)") - + // EPL-2.0 OR BSD-3-Clause // https://eclipse-ee4j.github.io/yasson/ implementation("org.eclipse", "yasson", "2.0.2") @@ -234,6 +243,10 @@ dependencies { testImplementation("junit", "junit" , "4.13.2") { exclude(group = "org.hamcrest") } + + // The Bouncy Castle License (MIT): https://www.bouncycastle.org/licence.html + testImplementation("org.bouncycastle", "bcprov-lts8on", "2.73.6") + testImplementation("org.bouncycastle", "bcpkix-lts8on", "2.73.6") } licenseReport { diff --git a/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java b/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java index 310d936e4a..6ac694332f 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java +++ b/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java @@ -19,6 +19,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URLEncoder; +import java.time.Clock; import java.util.AbstractMap; import java.util.Collection; import java.util.Map; @@ -26,7 +27,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.function.Supplier; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.zip.GZIPInputStream; import javax.annotation.CheckForNull; @@ -36,6 +37,7 @@ import org.opensearch.client.json.JsonpDeserializer; import org.opensearch.client.json.JsonpMapper; import org.opensearch.client.json.jackson.JacksonJsonpMapper; +import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.client.opensearch._types.ErrorCause; import org.opensearch.client.opensearch._types.ErrorResponse; import org.opensearch.client.opensearch._types.OpenSearchException; @@ -52,18 +54,20 @@ import org.opensearch.client.util.OpenSearchRequestBodyBuffer; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; -import software.amazon.awssdk.auth.signer.Aws4Signer; -import software.amazon.awssdk.auth.signer.params.Aws4SignerParams; import software.amazon.awssdk.http.AbortableInputStream; +import software.amazon.awssdk.http.ContentStreamProvider; import software.amazon.awssdk.http.Header; import software.amazon.awssdk.http.HttpExecuteRequest; import software.amazon.awssdk.http.HttpExecuteResponse; import software.amazon.awssdk.http.SdkHttpClient; import software.amazon.awssdk.http.SdkHttpFullRequest; import software.amazon.awssdk.http.SdkHttpMethod; +import software.amazon.awssdk.http.SdkHttpRequest; import software.amazon.awssdk.http.SdkHttpResponse; import software.amazon.awssdk.http.async.AsyncExecuteRequest; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.http.auth.aws.signer.AwsV4HttpSigner; +import software.amazon.awssdk.http.auth.spi.signer.SignedRequest; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.utils.IoUtils; import software.amazon.awssdk.utils.SdkAutoCloseable; @@ -81,6 +85,7 @@ public class AwsSdk2Transport implements OpenSearchTransport { private static final byte[] NO_BYTES = new byte[0]; private final SdkAutoCloseable httpClient; + private final boolean isApacheHttpClient; private final String host; private final String signingServiceName; private final Region signingRegion; @@ -118,6 +123,10 @@ public AwsSdk2Transport( * @param options Options that apply to all requests. Can be null. Create with * {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials, * compression options, etc. + * + * @implNote Using {@code software.amazon.awssdk.http.apache.ApacheHttpClient} is discouraged as it does not support request bodies on GET or DELETE requests. + * This leads to incorrect handling of requests such as {@link OpenSearchClient#clearScroll(org.opensearch.client.opensearch.core.ClearScrollRequest)} and {@link OpenSearchClient#deletePit(org.opensearch.client.opensearch.core.pit.DeletePitRequest)}. + * As such {@link #performRequest(Object, Endpoint, TransportOptions)} & {@link #performRequestAsync(Object, Endpoint, TransportOptions)} will throw a {@link TransportException} if an unsupported request is encountered while using {@code ApacheHttpClient}. */ public AwsSdk2Transport( @CheckForNull SdkHttpClient syncHttpClient, @@ -162,6 +171,10 @@ public AwsSdk2Transport( * @param options Options that apply to all requests. Can be null. Create with * {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials, * compression options, etc. + * + * @implNote Using {@code software.amazon.awssdk.http.apache.ApacheHttpClient} is discouraged as it does not support request bodies on GET or DELETE requests. + * This leads to incorrect handling of requests such as {@link OpenSearchClient#clearScroll(org.opensearch.client.opensearch.core.ClearScrollRequest)} and {@link OpenSearchClient#deletePit(org.opensearch.client.opensearch.core.pit.DeletePitRequest)}. + * As such {@link #performRequest(Object, Endpoint, TransportOptions)} & {@link #performRequestAsync(Object, Endpoint, TransportOptions)} will throw a {@link TransportException} if an unsupported request is encountered while using {@code ApacheHttpClient}. */ public AwsSdk2Transport( @CheckForNull SdkHttpClient syncHttpClient, @@ -182,6 +195,8 @@ private AwsSdk2Transport( ) { Objects.requireNonNull(host, "Target OpenSearch service host must not be null"); this.httpClient = httpClient; + this.isApacheHttpClient = httpClient instanceof SdkHttpClient + && httpClient.getClass().getName().equals("software.amazon.awssdk.http.apache.ApacheHttpClient"); this.host = host; this.signingServiceName = signingServiceName; this.signingRegion = signingRegion; @@ -195,9 +210,8 @@ public ResponseT performRequest( Endpoint endpoint, @Nullable TransportOptions options ) throws IOException { - OpenSearchRequestBodyBuffer requestBody = prepareRequestBody(request, endpoint, options); - SdkHttpFullRequest clientReq = prepareRequest(request, endpoint, options, requestBody); + SignedRequest clientReq = prepareRequest(request, endpoint, options, requestBody); if (httpClient instanceof SdkHttpClient) { return executeSync((SdkHttpClient) httpClient, clientReq, endpoint, options); @@ -229,7 +243,7 @@ public CompletableFuture performRequest ) { try { OpenSearchRequestBodyBuffer requestBody = prepareRequestBody(request, endpoint, options); - SdkHttpFullRequest clientReq = prepareRequest(request, endpoint, options, requestBody); + SignedRequest clientReq = prepareRequest(request, endpoint, options, requestBody); if (httpClient instanceof SdkAsyncHttpClient) { return executeAsync((SdkAsyncHttpClient) httpClient, clientReq, requestBody, endpoint, options); } else if (httpClient instanceof SdkHttpClient) { @@ -265,16 +279,12 @@ private OpenSearchRequestBodyBuffer prepareRequestBody( TransportOptions options ) throws IOException { if (endpoint.hasRequestBody()) { - final JsonpMapper mapper = Optional.ofNullable(options) - .map(o -> o instanceof AwsSdk2TransportOptions ? ((AwsSdk2TransportOptions) o) : null) - .map(AwsSdk2TransportOptions::mapper) - .orElse(defaultMapper); - final int maxUncompressedSize = or( - Optional.ofNullable(options) - .map(o -> o instanceof AwsSdk2TransportOptions ? ((AwsSdk2TransportOptions) o) : null) - .map(AwsSdk2TransportOptions::requestCompressionSize), - () -> Optional.ofNullable(transportOptions.requestCompressionSize()) - ).orElse(DEFAULT_REQUEST_COMPRESSION_SIZE); + final JsonpMapper mapper = Optional.ofNullable( + options instanceof AwsSdk2TransportOptions ? ((AwsSdk2TransportOptions) options) : null + ).map(AwsSdk2TransportOptions::mapper).orElse(defaultMapper); + final int maxUncompressedSize = getOption(options, AwsSdk2TransportOptions::requestCompressionSize).orElse( + DEFAULT_REQUEST_COMPRESSION_SIZE + ); OpenSearchRequestBodyBuffer buffer = new OpenSearchRequestBodyBuffer(mapper, maxUncompressedSize); buffer.addContent(request); @@ -284,13 +294,31 @@ private OpenSearchRequestBodyBuffer prepareRequestBody( return null; } - private SdkHttpFullRequest prepareRequest( + private SignedRequest prepareRequest( RequestT request, Endpoint endpoint, @CheckForNull TransportOptions options, @CheckForNull OpenSearchRequestBodyBuffer body - ) throws UnsupportedEncodingException { - SdkHttpFullRequest.Builder req = SdkHttpFullRequest.builder().method(SdkHttpMethod.fromValue(endpoint.method(request))); + ) throws UnsupportedEncodingException, TransportException { + SdkHttpMethod method = SdkHttpMethod.fromValue(endpoint.method(request)); + + // AWS Apache Http Client only supports request bodies on PATCH, POST & PUT requests. + // See: + // https://github.com/aws/aws-sdk-java-v2/blob/master/http-clients/apache-client/src/main/java/software/amazon/awssdk/http/apache/internal/impl/ApacheHttpRequestFactory.java#L118-L137 + if (isApacheHttpClient + && method != SdkHttpMethod.PATCH + && method != SdkHttpMethod.POST + && method != SdkHttpMethod.PUT + && body != null + && body.getContentLength() > 0) { + throw new TransportException( + "AWS SDK's ApacheHttpClient does not support request bodies for HTTP method `" + + method + + "`. Please use a different SdkHttpClient implementation." + ); + } + + SdkHttpFullRequest.Builder req = SdkHttpFullRequest.builder().method(method); StringBuilder url = new StringBuilder(); url.append("https://").append(host); @@ -315,46 +343,57 @@ private SdkHttpFullRequest prepareRequest( } catch (URISyntaxException e) { throw new IllegalArgumentException("Invalid request URI: " + url.toString()); } + + ContentStreamProvider bodyProvider = body != null ? ContentStreamProvider.fromByteArrayUnsafe(body.getByteArray()) : null; + + applyHeadersPreSigning(req, options, body); + + final AwsCredentialsProvider credentials = getOption(options, AwsSdk2TransportOptions::credentials).orElseGet( + DefaultCredentialsProvider::create + ); + + final Clock signingClock = getOption(options, AwsSdk2TransportOptions::signingClock).orElse(null); + + SignedRequest signedReq = AwsV4HttpSigner.create() + .sign( + b -> b.identity(credentials.resolveCredentials()) + .request(req.build()) + .payload(bodyProvider) + .putProperty(AwsV4HttpSigner.SERVICE_SIGNING_NAME, this.signingServiceName) + .putProperty(AwsV4HttpSigner.REGION_NAME, this.signingRegion.id()) + .putProperty(AwsV4HttpSigner.SIGNING_CLOCK, signingClock) + ); + + SdkHttpRequest.Builder httpRequest = signedReq.request().toBuilder(); + + applyHeadersPostSigning(httpRequest, body); + + return signedReq.toBuilder().request(httpRequest.build()).build(); + } + + private void applyHeadersPreSigning(SdkHttpRequest.Builder req, TransportOptions options, OpenSearchRequestBodyBuffer body) { applyOptionsHeaders(req, transportOptions); applyOptionsHeaders(req, options); - if (endpoint.hasRequestBody() && body != null) { + + if (body != null) { req.putHeader("Content-Type", body.getContentType()); String encoding = body.getContentEncoding(); if (encoding != null) { req.putHeader("Content-Encoding", encoding); } - req.putHeader("Content-Length", String.valueOf(body.getContentLength())); - req.contentStreamProvider(body::getInputStream); - // To add the "X-Amz-Content-Sha256" header, it needs to set as required. - // It is a required header for Amazon OpenSearch Serverless. - req.putHeader("x-amz-content-sha256", "required"); } - boolean responseCompression = or( - Optional.ofNullable(options) - .map(o -> o instanceof AwsSdk2TransportOptions ? ((AwsSdk2TransportOptions) o) : null) - .map(AwsSdk2TransportOptions::responseCompression), - () -> Optional.ofNullable(transportOptions.responseCompression()) - ).orElse(Boolean.TRUE); - if (responseCompression) { + if (getOption(options, AwsSdk2TransportOptions::responseCompression).orElse(Boolean.TRUE)) { req.putHeader("Accept-Encoding", "gzip"); } else { req.removeHeader("Accept-Encoding"); } + } - final AwsCredentialsProvider credentials = or( - Optional.ofNullable(options) - .map(o -> o instanceof AwsSdk2TransportOptions ? ((AwsSdk2TransportOptions) o) : null) - .map(AwsSdk2TransportOptions::credentials), - () -> Optional.ofNullable(transportOptions.credentials()) - ).orElse(DefaultCredentialsProvider.create()); - - Aws4SignerParams signerParams = Aws4SignerParams.builder() - .awsCredentials(credentials.resolveCredentials()) - .signingName(this.signingServiceName) - .signingRegion(signingRegion) - .build(); - return Aws4Signer.create().sign(req.build(), signerParams); + private void applyHeadersPostSigning(SdkHttpRequest.Builder req, OpenSearchRequestBodyBuffer body) { + if (body != null) { + req.putHeader("Content-Length", String.valueOf(body.getContentLength())); + } } private void applyOptionsParams(StringBuilder url, TransportOptions options) throws UnsupportedEncodingException { @@ -372,7 +411,7 @@ private void applyOptionsParams(StringBuilder url, TransportOptions options) thr } } - private void applyOptionsHeaders(SdkHttpFullRequest.Builder builder, TransportOptions options) { + private void applyOptionsHeaders(SdkHttpRequest.Builder builder, TransportOptions options) { if (options == null) { return; } @@ -386,15 +425,13 @@ private void applyOptionsHeaders(SdkHttpFullRequest.Builder builder, TransportOp private ResponseT executeSync( SdkHttpClient syncHttpClient, - SdkHttpFullRequest httpRequest, + SignedRequest signedRequest, Endpoint endpoint, TransportOptions options ) throws IOException { - + SdkHttpRequest httpRequest = signedRequest.request(); HttpExecuteRequest.Builder executeRequest = HttpExecuteRequest.builder().request(httpRequest); - if (httpRequest.contentStreamProvider().isPresent()) { - executeRequest.contentStreamProvider(httpRequest.contentStreamProvider().get()); - } + signedRequest.payload().ifPresent(executeRequest::contentStreamProvider); HttpExecuteResponse executeResponse = syncHttpClient.prepareRequest(executeRequest.build()).call(); AbortableInputStream bodyStream = null; try { @@ -418,13 +455,13 @@ private ResponseT executeSync( private CompletableFuture executeAsync( SdkAsyncHttpClient asyncHttpClient, - SdkHttpFullRequest httpRequest, + SignedRequest signedRequest, @CheckForNull OpenSearchRequestBodyBuffer requestBody, Endpoint endpoint, TransportOptions options ) { + SdkHttpRequest httpRequest = signedRequest.request(); byte[] requestBodyArray = requestBody == null ? NO_BYTES : requestBody.getByteArray(); - final AsyncCapturingResponseHandler responseHandler = new AsyncCapturingResponseHandler(); AsyncExecuteRequest.Builder executeRequest = AsyncExecuteRequest.builder() .request(httpRequest) @@ -463,10 +500,9 @@ private ResponseT parseResponse( @Nonnull Endpoint endpoint, @CheckForNull TransportOptions options ) throws IOException { - final JsonpMapper mapper = Optional.ofNullable(options) - .map(o -> o instanceof AwsSdk2TransportOptions ? ((AwsSdk2TransportOptions) o) : null) - .map(AwsSdk2TransportOptions::mapper) - .orElse(defaultMapper); + final JsonpMapper mapper = Optional.ofNullable( + options instanceof AwsSdk2TransportOptions ? ((AwsSdk2TransportOptions) options) : null + ).map(AwsSdk2TransportOptions::mapper).orElse(defaultMapper); int statusCode = httpResponse.statusCode(); boolean isZipped = httpResponse.firstMatchingHeader("Content-Encoding").map(enc -> enc.contains("gzip")).orElse(Boolean.FALSE); @@ -625,16 +661,15 @@ private ResponseT decodeResponse( } } - private static Optional or(Optional opt, Supplier> supplier) { - Objects.requireNonNull(opt); - Objects.requireNonNull(supplier); - if (opt.isPresent()) { - return opt; - } else { - @SuppressWarnings("unchecked") - Optional r = (Optional) supplier.get(); - return Objects.requireNonNull(r); - } + private Optional getOption(@Nullable TransportOptions options, @Nonnull Function getter) { + Objects.requireNonNull(getter, "getter must not be null"); + + Function> optGetter = o -> Optional.ofNullable(getter.apply(o)); + + Optional opt = Optional.ofNullable(options instanceof AwsSdk2TransportOptions ? (AwsSdk2TransportOptions) options : null) + .flatMap(optGetter); + + return opt.isPresent() ? opt : optGetter.apply(transportOptions); } private static ByteArrayInputStream toByteArrayInputStream(InputStream is) throws IOException { diff --git a/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2TransportOptions.java b/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2TransportOptions.java index 1d10f9c424..29f3b687b1 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2TransportOptions.java +++ b/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2TransportOptions.java @@ -8,6 +8,7 @@ package org.opensearch.client.transport.aws; +import java.time.Clock; import java.util.List; import java.util.function.Function; import org.opensearch.client.json.JsonpMapper; @@ -71,6 +72,18 @@ public interface AwsSdk2TransportOptions extends TransportOptions { */ JsonpMapper mapper(); + /** + * Get the clock used for signing requests. + *

+ * If this is null, then a default will be used -- either a value specified + * in a more general {@link AwsSdk2TransportOptions} that applies to the request, or + * {@link Clock#systemUTC()} if there is none. + *

+ * + * @return A clock or null + */ + Clock signingClock(); + AwsSdk2TransportOptions.Builder toBuilder(); static AwsSdk2TransportOptions.Builder builder() { @@ -92,6 +105,8 @@ interface Builder extends TransportOptions.Builder { Builder setMapper(JsonpMapper mapper); + Builder setSigningClock(Clock clock); + AwsSdk2TransportOptions build(); } @@ -101,6 +116,7 @@ class BuilderImpl extends TransportOptions.BuilderImpl implements Builder { protected Integer requestCompressionSize; protected Boolean responseCompression; protected JsonpMapper mapper; + protected Clock signingClock; public BuilderImpl() {} @@ -110,6 +126,7 @@ public BuilderImpl(AwsSdk2TransportOptions src) { requestCompressionSize = src.requestCompressionSize(); responseCompression = src.responseCompression(); mapper = src.mapper(); + signingClock = src.signingClock(); } @Override @@ -154,6 +171,12 @@ public Builder setResponseCompression(Boolean enabled) { return this; } + @Override + public Builder setSigningClock(Clock clock) { + this.signingClock = clock; + return this; + } + @Override public AwsSdk2TransportOptions build() { return new DefaultImpl(this); @@ -162,10 +185,11 @@ public AwsSdk2TransportOptions build() { class DefaultImpl extends TransportOptions.DefaultImpl implements AwsSdk2TransportOptions { - private AwsCredentialsProvider credentials; - private Integer requestCompressionSize; - private Boolean responseCompression; - private JsonpMapper mapper; + private final AwsCredentialsProvider credentials; + private final Integer requestCompressionSize; + private final Boolean responseCompression; + private final JsonpMapper mapper; + private final Clock signingClock; DefaultImpl(AwsSdk2TransportOptions.BuilderImpl builder) { super(builder); @@ -173,6 +197,7 @@ class DefaultImpl extends TransportOptions.DefaultImpl implements AwsSdk2Transpo requestCompressionSize = builder.requestCompressionSize; responseCompression = builder.responseCompression; mapper = builder.mapper; + signingClock = builder.signingClock; } @Override @@ -195,6 +220,11 @@ public JsonpMapper mapper() { return mapper; } + @Override + public Clock signingClock() { + return signingClock; + } + @Override public AwsSdk2TransportOptions.Builder toBuilder() { return new AwsSdk2TransportOptions.BuilderImpl(this); diff --git a/java-client/src/test/java/org/opensearch/client/transport/aws/AwsSdk2TransportTests.java b/java-client/src/test/java/org/opensearch/client/transport/aws/AwsSdk2TransportTests.java new file mode 100644 index 0000000000..c5a6aca793 --- /dev/null +++ b/java-client/src/test/java/org/opensearch/client/transport/aws/AwsSdk2TransportTests.java @@ -0,0 +1,503 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.aws; + +import static org.apache.hc.core5.http.ContentType.APPLICATION_JSON; +import static org.apache.hc.core5.http.HttpHeaders.CONTENT_LENGTH; +import static org.apache.hc.core5.http.HttpHeaders.CONTENT_TYPE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.security.KeyStore; +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; +import java.util.Arrays; +import java.util.Collection; +import java.util.Optional; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.stream.Collectors; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactory; +import org.apache.hc.core5.http.ClassicHttpRequest; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.ProtocolException; +import org.apache.hc.core5.http.impl.bootstrap.HttpServer; +import org.apache.hc.core5.http.impl.bootstrap.ServerBootstrap; +import org.apache.hc.core5.http.impl.routing.RequestRouter; +import org.apache.hc.core5.http.io.HttpRequestHandler; +import org.apache.hc.core5.http.io.entity.BasicHttpEntity; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.ssl.SSLContextBuilder; +import org.bouncycastle.asn1.x500.X500Name; +import org.bouncycastle.asn1.x509.GeneralName; +import org.bouncycastle.crypto.SavableDigest; +import org.bouncycastle.crypto.digests.SHA256Digest; +import org.bouncycastle.util.encoders.Hex; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch.generic.Requests; +import org.opensearch.client.transport.TransportException; +import org.opensearch.client.transport.util.FunnellingHttpsProxy; +import org.opensearch.client.transport.util.SelfSignedCertificateAuthority; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.http.SdkHttpConfigurationOption; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient; +import software.amazon.awssdk.http.crt.AwsCrtHttpClient; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.http.nio.netty.ProxyConfiguration; +import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.utils.AttributeMap; + +@RunWith(Parameterized.class) +public class AwsSdk2TransportTests { + public enum SdkHttpClientType { + APACHE, + AWS_CRT, + AWS_CRT_ASYNC, + NETTY_NIO_ASYNC, + URL_CONNECTION + } + + private static final String[] TEST_SERVICE_NAMES = { "aoss", "es", "arbitrary" }; + private static final Region TEST_REGION = Region.AP_SOUTHEAST_2; + private static final String TEST_INDEX = "sample-index1"; + private static final String EMPTY_SHA256 = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; + private static SSLContext SSL_CONTEXT; + private static TrustManager[] CLIENT_TRUST_MANAGERS; + + private final ConcurrentLinkedQueue receivedRequests = new ConcurrentLinkedQueue<>(); + private final SdkHttpClientType sdkHttpClientType; + private final String serviceName; + private final String serviceHostName; + private HttpServer server; + private FunnellingHttpsProxy proxy; + + public AwsSdk2TransportTests(SdkHttpClientType sdkHttpClientType, String serviceName) { + this.sdkHttpClientType = sdkHttpClientType; + this.serviceName = serviceName; + this.serviceHostName = getTestServiceHostName(serviceName); + } + + @Parameterized.Parameters(name = "sdkHttpClientType: {0}, serviceName: {1}") + public static Collection getParameters() { + return Arrays.stream(SdkHttpClientType.values()) + .flatMap( + sdkHttpClientType -> Arrays.stream(TEST_SERVICE_NAMES).map(serviceName -> new Object[] { sdkHttpClientType, serviceName }) + ) + .collect(Collectors.toList()); + } + + private static String getTestServiceHostName(String serviceName) { + return "aaabbbcccddd111222333." + TEST_REGION.toString() + "." + serviceName + ".amazonaws.com"; + } + + @BeforeClass + public static void setupClass() throws Exception { + final SelfSignedCertificateAuthority ca = new SelfSignedCertificateAuthority(); + + GeneralName[] subjectAlternateNames = Arrays.stream(TEST_SERVICE_NAMES) + .map(AwsSdk2TransportTests::getTestServiceHostName) + .map(hostname -> new GeneralName(GeneralName.dNSName, hostname)) + .toArray(GeneralName[]::new); + + SelfSignedCertificateAuthority.GeneratedCertificate hostCert = ca.generateCertificate( + new X500Name("DC=localhost, O=localhost, OU=localhost, CN=localhost"), + subjectAlternateNames + ); + + final char[] keystorePassword = "password".toCharArray(); + KeyStore keyMaterial = KeyStore.getInstance("JKS"); + keyMaterial.load(null, keystorePassword); + keyMaterial.setKeyEntry("localhost", hostCert.getPrivateKey(), keystorePassword, hostCert.getCertificateChain()); + + SSL_CONTEXT = SSLContextBuilder.create() + .loadKeyMaterial(keyMaterial, keystorePassword, (aliases, sslParameters) -> "localhost") + .build(); + + KeyStore trustStore = KeyStore.getInstance("JKS"); + trustStore.load(null, keystorePassword); + trustStore.setCertificateEntry("localhost", ca.getCertificate()); + + TrustManagerFactory clientTrustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + clientTrustManagerFactory.init(trustStore); + CLIENT_TRUST_MANAGERS = clientTrustManagerFactory.getTrustManagers(); + } + + @Before + public void setup() throws Exception { + server = ServerBootstrap.bootstrap() + .setRequestRouter( + RequestRouter.builder() + .addRoute(RequestRouter.LOCAL_AUTHORITY, "/", hardcodedJsonHandler("{}")) + .addRoute( + RequestRouter.LOCAL_AUTHORITY, + "/" + TEST_INDEX, + hardcodedJsonHandler("{\"acknowledged\": true,\"shards_acknowledged\": true,\"index\": \"" + TEST_INDEX + "\"}") + ) + .addRoute( + RequestRouter.LOCAL_AUTHORITY, + "/" + TEST_INDEX + "/_refresh", + hardcodedJsonHandler("{\"_shards\":{\"failed\":0,\"successful\":1,\"total\":1}}") + ) + .addRoute( + RequestRouter.LOCAL_AUTHORITY, + "/_search/scroll", + hardcodedJsonHandler("{\"succeeded\": true,\"num_freed\": 1}") + ) + .addRoute( + RequestRouter.LOCAL_AUTHORITY, + "/_search/point_in_time", + hardcodedJsonHandler("{\"pits\": [{\"pit_id\": \"pit1\", \"successful\": true}]}") + ) + .resolveAuthority(RequestRouter.LOCAL_AUTHORITY_RESOLVER) + .build() + ) + .setSslContext(SSL_CONTEXT) + .setListenerPort(0) + .create(); + server.start(); + proxy = new FunnellingHttpsProxy(server.getLocalPort()); + } + + private static class ReceivedRequest { + private final ClassicHttpRequest request; + private final byte[] body; + + public ReceivedRequest(ClassicHttpRequest request) throws IOException { + this.request = request; + HttpEntity entity = request.getEntity(); + if (entity != null) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + entity.writeTo(baos); + body = baos.toByteArray(); + } else { + body = new byte[0]; + } + } + + public String getMethod() { + return request.getMethod(); + } + + public String getRequestUri() { + return request.getRequestUri(); + } + + public String getHeader(String name) throws ProtocolException { + return Optional.ofNullable(request.getHeader(name)).map(Header::getValue).orElse(null); + } + } + + private HttpRequestHandler hardcodedJsonHandler(String json) { + byte[] jsonBytes = json.getBytes(StandardCharsets.UTF_8); + return (request, response, context) -> { + receivedRequests.add(new ReceivedRequest(request)); + response.setCode(200); + response.setEntity(new BasicHttpEntity(new ByteArrayInputStream(jsonBytes), jsonBytes.length, APPLICATION_JSON)); + }; + } + + @After + public void teardown() { + server.close(CloseMode.IMMEDIATE); + server = null; + proxy.close(); + proxy = null; + receivedRequests.clear(); + } + + private OpenSearchClient getTestClient() throws URISyntaxException { + AwsSdk2TransportOptions options = AwsSdk2TransportOptions.builder() + .setCredentials(() -> AwsBasicCredentials.builder().accessKeyId("test-access-key").secretAccessKey("test-secret-key").build()) + .setSigningClock(Clock.fixed(Instant.ofEpochSecond(1673626117), ZoneId.of("UTC"))) // 2023-01-13 16:08:37 +0000 + .setResponseCompression(false) + .build(); + + AttributeMap sdkHttpClientConfig; + + if (sdkHttpClientType == SdkHttpClientType.AWS_CRT || sdkHttpClientType == SdkHttpClientType.AWS_CRT_ASYNC) { + // AWS CRT does not support custom trust managers to verify the cert + sdkHttpClientConfig = AttributeMap.builder().put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, true).build(); + } else { + sdkHttpClientConfig = AttributeMap.builder() + .put(SdkHttpConfigurationOption.TLS_TRUST_MANAGERS_PROVIDER, () -> CLIENT_TRUST_MANAGERS) + .build(); + } + + URI proxyEndpoint = new URI("http://localhost:" + proxy.getPort()); + + SdkHttpClient sdkHttpClient = null; + SdkAsyncHttpClient sdkAsyncHttpClient = null; + switch (sdkHttpClientType) { + case APACHE: + software.amazon.awssdk.http.apache.ProxyConfiguration proxyConfig = software.amazon.awssdk.http.apache.ProxyConfiguration + .builder() + .endpoint(proxyEndpoint) + .build(); + sdkHttpClient = ApacheHttpClient.builder().proxyConfiguration(proxyConfig).buildWithDefaults(sdkHttpClientConfig); + break; + case AWS_CRT: + sdkHttpClient = AwsCrtHttpClient.builder() + .proxyConfiguration(p -> p.scheme("http").host("localhost").port(proxy.getPort())) + .buildWithDefaults(sdkHttpClientConfig); + break; + case AWS_CRT_ASYNC: + sdkAsyncHttpClient = AwsCrtAsyncHttpClient.builder() + .proxyConfiguration(p -> p.scheme("http").host("localhost").port(proxy.getPort())) + .buildWithDefaults(sdkHttpClientConfig); + break; + case NETTY_NIO_ASYNC: + ProxyConfiguration nettyProxyConfig = software.amazon.awssdk.http.nio.netty.ProxyConfiguration.builder() + .scheme("http") + .host("localhost") + .port(proxy.getPort()) + .build(); + sdkAsyncHttpClient = NettyNioAsyncHttpClient.builder() + .proxyConfiguration(nettyProxyConfig) + .buildWithDefaults(sdkHttpClientConfig); + break; + case URL_CONNECTION: + sdkHttpClient = UrlConnectionHttpClient.builder() + .proxyConfiguration(p -> p.endpoint(proxyEndpoint)) + .buildWithDefaults(sdkHttpClientConfig); + break; + default: + throw new IllegalArgumentException("Unknown SdkHttpClientType: " + sdkHttpClientType); + } + + AwsSdk2Transport transport; + if (sdkAsyncHttpClient != null) { + transport = new AwsSdk2Transport(sdkAsyncHttpClient, serviceHostName, serviceName, TEST_REGION, options); + } else { + transport = new AwsSdk2Transport(sdkHttpClient, serviceHostName, serviceName, TEST_REGION, options); + } + return new OpenSearchClient(transport); + } + + @Test + public void testSigV4PutIndex() throws Exception { + assertSigV4Request( + c -> c.indices() + .create( + b -> b.index("sample-index1") + .aliases("sample-alias1", a -> a) + .mappings(m -> m.properties("age", p -> p.integer(i -> i))) + .settings(s -> s.index(i -> i.numberOfReplicas("1").numberOfShards("2"))) + ), + "PUT", + "/" + TEST_INDEX, + 156, + "381bb92a04d397cab611362eb3ac3e075db11ac08272d64763de2279e2b5604d", + selectExpectedSignature( + "29123ccbcbd9af71fce384a1ed6d64b8c70f660e55a16de05405cac5fbebf18b", + "ff12e7b3e5e0f96fa25f13b3e95606dd18e3f1314dea6b7d6a9159f0aa51c21c", + "dbddbed28a34c0c380cd31567491a240294ef58755f9370e237d66f10d20d2df" + ) + ); + } + + @Test + public void testSigV4ClearScroll() throws Exception { + assertSigV4Request( + OpenSearchClient::clearScroll, + "DELETE", + "/_search/scroll", + 2, + "44136fa355b3678a1146ad16f7e8649e94fb4fc21fe77e8310c060f61caaff8a", + selectExpectedSignature( + "8c5d3d990f038e1d980a7d1b1611fa55f9b9b29a018a89ec84a6b9286e0e782d", + "f423dc8dce53a90d9f8e0701a8a721e54119b97201366438796d74ca0265f08d", + "63dd431cb3d4e2ba9e0aaf183975b1d19528de23bd68ee0c4269000008545922" + ) + ); + } + + @Test + public void testSigV4DeletePit() throws Exception { + // noinspection ArraysAsListWithZeroOrOneArgument + assertSigV4Request( + c -> c.deletePit(d -> d.pitId(Arrays.asList("pit1"))), + "DELETE", + "/_search/point_in_time", + 19, + "daaa6af55a9cfe622f46de69ebc3b4df84703f320b839346b7fb4cf94bdbd766", + selectExpectedSignature( + "82cb4f441ca313047542597cd54bdb3139ce111e269fe3bade5d59a1b2cd00a0", + "6abef10fb828cfc62683f38fbaa93894885308b0516bbe7b5485ae99e16b51bb", + "59697fbb5f10b197a1abea0264e7380d34db3c99b428bfa3781c0b665242f420" + ) + ); + } + + @Test + public void testSigV4Refresh() throws Exception { + assertSigV4Request( + c -> c.indices().refresh(s -> s.index(TEST_INDEX).ignoreUnavailable(true)), + "POST", + "/" + TEST_INDEX + "/_refresh?ignore_unavailable=true", + 0, + EMPTY_SHA256, + selectExpectedSignature( + "6955ebe7d39f5e885c544dc9945a20ba2bc293200abe7ffce43d8288a0e0a606", + "aab646c6a8be1fe42b25469c057bf07c99445fff5a9cf889b5768054b4fe8f00", + "5b28f6020340454f6c9a0ef7ed056095f54f4083d066e80b42af2a2ff77aea80" + ) + ); + } + + @Test + public void testHeadWithBody() throws Exception { + assertSigV4Request( + c -> c.generic().execute(Requests.builder().method("HEAD").endpoint("/").json("{}").build()), + "HEAD", + "/", + 2, + "44136fa355b3678a1146ad16f7e8649e94fb4fc21fe77e8310c060f61caaff8a", + selectExpectedSignature( + "547492a6aab72cdb687697ea291c35ae350e9fc0b7f96d1906efaeafa3e2b3c7", + "4e94a0a1048e252d3f46bda799886d726e4972286fa79ee80c2d7e5529c86948", + "6a4c0801c89b6cbc8f786a68bf51f18589ef77bdd5c01eb49a227fa19391a333" + ) + ); + } + + @Test + public void testOptionsWithBody() throws Exception { + assertSigV4Request( + c -> c.generic().execute(Requests.builder().method("OPTIONS").endpoint("/").json("{}").build()), + "OPTIONS", + "/", + 2, + "44136fa355b3678a1146ad16f7e8649e94fb4fc21fe77e8310c060f61caaff8a", + selectExpectedSignature( + "087c8fd96bc338d0dd680610967dd3c2d3f265a40158c1db4bfed83afaaf5246", + "e01b953a36d725d3e54565277a3aea6014961ce14c1c47b5930f6d75bc47f43b", + "2c3b08c49f0e45906f99cef144b3ba780c5c0d38cb9e2bcc75d34087172a254f" + ) + ); + } + + private void assertSigV4Request( + OpenSearchClientAction request, + String method, + String requestUri, + int contentLength, + String contentSha256, + String expectedSignature + ) throws Exception { + OpenSearchClient client = getTestClient(); + + if (sdkHttpClientType != SdkHttpClientType.APACHE + || contentLength == 0 + || "PATCH".equals(method) + || "POST".equals(method) + || "PUT".equals(method)) { + request.invoke(client); + } else { + // AWS Apache Http Client only supports content on PATCH, POST & PUT requests. + // See: + // https://github.com/aws/aws-sdk-java-v2/blob/master/http-clients/apache-client/src/main/java/software/amazon/awssdk/http/apache/internal/impl/ApacheHttpRequestFactory.java#L118-L137 + assertThrows( + "AWS SDK's ApacheHttpClient does not support request bodies for HTTP method `" + + method + + "`. Please use a different SdkHttpClient implementation.", + TransportException.class, + () -> request.invoke(client) + ); + return; + } + + assertEquals(1, receivedRequests.size()); + ReceivedRequest req = receivedRequests.poll(); + assertNotNull(req); + + assertEquals(method, req.getMethod()); + assertEquals(requestUri, req.getRequestUri()); + + assertEquals(contentLength, req.body.length); + assertEquals(contentSha256, sha256Hex(req.body)); + + String signedHeaders = "host;x-amz-content-sha256;x-amz-date"; + + if (contentLength > 0) { + assertEquals(APPLICATION_JSON.getMimeType(), req.getHeader(CONTENT_TYPE)); + signedHeaders = "content-type;" + signedHeaders; + } + + String expectedContentLength = String.valueOf(contentLength); + if (contentLength == 0 + && (sdkHttpClientType == SdkHttpClientType.AWS_CRT + || sdkHttpClientType == SdkHttpClientType.NETTY_NIO_ASYNC + || sdkHttpClientType == SdkHttpClientType.URL_CONNECTION)) { + // AWS CRT, Netty NIO and URLConnection clients do not set content-length for empty bodies + expectedContentLength = null; + } + + String contentLengthHdr = req.getHeader(CONTENT_LENGTH); + assertEquals(expectedContentLength, contentLengthHdr); + + assertEquals(serviceHostName, req.getHeader("Host")); + assertEquals("20230113T160837Z", req.getHeader("x-amz-date")); + assertEquals(contentSha256, req.getHeader("x-amz-content-sha256")); + assertEquals( + "AWS4-HMAC-SHA256 Credential=test-access-key/20230113/ap-southeast-2/" + + serviceName + + "/aws4_request, SignedHeaders=" + + signedHeaders + + ", Signature=" + + expectedSignature, + req.getHeader("Authorization") + ); + } + + private String selectExpectedSignature(String aossSig, String esSig, String arbitrarySig) { + switch (serviceName) { + case "aoss": + return aossSig; + case "es": + return esSig; + case "arbitrary": + return arbitrarySig; + default: + throw new IllegalArgumentException("Unknown service name: " + serviceName); + } + } + + private static String sha256Hex(byte[] data) { + SavableDigest digest = SHA256Digest.newInstance(); + digest.update(data, 0, data.length); + byte[] hash = new byte[digest.getDigestSize()]; + digest.doFinal(hash, 0); + return Hex.toHexString(hash); + } + + @FunctionalInterface + private interface OpenSearchClientAction { + void invoke(OpenSearchClient client) throws Exception; + } +} diff --git a/java-client/src/test/java/org/opensearch/client/transport/util/FunnellingHttpsProxy.java b/java-client/src/test/java/org/opensearch/client/transport/util/FunnellingHttpsProxy.java new file mode 100644 index 0000000000..19b62a7eb6 --- /dev/null +++ b/java-client/src/test/java/org/opensearch/client/transport/util/FunnellingHttpsProxy.java @@ -0,0 +1,176 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.util; + +import static org.apache.hc.core5.http.HttpStatus.SC_METHOD_NOT_ALLOWED; +import static org.apache.hc.core5.http.HttpStatus.SC_OK; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.ArrayList; +import java.util.List; +import javax.annotation.Nonnull; +import org.apache.hc.core5.http.impl.EnglishReasonPhraseCatalog; + +public class FunnellingHttpsProxy implements Closeable { + private static final int SO_TIMEOUT = 5000; + + @Nonnull + private final ServerSocket serverSocket; + @Nonnull + private final InetSocketAddress boundAddress; + private final int redirectToPort; + @Nonnull + private final List connectionHandlers; + @Nonnull + private final List sockets; + private final Thread acceptThread; + private volatile boolean running; + + public FunnellingHttpsProxy(int redirectToPort) throws Exception { + serverSocket = new ServerSocket(0); + boundAddress = (InetSocketAddress) serverSocket.getLocalSocketAddress(); + this.redirectToPort = redirectToPort; + connectionHandlers = new ArrayList<>(); + sockets = new ArrayList<>(); + running = true; + acceptThread = new Thread(this::acceptConnections); + acceptThread.start(); + } + + public int getPort() { + return boundAddress.getPort(); + } + + @Override + public void close() { + if (!running) { + return; + } + running = false; + closeQuietly(serverSocket); + try { + acceptThread.join(); + } catch (InterruptedException ignored) {} + for (Socket socket : sockets) { + closeQuietly(socket); + } + for (Thread handler : connectionHandlers) { + try { + handler.join(); + } catch (InterruptedException ignored) {} + } + } + + private void acceptConnections() { + while (running) { + try { + Socket socket = serverSocket.accept(); + sockets.add(socket); + socket.setSoTimeout(SO_TIMEOUT); + Thread handler = new Thread(handleConnection(socket)); + connectionHandlers.add(handler); + handler.start(); + } catch (Exception ignored) {} + } + } + + private Runnable handleConnection(Socket clientSocket) { + return () -> { + InputStream clientInput = null; + OutputStream clientOutput = null; + Socket serverSocket = null; + InputStream serverInput = null; + OutputStream serverOutput = null; + + try { + clientInput = clientSocket.getInputStream(); + clientOutput = clientSocket.getOutputStream(); + + String httpRequest = readHttpMessage(clientInput); + + if (!httpRequest.startsWith("CONNECT ")) { + writeHttpStatus(clientOutput, SC_METHOD_NOT_ALLOWED); + return; + } + + serverSocket = new Socket("localhost", redirectToPort); + serverSocket.setSoTimeout(SO_TIMEOUT); + serverInput = serverSocket.getInputStream(); + serverOutput = serverSocket.getOutputStream(); + + writeHttpStatus(clientOutput, SC_OK); + + Thread serverToClient = new Thread(pipeline(serverInput, clientOutput)); + serverToClient.start(); + + pipeline(clientInput, serverOutput).run(); + + serverToClient.join(); + } catch (IOException | InterruptedException ignored) {} finally { + closeQuietly(clientInput, clientOutput, clientSocket, serverInput, serverOutput, serverSocket); + } + }; + } + + private Runnable pipeline(InputStream input, OutputStream output) { + return () -> { + byte[] buffer = new byte[4096]; + try { + int n; + while (running && -1 != (n = input.read(buffer))) { + output.write(buffer, 0, n); + if (input.available() < 1) { + output.flush(); + } + } + } catch (IOException ignored) {} + }; + } + + private static String readHttpMessage(InputStream input) throws IOException { + BufferedReader reader = new BufferedReader(new InputStreamReader(input)); + StringBuilder message = new StringBuilder(); + while (true) { + String line = reader.readLine(); + if (line == null || line.isEmpty()) { + break; + } + message.append(line).append("\r\n"); + } + return message.toString(); + } + + private static void writeHttpStatus(OutputStream output, int status) throws IOException { + BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(output)); + writer.write("HTTP/1.1 " + status + " " + EnglishReasonPhraseCatalog.INSTANCE.getReason(status, null) + "\r\n"); + writer.write("\r\n"); + writer.flush(); + } + + private static void closeQuietly(Closeable... closeables) { + if (closeables == null) return; + for (Closeable closeable : closeables) { + if (closeable != null) { + try { + closeable.close(); + } catch (IOException ignored) {} + } + } + } +} diff --git a/java-client/src/test/java/org/opensearch/client/transport/util/SelfSignedCertificateAuthority.java b/java-client/src/test/java/org/opensearch/client/transport/util/SelfSignedCertificateAuthority.java new file mode 100644 index 0000000000..f2806980a8 --- /dev/null +++ b/java-client/src/test/java/org/opensearch/client/transport/util/SelfSignedCertificateAuthority.java @@ -0,0 +1,149 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.util; + +import java.math.BigInteger; +import java.security.KeyPair; +import java.security.KeyPairGenerator; +import java.security.NoSuchAlgorithmException; +import java.security.PrivateKey; +import java.security.Provider; +import java.security.PublicKey; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.time.ZonedDateTime; +import java.util.Date; +import org.bouncycastle.asn1.x500.X500Name; +import org.bouncycastle.asn1.x509.AuthorityKeyIdentifier; +import org.bouncycastle.asn1.x509.BasicConstraints; +import org.bouncycastle.asn1.x509.ExtendedKeyUsage; +import org.bouncycastle.asn1.x509.Extension; +import org.bouncycastle.asn1.x509.GeneralName; +import org.bouncycastle.asn1.x509.GeneralNames; +import org.bouncycastle.asn1.x509.KeyPurposeId; +import org.bouncycastle.asn1.x509.KeyUsage; +import org.bouncycastle.cert.CertIOException; +import org.bouncycastle.cert.X509CertificateHolder; +import org.bouncycastle.cert.X509v3CertificateBuilder; +import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter; +import org.bouncycastle.cert.jcajce.JcaX509ExtensionUtils; +import org.bouncycastle.cert.jcajce.JcaX509v3CertificateBuilder; +import org.bouncycastle.crypto.CryptoServicesRegistrar; +import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.bouncycastle.operator.ContentSigner; +import org.bouncycastle.operator.OperatorCreationException; +import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder; + +public class SelfSignedCertificateAuthority { + private static final Provider BC_PROVIDER = new BouncyCastleProvider(); + private static final String KEY_ALGORITHM = "RSA"; + private static final String SIGNATURE_ALGORITHM = "SHA256with" + KEY_ALGORITHM; + + private final PublicKey publicKey; + private final ContentSigner signer; + private final JcaX509CertificateConverter converter; + private final JcaX509ExtensionUtils extUtils; + private final X500Name issuingSubject; + private final AuthorityKeyIdentifier authorityKeyIdentifier; + private final X509Certificate certificate; + + public SelfSignedCertificateAuthority() throws NoSuchAlgorithmException, OperatorCreationException, CertIOException, + CertificateException { + KeyPair keyPair = generateKeyPair(); + publicKey = keyPair.getPublic(); + signer = new JcaContentSignerBuilder(SIGNATURE_ALGORITHM).build(keyPair.getPrivate()); + converter = new JcaX509CertificateConverter().setProvider(BC_PROVIDER); + extUtils = new JcaX509ExtensionUtils(); + issuingSubject = new X500Name("DC=localhost, O=localhost, OU=localhost Root CA, CN=localhost Root CA"); + + X509CertificateHolder certificate = newCertificate(issuingSubject, publicKey).addExtension( + Extension.authorityKeyIdentifier, + false, + extUtils.createAuthorityKeyIdentifier(publicKey) + ) + .addExtension(Extension.subjectKeyIdentifier, false, extUtils.createSubjectKeyIdentifier(publicKey)) + .addExtension(Extension.basicConstraints, true, new BasicConstraints(true)) + .addExtension(Extension.keyUsage, true, new KeyUsage(KeyUsage.digitalSignature | KeyUsage.keyCertSign | KeyUsage.cRLSign)) + .build(signer); + authorityKeyIdentifier = extUtils.createAuthorityKeyIdentifier(certificate); + this.certificate = converter.getCertificate(certificate); + } + + public X509Certificate getCertificate() { + return certificate; + } + + public GeneratedCertificate generateCertificate(X500Name subject, GeneralName[] subjectAlternateNames) throws NoSuchAlgorithmException, + CertIOException, CertificateException { + KeyPair keyPair = generateKeyPair(); + X509CertificateHolder certificate = newCertificate(subject, keyPair.getPublic()).addExtension( + Extension.authorityKeyIdentifier, + false, + authorityKeyIdentifier + ) + .addExtension(Extension.subjectKeyIdentifier, false, extUtils.createSubjectKeyIdentifier(publicKey)) + .addExtension(Extension.basicConstraints, true, new BasicConstraints(false)) + .addExtension( + Extension.keyUsage, + true, + new KeyUsage(KeyUsage.digitalSignature | KeyUsage.nonRepudiation | KeyUsage.keyEncipherment) + ) + .addExtension( + Extension.extendedKeyUsage, + true, + new ExtendedKeyUsage(new KeyPurposeId[] { KeyPurposeId.id_kp_serverAuth, KeyPurposeId.id_kp_clientAuth }) + ) + .addExtension(Extension.subjectAlternativeName, false, new GeneralNames(subjectAlternateNames)) + .build(signer); + return new GeneratedCertificate(this, keyPair.getPrivate(), converter.getCertificate(certificate)); + } + + private X509v3CertificateBuilder newCertificate(X500Name subject, PublicKey publicKey) { + ZonedDateTime start = ZonedDateTime.now().minusDays(1); + + return new JcaX509v3CertificateBuilder( + issuingSubject, + new BigInteger(Long.SIZE, CryptoServicesRegistrar.getSecureRandom()), + Date.from(start.toInstant()), + Date.from(start.plusDays(7).toInstant()), + subject, + publicKey + ); + } + + private static KeyPair generateKeyPair() throws NoSuchAlgorithmException { + KeyPairGenerator keyGen = KeyPairGenerator.getInstance(KEY_ALGORITHM, BC_PROVIDER); + keyGen.initialize(2048, CryptoServicesRegistrar.getSecureRandom()); + return keyGen.generateKeyPair(); + } + + public static class GeneratedCertificate { + private final SelfSignedCertificateAuthority ca; + private final PrivateKey privateKey; + private final X509Certificate certificate; + + private GeneratedCertificate(SelfSignedCertificateAuthority ca, PrivateKey privateKey, X509Certificate certificate) { + this.ca = ca; + this.privateKey = privateKey; + this.certificate = certificate; + } + + public PrivateKey getPrivateKey() { + return privateKey; + } + + public X509Certificate getCertificate() { + return certificate; + } + + public X509Certificate[] getCertificateChain() { + return new X509Certificate[] { certificate, ca.getCertificate() }; + } + } +}