Skip to content

Commit

Permalink
feat(workflow): add prolog and epilog to WorkflowDefinition
Browse files Browse the repository at this point in the history
Add the `prolog` and the `epilog` properties to the
`sghi.etl.core.WorkflowDefinition` interface. These properties return
functions or callable objects that are executed at the beginning and the
end of an SGHI ETL Workflow execution respectively.
  • Loading branch information
kennedykori committed Oct 28, 2024
1 parent 857b43b commit 472805e
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 9 deletions.
39 changes: 39 additions & 0 deletions src/sghi/etl/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@
"""Raw Data Type."""


# =============================================================================
# HELPERS
# =============================================================================


def _noop() -> None:
"""Do nothing."""
...


# =============================================================================
# BASE INTERFACES
# =============================================================================
Expand Down Expand Up @@ -272,3 +282,32 @@ def sink_factory(self) -> Callable[[], Sink[_PDT]]:
``Sink`` instance associated with this workflow.
"""
...

@property
def prolog(self) -> Callable[[], None]:
"""A callable to be executed at the beginning of the workflow.
If the execution of this callable fails, i.e. raises an exception, then
the main workflow is never executed, only the callable returned by the
:attr:`epilog` property is.
This can be used to validate the loaded configuration, setting up
certain resources before the workflow execution starts, etc.
The default implementation of this property returns a callable that
does nothing.
.. versionadded:: 1.2.0
"""
return _noop

@property
def epilog(self) -> Callable[[], None]:
"""A callable to be executed at the end of the workflow.
This is always executed regardless of whether the :meth:`prolog` or
workflow completed successfully or not.
The default implementation of this property returns a callable that
does nothing.
.. versionadded:: 1.2.0
"""
return _noop
111 changes: 102 additions & 9 deletions test/sghi/etl/core_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@

from __future__ import annotations

from collections.abc import Iterable
from collections.abc import Callable, Iterable
from dataclasses import dataclass, field
from unittest import TestCase

import pytest
from typing_extensions import override

from sghi.disposable import not_disposed
from sghi.etl.core import Processor, Sink, Source
from sghi.etl.core import Processor, Sink, Source, WorkflowDefinition
from sghi.utils import type_fqn

# =============================================================================
# TESTS HELPERS
Expand All @@ -19,7 +20,7 @@

@dataclass(slots=True)
class IntsSupplier(Source[Iterable[int]]):
"""A simple :class:`Source` that supplies integers."""
"""A :class:`Source` that supplies integers."""

max_ints: int = field(default=10)
_is_disposed: bool = field(default=False, init=False)
Expand Down Expand Up @@ -64,14 +65,13 @@ def dispose(self) -> None:
class CollectToList(Sink[Iterable[str]]):
"""A :class:`Sink` that collects all the values it receives in a list."""

collection_target: list[str] = field()
collection_target: list[str] = field(default_factory=list)
_is_disposed: bool = field(default=False, init=False)

@not_disposed
@override
def drain(self, processed_data: Iterable[str]) -> None:
for value in processed_data:
self.collection_target.append(value)
self.collection_target.extend(processed_data)

@property
@override
Expand All @@ -83,6 +83,43 @@ def dispose(self) -> None:
self._is_disposed = True


@dataclass(frozen=True, slots=True)
class TestWorkflowDefinition(WorkflowDefinition[Iterable[int], Iterable[str]]):
"""A simple :class:`WorkflowDefinition` implementation."""

@property
@override
def description(self) -> str | None:
return None

@property
@override
def id(self) -> str:
return "test"

@property
@override
def name(self) -> str:
return "Test Workflow"

@property
@override
def processor_factory(
self,
) -> Callable[[], Processor[Iterable[int], Iterable[str]]]:
return IntsToStrings

@property
@override
def sink_factory(self) -> Callable[[], Sink[Iterable[str]]]:
return CollectToList

@property
@override
def source_factory(self) -> Callable[[], Source[Iterable[int]]]:
return IntsSupplier


# =============================================================================
# TESTS
# =============================================================================
Expand All @@ -91,7 +128,7 @@ def dispose(self) -> None:
class TestSource(TestCase):
"""Tests for the :class:`sghi.etl.core.Source` interface.
Tests for the default method implementations on the `Source` interface.
Tests for the default method implementations of the ``Source`` interface.
"""

def test_invoking_source_as_a_callable_returns_expected_value(
Expand All @@ -111,13 +148,15 @@ def test_invoking_source_as_a_callable_returns_expected_value(
IntsSupplier(max_ints=max_ints) as instance1,
IntsSupplier(max_ints=max_ints) as instance2,
):
# noinspection PyArgumentList
assert list(instance1.draw()) == list(instance2()) == [0, 1, 2, 3]


class TestProcessor(TestCase):
"""Tests for the :class:`sghi.etl.core.Processor` interface.
Tests for the default method implementations on the `Processor` interface.
Tests for the default method implementations of the ``Processor``
interface.
"""

@override
Expand Down Expand Up @@ -191,7 +230,7 @@ def test_invoking_the_process_method_raises_a_deprecation_waring(
class TestSink(TestCase):
"""Tests for the :class:`sghi.etl.core.Processor` interface.
Tests for the default method implementations on the `Sink` interface.
Tests for the default method implementations of the ``Sink`` interface.
"""

@override
Expand Down Expand Up @@ -231,3 +270,57 @@ def test_invoking_sink_as_a_callable_returns_expected_value(self) -> None:
instance2(processed_data)

assert collect1 == collect2 == ["0", "1", "2", "3", "4"]


class TestWorkflow(TestCase):
"""Tests for the :class:`sghi.etl.core.WorkflowDefinition` interface.
Tests for the default method implementations of the ``WorkflowDefinition``
interface.
"""

@override
def setUp(self) -> None:
super().setUp()
self._instance: WorkflowDefinition[Iterable[int], Iterable[str]]
self._instance = TestWorkflowDefinition()

def test_epilog_return_value(self) -> None:
"""The default implementation of
:meth:`~sghi.etl.core.WorkflowDefinition.epilog` should return a
callable that does nothing.
""" # noqa: D205
epilog: Callable[[], None] = self._instance.epilog
assert callable(epilog)

try:
epilog()
except Exception as exp: # noqa: BLE001
_fail_reason: str = (
f"The following unexpected error: '{exp!r}', was raised when "
"invoking the callable returned by the default implementation "
f"of the '{type_fqn(WorkflowDefinition)}.epilog' property. No "
"errors should be raised by the callable returned by the "
"default implementation of the said property."
)
pytest.fail(reason=_fail_reason)

def test_prolog_return_value(self) -> None:
"""The default implementation of
:meth:`~sghi.etl.core.WorkflowDefinition.prolog` should return a
callable that does nothing.
""" # noqa: D205
prolog: Callable[[], None] = self._instance.prolog
assert callable(prolog)

try:
prolog()
except Exception as exp: # noqa: BLE001
_fail_reason: str = (
f"The following unexpected error: '{exp!r}', was raised when "
"invoking the callable returned by the default implementation "
f"of the '{type_fqn(WorkflowDefinition)}.prolog' property. No "
"errors should be raised by the callable returned by the "
"default implementation of the said property."
)
pytest.fail(reason=_fail_reason)

0 comments on commit 472805e

Please sign in to comment.