From bc48178f954ff1fa62cafcfb9870df88f166ec9c Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Mon, 22 Jan 2024 14:58:37 -0500 Subject: [PATCH 1/6] Implement an .is_available property for AsyncExecution status Signed-off-by: Jesse Whitehouse --- src/databricks/sql/ae.py | 14 +++++++++++++- src/databricks/sql/results.py | 12 ++++++++++++ tests/e2e/test_execute_async.py | 7 +++++++ 3 files changed, 32 insertions(+), 1 deletion(-) diff --git a/src/databricks/sql/ae.py b/src/databricks/sql/ae.py index 0751e1bb..aff417ba 100644 --- a/src/databricks/sql/ae.py +++ b/src/databricks/sql/ae.py @@ -1,7 +1,7 @@ from enum import Enum from typing import Optional, Union, TYPE_CHECKING from databricks.sql.exc import RequestError -from databricks.sql.results import ResultSet +from databricks.sql.results import ResultSet, execute_response_contains_direct_results from datetime import datetime @@ -81,6 +81,7 @@ class AsyncExecution: ] _last_sync_timestamp: Optional[datetime] = None _result_set: Optional["ResultSet"] = None + _is_available: bool = True def __init__( self, @@ -101,6 +102,8 @@ def __init__( if execute_statement_response: self._execute_statement_response = execute_statement_response + if execute_response_contains_direct_results(execute_statement_response): + self._is_available = False else: self._execute_statement_response = FakeExecuteStatementResponse( directResults=False, operationHandle=self.t_operation_handle @@ -225,6 +228,15 @@ def last_sync_timestamp(self) -> Optional[datetime]: """The timestamp of the last time self.status was synced with the server""" return self._last_sync_timestamp + @property + def is_available(self) -> bool: + """Indicates whether the result of this query can be fetched from a separate thread. + + Only returns False if the query returned its results directly when `execute_async` was called. 
+ """ + + return self._is_available + @classmethod def from_thrift_response( cls, diff --git a/src/databricks/sql/results.py b/src/databricks/sql/results.py index aed07982..8e17bf74 100644 --- a/src/databricks/sql/results.py +++ b/src/databricks/sql/results.py @@ -9,6 +9,7 @@ from databricks.sql.exc import ( CursorAlreadyClosedError, ) +from databricks.sql.thrift_api.TCLIService import ttypes from databricks.sql.types import Row from databricks.sql.utils import ExecuteResponse @@ -223,3 +224,14 @@ def map_col_type(type_): (column.name, map_col_type(column.datatype), None, None, None, None, None) for column in table_schema_message.columns ] + + +def execute_response_contains_direct_results( + execute_response: ttypes.TExecuteStatementResp, +) -> bool: + """ + Returns True if the thrift TExecuteStatementResp contains metadata + This indicates the statement has finished executing at the server. + """ + + return bool(execute_response.directResults.resultSetMetadata) diff --git a/tests/e2e/test_execute_async.py b/tests/e2e/test_execute_async.py index 6fde60ea..39ba0875 100644 --- a/tests/e2e/test_execute_async.py +++ b/tests/e2e/test_execute_async.py @@ -166,6 +166,9 @@ def test_get_async_execution_no_results_when_direct_results_were_sent(self): with self.connection() as conn: ae = conn.execute_async(DIRECT_RESULTS_QUERY, {"param": 1}) + assert ( + not ae.is_available + ), "Queries that return direct results should not be available" query_id, query_secret = ae.serialize().split(":") ae.get_result() @@ -193,9 +196,13 @@ def test_get_async_execution_twice(self): """ with self.connection() as conn_1, self.connection() as conn_2: ae_1 = conn_1.execute_async(LONG_ISH_QUERY) + assert ( + ae_1.is_available + ), "A long query does not return direct results so is_available should be True" query_id, query_secret = ae_1.serialize().split(":") ae_2 = conn_2.get_async_execution(query_id, query_secret) + assert ae_2.is_available while ae_1.is_running: time.sleep(1) From 
c0d35a793594377afb913de82af5cb908633da3c Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Mon, 22 Jan 2024 16:42:59 -0500 Subject: [PATCH 2/6] Revise logic for checking for direct results after conversation with thrift server team. Signed-off-by: Jesse Whitehouse --- src/databricks/sql/results.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/src/databricks/sql/results.py b/src/databricks/sql/results.py index 8e17bf74..8ee53e55 100644 --- a/src/databricks/sql/results.py +++ b/src/databricks/sql/results.py @@ -230,8 +230,22 @@ def execute_response_contains_direct_results( execute_response: ttypes.TExecuteStatementResp, ) -> bool: """ - Returns True if the thrift TExecuteStatementResp contains metadata - This indicates the statement has finished executing at the server. + Returns True if the thrift TExecuteStatementResp returned a direct result. + + When directResults is used the server just batches these rpcs together, + if the entire result can be returned in a single round-trip: + + struct TSparkDirectResults { + 1: optional TGetOperationStatusResp operationStatus + 2: optional TGetResultSetMetadataResp resultSetMetadata + 3: optional TFetchResultsResp resultSet + 4: optional TCloseOperationResp closeOperation + } """ - return bool(execute_response.directResults.resultSetMetadata) + has_op_status = execute_response.directResults.operationStatus + has_result_set = execute_response.directResults.resultSet + has_metadata = execute_response.directResults.resultSetMetadata + has_close_op = execute_response.directResults.closeOperation + + return has_op_status and has_result_set and has_metadata and has_close_op From 6337a899b1d5f37a69644ac99433805413ea9d75 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Mon, 22 Jan 2024 16:43:18 -0500 Subject: [PATCH 3/6] Fix: when moving ResultSet to results.py I didn't instantiate a logger Signed-off-by: Jesse Whitehouse --- src/databricks/sql/results.py | 4 ++++ 1 file changed, 4 
insertions(+) diff --git a/src/databricks/sql/results.py b/src/databricks/sql/results.py index 8ee53e55..ec2433c2 100644 --- a/src/databricks/sql/results.py +++ b/src/databricks/sql/results.py @@ -18,6 +18,10 @@ from databricks.sql.client import Connection from databricks.sql.thrift_backend import ThriftBackend +import logging + +logger = logging.getLogger(__name__) + # TODO: this is duplicated from client.py to avoid ImportError. Fix this. DEFAULT_RESULT_BUFFER_SIZE_BYTES = 104857600 From dde90d9a2d36074a819f4d03d67008004f844f90 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Mon, 22 Jan 2024 16:48:23 -0500 Subject: [PATCH 4/6] Rename .is_available to .returned_as_direct_result Signed-off-by: Jesse Whitehouse --- tests/e2e/test_execute_async.py | 25 +++++++++---------------- 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/tests/e2e/test_execute_async.py b/tests/e2e/test_execute_async.py index 39ba0875..2733fb95 100644 --- a/tests/e2e/test_execute_async.py +++ b/tests/e2e/test_execute_async.py @@ -68,7 +68,7 @@ def test_direct_results_query_canary(self): with self.connection() as conn: ae = conn.execute_async(DIRECT_RESULTS_QUERY, {"param": 1}) - assert not ae.is_running + assert ae.returned_as_direct_result def test_cancel_running_query(self, long_running_ae: AsyncExecution): long_running_ae.cancel() @@ -112,21 +112,14 @@ def cancel_query_in_separate_thread(query_id, query_secret): assert long_running_ae.status == AsyncExecutionStatus.CANCELED def test_long_ish_query_canary(self, long_ish_ae: AsyncExecution): - """This test verifies that on the current endpoint, the LONG_ISH_QUERY requires - at least one sync_status call before it is finished. If this test fails, it means - the SQL warehouse got faster at executing this query and we should increment the value - of GT_FIVE_SECONDS_VALUE + """This test verifies that on the current endpoint, the LONG_ISH_QUERY does not return direct results. 
It would be easier to do this if Databricks SQL had a SLEEP() function :/ - """ - poll_count = 0 - while long_ish_ae.is_running: - time.sleep(1) - long_ish_ae.sync_status() - poll_count += 1 + We could achieve something similar by overriding the directResults setting in our ExecuteStatementReq + """ - assert poll_count > 0 + assert not long_ish_ae.returned_as_direct_result def test_get_async_execution_and_get_results_without_direct_results( self, long_ish_ae: AsyncExecution @@ -162,12 +155,12 @@ def test_serialize(self, long_running_ae: AsyncExecution): assert ae.is_running def test_get_async_execution_no_results_when_direct_results_were_sent(self): - """It remains to be seen whether results can be fetched repeatedly from a "picked up" execution.""" + """Queries that return direct results cannot be picked up with `get_async_execution()`.""" with self.connection() as conn: ae = conn.execute_async(DIRECT_RESULTS_QUERY, {"param": 1}) assert ( - not ae.is_available + ae.returned_as_direct_result ), "Queries that return direct results should not be available" query_id, query_secret = ae.serialize().split(":") ae.get_result() @@ -197,12 +190,12 @@ def test_get_async_execution_twice(self): with self.connection() as conn_1, self.connection() as conn_2: ae_1 = conn_1.execute_async(LONG_ISH_QUERY) assert ( - ae_1.is_available + not ae_1.returned_as_direct_result ), "A long query does not return direct results so is_available should be True" query_id, query_secret = ae_1.serialize().split(":") ae_2 = conn_2.get_async_execution(query_id, query_secret) - assert ae_2.is_available + assert not ae_2.returned_as_direct_result while ae_1.is_running: time.sleep(1) From 8318df73b3ebc23ee722f7c352e2e2cab6edc646 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Mon, 22 Jan 2024 16:52:12 -0500 Subject: [PATCH 5/6] Missed these in the last commit...whoops Signed-off-by: Jesse Whitehouse --- src/databricks/sql/ae.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) 
diff --git a/src/databricks/sql/ae.py b/src/databricks/sql/ae.py index aff417ba..efa8e4e0 100644 --- a/src/databricks/sql/ae.py +++ b/src/databricks/sql/ae.py @@ -81,7 +81,7 @@ class AsyncExecution: ] _last_sync_timestamp: Optional[datetime] = None _result_set: Optional["ResultSet"] = None - _is_available: bool = True + _returned_as_direct_result: bool = False def __init__( self, @@ -103,7 +103,7 @@ def __init__( if execute_statement_response: self._execute_statement_response = execute_statement_response if execute_response_contains_direct_results(execute_statement_response): - self._is_available = False + self._returned_as_direct_result = True else: self._execute_statement_response = FakeExecuteStatementResponse( directResults=False, operationHandle=self.t_operation_handle @@ -229,13 +229,15 @@ def last_sync_timestamp(self) -> Optional[datetime]: return self._last_sync_timestamp @property - def is_available(self) -> bool: - """Indicates whether the result of this query can be fetched from a separate thread. + def returned_as_direct_result(self) -> bool: + """When direct results were returned, this query_id cannot be picked up + with `Connection.get_async_execution()` - Only returns False if the query returned its results directly when `execute_async` was called. + Only returns True if the query returned its results directly when `execute_async` + was called. 
""" - return self._is_available + return self._returned_as_direct_result @classmethod def from_thrift_response( From e4d6c1ab8c92161f4eb09992a697bacc5daf1718 Mon Sep 17 00:00:00 2001 From: Jesse Whitehouse Date: Mon, 22 Jan 2024 16:53:30 -0500 Subject: [PATCH 6/6] Fix outdated assertion Signed-off-by: Jesse Whitehouse --- tests/e2e/test_execute_async.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/e2e/test_execute_async.py b/tests/e2e/test_execute_async.py index 2733fb95..a14e59cc 100644 --- a/tests/e2e/test_execute_async.py +++ b/tests/e2e/test_execute_async.py @@ -189,9 +189,8 @@ def test_get_async_execution_twice(self): """ with self.connection() as conn_1, self.connection() as conn_2: ae_1 = conn_1.execute_async(LONG_ISH_QUERY) - assert ( - not ae_1.returned_as_direct_result - ), "A long query does not return direct results so is_available should be True" + assert not ae_1.returned_as_direct_result + query_id, query_secret = ae_1.serialize().split(":") ae_2 = conn_2.get_async_execution(query_id, query_secret)