Skip to content

Commit

Permalink
Initial span tracing in selected managers
Browse files Browse the repository at this point in the history
  • Loading branch information
approxit committed Jul 12, 2023
1 parent c8a05b9 commit f726bed
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 145 deletions.
18 changes: 10 additions & 8 deletions golem/managers/demand/auto.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from golem.resources.demand.demand_builder import DemandBuilder
from golem.resources.proposal.proposal import Proposal
from golem.utils.asyncio import create_task_with_logging
from golem.utils.logging import trace_span

logger = logging.getLogger(__name__)

Expand All @@ -44,19 +45,17 @@ def __init__(

super().__init__(*args, **kwargs)

@trace_span()
async def start(self) -> None:
if self.is_started():
message = "Already started!"
logger.debug(f"Starting failed with `{message}`")
raise ManagerException(message)
raise ManagerException("Already started!")

self._manager_loop_task = create_task_with_logging(self._manager_loop())

@trace_span()
async def stop(self) -> None:
if not self.is_started():
message = "Already stopped!"
logger.debug(f"Stopping failed with `{message}`")
raise ManagerException(message)
raise ManagerException("Already stopped!")

self._manager_loop_task.cancel()
self._manager_loop_task = None
Expand All @@ -72,6 +71,7 @@ async def get_initial_proposal(self) -> Proposal:

return proposal

@trace_span()
async def _manager_loop(self) -> None:
allocation = await self._get_allocation()
demand_builder = await self._prepare_demand_builder(allocation)
Expand All @@ -81,7 +81,7 @@ async def _manager_loop(self) -> None:

try:
async for initial_proposal in demand.initial_proposals():
await self._manage_initial(initial_proposal)
await self._manage_scoring(initial_proposal)
finally:
await demand.unsubscribe()

Expand All @@ -97,7 +97,8 @@ async def _prepare_demand_builder(self, allocation: Allocation) -> DemandBuilder

return demand_builder

async def _manage_initial(self, proposal: Proposal) -> None:
@trace_span()
async def _manage_scoring(self, proposal: Proposal) -> None:
async with self._scored_proposals_condition:
all_proposals = list(sp[1] for sp in self._scored_proposals)
all_proposals.append(proposal)
Expand All @@ -115,6 +116,7 @@ async def _do_scoring(self, proposals: Sequence[Proposal]):

return scored_proposals

@trace_span()
async def _run_plugins(
self, proposals_data: Sequence[ProposalData]
) -> Sequence[Tuple[float, Sequence[float]]]:
Expand Down
89 changes: 39 additions & 50 deletions golem/managers/negotiation/sequential.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
from copy import deepcopy
from datetime import datetime
from typing import AsyncIterator, Awaitable, Callable, Optional, cast
from typing import Awaitable, Callable, Optional, cast

from ya_market import ApiException

Expand All @@ -16,7 +16,7 @@
from golem.node import GolemNode
from golem.payload import Properties
from golem.payload.parsers.textx import TextXPayloadSyntaxParser
from golem.resources import Allocation, DemandData, Proposal, ProposalData
from golem.resources import DemandData, Proposal, ProposalData
from golem.utils.asyncio import create_task_with_logging
from golem.utils.logging import trace_span

Expand All @@ -30,7 +30,7 @@ class SequentialNegotiationManager(
def __init__(
self,
golem: GolemNode,
get_initial_proposal: Callable[[], Awaitable[Allocation]],
get_initial_proposal: Callable[[], Awaitable[Proposal]],
*args,
**kwargs,
) -> None:
Expand All @@ -43,18 +43,18 @@ def __init__(

super().__init__(*args, **kwargs)

@trace_span("Getting proposal")
@trace_span()
async def get_draft_proposal(self) -> Proposal:
return await self._eligible_proposals.get()

@trace_span('Starting')
@trace_span()
async def start(self) -> None:
if self.is_started():
raise ManagerException("Already started!")

self._negotiation_loop_task = create_task_with_logging(self._negotiation_loop())

@trace_span('Stopping')
@trace_span()
async def stop(self) -> None:
if not self.is_started():
raise ManagerException("Already stopped!")
Expand All @@ -65,27 +65,19 @@ async def stop(self) -> None:
def is_started(self) -> bool:
return self._negotiation_loop_task is not None and not self._negotiation_loop_task.done()

@trace_span()
async def _negotiation_loop(self) -> None:
while True: # TODO add buffer
proposal = await self._get_initial_proposal()
offer_proposal = await self._negotiate(proposal)
if offer_proposal is not None:
await self._eligible_proposals.put(offer_proposal)

async def _negotiate(self, initial_proposal: Proposal) -> AsyncIterator[Proposal]:
demand_data = await self._get_demand_data_from_proposal(initial_proposal)
demand_data = await self._get_demand_data_from_proposal(proposal)

offer_proposal = await self._negotiate_proposal(demand_data, initial_proposal)
offer_proposal = await self._negotiate_proposal(demand_data, proposal)

if offer_proposal is None:
logger.debug(
f"Negotiating proposal `{initial_proposal}` done and proposal was rejected"
)
return

return offer_proposal
if offer_proposal is not None:
await self._eligible_proposals.put(offer_proposal)

@trace_span("Negotiating proposal")
@trace_span()
async def _negotiate_proposal(
self, demand_data: DemandData, offer_proposal: Proposal
) -> Optional[Proposal]:
Expand All @@ -102,42 +94,27 @@ async def _negotiate_proposal(

return None

if offer_proposal.initial or demand_data_after_plugins != demand_data:
logger.debug("Sending demand proposal...")

demand_data = demand_data_after_plugins
if not offer_proposal.initial and demand_data_after_plugins == demand_data:
return offer_proposal

try:
demand_proposal = await offer_proposal.respond(
demand_data_after_plugins.properties,
demand_data_after_plugins.constraints,
)
except (ApiException, asyncio.TimeoutError) as e:
logger.debug(f"Sending demand proposal failed with `{e}`")
return None
demand_data = demand_data_after_plugins

logger.debug("Sending demand proposal done")

logger.debug("Waiting for response...")

try:
new_offer_proposal = await demand_proposal.responses().__anext__()
except StopAsyncIteration:
logger.debug("Waiting for response failed with provider rejection")
return None
try:
demand_proposal = await self._send_demand_proposal(offer_proposal, demand_data)
except (ApiException, asyncio.TimeoutError):
return None

logger.debug(f"Waiting for response done with `{new_offer_proposal}`")
try:
new_offer_proposal = await self._wait_for_proposal_response(demand_proposal)
except StopAsyncIteration:
return None

logger.debug(
f"Proposal `{offer_proposal}` received counter proposal `{new_offer_proposal}`"
)
offer_proposal = new_offer_proposal
logger.debug(
f"Proposal `{offer_proposal}` received counter proposal `{new_offer_proposal}`"
)

continue
else:
break
offer_proposal = new_offer_proposal

return offer_proposal
@trace_span()
async def _apply_plugins(self, demand_data_after_plugins: DemandData, offer_proposal: Proposal):
proposal_data = await self._get_proposal_data_from_proposal(offer_proposal)
Expand All @@ -155,6 +132,18 @@ async def _apply_plugins(self, demand_data_after_plugins: DemandData, offer_prop
if plugin_result is False:
raise RejectProposal()

@trace_span()
async def _send_demand_proposal(
self, offer_proposal: Proposal, demand_data: DemandData
) -> Proposal:
return await offer_proposal.respond(
demand_data.properties,
demand_data.constraints,
)

@trace_span()
async def _wait_for_proposal_response(self, demand_proposal: Proposal) -> Proposal:
return await demand_proposal.responses().__anext__()

async def _get_demand_data_from_proposal(self, proposal: Proposal) -> DemandData:
# FIXME: Unnecessary serialisation from DemandBuilder to Demand,
Expand Down
97 changes: 48 additions & 49 deletions golem/managers/payment/pay_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
NewDebitNote,
NewInvoice,
)
from golem.utils.logging import trace_span

logger = logging.getLogger(__name__)

Expand All @@ -37,47 +38,45 @@ def __init__(
self._closed_agreements_count = 0
self._payed_invoices_count = 0

async def start(self) -> None:
logger.debug("Starting...")
self._event_handlers = []

@trace_span()
async def start(self) -> None:
# TODO: Add stop with event_bus.off()

await self._golem.event_bus.on(NewInvoice, self._pay_invoice_if_received)
await self._golem.event_bus.on(NewDebitNote, self._pay_debit_note_if_received)

await self._golem.event_bus.on(NewAgreement, self._increment_opened_agreements)
await self._golem.event_bus.on(AgreementClosed, self._increment_closed_agreements)

logger.debug("Starting done")

self._event_handlers.extend(
[
await self._golem.event_bus.on(NewInvoice, self._pay_invoice_if_received),
await self._golem.event_bus.on(NewDebitNote, self._pay_debit_note_if_received),
await self._golem.event_bus.on(NewAgreement, self._increment_opened_agreements),
await self._golem.event_bus.on(AgreementClosed, self._increment_closed_agreements),
]
)

@trace_span()
async def stop(self) -> None:
logger.debug("Stopping...")

await self.wait_for_invoices()

logger.debug("Stopping done")
for event_handler in self._event_handlers:
await self._golem.event_bus.off(event_handler)

@trace_span()
async def _create_allocation(self) -> None:
self._allocation = await Allocation.create_any_account(
self._golem, Decimal(self._budget), self._network, self._driver
)

self._golem.add_autoclose_resource(self._allocation)

@trace_span()
async def get_allocation(self) -> "Allocation":
# TODO handle NoMatchingAccount
logger.debug("Getting allocation...")

if self._allocation is None:
logger.debug("Creating allocation...")

self._allocation = await Allocation.create_any_account(
self._golem, Decimal(self._budget), self._network, self._driver
)
self._golem.add_autoclose_resource(self._allocation)

logger.debug(f"Creating allocation done with `{self._allocation.id}`")

logger.debug(f"Getting allocation done with `{self._allocation.id}`")
await self._create_allocation()

return self._allocation

@trace_span()
async def wait_for_invoices(self):
logger.info("Waiting for invoices...")

for _ in range(60):
await asyncio.sleep(1)
if (
Expand All @@ -97,35 +96,35 @@ async def _increment_opened_agreements(self, event: NewAgreement):
async def _increment_closed_agreements(self, event: AgreementClosed):
self._closed_agreements_count += 1

async def _pay_invoice_if_received(self, event: NewInvoice) -> None:
logger.debug("Received invoice")
@trace_span()
async def _accept_invoice(self, invoice: Invoice) -> None:
assert self._allocation is not None # TODO think of a better way
await invoice.accept_full(self._allocation)
await invoice.get_data(force=True)
self._payed_invoices_count += 1

logger.info(f"Invoice `{invoice.id}` accepted")

@trace_span()
async def _accept_debit_note(self, debit_note: DebitNote) -> None:
assert self._allocation is not None # TODO think of a better way
await debit_note.accept_full(self._allocation)
await debit_note.get_data(force=True)

logger.info(f"DebitNote `{debit_note.id}` accepted")

@trace_span()
async def _pay_invoice_if_received(self, event: NewInvoice) -> None:
invoice = event.resource
assert isinstance(invoice, Invoice)

if (await invoice.get_data(force=True)).status == "RECEIVED":
logger.debug(f"Accepting invoice `{invoice.id}`...")

assert self._allocation is not None # TODO think of a better way
await invoice.accept_full(self._allocation)
await invoice.get_data(force=True)
self._payed_invoices_count += 1

logger.debug(f"Accepting invoice `{invoice.id}` done")
logger.info(f"Invoice `{invoice.id}` accepted")
await self._accept_invoice(invoice)

@trace_span()
async def _pay_debit_note_if_received(self, event: NewDebitNote) -> None:
logger.debug("Received debit note")

debit_note = event.resource
assert isinstance(debit_note, DebitNote)

if (await debit_note.get_data(force=True)).status == "RECEIVED":
logger.debug(f"Accepting DebitNote `{debit_note.id}`...")

assert self._allocation is not None # TODO think of a better way
await debit_note.accept_full(self._allocation)
await debit_note.get_data(force=True)

logger.debug(f"Accepting DebitNote `{debit_note.id}` done")
logger.debug(f"DebitNote `{debit_note.id}` accepted")
await self._accept_debit_note(debit_note)
Loading

0 comments on commit f726bed

Please sign in to comment.