diff --git a/databend-client/src/main/java/com/databend/client/DatabendClientV1.java b/databend-client/src/main/java/com/databend/client/DatabendClientV1.java index 1124387..6f623b5 100644 --- a/databend-client/src/main/java/com/databend/client/DatabendClientV1.java +++ b/databend-client/src/main/java/com/databend/client/DatabendClientV1.java @@ -33,6 +33,7 @@ import static com.databend.client.JsonCodec.jsonCodec; import static com.google.common.base.MoreObjects.firstNonNull; import static java.lang.String.format; +import static java.net.HttpURLConnection.HTTP_NOT_FOUND; import static java.net.HttpURLConnection.HTTP_OK; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -137,7 +138,13 @@ private static Request buildDiscoveryRequest(ClientSettings settings) { // TODO(zhihanz) use custom exception throw new IllegalArgumentException("Invalid host: " + settings.getHost()); } - url = url.newBuilder().encodedPath(DISCOVERY_PATH).build(); + String discoveryPath = DISCOVERY_PATH; + // intentionally use unsupported discovery path for testing + if (settings.getAdditionalHeaders().get("~mock.unsupported.discovery") != null && settings.getAdditionalHeaders().get("~mock.unsupported.discovery").equals("true")) { + discoveryPath = "/v1/discovery_nodes_unsupported"; + } + + url = url.newBuilder().encodedPath(discoveryPath).build(); Request.Builder builder = prepareRequest(url, settings.getAdditionalHeaders()); return builder.get().build(); } @@ -195,6 +202,8 @@ private static DiscoveryResponseCodec.DiscoveryResponse getDiscoveryResponse(OkH throw new UnsupportedOperationException("Discovery request feature not supported: " + discoveryResponse.getError()); } throw new RuntimeException("Discovery request failed: " + discoveryResponse.getError()); + } else if (response.getStatusCode() == HTTP_NOT_FOUND) { + throw new UnsupportedOperationException("Discovery request feature not supported"); } // Handle other HTTP error codes and response body parsing for errors diff --git a/databend-client/src/main/java/com/databend/client/DiscoveryNode.java b/databend-client/src/main/java/com/databend/client/DiscoveryNode.java index 876a6e2..7a08c52 100644 --- a/databend-client/src/main/java/com/databend/client/DiscoveryNode.java +++ b/databend-client/src/main/java/com/databend/client/DiscoveryNode.java @@ -17,6 +17,8 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import java.net.URI; + import static com.google.common.base.MoreObjects.toStringHelper; public class DiscoveryNode { diff --git a/databend-client/src/test/java/com/databend/client/TestClientIT.java b/databend-client/src/test/java/com/databend/client/TestClientIT.java index 3214ab9..5f35687 100644 --- a/databend-client/src/test/java/com/databend/client/TestClientIT.java +++ b/databend-client/src/test/java/com/databend/client/TestClientIT.java @@ -109,4 +109,21 @@ public void testDiscoverNodes() { } } + @Test(groups = {"it"}) + public void testDiscoverNodesUnSupported() { + OkHttpClient client = new OkHttpClient.Builder().addInterceptor(OkHttpUtils.basicAuthInterceptor("databend", "databend")).build(); + String expectedUUID = UUID.randomUUID().toString(); + + Map additionalHeaders = new HashMap<>(); + additionalHeaders.put(X_Databend_Query_ID, expectedUUID); + additionalHeaders.put("~mock.unsupported.discovery", "true"); + ClientSettings settings = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders, null, DEFAULT_RETRY_ATTEMPTS); + try { + DatabendClientV1.dicoverNodes(client, settings); + Assert.fail("Expected exception was not thrown"); + } catch (Exception e) { + System.out.println(e.getMessage()); + Assert.assertTrue(e instanceof UnsupportedOperationException, "Exception should be UnsupportedOperationException"); + } + } } diff --git a/databend-jdbc/src/main/java/com/databend/jdbc/ConnectionProperties.java b/databend-jdbc/src/main/java/com/databend/jdbc/ConnectionProperties.java index 2858e3c..a5994a7 100644 --- a/databend-jdbc/src/main/java/com/databend/jdbc/ConnectionProperties.java +++ b/databend-jdbc/src/main/java/com/databend/jdbc/ConnectionProperties.java @@ -25,6 +25,8 @@ public final class ConnectionProperties { static final ConnectionProperty TENANT = new Tenant(); public static final ConnectionProperty MAX_FAILOVER_RETRY = new MaxFailoverRetry(); public static final ConnectionProperty LOAD_BALANCING_POLICY = new LoadBalancingPolicy(); + public static final ConnectionProperty AUTO_DISCOVERY = new AutoDiscovery(); + public static final ConnectionProperty DATABASE = new Database(); public static final ConnectionProperty ACCESS_TOKEN = new AccessToken(); @@ -156,6 +158,12 @@ public LoadBalancingPolicy() { } } + private static class AutoDiscovery extends AbstractConnectionProperty { + public AutoDiscovery() { + super("auto_discovery", Optional.of("false"), NOT_REQUIRED, ALLOWED, BOOLEAN_CONVERTER); + } + } + private static class AccessToken extends AbstractConnectionProperty { public AccessToken() { diff --git a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java index d653aed..4846948 100644 --- a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java +++ b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java @@ -12,6 +12,7 @@ import java.io.*; import java.net.ConnectException; import java.net.URI; +import java.net.URISyntaxException; import java.sql.Array; import java.sql.Blob; import java.sql.CallableStatement; @@ -42,6 +43,7 @@ import static com.databend.client.ClientSettings.*; import static com.google.common.base.Preconditions.checkState; +import static java.net.URI.create; import static java.util.Collections.newSetFromMap; import static java.util.Objects.requireNonNull; @@ -56,6 +58,7 @@ public class DatabendConnection implements Connection, FileTransferAPI, Consumer private final OkHttpClient httpClient; private final Set statements = newSetFromMap(new ConcurrentHashMap<>()); private final DatabendDriverUri driverUri; + private boolean autoDiscovery; private AtomicReference session = new AtomicReference<>(); private String routeHint = ""; @@ -90,6 +93,8 @@ private void initializeFileHandler() { this.driverUri = uri; this.setSchema(uri.getDatabase()); this.routeHint = randRouteHint(); + // it maybe closed due to unsupported server versioning. + this.autoDiscovery = uri.autoDiscovery(); DatabendSession session = new DatabendSession.Builder().setDatabase(this.getSchema()).build(); this.setSession(session); @@ -106,6 +111,41 @@ public static String randRouteHint() { return sb.toString(); } + private static final char SPECIAL_CHAR = '#'; + + public static String uriRouteHint(String URI) { + // Encode the URI using Base64 + String encodedUri = Base64.getEncoder().encodeToString(URI.getBytes()); + + // Append the special character + return encodedUri + SPECIAL_CHAR; + } + + public static URI parseRouteHint(String routeHint) { + if (routeHint == null || routeHint.isEmpty()) { + return null; + } + URI target; + try { + if (routeHint.charAt(routeHint.length() - 1) != SPECIAL_CHAR) { + return null; + } + // Remove the special character + String encodedUri = routeHint.substring(0, routeHint.length() - 1); + + // Decode the Base64 string + byte[] decodedBytes = Base64.getDecoder().decode(encodedUri); + String decodedUri = new String(decodedBytes); + + return create(decodedUri); + } catch (Exception e) { + logger.log(Level.FINE, "Failed to parse route hint: " + routeHint, e); + return null; + } + } + + + private static void checkResultSet(int resultSetType, int resultSetConcurrency) throws SQLFeatureNotSupportedException { if (resultSetType != ResultSet.TYPE_FORWARD_ONLY) { @@ -635,16 +675,36 @@ DatabendClient startQueryWithFailover(String sql, StageAttachment attach) throws throw new SQLException("Error start query: " + "SQL: " + sql + " " + e.getMessage() + " cause: " + e.getCause(), e); } try { + // route hint is used when transaction occured or when multi-cluster warehouse adopted(CLOUD ONLY) + // on cloud case, we have gateway to handle with route hint, and will not parse URI from route hint. + // transaction procedure: + // 1. server return session body where txn state is active + // 2. when there is a active transaction, it will route all query to target route hint uri if exists + // 3. if there is not active transaction, it will use load balancing policy to choose a host to execute query + String query_id = UUID.randomUUID().toString(); + String candidateHost = this.driverUri.getUri(query_id).toString(); if (!inActiveTransaction()) { - this.routeHint = randRouteHint(); + this.routeHint = uriRouteHint(candidateHost); + } + // checkout the host to use from route hint + if (this.routeHint != null && !this.routeHint.isEmpty()) { + URI uri = parseRouteHint(this.routeHint); + if (uri != null) { + candidateHost = uri.toString(); + } } + // configure query and choose host based on load balancing policy. - ClientSettings.Builder sb = this.makeClientSettings(); + ClientSettings.Builder sb = this.makeClientSettings(query_id, candidateHost); if (attach != null) { sb.setStageAttachment(attach); } ClientSettings s = sb.build(); logger.log(Level.FINE, "retry " + i + " times to execute query: " + sql + " on " + s.getHost()); + // discover new hosts in need. +// if (this.autoDiscovery) { +// +// } return new DatabendClientV1(httpClient, sql, s, this); } catch (RuntimeException e1) { e = e1; @@ -663,14 +723,13 @@ DatabendClient startQuery(String sql, StageAttachment attach) throws SQLExceptio return startQueryWithFailover(sql, attach); } - private ClientSettings.Builder makeClientSettings() { + private ClientSettings.Builder makeClientSettings(String queryID, String host) { PaginationOptions options = getPaginationOptions(); Map additionalHeaders = setAdditionalHeaders(); - String query_id = UUID.randomUUID().toString(); - additionalHeaders.put(X_Databend_Query_ID, query_id); + additionalHeaders.put(X_Databend_Query_ID, queryID); return new Builder(). setSession(this.session.get()). - setHost(this.driverUri.getUri(query_id).toString()). + setHost(host). setQueryTimeoutSecs(this.driverUri.getQueryTimeout()). setConnectionTimeout(this.driverUri.getConnectionTimeout()). setSocketTimeout(this.driverUri.getSocketTimeout()). diff --git a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendDriverUri.java b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendDriverUri.java index 23cc41a..8dc7901 100644 --- a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendDriverUri.java +++ b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendDriverUri.java @@ -48,6 +48,7 @@ public final class DatabendDriverUri { private final boolean presignedUrlDisabled; private final Integer connectionTimeout; private final Integer maxFailoverRetry; + private final boolean autoDiscovery; private final Integer queryTimeout; private final Integer socketTimeout; private final Integer waitTimeSecs; @@ -69,11 +70,13 @@ private DatabendDriverUri(String url, Properties driverProperties) this.sslmode = SSL_MODE.getValue(properties).orElse("disable"); this.tenant = TENANT.getValue(properties).orElse(""); this.maxFailoverRetry = MAX_FAILOVER_RETRY.getValue(properties).orElse(0); + this.autoDiscovery = AUTO_DISCOVERY.getValue(properties).orElse(false); List finalUris = canonicalizeUris(uris, this.useSecureConnection, this.sslmode); DatabendClientLoadBalancingPolicy policy = DatabendClientLoadBalancingPolicy.create(LOAD_BALANCING_POLICY.getValue(properties).orElse(DatabendClientLoadBalancingPolicy.DISABLED)); DatabendNodes nodes = uriAndProperties.getKey(); nodes.updateNodes(finalUris); nodes.updatePolicy(policy); + nodes.setSSL(this.useSecureConnection, this.sslmode); this.nodes = nodes; this.database = DATABASE.getValue(properties).orElse("default"); this.presignedUrlDisabled = PRESIGNED_URL_DISABLED.getRequiredValue(properties); @@ -251,10 +254,13 @@ private static Map.Entry> parse(String url) URI lastUri = uris.get(uris.size() - 1); // Initialize database from the last URI initDatabase(lastUri, uriProperties); + String uriPath = lastUri.getPath(); + String uriQuery = lastUri.getQuery(); + String uriFragment = lastUri.getFragment(); // Sync path and query from lastUri for the rest of uri for (int i = 0; i < uris.size() - 1; i++) { URI uri = uris.get(i); - uris.set(i, new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), uri.getPort(), lastUri.getPath(), lastUri.getQuery(), lastUri.getFragment())); + uris.set(i, new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), uri.getPort(), uriPath, uriQuery, uriFragment)); } // remove duplicate uris Set uriSet = new LinkedHashSet<>(uris); @@ -262,7 +268,7 @@ private static Map.Entry> parse(String url) uris.addAll(uriSet); // Create DatabendNodes object DatabendClientLoadBalancingPolicy policy = DatabendClientLoadBalancingPolicy.create(DatabendClientLoadBalancingPolicy.DISABLED); // You might want to make this configurable - DatabendNodes databendNodes = new DatabendNodes(uris, policy); + DatabendNodes databendNodes = new DatabendNodes(uris, policy, uriPath, uriQuery, uriFragment); return new AbstractMap.SimpleImmutableEntry<>(databendNodes, uriProperties); } catch (URISyntaxException e) { throw new SQLException("Invalid URI: " + raw, e); @@ -304,6 +310,9 @@ public URI getUri(String query_id) { return nodes.pickUri(query_id); } + public Boolean autoDiscovery() { + return autoDiscovery; + } public String getDatabase() { return database; } diff --git a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendNodeRouter.java b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendNodeRouter.java index 530d86f..386ac52 100644 --- a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendNodeRouter.java +++ b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendNodeRouter.java @@ -1,5 +1,8 @@ package com.databend.jdbc; +import com.databend.client.ClientSettings; +import okhttp3.OkHttpClient; + import java.net.URI; import java.util.List; @@ -13,10 +16,18 @@ public interface DatabendNodeRouter { * @return non-null uris */ List getUris(); - + /** * Get load balancing policy */ DatabendClientLoadBalancingPolicy getPolicy(); + /** + * Discover all possible query uris through databend discovery api and update candidate node router list in need + * @return true if update operation executed, false otherwise + * Ref PR: + * https://github.com/datafuselabs/databend-jdbc/pull/264 + * https://github.com/datafuselabs/databend/pull/16353 + */ + boolean discoverUris(OkHttpClient client, ClientSettings settings); } diff --git a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendNodes.java b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendNodes.java index c1f5840..39a7cf0 100644 --- a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendNodes.java +++ b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendNodes.java @@ -1,28 +1,60 @@ package com.databend.jdbc; +import com.databend.client.ClientSettings; +import com.databend.client.DatabendClientV1; +import com.databend.client.DiscoveryNode; +import okhttp3.OkHttpClient; + import java.net.URI; +import java.net.URISyntaxException; +import java.sql.SQLException; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import static com.databend.jdbc.ConnectionProperties.SSL; +import static com.databend.jdbc.ConnectionProperties.SSL_MODE; public class DatabendNodes implements DatabendNodeRouter { - private List query_nodes_uris; + private AtomicReference> query_nodes_uris; protected final AtomicInteger index; + // keep track of latest discovery scheduled time + protected final AtomicReference lastDiscoveryTime = new AtomicReference<>(0L); + // minimum time between discovery + protected long discoveryInterval = 1000 * 60 * 5; protected DatabendClientLoadBalancingPolicy policy; - public DatabendNodes(List queryNodesUris, DatabendClientLoadBalancingPolicy policy) { - this.query_nodes_uris = queryNodesUris; + + private final String uriPath; + private final String uriQuery; + private final String uriFragment; + + private boolean useSecureConnection = false; + private String sslmode = "disable"; + + public DatabendNodes(List queryNodesUris, DatabendClientLoadBalancingPolicy policy, String UriPath, String UriQuery, String UriFragment) { + this.query_nodes_uris = new AtomicReference<>(queryNodesUris); this.policy = policy; this.index = new AtomicInteger(0); + this.uriPath = UriPath; + this.uriQuery = UriQuery; + this.uriFragment = UriFragment; } - @Override public List getUris() { - return query_nodes_uris; + return query_nodes_uris.get(); + } + + public void setSSL(boolean useSecureConnection, String sslmode) { + this.useSecureConnection = useSecureConnection; + this.sslmode = sslmode; } public void updateNodes(List query_nodes_uris) { - this.query_nodes_uris = query_nodes_uris; + this.query_nodes_uris.set(query_nodes_uris); } public void updatePolicy(DatabendClientLoadBalancingPolicy policy) { @@ -34,6 +66,61 @@ public DatabendClientLoadBalancingPolicy getPolicy() { return policy; } + @Override + public boolean discoverUris(OkHttpClient client, ClientSettings settings) { + // do nothing if discovery interval is not reached + Long lastDiscoveryTime = this.lastDiscoveryTime.get(); + if (System.currentTimeMillis() - lastDiscoveryTime < discoveryInterval) { + return false; + } + List current_nodes = query_nodes_uris.get(); + if (!this.lastDiscoveryTime.compareAndSet(lastDiscoveryTime, System.currentTimeMillis())) { + return false; + } + + List new_nodes = DatabendClientV1.dicoverNodes(client, settings); + if (!new_nodes.isEmpty()) { + // convert new nodes using lambda + List new_uris = new_nodes.stream().map(node -> URI.create("http://" + node.getAddress())).collect(Collectors.toList()); + updateNodes(new_uris); + return true; + } + return false; + } + + private List parseURI(List nodes) throws SQLException { + String host = null; + List uris = new ArrayList<>(); + try { + for (DiscoveryNode node : nodes) { + String raw_host = node.getAddress(); + String fullUri = (raw_host.startsWith("http://") || raw_host.startsWith("https://")) ? + raw_host : + "http://" + raw_host; + + URI uri = new URI(fullUri); + String authority = uri.getAuthority(); + String[] hostAndPort = authority.split(":"); + if (hostAndPort.length == 2) { + host = hostAndPort[0]; + } else if (hostAndPort.length == 1) { + host = hostAndPort[0]; + } else { + throw new SQLException("Invalid host and port, url: " + uri); + } + if (host == null || host.isEmpty()) { + throw new SQLException("Invalid host " + host); + } + + uris.add(new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), uri.getPort(), uriPath, uriQuery, uriFragment)); + } + + } catch (URISyntaxException e) { + throw new SQLException("Invalid URI", e.getMessage()); + } + + return uris; + } public URI pickUri(String query_id) { return policy.pickUri(query_id, this); } diff --git a/databend-jdbc/src/test/java/com/databend/jdbc/TestMultiHost.java b/databend-jdbc/src/test/java/com/databend/jdbc/TestMultiHost.java index 3a149ea..06f2d5f 100644 --- a/databend-jdbc/src/test/java/com/databend/jdbc/TestMultiHost.java +++ b/databend-jdbc/src/test/java/com/databend/jdbc/TestMultiHost.java @@ -117,6 +117,40 @@ public void testRoundRobinLoadBalancing() Assert.assertEquals(node8000 + node8002 + node8003, 90); } + @Test(groups = {"IT", "cluster"}) + public void testRoundRobinTransaction() + throws SQLException { + // try connect with three nodes 1000 times and count for each node + try (Connection connection = createConnection(ROUND_ROBIN_JDBC_URL)) { + DatabendStatement statement = (DatabendStatement) connection.createStatement(); + statement.execute("drop table if exists test_transaction;"); + statement.execute("create table if not exists test_transaction(id int);"); + + } + for (int i = 0; i < 30; i++) { + try (Connection connection = createConnection(ROUND_ROBIN_JDBC_URL)) { + DatabendStatement statement = (DatabendStatement) connection.createStatement(); + // use transaction select a table, drop a table, insert data into table bring i index + statement.execute("begin;"); + statement.execute("insert into test_transaction values(" + i + ");"); + statement.execute("select * from test_transaction;"); + statement.execute("commit;"); + } + } + + // query on test + try (Connection connection = createConnection(ROUND_ROBIN_JDBC_URL)) { + DatabendStatement statement = (DatabendStatement) connection.createStatement(); + statement.execute("select * from test_transaction;"); + ResultSet r = statement.getResultSet(); + int count = 0; + while (r.next()) { + count++; + } + Assert.assertEquals(count, 30); + } + } + @Test(groups = {"IT", "cluster"}) public void testFailOver() throws SQLException { diff --git a/databend-jdbc/src/test/java/com/databend/jdbc/TestPrepareStatement.java b/databend-jdbc/src/test/java/com/databend/jdbc/TestPrepareStatement.java index db5ede2..b62c94e 100644 --- a/databend-jdbc/src/test/java/com/databend/jdbc/TestPrepareStatement.java +++ b/databend-jdbc/src/test/java/com/databend/jdbc/TestPrepareStatement.java @@ -548,7 +548,7 @@ public void testSelectWithClusterKey() throws SQLException { try (PreparedStatement statement = conn.prepareStatement(selectSQL)) { ResultSet rs = statement.executeQuery(); while (rs.next()) { - Assertions.assertEquals("0.0", rs.getString(5)); + Assertions.assertEquals(Float.valueOf("0.0"), Float.valueOf(rs.getString(5))); } } }