From 55ec87833dc03abc44efe0f8e1c2e7860d9b8134 Mon Sep 17 00:00:00 2001 From: Wesley Workman Date: Wed, 10 Jul 2024 19:06:12 -0400 Subject: [PATCH] Fix for AwsSdk2Transport error handling (#1068) Signed-off-by: Wesley Workman --- CHANGELOG.md | 1 + .../transport/aws/AwsSdk2Transport.java | 134 +++++++++++------- .../integTest/aws/AwsSdk2GetRequestIT.java | 57 ++++++++ .../integTest/aws/AwsSdk2SearchIT.java | 17 --- .../aws/AwsSdk2TransportTestCase.java | 18 +++ 5 files changed, 159 insertions(+), 68 deletions(-) create mode 100644 java-client/src/test/java/org/opensearch/client/opensearch/integTest/aws/AwsSdk2GetRequestIT.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 02c9eca0c1..8ee194a9bc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,7 @@ This section is for maintaining a changelog for all breaking changes for the cli ### Fixed - Deserialize aggregation containing a parent aggregation ([#706](https://github.com/opensearch-project/opensearch-java/pull/706)) +- Error deserialization w/ AwsSdk2Transport ([#1068](https://github.com/opensearch-project/opensearch-java/pull/1068)) ### Security diff --git a/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java b/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java index ad6718ec20..c334995a21 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java +++ b/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java @@ -10,10 +10,7 @@ import jakarta.json.JsonObject; import jakarta.json.stream.JsonParser; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.UnsupportedEncodingException; +import java.io.*; import java.net.ConnectException; import java.net.SocketTimeoutException; import java.net.URI; @@ -48,6 +45,7 @@ import org.opensearch.client.transport.TransportOptions; import org.opensearch.client.transport.endpoints.BooleanEndpoint; import org.opensearch.client.transport.endpoints.BooleanResponse; +import org.opensearch.client.util.MissingRequiredPropertyException; import org.opensearch.client.util.OpenSearchRequestBodyBuffer; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; @@ -534,10 +532,17 @@ private ResponseT parseResponse( if (errorDeserializer == null || bodyStream == null) { throw new TransportException("Request failed with status code '" + statusCode + "'"); } + + // We may have to reset if there is a parse deserialization exception + bodyStream = toByteArrayInputStream(bodyStream); + try { try (JsonParser parser = mapper.jsonProvider().createParser(bodyStream)) { ErrorT error = errorDeserializer.deserialize(parser, mapper); throw new OpenSearchException((ErrorResponse) error); + } catch (MissingRequiredPropertyException errorEx) { + bodyStream.reset(); + return decodeResponse(uri, method, protocol, httpResponse, bodyStream, endpoint, mapper); } } catch (OpenSearchException e) { throw e; @@ -551,57 +556,68 @@ private ResponseT parseResponse( } } } else { - if (endpoint instanceof BooleanEndpoint) { - BooleanEndpoint bep = (BooleanEndpoint) endpoint; - @SuppressWarnings("unchecked") - ResponseT response = (ResponseT) new BooleanResponse(bep.getResult(statusCode)); - return response; - } else if (endpoint instanceof JsonEndpoint) { - JsonEndpoint jsonEndpoint = (JsonEndpoint) endpoint; - // Successful response - ResponseT response = null; - JsonpDeserializer responseParser = jsonEndpoint.responseDeserializer(); - if (responseParser != null) { - // Expecting a body - if (bodyStream == null) { - throw new TransportException("Expecting a response body, but none was sent"); - } - try (JsonParser parser = mapper.jsonProvider().createParser(bodyStream)) { - try { - response = responseParser.deserialize(parser, mapper); - } catch (NullPointerException e) { - response = responseParser.deserialize(parser, mapper); - } - } - ; - } - return response; - } else if (endpoint instanceof GenericEndpoint) { - @SuppressWarnings("unchecked") - final GenericEndpoint rawEndpoint = (GenericEndpoint) endpoint; + return decodeResponse(uri, method, protocol, httpResponse, bodyStream, endpoint, mapper); + } + } - String contentType = null; - if (bodyStream != null) { - contentType = httpResponse.firstMatchingHeader(Header.CONTENT_TYPE).orElse(null); + private ResponseT decodeResponse( + URI uri, + @Nonnull SdkHttpMethod method, + String protocol, + @Nonnull SdkHttpResponse httpResponse, + @CheckForNull InputStream bodyStream, + @Nonnull Endpoint endpoint, + JsonpMapper mapper + ) throws IOException { + if (endpoint instanceof BooleanEndpoint) { + BooleanEndpoint bep = (BooleanEndpoint) endpoint; + @SuppressWarnings("unchecked") + ResponseT response = (ResponseT) new BooleanResponse(bep.getResult(httpResponse.statusCode())); + return response; + } else if (endpoint instanceof JsonEndpoint) { + JsonEndpoint jsonEndpoint = (JsonEndpoint) endpoint; + // Successful response + ResponseT response = null; + JsonpDeserializer responseParser = jsonEndpoint.responseDeserializer(); + if (responseParser != null) { + // Expecting a body + if (bodyStream == null) { + throw new TransportException("Expecting a response body, but none was sent"); + } + try (JsonParser parser = mapper.jsonProvider().createParser(bodyStream)) { + try { + response = responseParser.deserialize(parser, mapper); + } catch (NullPointerException e) { + response = responseParser.deserialize(parser, mapper); + } } + } + return response; + } else if (endpoint instanceof GenericEndpoint) { + @SuppressWarnings("unchecked") + final GenericEndpoint rawEndpoint = (GenericEndpoint) endpoint; - return rawEndpoint.responseDeserializer( - uri.toString(), - method.name(), - protocol, - httpResponse.statusCode(), - httpResponse.statusText().orElse(null), - httpResponse.headers() - .entrySet() - .stream() - .map(h -> new AbstractMap.SimpleEntry(h.getKey(), Objects.toString(h.getValue()))) - .collect(Collectors.toList()), - contentType, - bodyStream - ); - } else { - throw new TransportException("Unhandled endpoint type: '" + endpoint.getClass().getName() + "'"); + String contentType = null; + if (bodyStream != null) { + contentType = httpResponse.firstMatchingHeader(Header.CONTENT_TYPE).orElse(null); } + + return rawEndpoint.responseDeserializer( + uri.toString(), + method.name(), + protocol, + httpResponse.statusCode(), + httpResponse.statusText().orElse(null), + httpResponse.headers() + .entrySet() + .stream() + .map(h -> new AbstractMap.SimpleEntry(h.getKey(), Objects.toString(h.getValue()))) + .collect(Collectors.toList()), + contentType, + bodyStream + ); + } else { + throw new TransportException("Unhandled endpoint type: '" + endpoint.getClass().getName() + "'"); } } @@ -617,6 +633,22 @@ private static Optional or(Optional opt, Supplier -1) { + baos.write(buffer, 0, len); + } + baos.flush(); + return new ByteArrayInputStream(baos.toByteArray()); + } + /** * Wrap the exception so the caller's signature shows up in the stack trace, taking care to copy the original type and message * where possible so async and sync code don't have to check different exceptions. diff --git a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/aws/AwsSdk2GetRequestIT.java b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/aws/AwsSdk2GetRequestIT.java new file mode 100644 index 0000000000..03dc5afb16 --- /dev/null +++ b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/aws/AwsSdk2GetRequestIT.java @@ -0,0 +1,57 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.opensearch.integTest.aws; + +import java.util.concurrent.CompletableFuture; +import org.junit.Assert; +import org.junit.Test; +import org.opensearch.client.opensearch.OpenSearchAsyncClient; +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch.core.*; + +public class AwsSdk2GetRequestIT extends AwsSdk2TransportTestCase { + @Test + public void testSyncGetRequest() throws Exception { + resetTestIndex(false); + final OpenSearchClient client = getClient(false, null, null); + + SimplePojo doc1 = new SimplePojo("Document 1", "The text of document 1"); + addDoc(client, "id1", doc1); + + Thread.sleep(1000); + + GetRequest doc1Request = new GetRequest.Builder().index(TEST_INDEX).id("id1").build(); + GetResponse doc1Response = client.get(doc1Request, SimplePojo.class); + Assert.assertTrue(doc1Response.found()); + + GetRequest doc2Request = new GetRequest.Builder().index(TEST_INDEX).id("does-not-exist").build(); + GetResponse doc2Response = client.get(doc2Request, SimplePojo.class); + Assert.assertFalse(doc2Response.found()); + } + + @Test + public void testAsyncGetRequest() throws Exception { + resetTestIndex(false); + final OpenSearchAsyncClient client = getAsyncClient(false, null, null); + + SimplePojo doc1 = new SimplePojo("Document 1", "The text of document 1"); + addDoc(client, "id1", doc1).join(); + + Thread.sleep(1000); + + GetRequest doc1Request = new GetRequest.Builder().index(TEST_INDEX).id("id1").build(); + CompletableFuture> doc1ResponseFuture = client.get(doc1Request, SimplePojo.class); + + GetRequest doc2Request = new GetRequest.Builder().index(TEST_INDEX).id("does-not-exist").build(); + CompletableFuture> doc2ResponseFuture = client.get(doc2Request, SimplePojo.class); + + Assert.assertTrue(doc1ResponseFuture.join().found()); + Assert.assertFalse(doc2ResponseFuture.join().found()); + } +} diff --git a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/aws/AwsSdk2SearchIT.java b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/aws/AwsSdk2SearchIT.java index 7ec5b1f654..e563600fed 100644 --- a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/aws/AwsSdk2SearchIT.java +++ b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/aws/AwsSdk2SearchIT.java @@ -16,7 +16,6 @@ import org.opensearch.client.opensearch.OpenSearchAsyncClient; import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.client.opensearch._types.OpenSearchException; -import org.opensearch.client.opensearch.core.IndexRequest; import org.opensearch.client.opensearch.core.IndexResponse; import org.opensearch.client.opensearch.core.SearchResponse; import org.opensearch.client.opensearch.indices.CreateIndexRequest; @@ -105,22 +104,6 @@ void testClientAsync(boolean async) throws Exception { Assert.assertEquals(doc1, response.hits().hits().get(0).source()); } - private void addDoc(OpenSearchClient client, String id, SimplePojo doc) throws Exception { - IndexRequest.Builder req = new IndexRequest.Builder().index(TEST_INDEX).document(doc).id(id); - client.index(req.build()); - } - - private CompletableFuture addDoc(OpenSearchAsyncClient client, String id, SimplePojo doc) { - IndexRequest.Builder req = new IndexRequest.Builder().index(TEST_INDEX).document(doc).id(id); - try { - return client.index(req.build()); - } catch (Exception e) { - final CompletableFuture failed = new CompletableFuture<>(); - failed.completeExceptionally(e); - return failed; - } - } - @Test public void testDoubleWrappedException() throws Exception { // ensure the test index exists diff --git a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/aws/AwsSdk2TransportTestCase.java b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/aws/AwsSdk2TransportTestCase.java index 79c4985b81..a7add62867 100644 --- a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/aws/AwsSdk2TransportTestCase.java +++ b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/aws/AwsSdk2TransportTestCase.java @@ -26,6 +26,8 @@ import org.opensearch.client.opensearch._types.SortOptions; import org.opensearch.client.opensearch._types.SortOrder; import org.opensearch.client.opensearch._types.query_dsl.Query; +import org.opensearch.client.opensearch.core.IndexRequest; +import org.opensearch.client.opensearch.core.IndexResponse; import org.opensearch.client.opensearch.core.SearchRequest; import org.opensearch.client.opensearch.core.SearchResponse; import org.opensearch.client.opensearch.indices.CreateIndexRequest; @@ -215,6 +217,22 @@ public void resetTestIndex(boolean async) throws Exception { client.create(req.build()); } + protected void addDoc(OpenSearchClient client, String id, SimplePojo doc) throws Exception { + IndexRequest.Builder req = new IndexRequest.Builder().index(TEST_INDEX).document(doc).id(id); + client.index(req.build()); + } + + protected CompletableFuture addDoc(OpenSearchAsyncClient client, String id, SimplePojo doc) { + IndexRequest.Builder req = new IndexRequest.Builder().index(TEST_INDEX).document(doc).id(id); + try { + return client.index(req.build()); + } catch (Exception e) { + final CompletableFuture failed = new CompletableFuture<>(); + failed.completeExceptionally(e); + return failed; + } + } + protected SearchResponse query(OpenSearchClient client, String title, String text) throws Exception { final Query query = Query.of(qb -> { if (title != null) {