[PECO-1263] Implement a .returned_as_direct_result property for AsyncExecution status #325

Open: wants to merge 6 commits into base: peco-1263-staging
Changes from 1 commit
14 changes: 13 additions & 1 deletion src/databricks/sql/ae.py
@@ -1,7 +1,7 @@
 from enum import Enum
 from typing import Optional, Union, TYPE_CHECKING
 from databricks.sql.exc import RequestError
-from databricks.sql.results import ResultSet
+from databricks.sql.results import ResultSet, execute_response_contains_direct_results

 from datetime import datetime

@@ -81,6 +81,7 @@ class AsyncExecution:
     ]
     _last_sync_timestamp: Optional[datetime] = None
     _result_set: Optional["ResultSet"] = None
+    _is_available: bool = True

     def __init__(
         self,
@@ -101,6 +102,8 @@ def __init__(

         if execute_statement_response:
             self._execute_statement_response = execute_statement_response
+            if execute_response_contains_direct_results(execute_statement_response):
+                self._is_available = False
         else:
             self._execute_statement_response = FakeExecuteStatementResponse(
                 directResults=False, operationHandle=self.t_operation_handle
@@ -225,6 +228,15 @@ def last_sync_timestamp(self) -> Optional[datetime]:
         """The timestamp of the last time self.status was synced with the server"""
         return self._last_sync_timestamp

+    @property
+    def is_available(self) -> bool:
Collaborator

How long is it available for? How does this limitation work in the CUJ of recovering from a user-space crash?

Contributor Author

I'm going to check with the Thrift server folks to get that answer. I believe it's available for a few hours; with cloud fetch enabled, it's 24 hours.

What do you think of the name is_available? I'd like a name that's descriptive but also easy to type.

Collaborator

It's really a tough one. I think this might be a little misleading, because isn't the answer "Unknown" for the crash-recovery CUJ?

Contributor Author

I don't think so. In this situation, if the client crashes, it does so before the query_id is returned, which means there would be no way to pick up the execution anyway.

Collaborator

Ah, so if it gets a query_id, then it must be available, unless it's been a long time? Although, again, "available" doesn't sound quite right, because the query could still be executing. Maybe just returned_as_direct_result?

Contributor Author

Good idea. I've done the rename and pushed.

Note: we could get around this by allowing users to disable directResults when the TExecuteStatementReq is emitted.

+        """Indicates whether the result of this query can be fetched from a separate thread.
+
+        Only returns False if the query returned its results directly when `execute_async` was called.
+        """
+
+        return self._is_available
+
     @classmethod
     def from_thrift_response(
         cls,
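
For readers following the rename discussed in the thread, here is a minimal, self-contained sketch of the flag logic in this file. It is not the connector's code: `StubDirectResults`, `StubExecuteResponse`, `AsyncExecutionSketch`, and `can_fetch_elsewhere` are hypothetical stand-ins, and the property name `returned_as_direct_result` follows the rename agreed on above rather than the `is_available` name still shown in this commit's diff.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StubDirectResults:
    # Hypothetical stand-in for the Thrift direct-results struct.
    resultSetMetadata: Optional[dict] = None


@dataclass
class StubExecuteResponse:
    # Hypothetical stand-in for TExecuteStatementResp.
    directResults: StubDirectResults


class AsyncExecutionSketch:
    """Illustrative only; mirrors the flag logic in the diff above."""

    def __init__(self, response: Optional[StubExecuteResponse] = None):
        # No response means the execution was picked up later by query_id,
        # so its results were not delivered directly to this object.
        self._returned_as_direct_result = False
        if response is not None:
            self._returned_as_direct_result = bool(
                response.directResults.resultSetMetadata
            )

    @property
    def returned_as_direct_result(self) -> bool:
        return self._returned_as_direct_result


def can_fetch_elsewhere(ae: AsyncExecutionSketch) -> bool:
    # Same meaning as the diff's is_available: the inverse of the renamed flag.
    return not ae.returned_as_direct_result


fast = AsyncExecutionSketch(StubExecuteResponse(StubDirectResults({"columns": 1})))
slow = AsyncExecutionSketch(StubExecuteResponse(StubDirectResults(None)))
resumed = AsyncExecutionSketch()
```

Under these assumptions, only `fast` reports that its results came back directly; `slow` and `resumed` can still be fetched from another thread or connection.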
12 changes: 12 additions & 0 deletions src/databricks/sql/results.py
@@ -9,6 +9,7 @@
 from databricks.sql.exc import (
     CursorAlreadyClosedError,
 )
+from databricks.sql.thrift_api.TCLIService import ttypes
 from databricks.sql.types import Row
 from databricks.sql.utils import ExecuteResponse

@@ -223,3 +224,14 @@ def map_col_type(type_):
             (column.name, map_col_type(column.datatype), None, None, None, None, None)
             for column in table_schema_message.columns
         ]
+
+
+def execute_response_contains_direct_results(
+    execute_response: ttypes.TExecuteStatementResp,
+) -> bool:
+    """
+    Returns True if the thrift TExecuteStatementResp contains metadata.
+    This indicates the statement has finished executing at the server.
+    """
+
+    return bool(execute_response.directResults.resultSetMetadata)
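
One thing a reviewer might check in the helper above: it dereferences `directResults` unconditionally. The sketch below shows a more defensive variant, assuming `directResults` can be `None` (as optional Thrift fields commonly are in generated Python code) or `False` (as in the `FakeExecuteStatementResponse` call in ae.py). That assumption is not confirmed anywhere in this PR, and `contains_direct_results_safe` is a hypothetical name, not part of the connector.

```python
from types import SimpleNamespace
from typing import Any


def contains_direct_results_safe(execute_response: Any) -> bool:
    """Hypothetical None-tolerant variant of the helper in the diff above."""
    # Treat a missing, None, or False directResults as "no direct results".
    direct_results = getattr(execute_response, "directResults", None)
    if not direct_results:
        return False
    # Metadata present means the server returned the results immediately.
    return bool(getattr(direct_results, "resultSetMetadata", None))


# Stand-in responses built with SimpleNamespace instead of real Thrift types.
no_direct = SimpleNamespace(directResults=None)
fake_direct = SimpleNamespace(directResults=False)
empty_direct = SimpleNamespace(directResults=SimpleNamespace(resultSetMetadata=None))
with_metadata = SimpleNamespace(
    directResults=SimpleNamespace(resultSetMetadata=object())
)
```

Whether the guard is needed depends on what the Thrift server actually sends, which is worth confirming before changing the helper.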
7 changes: 7 additions & 0 deletions tests/e2e/test_execute_async.py
@@ -166,6 +166,9 @@ def test_get_async_execution_no_results_when_direct_results_were_sent(self):

         with self.connection() as conn:
             ae = conn.execute_async(DIRECT_RESULTS_QUERY, {"param": 1})
+            assert (
+                not ae.is_available
+            ), "Queries that return direct results should not be available"
             query_id, query_secret = ae.serialize().split(":")
             ae.get_result()

@@ -193,9 +196,13 @@ def test_get_async_execution_twice(self):
         """
         with self.connection() as conn_1, self.connection() as conn_2:
             ae_1 = conn_1.execute_async(LONG_ISH_QUERY)
+            assert (
+                ae_1.is_available
+            ), "A long query does not return direct results so is_available should be True"

             query_id, query_secret = ae_1.serialize().split(":")
             ae_2 = conn_2.get_async_execution(query_id, query_secret)
+            assert ae_2.is_available

             while ae_1.is_running:
                 time.sleep(1)
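
Both tests unpack `ae.serialize().split(":")` into exactly two values, which raises a bare `ValueError` from tuple unpacking if the string ever has a different shape. As a rough sketch only, a stricter parser could look like the following. It assumes the serialized form is `query_id:query_secret` with exactly one colon, which is inferred from the tests rather than documented here, and `split_serialized_execution` is a hypothetical helper name.

```python
from typing import Tuple


def split_serialized_execution(serialized: str) -> Tuple[str, str]:
    """Hypothetical helper: split 'query_id:query_secret' with validation."""
    query_id, separator, query_secret = serialized.partition(":")
    # Reject a missing separator, empty halves, or extra colons.
    if not separator or not query_id or not query_secret or ":" in query_secret:
        raise ValueError(
            "expected '<query_id>:<query_secret>', got %r" % serialized
        )
    return query_id, query_secret
```

A caller would then pass the two parts to `get_async_execution`, as `test_get_async_execution_twice` does.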