diff --git a/CHANGELOG.md b/CHANGELOG.md index 7ebf01e80d1..2f62bbb2810 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -59,8 +59,9 @@ - Updated integration testing for `session.lineage.trace` to exclude deleted objects - Added documentation for `DataFrame.map`. - Improve performance of `DataFrame.apply` by mapping numpy functions to snowpark functions if possible. -- Added documentation on the extent of Snowpark pandas interoperability with scikit-learn +- Added documentation on the extent of Snowpark pandas interoperability with scikit-learn. - Infer return type of functions in `Series.map`, `Series.apply` and `DataFrame.map` if type-hint is not provided. +- Added `call_count` to telemetry that counts method calls including interchange protocol calls. ## 1.26.0 (2024-12-05) diff --git a/src/snowflake/snowpark/modin/plugin/_internal/telemetry.py b/src/snowflake/snowpark/modin/plugin/_internal/telemetry.py index 3ea77ca0025..42a60d90bb6 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/telemetry.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/telemetry.py @@ -36,6 +36,8 @@ class SnowparkPandasTelemetryField(Enum): ARGS = "argument" # fallback flag IS_FALLBACK = "is_fallback" + # number of times a method has been called on the same query compiler + CALL_COUNT = "call_count" # Argument truncating size after converted to str. Size amount can be later specified after analysis and needs. @@ -58,6 +60,7 @@ def _send_snowpark_pandas_telemetry_helper( func_name: str, query_history: Optional[QueryHistory], api_calls: Union[str, list[dict[str, Any]]], + method_call_count: str, ) -> None: """ A helper function that sends Snowpark pandas API telemetry data. @@ -71,6 +74,7 @@ def _send_snowpark_pandas_telemetry_helper( query_history: The query history context manager to record queries that are pushed down to the Snowflake database in the session. api_calls: Optional list of Snowpark pandas API calls made during the function execution. + method_call_count: Number of times a method has been called. Returns: None @@ -79,6 +83,11 @@ def _send_snowpark_pandas_telemetry_helper( TelemetryField.KEY_FUNC_NAME.value: func_name, TelemetryField.KEY_CATEGORY.value: SnowparkPandasTelemetryField.FUNC_CATEGORY_SNOWPARK_PANDAS.value, TelemetryField.KEY_ERROR_MSG.value: error_msg, + **( + {SnowparkPandasTelemetryField.CALL_COUNT.value: method_call_count} + if method_call_count is not None + else {} + ), } if len(api_calls) > 0: data[TelemetryField.KEY_API_CALLS.value] = api_calls @@ -274,6 +283,7 @@ def _telemetry_helper( # Moving existing api call out first can avoid to generate duplicates. existing_api_calls = [] need_to_restore_args0_api_calls = False + method_call_count = None # If the decorated func is a class method or a standalone function, we need to get an active session: if is_standalone_function or (len(args) > 0 and isinstance(args[0], type)): @@ -295,6 +305,11 @@ def _telemetry_helper( need_to_restore_args0_api_calls = True session = args[0]._query_compiler._modin_frame.ordered_dataframe.session class_prefix = args[0].__class__.__name__ + func_name = _gen_func_name( + class_prefix, func, property_name, property_method_type + ) + args[0]._query_compiler._method_call_counts[func_name] += 1 + method_call_count = args[0]._query_compiler._method_call_counts[func_name] except (TypeError, IndexError, AttributeError): # TypeError: args might not support indexing; IndexError: args is empty; AttributeError: args[0] might not # have _query_compiler attribute. @@ -337,6 +352,7 @@ def _telemetry_helper( func_name=func_name, query_history=query_history, api_calls=existing_api_calls + [curr_api_call], + method_call_count=method_call_count, ) raise e @@ -371,6 +387,7 @@ def _telemetry_helper( func_name=func_name, query_history=query_history, api_calls=existing_api_calls + [curr_api_call], + method_call_count=method_call_count, ) if need_to_restore_args0_api_calls: args[0]._query_compiler.snowpark_pandas_api_calls = existing_api_calls diff --git a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py index 5f7f38d99f7..fc7dc036879 100644 --- a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py +++ b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py @@ -10,6 +10,7 @@ import json import logging import re +from collections import Counter import typing import uuid from collections.abc import Hashable, Iterable, Mapping, Sequence @@ -530,9 +531,11 @@ def __init__(self, frame: InternalFrame) -> None: ), "frame is None or not a InternalFrame" self._modin_frame = frame # self.snowpark_pandas_api_calls a list of lazy Snowpark pandas telemetry api calls - # Copying and modifying self.snowpark_pandas_api_calls is taken care of in telemetry decorators + # Copying and modifying self.snowpark_pandas_api_calls and self._method_call_counts + # is taken care of in telemetry decorators self.snowpark_pandas_api_calls: list = [] self._attrs: dict[Any, Any] = {} + self._method_call_counts: Counter[str] = Counter[str]() def _raise_not_implemented_error_for_timedelta( self, frame: InternalFrame = None diff --git a/tests/integ/modin/test_telemetry.py b/tests/integ/modin/test_telemetry.py index 3caa81741b3..a5a904c7431 100644 --- a/tests/integ/modin/test_telemetry.py +++ b/tests/integ/modin/test_telemetry.py @@ -143,6 +143,7 @@ def test_snowpark_pandas_telemetry_method_decorator(test_table_name): "sfqids", "func_name", "error_msg", + "call_count", } assert data["category"] == "snowpark_pandas" assert data["api_calls"] == df1_expected_api_calls + [ @@ -178,6 +179,7 @@ def test_send_snowpark_pandas_telemetry_helper(send_mock): func_name="test_send_func", query_history=None, api_calls=[], + method_call_count=None, ) send_mock.assert_called_with( { @@ -559,6 +561,119 @@ def test_telemetry_repr(): ] +@sql_count_checker(query_count=6, join_count=4) +def test_telemetry_interchange_call_count(): + s = pd.DataFrame([1, 2, 3, 4]) + t = pd.DataFrame([5]) + s.__dataframe__() + s.__dataframe__() + t.__dataframe__() + + s.iloc[0, 0] = 7 + s.__dataframe__() + s.__dataframe__() + t.__dataframe__() + + def _get_data(call): + try: + return call.to_dict()["message"][TelemetryField.KEY_DATA.value] + except Exception: + return None + + telemetry_data = [ + _get_data(call) + for call in pd.session._conn._telemetry_client.telemetry._log_batch + if _get_data(call) is not None + and "func_name" in _get_data(call) + and _get_data(call)["func_name"] == "DataFrame.__dataframe__" + ] + assert len(telemetry_data) == 6 + # s calls __dataframe__() for the first time. + assert telemetry_data[0]["call_count"] == 1 + # s calls __dataframe__() for the second time. + assert telemetry_data[1]["call_count"] == 2 + # t calls __dataframe__() for the first time. + assert telemetry_data[2]["call_count"] == 1 + # the new version of s calls __dataframe__() for the first time. + assert telemetry_data[3]["call_count"] == 1 + # the new version of s calls __dataframe__() for the second time. + assert telemetry_data[4]["call_count"] == 2 + # t calls __dataframe__() for the second time. + assert telemetry_data[5]["call_count"] == 2 + + +@sql_count_checker(query_count=4) +def test_telemetry_func_call_count(): + s = pd.DataFrame([1, 2, np.nan, 4]) + t = pd.DataFrame([5]) + + s.__repr__() + s.__repr__() + s.__repr__() + + t.__repr__() + + def _get_data(call): + try: + return call.to_dict()["message"][TelemetryField.KEY_DATA.value] + except Exception: + return None + + telemetry_data = [ + _get_data(call) + for call in pd.session._conn._telemetry_client.telemetry._log_batch + if _get_data(call) is not None + and "func_name" in _get_data(call) + and _get_data(call)["func_name"] == "DataFrame.__repr__" + ] + + # second to last call from telemetry data + # s called __repr__() 3 times. + assert telemetry_data[-2]["call_count"] == 3 + + # last call from telemetry data + # t called __repr__() 1 time. + assert telemetry_data[-1]["call_count"] == 1 + + +@sql_count_checker(query_count=3) +def test_telemetry_multiple_func_call_count(): + s = pd.DataFrame([1, 2, np.nan, 4]) + + s.__repr__() + s.__repr__() + s.__dataframe__() + + def _get_data(call): + try: + return call.to_dict()["message"][TelemetryField.KEY_DATA.value] + except Exception: + return None + + repr_telemetry_data = [ + _get_data(call) + for call in pd.session._conn._telemetry_client.telemetry._log_batch + if _get_data(call) is not None + and "func_name" in _get_data(call) + and _get_data(call)["func_name"] == "DataFrame.__repr__" + ] + dataframe_telemetry_data = [ + _get_data(call) + for call in pd.session._conn._telemetry_client.telemetry._log_batch + if _get_data(call) is not None + and "func_name" in _get_data(call) + and _get_data(call)["func_name"] == "DataFrame.__dataframe__" + ] + + # last call from telemetry data + # s called __repr__() 2 times. + assert repr_telemetry_data[-1]["call_count"] == 2 + + # last call from telemetry data + # s called __dataframe__() 2 times. + assert dataframe_telemetry_data[-1]["call_count"] == 1 + + @sql_count_checker(query_count=0) def test_telemetry_copy(): # copy() is defined in upstream Modin's BasePandasDataset class, and not overridden by any diff --git a/tests/unit/modin/test_telemetry.py b/tests/unit/modin/test_telemetry.py index 92617906970..d8ffbb97dcd 100644 --- a/tests/unit/modin/test_telemetry.py +++ b/tests/unit/modin/test_telemetry.py @@ -54,6 +54,7 @@ def snowpark_pandas_error_test_helper( query_history=ANY, telemetry_type=telemetry_type, error_msg=error_msg, + method_call_count=ANY, ) @@ -115,6 +116,7 @@ def raise_real_type_error(_): query_history=ANY, telemetry_type="snowpark_pandas_type_error", error_msg=None, + method_call_count=ANY, ) assert len(mock_arg2._query_compiler.snowpark_pandas_api_calls) == 0 @@ -133,6 +135,7 @@ def raise_real_type_error(_): query_history=ANY, telemetry_type="snowpark_pandas_type_error", error_msg=None, + method_call_count=ANY, )