Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release 0.7.5 #157

Merged
merged 6 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/task_api_draft/task_api/redundance_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ async def _worker_task(self, activity_stream: AsyncIterator[Awaitable[Activity]]
def _task_for_provider(self, provider_id: str) -> Optional[TaskData]:
for task_data in self.remaining_tasks:
if task_data not in self._provider_tasks[provider_id]:
return task_data
return task_data # type: ignore
return None

def _process_task_result(self, this_task_data: TaskData, this_task_result: TaskResult) -> None:
Expand All @@ -110,7 +110,7 @@ def _process_task_result(self, this_task_data: TaskData, this_task_result: TaskR
# things up, so is not really a bug/problem, but rather a decision.
return

self._partial_results.append((this_task_data, this_task_result))
self._partial_results.append((this_task_data, this_task_result)) # type: ignore
task_results = [
task_result
for task_data, task_result in self._partial_results
Expand All @@ -128,5 +128,5 @@ def _process_task_result(self, this_task_data: TaskData, this_task_result: TaskR
if (task_results.count(most_common) / cnt) < self.min_success:
return

self.remaining_tasks.remove(this_task_data)
self.remaining_tasks.remove(this_task_data) # type: ignore
self._results_queue.put_nowait(most_common)
8 changes: 4 additions & 4 deletions golem/event_bus/in_memory/event_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ async def on(
filter_func: Optional[Callable[[TEvent], bool]] = None,
) -> _CallbackHandler:
callback_info = _CallbackInfo(
callback=callback,
filter_func=filter_func,
callback=callback, # type: ignore
filter_func=filter_func, # type: ignore
once=False,
)

Expand All @@ -79,8 +79,8 @@ async def on_once(
filter_func: Optional[Callable[[TEvent], bool]] = None,
) -> _CallbackHandler:
callback_info = _CallbackInfo(
callback=callback,
filter_func=filter_func,
callback=callback, # type: ignore
filter_func=filter_func, # type: ignore
once=True,
)

Expand Down
4 changes: 3 additions & 1 deletion golem/managers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
Callable,
Dict,
List,
MutableMapping,
Optional,
Protocol,
Sequence,
Expand Down Expand Up @@ -68,8 +69,9 @@ async def __call__(self):


class WorkContext:
def __init__(self, activity: Activity):
def __init__(self, activity: Activity, extra: Optional[MutableMapping] = None):
self._activity = activity
self.extra = extra or {}

async def deploy(
self, deploy_args: Optional[commands.ArgsDict] = None, timeout: Optional[timedelta] = None
Expand Down
2 changes: 2 additions & 0 deletions golem/managers/demand/refreshing.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@


class RefreshingDemandManager(BackgroundLoopMixin, DemandManager):
"""DemandManager that creates new demand under the hood after given lifetime is reached."""

def __init__(
self,
golem: GolemNode,
Expand Down
104 changes: 104 additions & 0 deletions golem/managers/demand/single_use.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import logging
from itertools import chain
from typing import Awaitable, Callable, Sequence

from golem.managers.base import DemandManager
from golem.managers.mixins import BackgroundLoopMixin
from golem.node import GolemNode
from golem.payload import Payload
from golem.payload import defaults as payload_defaults
from golem.resources import Allocation, Demand, Proposal
from golem.resources.demand.demand_builder import DemandBuilder
from golem.utils.asyncio import ErrorReportingQueue
from golem.utils.logging import trace_span

logger = logging.getLogger(__name__)


class SingleUseDemandManager(BackgroundLoopMixin, DemandManager):
"""DemandManager that creates one single demand as a single source of initial proposals."""

def __init__(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to have a pydoc that describes what is special about this one so it is easier to tell how it is different then other managers of the same type.

self,
golem: GolemNode,
get_allocation: Callable[[], Awaitable[Allocation]],
payloads: Sequence[Payload],
) -> None:
self._golem = golem
self._get_allocation = get_allocation
self._payloads = payloads

self._initial_proposals: ErrorReportingQueue[Proposal] = ErrorReportingQueue()

super().__init__()

@trace_span("Starting SingleUseDemandManager", log_level=logging.INFO)
async def start(self) -> None:
return await super().start()

@trace_span("Stopping SingleUseDemandManager", log_level=logging.INFO)
async def stop(self) -> None:
return await super().stop()

@trace_span("Getting initial proposal", show_results=True)
async def get_initial_proposal(self) -> Proposal:
proposal = await self._initial_proposals.get()
self._initial_proposals.task_done()
return proposal

@trace_span()
async def _background_loop(self) -> None:
demand = await self._create_and_subscribe_demand()
demand.start_collecting_events()

try:
await self._consume_initial_proposals(demand)
except Exception as e:
self._initial_proposals.set_exception(e)
logger.debug(
"Encountered unexpected exception while handling demands,"
" exception is set and background loop will be stopped!"
)
finally:
await demand.unsubscribe()

@trace_span()
async def _create_and_subscribe_demand(self):
allocation = await self._get_allocation()
demand_builder = await self._prepare_demand_builder(allocation)
logger.debug(f"Creating demand: {demand_builder=}")
demand = await demand_builder.create_demand(self._golem)

return demand

@trace_span()
async def _prepare_demand_builder(self, allocation: Allocation) -> DemandBuilder:
# FIXME: Code looks duplicated as GolemNode.create_demand does the same
demand_builder = DemandBuilder()

for demand_spec in chain(
[
payload_defaults.ActivityInfo(
lifetime=payload_defaults.DEFAULT_LIFETIME, multi_activity=True
),
payload_defaults.PaymentInfo(),
await allocation.get_demand_spec(),
],
self._payloads,
):
await demand_builder.add(demand_spec)

return demand_builder

@trace_span()
async def _consume_initial_proposals(self, demand: Demand):
initial_proposals_gen = demand.initial_proposals()
first_initial_proposal = await initial_proposals_gen.__anext__()
logger.info("Received first initial proposal")

logger.debug(f"New initial proposal {first_initial_proposal}")
self._initial_proposals.put_nowait(first_initial_proposal)

async for initial in initial_proposals_gen:
logger.debug(f"New initial proposal {initial}")
self._initial_proposals.put_nowait(initial)
12 changes: 5 additions & 7 deletions golem/managers/payment/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,7 @@ async def start(self):
async def get_allocation(self) -> Allocation:
async with self._lock:
if not self._allocation:
self._allocation = await self._create_allocation(
Decimal(self._budget),
self._network,
self._driver,
)
self._allocation = await self._create_allocation()

return self._allocation # type: ignore[return-value]

Expand All @@ -81,9 +77,11 @@ async def _release_allocation(self) -> None:
self._allocation = None

@trace_span(show_arguments=True, show_results=True)
async def _create_allocation(self, budget: Decimal, network: str, driver: str) -> Allocation:
async def _create_allocation(self) -> Allocation:
try:
return await Allocation.create_any_account(self._golem, budget, network, driver)
return await Allocation.create_any_account(
self._golem, Decimal(self._budget), self._network, self._driver
)
except ApiException as e:
raise ManagerException(json.loads(e.body)["message"]) from e

Expand Down
2 changes: 1 addition & 1 deletion golem/payload/parser.tx
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ ConstraintOperator:
;

PropertyName:
/[\w-]+(\.[\w-]+)*/
/[\w!-]+(\.[\w!-]+)*/
;

PropertyValueList:
Expand Down
7 changes: 5 additions & 2 deletions golem/payload/vm.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class VmPayload(BaseVmPayload, _VmPayload):

@dataclass
class _ManifestVmPayload(Payload, ABC):
manifest: str = prop("golem.srv.comp.payload")
manifest: Optional[str] = prop("golem.srv.comp.payload", default=None)
manifest_sig: Optional[str] = prop("golem.srv.comp.payload.sig", default=None)
manifest_sig_algorithm: Optional[str] = prop(
"golem.srv.comp.payload.sig.algorithm", default=None
Expand All @@ -82,7 +82,7 @@ class ManifestVmPayload(BaseVmPayload, _ManifestVmPayload):

@dataclass
class _RepositoryVmPayload(ABC):
image_hash: str
image_hash: Optional[str] = None
image_url: Optional[str] = None
package_url: Optional[str] = prop("golem.srv.comp.task_package", default=None)

Expand All @@ -94,6 +94,9 @@ class RepositoryVmPayload(BaseVmPayload, _RepositoryVmPayload):
parameter from remote repository."""

async def _resolve_package_url(self) -> None:
if not self.image_hash:
return

if self.image_url:
await check_image_url(self.image_url)
image_url = self.image_url
Expand Down
2 changes: 1 addition & 1 deletion golem/pipeline/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ async def _process_single_value(self, in_val: Union[DataType, Awaitable[DataType
# NOTE: Buffer is useful only with awaitables, so this scenario doesn't make much
# sense. But maybe stream sometimes returns awaitables and sometimes already
# awaited values?
awaited = in_val # type: ignore
awaited = in_val

self._result_queue.put_nowait(awaited)

Expand Down
2 changes: 1 addition & 1 deletion golem/pipeline/sort.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ async def _elements(self) -> AsyncIterator[Tuple[TElement, float]]:
await asyncio.sleep(0.1)

async def score_function(self, element: TElement) -> Optional[float]:
return await self._score_function(element)
return await self._score_function(element) # type: ignore

async def _process_stream(self, element_stream: AsyncIterator[TElement]) -> None:
async for element in element_stream:
Expand Down
2 changes: 1 addition & 1 deletion golem/resources/proposal/proposal.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ async def responses(self) -> AsyncIterator["Proposal"]:
@api_call_wrapper()
async def create_agreement(
self, autoclose: bool = True, timeout: timedelta = timedelta(seconds=60)
) -> "Agreement":
) -> Agreement:
"""Promote this proposal to an agreement.

:param autoclose: Terminate the agreement when the :any:`GolemNode` closes.
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "golem-core"
version = "0.7.4"
version = "0.7.5"
description = "Golem Network (https://golem.network/) API for Python"
authors = ["Golem Factory <[email protected]>"]
license = "LGPL-3.0-or-later"
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/run-goth.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ echo INSTALLING TOOLS
.envs/goth/bin/python -m pip install --upgrade setuptools wheel

echo INSTALLING DEPENDENCIES
.envs/goth/bin/python -m pip install goth==0.15.11 pytest pytest-asyncio pexpect
.envs/goth/bin/python -m pip install goth==0.17.0 pytest pytest-asyncio pexpect

echo CREATING ASSETS
.envs/goth/bin/python -m goth create-assets .envs/goth/assets
Expand Down
1 change: 1 addition & 0 deletions tests/unit/test_payload_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def test_parse_raises_exception_on_bad_syntax(demand_offer_parser):
(
("(foo=1)", Constraint("foo", "=", "1")),
("(float.value=1.5)", Constraint("float.value", "=", "1.5")),
("(float.!exp.value=1.5)", Constraint("float.!exp.value", "=", "1.5")),
("(foo=bar)", Constraint("foo", "=", "bar")),
(
"(foo=more.complex.value)",
Expand Down
Loading