Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support query forwards. #284

Merged
merged 2 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/actions/setup_databend_cluster/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ runs:
shell: bash
run: |
docker run -d --network host --name nginx-lb \
-v ${{ github.workspace }}/scripts/ci/nginx.conf:/etc/nginx/nginx.conf:ro \
-v ${{ github.workspace }}/scripts/ci/nginx_rr.conf:/etc/nginx/nginx.conf:ro \
nginx

- name: Download binary and extract into target directory
Expand Down
16 changes: 15 additions & 1 deletion .github/workflows/test_cluster.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,28 @@ jobs:
- uses: ./.github/actions/setup_databend_cluster
timeout-minutes: 15
with:
version: '1.2.629-nightly'
version: '1.2.647-nightly'
target: 'x86_64-unknown-linux-gnu'

- name: Test with conn to node 1
run: mvn test -DexcludedGroups=FLAKY
env:
MAVEN_GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}

- name: View Nginx logs
run: docker logs nginx-lb

- name: check nginx
run: |
curl -u 'databend:databend' -X POST "http://localhost:8010/v1/query" \
-H 'Content-Type: application/json' \
-d '{"sql": "select 1", "pagination": { "wait_time_secs": 5 }}' || true
env:
MAVEN_GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}

- name: View Nginx logs
run: docker logs nginx-lb

- name: Test with conn to nginx
run: mvn test -DexcludedGroups=FLAKY
env:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class ClientSettings {
public static final String X_DATABEND_ROUTE_HINT = "X-DATABEND-ROUTE-HINT";
public static final String X_DATABEND_STAGE_NAME = "X-DATABEND-STAGE-NAME";
public static final String X_DATABEND_RELATIVE_PATH = "X-DATABEND-RELATIVE-PATH";
public static final String X_DATABEND_STICKY_NODE = "X-DATABEND-STICKY-NODE";
public static final String DatabendWarehouseHeader = "X-DATABEND-WAREHOUSE";
public static final String DatabendTenantHeader = "X-DATABEND-TENANT";
private final String host;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,13 @@ public class DatabendClientV1
private final Map<String, String> additonalHeaders;
// client session
private final AtomicReference<DatabendSession> databendSession;
private String nodeID;
private final AtomicReference<QueryResults> currentResults = new AtomicReference<>(null);
private static final Logger logger = Logger.getLogger(DatabendClientV1.class.getPackage().getName());

private Consumer<DatabendSession> on_session_state_update;

public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings settings, Consumer<DatabendSession> on_session_state_update) {
public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings settings, Consumer<DatabendSession> on_session_state_update, AtomicReference<String> last_node_id) {
requireNonNull(httpClient, "httpClient is null");
requireNonNull(sql, "sql is null");
requireNonNull(settings, "settings is null");
Expand All @@ -87,11 +88,13 @@ public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings sett
this.maxRetryAttempts = settings.getRetryAttempts();
// need atomic reference since it may get updated when query returned.
this.databendSession = new AtomicReference<>(settings.getSession());
this.nodeID = last_node_id.get();
Request request = buildQueryRequest(query, settings);
boolean completed = this.execute(request);
if (!completed) {
throw new RuntimeException("Query failed to complete");
}
last_node_id.set(this.nodeID);
}

public static List<DiscoveryNode> discoverNodes(OkHttpClient httpClient, ClientSettings settings) {
Expand Down Expand Up @@ -127,8 +130,13 @@ private Request buildQueryRequest(String query, ClientSettings settings) {
if (reqString == null || reqString.isEmpty()) {
throw new IllegalArgumentException("Invalid request: " + req);
}

url = url.newBuilder().encodedPath(QUERY_PATH).build();
Request.Builder builder = prepareRequest(url, this.additonalHeaders);
DatabendSession session = databendSession.get();
if (session != null && session.getNeedSticky()) {
builder.addHeader(ClientSettings.X_DATABEND_STICKY_NODE, nodeID);
}
return builder.post(okhttp3.RequestBody.create(MEDIA_TYPE_JSON, reqString)).build();
}

Expand Down Expand Up @@ -299,6 +307,7 @@ public boolean execute(Request request) {
}

private void processResponse(Headers headers, QueryResults results) {
nodeID = results.getNodeId();
DatabendSession session = results.getSession();
if (session != null) {
databendSession.set(session);
Expand Down Expand Up @@ -332,6 +341,7 @@ public boolean advance() {
HttpUrl url = HttpUrl.get(this.host);
url = url.newBuilder().encodedPath(nextUriPath).build();
Request.Builder builder = prepareRequest(url, this.additonalHeaders);
builder.addHeader(ClientSettings.X_DATABEND_STICKY_NODE, this.nodeID);
Request request = builder.get().build();
return executeInternal(request, OptionalLong.of(MAX_MATERIALIZED_JSON_RESPONSE_SIZE));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,25 @@ public class DatabendSession {

// txn
private String txnState;
private Boolean needSticky;

private Map<String, Object> additionalProperties = new HashMap<>();

@JsonCreator
public DatabendSession(
@JsonProperty("database") String database,
@JsonProperty("settings") Map<String, String> settings,
@JsonProperty("txn_state") String txnState) {
@JsonProperty("txn_state") String txnState,
@JsonProperty("need_sticky") Boolean needSticky) {
this.database = database;
this.settings = settings;
this.txnState = txnState;
this.needSticky = needSticky != null ? needSticky : false;
}

// default
public static DatabendSession createDefault() {
return new DatabendSession(DEFAULT_DATABASE, null, null);
return new DatabendSession(DEFAULT_DATABASE, null, null, false);
}

public static Builder builder() {
Expand All @@ -79,6 +82,11 @@ public String getTxnState() {
return txnState;
}

/**
 * Whether the server requires follow-up requests in this session to be
 * routed to the same backend node (deserialized from the {@code need_sticky}
 * field of the session body). Never {@code null}: the constructor coalesces
 * a missing value to {@code false}.
 */
@JsonProperty("need_sticky")
public Boolean getNeedSticky() {
return needSticky;
}

@JsonAnyGetter
public Map<String, Object> getAdditionalProperties() {
return additionalProperties;
Expand Down Expand Up @@ -145,7 +153,7 @@ public void setAutoCommit(boolean autoCommit) {
}

public DatabendSession build() {
return new DatabendSession(database, settings, txnState);
return new DatabendSession(database, settings, txnState, false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

public class QueryResults {
private final String queryId;
private final String nodeId;
private final String sessionId;
private final DatabendSession session;
private final List<QueryRowField> schema;
Expand All @@ -42,6 +43,7 @@ public class QueryResults {
@JsonCreator
public QueryResults(
@JsonProperty("id") String queryId,
@JsonProperty("node_id") String nodeId,
@JsonProperty("session_id") String sessionId,
@JsonProperty("session") DatabendSession session,
@JsonProperty("schema") List<QueryRowField> schema,
Expand All @@ -55,6 +57,7 @@ public QueryResults(
@JsonProperty("next_uri") URI nextUri,
@JsonProperty("kill_uri") URI killUri) {
this.queryId = queryId;
this.nodeId = nodeId;
this.sessionId = sessionId;
this.session = session;
this.schema = schema;
Expand All @@ -76,6 +79,11 @@ public String getQueryId() {
return queryId;
}

/**
 * ID of the cluster node that produced this query response, deserialized
 * from the {@code node_id} field. The client records it and sends it back
 * in the X-DATABEND-STICKY-NODE header so paginated/follow-up requests
 * reach the same node behind a load balancer.
 */
@JsonProperty
public String getNodeId() {
return nodeId;
}

@JsonProperty
public String getSessionId() {
return sessionId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

import static com.databend.client.ClientSettings.*;

Expand All @@ -39,7 +40,8 @@ public void testBasicQueryPagination() {
OkHttpClient client = new OkHttpClient.Builder().addInterceptor(OkHttpUtils.basicAuthInterceptor("databend", "databend")).build();

ClientSettings settings = new ClientSettings(DATABEND_HOST);
DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null);
AtomicReference<String> lastNodeID = new AtomicReference<>();
DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null, lastNodeID);
System.out.println(cli.getResults().getData());
Assert.assertEquals(cli.getQuery(), "select 1");
Assert.assertEquals(cli.getSession().getDatabase(), DATABASE);
Expand All @@ -57,8 +59,10 @@ public void testConnectionRefused() {
OkHttpClient client = new OkHttpClient.Builder().addInterceptor(OkHttpUtils.basicAuthInterceptor("databend", "databend")).build();
ClientSettings settings = new ClientSettings("http://localhost:13191");

AtomicReference<String> lastNodeID = new AtomicReference<>();

try {
DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null);
DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null, lastNodeID);
cli.getResults(); // This should trigger the connection attempt
Assert.fail("Expected exception was not thrown");
} catch (Exception e) {
Expand All @@ -73,11 +77,12 @@ public void testConnectionRefused() {
public void testBasicQueryIDHeader() {
OkHttpClient client = new OkHttpClient.Builder().addInterceptor(OkHttpUtils.basicAuthInterceptor("databend", "databend")).build();
String expectedUUID = UUID.randomUUID().toString();
AtomicReference<String> lastNodeID = new AtomicReference<>();

Map<String, String> additionalHeaders = new HashMap<>();
additionalHeaders.put(X_Databend_Query_ID, expectedUUID);
ClientSettings settings = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders, null, DEFAULT_RETRY_ATTEMPTS);
DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null);
DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null, lastNodeID);
Assert.assertEquals(cli.getAdditionalHeaders().get(X_Databend_Query_ID), expectedUUID);

String expectedUUID1 = UUID.randomUUID().toString();
Expand All @@ -86,7 +91,7 @@ public void testBasicQueryIDHeader() {
ClientSettings settings1 = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders1, null, DEFAULT_RETRY_ATTEMPTS);
Assert.assertEquals(cli.getAdditionalHeaders().get(X_Databend_Query_ID), expectedUUID);
// check X_Databend_Query_ID won't change after calling next()
DatabendClient cli1 = new DatabendClientV1(client, "SELECT number from numbers(200000) order by number", settings1, null);
DatabendClient cli1 = new DatabendClientV1(client, "SELECT number from numbers(200000) order by number", settings1, null, lastNodeID);
for (int i = 1; i < 1000; i++) {
cli.advance();
Assert.assertEquals(cli1.getAdditionalHeaders().get(X_Databend_Query_ID), expectedUUID1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public class DatabendConnection implements Connection, FileTransferAPI, Consumer
private AtomicReference<DatabendSession> session = new AtomicReference<>();

private String routeHint = "";
private AtomicReference<String> lastNodeID = new AtomicReference<>();

private void initializeFileHandler() {
if (this.debug()) {
Expand Down Expand Up @@ -696,12 +697,12 @@ DatabendClient startQueryWithFailover(String sql, StageAttachment attach) throws
throw new SQLException("Error start query: " + "SQL: " + sql + " " + e.getMessage() + " cause: " + e.getCause(), e);
}
try {
// route hint is used when transaction occured or when multi-cluster warehouse adopted(CLOUD ONLY)
// route hint is used when transaction occurred or when multi-cluster warehouse adopted(CLOUD ONLY)
// on cloud case, we have gateway to handle with route hint, and will not parse URI from route hint.
// transaction procedure:
// 1. server return session body where txn state is active
// 2. when there is a active transaction, it will route all query to target route hint uri if exists
// 3. if there is not active transaction, it will use load balancing policy to choose a host to execute query
// 2. when there is an active transaction, it will route all query to target route hint uri if exists
// 3. if there is not an active transaction, it will use load balancing policy to choose a host to execute query
String query_id = UUID.randomUUID().toString();
String candidateHost = this.driverUri.getUri(query_id).toString();
if (!inActiveTransaction()) {
Expand All @@ -726,7 +727,7 @@ DatabendClient startQueryWithFailover(String sql, StageAttachment attach) throws
if (this.autoDiscovery) {
tryAutoDiscovery(httpClient, s);
}
return new DatabendClientV1(httpClient, sql, s, this);
return new DatabendClientV1(httpClient, sql, s, this, lastNodeID);
} catch (RuntimeException e1) {
e = e1;
} catch (Exception e1) {
Expand Down
File renamed without changes.
22 changes: 22 additions & 0 deletions scripts/ci/nginx_rr.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# CI round-robin load balancer: fronts the three Databend query nodes so the
# JDBC test suite exercises query forwarding / sticky-node routing through a
# proxy (mounted as /etc/nginx/nginx.conf by setup_databend_cluster).
events {
# Connection cap per worker; more than enough for the CI test load.
worker_connections 1024;
}

http {
# Default nginx balancing is round-robin, so consecutive requests are
# deliberately spread across different nodes.
upstream backend {
server 127.0.0.1:8000;
server 127.0.0.1:8002;
server 127.0.0.1:8003;
}

server {
# The test workflow targets the proxy at http://localhost:8010.
listen 8010;

location / {
proxy_pass http://backend;
# Explicitly forward the Databend-specific request headers to the
# upstream node ($http_* variables expose the incoming headers with
# dashes mapped to underscores).
proxy_set_header X-Databend-Relative-Path $http_x_databend_relative_path;
proxy_set_header X-Databend-Stage-Name $http_x_databend_stage_name;
proxy_set_header X-Databend-Sticky-Node $http_x_databend_sticky_node;
}
}
}