Skip to content

Commit

Permalink
core: Add types for core attribute
Browse files Browse the repository at this point in the history
  • Loading branch information
isra17 committed Jun 5, 2023
1 parent bd2be75 commit 46388fd
Show file tree
Hide file tree
Showing 36 changed files with 217 additions and 135 deletions.
6 changes: 6 additions & 0 deletions src/saturn_engine/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
from .pipeline import ResourceUsed
from .resource import Resource
from .topic import TopicMessage
from .types import Cursor
from .types import JobId
from .types import MessageId

__all__ = [
"PipelineInfo",
Expand All @@ -16,4 +19,7 @@
"Resource",
"ResourceUsed",
"TopicMessage",
"MessageId",
"Cursor",
"JobId",
]
12 changes: 7 additions & 5 deletions src/saturn_engine/core/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
from pydantic import BaseModel
from pydantic import dataclasses

from saturn_engine.core import PipelineInfo # noqa: F401 # Reexport for public API
from saturn_engine.core import QueuePipeline
from .pipeline import PipelineInfo # noqa: F401 # Reexport for public API
from .pipeline import QueuePipeline
from .types import Cursor
from .types import JobId

T = TypeVar("T")

Expand Down Expand Up @@ -102,16 +104,16 @@ class InventoriesResponse(ListResponse[ComponentDefinition]):

@dataclasses.dataclass
class JobItem:
name: str
name: JobId
started_at: datetime
completed_at: Optional[datetime] = None
cursor: Optional[str] = None
cursor: Optional[Cursor] = None
error: Optional[str] = None


@dataclasses.dataclass
class JobInput:
cursor: Optional[str] = None
cursor: Optional[Cursor] = None
completed_at: Optional[datetime] = None
error: Optional[str] = None

Expand Down
6 changes: 5 additions & 1 deletion src/saturn_engine/core/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,18 @@
import dataclasses
import uuid

from .types import MessageId


@dataclasses.dataclass
class TopicMessage:
#: Message arguments used to call the pipeline.
args: dict[str, Optional[Any]]

#: Unique message id.
id: str = dataclasses.field(default_factory=lambda: str(uuid.uuid4()))
id: MessageId = dataclasses.field(
default_factory=lambda: MessageId(str(uuid.uuid4()))
)

#: Tags to attach to observability (logging, events, metrics and tracing).
tags: dict[str, str] = dataclasses.field(default_factory=dict)
Expand Down
5 changes: 5 additions & 0 deletions src/saturn_engine/core/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import typing as t

JobId = t.NewType("JobId", str)
MessageId = t.NewType("MessageId", str)
Cursor = t.NewType("Cursor", str)
6 changes: 4 additions & 2 deletions src/saturn_engine/models/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from sqlalchemy.orm import relationship
from sqlalchemy.sql.sqltypes import Text

from saturn_engine.core import Cursor
from saturn_engine.core import JobId
from saturn_engine.core.api import JobItem
from saturn_engine.utils import utcnow

Expand Down Expand Up @@ -58,10 +60,10 @@ def __init__(

def as_core_item(self) -> JobItem:
return JobItem(
name=self.name,
name=JobId(self.name),
completed_at=self.completed_at,
started_at=self.started_at,
cursor=self.cursor,
cursor=Cursor(self.cursor) if self.cursor else None,
error=self.error,
)

Expand Down
3 changes: 2 additions & 1 deletion src/saturn_engine/utils/tester/config/inventory_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from pydantic import dataclasses

from saturn_engine.core import Cursor
from saturn_engine.utils.declarative_config import BaseObject
from saturn_engine.worker.inventories import Item

Expand All @@ -16,7 +17,7 @@ class InventoryTestSpec:
selector: InventorySelector
items: list[Item]
limit: Optional[int] = None
after: Optional[str] = None
after: Optional[Cursor] = None


@dataclasses.dataclass
Expand Down
3 changes: 2 additions & 1 deletion src/saturn_engine/utils/tester/inventory_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import asyncio

from saturn_engine.config import default_config_with_env
from saturn_engine.core import Cursor
from saturn_engine.utils.options import asdict
from saturn_engine.worker import work_factory
from saturn_engine.worker.services.manager import ServicesManager
Expand All @@ -17,7 +18,7 @@ def run_saturn_inventory(
static_definitions: StaticDefinitions,
inventory_name: str,
limit: Optional[int] = None,
after: Optional[str] = None,
after: Optional[Cursor] = None,
) -> list[dict]:
inventory_item = static_definitions.inventories[inventory_name]
inventory = work_factory.build_inventory(
Expand Down
5 changes: 4 additions & 1 deletion src/saturn_engine/utils/tester/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import click

from saturn_engine.core import Cursor
from saturn_engine.utils.declarative_config import UncompiledObject
from saturn_engine.utils.declarative_config import load_uncompiled_objects_from_path
from saturn_engine.utils.options import fromdict
Expand Down Expand Up @@ -154,7 +155,9 @@ def run(
@click.option("--name", type=str, required=True)
@click.option("--limit", type=int, required=True, default=1)
@click.option("--after", type=str, required=False)
def show_inventory(topology: str, name: str, limit: int, after: Optional[str]) -> None:
def show_inventory(
topology: str, name: str, limit: int, after: Optional[Cursor]
) -> None:
static_definitions = compile_static_definitions(
load_uncompiled_objects_from_path(topology),
)
Expand Down
5 changes: 3 additions & 2 deletions src/saturn_engine/worker/inventories/batching.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import asyncstdlib as alib

from saturn_engine.core import Cursor
from saturn_engine.core.api import ComponentDefinition
from saturn_engine.worker.services import Services

Expand All @@ -28,13 +29,13 @@ def __init__(

self.inventory = build_inventory(options.inventory, services=services)

async def next_batch(self, after: Optional[str] = None) -> list[Item]:
async def next_batch(self, after: Optional[Cursor] = None) -> list[Item]:
batch: list[Item] = await alib.list(
alib.islice(self.inventory.iterate(after=after), self.batch_size)
)
return batch

async def iterate(self, after: Optional[str] = None) -> AsyncIterator[Item]:
async def iterate(self, after: Optional[Cursor] = None) -> AsyncIterator[Item]:
while True:
batch = await self.next_batch(after)
if not batch:
Expand Down
4 changes: 3 additions & 1 deletion src/saturn_engine/worker/inventories/chained.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
from collections.abc import AsyncIterator

from saturn_engine.core import Cursor

from . import Inventory
from .multi import MultiInventory
from .multi import MultiItems


class ChainedInventory(MultiInventory):
async def inventories_iterator(
self, *, inventories: list[tuple[str, Inventory]], after: dict[str, str]
self, *, inventories: list[tuple[str, Inventory]], after: dict[str, Cursor]
) -> AsyncIterator[MultiItems]:
start_inventory = 0
if after:
Expand Down
7 changes: 5 additions & 2 deletions src/saturn_engine/worker/inventories/dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import dataclasses

from saturn_engine.core import Cursor
from saturn_engine.core import MessageId

from . import Inventory
from . import Item

Expand All @@ -14,9 +17,9 @@ class Options:
def __init__(self, options: Options, **kwrags: object) -> None:
self.count = options.count or 1000

async def next_batch(self, after: Optional[str] = None) -> list[Item]:
async def next_batch(self, after: Optional[Cursor] = None) -> list[Item]:
n = int(after) + 1 if after is not None else 0
n_end = min(n + 100, self.count)
if n_end == n:
return []
return [Item(id=str(i), args={"n": i}) for i in range(n, n_end)]
return [Item(id=MessageId(str(i)), args={"n": i}) for i in range(n, n_end)]
4 changes: 3 additions & 1 deletion src/saturn_engine/worker/inventories/joined.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
from collections.abc import AsyncIterator

from saturn_engine.core import Cursor

from . import Inventory
from .multi import MultiInventory
from .multi import MultiItems


class JoinedInventory(MultiInventory):
async def inventories_iterator(
self, *, inventories: list[tuple[str, Inventory]], after: dict[str, str]
self, *, inventories: list[tuple[str, Inventory]], after: dict[str, Cursor]
) -> AsyncIterator[MultiItems]:
name, inventory = inventories[0]
last_cursor = after.pop(name, None)
Expand Down
44 changes: 25 additions & 19 deletions src/saturn_engine/worker/inventories/joined_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import json
from collections.abc import AsyncIterator

from saturn_engine.core import Cursor
from saturn_engine.core import MessageId
from saturn_engine.core.api import ComponentDefinition
from saturn_engine.worker.services import Services

Expand Down Expand Up @@ -42,7 +44,7 @@ def __init__(self, options: Options, services: Services, **kwargs: object) -> No
options.sub_inventory, services=services
)

async def iterate(self, after: t.Optional[str] = None) -> AsyncIterator[Item]:
async def iterate(self, after: t.Optional[Cursor] = None) -> AsyncIterator[Item]:
cursors = json.loads(after) if after else {}
inventory_cursor = cursors.get(self.inventory_name)
sub_inventory_cursor = cursors.get(self.sub_inventory_name)
Expand All @@ -67,25 +69,29 @@ async def iterate(self, after: t.Optional[str] = None) -> AsyncIterator[Item]:
}

yield Item(
id=json.dumps(
{
self.inventory_name: item.id,
self.sub_inventory_name: sub_item.id,
}
id=MessageId(
json.dumps(
{
self.inventory_name: item.id,
self.sub_inventory_name: sub_item.id,
}
)
),
cursor=json.dumps(
{
**(
{self.inventory_name: inventory_cursor}
if inventory_cursor
else {}
),
**(
{self.sub_inventory_name: sub_item.cursor}
if sub_item.cursor
else {}
),
}
cursor=Cursor(
json.dumps(
{
**(
{self.inventory_name: inventory_cursor}
if inventory_cursor
else {}
),
**(
{self.sub_inventory_name: sub_item.cursor}
if sub_item.cursor
else {}
),
}
)
),
args=args,
)
Expand Down
10 changes: 7 additions & 3 deletions src/saturn_engine/worker/inventories/multi.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import json
from collections.abc import AsyncIterator

from saturn_engine.core import Cursor
from saturn_engine.core import MessageId
from saturn_engine.core.api import ComponentDefinition
from saturn_engine.worker.services import Services

Expand Down Expand Up @@ -48,7 +50,7 @@ def __init__(self, options: Options, services: Services, **kwargs: object) -> No
(inventory.name, build_inventory(inventory, services=services))
)

async def iterate(self, after: Optional[str] = None) -> AsyncIterator[Item]:
async def iterate(self, after: Optional[Cursor] = None) -> AsyncIterator[Item]:
cursors = json.loads(after) if after else {}

async for item in self.inventories_iterator(
Expand All @@ -64,12 +66,14 @@ async def iterate(self, after: Optional[str] = None) -> AsyncIterator[Item]:
self.alias: args,
}
yield Item(
id=json.dumps(item.ids), cursor=json.dumps(item.cursors), args=args
id=MessageId(json.dumps(item.ids)),
cursor=Cursor(json.dumps(item.cursors)),
args=args,
)

@abc.abstractmethod
async def inventories_iterator(
self, *, inventories: list[tuple[str, Inventory]], after: dict[str, str]
self, *, inventories: list[tuple[str, Inventory]], after: dict[str, Cursor]
) -> AsyncIterator[MultiItems]:
raise NotImplementedError()
yield
7 changes: 5 additions & 2 deletions src/saturn_engine/worker/inventories/static.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import dataclasses

from saturn_engine.core import Cursor
from saturn_engine.core import MessageId

from . import Inventory
from . import Item

Expand All @@ -14,9 +17,9 @@ class Options:
def __init__(self, options: Options, **kwargs: object) -> None:
self.items = options.items

async def next_batch(self, after: t.Optional[str] = None) -> list[Item]:
async def next_batch(self, after: t.Optional[Cursor] = None) -> list[Item]:
begin = int(after) + 1 if after else 0
return [
Item(id=str(i), args=args)
Item(id=MessageId(str(i)), args=args)
for i, args in enumerate(self.items[begin:], start=begin)
]
Loading

0 comments on commit 46388fd

Please sign in to comment.