
Unable to interrupt/cancel operations that use Databricks Connect and Ibis #3494

Open
kyrre opened this issue Jan 18, 2025 · 3 comments
Labels
bug Something isn't working

Comments

kyrre commented Jan 18, 2025

Describe the bug

We are using Ibis and the Databricks Connect (Spark Connect) backend to run queries over large datasets.

However, if I mess up a query and try to cancel it using the "Stop (interrupt) execution" button, marimo registers the interrupt request:

[10:59:06 AM]
STDERR
(cell-12)
[D 250118 10:59:06 handlers:25] Interrupt request received

but then just continues waiting forever (even after the query has finished in Databricks).

If I push the "Stop (interrupt) execution" button twice, the exception is raised and I get the following traceback:


Traceback (most recent call last):
  File "/Users/kyrre.wahl.kongsgaard/projects/cdc-dashboards/.venv/lib/python3.12/site-packages/pyspark/sql/connect/client/core.py", line 648, in __iter__
    for response in self._call:
                    ^^^^^^^^^^
  File "/Users/kyrre.wahl.kongsgaard/projects/cdc-dashboards/.venv/lib/python3.12/site-packages/grpc/_channel.py", line 543, in __next__
    return self._next()
           ^^^^^^^^^^^^
  File "/Users/kyrre.wahl.kongsgaard/projects/cdc-dashboards/.venv/lib/python3.12/site-packages/grpc/_channel.py", line 960, in _next
    _common.wait(self._state.condition.wait, _response_ready)
  File "/Users/kyrre.wahl.kongsgaard/projects/cdc-dashboards/.venv/lib/python3.12/site-packages/grpc/_common.py", line 156, in wait
    _wait_once(wait_fn, MAXIMUM_WAIT_TIMEOUT, spin_cb)
  File "/Users/kyrre.wahl.kongsgaard/projects/cdc-dashboards/.venv/lib/python3.12/site-packages/grpc/_common.py", line 116, in _wait_once
    wait_fn(timeout=timeout)
  File "/Users/kyrre.wahl.kongsgaard/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/threading.py", line 359, in wait
    gotit = waiter.acquire(True, timeout)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kyrre.wahl.kongsgaard/projects/cdc-dashboards/.venv/lib/python3.12/site-packages/marimo/_runtime/handlers.py", line 31, in interrupt_handler
    raise MarimoInterrupt
marimo._runtime.control_flow.MarimoInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/kyrre.wahl.kongsgaard/projects/cdc-dashboards/.venv/lib/python3.12/site-packages/marimo/_runtime/executor.py", line 142, in execute_cell
    return eval(cell.last_expr, glbls)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  Cell marimo:///Users/kyrre.wahl.kongsgaard/projects/cdc-dashboards/src/data/azure-applications/[service_principal_name].parquet.py#cell=cell-12, line 1, in <module>
    azure_application_events.execute()
  File "/Users/kyrre.wahl.kongsgaard/projects/cdc-dashboards/.venv/lib/python3.12/site-packages/ibis/expr/types/core.py", line 396, in execute
    return self._find_backend(use_default=True).execute(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kyrre.wahl.kongsgaard/projects/cdc-dashboards/.venv/lib/python3.12/site-packages/ibis/backends/pyspark/__init__.py", line 451, in execute
    df = query.toPandas()  # blocks until finished
         ^^^^^^^^^^^^^^^^
  File "/Users/kyrre.wahl.kongsgaard/projects/cdc-dashboards/.venv/lib/python3.12/site-packages/pyspark/sql/connect/dataframe.py", line 2000, in toPandas
    pdf, ei = self._session.client.to_pandas(query, self._plan.observations)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kyrre.wahl.kongsgaard/projects/cdc-dashboards/.venv/lib/python3.12/site-packages/pyspark/sql/connect/client/core.py", line 1244, in to_pandas
    table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(
                                                  ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kyrre.wahl.kongsgaard/projects/cdc-dashboards/.venv/lib/python3.12/site-packages/pyspark/sql/connect/client/core.py", line 1919, in _execute_and_fetch
    for response in self._execute_and_fetch_as_iterator(
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kyrre.wahl.kongsgaard/projects/cdc-dashboards/.venv/lib/python3.12/site-packages/pyspark/sql/connect/client/core.py", line 1881, in _execute_and_fetch_as_iterator
    for b in generator:
             ^^^^^^^^^
  File "<frozen _collections_abc>", line 356, in __next__
  File "/Users/kyrre.wahl.kongsgaard/projects/cdc-dashboards/.venv/lib/python3.12/site-packages/pyspark/sql/connect/client/reattach.py", line 139, in send
    if not self._has_next():
           ^^^^^^^^^^^^^^^^
  File "/Users/kyrre.wahl.kongsgaard/projects/cdc-dashboards/.venv/lib/python3.12/site-packages/pyspark/sql/connect/client/reattach.py", line 172, in _has_next
    self._current = self._call_iter(
                    ^^^^^^^^^^^^^^^^
  File "/Users/kyrre.wahl.kongsgaard/projects/cdc-dashboards/.venv/lib/python3.12/site-packages/pyspark/sql/connect/client/reattach.py", line 277, in _call_iter
    return iter_fun()
           ^^^^^^^^^^
  File "/Users/kyrre.wahl.kongsgaard/projects/cdc-dashboards/.venv/lib/python3.12/site-packages/pyspark/sql/connect/client/reattach.py", line 173, in <lambda>
    lambda: next(self._iterator)  # type: ignore[arg-type]
            ^^^^^^^^^^^^^^^^^^^^
  File "/Users/kyrre.wahl.kongsgaard/projects/cdc-dashboards/.venv/lib/python3.12/site-packages/pyspark/sql/connect/client/core.py", line 654, in __iter__
    trailers = self._call.trailing_metadata()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kyrre.wahl.kongsgaard/projects/cdc-dashboards/.venv/lib/python3.12/site-packages/grpc/_channel.py", line 819, in trailing_metadata
    _common.wait(self._state.condition.wait, _done)
  File "/Users/kyrre.wahl.kongsgaard/projects/cdc-dashboards/.venv/lib/python3.12/site-packages/grpc/_common.py", line 156, in wait
    _wait_once(wait_fn, MAXIMUM_WAIT_TIMEOUT, spin_cb)
  File "/Users/kyrre.wahl.kongsgaard/projects/cdc-dashboards/.venv/lib/python3.12/site-packages/grpc/_common.py", line 116, in _wait_once
    wait_fn(timeout=timeout)
  File "/Users/kyrre.wahl.kongsgaard/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/threading.py", line 359, in wait
    gotit = waiter.acquire(True, timeout)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kyrre.wahl.kongsgaard/projects/cdc-dashboards/.venv/lib/python3.12/site-packages/marimo/_runtime/handlers.py", line 31, in interrupt_handler
    raise MarimoInterrupt
marimo._runtime.control_flow.MarimoInterrupt

The Spark job will, however, just continue on the Databricks side (and potentially happily scan through hundreds of terabytes without notifying the user 🙀).

Environment

{
  "marimo": "0.10.14",
  "OS": "Darwin",
  "OS Version": "24.2.0",
  "Processor": "arm",
  "Python Version": "3.12.7",
  "Binaries": {
    "Browser": "132.0.6834.83",
    "Node": "v18.16.0"
  },
  "Dependencies": {
    "click": "8.1.8",
    "docutils": "0.21.2",
    "itsdangerous": "2.2.0",
    "jedi": "0.19.2",
    "markdown": "3.7",
    "narwhals": "1.22.0",
    "packaging": "24.2",
    "psutil": "6.1.1",
    "pygments": "2.19.1",
    "pymdown-extensions": "10.14",
    "pyyaml": "6.0.2",
    "ruff": "0.9.2",
    "starlette": "0.45.2",
    "tomlkit": "0.13.2",
    "typing-extensions": "4.12.2",
    "uvicorn": "0.34.0",
    "websockets": "14.1"
  },
  "Optional Dependencies": {
    "duckdb": "1.1.3",
    "ibis-framework": "9.5.0",
    "pandas": "2.2.3",
    "pyarrow": "17.0.0"
  }
}

Code to reproduce

import ibis
import marimo as mo

from ibis import _  # deferred expression, used in .select(_.properties) below
from databricks.sdk.core import Config
from databricks.connect import DatabricksSession

config = Config(profile="security")
spark = DatabricksSession.builder.sdkConfig(config).getOrCreate()

con = ibis.pyspark.connect(spark)

cloud_app_events = (
    con.table(
        name="cloud_app_events",
        database=("old_security_logs", "mde"),
    )
    .select(_.properties)
    .unpack("properties")
)

# then try to cancel while this blocking call is running
cloud_app_events.execute()
@kyrre kyrre added the bug Something isn't working label Jan 18, 2025
@mscolnick
Contributor

Hi @kyrre, we will investigate this and create a Databricks account. In the meantime, can you try this in Jupyter? It will help determine whether this is a bug in marimo or a bug in the library.


kyrre commented Jan 19, 2025

@mscolnick I exported the notebook to jupyter format and experimented a bit.

In JupyterLab this is what happens:

If you try to cancel the execution of the cell using the "interrupt kernel" button it will register it in the terminal output:

[I 2025-01-19 17:40:38.592 ServerApp] Kernel interrupted: 55055774-e314-42ab-887f-2b87caa82b87

but the execution will continue and the cell itself will never finish. This is the same behavior as in marimo.

However, if you push the "interrupt kernel" button twice, a KeyboardInterrupt exception is thrown and the Spark job fails with the reason "SPARK_JOB_CANCELLED" (as expected).
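This suggests a workaround at the call site: catch the KeyboardInterrupt and explicitly cancel the server-side operation as well. Below is a minimal, self-contained sketch of that pattern. `FakeSession` is a hypothetical stand-in for the real Spark Connect session; real sessions (PySpark >= 3.5) expose `interruptAll()` for cancelling in-flight operations server-side.

```python
class FakeSession:
    """Hypothetical stand-in for a Spark Connect session; real sessions
    (PySpark >= 3.5) expose interruptAll() to cancel running operations."""

    def __init__(self):
        self.cancelled = False

    def interruptAll(self):
        self.cancelled = True


def run_with_cancellation(session, blocking_call):
    """Run a blocking query; on interrupt, also cancel it server-side."""
    try:
        return blocking_call()
    except KeyboardInterrupt:
        session.interruptAll()  # without this, the job keeps running remotely
        raise


session = FakeSession()

def query():
    # simulate the user interrupting while the query blocks
    raise KeyboardInterrupt

try:
    run_with_cancellation(session, query)
except KeyboardInterrupt:
    pass

print(session.cancelled)  # → True
```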

@mscolnick

Contributor

Awesome, thanks for investigating. It looks like we can raise a KeyboardInterrupt as well for this to work in marimo.
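A rough sketch of what that two-stage escalation could look like, modelled on the Jupyter behaviour described above (the class and names here are hypothetical, not marimo's actual implementation): the first interrupt raises the soft MarimoInterrupt, and a repeated interrupt escalates to KeyboardInterrupt so that libraries like pyspark can run their own cancellation logic.

```python
class MarimoInterrupt(Exception):
    """Stand-in for marimo._runtime.control_flow.MarimoInterrupt."""


class TwoStageInterrupter:
    """First interrupt raises the soft MarimoInterrupt; a second interrupt
    escalates to KeyboardInterrupt, which blocking libraries (e.g. the
    pyspark gRPC layer) handle by cancelling the in-flight call."""

    def __init__(self):
        self.count = 0

    def __call__(self):
        self.count += 1
        if self.count == 1:
            raise MarimoInterrupt
        raise KeyboardInterrupt


handler = TwoStageInterrupter()

raised = []
for expected in (MarimoInterrupt, KeyboardInterrupt):
    try:
        handler()
    except expected as exc:
        raised.append(type(exc).__name__)

print(raised)  # → ['MarimoInterrupt', 'KeyboardInterrupt']
```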
