From 9364982dafe55ead5a1759121b092b6e82d1b2c8 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Fri, 25 Nov 2022 11:21:50 -0500 Subject: [PATCH] [FEATURE] Get rid of the dependency on OpenSearch core Signed-off-by: Andriy Redko --- .../httpclient5/ApacheHttpClient5Options.java | 266 ++++++ .../ApacheHttpClient5Transport.java | 757 ++++++++++++++++++ .../ApacheHttpClient5TransportBuilder.java | 380 +++++++++ .../transport/httpclient5/DeadHostState.java | 125 +++ .../HttpAsyncResponseConsumerFactory.java | 86 ++ .../transport/httpclient5/Response.java | 214 +++++ .../httpclient5/ResponseException.java | 96 +++ .../httpclient5/WarningFailureException.java | 76 ++ .../httpclient5/WarningsHandler.java | 80 ++ .../HeapBufferedAsyncEntityConsumer.java | 139 ++++ .../HeapBufferedAsyncResponseConsumer.java | 123 +++ .../HttpEntityAsyncEntityProducer.java | 182 +++++ .../internal/HttpUriRequestProducer.java | 62 ++ .../transport/httpclient5/internal/Node.java | 289 +++++++ .../httpclient5/internal/NodeSelector.java | 105 +++ .../integTest/httpclient5/CrudIT.java | 518 ++++++++++++ ...SearchApacheHttpClient5ClientTestCase.java | 254 ++++++ 17 files changed, 3752 insertions(+) create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Options.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5TransportBuilder.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/DeadHostState.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/HttpAsyncResponseConsumerFactory.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/Response.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/ResponseException.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/WarningFailureException.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/WarningsHandler.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HeapBufferedAsyncEntityConsumer.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HeapBufferedAsyncResponseConsumer.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HttpEntityAsyncEntityProducer.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HttpUriRequestProducer.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/Node.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/NodeSelector.java create mode 100644 java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/CrudIT.java create mode 100644 java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/OpenSearchApacheHttpClient5ClientTestCase.java diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Options.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Options.java new file mode 100644 index 0000000000..324017a814 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Options.java @@ -0,0 +1,266 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.httpclient5; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Map.Entry; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.message.BasicHeader; +import org.apache.hc.core5.http.nio.AsyncResponseConsumer; +import org.opensearch.client.transport.TransportOptions; +import org.opensearch.client.transport.Version; + +public class ApacheHttpClient5Options implements TransportOptions { + private static final String USER_AGENT = "User-Agent"; + + /** + * Default request options. + */ + public static final ApacheHttpClient5Options DEFAULT = new Builder( + Collections.emptyList(), + HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory.DEFAULT, + null, + null + ).build(); + + private final List
headers; + private final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory; + private final WarningsHandler warningsHandler; + private final RequestConfig requestConfig; + + private ApacheHttpClient5Options(Builder builder) { + this.headers = Collections.unmodifiableList(new ArrayList<>(builder.headers)); + this.httpAsyncResponseConsumerFactory = builder.httpAsyncResponseConsumerFactory; + this.warningsHandler = builder.warningsHandler; + this.requestConfig = builder.requestConfig; + } + + public HttpAsyncResponseConsumerFactory getHttpAsyncResponseConsumerFactory() { + return httpAsyncResponseConsumerFactory; + } + + public WarningsHandler getWarningsHandler() { + return warningsHandler; + } + + public RequestConfig getRequestConfig() { + return requestConfig; + } + + @Override + public Collection> headers() { + return headers.stream() + .map(h -> new AbstractMap.SimpleImmutableEntry<>(h.getName(), h.getValue())) + .collect(Collectors.toList()); + } + + @Override + public Map queryParameters() { + return null; + } + + @Override + public Function, Boolean> onWarnings() { + if (warningsHandler == null) { + return null; + } else { + return warnings -> warningsHandler.warningsShouldFailRequest(warnings); + } + } + + @Override + public Builder toBuilder() { + return new Builder(headers, httpAsyncResponseConsumerFactory, warningsHandler, requestConfig); + } + + public static class Builder implements TransportOptions.Builder { + private final List
headers; + private HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory; + private WarningsHandler warningsHandler; + private RequestConfig requestConfig; + + private Builder(Builder builder) { + this(builder.headers, builder.httpAsyncResponseConsumerFactory, + builder.warningsHandler, builder.requestConfig); + } + + private Builder( + List
headers, + HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory, + WarningsHandler warningsHandler, + RequestConfig requestConfig + ) { + this.headers = new ArrayList<>(headers); + this.httpAsyncResponseConsumerFactory = httpAsyncResponseConsumerFactory; + this.warningsHandler = warningsHandler; + this.requestConfig = requestConfig; + } + + /** + * Add the provided header to the request. + * + * @param name the header name + * @param value the header value + * @throws NullPointerException if {@code name} or {@code value} is null. + */ + @Override + public Builder addHeader(String name, String value) { + Objects.requireNonNull(name, "header name cannot be null"); + Objects.requireNonNull(value, "header value cannot be null"); + this.headers.add(new ReqHeader(name, value)); + return this; + } + + @Override + public TransportOptions.Builder setParameter(String name, String value) { + return this; + } + + /** + * Called if there are warnings to determine if those warnings should fail the request. + */ + @Override + public TransportOptions.Builder onWarnings(Function, Boolean> listener) { + if (listener == null) { + setWarningsHandler(null); + } else { + setWarningsHandler(w -> { + if (w != null && !w.isEmpty()) { + return listener.apply(w); + } else { + return false; + } + }); + } + + return this; + } + + /** + * Set the {@link HttpAsyncResponseConsumerFactory} used to create one + * {@link AsyncResponseConsumer} callback per retry. Controls how the + * response body gets streamed from a non-blocking HTTP connection on the + * client side. + * + * @param httpAsyncResponseConsumerFactory factory for creating {@link AsyncResponseConsumer}. + * @throws NullPointerException if {@code httpAsyncResponseConsumerFactory} is null. + */ + public void setHttpAsyncResponseConsumerFactory(HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory) { + this.httpAsyncResponseConsumerFactory = Objects.requireNonNull( + httpAsyncResponseConsumerFactory, + "httpAsyncResponseConsumerFactory cannot be null" + ); + } + + /** + * How this request should handle warnings. If null (the default) then + * this request will default to the behavior dictacted by + * `setStrictDeprecationMode`. + *

+ * This can be set to {@link WarningsHandler#PERMISSIVE} if the client + * should ignore all warnings which is the same behavior as setting + * strictDeprecationMode to true. It can be set to + * {@link WarningsHandler#STRICT} if the client should fail if there are + * any warnings which is the same behavior as settings + * strictDeprecationMode to false. + *

+ * It can also be set to a custom implementation of + * {@linkplain WarningsHandler} to permit only certain warnings or to + * fail the request if the warnings returned don't + * exactly match some set. + * + * @param warningsHandler the {@link WarningsHandler} to be used + */ + public void setWarningsHandler(WarningsHandler warningsHandler) { + this.warningsHandler = warningsHandler; + } + + /** + * set RequestConfig, which can set socketTimeout, connectTimeout + * and so on by request + * @param requestConfig http client RequestConfig + * @return Builder + */ + public Builder setRequestConfig(RequestConfig requestConfig) { + this.requestConfig = requestConfig; + return this; + } + + @Override + public ApacheHttpClient5Options build() { + return new ApacheHttpClient5Options(this); + } + } + + static ApacheHttpClient5Options initialOptions() { + String ua = String.format( + Locale.ROOT, + "opensearch-java/%s (Java/%s)", + Version.VERSION == null ? "Unknown" : Version.VERSION.toString(), + System.getProperty("java.version") + ); + + return new ApacheHttpClient5Options( + DEFAULT.toBuilder() + .addHeader(USER_AGENT, ua) + .addHeader("Accept", ApacheHttpClient5Transport.JsonContentType.toString()) + ); + } + + static ApacheHttpClient5Options of(TransportOptions options) { + if (options instanceof ApacheHttpClient5Options) { + return (ApacheHttpClient5Options)options; + + } else { + final Builder builder = new Builder(DEFAULT.toBuilder()); + options.headers().forEach(h -> builder.addHeader(h.getKey(), h.getValue())); + options.queryParameters().forEach(builder::setParameter); + builder.onWarnings(options.onWarnings()); + return builder.build(); + } + } + + /** + * Custom implementation of {@link BasicHeader} that overrides equals and + * hashCode so it is easier to test equality of {@link ApacheHttpClient5Options}. + */ + static final class ReqHeader extends BasicHeader { + ReqHeader(String name, String value) { + super(name, value); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (other instanceof ReqHeader) { + Header otherHeader = (Header) other; + return Objects.equals(getName(), otherHeader.getName()) && Objects.equals(getValue(), otherHeader.getValue()); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(getName(), getValue()); + } + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java new file mode 100644 index 0000000000..ddb63723f2 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java @@ -0,0 +1,757 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.httpclient5; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import javax.annotation.Nullable; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hc.client5.http.auth.AuthCache; +import org.apache.hc.client5.http.auth.AuthScheme; +import org.apache.hc.client5.http.auth.AuthScope; +import org.apache.hc.client5.http.auth.Credentials; +import org.apache.hc.client5.http.auth.CredentialsProvider; +import org.apache.hc.client5.http.classic.methods.HttpHead; +import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase; +import org.apache.hc.client5.http.entity.GzipDecompressingEntity; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.auth.BasicAuthCache; +import org.apache.hc.client5.http.impl.auth.BasicScheme; +import org.apache.hc.client5.http.protocol.HttpClientContext; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.ClassicHttpResponse; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.io.entity.BufferedHttpEntity; +import org.apache.hc.core5.http.io.entity.ByteArrayEntity; +import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.apache.hc.core5.http.message.RequestLine; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.http.nio.AsyncResponseConsumer; +import org.opensearch.client.json.JsonpDeserializer; +import org.opensearch.client.json.JsonpMapper; +import org.opensearch.client.json.NdJsonpSerializable; +import org.opensearch.client.opensearch._types.ErrorResponse; +import org.opensearch.client.opensearch._types.OpenSearchException; +import org.opensearch.client.transport.Endpoint; +import org.opensearch.client.transport.JsonEndpoint; +import org.opensearch.client.transport.OpenSearchTransport; +import org.opensearch.client.transport.TransportException; +import org.opensearch.client.transport.TransportOptions; +import org.opensearch.client.transport.endpoints.BooleanEndpoint; +import org.opensearch.client.transport.endpoints.BooleanResponse; +import org.opensearch.client.transport.httpclient5.internal.HttpUriRequestProducer; +import org.opensearch.client.transport.httpclient5.internal.Node; +import org.opensearch.client.transport.httpclient5.internal.NodeSelector; +import org.opensearch.client.util.MissingRequiredPropertyException; + +import jakarta.json.stream.JsonGenerator; +import jakarta.json.stream.JsonParser; + +public class ApacheHttpClient5Transport implements OpenSearchTransport { + private static final Log logger = LogFactory.getLog(ApacheHttpClient5Transport.class); + static final ContentType JsonContentType = ContentType.APPLICATION_JSON; + + private final JsonpMapper mapper; + private final CloseableHttpAsyncClient client; + private final ApacheHttpClient5Options transportOptions; + private final ConcurrentMap denylist = new ConcurrentHashMap<>(); + private final AtomicInteger lastNodeIndex = new AtomicInteger(0); + private volatile NodeTuple> nodeTuple; + private final NodeSelector nodeSelector; + private final WarningsHandler warningsHandler; + private final FailureListener failureListener; + private final boolean compressionEnabled; + private final boolean chunkedEnabled; + private final String pathPrefix; + private final List

defaultHeaders; + + public ApacheHttpClient5Transport(final CloseableHttpAsyncClient client, final Header[] defaultHeaders, + final List nodes, final JsonpMapper mapper, @Nullable TransportOptions options, final String pathPrefix, + final FailureListener failureListener, final NodeSelector nodeSelector, final boolean strictDeprecationMode, + final boolean compressionEnabled, final boolean chunkedEnabled) { + this.mapper = mapper; + this.client = client; + this.defaultHeaders = Collections.unmodifiableList(Arrays.asList(defaultHeaders)); + this.pathPrefix = pathPrefix; + this.transportOptions = (options == null) ? ApacheHttpClient5Options.initialOptions() : ApacheHttpClient5Options.of(options); + this.warningsHandler = strictDeprecationMode ? WarningsHandler.STRICT : WarningsHandler.PERMISSIVE; + this.nodeSelector = (nodeSelector == null) ? NodeSelector.ANY : nodeSelector; + this.failureListener = (failureListener == null) ? new FailureListener() : failureListener; + this.chunkedEnabled = chunkedEnabled; + this.compressionEnabled = compressionEnabled; + setNodes(nodes); + } + + @Override + public ResponseT performRequest(RequestT request, + Endpoint endpoint, TransportOptions options) throws IOException { + return performRequestAsync(request, endpoint, options).join(); + } + + @Override + public CompletableFuture performRequestAsync(RequestT request, + Endpoint endpoint, TransportOptions options) { + + final ApacheHttpClient5Options requestOptions = (options == null) ? transportOptions : ApacheHttpClient5Options.of(options); + final CompletableFuture future = new CompletableFuture<>(); + final HttpUriRequestBase clientReq = prepareLowLevelRequest(request, endpoint, options); + final WarningsHandler warningsHandler = (requestOptions.getWarningsHandler() == null) ? + this.warningsHandler : requestOptions.getWarningsHandler(); + + try { + performRequestAsync(nextNodes(), requestOptions, clientReq, warningsHandler, future); + } catch(final IOException ex) { + future.completeExceptionally(ex); + } + + return future.thenApply(r -> { + try { + return (ResponseT)prepareResponse(r, endpoint); + } catch (final IOException ex) { + throw new CompletionException(ex); + } + }); + } + + @Override + public JsonpMapper jsonpMapper() { + return mapper; + } + + @Override + public TransportOptions options() { + return transportOptions; + } + + @Override + public void close() throws IOException { + client.close(); + } + + private void performRequestAsync(final NodeTuple> nodeTuple, final ApacheHttpClient5Options options, + final HttpUriRequestBase request, final WarningsHandler warningsHandler, final CompletableFuture listener) { + final RequestContext context = createContextForNextAttempt(options, request, nodeTuple.nodes.next(), nodeTuple.authCache); + Future future = client.execute( + context.requestProducer, + context.asyncResponseConsumer, + context.context, + new FutureCallback() { + @Override + public void completed(ClassicHttpResponse httpResponse) { + try { + ResponseOrResponseException responseOrResponseException = convertResponse(request, context.node, + httpResponse, warningsHandler); + if (responseOrResponseException.responseException == null) { + listener.complete(responseOrResponseException.response); + } else { + if (nodeTuple.nodes.hasNext()) { + performRequestAsync(nodeTuple, options, request, warningsHandler, listener); + } else { + listener.completeExceptionally(responseOrResponseException.responseException); + } + } + } catch (Exception e) { + listener.completeExceptionally(e); + } + } + + @Override + public void failed(Exception failure) { + try { + onFailure(context.node); + if (nodeTuple.nodes.hasNext()) { + performRequestAsync(nodeTuple, options, request, warningsHandler, listener); + } else { + listener.completeExceptionally(failure); + } + } catch (Exception e) { + listener.completeExceptionally(e); + } + } + + @Override + public void cancelled() { + listener.completeExceptionally(new CancellationException("request was cancelled")); + } + } + ); + + if (future instanceof org.apache.hc.core5.concurrent.Cancellable) { + request.setDependency((org.apache.hc.core5.concurrent.Cancellable) future); + } + } + + /** + * Replaces the nodes with which the client communicates. + * + * @param nodes the new nodes to communicate with. + */ + private void setNodes(Collection nodes) { + if (nodes == null || nodes.isEmpty()) { + throw new IllegalArgumentException("nodes must not be null or empty"); + } + AuthCache authCache = new BasicAuthCache(); + + Map nodesByHost = new LinkedHashMap<>(); + for (Node node : nodes) { + Objects.requireNonNull(node, "node cannot be null"); + // TODO should we throw an IAE if we have two nodes with the same host? + nodesByHost.put(node.getHost(), node); + authCache.put(node.getHost(), new BasicScheme()); + } + this.nodeTuple = new NodeTuple<>(Collections.unmodifiableList(new ArrayList<>(nodesByHost.values())), authCache); + this.denylist.clear(); + } + + private ResponseOrResponseException convertResponse(final HttpUriRequestBase request, final Node node, + final ClassicHttpResponse httpResponse, final WarningsHandler warningsHandler) throws IOException { + int statusCode = httpResponse.getCode(); + + Optional.ofNullable(httpResponse.getEntity()) + .map(HttpEntity::getContentEncoding) + .filter("gzip"::equalsIgnoreCase) + .map(gzipHeaderValue -> new GzipDecompressingEntity(httpResponse.getEntity())) + .ifPresent(httpResponse::setEntity); + + Response response = new Response(new RequestLine(request), node.getHost(), httpResponse); + Set ignoreErrorCodes = getIgnoreErrorCodes("400,401,403,404,405", request.getMethod()); + if (isSuccessfulResponse(statusCode) || ignoreErrorCodes.contains(response.getStatusLine().getStatusCode())) { + onResponse(node); + if (warningsHandler.warningsShouldFailRequest(response.getWarnings())) { + throw new WarningFailureException(response); + } + return new ResponseOrResponseException(response); + } + ResponseException responseException = new ResponseException(response); + if (isRetryStatus(statusCode)) { + // mark host dead and retry against next one + onFailure(node); + return new ResponseOrResponseException(responseException); + } + // mark host alive and don't retry, as the error should be a request problem + onResponse(node); + throw responseException; + } + + private static Set getIgnoreErrorCodes(String ignoreString, String requestMethod) { + Set ignoreErrorCodes; + if (ignoreString == null) { + if (HttpHead.METHOD_NAME.equals(requestMethod)) { + // 404 never causes error if returned for a HEAD request + ignoreErrorCodes = Collections.singleton(404); + } else { + ignoreErrorCodes = Collections.emptySet(); + } + } else { + String[] ignoresArray = ignoreString.split(","); + ignoreErrorCodes = new HashSet<>(); + if (HttpHead.METHOD_NAME.equals(requestMethod)) { + // 404 never causes error if returned for a HEAD request + ignoreErrorCodes.add(404); + } + for (String ignoreCode : ignoresArray) { + try { + ignoreErrorCodes.add(Integer.valueOf(ignoreCode)); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("ignore value should be a number, found [" + ignoreString + "] instead", e); + } + } + } + return ignoreErrorCodes; + } + + private static boolean isSuccessfulResponse(int statusCode) { + return statusCode < 300; + } + + private static boolean isRetryStatus(int statusCode) { + switch (statusCode) { + case 502: + case 503: + case 504: + return true; + } + return false; + } + + /** + * Returns a non-empty {@link Iterator} of nodes to be used for a request + * that match the {@link NodeSelector}. + *

+ * If there are no living nodes that match the {@link NodeSelector} + * this will return the dead node that matches the {@link NodeSelector} + * that is closest to being revived. + * @throws IOException if no nodes are available + */ + private NodeTuple> nextNodes() throws IOException { + NodeTuple> nodeTuple = this.nodeTuple; + Iterable hosts = selectNodes(nodeTuple, denylist, lastNodeIndex, nodeSelector); + return new NodeTuple<>(hosts.iterator(), nodeTuple.authCache); + } + + /** + * Select nodes to try and sorts them so that the first one will be tried initially, then the following ones + * if the previous attempt failed and so on. Package private for testing. + */ + static Iterable selectNodes( + NodeTuple> nodeTuple, + Map denylist, + AtomicInteger lastNodeIndex, + NodeSelector nodeSelector + ) throws IOException { + /* + * Sort the nodes into living and dead lists. + */ + List livingNodes = new ArrayList<>(Math.max(0, nodeTuple.nodes.size() - denylist.size())); + List deadNodes = new ArrayList<>(denylist.size()); + for (Node node : nodeTuple.nodes) { + DeadHostState deadness = denylist.get(node.getHost()); + if (deadness == null || deadness.shallBeRetried()) { + livingNodes.add(node); + } else { + deadNodes.add(new DeadNode(node, deadness)); + } + } + + if (false == livingNodes.isEmpty()) { + /* + * Normal state: there is at least one living node. If the + * selector is ok with any over the living nodes then use them + * for the request. + */ + List selectedLivingNodes = new ArrayList<>(livingNodes); + nodeSelector.select(selectedLivingNodes); + if (false == selectedLivingNodes.isEmpty()) { + /* + * Rotate the list using a global counter as the distance so subsequent + * requests will try the nodes in a different order. + */ + Collections.rotate(selectedLivingNodes, lastNodeIndex.getAndIncrement()); + return selectedLivingNodes; + } + } + + /* + * Last resort: there are no good nodes to use, either because + * the selector rejected all the living nodes or because there aren't + * any living ones. Either way, we want to revive a single dead node + * that the NodeSelectors are OK with. We do this by passing the dead + * nodes through the NodeSelector so it can have its say in which nodes + * are ok. If the selector is ok with any of the nodes then we will take + * the one in the list that has the lowest revival time and try it. + */ + if (false == deadNodes.isEmpty()) { + final List selectedDeadNodes = new ArrayList<>(deadNodes); + /* + * We'd like NodeSelectors to remove items directly from deadNodes + * so we can find the minimum after it is filtered without having + * to compare many things. This saves us a sort on the unfiltered + * list. + */ + nodeSelector.select(() -> new DeadNodeIteratorAdapter(selectedDeadNodes.iterator())); + if (false == selectedDeadNodes.isEmpty()) { + return Collections.singletonList(Collections.min(selectedDeadNodes).node); + } + } + throw new IOException( + "NodeSelector [" + nodeSelector + "] rejected all nodes, " + "living " + livingNodes + " and dead " + deadNodes + ); + } + + /** + * Called after each failed attempt. + * Receives as an argument the host that was used for the failed attempt. + */ + private void onFailure(Node node) { + while (true) { + DeadHostState previousDeadHostState = denylist.putIfAbsent( + node.getHost(), + new DeadHostState(DeadHostState.DEFAULT_TIME_SUPPLIER) + ); + if (previousDeadHostState == null) { + if (logger.isDebugEnabled()) { + logger.debug("added [" + node + "] to denylist"); + } + break; + } + if (denylist.replace(node.getHost(), previousDeadHostState, new DeadHostState(previousDeadHostState))) { + if (logger.isDebugEnabled()) { + logger.debug("updated [" + node + "] already in denylist"); + } + break; + } + } + } + + private RequestContext createContextForNextAttempt(final ApacheHttpClient5Options options, + final HttpUriRequestBase request, final Node node, final AuthCache authCache) { + request.reset(); + + if (options.getRequestConfig() != null) { + request.setConfig(options.getRequestConfig()); + } + + return new RequestContext(options, request, node, authCache); + } + + private ResponseT prepareResponse(Response clientResp, + Endpoint endpoint + ) throws IOException { + + try { + int statusCode = clientResp.getStatusLine().getStatusCode(); + + if (endpoint.isError(statusCode)) { + JsonpDeserializer errorDeserializer = endpoint.errorDeserializer(statusCode); + if (errorDeserializer == null) { + throw new TransportException( + "Request failed with status code '" + statusCode + "'", + new ResponseException(clientResp) + ); + } + + HttpEntity entity = clientResp.getEntity(); + if (entity == null) { + throw new TransportException( + "Expecting a response body, but none was sent", + new ResponseException(clientResp) + ); + } + + // We may have to replay it. + entity = new BufferedHttpEntity(entity); + + try { + InputStream content = entity.getContent(); + try (JsonParser parser = mapper.jsonProvider().createParser(content)) { + ErrorT error = errorDeserializer.deserialize(parser, mapper); + // TODO: have the endpoint provide the exception constructor + throw new OpenSearchException((ErrorResponse) error); + } + } catch(MissingRequiredPropertyException errorEx) { + // Could not decode exception, try the response type + try { + ResponseT response = decodeResponse(statusCode, entity, clientResp, endpoint); + return response; + } catch(Exception respEx) { + // No better luck: throw the original error decoding exception + throw new TransportException("Failed to decode error response", new ResponseException(clientResp)); + } + } + } else { + return decodeResponse(statusCode, clientResp.getEntity(), clientResp, endpoint); + } + } finally { + EntityUtils.consume(clientResp.getEntity()); + } + } + + private HttpUriRequestBase prepareLowLevelRequest( + RequestT request, + Endpoint endpoint, + @Nullable TransportOptions options + ) { + String method = endpoint.method(request); + String path = endpoint.requestUrl(request); + Map params = endpoint.queryParameters(request); + + final URI uri = URI.create(path + (params.isEmpty() ? "" : params + .entrySet() + .stream() + .map(e -> e.getKey() + "=" + e.getValue()) + .collect(Collectors.joining("&")))); + final HttpUriRequestBase clientReq = new HttpUriRequestBase(method, uri); + if (endpoint.hasRequestBody()) { + // Request has a body and must implement JsonpSerializable or NdJsonpSerializable + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + if (request instanceof NdJsonpSerializable) { + writeNdJson((NdJsonpSerializable) request, baos); + } else { + JsonGenerator generator = mapper.jsonProvider().createGenerator(baos); + mapper.serialize(request, generator); + generator.close(); + } + + clientReq.setEntity(new ByteArrayEntity(baos.toByteArray(), JsonContentType)); + } + + return clientReq; + } + + /** + * Called after each successful request call. + * Receives as an argument the host that was used for the successful request. + */ + private void onResponse(Node node) { + DeadHostState removedHost = this.denylist.remove(node.getHost()); + if (logger.isDebugEnabled() && removedHost != null) { + logger.debug("removed [" + node + "] from denylist"); + } + } + + private ResponseT decodeResponse( + int statusCode, @Nullable HttpEntity entity, Response clientResp, Endpoint endpoint + ) throws IOException { + + if (endpoint instanceof BooleanEndpoint) { + BooleanEndpoint bep = (BooleanEndpoint) endpoint; + + @SuppressWarnings("unchecked") + ResponseT response = (ResponseT) new BooleanResponse(bep.getResult(statusCode)); + return response; + + } else if (endpoint instanceof JsonEndpoint){ + @SuppressWarnings("unchecked") + JsonEndpoint jsonEndpoint = (JsonEndpoint)endpoint; + // Successful response + ResponseT response = null; + JsonpDeserializer responseParser = jsonEndpoint.responseDeserializer(); + if (responseParser != null) { + // Expecting a body + if (entity == null) { + throw new TransportException( + "Expecting a response body, but none was sent", + new ResponseException(clientResp) + ); + } + InputStream content = entity.getContent(); + try (JsonParser parser = mapper.jsonProvider().createParser(content)) { + response = responseParser.deserialize(parser, mapper); + }; + } + return response; + } else { + throw new TransportException("Unhandled endpoint type: '" + endpoint.getClass().getName() + "'"); + } + } + + /** + * {@link NodeTuple} enables the {@linkplain Node}s and {@linkplain AuthCache} + * to be set together in a thread safe, volatile way. + */ + static class NodeTuple { + final T nodes; + final AuthCache authCache; + + NodeTuple(final T nodes, final AuthCache authCache) { + this.nodes = nodes; + this.authCache = authCache; + } + } + + /** + * Contains a reference to a denylisted node and the time until it is + * revived. We use this so we can do a single pass over the denylist. + */ + private static class DeadNode implements Comparable { + final Node node; + final DeadHostState deadness; + + DeadNode(Node node, DeadHostState deadness) { + this.node = node; + this.deadness = deadness; + } + + @Override + public String toString() { + return node.toString(); + } + + @Override + public int compareTo(DeadNode rhs) { + return deadness.compareTo(rhs.deadness); + } + } + + /** + * Adapts an Iterator<DeadNodeAndRevival> into an + * Iterator<Node>. + */ + private static class DeadNodeIteratorAdapter implements Iterator { + private final Iterator itr; + + private DeadNodeIteratorAdapter(Iterator itr) { + this.itr = itr; + } + + @Override + public boolean hasNext() { + return itr.hasNext(); + } + + @Override + public Node next() { + return itr.next().node; + } + + @Override + public void remove() { + itr.remove(); + } + } + + /** + * Write an nd-json value by serializing each of its items on a separate line, recursing if its items themselves implement + * {@link NdJsonpSerializable} to flattening nested structures. + */ + private void writeNdJson(NdJsonpSerializable value, ByteArrayOutputStream baos) { + Iterator values = value._serializables(); + while(values.hasNext()) { + Object item = values.next(); + if (item instanceof NdJsonpSerializable && item != value) { // do not recurse on the item itself + writeNdJson((NdJsonpSerializable) item, baos); + } else { + JsonGenerator generator = mapper.jsonProvider().createGenerator(baos); + mapper.serialize(item, generator); + generator.close(); + baos.write('\n'); + } + } + } + + private static class RequestContext { + private final Node node; + private final AsyncRequestProducer requestProducer; + private final AsyncResponseConsumer asyncResponseConsumer; + private final HttpClientContext context; + + RequestContext(final ApacheHttpClient5Options options, final HttpUriRequestBase request, + final Node node, final AuthCache authCache) { + this.node = node; + this.requestProducer = HttpUriRequestProducer.create(request, node.getHost()); + this.asyncResponseConsumer = options + .getHttpAsyncResponseConsumerFactory() + .createHttpAsyncResponseConsumer(); + this.context = HttpClientContext.create(); + context.setAuthCache(new WrappingAuthCache(context, authCache)); + } + } + + /** + * The Apache HttpClient 5 adds "Authorization" header even if the credentials for basic authentication are not provided + * (effectively, username and password are 'null'). To workaround that, wrapping the AuthCache around current HttpClientContext + * and ensuring that the credentials are indeed provided for particular HttpHost, otherwise returning no authentication scheme + * even if it is present in the cache. + */ + private static class WrappingAuthCache implements AuthCache { + private final HttpClientContext context; + private final AuthCache delegate; + private final boolean usePersistentCredentials = true; + + WrappingAuthCache(HttpClientContext context, AuthCache delegate) { + this.context = context; + this.delegate = delegate; + } + + @Override + public void put(HttpHost host, AuthScheme authScheme) { + delegate.put(host, authScheme); + } + + @Override + public AuthScheme get(HttpHost host) { + AuthScheme authScheme = delegate.get(host); + + if (authScheme != null) { + final CredentialsProvider credsProvider = context.getCredentialsProvider(); + if (credsProvider != null) { + final String schemeName = authScheme.getName(); + final AuthScope authScope = new AuthScope(host, null, schemeName); + final Credentials creds = credsProvider.getCredentials(authScope, context); + + // See please https://issues.apache.org/jira/browse/HTTPCLIENT-2203 + if (authScheme instanceof BasicScheme) { + ((BasicScheme) authScheme).initPreemptive(creds); + } + + if (creds == null) { + return null; + } + } + } + + return authScheme; + } + + @Override + public void remove(HttpHost host) { + if (!usePersistentCredentials) { + delegate.remove(host); + } + } + + @Override + public void clear() { + delegate.clear(); + } + + } + + private static class ResponseOrResponseException { + private final Response response; + private final ResponseException responseException; + + ResponseOrResponseException(Response response) { + this.response = Objects.requireNonNull(response); + this.responseException = null; + } + + ResponseOrResponseException(ResponseException responseException) { + this.responseException = Objects.requireNonNull(responseException); + this.response = null; + } + } + + /** + * Listener that allows to be notified whenever a failure happens. Useful when sniffing is enabled, so that we can sniff on failure. + * The default implementation is a no-op. + */ + public static class FailureListener { + /** + * Create a {@link FailureListener} instance. + */ + public FailureListener() {} + + /** + * Notifies that the node provided as argument has just failed. + * + * @param node The node which has failed. + */ + public void onFailure(Node node) {} + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5TransportBuilder.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5TransportBuilder.java new file mode 100644 index 0000000000..f3da47658b --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5TransportBuilder.java @@ -0,0 +1,380 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.httpclient5; + +import java.security.AccessController; +import java.security.NoSuchAlgorithmException; +import java.security.PrivilegedAction; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; + +import org.apache.hc.client5.http.auth.CredentialsProvider; +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.impl.DefaultAuthenticationStrategy; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder; +import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; +import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; +import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder; +import org.apache.hc.core5.function.Factory; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.nio.ssl.TlsStrategy; +import org.apache.hc.core5.reactor.ssl.TlsDetails; +import org.apache.hc.core5.util.Timeout; +import org.opensearch.client.RestClient; +import org.opensearch.client.RestClientBuilder; +import org.opensearch.client.json.jackson.JacksonJsonpMapper; +import org.opensearch.client.transport.httpclient5.internal.Node; +import org.opensearch.client.transport.httpclient5.internal.NodeSelector; + +public class ApacheHttpClient5TransportBuilder { + /** + * The default connection timeout in milliseconds. + */ + public static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 1000; + + /** + * The default response timeout in milliseconds. + */ + public static final int DEFAULT_RESPONSE_TIMEOUT_MILLIS = 30000; + + /** + * The default maximum of connections per route. + */ + public static final int DEFAULT_MAX_CONN_PER_ROUTE = 10; + + /** + * The default maximum total connections. + */ + public static final int DEFAULT_MAX_CONN_TOTAL = 30; + + private static final Header[] EMPTY_HEADERS = new Header[0]; + + private final List nodes; + private Header[] defaultHeaders = EMPTY_HEADERS; + private ApacheHttpClient5Transport.FailureListener failureListener; + private HttpClientConfigCallback httpClientConfigCallback; + private RequestConfigCallback requestConfigCallback; + private String pathPrefix; + private NodeSelector nodeSelector = NodeSelector.ANY; + private boolean strictDeprecationMode = false; + private boolean compressionEnabled = false; + private Optional chunkedEnabled; + + /** + * Creates a new builder instance and sets the hosts that the client will send requests to. + * + * @throws IllegalArgumentException if {@code nodes} is {@code null} or empty. + */ + ApacheHttpClient5TransportBuilder(List nodes) { + if (nodes == null || nodes.isEmpty()) { + throw new IllegalArgumentException("nodes must not be null or empty"); + } + for (Node node : nodes) { + if (node == null) { + throw new IllegalArgumentException("node cannot be null"); + } + } + this.nodes = nodes; + this.chunkedEnabled = Optional.empty(); + } + + /** + * Sets the default request headers, which will be sent along with each request. + *

+ * Request-time headers will always overwrite any default headers. + * + * @param defaultHeaders array of default header + * @throws NullPointerException if {@code defaultHeaders} or any header is {@code null}. + */ + public ApacheHttpClient5TransportBuilder setDefaultHeaders(Header[] defaultHeaders) { + Objects.requireNonNull(defaultHeaders, "defaultHeaders must not be null"); + for (Header defaultHeader : defaultHeaders) { + Objects.requireNonNull(defaultHeader, "default header must not be null"); + } + this.defaultHeaders = defaultHeaders; + return this; + } + + /** + * Sets the {@link RestClient.FailureListener} to be notified for each request failure + * + * @param failureListener the {@link RestClient.FailureListener} for each failure + * @throws NullPointerException if {@code failureListener} is {@code null}. + */ + public ApacheHttpClient5TransportBuilder setFailureListener(ApacheHttpClient5Transport.FailureListener failureListener) { + Objects.requireNonNull(failureListener, "failureListener must not be null"); + this.failureListener = failureListener; + return this; + } + + /** + * Sets the {@link HttpClientConfigCallback} to be used to customize http client configuration + * + * @param httpClientConfigCallback the {@link HttpClientConfigCallback} to be used + * @throws NullPointerException if {@code httpClientConfigCallback} is {@code null}. + */ + public ApacheHttpClient5TransportBuilder setHttpClientConfigCallback(HttpClientConfigCallback httpClientConfigCallback) { + Objects.requireNonNull(httpClientConfigCallback, "httpClientConfigCallback must not be null"); + this.httpClientConfigCallback = httpClientConfigCallback; + return this; + } + + /** + * Sets the {@link RequestConfigCallback} to be used to customize http client configuration + * + * @param requestConfigCallback the {@link RequestConfigCallback} to be used + * @throws NullPointerException if {@code requestConfigCallback} is {@code null}. + */ + public ApacheHttpClient5TransportBuilder setRequestConfigCallback(RequestConfigCallback requestConfigCallback) { + Objects.requireNonNull(requestConfigCallback, "requestConfigCallback must not be null"); + this.requestConfigCallback = requestConfigCallback; + return this; + } + + /** + * Sets the path's prefix for every request used by the http client. + *

+ * For example, if this is set to "/my/path", then any client request will become "/my/path/" + endpoint. + *

+ * In essence, every request's {@code endpoint} is prefixed by this {@code pathPrefix}. The path prefix is useful for when + * OpenSearch is behind a proxy that provides a base path or a proxy that requires all paths to start with '/'; + * it is not intended for other purposes and it should not be supplied in other scenarios. + * + * @param pathPrefix the path prefix for every request. + * @throws NullPointerException if {@code pathPrefix} is {@code null}. + * @throws IllegalArgumentException if {@code pathPrefix} is empty, or ends with more than one '/'. + */ + public ApacheHttpClient5TransportBuilder setPathPrefix(String pathPrefix) { + this.pathPrefix = cleanPathPrefix(pathPrefix); + return this; + } + + /** + * Cleans up the given path prefix to ensure that looks like "/base/path". + * + * @param pathPrefix the path prefix to be cleaned up. + * @return the cleaned up path prefix. + * @throws NullPointerException if {@code pathPrefix} is {@code null}. + * @throws IllegalArgumentException if {@code pathPrefix} is empty, or ends with more than one '/'. + */ + public static String cleanPathPrefix(String pathPrefix) { + Objects.requireNonNull(pathPrefix, "pathPrefix must not be null"); + + if (pathPrefix.isEmpty()) { + throw new IllegalArgumentException("pathPrefix must not be empty"); + } + + String cleanPathPrefix = pathPrefix; + if (cleanPathPrefix.startsWith("/") == false) { + cleanPathPrefix = "/" + cleanPathPrefix; + } + + // best effort to ensure that it looks like "/base/path" rather than "/base/path/" + if (cleanPathPrefix.endsWith("/") && cleanPathPrefix.length() > 1) { + cleanPathPrefix = cleanPathPrefix.substring(0, cleanPathPrefix.length() - 1); + + if (cleanPathPrefix.endsWith("/")) { + throw new IllegalArgumentException("pathPrefix is malformed. too many trailing slashes: [" + pathPrefix + "]"); + } + } + return cleanPathPrefix; + } + + /** + * Sets the {@link NodeSelector} to be used for all requests. + * + * @param nodeSelector the {@link NodeSelector} to be used + * @throws NullPointerException if the provided nodeSelector is null + */ + public ApacheHttpClient5TransportBuilder setNodeSelector(NodeSelector nodeSelector) { + Objects.requireNonNull(nodeSelector, "nodeSelector must not be null"); + this.nodeSelector = nodeSelector; + return this; + } + + /** + * Whether the REST client should return any response containing at least + * one warning header as a failure. + * + * @param strictDeprecationMode flag for enabling strict deprecation mode + */ + public ApacheHttpClient5TransportBuilder setStrictDeprecationMode(boolean strictDeprecationMode) { + this.strictDeprecationMode = strictDeprecationMode; + return this; + } + + /** + * Whether the REST client should compress requests using gzip content encoding and add the "Accept-Encoding: gzip" + * header to receive compressed responses. + * + * @param compressionEnabled flag for enabling compression + */ + public ApacheHttpClient5TransportBuilder setCompressionEnabled(boolean compressionEnabled) { + this.compressionEnabled = compressionEnabled; + return this; + } + + /** + * Whether the REST client should use Transfer-Encoding: chunked for requests or not" + * + * @param chunkedEnabled force enable/disable chunked transfer-encoding. + */ + public ApacheHttpClient5TransportBuilder setChunkedEnabled(boolean chunkedEnabled) { + this.chunkedEnabled = Optional.of(chunkedEnabled); + return this; + } + + /** + * Creates a new {@link RestClient} based on the provided configuration. + */ + public ApacheHttpClient5Transport build() { + if (failureListener == null) { + failureListener = new ApacheHttpClient5Transport.FailureListener(); + } + CloseableHttpAsyncClient httpClient = AccessController.doPrivileged( + (PrivilegedAction) this::createHttpClient + ); + + final ApacheHttpClient5Transport transport = new ApacheHttpClient5Transport( + httpClient, + defaultHeaders, + nodes, + new JacksonJsonpMapper(), + null, /* options */ + pathPrefix, + failureListener, + nodeSelector, + strictDeprecationMode, + compressionEnabled, + chunkedEnabled.orElse(false) + ); + + httpClient.start(); + return transport; + } + + /** + * Returns a new {@link ApacheHttpClient5TransportBuilder} to help with {@link ApacheHttpClient5Transport} creation. + * Creates a new builder instance and sets the hosts that the client will send requests to. + *

+ * Prefer this to {@link #builder(HttpHost...)} if you have metadata up front about the nodes. + * If you don't either one is fine. + * + * @param nodes The nodes that the client will send requests to. + */ + public static ApacheHttpClient5TransportBuilder builder(Node... nodes) { + return new ApacheHttpClient5TransportBuilder(nodes == null ? null : Arrays.asList(nodes)); + } + + /** + * Returns a new {@link ApacheHttpClient5TransportBuilder} to help with {@link ApacheHttpClient5Transport} creation. + * Creates a new builder instance and sets the nodes that the client will send requests to. + *

+ * You can use this if you do not have metadata up front about the nodes. If you do, prefer + * {@link #builder(Node...)}. + * @see Node#Node(HttpHost) + * + * @param hosts The hosts that the client will send requests to. + */ + public static ApacheHttpClient5TransportBuilder builder(HttpHost... hosts) { + if (hosts == null || hosts.length == 0) { + throw new IllegalArgumentException("hosts must not be null nor empty"); + } + List nodes = Arrays.stream(hosts).map(Node::new).collect(Collectors.toList()); + return new ApacheHttpClient5TransportBuilder(nodes); + } + + private CloseableHttpAsyncClient createHttpClient() { + // default timeouts are all infinite + RequestConfig.Builder requestConfigBuilder = RequestConfig.custom() + .setConnectTimeout(Timeout.ofMilliseconds(DEFAULT_CONNECT_TIMEOUT_MILLIS)) + .setResponseTimeout(Timeout.ofMilliseconds(DEFAULT_RESPONSE_TIMEOUT_MILLIS)); + + if (requestConfigCallback != null) { + requestConfigBuilder = requestConfigCallback.customizeRequestConfig(requestConfigBuilder); + } + + try { + final TlsStrategy tlsStrategy = ClientTlsStrategyBuilder.create() + .setSslContext(SSLContext.getDefault()) + // See https://issues.apache.org/jira/browse/HTTPCLIENT-2219 + .setTlsDetailsFactory(new Factory() { + @Override + public TlsDetails create(final SSLEngine sslEngine) { + return new TlsDetails(sslEngine.getSession(), sslEngine.getApplicationProtocol()); + } + }) + .build(); + + final PoolingAsyncClientConnectionManager connectionManager = PoolingAsyncClientConnectionManagerBuilder.create() + .setMaxConnPerRoute(DEFAULT_MAX_CONN_PER_ROUTE) + .setMaxConnTotal(DEFAULT_MAX_CONN_TOTAL) + .setTlsStrategy(tlsStrategy) + .build(); + + HttpAsyncClientBuilder httpClientBuilder = HttpAsyncClientBuilder.create() + .setDefaultRequestConfig(requestConfigBuilder.build()) + .setConnectionManager(connectionManager) + .setTargetAuthenticationStrategy(DefaultAuthenticationStrategy.INSTANCE) + .disableAutomaticRetries(); + if (httpClientConfigCallback != null) { + httpClientBuilder = httpClientConfigCallback.customizeHttpClient(httpClientBuilder); + } + + final HttpAsyncClientBuilder finalBuilder = httpClientBuilder; + return AccessController.doPrivileged((PrivilegedAction) finalBuilder::build); + } catch (NoSuchAlgorithmException e) { + throw new IllegalStateException("could not create the default ssl context", e); + } + } + + /** + * Callback used the default {@link RequestConfig} being set to the {@link CloseableHttpClient} + * @see HttpClientBuilder#setDefaultRequestConfig + */ + public interface RequestConfigCallback { + /** + * Allows to customize the {@link RequestConfig} that will be used with each request. + * It is common to customize the different timeout values through this method without losing any other useful default + * value that the {@link RestClientBuilder} internally sets. + * + * @param requestConfigBuilder the {@link RestClientBuilder} for customizing the request configuration. + */ + RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder); + } + + /** + * Callback used to customize the {@link CloseableHttpClient} instance used by a {@link RestClient} instance. + * Allows to customize default {@link RequestConfig} being set to the client and any parameter that + * can be set through {@link HttpClientBuilder} + */ + public interface HttpClientConfigCallback { + /** + * Allows to customize the {@link CloseableHttpAsyncClient} being created and used by the {@link RestClient}. + * Commonly used to customize the default {@link CredentialsProvider} for authentication for communication + * through TLS/SSL without losing any other useful default value that the {@link RestClientBuilder} internally + * sets, like connection pooling. + * + * @param httpClientBuilder the {@link HttpClientBuilder} for customizing the client instance. + */ + HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder); + } + + +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/DeadHostState.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/DeadHostState.java new file mode 100644 index 0000000000..f186d2033d --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/DeadHostState.java @@ -0,0 +1,125 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.client.transport.httpclient5; + +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +/** + * Holds the state of a dead connection to a host. Keeps track of how many failed attempts were performed and + * when the host should be retried (based on number of previous failed attempts). + * Class is immutable, a new copy of it should be created each time the state has to be changed. + */ +final class DeadHostState implements Comparable { + + private static final long MIN_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(1); + static final long MAX_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(30); + static final Supplier DEFAULT_TIME_SUPPLIER = System::nanoTime; + + private final int failedAttempts; + private final long deadUntilNanos; + private final Supplier timeSupplier; + + /** + * Build the initial dead state of a host. Useful when a working host stops functioning + * and needs to be marked dead after its first failure. In such case the host will be retried after a minute or so. + * + * @param timeSupplier a way to supply the current time and allow for unit testing + */ + DeadHostState(Supplier timeSupplier) { + this.failedAttempts = 1; + this.deadUntilNanos = timeSupplier.get() + MIN_CONNECTION_TIMEOUT_NANOS; + this.timeSupplier = timeSupplier; + } + + /** + * Build the dead state of a host given its previous dead state. Useful when a host has been failing before, hence + * it already failed for one or more consecutive times. The more failed attempts we register the longer we wait + * to retry that same host again. Minimum is 1 minute (for a node the only failed once created + * through {@link #DeadHostState(Supplier)}), maximum is 30 minutes (for a node that failed more than 10 consecutive times) + * + * @param previousDeadHostState the previous state of the host which allows us to increase the wait till the next retry attempt + */ + DeadHostState(DeadHostState previousDeadHostState) { + long timeoutNanos = (long) Math.min( + MIN_CONNECTION_TIMEOUT_NANOS * 2 * Math.pow(2, previousDeadHostState.failedAttempts * 0.5 - 1), + MAX_CONNECTION_TIMEOUT_NANOS + ); + this.deadUntilNanos = previousDeadHostState.timeSupplier.get() + timeoutNanos; + this.failedAttempts = previousDeadHostState.failedAttempts + 1; + this.timeSupplier = previousDeadHostState.timeSupplier; + } + + /** + * Indicates whether it's time to retry to failed host or not. + * + * @return true if the host should be retried, false otherwise + */ + boolean shallBeRetried() { + return timeSupplier.get() - deadUntilNanos > 0; + } + + /** + * Returns the timestamp (nanos) till the host is supposed to stay dead without being retried. + * After that the host should be retried. + */ + long getDeadUntilNanos() { + return deadUntilNanos; + } + + int getFailedAttempts() { + return failedAttempts; + } + + @Override + public int compareTo(DeadHostState other) { + if (timeSupplier != other.timeSupplier) { + throw new IllegalArgumentException( + "can't compare DeadHostStates holding different time suppliers as they may " + "be based on different clocks" + ); + } + return Long.compare(deadUntilNanos, other.deadUntilNanos); + } + + @Override + public String toString() { + return "DeadHostState{" + + "failedAttempts=" + + failedAttempts + + ", deadUntilNanos=" + + deadUntilNanos + + ", timeSupplier=" + + timeSupplier + + '}'; + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/HttpAsyncResponseConsumerFactory.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/HttpAsyncResponseConsumerFactory.java new file mode 100644 index 0000000000..348a466da9 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/HttpAsyncResponseConsumerFactory.java @@ -0,0 +1,86 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.client.transport.httpclient5; + +import org.apache.hc.core5.http.ClassicHttpResponse; +import org.apache.hc.core5.http.nio.AsyncResponseConsumer; +import org.opensearch.client.nio.HeapBufferedAsyncResponseConsumer; + +/** + * Factory used to create instances of {@link AsyncResponseConsumer}. Each request retry needs its own instance of the + * consumer object. Users can implement this interface and pass their own instance to the specialized + * performRequest methods that accept an {@link HttpAsyncResponseConsumerFactory} instance as argument. + */ +public interface HttpAsyncResponseConsumerFactory { + + /** + * Creates the default type of {@link AsyncResponseConsumer}, based on heap buffering with a buffer limit of 100MB. + */ + HttpAsyncResponseConsumerFactory DEFAULT = new HeapBufferedResponseConsumerFactory( + HeapBufferedResponseConsumerFactory.DEFAULT_BUFFER_LIMIT); + + /** + * Creates the {@link AsyncResponseConsumer}, called once per request attempt. + */ + AsyncResponseConsumer createHttpAsyncResponseConsumer(); + + /** + * Default factory used to create instances of {@link AsyncResponseConsumer}. + * Creates one instance of {@link HeapBufferedAsyncResponseConsumer} for each request attempt, with a configurable + * buffer limit which defaults to 100MB. + */ + class HeapBufferedResponseConsumerFactory implements HttpAsyncResponseConsumerFactory { + + // default buffer limit is 100MB + static final int DEFAULT_BUFFER_LIMIT = 100 * 1024 * 1024; + + private final int bufferLimit; + + /** + * Creates a {@link HeapBufferedResponseConsumerFactory} instance with the given buffer limit. + * + * @param bufferLimitBytes the buffer limit to be applied to this instance + */ + public HeapBufferedResponseConsumerFactory(int bufferLimitBytes) { + this.bufferLimit = bufferLimitBytes; + } + + /** + * Creates the {@link AsyncResponseConsumer}, called once per request attempt. + */ + @Override + public AsyncResponseConsumer createHttpAsyncResponseConsumer() { + return new HeapBufferedAsyncResponseConsumer(bufferLimit); + } + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/Response.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/Response.java new file mode 100644 index 0000000000..2f36d517de --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/Response.java @@ -0,0 +1,214 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.client.transport.httpclient5; + +import org.apache.hc.core5.http.ClassicHttpResponse; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.message.RequestLine; +import org.apache.hc.core5.http.message.StatusLine; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Holds an opensearch response. It wraps the {@link HttpResponse} returned and associates it with + * its corresponding {@link RequestLine} and {@link HttpHost}. + */ +final class Response { + + private final RequestLine requestLine; + private final HttpHost host; + private final ClassicHttpResponse response; + + Response(RequestLine requestLine, HttpHost host, ClassicHttpResponse response) { + Objects.requireNonNull(requestLine, "requestLine cannot be null"); + Objects.requireNonNull(host, "host cannot be null"); + Objects.requireNonNull(response, "response cannot be null"); + this.requestLine = requestLine; + this.host = host; + this.response = response; + } + + /** + * Returns the request line that generated this response + */ + public RequestLine getRequestLine() { + return requestLine; + } + + /** + * Returns the node that returned this response + */ + public HttpHost getHost() { + return host; + } + + /** + * Returns the status line of the current response + */ + public StatusLine getStatusLine() { + return new StatusLine(response); + } + + /** + * Returns all the response headers + */ + public Header[] getHeaders() { + return response.getHeaders(); + } + + /** + * Returns the value of the first header with a specified name of this message. + * If there is more than one matching header in the message the first element is returned. + * If there is no matching header in the message null is returned. + * + * @param name header name + */ + public String getHeader(String name) { + Header header = response.getFirstHeader(name); + if (header == null) { + return null; + } + return header.getValue(); + } + + /** + * Returns the response body available, null otherwise + * @see HttpEntity + */ + public HttpEntity getEntity() { + return response.getEntity(); + } + + /** + * Optimized regular expression to test if a string matches the RFC 1123 date + * format (with quotes and leading space). Start/end of line characters and + * atomic groups are used to prevent backtracking. + */ + private static final Pattern WARNING_HEADER_DATE_PATTERN = Pattern.compile("^ " + // start of line, leading space + // quoted RFC 1123 date format + "\"" + // opening quote + "(?>Mon|Tue|Wed|Thu|Fri|Sat|Sun), " + // day of week, atomic group to prevent backtracking + "\\d{2} " + // 2-digit day + "(?>Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec) " + // month, atomic group to prevent backtracking + "\\d{4} " + // 4-digit year + "\\d{2}:\\d{2}:\\d{2} " + // (two-digit hour):(two-digit minute):(two-digit second) + "GMT" + // GMT + "\"$"); // closing quote (optional, since an older version can still send a warn-date), end of line + + /** + * Length of RFC 1123 format (with quotes and leading space), used in + * matchWarningHeaderPatternByPrefix(String). + */ + private static final int WARNING_HEADER_DATE_LENGTH = 0 + 1 + 1 + 3 + 1 + 1 + 2 + 1 + 3 + 1 + 4 + 1 + 2 + 1 + 2 + 1 + 2 + 1 + 3 + 1; + + /** + * Tests if a string matches the RFC 7234 specification for warning headers. + * This assumes that the warn code is always 299 and the warn agent is always + * OpenSearch. + * + * @param s the value of a warning header formatted according to RFC 7234 + * @return {@code true} if the input string matches the specification + */ + private static boolean matchWarningHeaderPatternByPrefix(final String s) { + return s.startsWith("299 OpenSearch-"); + } + + /** + * Refer to org.opensearch.common.logging.DeprecationLogger + */ + private static String extractWarningValueFromWarningHeader(final String s) { + String warningHeader = s; + + /* + * The following block tests for the existence of a RFC 1123 date in the warning header. If the date exists, it is removed for + * extractWarningValueFromWarningHeader(String) to work properly (as it does not handle dates). + */ + if (s.length() > WARNING_HEADER_DATE_LENGTH) { + final String possibleDateString = s.substring(s.length() - WARNING_HEADER_DATE_LENGTH); + final Matcher matcher = WARNING_HEADER_DATE_PATTERN.matcher(possibleDateString); + + if (matcher.matches()) { + warningHeader = warningHeader.substring(0, s.length() - WARNING_HEADER_DATE_LENGTH); + } + } + + final int firstQuote = warningHeader.indexOf('\"'); + final int lastQuote = warningHeader.length() - 1; + final String warningValue = warningHeader.substring(firstQuote + 1, lastQuote); + return warningValue; + } + + /** + * Returns a list of all warning headers returned in the response. + */ + public List getWarnings() { + List warnings = new ArrayList<>(); + for (Header header : response.getHeaders("Warning")) { + String warning = header.getValue(); + if (matchWarningHeaderPatternByPrefix(warning)) { + warnings.add(extractWarningValueFromWarningHeader(warning)); + } else { + warnings.add(warning); + } + } + return warnings; + } + + /** + * Returns true if there is at least one warning header returned in the + * response. + */ + public boolean hasWarnings() { + Header[] warnings = response.getHeaders("Warning"); + return warnings != null && warnings.length > 0; + } + + ClassicHttpResponse getHttpResponse() { + return response; + } + + /** + * Convert response to string representation + */ + @Override + public String toString() { + return "Response{" + "requestLine=" + requestLine + ", host=" + host + ", response=" + getStatusLine() + '}'; + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ResponseException.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ResponseException.java new file mode 100644 index 0000000000..930f0dd849 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ResponseException.java @@ -0,0 +1,96 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.client.transport.httpclient5; + +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.ParseException; +import org.apache.hc.core5.http.io.entity.BufferedHttpEntity; +import org.apache.hc.core5.http.io.entity.EntityUtils; + +import java.io.IOException; +import java.util.Locale; + +/** + * Exception thrown when an opensearch node responds to a request with a status code that indicates an error. + * Holds the response that was returned. + */ +final class ResponseException extends IOException { + + private final Response response; + + /** + * Creates a ResponseException containing the given {@code Response}. + * + * @param response The error response. + */ + ResponseException(Response response) throws IOException { + super(buildMessage(response)); + this.response = response; + } + + static String buildMessage(Response response) throws IOException { + String message = String.format( + Locale.ROOT, + "method [%s], host [%s], URI [%s], status line [%s]", + response.getRequestLine().getMethod(), + response.getHost(), + response.getRequestLine().getUri(), + response.getStatusLine().toString() + ); + + if (response.hasWarnings()) { + message += "\n" + "Warnings: " + response.getWarnings(); + } + + HttpEntity entity = response.getEntity(); + if (entity != null) { + if (entity.isRepeatable() == false) { + entity = new BufferedHttpEntity(entity); + response.getHttpResponse().setEntity(entity); + } + try { + message += "\n" + EntityUtils.toString(entity); + } catch (final ParseException ex) { + throw new IOException(ex); + } + } + return message; + } + + /** + * Returns the {@link Response} that caused this exception to be thrown. + */ + public Response getResponse() { + return response; + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/WarningFailureException.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/WarningFailureException.java new file mode 100644 index 0000000000..8627fd9d0c --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/WarningFailureException.java @@ -0,0 +1,76 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.client.transport.httpclient5; + +import java.io.IOException; + +/** + * This exception is used to indicate that one or more {@link Response#getWarnings()} exist + * and is typically used when the {@link ApacheHttpClient5Transport} is set to fail by passing + * `true` to `strictDeprecationMode`. + */ +// This class extends RuntimeException in order to deal with wrapping that is done in FutureUtils on exception. +// if the exception is not of type OpenSearchException or RuntimeException it will be wrapped in a UncategorizedExecutionException +public final class WarningFailureException extends RuntimeException { + private final Response response; + + /** + * Creates a {@link WarningFailureException} instance. + * + * @param response the response that contains warnings. + * @throws IOException if there is a problem building the exception message. + */ + public WarningFailureException(Response response) throws IOException { + super(ResponseException.buildMessage(response)); + this.response = response; + } + + /** + * Wrap a {@linkplain WarningFailureException} with another one with the current + * stack trace. This is used during synchronous calls so that the caller + * ends up in the stack trace of the exception thrown. + * + * @param e the exception to be wrapped. + */ + WarningFailureException(WarningFailureException e) { + super(e.getMessage(), e); + this.response = e.getResponse(); + } + + /** + * Returns the {@link Response} that caused this exception to be thrown. + */ + public Response getResponse() { + return response; + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/WarningsHandler.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/WarningsHandler.java new file mode 100644 index 0000000000..1a0b73d07d --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/WarningsHandler.java @@ -0,0 +1,80 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.client.transport.httpclient5; + +import java.util.List; + +/** + * Called if there are warnings to determine if those warnings should fail the + * request. + */ +public interface WarningsHandler { + + /** + * Determines whether the given list of warnings should fail the request. + * + * @param warnings a list of warnings. + * @return boolean indicating if the request should fail. + */ + boolean warningsShouldFailRequest(List warnings); + + /** + * The permissive warnings handler. Warnings will not fail the request. + */ + WarningsHandler PERMISSIVE = new WarningsHandler() { + @Override + public boolean warningsShouldFailRequest(List warnings) { + return false; + } + + @Override + public String toString() { + return "permissive"; + } + }; + + /** + * The strict warnings handler. Warnings will fail the request. + */ + WarningsHandler STRICT = new WarningsHandler() { + @Override + public boolean warningsShouldFailRequest(List warnings) { + return false == warnings.isEmpty(); + } + + @Override + public String toString() { + return "strict"; + } + }; +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HeapBufferedAsyncEntityConsumer.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HeapBufferedAsyncEntityConsumer.java new file mode 100644 index 0000000000..aa4d73a237 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HeapBufferedAsyncEntityConsumer.java @@ -0,0 +1,139 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.client.transport.httpclient5.internal; + +import org.apache.hc.core5.http.ContentTooLongException; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.nio.AsyncEntityConsumer; +import org.apache.hc.core5.http.nio.entity.AbstractBinAsyncEntityConsumer; +import org.apache.hc.core5.util.ByteArrayBuffer; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Default implementation of {@link AsyncEntityConsumer}. Buffers the whole + * response content in heap memory, meaning that the size of the buffer is equal to the content-length of the response. + * Limits the size of responses that can be read based on a configurable argument. Throws an exception in case the entity is longer + * than the configured buffer limit. + */ +public class HeapBufferedAsyncEntityConsumer extends AbstractBinAsyncEntityConsumer { + + private final int bufferLimitBytes; + private AtomicReference bufferRef = new AtomicReference<>(); + + /** + * Creates a new instance of this consumer with the provided buffer limit. + * + * @param bufferLimit the buffer limit. Must be greater than 0. + * @throws IllegalArgumentException if {@code bufferLimit} is less than or equal to 0. + */ + public HeapBufferedAsyncEntityConsumer(int bufferLimit) { + if (bufferLimit <= 0) { + throw new IllegalArgumentException("bufferLimit must be greater than 0"); + } + this.bufferLimitBytes = bufferLimit; + } + + /** + * Get the limit of the buffer. + */ + public int getBufferLimit() { + return bufferLimitBytes; + } + + /** + * Triggered to signal beginning of entity content stream. + * + * @param contentType the entity content type + */ + @Override + protected void streamStart(final ContentType contentType) throws HttpException, IOException {} + + /** + * Triggered to obtain the capacity increment. + * + * @return the number of bytes this consumer is prepared to process. + */ + @Override + protected int capacityIncrement() { + return Integer.MAX_VALUE; + } + + /** + * Triggered to pass incoming data packet to the data consumer. + * + * @param src the data packet. + * @param endOfStream flag indicating whether this data packet is the last in the data stream. + * + */ + @Override + protected void data(final ByteBuffer src, final boolean endOfStream) throws IOException { + if (src == null) { + return; + } + + ByteArrayBuffer buffer = bufferRef.get(); + if (buffer == null) { + buffer = new ByteArrayBuffer(bufferLimitBytes); + if (bufferRef.compareAndSet(null, buffer) == false) { + buffer = bufferRef.get(); + } + } + + int len = src.limit(); + if (buffer.length() + len > bufferLimitBytes) { + throw new ContentTooLongException( + "entity content is too long [" + len + "] for the configured buffer limit [" + bufferLimitBytes + "]" + ); + } + + if (len < 0) { + len = 4096; + } + + if (src.hasArray()) { + buffer.append(src.array(), src.arrayOffset() + src.position(), src.remaining()); + } else { + while (src.hasRemaining()) { + buffer.append(src.get()); + } + } + } + + /** + * Triggered to generate entity representation. + * + * @return the entity content + */ + @Override + protected byte[] generateContent() throws IOException { + final ByteArrayBuffer buffer = bufferRef.get(); + return buffer == null ? new byte[0] : buffer.toByteArray(); + } + + /** + * Release resources being held + */ + @Override + public void releaseResources() { + ByteArrayBuffer buffer = bufferRef.getAndSet(null); + if (buffer != null) { + buffer.clear(); + buffer = null; + } + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HeapBufferedAsyncResponseConsumer.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HeapBufferedAsyncResponseConsumer.java new file mode 100644 index 0000000000..4d0ceb66e7 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HeapBufferedAsyncResponseConsumer.java @@ -0,0 +1,123 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.client.transport.httpclient5.internal; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hc.core5.http.ClassicHttpResponse; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpHeaders; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.io.entity.ByteArrayEntity; +import org.apache.hc.core5.http.message.BasicClassicHttpResponse; +import org.apache.hc.core5.http.nio.AsyncResponseConsumer; +import org.apache.hc.core5.http.nio.support.AbstractAsyncResponseConsumer; +import org.apache.hc.core5.http.protocol.HttpContext; + +import java.io.IOException; + +/** + * Default implementation of {@link AsyncResponseConsumer}. Buffers the whole + * response content in heap memory, meaning that the size of the buffer is equal to the content-length of the response. + * Limits the size of responses that can be read based on a configurable argument. Throws an exception in case the entity is longer + * than the configured buffer limit. + */ +public class HeapBufferedAsyncResponseConsumer extends AbstractAsyncResponseConsumer { + private static final Log LOGGER = LogFactory.getLog(HeapBufferedAsyncResponseConsumer.class); + private final int bufferLimit; + + /** + * Creates a new instance of this consumer with the provided buffer limit. + * + * @param bufferLimit the buffer limit. Must be greater than 0. + * @throws IllegalArgumentException if {@code bufferLimit} is less than or equal to 0. + */ + public HeapBufferedAsyncResponseConsumer(int bufferLimit) { + super(new HeapBufferedAsyncEntityConsumer(bufferLimit)); + this.bufferLimit = bufferLimit; + } + + /** + * Get the limit of the buffer. + */ + public int getBufferLimit() { + return bufferLimit; + } + + /** + * Triggered to signal receipt of an intermediate (1xx) HTTP response. + * + * @param response the intermediate (1xx) HTTP response. + * @param context the actual execution context. + */ + @Override + public void informationResponse(final HttpResponse response, final HttpContext context) throws HttpException, IOException {} + + /** + * Triggered to generate object that represents a result of response message processing. + * @param response the response message. + * @param entity the response entity. + * @param contentType the response content type. + * @return the result of response processing. + */ + @Override + protected ClassicHttpResponse buildResult(final HttpResponse response, final byte[] entity, final ContentType contentType) { + final ClassicHttpResponse classicResponse = new BasicClassicHttpResponse(response.getCode()); + classicResponse.setVersion(response.getVersion()); + classicResponse.setHeaders(response.getHeaders()); + classicResponse.setReasonPhrase(response.getReasonPhrase()); + if (response.getLocale() != null) { + classicResponse.setLocale(response.getLocale()); + } + + if (entity != null) { + String encoding = null; + + try { + final Header contentEncoding = response.getHeader(HttpHeaders.CONTENT_ENCODING); + if (contentEncoding != null) { + encoding = contentEncoding.getValue(); + } + } catch (final HttpException ex) { + LOGGER.debug("Unable to detect content encoding", ex); + } + + final ByteArrayEntity httpEntity = new ByteArrayEntity(entity, contentType, encoding); + classicResponse.setEntity(httpEntity); + } + + return classicResponse; + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HttpEntityAsyncEntityProducer.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HttpEntityAsyncEntityProducer.java new file mode 100644 index 0000000000..1c669a55c7 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HttpEntityAsyncEntityProducer.java @@ -0,0 +1,182 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.httpclient5.internal; + +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.nio.AsyncEntityProducer; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.http.nio.ResourceHolder; +import org.apache.hc.core5.util.Args; +import org.apache.hc.core5.util.Asserts; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +/** + * The {@link AsyncEntityProducer} implementation for {@link HttpEntity} + */ +public class HttpEntityAsyncEntityProducer implements AsyncEntityProducer { + + private final HttpEntity entity; + private final ByteBuffer byteBuffer; + private final boolean chunked; + private final AtomicReference exception; + private final AtomicReference channelRef; + private boolean eof; + + /** + * Create new async HTTP entity producer + * @param entity HTTP entity + * @param bufferSize buffer size + */ + public HttpEntityAsyncEntityProducer(final HttpEntity entity, final int bufferSize) { + this.entity = Args.notNull(entity, "Http Entity"); + this.byteBuffer = ByteBuffer.allocate(bufferSize); + this.chunked = entity.isChunked(); + this.exception = new AtomicReference<>(); + this.channelRef = new AtomicReference<>(); + } + + /** + * Create new async HTTP entity producer with default buffer size (8192 bytes) + * @param entity HTTP entity + */ + public HttpEntityAsyncEntityProducer(final HttpEntity entity) { + this(entity, 8192); + } + + /** + * Determines whether the producer can consistently produce the same content + * after invocation of {@link ResourceHolder#releaseResources()}. + */ + @Override + public boolean isRepeatable() { + return entity.isRepeatable(); + } + + /** + * Returns content type of the entity, if known. + */ + @Override + public String getContentType() { + return entity.getContentType(); + } + + /** + * Returns length of the entity, if known. + */ + @Override + public long getContentLength() { + return entity.getContentLength(); + } + + /** + * Returns the number of bytes immediately available for output. + * This method can be used as a hint to control output events + * of the underlying I/O session. + * + * @return the number of bytes immediately available for output + */ + @Override + public int available() { + return Integer.MAX_VALUE; + } + + /** + * Returns content encoding of the entity, if known. + */ + @Override + public String getContentEncoding() { + return entity.getContentEncoding(); + } + + /** + * Returns chunked transfer hint for this entity. + *

+ * The behavior of wrapping entities is implementation dependent, + * but should respect the primary purpose. + *

+ */ + @Override + public boolean isChunked() { + return chunked; + } + + /** + * Preliminary declaration of trailing headers. + */ + @Override + public Set getTrailerNames() { + return entity.getTrailerNames(); + } + + /** + * Triggered to signal the ability of the underlying data channel + * to accept more data. The data producer can choose to write data + * immediately inside the call or asynchronously at some later point. + * + * @param channel the data channel capable to accepting more data. + */ + @Override + public void produce(final DataStreamChannel channel) throws IOException { + ReadableByteChannel stream = channelRef.get(); + if (stream == null) { + stream = Channels.newChannel(entity.getContent()); + Asserts.check(channelRef.getAndSet(stream) == null, "Illegal producer state"); + } + if (!eof) { + final int bytesRead = stream.read(byteBuffer); + if (bytesRead < 0) { + eof = true; + } + } + if (byteBuffer.position() > 0) { + byteBuffer.flip(); + channel.write(byteBuffer); + byteBuffer.compact(); + } + if (eof && byteBuffer.position() == 0) { + channel.endStream(); + releaseResources(); + } + } + + /** + * Triggered to signal a failure in data generation. + * + * @param cause the cause of the failure. + */ + @Override + public void failed(final Exception cause) { + if (exception.compareAndSet(null, cause)) { + releaseResources(); + } + } + + /** + * Release resources being held + */ + @Override + public void releaseResources() { + eof = false; + final ReadableByteChannel stream = channelRef.getAndSet(null); + if (stream != null) { + try { + stream.close(); + } catch (final IOException ex) { + /* Close quietly */ + } + } + } + +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HttpUriRequestProducer.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HttpUriRequestProducer.java new file mode 100644 index 0000000000..b9940009c7 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HttpUriRequestProducer.java @@ -0,0 +1,62 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.httpclient5.internal; + +import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.nio.AsyncEntityProducer; +import org.apache.hc.core5.http.nio.support.BasicRequestProducer; +import org.apache.hc.core5.net.URIAuthority; +import org.apache.hc.core5.util.Args; + +/** + * The producer of the {@link HttpUriRequestBase} instances associated with a particular {@link HttpHost} + */ +public class HttpUriRequestProducer extends BasicRequestProducer { + private final HttpUriRequestBase request; + + HttpUriRequestProducer(final HttpUriRequestBase request, final AsyncEntityProducer entityProducer) { + super(request, entityProducer); + this.request = request; + } + + /** + * Get the produced {@link HttpUriRequestBase} instance + * @return produced {@link HttpUriRequestBase} instance + */ + public HttpUriRequestBase getRequest() { + return request; + } + + /** + * Create new request producer for {@link HttpUriRequestBase} instance and {@link HttpHost} + * @param request {@link HttpUriRequestBase} instance + * @param host {@link HttpHost} instance + * @return new request producer + */ + public static HttpUriRequestProducer create(final HttpUriRequestBase request, final HttpHost host) { + Args.notNull(request, "Request"); + Args.notNull(host, "HttpHost"); + + // TODO: Should we copy request here instead of modifying in place? + request.setAuthority(new URIAuthority(host)); + request.setScheme(host.getSchemeName()); + + final HttpEntity entity = request.getEntity(); + AsyncEntityProducer entityProducer = null; + + if (entity != null) { + entityProducer = new HttpEntityAsyncEntityProducer(entity); + } + + return new HttpUriRequestProducer(request, entityProducer); + } + +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/Node.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/Node.java new file mode 100644 index 0000000000..8f55f67cde --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/Node.java @@ -0,0 +1,289 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.client.transport.httpclient5.internal; + +import org.apache.hc.core5.http.HttpHost; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; + +/** + * Metadata about an {@link HttpHost} running OpenSearch. + */ +public class Node { + /** + * Address that this host claims is its primary contact point. + */ + private final HttpHost host; + /** + * Addresses on which the host is listening. These are useful to have + * around because they allow you to find a host based on any address it + * is listening on. + */ + private final Set boundHosts; + /** + * Name of the node as configured by the {@code node.name} attribute. + */ + private final String name; + /** + * Version of OpenSearch that the node is running or {@code null} + * if we don't know the version. + */ + private final String version; + /** + * Roles that the OpenSearch process on the host has or {@code null} + * if we don't know what roles the node has. + */ + private final Roles roles; + /** + * Attributes declared on the node. + */ + private final Map> attributes; + + /** + * Create a {@linkplain Node} with metadata. All parameters except + * {@code host} are nullable and implementations of {@link NodeSelector} + * need to decide what to do in their absence. + * + * @param host primary host address + * @param boundHosts addresses on which the host is listening + * @param name name of the node + * @param version version of OpenSearch + * @param roles roles that the OpenSearch process has on the host + * @param attributes attributes declared on the node + */ + public Node(HttpHost host, Set boundHosts, String name, String version, Roles roles, Map> attributes) { + if (host == null) { + throw new IllegalArgumentException("host cannot be null"); + } + this.host = host; + this.boundHosts = boundHosts; + this.name = name; + this.version = version; + this.roles = roles; + this.attributes = attributes; + } + + /** + * Create a {@linkplain Node} without any metadata. + * + * @param host primary host address + */ + public Node(HttpHost host) { + this(host, null, null, null, null, null); + } + + /** + * Contact information for the host. + */ + public HttpHost getHost() { + return host; + } + + /** + * Addresses on which the host is listening. These are useful to have + * around because they allow you to find a host based on any address it + * is listening on. + */ + public Set getBoundHosts() { + return boundHosts; + } + + /** + * The {@code node.name} of the node. + */ + public String getName() { + return name; + } + + /** + * Version of OpenSearch that the node is running or {@code null} + * if we don't know the version. + */ + public String getVersion() { + return version; + } + + /** + * Roles that the OpenSearch process on the host has or {@code null} + * if we don't know what roles the node has. + */ + public Roles getRoles() { + return roles; + } + + /** + * Attributes declared on the node. + */ + public Map> getAttributes() { + return attributes; + } + + /** + * Convert node to string representation + */ + @Override + public String toString() { + StringBuilder b = new StringBuilder(); + b.append("[host=").append(host); + if (boundHosts != null) { + b.append(", bound=").append(boundHosts); + } + if (name != null) { + b.append(", name=").append(name); + } + if (version != null) { + b.append(", version=").append(version); + } + if (roles != null) { + b.append(", roles=").append(roles); + } + if (attributes != null) { + b.append(", attributes=").append(attributes); + } + return b.append(']').toString(); + } + + /** + * Compare two nodes for equality + * @param obj node instance to compare with + */ + @Override + public boolean equals(Object obj) { + if (obj == null || obj.getClass() != getClass()) { + return false; + } + Node other = (Node) obj; + return host.equals(other.host) + && Objects.equals(boundHosts, other.boundHosts) + && Objects.equals(name, other.name) + && Objects.equals(version, other.version) + && Objects.equals(roles, other.roles) + && Objects.equals(attributes, other.attributes); + } + + /** + * Calculate the hash code of the node + */ + @Override + public int hashCode() { + return Objects.hash(host, boundHosts, name, version, roles, attributes); + } + + /** + * Role information about an OpenSearch process. + */ + public static final class Roles { + + private final Set roles; + + /** + * Create a {@link Roles} instance of the given string set. + * + * @param roles set of role names. + */ + public Roles(final Set roles) { + this.roles = new TreeSet<>(roles); + } + + /** + * Returns whether or not the node could be elected cluster-manager. + */ + public boolean isClusterManagerEligible() { + return roles.contains("master") || roles.contains("cluster_manager"); + } + + /** + * Returns whether or not the node could be elected cluster-manager. + * @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #isClusterManagerEligible()} + */ + @Deprecated + public boolean isMasterEligible() { + return isClusterManagerEligible(); + } + + /** + * Returns whether or not the node stores data. + */ + public boolean isData() { + return roles.contains("data"); + } + + /** + * Returns whether or not the node runs ingest pipelines. + */ + public boolean isIngest() { + return roles.contains("ingest"); + } + + /** + * Returns whether the node is dedicated to provide search capability. + */ + public boolean isSearch() { + return roles.contains("search"); + } + + /** + * Convert roles to string representation + */ + @Override + public String toString() { + return String.join(",", roles); + } + + /** + * Compare two roles for equality + * @param obj roles instance to compare with + */ + @Override + public boolean equals(Object obj) { + if (obj == null || obj.getClass() != getClass()) { + return false; + } + Roles other = (Roles) obj; + return roles.equals(other.roles); + } + + /** + * Calculate the hash code of the roles + */ + @Override + public int hashCode() { + return roles.hashCode(); + } + + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/NodeSelector.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/NodeSelector.java new file mode 100644 index 0000000000..eb11d8bb0b --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/NodeSelector.java @@ -0,0 +1,105 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.client.transport.httpclient5.internal; + +import java.util.Iterator; + +/** + * Selects nodes that can receive requests. Used to keep requests away + * from cluster-manager nodes or to send them to nodes with a particular attribute. + */ +public interface NodeSelector { + /** + * Select the {@link Node}s to which to send requests. This is called with + * a mutable {@link Iterable} of {@linkplain Node}s in the order that the + * rest client would prefer to use them and implementers should remove + * nodes from the that should not receive the request. Implementers may + * iterate the nodes as many times as they need. + *

+ * This may be called twice per request: first for "living" nodes that + * have not been denylisted by previous errors. If the selector removes + * all nodes from the list or if there aren't any living nodes then the + * {@link org.opensearch.client.transport.httpclient5.ApacheHttpClient5Transport} + * will call this method with a list of "dead" nodes. + *

+ * Implementers should not rely on the ordering of the nodes. + * + * @param nodes the {@link Node}s targeted for the sending requests + */ + void select(Iterable nodes); + /* + * We were fairly careful with our choice of Iterable here. The caller has + * a List but reordering the list is likely to break round robin. Luckily + * Iterable doesn't allow any reordering. + */ + + /** + * Selector that matches any node. + */ + NodeSelector ANY = new NodeSelector() { + @Override + public void select(Iterable nodes) { + // Intentionally does nothing + } + + @Override + public String toString() { + return "ANY"; + } + }; + + /** + * Selector that matches any node that has metadata and doesn't + * have the {@code cluster_manager} role OR it has the data {@code data} + * role. + */ + NodeSelector SKIP_DEDICATED_CLUSTER_MANAGERS = new NodeSelector() { + @Override + public void select(Iterable nodes) { + for (Iterator itr = nodes.iterator(); itr.hasNext();) { + Node node = itr.next(); + if (node.getRoles() == null) continue; + if (node.getRoles().isClusterManagerEligible() + && false == node.getRoles().isData() + && false == node.getRoles().isIngest()) { + itr.remove(); + } + } + } + + @Override + public String toString() { + return "SKIP_DEDICATED_CLUSTER_MANAGERS"; + } + }; +} diff --git a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/CrudIT.java b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/CrudIT.java new file mode 100644 index 0000000000..050a188593 --- /dev/null +++ b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/CrudIT.java @@ -0,0 +1,518 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.opensearch.integTest.httpclient5; + +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormat; +import org.opensearch.client.opensearch._types.OpenSearchException; +import org.opensearch.client.opensearch._types.Refresh; +import org.opensearch.client.opensearch._types.Result; +import org.opensearch.client.opensearch.core.BulkRequest; +import org.opensearch.client.opensearch.core.BulkResponse; +import org.opensearch.client.opensearch.core.DeleteResponse; +import org.opensearch.client.opensearch.core.ExistsSourceRequest; +import org.opensearch.client.opensearch.core.GetRequest; +import org.opensearch.client.opensearch.core.GetResponse; +import org.opensearch.client.opensearch.core.IndexRequest; +import org.opensearch.client.opensearch.core.IndexResponse; +import org.opensearch.client.opensearch.core.MgetResponse; +import org.opensearch.client.opensearch.core.UpdateRequest; +import org.opensearch.client.opensearch.core.UpdateResponse; +import org.opensearch.client.opensearch.core.bulk.BulkOperation; +import org.opensearch.client.opensearch.core.bulk.BulkResponseItem; +import org.opensearch.client.opensearch.indices.CreateIndexResponse; +import org.opensearch.client.opensearch.indices.DeleteIndexResponse; +import org.opensearch.client.transport.endpoints.BooleanResponse; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class CrudIT extends OpenSearchApacheHttpClient5ClientTestCase { + + public void testDelete() throws IOException { + { + // Testing index deletion + String index1 = "my-index"; + CreateIndexResponse createIndexResponse = client().indices().create(b -> b.index(index1)); + assertEquals(index1, createIndexResponse.index()); + assertTrue(createIndexResponse.acknowledged()); + assertTrue(createIndexResponse.shardsAcknowledged()); + DeleteIndexResponse deleteIndexResponse = client().indices().delete(b -> b.index(index1)); + assertTrue(deleteIndexResponse.acknowledged()); + } + + { + // Testing doc deletion after data ingestion + String docId = "id"; + String index = "my-index1"; + client().indices().create(b -> b.index(index)); + + String id = client().index(b -> b + .index(index) + .id(docId) + .document(Collections.singletonMap("foo", "bar"))) + .id(); + assertEquals(id, docId); + + DeleteResponse deleteResponse = client().delete(d -> d.id(docId).index(index)); + assertEquals(deleteResponse.index(), index); + assertEquals(deleteResponse.id(), docId); + assertEquals(deleteResponse.result(), Result.Deleted); + + String docIdTemp = "does_not_exist"; + DeleteResponse deleteResponseDocNotExist = client().delete(d -> d.id(docIdTemp).index(index)); + assertEquals(deleteResponseDocNotExist.index(), index); + assertEquals(deleteResponseDocNotExist.id(), docIdTemp); + assertEquals(deleteResponseDocNotExist.result(), Result.NotFound); + + } + } + + public void testExists() throws IOException { + assertFalse(client().indices().exists(b -> b.index("index")).value()); + String index = "ingest-test"; + // Create an index + CreateIndexResponse createIndexResponse = client().indices().create(b -> b + .index(index) + ); + assertEquals(index, createIndexResponse.index()); + + // Check that it actually exists. Example of a boolean response + assertTrue(client().indices().exists(b -> b.index(index)).value()); + + client().index(b -> b + .index(index) + .id("id") + .document(Collections.singletonMap("foo", "bar")) + .refresh(Refresh.True)); + + assertTrue(client().exists(b -> b.index(index).id("id")).value()); + assertFalse(client().exists(b -> b.index(index).id("random_id")).value()); + assertFalse(client().exists(b -> b.index(index).id("random_id").version(1L)).value()); + } + + public void testSourceExists() throws IOException { + AppData appData = new AppData(); + appData.setIntValue(1337); + appData.setMsg("foo"); + + { + ExistsSourceRequest request = new ExistsSourceRequest.Builder() + .index("index") + .id("id") + .build(); + BooleanResponse response = client().existsSource(request); + assertFalse(response.value()); + } + client().index(b -> b + .index("index") + .id("id") + .refresh(Refresh.True) + .document(appData)); + { + ExistsSourceRequest request = new ExistsSourceRequest.Builder() + .index("index") + .id("id") + .build(); + BooleanResponse response = client().existsSource(request); + assertTrue(response.value()); + } + { + ExistsSourceRequest request = new ExistsSourceRequest.Builder() + .index("index") + .id("does_not_exist") + .build(); + BooleanResponse response = client().existsSource(request); + assertFalse(response.value()); + } + } + + public void testGet() throws IOException { + + { + OpenSearchException exception = expectThrows( + OpenSearchException.class, + () -> client().get(new GetRequest.Builder().index("index").id("id").build(), String.class) + ); + assertEquals(404, exception.status()); + assertEquals("Request failed: [index_not_found_exception] no such index [index]", + exception.getMessage()); + } + + AppData appData = new AppData(); + appData.setIntValue(1337); + appData.setMsg("foo"); + + client().index(b -> b.index("index").id("id").document(appData).refresh(Refresh.True)); + + { + GetResponse getResponse = client().get(b -> b + .index("index") + .id("id") + .version(1L) + , AppData.class + ); + assertEquals("index", getResponse.index()); + assertEquals("id", getResponse.id()); + assertTrue(getResponse.found()); + assertEquals(java.util.Optional.of(1L), java.util.Optional.of(getResponse.version())); + assertEquals(appData.getIntValue(), getResponse.source().getIntValue()); + assertEquals(appData.getMsg(), getResponse.source().getMsg()); + } + { + GetResponse getResponse = client().get(b -> b + .index("index") + .id("does_not_exist") + , AppData.class + ); + assertEquals("index", getResponse.index()); + assertEquals("does_not_exist", getResponse.id()); + assertFalse(getResponse.found()); + assertNull(getResponse.version()); + assertNull(getResponse.source()); + } + } + + public void testGetWithTypes() throws IOException { + AppData appData = new AppData(); + appData.setIntValue(1337); + appData.setMsg("foo"); + + client().index(b -> b + .index("index") + .id("id") + .document(appData) + .refresh(Refresh.True)); + + GetResponse getResponse = client().get(b -> b + .index("index") + .id("id") + , AppData.class + ); + + assertEquals("index", getResponse.index()); + assertEquals("id", getResponse.id()); + + assertTrue(getResponse.found()); + assertEquals(java.util.Optional.of(1L), java.util.Optional.of(getResponse.version())); + assertEquals(appData.getMsg(), getResponse.source().getMsg()); + } + + public void testMultiGet() throws IOException { + { + AppData appData = new AppData(); + appData.setIntValue(1337); + appData.setMsg("foo"); + + List ids = new ArrayList<>(); + ids.add("id1"); + ids.add("id2"); + MgetResponse response = client().mget(b -> b + .index("index") + .ids(ids) + , AppData.class + ); + assertEquals(2, response.docs().size()); + + assertTrue(response.docs().get(0).isFailure()); + assertEquals("id1", response.docs().get(0).failure().id()); + assertEquals("index", response.docs().get(0).failure().index()); + assertEquals( + "no such index [index]", + response.docs().get(0).failure().error().reason() + ); + assertTrue(response.docs().get(1).isFailure()); + assertEquals("id2", response.docs().get(1).failure().id()); + assertEquals("index", response.docs().get(1).failure().index()); + assertEquals( + "no such index [index]", + response.docs().get(1).failure().error().reason() + ); + } + } + + public void testUpdate() throws Exception { + AppData appData = new AppData(); + appData.setIntValue(1337); + appData.setMsg("foo"); + + AppData updatedAppData = new AppData(); + appData.setIntValue(3333); + appData.setMsg("bar"); + + { + UpdateRequest updateRequest = new UpdateRequest.Builder() + .index("index") + .id("does_not_exist") + .doc(appData) + .build(); + try { + client().update(updateRequest, AppData.class); + } catch (OpenSearchException e) { + // 1.x: [document_missing_exception] [_doc][does_not_exist]: document missing + // 2.x: [document_missing_exception] [does_not_exist]: document missing + assertTrue(e.getMessage().contains("[document_missing_exception]")); + assertTrue(e.getMessage().contains("[does_not_exist]: document missing")); + assertEquals(404, e.status()); + } + } + { + IndexRequest indexRequest = new IndexRequest.Builder() + .index("index") + .id("id") + .document(appData) + .build(); + IndexResponse indexResponse = client().index(indexRequest); + assertEquals(Result.Created, indexResponse.result()); + + long lastUpdateSeqNo; + long lastUpdatePrimaryTerm; + { + UpdateRequest updateRequest = new UpdateRequest.Builder() + .index("index") + .id("id") + .doc(updatedAppData) + .build(); + UpdateResponse updateResponse = client().update(updateRequest, AppData.class); + assertEquals(Result.Updated, updateResponse.result()); + assertEquals(indexResponse.version() + 1, updateResponse.version()); + lastUpdateSeqNo = updateResponse.seqNo(); + lastUpdatePrimaryTerm = updateResponse.primaryTerm(); + assertTrue(lastUpdateSeqNo >= 0L); + assertTrue(lastUpdatePrimaryTerm >= 1L); + } + + } + } + + public void testUpdateWithTypes() throws IOException { + + AppData appData = new AppData(); + appData.setIntValue(1337); + appData.setMsg("foo"); + + AppData updatedAppData = new AppData(); + appData.setIntValue(3333); + appData.setMsg("bar"); + + IndexRequest indexRequest = new IndexRequest.Builder() + .index("index") + .id("id") + .document(appData) + .build(); + IndexResponse indexResponse = client().index(indexRequest); + + UpdateRequest updateRequest = new UpdateRequest.Builder() + .index("index") + .id("id") + .doc(updatedAppData) + .build(); + UpdateResponse updateResponse = client().update(updateRequest, AppData.class); + + assertEquals(Result.Updated, updateResponse.result()); + assertEquals(indexResponse.version() + 1, updateResponse.version()); + } + + public void testBulk() throws IOException { + AppData appData = new AppData(); + appData.setIntValue(1337); + appData.setMsg("foo"); + int nbItems = randomIntBetween(10, 100); + boolean[] errors = new boolean[nbItems]; + + List opsList = new ArrayList<>(); + + for (int i = 0; i < nbItems; i++) { + String id = String.valueOf(i); + boolean erroneous = randomBoolean(); + errors[i] = erroneous; + BulkOperation.Kind opType = randomFrom(BulkOperation.Kind.Delete, BulkOperation.Kind.Index, + BulkOperation.Kind.Create/*, BulkOperation.Kind.Update*/); + if (opType.equals(BulkOperation.Kind.Delete)) { + if (!erroneous) { + assertEquals( + Result.Created, + client().index(b -> b.index("index").id(id).document(appData)).result() + ); + } + BulkOperation op = new BulkOperation.Builder().delete(d -> d.index("index").id(id)).build(); + opsList.add(op); + } else { + appData.setIntValue(i); + appData.setMsg("id"); + if (opType.equals(BulkOperation.Kind.Index)) { + BulkOperation op = new BulkOperation.Builder().index(b -> b + .index("index") + .id(id) + .document(appData) + .ifSeqNo(erroneous ? 12L : null) + .ifPrimaryTerm(erroneous ? 12L : null)).build(); + opsList.add(op); + + } else if (opType.equals(BulkOperation.Kind.Create)) { + BulkOperation op = new BulkOperation.Builder().create(o -> o + .index("index") + .id(id) + .document(appData) + ).build(); + opsList.add(op); + + if (erroneous) { + assertEquals(Result.Created, client().index(b -> b. + index("index").id(id).document(appData)).result()); + } + + } else if (opType.equals(BulkOperation.Kind.Update)) { + BulkOperation op = new BulkOperation.Builder().update(o -> o + .index("index") + .id(id) + .document(Collections.singletonMap("key", "value")) + ).build(); + opsList.add(op); + if (!erroneous) { + assertEquals( + Result.Created, + client().index(b -> b.index("index").id(id).document(appData)).result() + ); + } + } + } + } + BulkRequest bulkRequest = new BulkRequest.Builder().operations(opsList).build(); + + BulkResponse bulkResponse = client().bulk(bulkRequest); + assertTrue(bulkResponse.took() > 0); + assertEquals(nbItems, bulkResponse.items().size()); + + validateBulkResponses(nbItems, errors, bulkResponse, bulkRequest); + } + + private void validateBulkResponses(int nbItems, boolean[] errors, BulkResponse bulkResponse, BulkRequest bulkRequest) { + for (int i = 0; i < nbItems; i++) { + BulkResponseItem bulkResponseItem = bulkResponse.items().get(i); + + assertEquals("index", bulkResponseItem.index()); + assertEquals(String.valueOf(i), bulkResponseItem.id()); + + BulkOperation bulkOperation = bulkRequest.operations().get(i); + if (bulkOperation.isIndex() || bulkOperation.isCreate()) { + assertEquals(errors[i] ? 409 : 201, bulkResponseItem.status()); + } else if (bulkOperation.isUpdate()) { + assertEquals(errors[i] ? Result.NotFound.jsonValue() : Result.Updated.jsonValue(), bulkResponseItem.result()); + assertEquals(errors[i] ? 404 : 200, bulkResponseItem.status()); + } else if (bulkOperation.isDelete()) { + assertEquals(errors[i] ? Result.NotFound.jsonValue() : Result.Deleted.jsonValue(), bulkResponseItem.result()); + assertEquals(errors[i] ? 404 : 200, bulkResponseItem.status()); + } + } + } + + public void testUrlEncode() throws IOException { + String indexPattern = ""; + String expectedIndex = "logstash-" + + DateTimeFormat.forPattern("YYYY.MM.dd").print(new DateTime(DateTimeZone.UTC).monthOfYear().roundFloorCopy()); + AppData appData = new AppData(); + appData.setIntValue(1337); + appData.setMsg("foo"); + { + IndexResponse response = client().index(b -> b.index(indexPattern).id("id#1").document(appData)); + assertEquals(expectedIndex, response.index()); + assertEquals("id#1", response.id()); + } + { + GetResponse getResponse = client().get(b -> b + .index(indexPattern).id("id#1"), AppData.class); + assertTrue(getResponse.found()); + assertEquals(expectedIndex, getResponse.index()); + assertEquals("id#1", getResponse.id()); + } + + String docId = "this/is/the/id"; + { + IndexResponse indexResponse = client().index(b -> b.index("index").id(docId).document(appData)); + assertEquals("index", indexResponse.index()); + assertEquals(docId, indexResponse.id()); + } + { + GetResponse getResponse = client().get(b -> b + .index("index").id(docId), AppData.class); + assertTrue(getResponse.found()); + assertEquals("index", getResponse.index()); + assertEquals(docId, getResponse.id()); + } + + assertTrue(client().indices().exists(b -> b.index(indexPattern, "index")).value()); + } + + public void testParamsEncode() throws IOException { + String routing = "test"; + { + String id = "id"; + AppData appData = new AppData(); + appData.setIntValue(1337); + appData.setMsg("foo"); + IndexResponse response = client().index(b -> b.index("index").id(id).document(appData).routing(routing)); + assertEquals("index", response.index()); + assertEquals(id, response.id()); + } + { + GetResponse getResponse = client().get(b -> b + .index("index").id("id").routing(routing), AppData.class); + assertTrue(getResponse.found()); + assertEquals("index", getResponse.index()); + assertEquals("id", getResponse.id()); + assertEquals(routing, getResponse.routing()); + } + } + + public void testGetIdWithPlusSign() throws Exception { + String id = "id+id"; + AppData appData = new AppData(); + appData.setIntValue(1337); + appData.setMsg("foo"); + + { + IndexResponse indexResponse = client().index(b -> b + .index("index").id(id).document(appData)); + assertEquals("index", indexResponse.index()); + assertEquals(id, indexResponse.id()); + } + { + GetResponse getResponse = client().get(b -> b + .index("index").id(id), AppData.class); + assertTrue(getResponse.found()); + assertEquals("index", getResponse.index()); + assertEquals(id, getResponse.id()); + } + } + + public static class AppData { + private int intValue; + private String msg; + + public int getIntValue() { + return intValue; + } + + public void setIntValue(int intValue) { + this.intValue = intValue; + } + + public String getMsg() { + return msg; + } + + public void setMsg(String msg) { + this.msg = msg; + } + } +} \ No newline at end of file diff --git a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/OpenSearchApacheHttpClient5ClientTestCase.java b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/OpenSearchApacheHttpClient5ClientTestCase.java new file mode 100644 index 0000000000..cf64c4635c --- /dev/null +++ b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/httpclient5/OpenSearchApacheHttpClient5ClientTestCase.java @@ -0,0 +1,254 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.opensearch.integTest.httpclient5; + + +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; +import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder; +import org.apache.hc.core5.function.Factory; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.message.BasicHeader; +import org.apache.hc.core5.http.nio.ssl.TlsStrategy; +import org.apache.hc.core5.reactor.ssl.TlsDetails; +import org.apache.hc.core5.ssl.SSLContexts; +import org.apache.hc.core5.util.Timeout; +import org.junit.After; +import org.opensearch.Version; +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch._types.ExpandWildcard; +import org.opensearch.client.opensearch.cat.IndicesResponse; +import org.opensearch.client.opensearch.cat.indices.IndicesRecord; +import org.opensearch.client.opensearch.indices.DeleteIndexRequest; +import org.opensearch.client.opensearch.nodes.NodesInfoResponse; +import org.opensearch.client.opensearch.nodes.info.NodeInfo; +import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder; +import org.junit.AfterClass; +import org.junit.Before; +import org.opensearch.common.io.PathUtils; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.internal.io.IOUtils; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.KeyManagementException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.CertificateException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.TreeSet; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; + +import static java.util.Collections.unmodifiableList; + +public abstract class OpenSearchApacheHttpClient5ClientTestCase extends OpenSearchTestCase { + public static final String TRUSTSTORE_PATH = "truststore.path"; + public static final String TRUSTSTORE_PASSWORD = "truststore.password"; + public static final String CLIENT_SOCKET_TIMEOUT = "client.socket.timeout"; + public static final String CLIENT_PATH_PREFIX = "client.path.prefix"; + + private static OpenSearchClient client; + private static OpenSearchClient adminClient; + + private static TreeSet nodeVersions; + private static List clusterHosts; + + @Before + public void setUp() throws IOException { + if (client == null) { + String cluster = getTestRestCluster(); + String[] stringUrls = cluster.split(","); + List hosts = new ArrayList<>(stringUrls.length); + for (String stringUrl : stringUrls) { + int portSeparator = stringUrl.lastIndexOf(':'); + if (portSeparator < 0) { + throw new IllegalArgumentException("Illegal cluster url [" + stringUrl + "]"); + } + String host = stringUrl.substring(0, portSeparator); + int port = Integer.valueOf(stringUrl.substring(portSeparator + 1)); + hosts.add(new HttpHost(getProtocol(), host, port)); + } + clusterHosts = unmodifiableList(hosts); + logger.info("initializing REST clients against {}", clusterHosts); + + client = buildClient(clientSettings(), clusterHosts.toArray(new HttpHost[clusterHosts.size()])); + adminClient = buildClient(adminSettings(), clusterHosts.toArray(new HttpHost[clusterHosts.size()])); + + nodeVersions = new TreeSet<>(); + final NodesInfoResponse response = adminClient.nodes().info(); + for (final NodeInfo node : response.nodes().values()) { + nodeVersions.add(Version.fromString(node.version())); + } + } + } + + /** + * Used to obtain settings for the REST client that is used to send REST requests. + */ + protected Settings clientSettings() { + Settings.Builder builder = Settings.builder(); + if (System.getProperty("tests.rest.client_path_prefix") != null) { + builder.put(CLIENT_PATH_PREFIX, System.getProperty("tests.rest.client_path_prefix")); + } + return builder.build(); + } + + /** + * Returns the REST client settings used for admin actions like cleaning up after the test has completed. + */ + protected Settings adminSettings() { + return clientSettings(); // default to the same client settings + } + + protected OpenSearchClient buildClient(Settings settings, HttpHost[] hosts) throws IOException { + final ApacheHttpClient5TransportBuilder builder = ApacheHttpClient5TransportBuilder.builder(hosts); + configureClient(builder, settings); + return new OpenSearchClient(builder.setStrictDeprecationMode(true).build()); + } + + protected static void configureClient(ApacheHttpClient5TransportBuilder builder, Settings settings) throws IOException { + String keystorePath = settings.get(TRUSTSTORE_PATH); + if (keystorePath != null) { + final String keystorePass = settings.get(TRUSTSTORE_PASSWORD); + if (keystorePass == null) { + throw new IllegalStateException(TRUSTSTORE_PATH + " is provided but not " + TRUSTSTORE_PASSWORD); + } + Path path = PathUtils.get(keystorePath); + if (!Files.exists(path)) { + throw new IllegalStateException(TRUSTSTORE_PATH + " is set but points to a non-existing file"); + } + try { + final String keyStoreType = keystorePath.endsWith(".p12") ? "PKCS12" : "jks"; + KeyStore keyStore = KeyStore.getInstance(keyStoreType); + try (InputStream is = Files.newInputStream(path)) { + keyStore.load(is, keystorePass.toCharArray()); + } + final SSLContext sslcontext = SSLContexts.custom().loadTrustMaterial(keyStore, null).build(); + builder.setHttpClientConfigCallback(httpClientBuilder -> { + final TlsStrategy tlsStrategy = ClientTlsStrategyBuilder.create() + .setSslContext(sslcontext) + // See https://issues.apache.org/jira/browse/HTTPCLIENT-2219 + .setTlsDetailsFactory(new Factory() { + @Override + public TlsDetails create(final SSLEngine sslEngine) { + return new TlsDetails(sslEngine.getSession(), sslEngine.getApplicationProtocol()); + } + }) + .build(); + + final PoolingAsyncClientConnectionManager connectionManager = PoolingAsyncClientConnectionManagerBuilder.create() + .setTlsStrategy(tlsStrategy) + .build(); + + return httpClientBuilder.setConnectionManager(connectionManager); + }); + } catch (KeyStoreException | NoSuchAlgorithmException | KeyManagementException | CertificateException e) { + throw new RuntimeException("Error setting up ssl", e); + } + } + Map headers = ThreadContext.buildDefaultHeaders(settings); + Header[] defaultHeaders = new Header[headers.size()]; + int i = 0; + for (Map.Entry entry : headers.entrySet()) { + defaultHeaders[i++] = new BasicHeader(entry.getKey(), entry.getValue()); + } + builder.setDefaultHeaders(defaultHeaders); + final String socketTimeoutString = settings.get(CLIENT_SOCKET_TIMEOUT); + final TimeValue socketTimeout = TimeValue.parseTimeValue( + socketTimeoutString == null ? "60s" : socketTimeoutString, + CLIENT_SOCKET_TIMEOUT + ); + builder.setRequestConfigCallback( + conf -> conf.setResponseTimeout(Timeout.ofMilliseconds(Math.toIntExact(socketTimeout.getMillis()))) + ); + if (settings.hasValue(CLIENT_PATH_PREFIX)) { + builder.setPathPrefix(settings.get(CLIENT_PATH_PREFIX)); + } + } + + private boolean isHttps() { + return Optional.ofNullable(System.getProperty("https")) + .map("true"::equalsIgnoreCase) + .orElse(false); + } + + protected String getProtocol() { + return isHttps() ? "https" : "http"; + } + + protected static OpenSearchClient client() { + return client; + } + + protected static OpenSearchClient adminClient() { + return client; + } + + + @AfterClass + public static void closeClients() throws IOException { + try { + if (client != null) { + IOUtils.close(client._transport()); + } + + if (adminClient != null) { + IOUtils.close(adminClient._transport()); + } + } finally { + clusterHosts = null; + client = null; + adminClient = null; + nodeVersions = null; + } + } + + @After + protected void wipeAllOSIndices() throws IOException { + final IndicesResponse response = adminClient + .cat() + .indices(r -> r.expandWildcards(ExpandWildcard.All)); + + for (IndicesRecord index : response.valueBody()) { + if (index.index() != null && !".opendistro_security".equals(index.index())) { + adminClient().indices().delete(new DeleteIndexRequest.Builder().index(index.index()).build()); + } + } + } + + protected String getTestRestCluster() { + String cluster = System.getProperty("tests.rest.cluster"); + if (cluster == null) { + throw new RuntimeException( + "Must specify [tests.rest.cluster] system property with a comma delimited list of [host:port] " + + "to which to send REST requests" + ); + } + return cluster; + } + + /** + * wipeAllIndices won't work since it cannot delete security index. Use wipeAllOSIndices instead. + */ + protected boolean preserveIndicesUponCompletion() { + return true; + } +} \ No newline at end of file