Skip to content

Commit

Permalink
SNOW-1524152: Implement setQueryTimeout for async queries (#1958)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-ext-simba-jf authored Jan 9, 2025
1 parent 8b5e51e commit 5473bb2
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 2 deletions.
20 changes: 20 additions & 0 deletions src/main/java/net/snowflake/client/core/SFBaseSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ public abstract class SFBaseSession {

private boolean isJdbcArrowTreatDecimalAsInt = true;

private boolean supportImplicitAsyncQueryTimeout = false;

protected SFBaseSession(SFConnectionHandler sfConnectionHandler) {
this.sfConnectionHandler = sfConnectionHandler;
}
Expand Down Expand Up @@ -1314,4 +1316,22 @@ public SFConnectionHandler getSfConnectionHandler() {
public boolean getEnableReturnTimestampWithTimeZone() {
return enableReturnTimestampWithTimeZone;
}

/**
* @return True if query timeout should be set on the server side for async queries. False by
* default.
*/
@SnowflakeJdbcInternalApi
public boolean getSupportImplicitAsyncQueryTimeout() {
return supportImplicitAsyncQueryTimeout;
}

/**
* @param supportImplicitAsyncQueryTimeout Setting supportImplicitAsyncQueryTimeout to true allows
* for query timeout to be set on the server side.
*/
@SnowflakeJdbcInternalApi
public void setSupportImplicitAsyncQueryTimeout(boolean supportImplicitAsyncQueryTimeout) {
this.supportImplicitAsyncQueryTimeout = supportImplicitAsyncQueryTimeout;
}
}
6 changes: 5 additions & 1 deletion src/main/java/net/snowflake/client/core/SFBaseStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,13 @@ public abstract class SFBaseStatement {
public void addProperty(String propertyName, Object propertyValue) throws SFException {
statementParametersMap.put(propertyName, propertyValue);

// for query timeout, we implement it on client side for now
if ("query_timeout".equalsIgnoreCase(propertyName)) {
// Client side implementation
queryTimeout = (Integer) propertyValue;
if (this.getSFBaseSession().getSupportImplicitAsyncQueryTimeout()) {
// Set server parameter for supporting query timeout on async queries
statementParametersMap.put("STATEMENT_TIMEOUT_IN_SECONDS", (Integer) propertyValue);
}
}

// check if the number of session properties exceed limit
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/net/snowflake/client/core/SFSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,12 @@ public void addSFSessionProperty(String propertyName, Object propertyValue) thro
}
break;

case SUPPORT_IMPLICIT_ASYNC_QUERY_TIMEOUT:
if (propertyValue != null) {
setSupportImplicitAsyncQueryTimeout(getBooleanValue(propertyValue));
}
break;

default:
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,12 @@ public enum SFSessionProperty {
HTTP_CLIENT_SOCKET_TIMEOUT("HTTP_CLIENT_SOCKET_TIMEOUT", false, Integer.class),

JAVA_LOGGING_CONSOLE_STD_OUT("JAVA_LOGGING_CONSOLE_STD_OUT", false, Boolean.class),

JAVA_LOGGING_CONSOLE_STD_OUT_THRESHOLD(
"JAVA_LOGGING_CONSOLE_STD_OUT_THRESHOLD", false, String.class);
"JAVA_LOGGING_CONSOLE_STD_OUT_THRESHOLD", false, String.class),

SUPPORT_IMPLICIT_ASYNC_QUERY_TIMEOUT(
"SUPPORT_IMPLICIT_ASYNC_QUERY_TIMEOUT", false, Boolean.class);

// property key in string
private String propertyKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,12 @@ public interface SnowflakeStatement {
* @throws SQLException if an error is encountered
*/
void resultSetMetadataHandler(SFBaseResultSet resultSet) throws SQLException;

/**
* Sets the query timeout when running an async query.
*
* @param seconds The number of seconds until timeout.
* @throws SQLException if an error is encountered
*/
void setAsyncQueryTimeout(int seconds) throws SQLException;
}
18 changes: 18 additions & 0 deletions src/main/java/net/snowflake/client/jdbc/SnowflakeStatementV1.java
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,21 @@ public void setQueryTimeout(int seconds) throws SQLException {
}
}

@Override
public void setAsyncQueryTimeout(int seconds) throws SQLException {
logger.trace("setAsyncQueryTimeout(int seconds)", false);
raiseSQLExceptionIfStatementIsClosed();

try {
if (this.sfBaseStatement != null) {
this.sfBaseStatement.addProperty("STATEMENT_TIMEOUT_IN_SECONDS", seconds);
}
} catch (SFException ex) {
throw new SnowflakeSQLException(
ex.getCause(), ex.getSqlState(), ex.getVendorCode(), ex.getParams());
}
}

@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
logger.trace("isWrapperFor(Class<?> iface)", false);
Expand Down Expand Up @@ -1260,6 +1275,9 @@ public void setParameter(String name, Object value) throws SQLException {}
@Override
public void setQueryTimeout(int seconds) throws SQLException {}

@Override
public void setAsyncQueryTimeout(int seconds) throws SQLException {}

@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
logger.trace("isWrapperFor(Class<?> iface)", false);
Expand Down
90 changes: 90 additions & 0 deletions src/test/java/net/snowflake/client/jdbc/StatementLatestIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package net.snowflake.client.jdbc;

import static net.snowflake.client.jdbc.ErrorCode.ROW_DOES_NOT_EXIST;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -19,15 +20,19 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import net.snowflake.client.TestUtil;
import net.snowflake.client.annotations.DontRunOnGithubActions;
import net.snowflake.client.category.TestTags;
import net.snowflake.client.core.ParameterBindingDTO;
import net.snowflake.client.core.QueryStatus;
import net.snowflake.client.core.SFSession;
import net.snowflake.client.core.bind.BindUploader;
import net.snowflake.common.core.SqlState;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
Expand Down Expand Up @@ -296,4 +301,89 @@ public void testQueryIdIsSetOnFailedExecuteQuery() throws SQLException {
}
}
}

/**
* Test for setting query timeout on async queries. Applicable to versions after 3.21.0.
*
* @throws SQLException if there is an error when executing
*/
@Test
public void testSetQueryTimeoutForAsyncQueryUsingConnectionProperty() throws SQLException {
Properties p = new Properties();
p.put("SUPPORT_IMPLICIT_ASYNC_QUERY_TIMEOUT", true);
try (Connection con = getConnection(p);
Statement statement = con.createStatement()) {
statement.setQueryTimeout(3);

String sql = "select seq4() from table(generator(rowcount => 1000000000))";

try (ResultSet resultSet =
statement.unwrap(SnowflakeStatement.class).executeAsyncQuery(sql)) {
SnowflakeResultSet sfrs = resultSet.unwrap(SnowflakeResultSet.class);
await()
.atMost(Duration.ofSeconds(10))
.until(() -> sfrs.getStatusV2().getStatus() == QueryStatus.FAILED_WITH_ERROR);

assertTrue(
sfrs.getStatusV2()
.getErrorMessage()
.contains(
"Statement reached its statement or warehouse timeout of 3 second(s) and was canceled"));
}
}
}

/**
* Test for setting query timeout on regular queries with the SUPPORT_IMPLICIT_ASYNC_QUERY_TIMEOUT
* property set to true. Applicable to versions after 3.21.0.
*
* @throws SQLException if there is an error when executing
*/
@Test
public void testSetQueryTimeoutWhenAsyncConnectionPropertySet() throws SQLException {
Properties p = new Properties();
p.put("SUPPORT_IMPLICIT_ASYNC_QUERY_TIMEOUT", true);
try (Connection con = getConnection(p);
Statement statement = con.createStatement()) {
statement.setQueryTimeout(3);

String sql = "select seq4() from table(generator(rowcount => 1000000000))";

try {
statement.executeQuery(sql);
fail("This query should fail.");
} catch (SQLException e) {
assertEquals(SqlState.QUERY_CANCELED, e.getSQLState());
}
}
}

/**
* Test for setting query timeout on async queries. Applicable to versions after 3.21.0.
*
* @throws SQLException if there is an error when executing
*/
@Test
public void testSetQueryTimeoutForAsyncQuery() throws SQLException {
try (Connection con = getConnection();
Statement statement = con.createStatement()) {
SnowflakeStatement sfStmt = statement.unwrap(SnowflakeStatement.class);
sfStmt.setAsyncQueryTimeout(3);

String sql = "select seq4() from table(generator(rowcount => 1000000000))";

try (ResultSet resultSet = sfStmt.executeAsyncQuery(sql)) {
SnowflakeResultSet sfrs = resultSet.unwrap(SnowflakeResultSet.class);
await()
.atMost(Duration.ofSeconds(10))
.until(() -> sfrs.getStatusV2().getStatus() == QueryStatus.FAILED_WITH_ERROR);

assertTrue(
sfrs.getStatusV2()
.getErrorMessage()
.contains(
"Statement reached its statement or warehouse timeout of 3 second(s) and was canceled"));
}
}
}
}

0 comments on commit 5473bb2

Please sign in to comment.