Skip to content

Commit

Permalink
[SYSTEMDS-3758] Python API Builtin triu, tril, argmin, argmax and casting Scalar <-> Matrix <-> Frame
Browse files Browse the repository at this point in the history

Closes #2113
  • Loading branch information
e-strauss committed Sep 26, 2024
1 parent 504e751 commit d80e3a6
Show file tree
Hide file tree
Showing 8 changed files with 406 additions and 3 deletions.
8 changes: 7 additions & 1 deletion src/main/python/systemds/operator/nodes/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,14 @@
from systemds.context import SystemDSContext


class Frame(OperationNode):
def to_frame(self):
    """Build a Frame node that casts this node with the DML `as.frame` call.

    :return: `Frame` representing operation
    """
    cast_inputs = [self]
    return Frame(self.sds_context, "as.frame", cast_inputs)


# Attach the cast to the base class so subclasses of OperationNode inherit it.
OperationNode.to_frame = to_frame


class Frame(OperationNode):
_pd_dataframe: pd.DataFrame

def __init__(
Expand Down
91 changes: 91 additions & 0 deletions src/main/python/systemds/operator/nodes/matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@
)


def to_matrix(self):
    """Build a Matrix node that casts this node with the DML `as.matrix` call.

    :return: `Matrix` representing operation
    """
    cast_inputs = [self]
    return Matrix(self.sds_context, "as.matrix", cast_inputs)


# Attach the cast to the base class so subclasses of OperationNode inherit it.
OperationNode.to_matrix = to_matrix


class Matrix(OperationNode):
_np_array: np.array

Expand Down Expand Up @@ -842,5 +849,89 @@ def ifft(self, imag_input: "Matrix" = None) -> "MultiReturn":

return ifft_node

def triu(self, include_diagonal=True, return_values=True) -> "Matrix":
    """Extract the upper triangular part of this matrix.

    :param include_diagonal: boolean, default True; forwarded as the `diag` argument
    :param return_values: boolean, default True; forwarded as the `values` argument,
        if set to False returns ones
    :return: `Matrix` representing the `upper.tri` operation
    """
    ctx = self.sds_context
    return Matrix(
        ctx,
        "upper.tri",
        named_input_nodes=dict(
            target=self,
            diag=ctx.scalar(include_diagonal),
            values=ctx.scalar(return_values),
        ),
    )

def tril(self, include_diagonal=True, return_values=True) -> "Matrix":
    """Extract the lower triangular part of this matrix.

    :param include_diagonal: boolean, default True; forwarded as the `diag` argument
    :param return_values: boolean, default True; forwarded as the `values` argument,
        if set to False returns ones
    :return: `Matrix` representing the `lower.tri` operation
    """
    ctx = self.sds_context
    return Matrix(
        ctx,
        "lower.tri",
        named_input_nodes=dict(
            target=self,
            diag=ctx.scalar(include_diagonal),
            values=ctx.scalar(return_values),
        ),
    )

def argmin(self, axis: int = None) -> "OperationNode":
    """Return the index of the minimum, overall or along one axis.

    Returned indices are 1-based, i.e. one larger than the corresponding
    NumPy `argmin` result.

    :param axis: 0 for one index per column, 1 for one index per row,
        None for a single index over the whole matrix
    :return: `Matrix` (column vector) representing operation for rows / columns
        or `Scalar` representing operation for the complete matrix
    :raises ValueError: if axis is not 0, 1 or None
    """
    if axis == 0:
        # rowIndexMin works per row, so transpose to get one result per column.
        return Matrix(self.sds_context, "rowIndexMin", [self.t()])
    elif axis == 1:
        return Matrix(self.sds_context, "rowIndexMin", [self])
    elif axis is None:
        # Flatten into a single row so one row-wise lookup covers every cell.
        # NOTE(review): the resulting index depends on the cell order used by
        # reshape - confirm it matches the intended flattening order.
        flattened = self.reshape(1, self.nCol() * self.nRow())
        return Matrix(self.sds_context, "rowIndexMin", [flattened]).to_scalar()
    else:
        # Name the requested operation explicitly; self.operation would be the
        # operation of the input node, not "argmin".
        raise ValueError(
            "Axis has to be either 0, 1 or None, for column, row or complete "
            f"argmin, got {axis!r}"
        )

def argmax(self, axis: int = None) -> "OperationNode":
    """Return the index of the maximum, overall or along one axis.

    Returned indices are 1-based, i.e. one larger than the corresponding
    NumPy `argmax` result.

    :param axis: 0 for one index per column, 1 for one index per row,
        None for a single index over the whole matrix
    :return: `Matrix` (column vector) representing operation for rows / columns
        or `Scalar` representing operation for the complete matrix
    :raises ValueError: if axis is not 0, 1 or None
    """
    if axis == 0:
        # rowIndexMax works per row, so transpose to get one result per column.
        return Matrix(self.sds_context, "rowIndexMax", [self.t()])
    elif axis == 1:
        return Matrix(self.sds_context, "rowIndexMax", [self])
    elif axis is None:
        # Flatten into a single row so one row-wise lookup covers every cell.
        # NOTE(review): the resulting index depends on the cell order used by
        # reshape - confirm it matches the intended flattening order.
        flattened = self.reshape(1, self.nCol() * self.nRow())
        return Matrix(self.sds_context, "rowIndexMax", [flattened]).to_scalar()
    else:
        # Name the requested operation explicitly; self.operation would be the
        # operation of the input node, not "argmax".
        raise ValueError(
            "Axis has to be either 0, 1 or None, for column, row or complete "
            f"argmax, got {axis!r}"
        )

def reshape(self, rows, cols=1):
    """Give this matrix a new shape without changing its data.

    :param rows: number of rows
    :param cols: number of columns, defaults to 1
    :return: `Matrix` representing operation
    """
    reshape_inputs = [self, rows, cols]
    return Matrix(self.sds_context, "matrix", reshape_inputs)

def __str__(self):
    """Return the fixed label "MatrixNode"; it does not depend on the node's contents."""
    return "MatrixNode"
24 changes: 23 additions & 1 deletion src/main/python/systemds/operator/nodes/scalar.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,13 @@
VALID_ARITHMETIC_TYPES,
VALID_INPUT_TYPES,
)
from systemds.utils.converters import numpy_to_matrix_block


def to_scalar(self):
    """Build a Scalar node that casts this node with the DML `as.scalar` call.

    :return: `Scalar` representing operation
    """
    cast_inputs = [self]
    return Scalar(self.sds_context, "as.scalar", cast_inputs)


# Attach the cast to the base class so subclasses of OperationNode inherit it.
OperationNode.to_scalar = to_scalar


class Scalar(OperationNode):
Expand Down Expand Up @@ -67,6 +73,8 @@ def code_line(
named_input_vars: Dict[str, str],
) -> str:
if self.__assign:
if type(self.operation) is bool:
self.operation = "TRUE" if self.operation else "FALSE"
return f"{var_name}={self.operation};"
else:
return super().code_line(var_name, unnamed_input_vars, named_input_vars)
Expand Down Expand Up @@ -289,6 +297,20 @@ def to_string(self, **kwargs: Dict[str, VALID_INPUT_TYPES]) -> "Scalar":
"""
return Scalar(self.sds_context, "toString", [self], named_input_nodes=kwargs)

def to_int(self) -> "Scalar":
    """Cast this scalar with the DML `as.integer` call.

    :return: `Scalar` representing operation
    """
    cast_inputs = [self]
    return Scalar(self.sds_context, "as.integer", cast_inputs)

def to_boolean(self) -> "Scalar":
    """Cast this scalar with the DML `as.logical` call.

    :return: `Scalar` representing operation
    """
    cast_inputs = [self]
    return Scalar(self.sds_context, "as.logical", cast_inputs)

def isNA(self) -> "Scalar":
"""Computes a boolean indicator matrix of the same shape as the input, indicating where NA (not available)
values are located. Currently, NA is only capturing NaN values.
Expand Down
9 changes: 9 additions & 0 deletions src/main/python/systemds/operator/operation_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,12 @@ def print(self, **kwargs: Dict[str, VALID_INPUT_TYPES]) -> "OperationNode":
To get the returned string look at the stdout of SystemDSContext.
"""
return OperationNode(self.sds_context, "print", [self], kwargs)

def to_frame(self):
    """Placeholder for the cast to a Frame.

    The message states that frame.py is expected to replace this method;
    reaching this body means that replacement did not take effect.

    :raises NotImplementedError: always
    """
    message = "should have been overwritten in frame.py"
    raise NotImplementedError(message)

def to_matrix(self):
    """Placeholder for the cast to a Matrix.

    The message states that matrix.py is expected to replace this method;
    reaching this body means that replacement did not take effect.

    :raises NotImplementedError: always
    """
    message = "should have been overwritten in matrix.py"
    raise NotImplementedError(message)

def to_scalar(self):
    """Placeholder for the cast to a Scalar.

    The message states that scalar.py is expected to replace this method;
    reaching this body means that replacement did not take effect.

    :raises NotImplementedError: always
    """
    message = "should have been overwritten in scalar.py"
    raise NotImplementedError(message)
2 changes: 1 addition & 1 deletion src/main/python/systemds/utils/converters.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def pandas_to_frame_block(sds, pd_df: pd.DataFrame):
np.dtype(np.int32): jvm.org.apache.sysds.common.Types.ValueType.INT32,
np.dtype(np.float32): jvm.org.apache.sysds.common.Types.ValueType.FP32,
np.dtype(np.uint8): jvm.org.apache.sysds.common.Types.ValueType.UINT8,
np.dtype(np.character): jvm.org.apache.sysds.common.Types.ValueType.CHARACTER,
np.dtype(np.str_): jvm.org.apache.sysds.common.Types.ValueType.CHARACTER,
}
schema = []
col_names = []
Expand Down
95 changes: 95 additions & 0 deletions src/main/python/tests/matrix/test_arg_min_max.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# -------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# -------------------------------------------------------------

import unittest
import numpy as np
from systemds.context import SystemDSContext

# Fixed seed so the random fixture below is reproducible across runs.
np.random.seed(7)
# Small 3x3 input used by every test in this file.
m = np.array([[1, 2, 3], [6, 5, 4], [8, 7, 9]])
# randint's upper bound is exclusive, so (1, 10) draws from the same closed
# range 1..9 as the deprecated random_integers(9) call it replaces.
M = np.random.randint(1, 10, size=300).reshape(100, 3)
# NOTE(review): M, p, m2 and w2 are not referenced by the tests in this file.
p = np.array([0.25, 0.5, 0.75])
m2 = np.array([1, 2, 3, 4, 5])
w2 = np.array([1, 1, 1, 1, 5])


def weighted_quantiles(values, weights, quantiles=0.5):
    """Pick the entries of `values` that sit at the given weighted quantiles.

    The values are ordered ascending, their weights are accumulated in that
    order, and each quantile is located as a fraction of the total weight.

    :param values: NumPy array of values (indexed with an index array)
    :param weights: NumPy array of weights, aligned with `values`
    :param quantiles: a single fraction or a sequence of fractions, default 0.5
    :return: the selected value, or an array of values for several quantiles
    """
    order = np.argsort(values)
    cumulative = np.cumsum(weights[order])
    # The last cumulative entry is the total weight.
    targets = np.array(quantiles) * cumulative[-1]
    positions = np.searchsorted(cumulative, targets)
    return values[order[positions]]


class TestARGMINMAX(unittest.TestCase):
    """Compare Matrix.argmin / Matrix.argmax with NumPy on the 3x3 fixture `m`.

    The SystemDS result is shifted by one before each comparison because the
    expected values are NumPy's 0-based indices.
    """

    def setUp(self):
        self.sds = SystemDSContext()

    def tearDown(self):
        self.sds.close()

    # unittest assertions are used instead of bare `assert` so the checks are
    # not stripped when Python runs with -O.

    def test_argmin_basic1(self):
        sds_input = self.sds.from_numpy(m)
        sds_result = sds_input.argmin(0).compute()
        np_result = np.argmin(m, axis=0).reshape(-1, 1)
        self.assertTrue(np.allclose(sds_result - 1, np_result, 1e-9))

    def test_argmin_basic2(self):
        sds_input = self.sds.from_numpy(m)
        sds_result = sds_input.argmin(1).compute()
        np_result = np.argmin(m, axis=1).reshape(-1, 1)
        self.assertTrue(np.allclose(sds_result - 1, np_result, 1e-9))

    def test_argmin_basic3(self):
        sds_input = self.sds.from_numpy(m)
        sds_result = sds_input.argmin().compute(verbose=True)
        np_result = np.argmin(m)
        self.assertTrue(np.allclose(sds_result - 1, np_result, 1e-9))

    def test_argmin_basic4(self):
        sds_input = self.sds.from_numpy(m)
        with self.assertRaises(ValueError):
            sds_input.argmin(3)

    def test_argmax_basic1(self):
        sds_input = self.sds.from_numpy(m)
        sds_result = sds_input.argmax(0).compute()
        np_result = np.argmax(m, axis=0).reshape(-1, 1)
        self.assertTrue(np.allclose(sds_result - 1, np_result, 1e-9))

    def test_argmax_basic2(self):
        sds_input = self.sds.from_numpy(m)
        sds_result = sds_input.argmax(1).compute()
        np_result = np.argmax(m, axis=1).reshape(-1, 1)
        self.assertTrue(np.allclose(sds_result - 1, np_result, 1e-9))

    def test_argmax_basic3(self):
        sds_input = self.sds.from_numpy(m)
        sds_result = sds_input.argmax().compute()
        np_result = np.argmax(m)
        self.assertTrue(np.allclose(sds_result - 1, np_result, 1e-9))

    def test_argmax_basic4(self):
        sds_input = self.sds.from_numpy(m)
        with self.assertRaises(ValueError):
            sds_input.argmax(3)


if __name__ == "__main__":
    unittest.main()
76 changes: 76 additions & 0 deletions src/main/python/tests/matrix/test_casting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# -------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# -------------------------------------------------------------

import unittest
import numpy as np
from systemds.context import SystemDSContext
from pandas import DataFrame
from numpy import ndarray


class TestDIAG(unittest.TestCase):
    """Check the Python result type of casts between Scalar, Matrix and Frame.

    NOTE(review): the class name looks inherited from a diag test; it is kept
    unchanged here so selecting the test by name keeps working.
    """

    def setUp(self):
        self.sds = SystemDSContext()

    def tearDown(self):
        self.sds.close()

    # assertIs(type(x), T) keeps the exact-type check of the former
    # `type(x) == T` and reports the actual type on failure.

    def test_casting_basic1(self):
        sds_input = self.sds.from_numpy(np.array([[1]]))
        sds_result = sds_input.to_scalar().compute()
        self.assertIs(type(sds_result), float)

    def test_casting_basic2(self):
        sds_input = self.sds.from_numpy(np.array([[1]]))
        sds_result = sds_input.to_frame().compute()
        self.assertIs(type(sds_result), DataFrame)

    def test_casting_basic3(self):
        sds_result = self.sds.scalar(1.0).to_frame().compute()
        self.assertIs(type(sds_result), DataFrame)

    def test_casting_basic4(self):
        sds_result = self.sds.scalar(1.0).to_matrix().compute()
        self.assertIs(type(sds_result), ndarray)

    def test_casting_basic5(self):
        # Explicit values: ndarray((2, 2)) would hold uninitialized memory,
        # which may contain NaN and make the allclose comparison fail.
        ar = np.array([[1.0, 2.0], [3.0, 4.0]])
        df = DataFrame(ar)
        sds_result = self.sds.from_pandas(df).to_matrix().compute()
        self.assertIs(type(sds_result), ndarray)
        self.assertTrue(np.allclose(ar, sds_result))

    def test_casting_basic6(self):
        # Explicit value instead of an uninitialized 1x1 ndarray.
        ar = np.array([[1.0]])
        df = DataFrame(ar)
        sds_result = self.sds.from_pandas(df).to_scalar().compute()
        self.assertIs(type(sds_result), float)

    def test_casting_basic7(self):
        sds_result = self.sds.scalar(1.0).to_int().compute()
        self.assertIs(type(sds_result), int)
        self.assertEqual(sds_result, 1)

    def test_casting_basic8(self):
        sds_result = self.sds.scalar(1.0).to_boolean().compute()
        self.assertIs(type(sds_result), bool)


if __name__ == "__main__":
    unittest.main()
Loading

0 comments on commit d80e3a6

Please sign in to comment.