Skip to content

Commit

Permalink
Merge pull request #36 from delta-mpc/lr
Browse files Browse the repository at this point in the history
Lr
  • Loading branch information
mh739025250 authored Jul 14, 2022
2 parents 1c3d160 + 56c185c commit 13ebcf6
Show file tree
Hide file tree
Showing 12 changed files with 284 additions and 27 deletions.
6 changes: 6 additions & 0 deletions delta_node/coord/horizontal/context.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import os
import logging
import shutil
from typing import Any, MutableMapping, Tuple, List

Expand All @@ -16,6 +17,9 @@
from delta_node.coord import loc


_logger = logging.getLogger(__name__)


class ServerTaskContext(ServerContext):
def __init__(self, task_id: str) -> None:
self.task_id = task_id
Expand All @@ -39,6 +43,7 @@ def get_var(var: DataNode) -> Any:
raise ValueError(f"Cannot get var {var.name}")
if var.name not in self.cache:
self.cache[var.name] = value
_logger.debug(f"Get var {var.name} : {value}")
return value

if len(vars) == 0:
Expand All @@ -53,6 +58,7 @@ def set_var(var: DataNode, data: Any):
self.cache[var.name] = data
filename = loc.task_context_file(self.task_id, var.name)
serialize.dump_obj(filename, data)
_logger.debug(f"Set var {var.name} : {data}")

if len(pairs) == 1:
set_var(*pairs[0])
Expand Down
7 changes: 5 additions & 2 deletions delta_node/coord/horizontal/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import delta.serialize
import sqlalchemy as sa
from delta.core.strategy import Strategy
from delta.core.task import DataLocation, Task
from delta.core.task import DataLocation, Task, EarlyStop

from delta_node import chain, db, entity, pool, serialize
from delta_node.coord import loc
Expand Down Expand Up @@ -120,7 +120,10 @@ async def run(self):
await self.init()
max_rounds = len(self.task.steps)
while self.round < max_rounds + 1:
await self.execute_round(self.round)
try:
await self.execute_round(self.round)
except EarlyStop:
break
self.round += 1
await self.finish()
except Exception as e:
Expand Down
42 changes: 42 additions & 0 deletions delta_node/dataset/examples/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from .mnist import mnist_train
from .wages import make_wage_df

import os
from delta_node import config

import pkgutil

__all__ = ["get_mnist", "get_wages", "get_iris", "get_spector", "get_all"]


def get_mnist():
mnist_train()


def get_wages():
make_wage_df()


def get_iris():
data = pkgutil.get_data(__name__, "iris.csv")
if data is None:
raise ValueError("cannot read dataset iris")
filename = os.path.join(config.data_dir, "iris.csv")
with open(filename, mode="wb") as f:
f.write(data)


def get_spector():
data = pkgutil.get_data(__name__, "spector.csv")
if data is None:
raise ValueError("cannot read dataset spector")
filename = os.path.join(config.data_dir, "spector.csv")
with open(filename, mode="wb") as f:
f.write(data)


def get_all():
get_mnist()
get_wages()
get_iris()
get_spector()
151 changes: 151 additions & 0 deletions delta_node/dataset/examples/iris.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
sepal length (cm),sepal width (cm),petal length (cm),petal width (cm),target
5.1,3.5,1.4,0.2,0
4.9,3.0,1.4,0.2,0
4.7,3.2,1.3,0.2,0
4.6,3.1,1.5,0.2,0
5.0,3.6,1.4,0.2,0
5.4,3.9,1.7,0.4,0
4.6,3.4,1.4,0.3,0
5.0,3.4,1.5,0.2,0
4.4,2.9,1.4,0.2,0
4.9,3.1,1.5,0.1,0
5.4,3.7,1.5,0.2,0
4.8,3.4,1.6,0.2,0
4.8,3.0,1.4,0.1,0
4.3,3.0,1.1,0.1,0
5.8,4.0,1.2,0.2,0
5.7,4.4,1.5,0.4,0
5.4,3.9,1.3,0.4,0
5.1,3.5,1.4,0.3,0
5.7,3.8,1.7,0.3,0
5.1,3.8,1.5,0.3,0
5.4,3.4,1.7,0.2,0
5.1,3.7,1.5,0.4,0
4.6,3.6,1.0,0.2,0
5.1,3.3,1.7,0.5,0
4.8,3.4,1.9,0.2,0
5.0,3.0,1.6,0.2,0
5.0,3.4,1.6,0.4,0
5.2,3.5,1.5,0.2,0
5.2,3.4,1.4,0.2,0
4.7,3.2,1.6,0.2,0
4.8,3.1,1.6,0.2,0
5.4,3.4,1.5,0.4,0
5.2,4.1,1.5,0.1,0
5.5,4.2,1.4,0.2,0
4.9,3.1,1.5,0.2,0
5.0,3.2,1.2,0.2,0
5.5,3.5,1.3,0.2,0
4.9,3.6,1.4,0.1,0
4.4,3.0,1.3,0.2,0
5.1,3.4,1.5,0.2,0
5.0,3.5,1.3,0.3,0
4.5,2.3,1.3,0.3,0
4.4,3.2,1.3,0.2,0
5.0,3.5,1.6,0.6,0
5.1,3.8,1.9,0.4,0
4.8,3.0,1.4,0.3,0
5.1,3.8,1.6,0.2,0
4.6,3.2,1.4,0.2,0
5.3,3.7,1.5,0.2,0
5.0,3.3,1.4,0.2,0
7.0,3.2,4.7,1.4,1
6.4,3.2,4.5,1.5,1
6.9,3.1,4.9,1.5,1
5.5,2.3,4.0,1.3,1
6.5,2.8,4.6,1.5,1
5.7,2.8,4.5,1.3,1
6.3,3.3,4.7,1.6,1
4.9,2.4,3.3,1.0,1
6.6,2.9,4.6,1.3,1
5.2,2.7,3.9,1.4,1
5.0,2.0,3.5,1.0,1
5.9,3.0,4.2,1.5,1
6.0,2.2,4.0,1.0,1
6.1,2.9,4.7,1.4,1
5.6,2.9,3.6,1.3,1
6.7,3.1,4.4,1.4,1
5.6,3.0,4.5,1.5,1
5.8,2.7,4.1,1.0,1
6.2,2.2,4.5,1.5,1
5.6,2.5,3.9,1.1,1
5.9,3.2,4.8,1.8,1
6.1,2.8,4.0,1.3,1
6.3,2.5,4.9,1.5,1
6.1,2.8,4.7,1.2,1
6.4,2.9,4.3,1.3,1
6.6,3.0,4.4,1.4,1
6.8,2.8,4.8,1.4,1
6.7,3.0,5.0,1.7,1
6.0,2.9,4.5,1.5,1
5.7,2.6,3.5,1.0,1
5.5,2.4,3.8,1.1,1
5.5,2.4,3.7,1.0,1
5.8,2.7,3.9,1.2,1
6.0,2.7,5.1,1.6,1
5.4,3.0,4.5,1.5,1
6.0,3.4,4.5,1.6,1
6.7,3.1,4.7,1.5,1
6.3,2.3,4.4,1.3,1
5.6,3.0,4.1,1.3,1
5.5,2.5,4.0,1.3,1
5.5,2.6,4.4,1.2,1
6.1,3.0,4.6,1.4,1
5.8,2.6,4.0,1.2,1
5.0,2.3,3.3,1.0,1
5.6,2.7,4.2,1.3,1
5.7,3.0,4.2,1.2,1
5.7,2.9,4.2,1.3,1
6.2,2.9,4.3,1.3,1
5.1,2.5,3.0,1.1,1
5.7,2.8,4.1,1.3,1
6.3,3.3,6.0,2.5,2
5.8,2.7,5.1,1.9,2
7.1,3.0,5.9,2.1,2
6.3,2.9,5.6,1.8,2
6.5,3.0,5.8,2.2,2
7.6,3.0,6.6,2.1,2
4.9,2.5,4.5,1.7,2
7.3,2.9,6.3,1.8,2
6.7,2.5,5.8,1.8,2
7.2,3.6,6.1,2.5,2
6.5,3.2,5.1,2.0,2
6.4,2.7,5.3,1.9,2
6.8,3.0,5.5,2.1,2
5.7,2.5,5.0,2.0,2
5.8,2.8,5.1,2.4,2
6.4,3.2,5.3,2.3,2
6.5,3.0,5.5,1.8,2
7.7,3.8,6.7,2.2,2
7.7,2.6,6.9,2.3,2
6.0,2.2,5.0,1.5,2
6.9,3.2,5.7,2.3,2
5.6,2.8,4.9,2.0,2
7.7,2.8,6.7,2.0,2
6.3,2.7,4.9,1.8,2
6.7,3.3,5.7,2.1,2
7.2,3.2,6.0,1.8,2
6.2,2.8,4.8,1.8,2
6.1,3.0,4.9,1.8,2
6.4,2.8,5.6,2.1,2
7.2,3.0,5.8,1.6,2
7.4,2.8,6.1,1.9,2
7.9,3.8,6.4,2.0,2
6.4,2.8,5.6,2.2,2
6.3,2.8,5.1,1.5,2
6.1,2.6,5.6,1.4,2
7.7,3.0,6.1,2.3,2
6.3,3.4,5.6,2.4,2
6.4,3.1,5.5,1.8,2
6.0,3.0,4.8,1.8,2
6.9,3.1,5.4,2.1,2
6.7,3.1,5.6,2.4,2
6.9,3.1,5.1,2.3,2
5.8,2.7,5.1,1.9,2
6.8,3.2,5.9,2.3,2
6.7,3.3,5.7,2.5,2
6.7,3.0,5.2,2.3,2
6.3,2.5,5.0,1.9,2
6.5,3.0,5.2,2.0,2
6.2,3.4,5.4,2.3,2
5.9,3.0,5.1,1.8,2
File renamed without changes.
33 changes: 33 additions & 0 deletions delta_node/dataset/examples/spector.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
GPA,TUCE,PSI,GRADE
2.66,20.0,0.0,0.0
2.89,22.0,0.0,0.0
3.28,24.0,0.0,0.0
2.92,12.0,0.0,0.0
4.0,21.0,0.0,1.0
2.86,17.0,0.0,0.0
2.76,17.0,0.0,0.0
2.87,21.0,0.0,0.0
3.03,25.0,0.0,0.0
3.92,29.0,0.0,1.0
2.63,20.0,0.0,0.0
3.32,23.0,0.0,0.0
3.57,23.0,0.0,0.0
3.26,25.0,0.0,1.0
3.53,26.0,0.0,0.0
2.74,19.0,0.0,0.0
2.75,25.0,0.0,0.0
2.83,19.0,0.0,0.0
3.12,23.0,1.0,0.0
3.16,25.0,1.0,1.0
2.06,22.0,1.0,0.0
3.62,28.0,1.0,1.0
2.89,14.0,1.0,0.0
3.51,26.0,1.0,0.0
3.54,24.0,1.0,1.0
2.83,27.0,1.0,1.0
3.39,17.0,1.0,1.0
2.67,24.0,1.0,0.0
3.65,21.0,1.0,1.0
4.0,23.0,1.0,1.0
3.1,21.0,1.0,0.0
2.39,19.0,1.0,1.0
File renamed without changes.
46 changes: 31 additions & 15 deletions delta_node/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@


async def _run():
from delta_node import (app, chain, config, db, log, pool, registry,
runner)
from delta_node import app, chain, config, db, log, pool, registry, runner

if len(config.chain_host) == 0:
raise RuntimeError("chain connector host is required")
Expand Down Expand Up @@ -104,27 +103,28 @@ def init():
os.makedirs(config.log_dir, exist_ok=True)


def get_mnist():
from . import mnist

mnist.mnist_train()


def get_df():
from .wages import make_wage_df

make_wage_df()

def main(input_args: Optional[Sequence[str]] = None):
parser = argparse.ArgumentParser(description="delta node", prog="Delta Node")
parser.add_argument(
"action",
choices=["init", "run", "get-mnist", "get-wages", "leave"],
choices=[
"init",
"run",
"get-mnist",
"get-wages",
"get-iris",
"get-spector",
"get-all-data",
"leave",
],
help="""delta node start action:
'init' to init delta node config,
'run' to start the node,
'get-mnist' to get mnist dataset used for learning example,
'get-wages' to get wages dataframe used for analytics example,
'get-iris' to get iris dataframe used for lr example,
'get-spector' to get spector dataframe used for lr example,
'get-all-data' to get all memtioned dataset,
'leave' to unregister from the computing network""",
)
parser.add_argument("--version", action="version", version="%(prog)s 2.0")
Expand All @@ -134,8 +134,24 @@ def main(input_args: Optional[Sequence[str]] = None):
elif args.action == "run":
run()
elif args.action == "get-mnist":
from .dataset.examples import get_mnist

get_mnist()
elif args.action == "get-wages":
get_df()
from .dataset.examples import get_wages

get_wages()
elif args.action == "get-iris":
from .dataset.examples import get_iris

get_iris()
elif args.action == "get-spector":
from .dataset.examples import get_spector

get_spector()
elif args.action == "get-all-data":
from .dataset.examples import get_all

get_all()
elif args.action == "leave":
leave()
9 changes: 6 additions & 3 deletions delta_node/runner/event_box.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ async def wait_for_event(self, event_type: entity.EventType, timeout: float | No
_logger.debug(f"event box {self.task_id} wait for event {event_type}")
condition = await self.get_condition(event_type)

async with condition:
await asyncio.wait_for(condition.wait_for(lambda: event_type in self.bucket), timeout=timeout)

async def wait():
async with condition:
await condition.wait_for(lambda: event_type in self.bucket)

await asyncio.wait_for(wait(), timeout=timeout)
await self.remove_condition(event_type)

return self.bucket.pop(event_type)
3 changes: 3 additions & 0 deletions delta_node/runner/horizontal/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ def download(self, client: CommuClient, var: DataNode):
with open(filename, mode="wb") as f:
client.download_task_context(self.task_id, var.name, f)

value = serialize.load_obj(filename)
self.cache[var.name] = value

def set(self, *pairs: Tuple[DataNode, Any]):
def set_var(var: DataNode, data: Any):
self.cache[var.name] = data
Expand Down
Loading

0 comments on commit 13ebcf6

Please sign in to comment.