Skip to content

Commit

Permalink
fix: fix transaction route on multi host (#266)
Browse files Browse the repository at this point in the history
* fix: fix transaction route on multi host

* chore: fix flaky cluster key

* chore: ignore useless code

* chore: add more comment
  • Loading branch information
ZhiHanZ authored Sep 5, 2024
1 parent 6cf07e5 commit 8f0b4d8
Show file tree
Hide file tree
Showing 10 changed files with 253 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static com.databend.client.JsonCodec.jsonCodec;
import static com.google.common.base.MoreObjects.firstNonNull;
import static java.lang.String.format;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static java.net.HttpURLConnection.HTTP_OK;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand Down Expand Up @@ -137,7 +138,13 @@ private static Request buildDiscoveryRequest(ClientSettings settings) {
// TODO(zhihanz) use custom exception
throw new IllegalArgumentException("Invalid host: " + settings.getHost());
}
url = url.newBuilder().encodedPath(DISCOVERY_PATH).build();
String discoveryPath = DISCOVERY_PATH;
// intentionally use unsupported discovery path for testing
if (settings.getAdditionalHeaders().get("~mock.unsupported.discovery") != null && settings.getAdditionalHeaders().get("~mock.unsupported.discovery").equals("true")) {
discoveryPath = "/v1/discovery_nodes_unsupported";
}

url = url.newBuilder().encodedPath(discoveryPath).build();
Request.Builder builder = prepareRequest(url, settings.getAdditionalHeaders());
return builder.get().build();
}
Expand Down Expand Up @@ -195,6 +202,8 @@ private static DiscoveryResponseCodec.DiscoveryResponse getDiscoveryResponse(OkH
throw new UnsupportedOperationException("Discovery request feature not supported: " + discoveryResponse.getError());
}
throw new RuntimeException("Discovery request failed: " + discoveryResponse.getError());
} else if (response.getStatusCode() == HTTP_NOT_FOUND) {
throw new UnsupportedOperationException("Discovery request feature not supported");
}

// Handle other HTTP error codes and response body parsing for errors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.net.URI;

import static com.google.common.base.MoreObjects.toStringHelper;

public class DiscoveryNode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,21 @@ public void testDiscoverNodes() {
}
}

@Test(groups = {"it"})
public void testDiscoverNodesUnSupported() {
OkHttpClient client = new OkHttpClient.Builder().addInterceptor(OkHttpUtils.basicAuthInterceptor("databend", "databend")).build();
String expectedUUID = UUID.randomUUID().toString();

Map<String, String> additionalHeaders = new HashMap<>();
additionalHeaders.put(X_Databend_Query_ID, expectedUUID);
additionalHeaders.put("~mock.unsupported.discovery", "true");
ClientSettings settings = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders, null, DEFAULT_RETRY_ATTEMPTS);
try {
DatabendClientV1.dicoverNodes(client, settings);
Assert.fail("Expected exception was not thrown");
} catch (Exception e) {
System.out.println(e.getMessage());
Assert.assertTrue(e instanceof UnsupportedOperationException, "Exception should be UnsupportedOperationException");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public final class ConnectionProperties {
static final ConnectionProperty<String> TENANT = new Tenant();
public static final ConnectionProperty<Integer> MAX_FAILOVER_RETRY = new MaxFailoverRetry();
public static final ConnectionProperty<String> LOAD_BALANCING_POLICY = new LoadBalancingPolicy();
public static final ConnectionProperty<Boolean> AUTO_DISCOVERY = new AutoDiscovery();

public static final ConnectionProperty<String> DATABASE = new Database();
public static final ConnectionProperty<String> ACCESS_TOKEN = new AccessToken();

Expand Down Expand Up @@ -156,6 +158,12 @@ public LoadBalancingPolicy() {
}
}

private static class AutoDiscovery extends AbstractConnectionProperty<Boolean> {
public AutoDiscovery() {
super("auto_discovery", Optional.of("false"), NOT_REQUIRED, ALLOWED, BOOLEAN_CONVERTER);
}
}

private static class AccessToken
extends AbstractConnectionProperty<String> {
public AccessToken() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.io.*;
import java.net.ConnectException;
import java.net.URI;
import java.net.URISyntaxException;
import java.sql.Array;
import java.sql.Blob;
import java.sql.CallableStatement;
Expand Down Expand Up @@ -42,6 +43,7 @@

import static com.databend.client.ClientSettings.*;
import static com.google.common.base.Preconditions.checkState;
import static java.net.URI.create;
import static java.util.Collections.newSetFromMap;
import static java.util.Objects.requireNonNull;

Expand All @@ -56,6 +58,7 @@ public class DatabendConnection implements Connection, FileTransferAPI, Consumer
private final OkHttpClient httpClient;
private final Set<DatabendStatement> statements = newSetFromMap(new ConcurrentHashMap<>());
private final DatabendDriverUri driverUri;
private boolean autoDiscovery;
private AtomicReference<DatabendSession> session = new AtomicReference<>();

private String routeHint = "";
Expand Down Expand Up @@ -90,6 +93,8 @@ private void initializeFileHandler() {
this.driverUri = uri;
this.setSchema(uri.getDatabase());
this.routeHint = randRouteHint();
// it maybe closed due to unsupported server versioning.
this.autoDiscovery = uri.autoDiscovery();
DatabendSession session = new DatabendSession.Builder().setDatabase(this.getSchema()).build();
this.setSession(session);

Expand All @@ -106,6 +111,41 @@ public static String randRouteHint() {
return sb.toString();
}

private static final char SPECIAL_CHAR = '#';

public static String uriRouteHint(String URI) {
// Encode the URI using Base64
String encodedUri = Base64.getEncoder().encodeToString(URI.getBytes());

// Append the special character
return encodedUri + SPECIAL_CHAR;
}

public static URI parseRouteHint(String routeHint) {
if (routeHint == null || routeHint.isEmpty()) {
return null;
}
URI target;
try {
if (routeHint.charAt(routeHint.length() - 1) != SPECIAL_CHAR) {
return null;
}
// Remove the special character
String encodedUri = routeHint.substring(0, routeHint.length() - 1);

// Decode the Base64 string
byte[] decodedBytes = Base64.getDecoder().decode(encodedUri);
String decodedUri = new String(decodedBytes);

return create(decodedUri);
} catch (Exception e) {
logger.log(Level.FINE, "Failed to parse route hint: " + routeHint, e);
return null;
}
}



private static void checkResultSet(int resultSetType, int resultSetConcurrency)
throws SQLFeatureNotSupportedException {
if (resultSetType != ResultSet.TYPE_FORWARD_ONLY) {
Expand Down Expand Up @@ -635,16 +675,36 @@ DatabendClient startQueryWithFailover(String sql, StageAttachment attach) throws
throw new SQLException("Error start query: " + "SQL: " + sql + " " + e.getMessage() + " cause: " + e.getCause(), e);
}
try {
// route hint is used when transaction occured or when multi-cluster warehouse adopted(CLOUD ONLY)
// on cloud case, we have gateway to handle with route hint, and will not parse URI from route hint.
// transaction procedure:
// 1. server return session body where txn state is active
// 2. when there is a active transaction, it will route all query to target route hint uri if exists
// 3. if there is not active transaction, it will use load balancing policy to choose a host to execute query
String query_id = UUID.randomUUID().toString();
String candidateHost = this.driverUri.getUri(query_id).toString();
if (!inActiveTransaction()) {
this.routeHint = randRouteHint();
this.routeHint = uriRouteHint(candidateHost);
}
// checkout the host to use from route hint
if (this.routeHint != null && !this.routeHint.isEmpty()) {
URI uri = parseRouteHint(this.routeHint);
if (uri != null) {
candidateHost = uri.toString();
}
}

// configure query and choose host based on load balancing policy.
ClientSettings.Builder sb = this.makeClientSettings();
ClientSettings.Builder sb = this.makeClientSettings(query_id, candidateHost);
if (attach != null) {
sb.setStageAttachment(attach);
}
ClientSettings s = sb.build();
logger.log(Level.FINE, "retry " + i + " times to execute query: " + sql + " on " + s.getHost());
// discover new hosts in need.
// if (this.autoDiscovery) {
//
// }
return new DatabendClientV1(httpClient, sql, s, this);
} catch (RuntimeException e1) {
e = e1;
Expand All @@ -663,14 +723,13 @@ DatabendClient startQuery(String sql, StageAttachment attach) throws SQLExceptio
return startQueryWithFailover(sql, attach);
}

private ClientSettings.Builder makeClientSettings() {
private ClientSettings.Builder makeClientSettings(String queryID, String host) {
PaginationOptions options = getPaginationOptions();
Map<String, String> additionalHeaders = setAdditionalHeaders();
String query_id = UUID.randomUUID().toString();
additionalHeaders.put(X_Databend_Query_ID, query_id);
additionalHeaders.put(X_Databend_Query_ID, queryID);
return new Builder().
setSession(this.session.get()).
setHost(this.driverUri.getUri(query_id).toString()).
setHost(host).
setQueryTimeoutSecs(this.driverUri.getQueryTimeout()).
setConnectionTimeout(this.driverUri.getConnectionTimeout()).
setSocketTimeout(this.driverUri.getSocketTimeout()).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public final class DatabendDriverUri {
private final boolean presignedUrlDisabled;
private final Integer connectionTimeout;
private final Integer maxFailoverRetry;
private final boolean autoDiscovery;
private final Integer queryTimeout;
private final Integer socketTimeout;
private final Integer waitTimeSecs;
Expand All @@ -69,11 +70,13 @@ private DatabendDriverUri(String url, Properties driverProperties)
this.sslmode = SSL_MODE.getValue(properties).orElse("disable");
this.tenant = TENANT.getValue(properties).orElse("");
this.maxFailoverRetry = MAX_FAILOVER_RETRY.getValue(properties).orElse(0);
this.autoDiscovery = AUTO_DISCOVERY.getValue(properties).orElse(false);
List<URI> finalUris = canonicalizeUris(uris, this.useSecureConnection, this.sslmode);
DatabendClientLoadBalancingPolicy policy = DatabendClientLoadBalancingPolicy.create(LOAD_BALANCING_POLICY.getValue(properties).orElse(DatabendClientLoadBalancingPolicy.DISABLED));
DatabendNodes nodes = uriAndProperties.getKey();
nodes.updateNodes(finalUris);
nodes.updatePolicy(policy);
nodes.setSSL(this.useSecureConnection, this.sslmode);
this.nodes = nodes;
this.database = DATABASE.getValue(properties).orElse("default");
this.presignedUrlDisabled = PRESIGNED_URL_DISABLED.getRequiredValue(properties);
Expand Down Expand Up @@ -251,18 +254,21 @@ private static Map.Entry<DatabendNodes, Map<String, String>> parse(String url)
URI lastUri = uris.get(uris.size() - 1);
// Initialize database from the last URI
initDatabase(lastUri, uriProperties);
String uriPath = lastUri.getPath();
String uriQuery = lastUri.getQuery();
String uriFragment = lastUri.getFragment();
// Sync path and query from lastUri for the rest of uri
for (int i = 0; i < uris.size() - 1; i++) {
URI uri = uris.get(i);
uris.set(i, new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), uri.getPort(), lastUri.getPath(), lastUri.getQuery(), lastUri.getFragment()));
uris.set(i, new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), uri.getPort(), uriPath, uriQuery, uriFragment));
}
// remove duplicate uris
Set<URI> uriSet = new LinkedHashSet<>(uris);
uris.clear();
uris.addAll(uriSet);
// Create DatabendNodes object
DatabendClientLoadBalancingPolicy policy = DatabendClientLoadBalancingPolicy.create(DatabendClientLoadBalancingPolicy.DISABLED); // You might want to make this configurable
DatabendNodes databendNodes = new DatabendNodes(uris, policy);
DatabendNodes databendNodes = new DatabendNodes(uris, policy, uriPath, uriQuery, uriFragment);
return new AbstractMap.SimpleImmutableEntry<>(databendNodes, uriProperties);
} catch (URISyntaxException e) {
throw new SQLException("Invalid URI: " + raw, e);
Expand Down Expand Up @@ -304,6 +310,9 @@ public URI getUri(String query_id) {
return nodes.pickUri(query_id);
}

public Boolean autoDiscovery() {
return autoDiscovery;
}
public String getDatabase() {
return database;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.databend.jdbc;

import com.databend.client.ClientSettings;
import okhttp3.OkHttpClient;

import java.net.URI;
import java.util.List;

Expand All @@ -13,10 +16,18 @@ public interface DatabendNodeRouter {
* @return non-null uris
*/
List<URI> getUris();

/**
* Get load balancing policy
*/
DatabendClientLoadBalancingPolicy getPolicy();

/**
* Discover all possible query uris through databend discovery api and update candidate node router list in need
* @return true if update operation executed, false otherwise
* Ref PR:
* https://github.com/datafuselabs/databend-jdbc/pull/264
* https://github.com/datafuselabs/databend/pull/16353
*/
boolean discoverUris(OkHttpClient client, ClientSettings settings);
}
Loading

0 comments on commit 8f0b4d8

Please sign in to comment.