diff --git a/.github/actions/setup_databend_cluster/action.yml b/.github/actions/setup_databend_cluster/action.yml index 941d1f2..fef70d3 100644 --- a/.github/actions/setup_databend_cluster/action.yml +++ b/.github/actions/setup_databend_cluster/action.yml @@ -34,7 +34,7 @@ runs: shell: bash run: | docker run -d --network host --name nginx-lb \ - -v ${{ github.workspace }}/scripts/ci/nginx.conf:/etc/nginx/nginx.conf:ro \ + -v ${{ github.workspace }}/scripts/ci/nginx_rr.conf:/etc/nginx/nginx.conf:ro \ nginx - name: Download binary and extract into target directory diff --git a/.github/workflows/test_cluster.yml b/.github/workflows/test_cluster.yml index e79fb51..62abf59 100644 --- a/.github/workflows/test_cluster.yml +++ b/.github/workflows/test_cluster.yml @@ -31,7 +31,7 @@ jobs: - uses: ./.github/actions/setup_databend_cluster timeout-minutes: 15 with: - version: '1.2.629-nightly' + version: '1.2.647-nightly' target: 'x86_64-unknown-linux-gnu' - name: Test with conn to node 1 @@ -39,6 +39,20 @@ jobs: env: MAVEN_GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }} + - name: View Nginx logs + run: docker logs nginx-lb + + - name: check nginx + run: | + curl -u 'databend:databend' -X POST "http://localhost:8010/v1/query" \ + -H 'Content-Type: application/json' \ + -d '{"sql": "select 1", "pagination": { "wait_time_secs": 5 }}' || true + env: + MAVEN_GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }} + + - name: View Nginx logs + run: docker logs nginx-lb + - name: Test with conn to nginx run: mvn test -DexcludedGroups=FLAKY env: diff --git a/databend-client/src/main/java/com/databend/client/ClientSettings.java b/databend-client/src/main/java/com/databend/client/ClientSettings.java index c8f60b3..eb9bbea 100644 --- a/databend-client/src/main/java/com/databend/client/ClientSettings.java +++ b/databend-client/src/main/java/com/databend/client/ClientSettings.java @@ -26,6 +26,7 @@ public class ClientSettings { public static final String X_DATABEND_ROUTE_HINT = "X-DATABEND-ROUTE-HINT"; public static final String X_DATABEND_STAGE_NAME = "X-DATABEND-STAGE-NAME"; public static final String X_DATABEND_RELATIVE_PATH = "X-DATABEND-RELATIVE-PATH"; + public static final String X_DATABEND_STICKY_NODE = "X-DATABEND-STICKY-NODE"; public static final String DatabendWarehouseHeader = "X-DATABEND-WAREHOUSE"; public static final String DatabendTenantHeader = "X-DATABEND-TENANT"; private final String host; diff --git a/databend-client/src/main/java/com/databend/client/DatabendClientV1.java b/databend-client/src/main/java/com/databend/client/DatabendClientV1.java index ee15176..d620a20 100644 --- a/databend-client/src/main/java/com/databend/client/DatabendClientV1.java +++ b/databend-client/src/main/java/com/databend/client/DatabendClientV1.java @@ -67,12 +67,13 @@ public class DatabendClientV1 private final Map additonalHeaders; // client session private final AtomicReference databendSession; + private String nodeID; private final AtomicReference currentResults = new AtomicReference<>(null); private static final Logger logger = Logger.getLogger(DatabendClientV1.class.getPackage().getName()); private Consumer on_session_state_update; - public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings settings, Consumer on_session_state_update) { + public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings settings, Consumer on_session_state_update, AtomicReference last_node_id) { requireNonNull(httpClient, "httpClient is null"); requireNonNull(sql, "sql is null"); requireNonNull(settings, "settings is null"); @@ -87,11 +88,13 @@ public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings sett this.maxRetryAttempts = settings.getRetryAttempts(); // need atomic reference since it may get updated when query returned. this.databendSession = new AtomicReference<>(settings.getSession()); + this.nodeID = last_node_id.get(); Request request = buildQueryRequest(query, settings); boolean completed = this.execute(request); if (!completed) { throw new RuntimeException("Query failed to complete"); } + last_node_id.set(this.nodeID); } public static List discoverNodes(OkHttpClient httpClient, ClientSettings settings) { @@ -127,8 +130,13 @@ private Request buildQueryRequest(String query, ClientSettings settings) { if (reqString == null || reqString.isEmpty()) { throw new IllegalArgumentException("Invalid request: " + req); } + url = url.newBuilder().encodedPath(QUERY_PATH).build(); Request.Builder builder = prepareRequest(url, this.additonalHeaders); + DatabendSession session = databendSession.get(); + if (session != null && session.getNeedSticky()) { + builder.addHeader(ClientSettings.X_DATABEND_STICKY_NODE, nodeID); + } return builder.post(okhttp3.RequestBody.create(MEDIA_TYPE_JSON, reqString)).build(); } @@ -299,6 +307,7 @@ public boolean execute(Request request) { } private void processResponse(Headers headers, QueryResults results) { + nodeID = results.getNodeId(); DatabendSession session = results.getSession(); if (session != null) { databendSession.set(session); @@ -332,6 +341,7 @@ public boolean advance() { HttpUrl url = HttpUrl.get(this.host); url = url.newBuilder().encodedPath(nextUriPath).build(); Request.Builder builder = prepareRequest(url, this.additonalHeaders); + builder.addHeader(ClientSettings.X_DATABEND_STICKY_NODE, this.nodeID); Request request = builder.get().build(); return executeInternal(request, OptionalLong.of(MAX_MATERIALIZED_JSON_RESPONSE_SIZE)); } diff --git a/databend-client/src/main/java/com/databend/client/DatabendSession.java b/databend-client/src/main/java/com/databend/client/DatabendSession.java index 24178f4..7b4af90 100644 --- a/databend-client/src/main/java/com/databend/client/DatabendSession.java +++ b/databend-client/src/main/java/com/databend/client/DatabendSession.java @@ -41,6 +41,7 @@ public class DatabendSession { // txn private String txnState; + private Boolean needSticky; private Map additionalProperties = new HashMap<>(); @@ -48,15 +49,17 @@ public class DatabendSession { public DatabendSession( @JsonProperty("database") String database, @JsonProperty("settings") Map settings, - @JsonProperty("txn_state") String txnState) { + @JsonProperty("txn_state") String txnState, + @JsonProperty("need_sticky") Boolean needSticky) { this.database = database; this.settings = settings; this.txnState = txnState; + this.needSticky = needSticky != null ? needSticky : false; } // default public static DatabendSession createDefault() { - return new DatabendSession(DEFAULT_DATABASE, null, null); + return new DatabendSession(DEFAULT_DATABASE, null, null, false); } public static Builder builder() { @@ -79,6 +82,11 @@ public String getTxnState() { return txnState; } + @JsonProperty("need_sticky") + public Boolean getNeedSticky() { + return needSticky; + } + @JsonAnyGetter public Map getAdditionalProperties() { return additionalProperties; @@ -145,7 +153,7 @@ public void setAutoCommit(boolean autoCommit) { } public DatabendSession build() { - return new DatabendSession(database, settings, txnState); + return new DatabendSession(database, settings, txnState, false); } } } diff --git a/databend-client/src/main/java/com/databend/client/QueryResults.java b/databend-client/src/main/java/com/databend/client/QueryResults.java index 30b0a07..35fe223 100644 --- a/databend-client/src/main/java/com/databend/client/QueryResults.java +++ b/databend-client/src/main/java/com/databend/client/QueryResults.java @@ -25,6 +25,7 @@ public class QueryResults { private final String queryId; + private final String nodeId; private final String sessionId; private final DatabendSession session; private final List schema; @@ -42,6 +43,7 @@ public class QueryResults { @JsonCreator public QueryResults( @JsonProperty("id") String queryId, + @JsonProperty("node_id") String nodeId, @JsonProperty("session_id") String sessionId, @JsonProperty("session") DatabendSession session, @JsonProperty("schema") List schema, @@ -55,6 +57,7 @@ public QueryResults( @JsonProperty("next_uri") URI nextUri, @JsonProperty("kill_uri") URI killUri) { this.queryId = queryId; + this.nodeId = nodeId; this.sessionId = sessionId; this.session = session; this.schema = schema; @@ -76,6 +79,11 @@ public String getQueryId() { return queryId; } + @JsonProperty + public String getNodeId() { + return nodeId; + } + @JsonProperty public String getSessionId() { return sessionId; diff --git a/databend-client/src/test/java/com/databend/client/TestClientIT.java b/databend-client/src/test/java/com/databend/client/TestClientIT.java index 0f9afc4..1e367c0 100644 --- a/databend-client/src/test/java/com/databend/client/TestClientIT.java +++ b/databend-client/src/test/java/com/databend/client/TestClientIT.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; import static com.databend.client.ClientSettings.*; @@ -39,7 +40,8 @@ public void testBasicQueryPagination() { OkHttpClient client = new OkHttpClient.Builder().addInterceptor(OkHttpUtils.basicAuthInterceptor("databend", "databend")).build(); ClientSettings settings = new ClientSettings(DATABEND_HOST); - DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null); + AtomicReference lastNodeID = new AtomicReference<>(); + DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null, lastNodeID); System.out.println(cli.getResults().getData()); Assert.assertEquals(cli.getQuery(), "select 1"); Assert.assertEquals(cli.getSession().getDatabase(), DATABASE); @@ -57,8 +59,10 @@ public void testConnectionRefused() { OkHttpClient client = new OkHttpClient.Builder().addInterceptor(OkHttpUtils.basicAuthInterceptor("databend", "databend")).build(); ClientSettings settings = new ClientSettings("http://localhost:13191"); + AtomicReference lastNodeID = new AtomicReference<>(); + try { - DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null); + DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null, lastNodeID); cli.getResults(); // This should trigger the connection attempt Assert.fail("Expected exception was not thrown"); } catch (Exception e) { @@ -73,11 +77,12 @@ public void testConnectionRefused() { public void testBasicQueryIDHeader() { OkHttpClient client = new OkHttpClient.Builder().addInterceptor(OkHttpUtils.basicAuthInterceptor("databend", "databend")).build(); String expectedUUID = UUID.randomUUID().toString(); + AtomicReference lastNodeID = new AtomicReference<>(); Map additionalHeaders = new HashMap<>(); additionalHeaders.put(X_Databend_Query_ID, expectedUUID); ClientSettings settings = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders, null, DEFAULT_RETRY_ATTEMPTS); - DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null); + DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null, lastNodeID); Assert.assertEquals(cli.getAdditionalHeaders().get(X_Databend_Query_ID), expectedUUID); String expectedUUID1 = UUID.randomUUID().toString(); @@ -86,7 +91,7 @@ public void testBasicQueryIDHeader() { ClientSettings settings1 = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders1, null, DEFAULT_RETRY_ATTEMPTS); Assert.assertEquals(cli.getAdditionalHeaders().get(X_Databend_Query_ID), expectedUUID); // check X_Databend_Query_ID won't change after calling next() - DatabendClient cli1 = new DatabendClientV1(client, "SELECT number from numbers(200000) order by number", settings1, null); + DatabendClient cli1 = new DatabendClientV1(client, "SELECT number from numbers(200000) order by number", settings1, null, lastNodeID); for (int i = 1; i < 1000; i++) { cli.advance(); Assert.assertEquals(cli1.getAdditionalHeaders().get(X_Databend_Query_ID), expectedUUID1); diff --git a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java index 922d4db..4a43abe 100644 --- a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java +++ b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java @@ -78,6 +78,7 @@ public class DatabendConnection implements Connection, FileTransferAPI, Consumer private AtomicReference session = new AtomicReference<>(); private String routeHint = ""; + private AtomicReference lastNodeID = new AtomicReference<>(); private void initializeFileHandler() { if (this.debug()) { @@ -696,12 +697,12 @@ DatabendClient startQueryWithFailover(String sql, StageAttachment attach) throws throw new SQLException("Error start query: " + "SQL: " + sql + " " + e.getMessage() + " cause: " + e.getCause(), e); } try { - // route hint is used when transaction occured or when multi-cluster warehouse adopted(CLOUD ONLY) + // route hint is used when transaction occurred or when multi-cluster warehouse adopted(CLOUD ONLY) // on cloud case, we have gateway to handle with route hint, and will not parse URI from route hint. // transaction procedure: // 1. server return session body where txn state is active - // 2. when there is a active transaction, it will route all query to target route hint uri if exists - // 3. if there is not active transaction, it will use load balancing policy to choose a host to execute query + // 2. when there is an active transaction, it will route all query to target route hint uri if exists + // 3. if there is not an active transaction, it will use load balancing policy to choose a host to execute query String query_id = UUID.randomUUID().toString(); String candidateHost = this.driverUri.getUri(query_id).toString(); if (!inActiveTransaction()) { @@ -726,7 +727,7 @@ DatabendClient startQueryWithFailover(String sql, StageAttachment attach) throws if (this.autoDiscovery) { tryAutoDiscovery(httpClient, s); } - return new DatabendClientV1(httpClient, sql, s, this); + return new DatabendClientV1(httpClient, sql, s, this, lastNodeID); } catch (RuntimeException e1) { e = e1; } catch (Exception e1) { diff --git a/scripts/ci/nginx.conf b/scripts/ci/nginx_hash.conf similarity index 100% rename from scripts/ci/nginx.conf rename to scripts/ci/nginx_hash.conf diff --git a/scripts/ci/nginx_rr.conf b/scripts/ci/nginx_rr.conf new file mode 100644 index 0000000..93c0202 --- /dev/null +++ b/scripts/ci/nginx_rr.conf @@ -0,0 +1,22 @@ +events { + worker_connections 1024; +} + +http { + upstream backend { + server 127.0.0.1:8000; + server 127.0.0.1:8002; + server 127.0.0.1:8003; + } + + server { + listen 8010; + + location / { + proxy_pass http://backend; + proxy_set_header X-Databend-Relative-Path $http_x_databend_relative_path; + proxy_set_header X-Databend-Stage-Name $http_x_databend_stage_name; + proxy_set_header X-Databend-Sticky-Node $http_x_databend_sticky_node; + } + } +}