Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: append query_id header in http headers if paged result contains it #95

Merged
merged 6 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.databend.client;

import java.util.HashMap;
import java.util.Map;

public class ClientSettings {
Expand All @@ -36,7 +37,7 @@ public class ClientSettings {
// TODO(zhihanz) timezone and locale info

public ClientSettings(String host) {
this(host, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), null, null, DEFAULT_RETRY_ATTEMPTS);
this(host, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), new HashMap<String, String>(), null, DEFAULT_RETRY_ATTEMPTS);
}

public ClientSettings(String host, String database) {
Expand All @@ -47,7 +48,7 @@ public ClientSettings(String host, String database) {
this.connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
this.socketTimeout = DEFAULT_SOCKET_TIMEOUT;
this.paginationOptions = PaginationOptions.defaultPaginationOptions();
this.additionalHeaders = null;
this.additionalHeaders = new HashMap<>();
this.stageAttachment = null;
this.retryAttempts = DEFAULT_RETRY_ATTEMPTS;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@

@ThreadSafe
public class DatabendClientV1
implements DatabendClient
{
implements DatabendClient {
private static final String USER_AGENT_VALUE = DatabendClientV1.class.getSimpleName() +
"/" +
firstNonNull(DatabendClientV1.class.getPackage().getImplementationVersion(), "jvm-unknown");
Expand All @@ -62,8 +61,7 @@ public class DatabendClientV1
private final AtomicReference<DatabendSession> databendSession;
private final AtomicReference<QueryResults> currentResults = new AtomicReference<>();

public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings settings)
{
public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings settings) {
requireNonNull(httpClient, "httpClient is null");
requireNonNull(sql, "sql is null");
requireNonNull(settings, "settings is null");
Expand All @@ -84,8 +82,7 @@ public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings sett
}
}

private Request.Builder prepareRequst(HttpUrl url)
{
private Request.Builder prepareRequst(HttpUrl url) {
Request.Builder builder = new Request.Builder()
.url(url)
.header("User-Agent", USER_AGENT_VALUE)
Expand All @@ -97,8 +94,7 @@ private Request.Builder prepareRequst(HttpUrl url)
return builder;
}

private Request buildQueryRequest(String query, ClientSettings settings)
{
private Request buildQueryRequest(String query, ClientSettings settings) {
HttpUrl url = HttpUrl.get(settings.getHost());
if (url == null) {
// TODO(zhihanz) use custom exception
Expand All @@ -116,8 +112,7 @@ private Request buildQueryRequest(String query, ClientSettings settings)
}

@Override
public String getQuery()
{
public String getQuery() {
return query;
}

Expand All @@ -126,8 +121,7 @@ private boolean executeInternal(Request request, OptionalLong materializedJsonSi
long start = System.nanoTime();
long attempts = 0;
Exception cause = null;
while (true)
{
while (true) {
if (attempts > 0) {
Duration sinceStart = Duration.ofNanos(System.nanoTime() - start);
if (sinceStart.compareTo(Duration.ofSeconds(requestTimeoutSecs)) > 0) {
Expand All @@ -136,12 +130,10 @@ private boolean executeInternal(Request request, OptionalLong materializedJsonSi

try {
MILLISECONDS.sleep(attempts * 100);
}
catch (InterruptedException e) {
} catch (InterruptedException e) {
try {
close();
}
finally {
} finally {
Thread.currentThread().interrupt();
}
throw new RuntimeException("StatementClient thread was interrupted");
Expand All @@ -151,8 +143,7 @@ private boolean executeInternal(Request request, OptionalLong materializedJsonSi
JsonResponse<QueryResults> response;
try {
response = JsonResponse.execute(QUERY_RESULTS_CODEC, httpClient, request, materializedJsonSizeLimit);
}
catch (RuntimeException e) {
} catch (RuntimeException e) {
cause = e;
continue;
}
Expand Down Expand Up @@ -181,22 +172,24 @@ private boolean executeInternal(Request request, OptionalLong materializedJsonSi
return false;
}
}

@Override
public boolean execute(Request request)
{
public boolean execute(Request request) {
return executeInternal(request, OptionalLong.empty());
}

private void processResponse(Headers headers, QueryResults results)
{
private void processResponse(Headers headers, QueryResults results) {
if (results.getSession() != null) {
databendSession.set(results.getSession());
}
if (results.getQueryId() != null) {
this.additonalHeaders.put("Query_Id", results.getQueryId());
hantmac marked this conversation as resolved.
Show resolved Hide resolved
}
currentResults.set(results);
}

@Override
public boolean next(){
public boolean next() {
requireNonNull(this.host, "host is null");
requireNonNull(this.currentResults.get(), "currentResults is null");
if (this.currentResults.get().getNextUri() == null) {
Expand All @@ -213,8 +206,7 @@ public boolean next(){
}

@Override
public boolean isRunning()
{
public boolean isRunning() {
QueryResults results = this.currentResults.get();
if (results == null) {
return false;
Expand All @@ -227,31 +219,26 @@ public boolean isRunning()
return results.getNextUri() != null;
}

public Map<String, String> getAdditionalHeaders()
{
public Map<String, String> getAdditionalHeaders() {
return additonalHeaders;
}

@Override
public QueryResults getResults()
{
public QueryResults getResults() {
return currentResults.get();
}

@Override
public DatabendSession getSession()
{
public DatabendSession getSession() {
return databendSession.get();
}

@Override
public void close()
{
public void close() {
killQuery();
}

private void killQuery()
{
private void killQuery() {
QueryResults q = this.currentResults.get();
if (q == null) {
return;
Expand All @@ -263,10 +250,11 @@ private void killQuery()
HttpUrl url = HttpUrl.get(this.host);
url = url.newBuilder().encodedPath(killUriPath).build();
Request r = prepareRequst(url).get().build();
try {
httpClient.newCall(r).execute().close();;
} catch (IOException ignored) {
try {
httpClient.newCall(r).execute().close();
;
} catch (IOException ignored) {

}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import static com.google.common.base.MoreObjects.toStringHelper;

public class QueryResults {
private final String id;
private final String queryId;
private final String sessionId;
private final DatabendSession session;
private final List<QueryRowField> schema;
Expand All @@ -41,7 +41,7 @@ public class QueryResults {

@JsonCreator
public QueryResults(
@JsonProperty("id") String id,
@JsonProperty("id") String queryId,
@JsonProperty("session_id") String sessionId,
@JsonProperty("session") DatabendSession session,
@JsonProperty("schema") List<QueryRowField> schema,
Expand All @@ -54,7 +54,7 @@ public QueryResults(
@JsonProperty("final_uri") URI finalUri,
@JsonProperty("next_uri") URI nextUri,
@JsonProperty("kill_uri") URI killUri) {
this.id = id;
this.queryId = queryId;
this.sessionId = sessionId;
this.session = session;
this.schema = schema;
Expand All @@ -72,8 +72,8 @@ public QueryResults(
// add builder

@JsonProperty
public String getId() {
return id;
public String getQueryId() {
return queryId;
}

@JsonProperty
Expand Down Expand Up @@ -139,7 +139,7 @@ public URI getKillUri() {
@Override
public String toString() {
return toStringHelper(this)
.add("id", id)
.add("id", queryId)
.add("sessionId", sessionId)
.add("session", session)
.add("schema", schema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class TestQueryResults {
public void testBasic() {
String goldenValue = "{\"id\":\"5c4e776a-8171-462a-b2d3-6a34823d0552\",\"session_id\":\"3563624b-8767-44ff-a235-3f5bb4e54d03\",\"session\":{},\"schema\":[{\"name\":\"(number / 3)\",\"type\":\"Float64\"},{\"name\":\"(number + 1)\",\"type\":\"UInt64\"}],\"data\":[[\"0.0\",\"1\"],[\"0.3333333333333333\",\"2\"],[\"0.6666666666666666\",\"3\"],[\"1.0\",\"4\"],[\"1.3333333333333333\",\"5\"],[\"1.6666666666666667\",\"6\"],[\"2.0\",\"7\"],[\"2.3333333333333335\",\"8\"],[\"2.6666666666666665\",\"9\"],[\"3.0\",\"10\"]],\"state\":\"Succeeded\",\"error\":null,\"stats\":{\"scan_progress\":{\"rows\":10,\"bytes\":80},\"write_progress\":{\"rows\":0,\"bytes\":0},\"result_progress\":{\"rows\":10,\"bytes\":160},\"running_time_ms\":1.494205},\"affect\":null,\"stats_uri\":\"/v1/query/5c4e776a-8171-462a-b2d3-6a34823d0552\",\"final_uri\":\"/v1/query/5c4e776a-8171-462a-b2d3-6a34823d0552/final\",\"next_uri\":\"/v1/query/5c4e776a-8171-462a-b2d3-6a34823d0552/final\",\"kill_uri\":\"/v1/query/5c4e776a-8171-462a-b2d3-6a34823d0552/kill\"}";
QueryResults queryResults = QUERY_RESULTS_CODEC.fromJson(goldenValue);
Assert.assertEquals(queryResults.getId(), "5c4e776a-8171-462a-b2d3-6a34823d0552");
Assert.assertEquals(queryResults.getQueryId(), "5c4e776a-8171-462a-b2d3-6a34823d0552");
Assert.assertEquals(queryResults.getSessionId(), "3563624b-8767-44ff-a235-3f5bb4e54d03");
Assert.assertEquals(queryResults.getSchema().size(), 2);
Assert.assertEquals(queryResults.getSchema().get(0).getName(), "(number / 3)");
Expand All @@ -46,7 +46,7 @@ public void testBasic() {
public void TestError() {
String goldenValue = "{\"id\":\"\",\"session_id\":null,\"session\":null,\"schema\":[],\"data\":[],\"state\":\"Failed\",\"error\":{\"code\":1065,\"message\":\"error: \\n --> SQL:1:8\\n |\\n1 | select error\\n | ^^^^^ column doesn't exist\\n\\n\"},\"stats\":{\"scan_progress\":{\"rows\":0,\"bytes\":0},\"write_progress\":{\"rows\":0,\"bytes\":0},\"result_progress\":{\"rows\":0,\"bytes\":0},\"running_time_ms\":0.0},\"affect\":null,\"stats_uri\":null,\"final_uri\":null,\"next_uri\":null,\"kill_uri\":null}";
QueryResults queryResults = QUERY_RESULTS_CODEC.fromJson(goldenValue);
Assert.assertEquals(queryResults.getId(), "");
Assert.assertEquals(queryResults.getQueryId(), "");
Assert.assertEquals(queryResults.getSessionId(), null);
Assert.assertEquals(queryResults.getSession(), null);
Assert.assertEquals(queryResults.getState(), "Failed");
Expand All @@ -58,7 +58,7 @@ public void TestError() {
public void TestDateTime() {
String goldenString = "{\"id\":\"1fbbaf5b-8807-47d3-bb9c-122a3b7c527c\",\"session_id\":\"ef4a4a66-7a81-4a90-b6ab-d484313111b8\",\"session\":{},\"schema\":[{\"name\":\"date\",\"type\":\"Date\"},{\"name\":\"ts\",\"type\":\"Timestamp\"}],\"data\":[[\"2022-04-07\",\"2022-04-07 01:01:01.123456\"],[\"2022-04-08\",\"2022-04-08 01:01:01.000000\"],[\"2022-04-07\",\"2022-04-07 01:01:01.123456\"],[\"2022-04-08\",\"2022-04-08 01:01:01.000000\"],[\"2022-04-07\",\"2022-04-07 01:01:01.123456\"],[\"2022-04-08\",\"2022-04-08 01:01:01.000000\"]],\"state\":\"Succeeded\",\"error\":null,\"stats\":{\"scan_progress\":{\"rows\":6,\"bytes\":72},\"write_progress\":{\"rows\":0,\"bytes\":0},\"result_progress\":{\"rows\":6,\"bytes\":72},\"running_time_ms\":7.681399},\"affect\":null,\"stats_uri\":\"/v1/query/1fbbaf5b-8807-47d3-bb9c-122a3b7c527c\",\"final_uri\":\"/v1/query/1fbbaf5b-8807-47d3-bb9c-122a3b7c527c/final\",\"next_uri\":\"/v1/query/1fbbaf5b-8807-47d3-bb9c-122a3b7c527c/final\",\"kill_uri\":\"/v1/query/1fbbaf5b-8807-47d3-bb9c-122a3b7c527c/kill\"}";
QueryResults queryResults = QUERY_RESULTS_CODEC.fromJson(goldenString);
Assert.assertEquals(queryResults.getId(), "1fbbaf5b-8807-47d3-bb9c-122a3b7c527c");
Assert.assertEquals(queryResults.getQueryId(), "1fbbaf5b-8807-47d3-bb9c-122a3b7c527c");
Assert.assertEquals(queryResults.getSessionId(), "ef4a4a66-7a81-4a90-b6ab-d484313111b8");
Assert.assertEquals(queryResults.getSession().getDatabase(), null);
Assert.assertEquals(queryResults.getSession().getKeepServerSessionSecs(), 0);
Expand All @@ -80,7 +80,7 @@ public void TestDateTime() {
public void TestUseDB() {
String goldenString = "{\"id\":\"d0aa3285-0bf5-42da-b06b-0d3db55f10bd\",\"session_id\":\"ded852b7-0da2-46ba-8708-e6fcb1c33081\",\"session\":{\"database\":\"db2\"},\"schema\":[],\"data\":[],\"state\":\"Succeeded\",\"error\":null,\"stats\":{\"scan_progress\":{\"rows\":0,\"bytes\":0},\"write_progress\":{\"rows\":0,\"bytes\":0},\"result_progress\":{\"rows\":0,\"bytes\":0},\"running_time_ms\":0.891883},\"affect\":{\"type\":\"UseDB\",\"name\":\"db2\"},\"stats_uri\":\"/v1/query/d0aa3285-0bf5-42da-b06b-0d3db55f10bd\",\"final_uri\":\"/v1/query/d0aa3285-0bf5-42da-b06b-0d3db55f10bd/final\",\"next_uri\":\"/v1/query/d0aa3285-0bf5-42da-b06b-0d3db55f10bd/final\",\"kill_uri\":\"/v1/query/d0aa3285-0bf5-42da-b06b-0d3db55f10bd/kill\"}";
QueryResults queryResults = QUERY_RESULTS_CODEC.fromJson(goldenString);
Assert.assertEquals(queryResults.getId(), "d0aa3285-0bf5-42da-b06b-0d3db55f10bd");
Assert.assertEquals(queryResults.getQueryId(), "d0aa3285-0bf5-42da-b06b-0d3db55f10bd");
QueryAffect affect = queryResults.getAffect();
Assert.assertEquals(affect.getClass(), QueryAffect.UseDB.class);
Assert.assertEquals(((QueryAffect.UseDB) affect).getName(), "db2");
Expand All @@ -105,7 +105,7 @@ public void TestChangeSettings() {
public void TestArray() {
String goldenString = "{\"id\":\"eecb2440-0180-45cb-8b21-23f4a9975df3\",\"session_id\":\"ef692df6-657d-42b8-a10d-6e6cac657abe\",\"session\":{},\"schema\":[{\"name\":\"id\",\"type\":\"Int8\"},{\"name\":\"obj\",\"type\":\"Variant\"},{\"name\":\"d\",\"type\":\"Timestamp\"},{\"name\":\"s\",\"type\":\"String\"},{\"name\":\"arr\",\"type\":\"Array(Int64)\"}],\"data\":[[\"1\",\"{\\\"a\\\": 1,\\\"b\\\": 2}\",\"1983-07-12 21:30:55.888000\",\"hello world, 你好\",\"[1,2,3,4,5]\"]],\"state\":\"Succeeded\",\"error\":null,\"stats\":{\"scan_progress\":{\"rows\":1,\"bytes\":131},\"write_progress\":{\"rows\":0,\"bytes\":0},\"result_progress\":{\"rows\":1,\"bytes\":131},\"running_time_ms\":9.827047},\"affect\":null,\"stats_uri\":\"/v1/query/eecb2440-0180-45cb-8b21-23f4a9975df3\",\"final_uri\":\"/v1/query/eecb2440-0180-45cb-8b21-23f4a9975df3/final\",\"next_uri\":\"/v1/query/eecb2440-0180-45cb-8b21-23f4a9975df3/final\",\"kill_uri\":\"/v1/query/eecb2440-0180-45cb-8b21-23f4a9975df3/kill\"}";
QueryResults queryResults = QUERY_RESULTS_CODEC.fromJson(goldenString);
Assert.assertEquals(queryResults.getId(), "eecb2440-0180-45cb-8b21-23f4a9975df3");
Assert.assertEquals(queryResults.getQueryId(), "eecb2440-0180-45cb-8b21-23f4a9975df3");
Assert.assertEquals(queryResults.getSchema().size(), 5);
Assert.assertEquals(queryResults.getSchema().get(0).getName(), "id");
Assert.assertEquals(queryResults.getSchema().get(0).getDataType().getType(), "Int8");
Expand All @@ -115,7 +115,7 @@ public void TestArray() {
public void TestVariant() {
String goldenString = "{\"id\":\"d74b2471-3a15-45e2-9ef4-ca8a39505661\",\"session_id\":\"f818e198-20d9-4c06-8de6-bc68ab6e9dc1\",\"session\":{},\"schema\":[{\"name\":\"var\",\"type\":\"Nullable(Variant)\"}],\"data\":[[\"1\"],[\"1.34\"],[\"true\"],[\"[1,2,3,[\\\"a\\\",\\\"b\\\",\\\"c\\\"]]\"],[\"{\\\"a\\\":1,\\\"b\\\":{\\\"c\\\":2}}\"]],\"state\":\"Succeeded\",\"error\":null,\"stats\":{\"scan_progress\":{\"rows\":5,\"bytes\":168},\"write_progress\":{\"rows\":0,\"bytes\":0},\"result_progress\":{\"rows\":5,\"bytes\":168},\"running_time_ms\":7.827281},\"affect\":null,\"stats_uri\":\"/v1/query/d74b2471-3a15-45e2-9ef4-ca8a39505661\",\"final_uri\":\"/v1/query/d74b2471-3a15-45e2-9ef4-ca8a39505661/final\",\"next_uri\":\"/v1/query/d74b2471-3a15-45e2-9ef4-ca8a39505661/final\",\"kill_uri\":\"/v1/query/d74b2471-3a15-45e2-9ef4-ca8a39505661/kill\"}\n";
QueryResults queryResults = QUERY_RESULTS_CODEC.fromJson(goldenString);
Assert.assertEquals(queryResults.getId(), "d74b2471-3a15-45e2-9ef4-ca8a39505661");
Assert.assertEquals(queryResults.getQueryId(), "d74b2471-3a15-45e2-9ef4-ca8a39505661");
Assert.assertEquals(queryResults.getSchema().size(), 1);
Assert.assertEquals(queryResults.getSchema().get(0).getName(), "var");
Assert.assertEquals(queryResults.getSchema().get(0).getDataType().getType(), "Variant");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import static com.databend.jdbc.DatabendColumnInfo.setTypeInfo;
import static java.lang.Math.toIntExact;
import static java.lang.String.format;
import static java.math.RoundingMode.HALF_UP;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static org.joda.time.DateTimeConstants.SECONDS_PER_DAY;
Expand Down Expand Up @@ -155,7 +154,7 @@ private static BigDecimal parseBigDecimal(String value)

static SQLException resultsException(QueryResults results) {
QueryErrors error = requireNonNull(results.getError());
String message = format("Query failed (#%s): %s", results.getId(), error.getMessage());
String message = format("Query failed (#%s): %s", results.getQueryId(), error.getMessage());
return new SQLException(message, String.valueOf(error.getCode()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.sql.Savepoint;
import java.sql.Statement;
import java.sql.Struct;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
Expand Down Expand Up @@ -516,13 +517,23 @@ DatabendClient startQuery(String sql) throws SQLException {
setSocketTimeout(this.driverUri.getSocketTimeout()).
setSession(this.session.get()).
setHost(this.getURI().toString()).
setAdditionalHeaders(new HashMap<>()).
setPaginationOptions(options).build();
return new DatabendClientV1(httpClient, sql, s);
}

DatabendClient startQuery(String sql, StageAttachment attach) throws SQLException {
PaginationOptions options = getPaginationOptions();
ClientSettings s = new ClientSettings.Builder().setSession(this.session.get()).setHost(this.getURI().toString()).setQueryTimeoutSecs(this.driverUri.getQueryTimeout()).setConnectionTimeout(this.driverUri.getConnectionTimeout()).setSocketTimeout(this.driverUri.getSocketTimeout()).setPaginationOptions(options).setStageAttachment(attach).build();
ClientSettings s = new ClientSettings.Builder().
setSession(this.session.get()).
setHost(this.getURI().toString()).
setQueryTimeoutSecs(this.driverUri.getQueryTimeout()).
setConnectionTimeout(this.driverUri.getConnectionTimeout()).
setSocketTimeout(this.driverUri.getSocketTimeout()).
setPaginationOptions(options).
setAdditionalHeaders(new HashMap<>()).
setStageAttachment(attach).
build();
return new DatabendClientV1(httpClient, sql, s);
}

Expand Down
Loading
Loading