Skip to content

Commit

Permalink
conditional conversion of Value to rust (#6737)
Browse files Browse the repository at this point in the history
Co-authored-by: Sergey Kulik <[email protected]>
GitOrigin-RevId: dc503d654eb60e192c2f57590d0d7e729f101939
  • Loading branch information
2 people authored and Manul from Pathway committed Jul 19, 2024
1 parent 13b00c8 commit 48d4be7
Show file tree
Hide file tree
Showing 42 changed files with 1,659 additions and 803 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

### Changed
- `pw.io.airbyte.read` can now be used with Airbyte connectors implemented in Python without requiring Docker.
- **BREAKING**: UDFs now verify the type of returned values at runtime. If it is possible to cast a returned value to the proper type, the value is cast. If the value does not match the expected type and can't be cast, an error is raised.
- **BREAKING**: the `pw.reducers.ndarray` reducer requires the input column to have type `float`, `int`, or `Array`.

## [0.13.2] - 2024-07-08

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ def load_to_pathway(x, y):
points_within_50 = time_series.windowby(
time_series.x,
window=pw.temporal.intervals_over(
at=time_series.x, lower_bound=-50.0, upper_bound=50.0
at=time_series.x, lower_bound=-50.0, upper_bound=50.0, is_outer=False
),
).reduce(
pw.this._pw_window_location,
Expand Down Expand Up @@ -378,7 +378,7 @@ def smooth_table(table):
points_within_50 = table.windowby(
table.x,
window=pw.temporal.intervals_over(
at=table.x, lower_bound=-50.0, upper_bound=50.0
at=table.x, lower_bound=-50.0, upper_bound=50.0, is_outer=False
),
).reduce(
pw.this._pw_window_location,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ def load_to_pathway(x, y):
upsampled_stream = data_stream_B.windowby(
data_stream_B.x,
window=pw.temporal.intervals_over(
at=data_stream_A.x, lower_bound=-100.0, upper_bound=100.0
at=data_stream_A.x, lower_bound=-100.0, upper_bound=100.0, is_outer=False
),
).reduce(
x=pw.this._pw_window_location,
Expand Down
28 changes: 20 additions & 8 deletions python/pathway/engine.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class Pointer(Generic[_T]):
def ref_scalar(*args, optional=False) -> Pointer: ...
def ref_scalar_with_instance(*args, instance: Value, optional=False) -> Pointer: ...

class PathwayType(Enum):
class PathwayType:
ANY: PathwayType
STRING: PathwayType
INT: PathwayType
Expand All @@ -40,11 +40,17 @@ class PathwayType(Enum):
DATE_TIME_NAIVE: PathwayType
DATE_TIME_UTC: PathwayType
DURATION: PathwayType
ARRAY: PathwayType
@staticmethod
def array(dim: int | None, wrapped: PathwayType) -> PathwayType: ...
JSON: PathwayType
TUPLE: PathwayType
@staticmethod
def tuple(*args: PathwayType) -> PathwayType: ...
@staticmethod
def list(arg: PathwayType) -> PathwayType: ...
BYTES: PathwayType
PY_OBJECT_WRAPPER: PathwayType
@staticmethod
def optional(arg: PathwayType) -> PathwayType: ...

class ConnectorMode(Enum):
STATIC: ConnectorMode
Expand Down Expand Up @@ -133,9 +139,11 @@ class DataRow:
self,
key: Pointer,
values: list[Value],
*,
time: int = 0,
diff: int = 1,
shard: int | None = None,
dtypes: list[PathwayType],
) -> None: ...

class MissingValueError(BaseException):
Expand Down Expand Up @@ -202,12 +210,16 @@ class BinaryOperator:

class Expression:
@staticmethod
def const(value: Value) -> Expression: ...
def const(value: Value, dtype: PathwayType) -> Expression: ...
@staticmethod
def argument(index: int) -> Expression: ...
@staticmethod
def apply(
fun: Callable, /, *args: Expression, propagate_none=False
fun: Callable,
/,
*args: Expression,
dtype: PathwayType,
propagate_none: bool = False,
) -> Expression: ...
@staticmethod
def is_none(expr: Expression) -> Expression: ...
Expand Down Expand Up @@ -480,6 +492,7 @@ class Scope:
propagate_none: bool,
deterministic: bool,
properties: TableProperties,
dtype: PathwayType,
) -> Table: ...
def gradual_broadcast(
self,
Expand Down Expand Up @@ -713,7 +726,7 @@ def run_with_new_graph(
def unsafe_make_pointer(arg) -> Pointer: ...

class DataFormat:
value_fields: Any
value_fields: list[ValueField]

def __init__(self, *args, **kwargs): ...

Expand All @@ -735,7 +748,6 @@ class DataStorage:
object_pattern: str
mock_events: dict[tuple[str, int], list[SnapshotEvent]] | None
table_name: str | None
column_names: list[str] | None
def __init__(self, *args, **kwargs): ...

class CsvParserSettings:
Expand All @@ -746,7 +758,7 @@ class AwsS3Settings:

class ValueField:
name: str
def __init__(self, name: str, type_: PathwayType, *, is_optional: bool = False): ...
def __init__(self, name: str, type_: PathwayType): ...
def set_default(self, *args, **kwargs): ...

class PythonSubject:
Expand Down
19 changes: 5 additions & 14 deletions python/pathway/internals/_io_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import boto3
import boto3.session

from pathway.internals import api, dtype as dt, schema
from pathway.internals import api, schema
from pathway.internals.table import Table
from pathway.internals.trace import trace_user_frame

Expand Down Expand Up @@ -133,8 +133,7 @@ def _format_output_value_fields(table: Table) -> list[api.ValueField]:
value_fields.append(
api.ValueField(
column_name,
column_data.dtype.map_to_engine(),
is_optional=isinstance(column_data.dtype, dt.Optional),
column_data.dtype.to_engine(),
)
)

Expand All @@ -146,19 +145,11 @@ def _form_value_fields(schema: type[schema.Schema]) -> list[api.ValueField]:
default_values = schema.default_values()
result = []

# XXX fix mapping schema types to PathwayType
types = {
name: (dt.unoptionalize(dtype).to_engine(), isinstance(dtype, dt.Optional))
for name, dtype in schema._dtypes().items()
}
types = {name: dtype.to_engine() for name, dtype in schema._dtypes().items()}

for f in schema.column_names():
simple_type, is_optional = types.get(f, (None, False))
if (
simple_type is None
): # types can contain None if there is field of type None in the schema
simple_type = api.PathwayType.ANY
value_field = api.ValueField(f, simple_type, is_optional=is_optional)
dtype = types.get(f, api.PathwayType.ANY)
value_field = api.ValueField(f, dtype)
if f in default_values:
value_field.set_default(default_values[f])
result.append(value_field)
Expand Down
12 changes: 8 additions & 4 deletions python/pathway/internals/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ def static_table_from_pandas(
ordinary_columns = [
column for column in df.columns if column not in PANDAS_PSEUDOCOLUMNS
]
if column_types:
dtypes = [column_types[c].to_engine() for c in ordinary_columns]
else:
dtypes = [PathwayType.ANY] * len(ordinary_columns)

if connector_properties is None:
column_properties = []
Expand All @@ -163,9 +167,7 @@ def static_table_from_pandas(
if v is not None:
dtype = type(v)
break
column_properties.append(
ColumnProperties(dtype=dt.wrap(dtype).map_to_engine())
)
column_properties.append(ColumnProperties(dtype=dt.wrap(dtype).to_engine()))
connector_properties = ConnectorProperties(column_properties=column_properties)

assert len(connector_properties.column_properties) == len(
Expand All @@ -181,7 +183,9 @@ def static_table_from_pandas(
if diff not in [-1, 1]:
raise ValueError(f"Column {DIFF_PSEUDOCOLUMN} can only contain 1 and -1.")
shard = data[SHARD_PSEUDOCOLUMN][i] if SHARD_PSEUDOCOLUMN in data else None
input_row = DataRow(key, values, time=time, diff=diff, shard=shard)
input_row = DataRow(
key, values, time=time, diff=diff, shard=shard, dtypes=dtypes
)
input_data.append(input_row)

return scope.static_table(input_data, connector_properties)
Expand Down
2 changes: 1 addition & 1 deletion python/pathway/internals/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def connector_properties(self) -> api.ConnectorProperties:
for column in self.schema.columns().values():
columns.append(
api.ColumnProperties(
dtype=column.dtype.map_to_engine(),
dtype=column.dtype.to_engine(),
append_only=column.append_only,
)
)
Expand Down
22 changes: 14 additions & 8 deletions python/pathway/internals/dtype.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,8 @@
class DType(ABC):
_cache: dict[typing.Any, DType] = {}

def to_engine(self) -> api.PathwayType | None:
return None

def map_to_engine(self) -> api.PathwayType:
return self.to_engine() or api.PathwayType.ANY
@abstractmethod
def to_engine(self) -> api.PathwayType: ...

@abstractmethod
def is_value_compatible(self, arg) -> bool: ...
Expand Down Expand Up @@ -125,6 +122,9 @@ def __new__(cls) -> _NoneDType:
def is_value_compatible(self, arg):
return arg is None or isinstance(arg, pd._libs.missing.NAType)

def to_engine(self) -> api.PathwayType:
return api.PathwayType.ANY

@property
def typehint(self) -> None:
return None
Expand Down Expand Up @@ -184,6 +184,9 @@ def __new__(
def is_value_compatible(self, arg):
return callable(arg)

def to_engine(self) -> api.PathwayType:
return api.PathwayType.ANY # also passed to the engine as column properties

@cached_property
def typehint(self) -> typing.Any:
if isinstance(self.arg_types, EllipsisType):
Expand All @@ -207,7 +210,7 @@ def _set_args(self, n_dim, wrapped):
self.n_dim = n_dim

def to_engine(self) -> api.PathwayType:
return api.PathwayType.ARRAY
return api.PathwayType.array(self.n_dim, self.wrapped.to_engine())

def __new__(cls, n_dim, wrapped) -> Array:
dtype = wrap(wrapped)
Expand Down Expand Up @@ -298,6 +301,9 @@ def __repr__(self):
def _set_args(self, wrapped):
self.wrapped = wrapped

def to_engine(self) -> api.PathwayType:
return api.PathwayType.optional(self.wrapped.to_engine())

def __new__(cls, arg: DType) -> DType: # type:ignore[misc]
arg = wrap(arg)
if arg == NONE or isinstance(arg, Optional) or arg == ANY:
Expand Down Expand Up @@ -327,7 +333,7 @@ def _set_args(self, args):
self.args = args

def to_engine(self) -> PathwayType:
return api.PathwayType.TUPLE
return api.PathwayType.tuple(*[arg.to_engine() for arg in self.args])

def __new__(cls, *args: DType | EllipsisType) -> Tuple | List: # type: ignore[misc]
if any(isinstance(arg, EllipsisType) for arg in args):
Expand Down Expand Up @@ -391,7 +397,7 @@ def _set_args(self, wrapped):
self.wrapped = wrapped

def to_engine(self) -> PathwayType:
return api.PathwayType.TUPLE
return api.PathwayType.list(self.wrapped.to_engine())

def is_value_compatible(self, arg):
return isinstance(arg, (tuple, list)) and all(
Expand Down
17 changes: 13 additions & 4 deletions python/pathway/internals/expressions/numerical.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,16 @@ def round(self, decimals: expr.ColumnExpression | int = 0) -> expr.ColumnExpress
(
(dt.INT, dt.INT),
dt.INT,
lambda x, y: api.Expression.apply(round, x, y),
lambda x, y: api.Expression.apply(
round, x, y, dtype=dt.INT.to_engine()
),
),
(
(dt.FLOAT, dt.INT),
dt.FLOAT,
lambda x, y: api.Expression.apply(round, x, y),
lambda x, y: api.Expression.apply(
round, x, y, dtype=dt.FLOAT.to_engine()
),
),
),
"num.round",
Expand Down Expand Up @@ -175,14 +179,18 @@ def fill_na(self, default_value: int | float) -> expr.ColumnExpression:
dt.FLOAT,
dt.FLOAT,
lambda x: api.Expression.apply(
lambda y: float(default_value) if math.isnan(y) else y, x
lambda y: float(default_value) if math.isnan(y) else y,
x,
dtype=dt.FLOAT.to_engine(),
),
),
(
dt.Optional(dt.INT),
dt.INT,
lambda x: api.Expression.apply(
lambda y: int(default_value) if y is None else y, x
lambda y: int(default_value) if y is None else y,
x,
dtype=dt.INT.to_engine(),
),
),
(
Expand All @@ -195,6 +203,7 @@ def fill_na(self, default_value: int | float) -> expr.ColumnExpression:
else y
),
x,
dtype=dt.FLOAT.to_engine(),
),
),
),
Expand Down
Loading

0 comments on commit 48d4be7

Please sign in to comment.