feat: a new PartitionedTableService API in Python #353

Open
deephaven-internal opened this issue Nov 1, 2024 · 1 comment

This issue was auto-generated

PR: deephaven/deephaven-core#6175
Author: jmao-denver

Original PR Body

Fixes #6171

Most recent nightlies run: https://github.com/nbauernfeind/deephaven-core/actions/runs/11502810543/


nbauernfeind commented Nov 1, 2024

This is a working example that I believe is appropriate to use as the basis of the deephaven.io documentation:

### Below is a sample implementation showing:
### 1) how to implement a sample TableDataServiceBackend
### 2) how to manually manipulate the backend to demonstrate static and refreshing behavior
### 3) how to fetch a table from that backend in both static and refreshing contexts

from typing import Callable, Optional, Dict
import pyarrow as pa
from deephaven.experimental.table_data_service import TableDataServiceBackend, TableKey, TableLocationKey, TableDataService

class TableKeyImpl(TableKey):
    def __init__(self, key: str):
        self.key = key

    def __hash__(self):
        return hash(self.key)

    def __eq__(self, other):
        if not isinstance(other, TableKeyImpl):
            return NotImplemented
        return self.key == other.key

    def __str__(self):
        return f"TableKeyImpl{{{self.key}}}"

class TableLocationKeyImpl(TableLocationKey):
    def __init__(self, key: str):
        self.key = key

    def __hash__(self):
        return hash(self.key)

    def __eq__(self, other):
        if not isinstance(other, TableLocationKeyImpl):
            return NotImplemented
        return self.key == other.key

    def __str__(self):
        return f"TableLocationKeyImpl{{{self.key}}}"

class TestTable():
    class TestTableLocation():
        def __init__(self, data_schema: pa.Schema, partitioning_values: Optional[pa.Table]):
            self.partitioning_values = partitioning_values
            # no-op callbacks, replaced when a subscriber attaches
            self.size_cb: Callable[[int], None] = lambda *args: None
            self.size_failure_cb: Callable[[str], None] = lambda *args: None
            self.data: pa.Table = data_schema.empty_table()

        def append_data(self, new_data: pa.Table):
            # concatenate the existing and new record batches, then notify any size subscriber
            self.data = pa.Table.from_batches(
                self.data.to_batches() + new_data.to_batches(), schema=self.data.schema)
            self.size_cb(self.data.num_rows)

    def __init__(self, data_schema: pa.Schema, partitioning_column_schema: Optional[pa.Schema]):
        self.data_schema = data_schema
        self.partitioning_column_schema = partitioning_column_schema
        self.locations: Dict[TableLocationKey, "TestTable.TestTableLocation"] = {}
        # no-op callbacks, replaced when a subscriber attaches
        self.location_cb: Callable[[TableLocationKeyImpl, Optional[pa.Table]], None] = lambda *args: None
        self.location_failure_cb: Callable[[str], None] = lambda *args: None

    def add_table_location(self, table_location_key: TableLocationKeyImpl,
                           partitioning_column_values: Optional[pa.Table],
                           data_values: pa.Table):
        if table_location_key in self.locations:
            raise ValueError(f"Cannot add table location {table_location_key} already exists")
        new_location = self.TestTableLocation(self.data_schema, partitioning_column_values)
        new_location.append_data(data_values)
        self.locations[table_location_key] = new_location

    def append_table_location(self, table_location_key: TableLocationKeyImpl, data_values: pa.Table):
        if table_location_key not in self.locations:
            raise ValueError(f"Cannot append to non-existent table location {table_location_key}")
        self.locations[table_location_key].append_data(data_values)

class TestBackend(TableDataServiceBackend):
    def __init__(self):
        self.tables: Dict[TableKey, TestTable] = {}

    def add_table(self, table_key: TableKeyImpl, table: TestTable):
        if table_key in self.tables:
            raise ValueError(f"{table_key} already exists")
        self.tables[table_key] = table

    def table_schema(self, table_key: TableKeyImpl,
                     schema_cb: Callable[[pa.Schema, Optional[pa.Schema]], None],
                     failure_cb: Callable[[str], None]) -> None:
        if table_key not in self.tables:
            failure_cb(f"{table_key} does not exist")
            return

        table = self.tables[table_key]
        schema_cb(table.data_schema, table.partitioning_column_schema)

    def table_locations(self, table_key: TableKeyImpl,
                        location_cb: Callable[[TableLocationKeyImpl, Optional[pa.Table]], None],
                        success_cb: Callable[[], None],
                        failure_cb: Callable[[str], None]) -> None:
        if table_key not in self.tables:
            failure_cb(f"{table_key} does not exist")
            return

        for key, location in self.tables[table_key].locations.items():
            location_cb(key, location.partitioning_values)
        success_cb()

    def table_location_size(self, table_key: TableKeyImpl, table_location_key: TableLocationKeyImpl,
                            size_cb: Callable[[int], None],
                            failure_cb: Callable[[str], None]) -> None:
        if table_key not in self.tables:
            failure_cb(f"{table_key} does not exist")
            return

        table = self.tables[table_key]
        if table_location_key not in table.locations:
            failure_cb(f"{table_location_key} does not exist in table_key {table_key}")
            return

        size_cb(table.locations[table_location_key].data.num_rows)

    def column_values(self, table_key: TableKeyImpl, table_location_key: TableLocationKeyImpl,
                      col: str, offset: int, min_rows: int, max_rows: int,
                      values_cb: Callable[[pa.Table], None],
                      failure_cb: Callable[[str], None]) -> None:
        if table_key not in self.tables:
            failure_cb(f"{table_key} does not exist")
            return

        table = self.tables[table_key]
        if table_location_key not in table.locations:
            failure_cb(f"{table_location_key} does not exist in table_key {table_key}")
            return

        location = table.locations[table_location_key]
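        # serve exactly min_rows rows, the minimum the caller is willing to accept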
        values_cb(location.data.select([col]).slice(offset, min_rows))

    def subscribe_to_table_locations(self, table_key: TableKeyImpl,
                                    location_cb: Callable[[TableLocationKeyImpl, Optional[pa.Table]], None],
                                    success_cb: Callable[[], None],
                                    failure_cb: Callable[[str], None]) -> Callable[[], None]:
        if table_key not in self.tables:
            failure_cb(f"{table_key} does not exist")
            return lambda *args: None

        table = self.tables[table_key]
        table.location_cb = location_cb
        table.location_failure_cb = failure_cb

        # send all existing locations straight away
        for key, location in table.locations.items():
            location_cb(key, location.partitioning_values)
        success_cb()

        def unsubscribe():
            table.location_cb = lambda *args: None
            table.location_failure_cb = lambda *args: None

        return unsubscribe

    def subscribe_to_table_location_size(self, table_key: TableKeyImpl,
                                         table_location_key: TableLocationKeyImpl,
                                         size_cb: Callable[[int], None],
                                         success_cb: Callable[[], None],
                                         failure_cb: Callable[[str], None]) -> Callable[[], None]:
        if table_key not in self.tables:
            failure_cb(f"{table_key} does not exist")
            return lambda *args: None

        table = self.tables[table_key]
        if table_location_key not in table.locations:
            failure_cb(f"{table_location_key} does not exist in table_key {table_key}")
            return lambda *args: None

        location = table.locations[table_location_key]
        location.size_cb = size_cb
        location.size_failure_cb = failure_cb

        # send existing size
        size_cb(location.data.num_rows)
        success_cb()

        def unsubscribe():
            location.size_cb = lambda *args: None
            location.size_failure_cb = lambda *args: None

        return unsubscribe

The backend implementation would then be used as follows:

import numpy as np
import deephaven.arrow as dharrow
from deephaven.column import *
from deephaven import new_table

from deephaven.time import to_j_instant
# generate the same data for each location; noting that we do not need to include partitioning columns
location_cols = [
    bool_col(name="Boolean", data=[True, False]),
    byte_col(name="Byte", data=(1, -1)),
    char_col(name="Char", data='-1'),
    short_col(name="Short", data=[1, -1]),
    int_col(name="Int", data=[1, -1]),
    long_col(name="Long", data=[1, -1]),
    long_col(name="NPLong", data=np.array([1, -1], dtype=np.int8)),
    float_col(name="Float", data=[1.01, -1.01]),
    double_col(name="Double", data=[1.01, -1.01]),
    string_col(name="String", data=["foo", "bar"]),
    datetime_col(name="Datetime", data=[to_j_instant('2024-10-01T12:30:00 ET'), to_j_instant('2024-10-01T12:45:00 ET')]),
]
location_data = dharrow.to_arrow(new_table(cols=location_cols))

def generate_partitioning_values(ticker: str, exchange: str) -> pa.Table:
    partitioning_cols = [
        string_col(name="Ticker", data=[ticker]),
        string_col(name="Exchange", data=[exchange]),
    ]
    return dharrow.to_arrow(new_table(cols=partitioning_cols))

backend = TestBackend()
data_service = TableDataService(backend)

# generate a simple table
backend.add_table(
    TableKeyImpl("sample"), 
    TestTable(location_data.schema, generate_partitioning_values("DUMMY_VAL", "DUMMY_VAL").schema))

def add_ticker_data(ticker: str, exchange: str):
    table_key = TableKeyImpl("sample")
    table_location_key = TableLocationKeyImpl(ticker + ":" + exchange)
    if table_key not in backend.tables:
        raise ValueError(f'{table_key} does not exist')
    if table_location_key not in backend.tables[table_key].locations:
        backend.tables[table_key].add_table_location(
            table_location_key, generate_partitioning_values(ticker, exchange), location_data)
    else:
        backend.tables[table_key].append_table_location(table_location_key, location_data)


# add just a tiny amount of data
add_ticker_data("GOOG", "NYSE")
add_ticker_data("MSFT", "BZX")
add_ticker_data("MSFT", "BZY")

from deephaven.liveness_scope import LivenessScope
scope = LivenessScope()

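# the liveness scope keeps the refreshing table alive and controls when it is released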
with scope.open():
    t = data_service.make_table(TableKeyImpl("sample"), refreshing=True)

Once the table is open and visible in the REPL, it can be appended to with further calls to add_ticker_data, like this:

# this adds a new table location to the already opened table
add_ticker_data("GOOG", "BZX") 

# these append to existing table locations of the already opened table
add_ticker_data("MSFT", "BZX") 
add_ticker_data("MSFT", "BZY")

There is some value in having the unsubscribe callbacks returned from the subscribe_* methods validate that they only remove the callbacks assigned by the originating call, since the system is allowed to race an unsubscribe against new subscribe_* calls. I am not certain whether comparing the stored callback to the captured callback is enough, or whether Python needs to guard these critical sections explicitly; I'm leaving that to the documentation/Python experts.
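
As a minimal sketch of that guard, here is a hypothetical GuardedLocation stand-in for TestTableLocation; the threading.Lock and the identity check are assumptions about what "enough" might look like, not part of the API:

import threading
from typing import Callable

class GuardedLocation:
    def __init__(self):
        self._lock = threading.Lock()
        self.size_cb: Callable[[int], None] = lambda *args: None

    def subscribe_size(self, size_cb: Callable[[int], None]) -> Callable[[], None]:
        with self._lock:
            self.size_cb = size_cb

        def unsubscribe():
            with self._lock:
                # only clear the callback if it is still the one this call installed;
                # a racing re-subscribe may already have replaced it
                if self.size_cb is size_cb:
                    self.size_cb = lambda *args: None

        return unsubscribe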
