Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Buffer refactor #132

Merged
merged 20 commits into from
Feb 2, 2024
4 changes: 2 additions & 2 deletions examples/managers/blender/blender.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
PayAllPaymentManager,
PaymentPlatformNegotiator,
PoolActivityManager,
ProposalScoringBuffer,
RefreshingDemandManager,
ScoringBufferPlugin,
WorkContext,
WorkResult,
retry,
Expand Down Expand Up @@ -58,7 +58,7 @@ async def run_on_golem(
demand_manager.get_initial_proposal,
plugins=[
NegotiatingPlugin(proposal_negotiators=negotiators),
ScoringBufferPlugin(
ProposalScoringBuffer(
min_size=3, max_size=5, fill_concurrency_size=3, proposal_scorers=scorers
),
],
Expand Down
6 changes: 3 additions & 3 deletions examples/managers/flexible_negotiation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@

from golem.managers import (
BlacklistProviderIdPlugin,
BufferPlugin,
DefaultAgreementManager,
DefaultProposalManager,
NegotiatingPlugin,
PayAllPaymentManager,
PaymentPlatformNegotiator,
PoolActivityManager,
ProposalBuffer,
RefreshingDemandManager,
SequentialWorkManager,
WorkContext,
Expand Down Expand Up @@ -48,14 +48,14 @@ async def main():
golem,
demand_manager.get_initial_proposal,
plugins=[
BufferPlugin(
ProposalBuffer(
min_size=10,
max_size=1000,
fill_concurrency_size=5,
),
BlacklistProviderIdPlugin(BLACKLISTED_PROVIDERS),
NegotiatingPlugin(proposal_negotiators=[PaymentPlatformNegotiator()]),
BufferPlugin(
ProposalBuffer(
min_size=3,
max_size=5,
fill_concurrency_size=3,
Expand Down
4 changes: 2 additions & 2 deletions examples/managers/mid_agreement_payments.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
from datetime import datetime, timedelta

from golem.managers import (
BufferPlugin,
DefaultAgreementManager,
DefaultProposalManager,
MidAgreementPaymentsNegotiator,
NegotiatingPlugin,
PayAllPaymentManager,
PaymentPlatformNegotiator,
ProposalBuffer,
RefreshingDemandManager,
SequentialWorkManager,
SingleUseActivityManager,
Expand Down Expand Up @@ -91,7 +91,7 @@ async def main():
),
]
),
BufferPlugin(
ProposalBuffer(
min_size=1,
max_size=4,
fill_concurrency_size=2,
Expand Down
8 changes: 4 additions & 4 deletions examples/managers/proposal_plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from golem.managers import (
BlacklistProviderIdPlugin,
BufferPlugin,
DefaultAgreementManager,
DefaultProposalManager,
LinearAverageCostPricing,
Expand All @@ -15,11 +14,12 @@
PaymentPlatformNegotiator,
PoolActivityManager,
PropertyValueLerpScore,
ProposalBuffer,
ProposalScoringBuffer,
RandomScore,
RefreshingDemandManager,
RejectIfCostsExceeds,
RejectProposal,
ScoringBufferPlugin,
SequentialWorkManager,
WorkContext,
WorkResult,
Expand Down Expand Up @@ -78,7 +78,7 @@ async def main():
golem,
demand_manager.get_initial_proposal,
plugins=[
BufferPlugin(
ProposalBuffer(
min_size=10,
max_size=1000,
),
Expand All @@ -101,7 +101,7 @@ async def main():
else None,
)
),
ScoringBufferPlugin(
ProposalScoringBuffer(
min_size=3,
max_size=5,
fill_concurrency_size=3,
Expand Down
4 changes: 2 additions & 2 deletions examples/task_api_draft/task_api/activity_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from golem.pipeline import InputStreamExhausted
from golem.resources import Activity
from golem.utils.asyncio import cancel_and_await
from golem.utils.asyncio import ensure_cancelled


class ActivityPool:
Expand Down Expand Up @@ -95,7 +95,7 @@ async def _activity_destroyed_cleanup(
return

await activity.wait_destroyed()
await cancel_and_await(manager_task)
await ensure_cancelled(manager_task)

async def _get_next_idle_activity(
self, activity_stream: AsyncIterator[Union[Activity, Awaitable[Activity]]]
Expand Down
4 changes: 2 additions & 2 deletions golem/event_bus/in_memory/event_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Awaitable, Callable, DefaultDict, List, Optional, Tuple, Type

from golem.event_bus.base import Event, EventBus, EventBusError, TEvent
from golem.utils.asyncio import cancel_and_await, create_task_with_logging
from golem.utils.asyncio import create_task_with_logging, ensure_cancelled
from golem.utils.logging import get_trace_id_name, trace_span

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -49,7 +49,7 @@ async def stop(self):
await self._event_queue.join()

if self._process_event_queue_loop_task is not None:
await cancel_and_await(self._process_event_queue_loop_task)
await ensure_cancelled(self._process_event_queue_loop_task)
self._process_event_queue_loop_task = None

@trace_span(show_results=True)
Expand Down
8 changes: 4 additions & 4 deletions golem/managers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from golem.managers.payment import PayAllPaymentManager
from golem.managers.proposal import (
BlacklistProviderIdPlugin,
BufferPlugin,
DefaultProposalManager,
LinearAverageCostPricing,
LinearCoeffsCost,
Expand All @@ -34,10 +33,11 @@
NegotiatingPlugin,
PaymentPlatformNegotiator,
PropertyValueLerpScore,
ProposalBuffer,
ProposalScoringBuffer,
ProposalScoringMixin,
RandomScore,
RejectIfCostsExceeds,
ScoringBufferPlugin,
)
from golem.managers.work import (
ConcurrentWorkManager,
Expand Down Expand Up @@ -73,7 +73,7 @@
"PayAllPaymentManager",
"DefaultProposalManager",
"BlacklistProviderIdPlugin",
"BufferPlugin",
"ProposalBuffer",
"PaymentPlatformNegotiator",
"MidAgreementPaymentsNegotiator",
"NegotiatingPlugin",
Expand All @@ -86,7 +86,7 @@
"LinearCoeffsCost",
"PropertyValueLerpScore",
"RandomScore",
"ScoringBufferPlugin",
"ProposalScoringBuffer",
"SequentialWorkManager",
"ConcurrentWorkManager",
"WorkManagerPluginsMixin",
Expand Down
28 changes: 21 additions & 7 deletions golem/managers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,20 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import timedelta
from typing import Any, Awaitable, Callable, Dict, List, Optional, Sequence, Tuple, TypeVar, Union
from typing import (
Any,
Awaitable,
Callable,
Dict,
List,
Optional,
Protocol,
Sequence,
Tuple,
TypeVar,
Union,
runtime_checkable,
)

from golem.exceptions import GolemException
from golem.resources import (
Expand Down Expand Up @@ -181,8 +194,8 @@ class RejectProposal(ManagerPluginException):
pass


class ProposalNegotiator(ABC):
@abstractmethod
@runtime_checkable
class ProposalNegotiator(Protocol):
def __call__(
self, demand_data: DemandData, proposal_data: ProposalData
) -> MaybeAwaitable[Optional[RejectProposal]]:
Expand All @@ -209,8 +222,8 @@ async def stop(self) -> None:
ProposalScoringResult = Sequence[Optional[float]]


class ProposalScorer(ABC):
@abstractmethod
@runtime_checkable
class ProposalScorer(Protocol):
def __call__(
self, proposals_data: Sequence[ProposalData]
) -> MaybeAwaitable[ProposalScoringResult]:
Expand All @@ -220,10 +233,11 @@ def __call__(
ScorerWithOptionalWeight = Union[ProposalScorer, Tuple[float, ProposalScorer]]


class WorkManagerPlugin(ABC):
@abstractmethod
@runtime_checkable
class WorkManagerPlugin(Protocol):
def __call__(self, do_work: DoWorkCallable) -> DoWorkCallable:
...


# TODO: Make consistent naming on functions in arguments in whole project - callable or func
PricingCallable = Callable[[ProposalData], Optional[float]]
4 changes: 2 additions & 2 deletions golem/managers/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import Generic, List, Optional, Sequence

from golem.managers.base import ManagerException, TPlugin
from golem.utils.asyncio import cancel_and_await, create_task_with_logging
from golem.utils.asyncio import create_task_with_logging, ensure_cancelled
from golem.utils.logging import get_trace_id_name, trace_span

logger = logging.getLogger(__name__)
Expand All @@ -29,7 +29,7 @@ async def stop(self) -> None:
raise ManagerException("Already stopped!")

if self._background_loop_task is not None:
await cancel_and_await(self._background_loop_task)
await ensure_cancelled(self._background_loop_task)
self._background_loop_task = None

def is_started(self) -> bool:
Expand Down
8 changes: 4 additions & 4 deletions golem/managers/proposal/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from golem.managers.proposal.default import DefaultProposalManager
from golem.managers.proposal.plugins import (
BlacklistProviderIdPlugin,
BufferPlugin,
LinearAverageCostPricing,
LinearCoeffsCost,
LinearPerCpuAverageCostPricing,
Expand All @@ -11,16 +10,17 @@
NegotiatingPlugin,
PaymentPlatformNegotiator,
PropertyValueLerpScore,
ProposalBuffer,
ProposalScoringBuffer,
ProposalScoringMixin,
RandomScore,
RejectIfCostsExceeds,
ScoringBufferPlugin,
)

__all__ = (
"DefaultProposalManager",
"BlacklistProviderIdPlugin",
"BufferPlugin",
"ProposalBuffer",
"PaymentPlatformNegotiator",
"MidAgreementPaymentsNegotiator",
"NegotiatingPlugin",
Expand All @@ -33,5 +33,5 @@
"LinearPerCpuCoeffsCost",
"PropertyValueLerpScore",
"RandomScore",
"ScoringBufferPlugin",
"ProposalScoringBuffer",
)
8 changes: 4 additions & 4 deletions golem/managers/proposal/plugins/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from golem.managers.proposal.plugins.blacklist import BlacklistProviderIdPlugin
from golem.managers.proposal.plugins.buffer import BufferPlugin
from golem.managers.proposal.plugins.buffer import ProposalBuffer
from golem.managers.proposal.plugins.linear_coeffs import LinearCoeffsCost, LinearPerCpuCoeffsCost
from golem.managers.proposal.plugins.negotiating import (
MidAgreementPaymentsNegotiator,
Expand All @@ -12,14 +12,14 @@
LinearPerCpuAverageCostPricing,
MapScore,
PropertyValueLerpScore,
ProposalScoringBuffer,
ProposalScoringMixin,
RandomScore,
ScoringBufferPlugin,
)

__all__ = (
"BlacklistProviderIdPlugin",
"BufferPlugin",
"ProposalBuffer",
"PaymentPlatformNegotiator",
"MidAgreementPaymentsNegotiator",
"NegotiatingPlugin",
Expand All @@ -32,5 +32,5 @@
"LinearPerCpuCoeffsCost",
"PropertyValueLerpScore",
"RandomScore",
"ScoringBufferPlugin",
"ProposalScoringBuffer",
)
Loading
Loading