Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Buffer refactor #132

Merged
merged 20 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests-unit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11"]
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
os:
- ubuntu-latest
- macos-latest
Expand Down
4 changes: 2 additions & 2 deletions examples/managers/blender/blender.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
PayAllPaymentManager,
PaymentPlatformNegotiator,
PoolActivityManager,
ProposalScoringBuffer,
RefreshingDemandManager,
ScoringBuffer,
WorkContext,
WorkResult,
retry,
Expand Down Expand Up @@ -58,7 +58,7 @@ async def run_on_golem(
demand_manager.get_initial_proposal,
plugins=[
NegotiatingPlugin(proposal_negotiators=negotiators),
ScoringBuffer(
ProposalScoringBuffer(
min_size=3, max_size=5, fill_concurrency_size=3, proposal_scorers=scorers
),
],
Expand Down
6 changes: 3 additions & 3 deletions examples/managers/flexible_negotiation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@

from golem.managers import (
BlacklistProviderIdPlugin,
Buffer,
DefaultAgreementManager,
DefaultProposalManager,
NegotiatingPlugin,
PayAllPaymentManager,
PaymentPlatformNegotiator,
PoolActivityManager,
ProposalBuffer,
RefreshingDemandManager,
SequentialWorkManager,
WorkContext,
Expand Down Expand Up @@ -48,14 +48,14 @@ async def main():
golem,
demand_manager.get_initial_proposal,
plugins=[
Buffer(
ProposalBuffer(
min_size=10,
max_size=1000,
fill_concurrency_size=5,
),
BlacklistProviderIdPlugin(BLACKLISTED_PROVIDERS),
NegotiatingPlugin(proposal_negotiators=[PaymentPlatformNegotiator()]),
Buffer(
ProposalBuffer(
min_size=3,
max_size=5,
fill_concurrency_size=3,
Expand Down
4 changes: 2 additions & 2 deletions examples/managers/mid_agreement_payments.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
from datetime import datetime, timedelta

from golem.managers import (
Buffer,
DefaultAgreementManager,
DefaultProposalManager,
MidAgreementPaymentsNegotiator,
NegotiatingPlugin,
PayAllPaymentManager,
PaymentPlatformNegotiator,
ProposalBuffer,
RefreshingDemandManager,
SequentialWorkManager,
SingleUseActivityManager,
Expand Down Expand Up @@ -91,7 +91,7 @@ async def main():
),
]
),
Buffer(
ProposalBuffer(
min_size=1,
max_size=4,
fill_concurrency_size=2,
Expand Down
8 changes: 4 additions & 4 deletions examples/managers/proposal_plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from golem.managers import (
BlacklistProviderIdPlugin,
Buffer,
DefaultAgreementManager,
DefaultProposalManager,
LinearAverageCostPricing,
Expand All @@ -15,11 +14,12 @@
PaymentPlatformNegotiator,
PoolActivityManager,
PropertyValueLerpScore,
ProposalBuffer,
ProposalScoringBuffer,
RandomScore,
RefreshingDemandManager,
RejectIfCostsExceeds,
RejectProposal,
ScoringBuffer,
SequentialWorkManager,
WorkContext,
WorkResult,
Expand Down Expand Up @@ -78,7 +78,7 @@ async def main():
golem,
demand_manager.get_initial_proposal,
plugins=[
Buffer(
ProposalBuffer(
min_size=10,
max_size=1000,
),
Expand All @@ -101,7 +101,7 @@ async def main():
else None,
)
),
ScoringBuffer(
ProposalScoringBuffer(
min_size=3,
max_size=5,
fill_concurrency_size=3,
Expand Down
5 changes: 1 addition & 4 deletions examples/task_api_draft/examples/yacat.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
from collections import defaultdict
from typing import (
Any,
AsyncIterator,
Callable,
DefaultDict,
Expand Down Expand Up @@ -221,9 +220,7 @@ async def main() -> None:

#############################################
# NOT REALLY INTERESTING PARTS OF THE LOGIC
async def close_agreement_repeat_task(
func: Callable, args: Tuple[Activity, Any], e: Exception
) -> None:
async def close_agreement_repeat_task(func: Callable, args: Tuple, e: Exception) -> None:
activity, task = args
tasks_queue.put_nowait(task)
print("Task failed on", activity)
Expand Down
3 changes: 2 additions & 1 deletion examples/task_api_draft/task_api/activity_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from golem.pipeline import InputStreamExhausted
from golem.resources import Activity
from golem.utils.asyncio import ensure_cancelled


class ActivityPool:
Expand Down Expand Up @@ -94,7 +95,7 @@ async def _activity_destroyed_cleanup(
return

await activity.wait_destroyed()
manager_task.cancel()
await ensure_cancelled(manager_task)

async def _get_next_idle_activity(
self, activity_stream: AsyncIterator[Union[Activity, Awaitable[Activity]]]
Expand Down
6 changes: 3 additions & 3 deletions examples/task_api_draft/task_api/execute_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ async def random_score(proposal: Proposal) -> float:

def close_agreement_repeat_task(
task_stream: TaskDataStream[TaskData],
) -> Callable[[Callable, Tuple[Activity, TaskData], Exception], Awaitable[None]]:
) -> Callable[[Callable, Tuple, Exception], Awaitable[None]]:
async def on_exception(
func: Callable[[Activity, TaskData], Awaitable[TaskResult]],
args: Tuple[Activity, TaskData],
func: Callable,
args: Tuple,
e: Exception,
) -> None:
activity, in_data = args
Expand Down
4 changes: 2 additions & 2 deletions golem/event_bus/in_memory/event_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Awaitable, Callable, DefaultDict, List, Optional, Tuple, Type

from golem.event_bus.base import Event, EventBus, EventBusError, TEvent
from golem.utils.asyncio import create_task_with_logging
from golem.utils.asyncio import create_task_with_logging, ensure_cancelled
from golem.utils.logging import get_trace_id_name, trace_span

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -49,7 +49,7 @@ async def stop(self):
await self._event_queue.join()

if self._process_event_queue_loop_task is not None:
self._process_event_queue_loop_task.cancel()
await ensure_cancelled(self._process_event_queue_loop_task)
self._process_event_queue_loop_task = None

@trace_span(show_results=True)
Expand Down
8 changes: 4 additions & 4 deletions golem/managers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from golem.managers.payment import PayAllPaymentManager
from golem.managers.proposal import (
BlacklistProviderIdPlugin,
Buffer,
DefaultProposalManager,
LinearAverageCostPricing,
LinearCoeffsCost,
Expand All @@ -34,10 +33,11 @@
NegotiatingPlugin,
PaymentPlatformNegotiator,
PropertyValueLerpScore,
ProposalBuffer,
ProposalScoringBuffer,
ProposalScoringMixin,
RandomScore,
RejectIfCostsExceeds,
ScoringBuffer,
)
from golem.managers.work import (
ConcurrentWorkManager,
Expand Down Expand Up @@ -73,7 +73,7 @@
"PayAllPaymentManager",
"DefaultProposalManager",
"BlacklistProviderIdPlugin",
"Buffer",
"ProposalBuffer",
"PaymentPlatformNegotiator",
"MidAgreementPaymentsNegotiator",
"NegotiatingPlugin",
Expand All @@ -86,7 +86,7 @@
"LinearCoeffsCost",
"PropertyValueLerpScore",
"RandomScore",
"ScoringBuffer",
"ProposalScoringBuffer",
"SequentialWorkManager",
"ConcurrentWorkManager",
"WorkManagerPluginsMixin",
Expand Down
33 changes: 24 additions & 9 deletions golem/managers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,20 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import timedelta
from typing import Any, Awaitable, Callable, Dict, List, Optional, Sequence, Tuple, TypeVar, Union
from typing import (
Any,
Awaitable,
Callable,
Dict,
List,
Optional,
Protocol,
Sequence,
Tuple,
TypeVar,
Union,
runtime_checkable,
)

from golem.exceptions import GolemException
from golem.resources import (
Expand All @@ -15,6 +28,7 @@
Script,
)
from golem.resources.activity import commands
from golem.utils.typing import MaybeAwaitable
approxit marked this conversation as resolved.
Show resolved Hide resolved

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -180,11 +194,11 @@ class RejectProposal(ManagerPluginException):
pass


class ProposalNegotiator(ABC):
@abstractmethod
@runtime_checkable
class ProposalNegotiator(Protocol):
def __call__(
self, demand_data: DemandData, proposal_data: ProposalData
) -> Union[Awaitable[Optional[RejectProposal]], Optional[RejectProposal]]:
) -> MaybeAwaitable[Optional[RejectProposal]]:
...


Expand All @@ -208,21 +222,22 @@ async def stop(self) -> None:
ProposalScoringResult = Sequence[Optional[float]]


class ProposalScorer(ABC):
@abstractmethod
@runtime_checkable
class ProposalScorer(Protocol):
def __call__(
self, proposals_data: Sequence[ProposalData]
) -> Union[Awaitable[ProposalScoringResult], ProposalScoringResult]:
) -> MaybeAwaitable[ProposalScoringResult]:
...


ScorerWithOptionalWeight = Union[ProposalScorer, Tuple[float, ProposalScorer]]


class WorkManagerPlugin(ABC):
@abstractmethod
@runtime_checkable
class WorkManagerPlugin(Protocol):
def __call__(self, do_work: DoWorkCallable) -> DoWorkCallable:
...


# TODO: Make consistent naming on functions in arguments in whole project - callable or func
PricingCallable = Callable[[ProposalData], Optional[float]]
16 changes: 6 additions & 10 deletions golem/managers/demand/refreshing.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
import logging
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from typing import Awaitable, Callable, List, Tuple

from golem.managers.base import DemandManager
Expand All @@ -10,9 +10,8 @@
from golem.payload import defaults as payload_defaults
from golem.resources import Allocation, Demand, Proposal
from golem.resources.demand.demand_builder import DemandBuilder
from golem.utils.asyncio import create_task_with_logging
from golem.utils.asyncio import ErrorReportingQueue, create_task_with_logging
from golem.utils.logging import get_trace_id_name, trace_span
from golem.utils.queue import ErrorReportingQueue

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -68,13 +67,10 @@ async def _wait_for_demand_to_expire(self):
if not self._demands:
return

remaining: timedelta = (
datetime.utcfromtimestamp(
self._demands[-1][0].data.properties["golem.srv.comp.expiration"] / 1000
)
- datetime.utcnow()
)
await asyncio.sleep(remaining.seconds)
await self._demands[-1][0].get_data()
expiration_date = self._demands[-1][0].get_expiration_date()
remaining = expiration_date - datetime.now(timezone.utc)
await asyncio.sleep(remaining.total_seconds())

@trace_span()
async def _create_and_subscribe_demand(self):
Expand Down
4 changes: 2 additions & 2 deletions golem/managers/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import Generic, List, Optional, Sequence

from golem.managers.base import ManagerException, TPlugin
from golem.utils.asyncio import create_task_with_logging
from golem.utils.asyncio import create_task_with_logging, ensure_cancelled
from golem.utils.logging import get_trace_id_name, trace_span

logger = logging.getLogger(__name__)
Expand All @@ -29,7 +29,7 @@ async def stop(self) -> None:
raise ManagerException("Already stopped!")

if self._background_loop_task is not None:
self._background_loop_task.cancel()
await ensure_cancelled(self._background_loop_task)
self._background_loop_task = None

def is_started(self) -> bool:
Expand Down
8 changes: 4 additions & 4 deletions golem/managers/proposal/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from golem.managers.proposal.default import DefaultProposalManager
from golem.managers.proposal.plugins import (
BlacklistProviderIdPlugin,
Buffer,
LinearAverageCostPricing,
LinearCoeffsCost,
LinearPerCpuAverageCostPricing,
Expand All @@ -11,16 +10,17 @@
NegotiatingPlugin,
PaymentPlatformNegotiator,
PropertyValueLerpScore,
ProposalBuffer,
ProposalScoringBuffer,
ProposalScoringMixin,
RandomScore,
RejectIfCostsExceeds,
ScoringBuffer,
)

__all__ = (
"DefaultProposalManager",
"BlacklistProviderIdPlugin",
"Buffer",
"ProposalBuffer",
"PaymentPlatformNegotiator",
"MidAgreementPaymentsNegotiator",
"NegotiatingPlugin",
Expand All @@ -33,5 +33,5 @@
"LinearPerCpuCoeffsCost",
"PropertyValueLerpScore",
"RandomScore",
"ScoringBuffer",
"ProposalScoringBuffer",
)
Loading
Loading