Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1805840: Augment telemetry with method_call_count #2804

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@
- Updated integration testing for `session.lineage.trace` to exclude deleted objects
- Added documentation for `DataFrame.map`.
- Improve performance of `DataFrame.apply` by mapping numpy functions to snowpark functions if possible.
- Added documentation on the extent of Snowpark pandas interoperability with scikit-learn
- Added documentation on the extent of Snowpark pandas interoperability with scikit-learn.
- Infer return type of functions in `Series.map`, `Series.apply` and `DataFrame.map` if type-hint is not provided.
- Added `call_count` to telemetry that counts method calls including interchange protocol calls.

## 1.26.0 (2024-12-05)

Expand Down
17 changes: 17 additions & 0 deletions src/snowflake/snowpark/modin/plugin/_internal/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class SnowparkPandasTelemetryField(Enum):
ARGS = "argument"
# fallback flag
IS_FALLBACK = "is_fallback"
# number of times a method has been called on the same query compiler
CALL_COUNT = "call_count"
sfc-gh-mvashishtha marked this conversation as resolved.
Show resolved Hide resolved


# Argument truncating size after converted to str. Size amount can be later specified after analysis and needs.
Expand All @@ -58,6 +60,7 @@ def _send_snowpark_pandas_telemetry_helper(
func_name: str,
query_history: Optional[QueryHistory],
api_calls: Union[str, list[dict[str, Any]]],
method_call_count: str,
) -> None:
"""
A helper function that sends Snowpark pandas API telemetry data.
Expand All @@ -71,6 +74,7 @@ def _send_snowpark_pandas_telemetry_helper(
query_history: The query history context manager to record queries that are pushed down to the Snowflake
database in the session.
api_calls: Optional list of Snowpark pandas API calls made during the function execution.
method_call_count: Number of times a method has been called.

Returns:
None
Expand All @@ -79,6 +83,11 @@ def _send_snowpark_pandas_telemetry_helper(
TelemetryField.KEY_FUNC_NAME.value: func_name,
TelemetryField.KEY_CATEGORY.value: SnowparkPandasTelemetryField.FUNC_CATEGORY_SNOWPARK_PANDAS.value,
TelemetryField.KEY_ERROR_MSG.value: error_msg,
**(
{SnowparkPandasTelemetryField.CALL_COUNT.value: method_call_count}
if method_call_count is not None
else {}
),
}
if len(api_calls) > 0:
data[TelemetryField.KEY_API_CALLS.value] = api_calls
Expand Down Expand Up @@ -274,6 +283,7 @@ def _telemetry_helper(
# Moving existing api call out first can avoid to generate duplicates.
existing_api_calls = []
need_to_restore_args0_api_calls = False
method_call_count = None

# If the decorated func is a class method or a standalone function, we need to get an active session:
if is_standalone_function or (len(args) > 0 and isinstance(args[0], type)):
Expand All @@ -295,6 +305,11 @@ def _telemetry_helper(
need_to_restore_args0_api_calls = True
session = args[0]._query_compiler._modin_frame.ordered_dataframe.session
class_prefix = args[0].__class__.__name__
func_name = _gen_func_name(
class_prefix, func, property_name, property_method_type
)
args[0]._query_compiler._method_call_counts[func_name] += 1
method_call_count = args[0]._query_compiler._method_call_counts[func_name]
except (TypeError, IndexError, AttributeError):
# TypeError: args might not support indexing; IndexError: args is empty; AttributeError: args[0] might not
# have _query_compiler attribute.
Expand Down Expand Up @@ -337,6 +352,7 @@ def _telemetry_helper(
func_name=func_name,
query_history=query_history,
api_calls=existing_api_calls + [curr_api_call],
method_call_count=method_call_count,
)
raise e

Expand Down Expand Up @@ -371,6 +387,7 @@ def _telemetry_helper(
func_name=func_name,
query_history=query_history,
api_calls=existing_api_calls + [curr_api_call],
method_call_count=method_call_count,
)
if need_to_restore_args0_api_calls:
args[0]._query_compiler.snowpark_pandas_api_calls = existing_api_calls
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import json
import logging
import re
from collections import Counter
import typing
import uuid
from collections.abc import Hashable, Iterable, Mapping, Sequence
Expand Down Expand Up @@ -530,9 +531,11 @@ def __init__(self, frame: InternalFrame) -> None:
), "frame is None or not a InternalFrame"
self._modin_frame = frame
# self.snowpark_pandas_api_calls a list of lazy Snowpark pandas telemetry api calls
# Copying and modifying self.snowpark_pandas_api_calls is taken care of in telemetry decorators
# Copying and modifying self.snowpark_pandas_api_calls and self._method_call_counts
# is taken care of in telemetry decorators
self.snowpark_pandas_api_calls: list = []
self._attrs: dict[Any, Any] = {}
self._method_call_counts: Counter[str] = Counter[str]()

def _raise_not_implemented_error_for_timedelta(
self, frame: InternalFrame = None
Expand Down
115 changes: 115 additions & 0 deletions tests/integ/modin/test_telemetry.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a test where 2 different methods are called? Also, how does this interact with query compiler methods that call other query compiler methods?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a test that calls two different methods on the same QC now.

Trying out align, it returns the telemetry with its own func name for ex
[{'func_name': 'DataFrame.align', 'category': 'snowpark_pandas', 'error_msg': None, 'call_count': 1, 'api_calls': [{'name': 'DataFrame.align'}]}]

Did you have another specific method in mind?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As an example, SnowflakeQueryCompiler.any with axis=0 calls SnowflakeQueryCompiler._bool_reduce_helper, which then in turn calls SnowflakeQueryCompiler.agg. I was wondering how this would be reflected in telemetry.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case _query_compiler._method_call_counts would include the func_name count 'DataFrame.BasePandasDataset.any' = 1 as well as the other attributes called tracked with telemetry such as 'DataFrame.__repr__' and 'DataFrame.property.iloc_get'.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Would it be accurate to say that _query_compiler._method_call_counts only tracks methods called on this particular instance of query compiler (the example call chain I gave returns a new query compiler instance every time), and these counts are only used in telemetry for certain frontend methods like dataframe and repr that we specify explicitly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes exactly the goal is to track call counts on the same query compiler instance, more info is in the design doc here: https://docs.google.com/document/d/1EfqQwejVbF5_36hnOP-ap0t3NaCWmDz62iAcR0PtX20/edit?tab=t.0#heading=h.4uu48icmuq7z

Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ def test_snowpark_pandas_telemetry_method_decorator(test_table_name):
"sfqids",
"func_name",
"error_msg",
"call_count",
}
assert data["category"] == "snowpark_pandas"
assert data["api_calls"] == df1_expected_api_calls + [
Expand Down Expand Up @@ -178,6 +179,7 @@ def test_send_snowpark_pandas_telemetry_helper(send_mock):
func_name="test_send_func",
query_history=None,
api_calls=[],
method_call_count=None,
)
send_mock.assert_called_with(
{
Expand Down Expand Up @@ -559,6 +561,119 @@ def test_telemetry_repr():
]


@sql_count_checker(query_count=6, join_count=4)
def test_telemetry_interchange_call_count():
s = pd.DataFrame([1, 2, 3, 4])
t = pd.DataFrame([5])
s.__dataframe__()
s.__dataframe__()
t.__dataframe__()

s.iloc[0, 0] = 7
s.__dataframe__()
s.__dataframe__()
t.__dataframe__()

def _get_data(call):
try:
return call.to_dict()["message"][TelemetryField.KEY_DATA.value]
except Exception:
return None

telemetry_data = [
_get_data(call)
for call in pd.session._conn._telemetry_client.telemetry._log_batch
if _get_data(call) is not None
and "func_name" in _get_data(call)
and _get_data(call)["func_name"] == "DataFrame.__dataframe__"
]
assert len(telemetry_data) == 6
# s calls __dataframe__() for the first time.
assert telemetry_data[0]["call_count"] == 1
# s calls __dataframe__() for the second time.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this second call served from some cache or recomputed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the telemetry call data is recomputed every time, @sfc-gh-azhan might confirm

assert telemetry_data[1]["call_count"] == 2
# t calls __dataframe__() for the first time.
assert telemetry_data[2]["call_count"] == 1
# the new version of s calls __dataframe__() for the first time.
assert telemetry_data[3]["call_count"] == 1
# the new version of s calls __dataframe__() for the second time.
assert telemetry_data[4]["call_count"] == 2
# t calls __dataframe__() for the second time.
assert telemetry_data[5]["call_count"] == 2


@sql_count_checker(query_count=4)
def test_telemetry_func_call_count():
s = pd.DataFrame([1, 2, np.nan, 4])
t = pd.DataFrame([5])

s.__repr__()
s.__repr__()
s.__repr__()

t.__repr__()

def _get_data(call):
try:
return call.to_dict()["message"][TelemetryField.KEY_DATA.value]
except Exception:
return None

telemetry_data = [
_get_data(call)
for call in pd.session._conn._telemetry_client.telemetry._log_batch
if _get_data(call) is not None
and "func_name" in _get_data(call)
and _get_data(call)["func_name"] == "DataFrame.__repr__"
]

# second to last call from telemetry data
sfc-gh-mvashishtha marked this conversation as resolved.
Show resolved Hide resolved
# s called __repr__() 3 times.
assert telemetry_data[-2]["call_count"] == 3

# last call from telemetry data
sfc-gh-mvashishtha marked this conversation as resolved.
Show resolved Hide resolved
# t called __repr__() 1 time.
assert telemetry_data[-1]["call_count"] == 1


@sql_count_checker(query_count=3)
def test_telemetry_multiple_func_call_count():
s = pd.DataFrame([1, 2, np.nan, 4])

s.__repr__()
s.__repr__()
s.__dataframe__()

def _get_data(call):
try:
return call.to_dict()["message"][TelemetryField.KEY_DATA.value]
except Exception:
return None

repr_telemetry_data = [
_get_data(call)
for call in pd.session._conn._telemetry_client.telemetry._log_batch
if _get_data(call) is not None
and "func_name" in _get_data(call)
and _get_data(call)["func_name"] == "DataFrame.__repr__"
]
dataframe_telemetry_data = [
_get_data(call)
for call in pd.session._conn._telemetry_client.telemetry._log_batch
if _get_data(call) is not None
and "func_name" in _get_data(call)
and _get_data(call)["func_name"] == "DataFrame.__dataframe__"
]

# last call from telemetry data
# s called __repr__() 2 times.
assert repr_telemetry_data[-1]["call_count"] == 2

# last call from telemetry data
# s called __dataframe__() 2 times.
assert dataframe_telemetry_data[-1]["call_count"] == 1


@sql_count_checker(query_count=0)
def test_telemetry_copy():
# copy() is defined in upstream Modin's BasePandasDataset class, and not overridden by any
Expand Down
3 changes: 3 additions & 0 deletions tests/unit/modin/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def snowpark_pandas_error_test_helper(
query_history=ANY,
telemetry_type=telemetry_type,
error_msg=error_msg,
method_call_count=ANY,
)


Expand Down Expand Up @@ -115,6 +116,7 @@ def raise_real_type_error(_):
query_history=ANY,
telemetry_type="snowpark_pandas_type_error",
error_msg=None,
method_call_count=ANY,
)
assert len(mock_arg2._query_compiler.snowpark_pandas_api_calls) == 0

Expand All @@ -133,6 +135,7 @@ def raise_real_type_error(_):
query_history=ANY,
telemetry_type="snowpark_pandas_type_error",
error_msg=None,
method_call_count=ANY,
)


Expand Down
Loading