Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Buffer refactor #132

Merged
merged 20 commits into from
Feb 2, 2024
Merged
2 changes: 1 addition & 1 deletion .github/workflows/tests-unit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11"]
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
os:
- ubuntu-latest
- macos-latest
Expand Down
4 changes: 2 additions & 2 deletions examples/managers/blender/blender.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
PaymentPlatformNegotiator,
PoolActivityManager,
RefreshingDemandManager,
ScoringBuffer,
ScoringBufferPlugin,
WorkContext,
WorkResult,
retry,
Expand Down Expand Up @@ -58,7 +58,7 @@ async def run_on_golem(
demand_manager.get_initial_proposal,
plugins=[
NegotiatingPlugin(proposal_negotiators=negotiators),
ScoringBuffer(
ScoringBufferPlugin(
min_size=3, max_size=5, fill_concurrency_size=3, proposal_scorers=scorers
),
],
Expand Down
6 changes: 3 additions & 3 deletions examples/managers/flexible_negotiation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from golem.managers import (
BlacklistProviderIdPlugin,
Buffer,
BufferPlugin,
DefaultAgreementManager,
DefaultProposalManager,
NegotiatingPlugin,
Expand Down Expand Up @@ -48,14 +48,14 @@ async def main():
golem,
demand_manager.get_initial_proposal,
plugins=[
Buffer(
BufferPlugin(
min_size=10,
max_size=1000,
fill_concurrency_size=5,
),
BlacklistProviderIdPlugin(BLACKLISTED_PROVIDERS),
NegotiatingPlugin(proposal_negotiators=[PaymentPlatformNegotiator()]),
Buffer(
BufferPlugin(
min_size=3,
max_size=5,
fill_concurrency_size=3,
Expand Down
4 changes: 2 additions & 2 deletions examples/managers/mid_agreement_payments.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from datetime import datetime, timedelta

from golem.managers import (
Buffer,
BufferPlugin,
DefaultAgreementManager,
DefaultProposalManager,
MidAgreementPaymentsNegotiator,
Expand Down Expand Up @@ -91,7 +91,7 @@ async def main():
),
]
),
Buffer(
BufferPlugin(
min_size=1,
max_size=4,
fill_concurrency_size=2,
Expand Down
8 changes: 4 additions & 4 deletions examples/managers/proposal_plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from golem.managers import (
BlacklistProviderIdPlugin,
Buffer,
BufferPlugin,
DefaultAgreementManager,
DefaultProposalManager,
LinearAverageCostPricing,
Expand All @@ -19,7 +19,7 @@
RefreshingDemandManager,
RejectIfCostsExceeds,
RejectProposal,
ScoringBuffer,
ScoringBufferPlugin,
SequentialWorkManager,
WorkContext,
WorkResult,
Expand Down Expand Up @@ -78,7 +78,7 @@ async def main():
golem,
demand_manager.get_initial_proposal,
plugins=[
Buffer(
BufferPlugin(
min_size=10,
max_size=1000,
),
Expand All @@ -101,7 +101,7 @@ async def main():
else None,
)
),
ScoringBuffer(
ScoringBufferPlugin(
min_size=3,
max_size=5,
fill_concurrency_size=3,
Expand Down
5 changes: 1 addition & 4 deletions examples/task_api_draft/examples/yacat.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
from collections import defaultdict
from typing import (
Any,
AsyncIterator,
Callable,
DefaultDict,
Expand Down Expand Up @@ -221,9 +220,7 @@ async def main() -> None:

#############################################
# NOT REALLY INTERESTING PARTS OF THE LOGIC
async def close_agreement_repeat_task(
func: Callable, args: Tuple[Activity, Any], e: Exception
) -> None:
async def close_agreement_repeat_task(func: Callable, args: Tuple, e: Exception) -> None:
activity, task = args
tasks_queue.put_nowait(task)
print("Task failed on", activity)
Expand Down
3 changes: 2 additions & 1 deletion examples/task_api_draft/task_api/activity_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from golem.pipeline import InputStreamExhausted
from golem.resources import Activity
from golem.utils.asyncio import cancel_and_await


class ActivityPool:
Expand Down Expand Up @@ -94,7 +95,7 @@ async def _activity_destroyed_cleanup(
return

await activity.wait_destroyed()
manager_task.cancel()
await cancel_and_await(manager_task)

async def _get_next_idle_activity(
self, activity_stream: AsyncIterator[Union[Activity, Awaitable[Activity]]]
Expand Down
6 changes: 3 additions & 3 deletions examples/task_api_draft/task_api/execute_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ async def random_score(proposal: Proposal) -> float:

def close_agreement_repeat_task(
task_stream: TaskDataStream[TaskData],
) -> Callable[[Callable, Tuple[Activity, TaskData], Exception], Awaitable[None]]:
) -> Callable[[Callable, Tuple, Exception], Awaitable[None]]:
async def on_exception(
func: Callable[[Activity, TaskData], Awaitable[TaskResult]],
args: Tuple[Activity, TaskData],
func: Callable,
args: Tuple,
e: Exception,
) -> None:
activity, in_data = args
Expand Down
4 changes: 2 additions & 2 deletions golem/event_bus/in_memory/event_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Awaitable, Callable, DefaultDict, List, Optional, Tuple, Type

from golem.event_bus.base import Event, EventBus, EventBusError, TEvent
from golem.utils.asyncio import create_task_with_logging
from golem.utils.asyncio import cancel_and_await, create_task_with_logging
from golem.utils.logging import get_trace_id_name, trace_span

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -49,7 +49,7 @@ async def stop(self):
await self._event_queue.join()

if self._process_event_queue_loop_task is not None:
self._process_event_queue_loop_task.cancel()
await cancel_and_await(self._process_event_queue_loop_task)
approxit marked this conversation as resolved.
Show resolved Hide resolved
self._process_event_queue_loop_task = None

@trace_span(show_results=True)
Expand Down
8 changes: 4 additions & 4 deletions golem/managers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from golem.managers.payment import PayAllPaymentManager
from golem.managers.proposal import (
BlacklistProviderIdPlugin,
Buffer,
BufferPlugin,
DefaultProposalManager,
LinearAverageCostPricing,
LinearCoeffsCost,
Expand All @@ -37,7 +37,7 @@
ProposalScoringMixin,
RandomScore,
RejectIfCostsExceeds,
ScoringBuffer,
ScoringBufferPlugin,
)
from golem.managers.work import (
ConcurrentWorkManager,
Expand Down Expand Up @@ -73,7 +73,7 @@
"PayAllPaymentManager",
"DefaultProposalManager",
"BlacklistProviderIdPlugin",
"Buffer",
"BufferPlugin",
"PaymentPlatformNegotiator",
"MidAgreementPaymentsNegotiator",
"NegotiatingPlugin",
Expand All @@ -86,7 +86,7 @@
"LinearCoeffsCost",
"PropertyValueLerpScore",
"RandomScore",
"ScoringBuffer",
"ScoringBufferPlugin",
"SequentialWorkManager",
"ConcurrentWorkManager",
"WorkManagerPluginsMixin",
Expand Down
3 changes: 1 addition & 2 deletions golem/managers/demand/refreshing.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@
from golem.payload import defaults as payload_defaults
from golem.resources import Allocation, Demand, Proposal
from golem.resources.demand.demand_builder import DemandBuilder
from golem.utils.asyncio import create_task_with_logging
from golem.utils.asyncio import ErrorReportingQueue, create_task_with_logging
from golem.utils.logging import get_trace_id_name, trace_span
from golem.utils.queue import ErrorReportingQueue

logger = logging.getLogger(__name__)

Expand Down
4 changes: 2 additions & 2 deletions golem/managers/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import Generic, List, Optional, Sequence

from golem.managers.base import ManagerException, TPlugin
from golem.utils.asyncio import create_task_with_logging
from golem.utils.asyncio import cancel_and_await, create_task_with_logging
from golem.utils.logging import get_trace_id_name, trace_span

logger = logging.getLogger(__name__)
Expand All @@ -29,7 +29,7 @@ async def stop(self) -> None:
raise ManagerException("Already stopped!")

if self._background_loop_task is not None:
self._background_loop_task.cancel()
await cancel_and_await(self._background_loop_task)
self._background_loop_task = None

def is_started(self) -> bool:
Expand Down
8 changes: 4 additions & 4 deletions golem/managers/proposal/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from golem.managers.proposal.default import DefaultProposalManager
from golem.managers.proposal.plugins import (
BlacklistProviderIdPlugin,
Buffer,
BufferPlugin,
LinearAverageCostPricing,
LinearCoeffsCost,
LinearPerCpuAverageCostPricing,
Expand All @@ -14,13 +14,13 @@
ProposalScoringMixin,
RandomScore,
RejectIfCostsExceeds,
ScoringBuffer,
ScoringBufferPlugin,
)

__all__ = (
"DefaultProposalManager",
"BlacklistProviderIdPlugin",
"Buffer",
"BufferPlugin",
"PaymentPlatformNegotiator",
"MidAgreementPaymentsNegotiator",
"NegotiatingPlugin",
Expand All @@ -33,5 +33,5 @@
"LinearPerCpuCoeffsCost",
"PropertyValueLerpScore",
"RandomScore",
"ScoringBuffer",
"ScoringBufferPlugin",
)
8 changes: 4 additions & 4 deletions golem/managers/proposal/plugins/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from golem.managers.proposal.plugins.blacklist import BlacklistProviderIdPlugin
from golem.managers.proposal.plugins.buffer import Buffer
from golem.managers.proposal.plugins.buffer import BufferPlugin
from golem.managers.proposal.plugins.linear_coeffs import LinearCoeffsCost, LinearPerCpuCoeffsCost
from golem.managers.proposal.plugins.negotiating import (
MidAgreementPaymentsNegotiator,
Expand All @@ -14,12 +14,12 @@
PropertyValueLerpScore,
ProposalScoringMixin,
RandomScore,
ScoringBuffer,
ScoringBufferPlugin,
)

__all__ = (
"BlacklistProviderIdPlugin",
"Buffer",
"BufferPlugin",
"PaymentPlatformNegotiator",
"MidAgreementPaymentsNegotiator",
"NegotiatingPlugin",
Expand All @@ -32,5 +32,5 @@
"LinearPerCpuCoeffsCost",
"PropertyValueLerpScore",
"RandomScore",
"ScoringBuffer",
"ScoringBufferPlugin",
)
Loading
Loading