diff --git a/databend-client/src/main/java/com/databend/client/ClientSettings.java b/databend-client/src/main/java/com/databend/client/ClientSettings.java index 6ca2443..41d51f8 100644 --- a/databend-client/src/main/java/com/databend/client/ClientSettings.java +++ b/databend-client/src/main/java/com/databend/client/ClientSettings.java @@ -14,6 +14,7 @@ package com.databend.client; +import java.util.HashMap; import java.util.Map; public class ClientSettings { @@ -36,7 +37,7 @@ public class ClientSettings { // TODO(zhihanz) timezone and locale info public ClientSettings(String host) { - this(host, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), null, null, DEFAULT_RETRY_ATTEMPTS); + this(host, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), new HashMap(), null, DEFAULT_RETRY_ATTEMPTS); } public ClientSettings(String host, String database) { @@ -47,7 +48,7 @@ public ClientSettings(String host, String database) { this.connectionTimeout = DEFAULT_CONNECTION_TIMEOUT; this.socketTimeout = DEFAULT_SOCKET_TIMEOUT; this.paginationOptions = PaginationOptions.defaultPaginationOptions(); - this.additionalHeaders = null; + this.additionalHeaders = new HashMap<>(); this.stageAttachment = null; this.retryAttempts = DEFAULT_RETRY_ATTEMPTS; } diff --git a/databend-client/src/main/java/com/databend/client/DatabendClientV1.java b/databend-client/src/main/java/com/databend/client/DatabendClientV1.java index 4ca8613..0dc2328 100644 --- a/databend-client/src/main/java/com/databend/client/DatabendClientV1.java +++ b/databend-client/src/main/java/com/databend/client/DatabendClientV1.java @@ -39,13 +39,13 @@ @ThreadSafe public class DatabendClientV1 - implements DatabendClient -{ + implements DatabendClient { private static final String USER_AGENT_VALUE = DatabendClientV1.class.getSimpleName() + "/" + firstNonNull(DatabendClientV1.class.getPackage().getImplementationVersion(), "jvm-unknown"); private static final MediaType MEDIA_TYPE_JSON = MediaType.parse("application/json; charset=utf-8"); private static final JsonCodec QUERY_RESULTS_CODEC = jsonCodec(QueryResults.class); + private static final String XDatabendQueryIDHeader = "X-Databend-Query-Id"; private static final String QUERY_PATH = "/v1/query"; private static final long MAX_MATERIALIZED_JSON_RESPONSE_SIZE = 128 * 1024; @@ -62,8 +62,7 @@ public class DatabendClientV1 private final AtomicReference databendSession; private final AtomicReference currentResults = new AtomicReference<>(); - public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings settings) - { + public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings settings) { requireNonNull(httpClient, "httpClient is null"); requireNonNull(sql, "sql is null"); requireNonNull(settings, "settings is null"); @@ -84,8 +83,7 @@ public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings sett } } - private Request.Builder prepareRequst(HttpUrl url) - { + private Request.Builder prepareRequst(HttpUrl url) { Request.Builder builder = new Request.Builder() .url(url) .header("User-Agent", USER_AGENT_VALUE) @@ -97,8 +95,7 @@ private Request.Builder prepareRequst(HttpUrl url) return builder; } - private Request buildQueryRequest(String query, ClientSettings settings) - { + private Request buildQueryRequest(String query, ClientSettings settings) { HttpUrl url = HttpUrl.get(settings.getHost()); if (url == null) { // TODO(zhihanz) use custom exception @@ -116,8 +113,7 @@ private Request buildQueryRequest(String query, ClientSettings settings) } @Override - public String getQuery() - { + public String getQuery() { return query; } @@ -126,8 +122,7 @@ private boolean executeInternal(Request request, OptionalLong materializedJsonSi long start = System.nanoTime(); long attempts = 0; Exception cause = null; - while (true) - { + while (true) { if (attempts > 0) { Duration sinceStart = Duration.ofNanos(System.nanoTime() - start); if (sinceStart.compareTo(Duration.ofSeconds(requestTimeoutSecs)) > 0) { @@ -136,12 +131,10 @@ private boolean executeInternal(Request request, OptionalLong materializedJsonSi try { MILLISECONDS.sleep(attempts * 100); - } - catch (InterruptedException e) { + } catch (InterruptedException e) { try { close(); - } - finally { + } finally { Thread.currentThread().interrupt(); } throw new RuntimeException("StatementClient thread was interrupted"); @@ -151,8 +144,7 @@ private boolean executeInternal(Request request, OptionalLong materializedJsonSi JsonResponse response; try { response = JsonResponse.execute(QUERY_RESULTS_CODEC, httpClient, request, materializedJsonSizeLimit); - } - catch (RuntimeException e) { + } catch (RuntimeException e) { cause = e; continue; } @@ -181,22 +173,24 @@ private boolean executeInternal(Request request, OptionalLong materializedJsonSi return false; } } + @Override - public boolean execute(Request request) - { + public boolean execute(Request request) { return executeInternal(request, OptionalLong.empty()); } - private void processResponse(Headers headers, QueryResults results) - { + private void processResponse(Headers headers, QueryResults results) { if (results.getSession() != null) { databendSession.set(results.getSession()); } + if (results.getQueryId() != null) { + this.additonalHeaders.put(XDatabendQueryIDHeader, results.getQueryId()); + } currentResults.set(results); } @Override - public boolean next(){ + public boolean next() { requireNonNull(this.host, "host is null"); requireNonNull(this.currentResults.get(), "currentResults is null"); if (this.currentResults.get().getNextUri() == null) { @@ -213,8 +207,7 @@ public boolean next(){ } @Override - public boolean isRunning() - { + public boolean isRunning() { QueryResults results = this.currentResults.get(); if (results == null) { return false; @@ -227,31 +220,26 @@ public boolean isRunning() return results.getNextUri() != null; } - public Map getAdditionalHeaders() - { + public Map getAdditionalHeaders() { return additonalHeaders; } @Override - public QueryResults getResults() - { + public QueryResults getResults() { return currentResults.get(); } @Override - public DatabendSession getSession() - { + public DatabendSession getSession() { return databendSession.get(); } @Override - public void close() - { + public void close() { killQuery(); } - private void killQuery() - { + private void killQuery() { QueryResults q = this.currentResults.get(); if (q == null) { return; @@ -263,10 +251,10 @@ private void killQuery() HttpUrl url = HttpUrl.get(this.host); url = url.newBuilder().encodedPath(killUriPath).build(); Request r = prepareRequst(url).get().build(); - try { - httpClient.newCall(r).execute().close();; - } catch (IOException ignored) { + try { + httpClient.newCall(r).execute().close(); + } catch (IOException ignored) { - } + } } } diff --git a/databend-client/src/main/java/com/databend/client/QueryResults.java b/databend-client/src/main/java/com/databend/client/QueryResults.java index daf1327..97eb5ab 100644 --- a/databend-client/src/main/java/com/databend/client/QueryResults.java +++ b/databend-client/src/main/java/com/databend/client/QueryResults.java @@ -24,7 +24,7 @@ import static com.google.common.base.MoreObjects.toStringHelper; public class QueryResults { - private final String id; + private final String queryId; private final String sessionId; private final DatabendSession session; private final List schema; @@ -41,7 +41,7 @@ public class QueryResults { @JsonCreator public QueryResults( - @JsonProperty("id") String id, + @JsonProperty("id") String queryId, @JsonProperty("session_id") String sessionId, @JsonProperty("session") DatabendSession session, @JsonProperty("schema") List schema, @@ -54,7 +54,7 @@ public QueryResults( @JsonProperty("final_uri") URI finalUri, @JsonProperty("next_uri") URI nextUri, @JsonProperty("kill_uri") URI killUri) { - this.id = id; + this.queryId = queryId; this.sessionId = sessionId; this.session = session; this.schema = schema; @@ -72,8 +72,8 @@ public QueryResults( // add builder @JsonProperty - public String getId() { - return id; + public String getQueryId() { + return queryId; } @JsonProperty @@ -139,7 +139,7 @@ public URI getKillUri() { @Override public String toString() { return toStringHelper(this) - .add("id", id) + .add("id", queryId) .add("sessionId", sessionId) .add("session", session) .add("schema", schema) diff --git a/databend-client/src/test/java/com/databend/client/TestQueryResults.java b/databend-client/src/test/java/com/databend/client/TestQueryResults.java index e1fa42f..638fa6f 100644 --- a/databend-client/src/test/java/com/databend/client/TestQueryResults.java +++ b/databend-client/src/test/java/com/databend/client/TestQueryResults.java @@ -29,7 +29,7 @@ public class TestQueryResults { public void testBasic() { String goldenValue = "{\"id\":\"5c4e776a-8171-462a-b2d3-6a34823d0552\",\"session_id\":\"3563624b-8767-44ff-a235-3f5bb4e54d03\",\"session\":{},\"schema\":[{\"name\":\"(number / 3)\",\"type\":\"Float64\"},{\"name\":\"(number + 1)\",\"type\":\"UInt64\"}],\"data\":[[\"0.0\",\"1\"],[\"0.3333333333333333\",\"2\"],[\"0.6666666666666666\",\"3\"],[\"1.0\",\"4\"],[\"1.3333333333333333\",\"5\"],[\"1.6666666666666667\",\"6\"],[\"2.0\",\"7\"],[\"2.3333333333333335\",\"8\"],[\"2.6666666666666665\",\"9\"],[\"3.0\",\"10\"]],\"state\":\"Succeeded\",\"error\":null,\"stats\":{\"scan_progress\":{\"rows\":10,\"bytes\":80},\"write_progress\":{\"rows\":0,\"bytes\":0},\"result_progress\":{\"rows\":10,\"bytes\":160},\"running_time_ms\":1.494205},\"affect\":null,\"stats_uri\":\"/v1/query/5c4e776a-8171-462a-b2d3-6a34823d0552\",\"final_uri\":\"/v1/query/5c4e776a-8171-462a-b2d3-6a34823d0552/final\",\"next_uri\":\"/v1/query/5c4e776a-8171-462a-b2d3-6a34823d0552/final\",\"kill_uri\":\"/v1/query/5c4e776a-8171-462a-b2d3-6a34823d0552/kill\"}"; QueryResults queryResults = QUERY_RESULTS_CODEC.fromJson(goldenValue); - Assert.assertEquals(queryResults.getId(), "5c4e776a-8171-462a-b2d3-6a34823d0552"); + Assert.assertEquals(queryResults.getQueryId(), "5c4e776a-8171-462a-b2d3-6a34823d0552"); Assert.assertEquals(queryResults.getSessionId(), "3563624b-8767-44ff-a235-3f5bb4e54d03"); Assert.assertEquals(queryResults.getSchema().size(), 2); Assert.assertEquals(queryResults.getSchema().get(0).getName(), "(number / 3)"); @@ -46,7 +46,7 @@ public void testBasic() { public void TestError() { String goldenValue = "{\"id\":\"\",\"session_id\":null,\"session\":null,\"schema\":[],\"data\":[],\"state\":\"Failed\",\"error\":{\"code\":1065,\"message\":\"error: \\n --> SQL:1:8\\n |\\n1 | select error\\n | ^^^^^ column doesn't exist\\n\\n\"},\"stats\":{\"scan_progress\":{\"rows\":0,\"bytes\":0},\"write_progress\":{\"rows\":0,\"bytes\":0},\"result_progress\":{\"rows\":0,\"bytes\":0},\"running_time_ms\":0.0},\"affect\":null,\"stats_uri\":null,\"final_uri\":null,\"next_uri\":null,\"kill_uri\":null}"; QueryResults queryResults = QUERY_RESULTS_CODEC.fromJson(goldenValue); - Assert.assertEquals(queryResults.getId(), ""); + Assert.assertEquals(queryResults.getQueryId(), ""); Assert.assertEquals(queryResults.getSessionId(), null); Assert.assertEquals(queryResults.getSession(), null); Assert.assertEquals(queryResults.getState(), "Failed"); @@ -58,7 +58,7 @@ public void TestError() { public void TestDateTime() { String goldenString = "{\"id\":\"1fbbaf5b-8807-47d3-bb9c-122a3b7c527c\",\"session_id\":\"ef4a4a66-7a81-4a90-b6ab-d484313111b8\",\"session\":{},\"schema\":[{\"name\":\"date\",\"type\":\"Date\"},{\"name\":\"ts\",\"type\":\"Timestamp\"}],\"data\":[[\"2022-04-07\",\"2022-04-07 01:01:01.123456\"],[\"2022-04-08\",\"2022-04-08 01:01:01.000000\"],[\"2022-04-07\",\"2022-04-07 01:01:01.123456\"],[\"2022-04-08\",\"2022-04-08 01:01:01.000000\"],[\"2022-04-07\",\"2022-04-07 01:01:01.123456\"],[\"2022-04-08\",\"2022-04-08 01:01:01.000000\"]],\"state\":\"Succeeded\",\"error\":null,\"stats\":{\"scan_progress\":{\"rows\":6,\"bytes\":72},\"write_progress\":{\"rows\":0,\"bytes\":0},\"result_progress\":{\"rows\":6,\"bytes\":72},\"running_time_ms\":7.681399},\"affect\":null,\"stats_uri\":\"/v1/query/1fbbaf5b-8807-47d3-bb9c-122a3b7c527c\",\"final_uri\":\"/v1/query/1fbbaf5b-8807-47d3-bb9c-122a3b7c527c/final\",\"next_uri\":\"/v1/query/1fbbaf5b-8807-47d3-bb9c-122a3b7c527c/final\",\"kill_uri\":\"/v1/query/1fbbaf5b-8807-47d3-bb9c-122a3b7c527c/kill\"}"; QueryResults queryResults = QUERY_RESULTS_CODEC.fromJson(goldenString); - Assert.assertEquals(queryResults.getId(), "1fbbaf5b-8807-47d3-bb9c-122a3b7c527c"); + Assert.assertEquals(queryResults.getQueryId(), "1fbbaf5b-8807-47d3-bb9c-122a3b7c527c"); Assert.assertEquals(queryResults.getSessionId(), "ef4a4a66-7a81-4a90-b6ab-d484313111b8"); Assert.assertEquals(queryResults.getSession().getDatabase(), null); Assert.assertEquals(queryResults.getSession().getKeepServerSessionSecs(), 0); @@ -80,7 +80,7 @@ public void TestDateTime() { public void TestUseDB() { String goldenString = "{\"id\":\"d0aa3285-0bf5-42da-b06b-0d3db55f10bd\",\"session_id\":\"ded852b7-0da2-46ba-8708-e6fcb1c33081\",\"session\":{\"database\":\"db2\"},\"schema\":[],\"data\":[],\"state\":\"Succeeded\",\"error\":null,\"stats\":{\"scan_progress\":{\"rows\":0,\"bytes\":0},\"write_progress\":{\"rows\":0,\"bytes\":0},\"result_progress\":{\"rows\":0,\"bytes\":0},\"running_time_ms\":0.891883},\"affect\":{\"type\":\"UseDB\",\"name\":\"db2\"},\"stats_uri\":\"/v1/query/d0aa3285-0bf5-42da-b06b-0d3db55f10bd\",\"final_uri\":\"/v1/query/d0aa3285-0bf5-42da-b06b-0d3db55f10bd/final\",\"next_uri\":\"/v1/query/d0aa3285-0bf5-42da-b06b-0d3db55f10bd/final\",\"kill_uri\":\"/v1/query/d0aa3285-0bf5-42da-b06b-0d3db55f10bd/kill\"}"; QueryResults queryResults = QUERY_RESULTS_CODEC.fromJson(goldenString); - Assert.assertEquals(queryResults.getId(), "d0aa3285-0bf5-42da-b06b-0d3db55f10bd"); + Assert.assertEquals(queryResults.getQueryId(), "d0aa3285-0bf5-42da-b06b-0d3db55f10bd"); QueryAffect affect = queryResults.getAffect(); Assert.assertEquals(affect.getClass(), QueryAffect.UseDB.class); Assert.assertEquals(((QueryAffect.UseDB) affect).getName(), "db2"); @@ -105,7 +105,7 @@ public void TestChangeSettings() { public void TestArray() { String goldenString = "{\"id\":\"eecb2440-0180-45cb-8b21-23f4a9975df3\",\"session_id\":\"ef692df6-657d-42b8-a10d-6e6cac657abe\",\"session\":{},\"schema\":[{\"name\":\"id\",\"type\":\"Int8\"},{\"name\":\"obj\",\"type\":\"Variant\"},{\"name\":\"d\",\"type\":\"Timestamp\"},{\"name\":\"s\",\"type\":\"String\"},{\"name\":\"arr\",\"type\":\"Array(Int64)\"}],\"data\":[[\"1\",\"{\\\"a\\\": 1,\\\"b\\\": 2}\",\"1983-07-12 21:30:55.888000\",\"hello world, 你好\",\"[1,2,3,4,5]\"]],\"state\":\"Succeeded\",\"error\":null,\"stats\":{\"scan_progress\":{\"rows\":1,\"bytes\":131},\"write_progress\":{\"rows\":0,\"bytes\":0},\"result_progress\":{\"rows\":1,\"bytes\":131},\"running_time_ms\":9.827047},\"affect\":null,\"stats_uri\":\"/v1/query/eecb2440-0180-45cb-8b21-23f4a9975df3\",\"final_uri\":\"/v1/query/eecb2440-0180-45cb-8b21-23f4a9975df3/final\",\"next_uri\":\"/v1/query/eecb2440-0180-45cb-8b21-23f4a9975df3/final\",\"kill_uri\":\"/v1/query/eecb2440-0180-45cb-8b21-23f4a9975df3/kill\"}"; QueryResults queryResults = QUERY_RESULTS_CODEC.fromJson(goldenString); - Assert.assertEquals(queryResults.getId(), "eecb2440-0180-45cb-8b21-23f4a9975df3"); + Assert.assertEquals(queryResults.getQueryId(), "eecb2440-0180-45cb-8b21-23f4a9975df3"); Assert.assertEquals(queryResults.getSchema().size(), 5); Assert.assertEquals(queryResults.getSchema().get(0).getName(), "id"); Assert.assertEquals(queryResults.getSchema().get(0).getDataType().getType(), "Int8"); @@ -115,7 +115,7 @@ public void TestArray() { public void TestVariant() { String goldenString = "{\"id\":\"d74b2471-3a15-45e2-9ef4-ca8a39505661\",\"session_id\":\"f818e198-20d9-4c06-8de6-bc68ab6e9dc1\",\"session\":{},\"schema\":[{\"name\":\"var\",\"type\":\"Nullable(Variant)\"}],\"data\":[[\"1\"],[\"1.34\"],[\"true\"],[\"[1,2,3,[\\\"a\\\",\\\"b\\\",\\\"c\\\"]]\"],[\"{\\\"a\\\":1,\\\"b\\\":{\\\"c\\\":2}}\"]],\"state\":\"Succeeded\",\"error\":null,\"stats\":{\"scan_progress\":{\"rows\":5,\"bytes\":168},\"write_progress\":{\"rows\":0,\"bytes\":0},\"result_progress\":{\"rows\":5,\"bytes\":168},\"running_time_ms\":7.827281},\"affect\":null,\"stats_uri\":\"/v1/query/d74b2471-3a15-45e2-9ef4-ca8a39505661\",\"final_uri\":\"/v1/query/d74b2471-3a15-45e2-9ef4-ca8a39505661/final\",\"next_uri\":\"/v1/query/d74b2471-3a15-45e2-9ef4-ca8a39505661/final\",\"kill_uri\":\"/v1/query/d74b2471-3a15-45e2-9ef4-ca8a39505661/kill\"}\n"; QueryResults queryResults = QUERY_RESULTS_CODEC.fromJson(goldenString); - Assert.assertEquals(queryResults.getId(), "d74b2471-3a15-45e2-9ef4-ca8a39505661"); + Assert.assertEquals(queryResults.getQueryId(), "d74b2471-3a15-45e2-9ef4-ca8a39505661"); Assert.assertEquals(queryResults.getSchema().size(), 1); Assert.assertEquals(queryResults.getSchema().get(0).getName(), "var"); Assert.assertEquals(queryResults.getSchema().get(0).getDataType().getType(), "Variant"); diff --git a/databend-jdbc/src/main/java/com/databend/jdbc/AbstractDatabendResultSet.java b/databend-jdbc/src/main/java/com/databend/jdbc/AbstractDatabendResultSet.java index 194a670..5a0a9c8 100644 --- a/databend-jdbc/src/main/java/com/databend/jdbc/AbstractDatabendResultSet.java +++ b/databend-jdbc/src/main/java/com/databend/jdbc/AbstractDatabendResultSet.java @@ -59,7 +59,6 @@ import static com.databend.jdbc.DatabendColumnInfo.setTypeInfo; import static java.lang.Math.toIntExact; import static java.lang.String.format; -import static java.math.RoundingMode.HALF_UP; import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; import static org.joda.time.DateTimeConstants.SECONDS_PER_DAY; @@ -155,7 +154,7 @@ private static BigDecimal parseBigDecimal(String value) static SQLException resultsException(QueryResults results) { QueryErrors error = requireNonNull(results.getError()); - String message = format("Query failed (#%s): %s", results.getId(), error.getMessage()); + String message = format("Query failed (#%s): %s", results.getQueryId(), error.getMessage()); return new SQLException(message, String.valueOf(error.getCode())); } diff --git a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java index bd3dae6..048d2db 100644 --- a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java +++ b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java @@ -34,6 +34,7 @@ import java.sql.Savepoint; import java.sql.Statement; import java.sql.Struct; +import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.Set; @@ -516,13 +517,23 @@ DatabendClient startQuery(String sql) throws SQLException { setSocketTimeout(this.driverUri.getSocketTimeout()). setSession(this.session.get()). setHost(this.getURI().toString()). + setAdditionalHeaders(new HashMap<>()). setPaginationOptions(options).build(); return new DatabendClientV1(httpClient, sql, s); } DatabendClient startQuery(String sql, StageAttachment attach) throws SQLException { PaginationOptions options = getPaginationOptions(); - ClientSettings s = new ClientSettings.Builder().setSession(this.session.get()).setHost(this.getURI().toString()).setQueryTimeoutSecs(this.driverUri.getQueryTimeout()).setConnectionTimeout(this.driverUri.getConnectionTimeout()).setSocketTimeout(this.driverUri.getSocketTimeout()).setPaginationOptions(options).setStageAttachment(attach).build(); + ClientSettings s = new ClientSettings.Builder(). + setSession(this.session.get()). + setHost(this.getURI().toString()). + setQueryTimeoutSecs(this.driverUri.getQueryTimeout()). + setConnectionTimeout(this.driverUri.getConnectionTimeout()). + setSocketTimeout(this.driverUri.getSocketTimeout()). + setPaginationOptions(options). + setAdditionalHeaders(new HashMap<>()). + setStageAttachment(attach). + build(); return new DatabendClientV1(httpClient, sql, s); } diff --git a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendResultSet.java b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendResultSet.java index ba25de5..efc04d8 100644 --- a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendResultSet.java +++ b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendResultSet.java @@ -43,7 +43,7 @@ private DatabendResultSet(Statement statement, DatabendClient client, List(flatten(new ResultsPageIterator(client),maxRows), client)); this.statement = statement; this.client = client; - this.queryId = client.getResults().getId(); + this.queryId = client.getResults().getQueryId(); } static DatabendResultSet create(Statement statement, DatabendClient client, long maxRows)