Skip to content

Commit

Permalink
feat: add discovery api on databend client (#264)
Browse files Browse the repository at this point in the history
* feat: add discovery api on databend client

* chore: upgrade to v0.3.0

* fill out missing files

* chore: update to minimum supported jdbc version
  • Loading branch information
ZhiHanZ authored Sep 2, 2024
1 parent f2c3dd3 commit 6cf07e5
Show file tree
Hide file tree
Showing 32 changed files with 418 additions and 157 deletions.
2 changes: 1 addition & 1 deletion .github/actions/setup_databend_cluster/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ inputs:
version:
description: "query and meta service version"
required: true
default: "1.2.616-nightly"
default: "1.2.629-nightly"
target:
description: ""
required: true
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
runs-on: ubuntu-latest
services:
databend:
image: datafuselabs/databend
image: datafuselabs/databend:nightly
env:
QUERY_DEFAULT_USER: databend
QUERY_DEFAULT_PASSWORD: databend
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test_cluster.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
- uses: ./.github/actions/setup_databend_cluster
timeout-minutes: 15
with:
version: '1.2.616-nightly'
version: '1.2.629-nightly'
target: 'x86_64-unknown-linux-gnu'
- name: Run Maven clean deploy with release profile
run: mvn test -DexcludedGroups=FLAKY
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
.idea/
databend-jdbc/databend-jdbc-debug.log
target/
/databend/
.databend/
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Add following code block as a dependency
<dependency>
<groupId>com.databend</groupId>
<artifactId>databend-jdbc</artifactId>
<version>0.2.9</version>
<version>0.3.0</version>
</dependency>
```

Expand Down
4 changes: 2 additions & 2 deletions databend-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
<parent>
<groupId>com.databend</groupId>
<artifactId>databend-base</artifactId>
<version>0.2.9</version>
<version>0.3.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>com.databend</groupId>
<artifactId>databend-client</artifactId>
<version>0.2.9</version>
<version>0.3.0</version>

<properties>
<!--suppress UnresolvedMavenProperty -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Random;

public class ClientSettings {
public static final Integer DEFAULT_QUERY_TIMEOUT = 300;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@

public interface DatabendClient extends Closeable {
String getQuery();

@Override
void close();

DatabendSession getSession();

String getHost();

Map<String, String> getAdditionalHeaders();

QueryResults getResults();

// execute Restful query request for the first time.
// @param request the request to be executed
// @return true if request finished with result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.databend.client;

public final class DatabendClientFactory {
private DatabendClientFactory() {}
private DatabendClientFactory() {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,20 @@
package com.databend.client;

import com.databend.client.errors.CloudErrors;
import okhttp3.Headers;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.*;
import okio.Buffer;

import javax.annotation.concurrent.ThreadSafe;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ConnectException;
import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
import java.util.function.Consumer;
import java.util.logging.Logger;

import static com.databend.client.JsonCodec.jsonCodec;
import static com.google.common.base.MoreObjects.firstNonNull;
Expand All @@ -51,12 +46,14 @@ public class DatabendClientV1
firstNonNull(DatabendClientV1.class.getPackage().getImplementationVersion(), "jvm-unknown");
public static final MediaType MEDIA_TYPE_JSON = MediaType.parse("application/json; charset=utf-8");
public static final JsonCodec<QueryResults> QUERY_RESULTS_CODEC = jsonCodec(QueryResults.class);
public static final JsonCodec<DiscoveryResponseCodec.DiscoveryResponse> DISCOVERY_RESULT_CODEC = jsonCodec(DiscoveryResponseCodec.DiscoveryResponse.class);
public static final String succeededState = "succeeded";
public static final String failedState = "failed";
public static final String runningState = "running";


public static final String QUERY_PATH = "/v1/query";
public static final String DISCOVERY_PATH = "/v1/discovery_nodes";
private static final long MAX_MATERIALIZED_JSON_RESPONSE_SIZE = 128 * 1024;
private final OkHttpClient httpClient;
private final String query;
Expand Down Expand Up @@ -96,14 +93,23 @@ public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings sett
}
}

public Request.Builder prepareRequest(HttpUrl url) {
public static List<DiscoveryNode> dicoverNodes(OkHttpClient httpClient, ClientSettings settings) {
requireNonNull(httpClient, "httpClient is null");
requireNonNull(settings, "settings is null");
requireNonNull(settings.getHost(), "settings.host is null");
Request request = buildDiscoveryRequest(settings);
DiscoveryResponseCodec.DiscoveryResponse response = getDiscoveryResponse(httpClient, request, OptionalLong.empty(), settings.getQueryTimeoutSecs());
return response.getNodes();
}

public static Request.Builder prepareRequest(HttpUrl url, Map<String, String> additionalHeaders) {
Request.Builder builder = new Request.Builder()
.url(url)
.header("User-Agent", USER_AGENT_VALUE)
.header("Accept", "application/json")
.header("Content-Type", "application/json");
if (this.getAdditionalHeaders() != null) {
this.getAdditionalHeaders().forEach(builder::addHeader);
if (additionalHeaders != null) {
additionalHeaders.forEach(builder::addHeader);
}
return builder;
}
Expand All @@ -121,15 +127,90 @@ private Request buildQueryRequest(String query, ClientSettings settings) {
throw new IllegalArgumentException("Invalid request: " + req);
}
url = url.newBuilder().encodedPath(QUERY_PATH).build();
Request.Builder builder = prepareRequest(url);
Request.Builder builder = prepareRequest(url, this.additonalHeaders);
return builder.post(okhttp3.RequestBody.create(MEDIA_TYPE_JSON, reqString)).build();
}

private static Request buildDiscoveryRequest(ClientSettings settings) {
HttpUrl url = HttpUrl.get(settings.getHost());
if (url == null) {
// TODO(zhihanz) use custom exception
throw new IllegalArgumentException("Invalid host: " + settings.getHost());
}
url = url.newBuilder().encodedPath(DISCOVERY_PATH).build();
Request.Builder builder = prepareRequest(url, settings.getAdditionalHeaders());
return builder.get().build();
}

@Override
public String getQuery() {
return query;
}

private static DiscoveryResponseCodec.DiscoveryResponse getDiscoveryResponse(OkHttpClient httpClient, Request request, OptionalLong materializedJsonSizeLimit, int requestTimeoutSecs) {
requireNonNull(request, "request is null");

long start = System.nanoTime();
int attempts = 0;
Exception lastException = null;

while (true) {
if (attempts > 0) {
Duration sinceStart = Duration.ofNanos(System.nanoTime() - start);
if (sinceStart.compareTo(Duration.ofSeconds(requestTimeoutSecs)) > 0) {
throw new RuntimeException(format("Error fetching discovery nodes (attempts: %s, duration: %s)", attempts, sinceStart.getSeconds()), lastException);
}

try {
MILLISECONDS.sleep(attempts * 100); // Exponential backoff
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while fetching discovery nodes", e);
}
}
attempts++;

JsonResponse<DiscoveryResponseCodec.DiscoveryResponse> response;
try {
response = JsonResponse.execute(
DISCOVERY_RESULT_CODEC,
httpClient,
request,
materializedJsonSizeLimit);
} catch (RuntimeException e) {
lastException = e;
if (e.getCause() instanceof ConnectException) {
// Retry on connection refused errors
continue;
}
throw new RuntimeException("Failed to fetch discovery nodes: " + e.getMessage(), e);
}

if (response.getStatusCode() == HTTP_OK && response.hasValue()) {
DiscoveryResponseCodec.DiscoveryResponse discoveryResponse = response.getValue();
if (discoveryResponse.getError() == null) {
return discoveryResponse; // Successful response
}
if (discoveryResponse.getError().notFound()) {
throw new UnsupportedOperationException("Discovery request feature not supported: " + discoveryResponse.getError());
}
throw new RuntimeException("Discovery request failed: " + discoveryResponse.getError());
}

// Handle other HTTP error codes and response body parsing for errors
if (response.getResponseBody().isPresent()) {
CloudErrors errors = CloudErrors.tryParse(response.getResponseBody().get());
if (errors != null && errors.tryGetErrorKind().canRetry()) {
continue;
}
}

if (response.getStatusCode() != 520) {
throw new RuntimeException("Discovery request failed with status code: " + response.getStatusCode());
}
}
}

private boolean executeInternal(Request request, OptionalLong materializedJsonSizeLimit) {
requireNonNull(request, "request is null");
long start = System.nanoTime();
Expand Down Expand Up @@ -219,7 +300,7 @@ private void processResponse(Headers headers, QueryResults results) {
if (results.getQueryId() != null && this.additonalHeaders.get(ClientSettings.X_Databend_Query_ID) == null) {
this.additonalHeaders.put(ClientSettings.X_Databend_Query_ID, results.getQueryId());
}
if (headers != null && headers.get(ClientSettings.X_DATABEND_ROUTE_HINT) != null){
if (headers != null && headers.get(ClientSettings.X_DATABEND_ROUTE_HINT) != null) {
this.additonalHeaders.put(ClientSettings.X_DATABEND_ROUTE_HINT, headers.get(ClientSettings.X_DATABEND_ROUTE_HINT));
}
currentResults.set(results);
Expand All @@ -241,7 +322,7 @@ public boolean advance() {
String nextUriPath = this.currentResults.get().getNextUri().toString();
HttpUrl url = HttpUrl.get(this.host);
url = url.newBuilder().encodedPath(nextUriPath).build();
Request.Builder builder = prepareRequest(url);
Request.Builder builder = prepareRequest(url, this.additonalHeaders);
Request request = builder.get().build();
return executeInternal(request, OptionalLong.of(MAX_MATERIALIZED_JSON_RESPONSE_SIZE));
}
Expand Down Expand Up @@ -291,7 +372,7 @@ private void closeQuery() {
String path = uri.toString();
HttpUrl url = HttpUrl.get(this.host);
url = url.newBuilder().encodedPath(path).build();
Request r = prepareRequest(url).get().build();
Request r = prepareRequest(url, this.additonalHeaders).get().build();
try {
httpClient.newCall(r).execute().close();
} catch (IOException ignored) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.databend.client;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import static com.google.common.base.MoreObjects.toStringHelper;

public class DiscoveryNode {
private final String address;

@JsonCreator
public DiscoveryNode(
@JsonProperty("address") String address) {
this.address = address;
}

// add builder

@JsonProperty
public String getAddress() {
return address;
}

@Override
public String toString() {
return toStringHelper(this)
.add("address", address)
.toString();
}

public static Builder builder() {
return new Builder();
}

public static final class Builder {
private String address;

public Builder setAddress(String address) {
this.address = address;
return this;
}

public DiscoveryNode build() {
return new DiscoveryNode(address);
}
}
}
Loading

0 comments on commit 6cf07e5

Please sign in to comment.