Skip to content

Commit

Permalink
Update unit tests for actor and actor loggers
Browse files Browse the repository at this point in the history
  • Loading branch information
naddeoa committed Oct 18, 2024
1 parent c1a339d commit 369bf40
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 72 deletions.
174 changes: 132 additions & 42 deletions python/tests/api/logger/experimental/logger/actor/test_actor_loggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import pytest
from dateutil import tz

from whylogs.api.logger.experimental.logger.actor.actor import Actor
from whylogs.api.logger.experimental.logger.actor.data_logger import DataLogger
from whylogs.api.logger.experimental.logger.actor.process_actor import QueueType
from whylogs.api.logger.experimental.logger.actor.process_rolling_logger import (
Expand All @@ -26,7 +27,7 @@
from whylogs.api.logger.experimental.logger.actor.time_util import TimeGranularity
from whylogs.api.logger.segment_processing import SegmentationPartition
from whylogs.api.whylabs.session.session_manager import SessionManager, init
from whylogs.api.writer.writer import Writer, _Writable
from whylogs.api.writer.writer import Writer, _Writable # pyright: ignore[reportPrivateUsage]
from whylogs.core.dataset_profile import DatasetProfile
from whylogs.core.resolvers import STANDARD_RESOLVER
from whylogs.core.segmentation_partition import ColumnMapperFunction
Expand All @@ -42,7 +43,7 @@ class FakeWriter(Writer):
def __init__(self) -> None:
super().__init__()
self._write_calls = mp.Value("i", 0)
self._writable_files: mp.Queue = mp.Queue()
self._writable_files: mp.Queue[Any] = mp.Queue()
self._files: List[str] = []

@property
Expand All @@ -69,7 +70,7 @@ def write(
num = self._write_calls.value # type: ignore

file_path = f"/tmp/profile_{num}.bin"
file._write(path="/tmp", filename=f"profile_{num}.bin")
file._write(path="/tmp", filename=f"profile_{num}.bin") # pyright: ignore[reportPrivateUsage]
self._writable_files.put(file_path)

return True, ""
Expand Down Expand Up @@ -108,7 +109,7 @@ def init_tests() -> Generator[None, None, None]:
SessionManager.reset() # Leave it in a good state for future modules


params = [MPRollingLogger, ThreadRollingLogger]
params: List[Type[Actor[Any]]] = [MPRollingLogger, ThreadRollingLogger]
if os.name != "nt":
params.append(FasterFifoRollingLogger)

Expand All @@ -121,7 +122,7 @@ def actor(
writer = FakeWriter()

class MyWriterFactory(WriterFactory):
def create_writers(self, dataset_id: str) -> List[Writer]:
def create_writers(self, dataset_id: str, org_id: Optional[str] = None) -> List[Writer]:
# This is hard coded to create whylabs writers. We don't want to do any uploading
# during unit tests though.
return [writer]
Expand All @@ -146,7 +147,7 @@ def create_writers(self, dataset_id: str) -> List[Writer]:
pass


def test_actor_happy_path(actor: Tuple[DataLogger, FakeWriter]) -> None:
def test_actor_happy_path(actor: Tuple[DataLogger[Any], FakeWriter]) -> None:
logger, writer = actor

ms = 1689881671000
Expand All @@ -159,9 +160,9 @@ def test_actor_happy_path(actor: Tuple[DataLogger, FakeWriter]) -> None:

dt = datetime.datetime.fromtimestamp(ms / 1000.0, tz=tz.tzutc())
profile = DatasetProfile(dataset_timestamp=dt)
profile.track({"a": 1})
profile.track({"b": 2})
profile.track({"c": 3})
profile.track({"a": 1}) # pyright: ignore[reportUnknownMemberType]
profile.track({"b": 2}) # pyright: ignore[reportUnknownMemberType]
profile.track({"c": 3}) # pyright: ignore[reportUnknownMemberType]

assert_status_single(
status,
Expand Down Expand Up @@ -353,15 +354,15 @@ def converter_udf(text: Union[pd.DataFrame, Dict[str, List[Any]]]) -> Any:
assert_profile(profile, ["a", "b", "c", GENERATED_SEGMENT_COLUMN, ORIGINAL_COLUMN])


def test_actor_null_timestamp(actor: Tuple[DataLogger, FakeWriter]) -> None:
def test_actor_null_timestamp(actor: Tuple[DataLogger[Any], FakeWriter]) -> None:
logger, writer = actor

logger.log(data={"a": 1}, sync=True)
status = logger.status()
logger.close()

profile = DatasetProfile()
profile.track({"a": 1})
profile.track({"a": 1}) # pyright: ignore[reportUnknownMemberType]

assert_status_single(
status,
Expand All @@ -383,7 +384,7 @@ def test_actor_null_timestamp(actor: Tuple[DataLogger, FakeWriter]) -> None:
assert_profile(profile, ["a"])


def test_flush(actor: Tuple[DataLogger, FakeWriter]) -> None:
def test_flush(actor: Tuple[DataLogger[Any], FakeWriter]) -> None:
logger, writer = actor
ms = 1689881671000

Expand Down Expand Up @@ -412,7 +413,7 @@ def test_multiple_writers(
writer2 = FakeWriter()

class DoubleWriterFactory(WriterFactory):
def create_writers(self, dataset_id: str) -> List[Writer]:
def create_writers(self, dataset_id: str, org_id: Optional[str] = None) -> List[Writer]:
return [writer1, writer2]

if issubclass(Act, ProcessRollingLogger):
Expand All @@ -439,9 +440,9 @@ def create_writers(self, dataset_id: str) -> List[Writer]:
actor.close()

profile = DatasetProfile()
profile.track({"a": 1})
profile.track({"b": 2})
profile.track({"c": 3})
profile.track({"a": 1}) # pyright: ignore[reportUnknownMemberType]
profile.track({"b": 2}) # pyright: ignore[reportUnknownMemberType]
profile.track({"c": 3}) # pyright: ignore[reportUnknownMemberType]

assert_status_single(
status,
Expand Down Expand Up @@ -483,7 +484,7 @@ def throw_error() -> int:
writer1 = FakeWriter()

class MyWriterFactory(WriterFactory):
def create_writers(self, dataset_id: str) -> List[Writer]:
def create_writers(self, dataset_id: str, org_id: Optional[str] = None) -> List[Writer]:
return [writer1]

if issubclass(Act, ProcessRollingLogger):
Expand Down Expand Up @@ -528,7 +529,7 @@ def create_writers(self, dataset_id: str) -> List[Writer]:
assert_profile(cast(DatasetProfileView, writer1.last_writables[0]), ["a", "b", "c"])


def test_closing_works(actor: Tuple[DataLogger, FakeWriter]) -> None:
def test_closing_works(actor: Tuple[DataLogger[Any], FakeWriter]) -> None:
logger, writer = actor
ms = 1689881671000
ms1 = add_days(ms, 1)
Expand All @@ -541,13 +542,13 @@ def test_closing_works(actor: Tuple[DataLogger, FakeWriter]) -> None:
logger.close()

profile = DatasetProfile(dataset_timestamp=datetime.datetime.fromtimestamp(ms / 1000.0, tz=tz.tzutc()))
profile.track({"a": 1})
profile.track({"a": 1}) # pyright: ignore[reportUnknownMemberType]

profile1 = DatasetProfile(dataset_timestamp=datetime.datetime.fromtimestamp(ms1 / 1000.0, tz=tz.tzutc()))
profile1.track({"b": 2})
profile1.track({"b": 2}) # pyright: ignore[reportUnknownMemberType]

profile2 = DatasetProfile(dataset_timestamp=datetime.datetime.fromtimestamp(ms2 / 1000.0, tz=tz.tzutc()))
profile2.track({"c": 3})
profile2.track({"c": 3}) # pyright: ignore[reportUnknownMemberType]

assert_status_single(
status,
Expand Down Expand Up @@ -587,7 +588,101 @@ def test_closing_works(actor: Tuple[DataLogger, FakeWriter]) -> None:
assert len(writer.last_writables) == 3


def test_process_throws_after_killed(actor: Tuple[DataLogger, FakeWriter]) -> None:
def test_multiple_orgs(actor: Tuple[DataLogger[Any], FakeWriter]) -> None:
    """
    Log rows for a single dataset id ("model-1") under three different org ids and
    verify that the logger reports and writes them separately.

    The assertions only run when the fixture yields a ProcessRollingLogger; for any
    other logger type the test returns immediately without asserting anything.
    NOTE(review): presumably only the process logger accepts `org_id` -- confirm.
    """
    logger, writer = actor
    if not isinstance(logger, ProcessRollingLogger):
        return

    first_day_ms = 1689881671000
    second_day_ms = add_days(first_day_ms, 1)

    def build_profile(timestamp_ms: int, row: Dict[str, Any]) -> DatasetProfile:
        # Expected profile for a single logged row at the given timestamp (UTC).
        stamp = datetime.datetime.fromtimestamp(timestamp_ms / 1000.0, tz=tz.tzutc())
        expected = DatasetProfile(dataset_timestamp=stamp)
        expected.track(row)  # pyright: ignore[reportUnknownMemberType]
        return expected

    # These messages would have all gone to the same model if they weren't for different orgs
    for row, timestamp_ms, org_id in (
        ({"a": 1}, first_day_ms, "org-1"),
        ({"a": 1}, second_day_ms, "org-1"),
        ({"b": 2}, first_day_ms, "org-2"),
        ({"c": 3}, first_day_ms, "org-3"),
    ):
        logger.log(data=row, sync=True, timestamp_ms=timestamp_ms, dataset_id="model-1", org_id=org_id)
    status = logger.status()
    logger.close()

    # Fresh dict literals are used here so the expectations never share objects
    # with the rows that were handed to the logger.
    expected_by_key = [
        (
            "org-1_model-1",
            [build_profile(first_day_ms, {"a": 1}), build_profile(second_day_ms, {"a": 1})],
        ),
        ("org-2_model-1", [build_profile(first_day_ms, {"b": 2})]),
        ("org-3_model-1", [build_profile(first_day_ms, {"c": 3})]),
    ]

    # One status entry is asserted per org/dataset key; each org only sees the
    # profiles (and timestamps) that were logged under it.
    for status_key, profiles in expected_by_key:
        assert_status_single(
            status,
            LoggerStatus(
                dataset_profiles=len(profiles),
                dataset_timestamps=len(profiles),
                pending_writables=0,
                segment_caches=0,
                writers=1,
                views=[expected.view().serialize() for expected in profiles],
                pending_views=[],
            ),
            status_key,
        )

    assert writer.write_calls == 4
    assert len(writer.last_writables) == 4

    # The writables are checked in the same order the rows were logged.
    for index, columns in enumerate((["a"], ["a"], ["b"], ["c"])):
        assert_profile(cast(DatasetProfileView, writer.last_writables[index]), columns)

    # Further calls after close should throw
    match = "Logger process is no longer alive. It may have been killed."

    with pytest.raises(Exception, match=match):
        logger.log(data={"a": 1}, sync=True, timestamp_ms=first_day_ms)

    # These shouldn't change
    assert writer.write_calls == 4
    assert len(writer.last_writables) == 4


def test_process_throws_after_killed(actor: Tuple[DataLogger[Any], FakeWriter]) -> None:
"""
Test that the logger throws after the process is killed on the caller side. This
version of the test asserts against the sync=True behavior. First the process is force
Expand All @@ -611,9 +706,7 @@ def test_process_throws_after_killed(actor: Tuple[DataLogger, FakeWriter]) -> No
logger.close()


def test_process_throws_after_killed_delay(
actor: Tuple[DataLogger, FakeWriter],
) -> None:
def test_process_throws_after_killed_delay(actor: Tuple[DataLogger[Any], FakeWriter]) -> None:
"""
Very similar to test_process_throws_after_killed but there is a delay after the process is killed
before logging so the log() call will throw before doing any actual work with a clear error message.
Expand All @@ -628,10 +721,7 @@ def test_process_throws_after_killed_delay(
time.sleep(2) # should be enough

# Further calls after the process is killed should throw
if isinstance(logger, ProcessRollingLogger):
match = "Logger process is no longer alive. It may have been killed."
else:
match = "Actor is closed, can't send message."
match = "Logger process is no longer alive. It may have been killed."
with pytest.raises(Exception, match=match):
# Throws even when it isn't sync
logger.log(data={"a": 1}, timestamp_ms=ms)
Expand All @@ -640,7 +730,7 @@ def test_process_throws_after_killed_delay(
logger.close()


def test_actor_multiple_days(actor: Tuple[DataLogger, FakeWriter]) -> None:
def test_actor_multiple_days(actor: Tuple[DataLogger[Any], FakeWriter]) -> None:
logger, writer = actor
ms = 1689881671000
ms1 = add_days(ms, 1)
Expand All @@ -653,13 +743,13 @@ def test_actor_multiple_days(actor: Tuple[DataLogger, FakeWriter]) -> None:
logger.close()

profile = DatasetProfile(dataset_timestamp=datetime.datetime.fromtimestamp(ms / 1000.0, tz=tz.tzutc()))
profile.track({"a": 1})
profile.track({"a": 1}) # pyright: ignore[reportUnknownMemberType]

profile1 = DatasetProfile(dataset_timestamp=datetime.datetime.fromtimestamp(ms1 / 1000.0, tz=tz.tzutc()))
profile1.track({"b": 2})
profile1.track({"b": 2}) # pyright: ignore[reportUnknownMemberType]

profile2 = DatasetProfile(dataset_timestamp=datetime.datetime.fromtimestamp(ms2 / 1000.0, tz=tz.tzutc()))
profile2.track({"c": 3})
profile2.track({"c": 3}) # pyright: ignore[reportUnknownMemberType]

assert_status_single(
status,
Expand Down Expand Up @@ -687,7 +777,7 @@ def test_actor_multiple_days(actor: Tuple[DataLogger, FakeWriter]) -> None:
assert_profile(cast(DatasetProfileView, writer.last_writables[2]), ["c"])


def test_close_stops_accepting_logs(actor: Tuple[DataLogger, FakeWriter]) -> None:
def test_close_stops_accepting_logs(actor: Tuple[DataLogger[Any], FakeWriter]) -> None:
"""
Calling close on the logger should stop accepting logs and then synchronously wait
for all of the remaining messages to be processed.
Expand Down Expand Up @@ -720,7 +810,7 @@ def test_multiple_datasets() -> None:
writer = FakeWriter()

class MyWriterFactory(WriterFactory):
def create_writers(self, dataset_id: str) -> List[Writer]:
def create_writers(self, dataset_id: str, org_id: Optional[str] = None) -> List[Writer]:
return [writer]

logger = ProcessRollingLogger(
Expand Down Expand Up @@ -751,23 +841,23 @@ def create_writers(self, dataset_id: str) -> List[Writer]:
logger.close()

profile = DatasetProfile(dataset_timestamp=datetime.datetime.fromtimestamp(ms / 1000.0, tz=tz.tzutc()))
profile.track({"a": 1})
profile.track({"z": 7})
profile.track({"a": 1}) # pyright: ignore[reportUnknownMemberType]
profile.track({"z": 7}) # pyright: ignore[reportUnknownMemberType]

profile1 = DatasetProfile(dataset_timestamp=datetime.datetime.fromtimestamp(ms1 / 1000.0, tz=tz.tzutc()))
profile1.track({"b": 2})
profile1.track({"b": 2}) # pyright: ignore[reportUnknownMemberType]

profile2 = DatasetProfile(dataset_timestamp=datetime.datetime.fromtimestamp(ms2 / 1000.0, tz=tz.tzutc()))
profile2.track({"c": 3})
profile2.track({"c": 3}) # pyright: ignore[reportUnknownMemberType]

profile_ms_2 = DatasetProfile(dataset_timestamp=datetime.datetime.fromtimestamp(ms / 1000.0, tz=tz.tzutc()))
profile_ms_2.track({"d": 1})
profile_ms_2.track({"d": 1}) # pyright: ignore[reportUnknownMemberType]

profile3 = DatasetProfile(dataset_timestamp=datetime.datetime.fromtimestamp(ms / 1000.0, tz=tz.tzutc()))
profile3.track({"e": 2})
profile3.track({"e": 2}) # pyright: ignore[reportUnknownMemberType]

profile4 = DatasetProfile(dataset_timestamp=datetime.datetime.fromtimestamp(ms3 / 1000.0, tz=tz.tzutc()))
profile4.track({"f": 3})
profile4.track({"f": 3}) # pyright: ignore[reportUnknownMemberType]

assert_status_single(
status,
Expand Down
Loading

0 comments on commit 369bf40

Please sign in to comment.