Skip to content

Commit

Permalink
kill process and its children, small fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
solarw committed Jul 4, 2024
1 parent d96346b commit 37239b5
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 83 deletions.
2 changes: 1 addition & 1 deletion operate/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ def cancel_healthcheck_job(service: str) -> None:

def with_retries(f: t.Callable) -> t.Callable:
"""Retries decorator."""
return f

async def _call(request: Request) -> JSONResponse:
"""Call the endpoint."""
logger.info(f"Calling `{f.__name__}` with retries enabled")
Expand Down
140 changes: 67 additions & 73 deletions operate/services/deployment_runner.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
"""Source code to run and stop deployments created."""
import json
import os
import platform
import shutil
import signal
import subprocess
import sys
import shutil # nosec
import signal # nosec
import subprocess # nosec
import sys # nosec
import time
import typing as t
from abc import ABC, ABCMeta, abstractmethod
from pathlib import Path
from typing import Any
from venv import main as venv_cli

import psutil
Expand All @@ -17,16 +19,19 @@


class AbstractDeploymentRunner(ABC):
def __init__(self, work_directory: Path):
"""Abstract deployment runner."""

def __init__(self, work_directory: Path) -> None:
"""Init the deployment runner."""
self._work_directory = work_directory

@abstractmethod
def start(self):
pass
def start(self) -> None:
"""Start the deployment."""

@abstractmethod
def stop(self):
pass
def stop(self) -> None:
"""Stop the deployment."""


def _kill_process(pid: int) -> None:
Expand All @@ -35,7 +40,8 @@ def _kill_process(pid: int) -> None:
while True:
if not psutil.pid_exists(pid=pid):
return
if psutil.Process(pid=pid).status() in (
process = psutil.Process(pid=pid)
if process.status() in (
psutil.STATUS_DEAD,
psutil.STATUS_ZOMBIE,
):
Expand All @@ -54,15 +60,28 @@ def _kill_process(pid: int) -> None:
time.sleep(1)


def kill_process(pid: int) -> None:
"""Kill the process and all children first."""
if not psutil.pid_exists(pid=pid):
return
current_process = psutil.Process(pid=pid)
children = list(reversed(current_process.children(recursive=True)))
for child in children:
_kill_process(child.pid)
_kill_process(pid)


class BaseDeploymentRunner(AbstractDeploymentRunner, metaclass=ABCMeta):
def _run_aea(self, *args: t.List[str], cwd: Path):
"""Base deployment with aea support."""

def _run_aea(self, *args: str, cwd: Path) -> Any:
"""Run aea command."""
return self._run_cmd(args=[self._aea_bin, *args], cwd=cwd)

@staticmethod
def _run_cmd(args: t.List[str], cwd: t.Optional[Path] = None) -> None:
"""Run command in a subprocess."""
print(f"Running: {' '.join(args)}")
# print working dir
print(f"Working dir: {os.getcwd()}")
result = subprocess.run( # pylint: disable=subprocess-run-check # nosec
args=args,
Expand All @@ -75,7 +94,8 @@ def _run_cmd(args: t.List[str], cwd: t.Optional[Path] = None) -> None:
f"Error running: {args} @ {cwd}\n{result.stderr.decode()}"
)

def _prepare_agent_env(self):
def _prepare_agent_env(self) -> Any:
"""Prepare agent env, add keys, run aea commands."""
working_dir = self._work_directory
env = json.loads((working_dir / "agent.json").read_text(encoding="utf-8"))
# Patch for trader agent
Expand Down Expand Up @@ -112,26 +132,9 @@ def _prepare_agent_env(self):

def _setup_agent(self) -> None:
"""Setup agent."""
print(1111111, "SETUP AGENT", flush=True)
working_dir = self._work_directory
env = self._prepare_agent_env()

# abin = self._aea_bin
# Fetch agent
# self._run_cmd(
# args=[
# abin,
# "init",
# "--reset",
# "--author",
# "valory",
# "--remote",
# "--ipfs",
# "--ipfs-node",
# "/dns/registry.autonolas.tech/tcp/443/https",
# ],
# cwd=working_dir,
# )
self._run_aea(
"init",
"--reset",
Expand All @@ -144,16 +147,6 @@ def _setup_agent(self) -> None:
cwd=working_dir,
)

# self._run_cmd(
# args=[
# abin,
# "fetch",
# env["AEA_AGENT"],
# "--alias",
# "agent",
# ],
# cwd=working_dir,
# )
self._run_aea("fetch", env["AEA_AGENT"], "--alias", "agent", cwd=working_dir)

# Add keys
Expand All @@ -162,69 +155,63 @@ def _setup_agent(self) -> None:
working_dir / "agent" / "ethereum_private_key.txt",
)

# self._run_cmd(
# args=[abin, "add-key", "ethereum"],
# cwd=working_dir / "agent",
# )
self._run_aea("add-key", "ethereum", cwd=working_dir / "agent")

# self._run_cmd(
# args=[abin, "issue-certificates"],
# cwd=working_dir / "agent",
# )
self._run_aea("issue-certificates", cwd=working_dir / "agent")
print(1111111, "SETUP AGENT complete", flush=True)

def start(self):
print(1111111111111111, "START _DEPLOYMENT", flush=True)
def start(self) -> None:
"""Start the deployment."""
self._setup_agent()
self._start_tendermint()
self._start_agent()
print(1111111111111111, "START _DEPLOYMENT complete", flush=True)

def stop(self):
print(1111111111111111, "STOP _DEPLOYMENT", flush=True)
def stop(self) -> None:
"""Stop the deployment."""
self._stop_agent()
self._stop_tendermint()
print(1111111111111111, "STOP _DEPLOYMENT complete", flush=True)

def _stop_agent(self) -> None:
"""Start process."""
pid = self._work_directory / "agent.pid"
if not pid.exists():
return
_kill_process(int(pid.read_text(encoding="utf-8")))
kill_process(int(pid.read_text(encoding="utf-8")))

def _stop_tendermint(self) -> None:
"""Start tendermint process."""
pid = self._work_directory / "tendermint.pid"
if not pid.exists():
return
_kill_process(int(pid.read_text(encoding="utf-8")))
kill_process(int(pid.read_text(encoding="utf-8")))

@abstractmethod
def _start_tendermint(self):
pass
def _start_tendermint(self) -> None:
"""Start tendermint process."""

@abstractmethod
def _start_agent(self):
pass
def _start_agent(self) -> None:
"""Start aea process."""

@property
@abstractmethod
def _aea_bin(self):
pass
def _aea_bin(self) -> str:
"""Return aea_bin path."""
raise NotImplementedError


class PyInstallerHostDeploymentRunner(BaseDeploymentRunner):
"""Deployment runner within pyinstaller env."""

@property
def _aea_bin(self) -> str:
"""Return aea_bin path."""
abin = str(Path(sys._MEIPASS) / "aea_bin") # type: ignore # pylint: disable=protected-access
return abin

@property
def _tendermint_bin(self) -> str:
return str(Path(sys._MEIPASS) / "tendermint")
"""Return tendermint path."""
return str(Path(sys._MEIPASS) / "tendermint") # type: ignore # pylint: disable=protected-access

def _start_agent(self) -> None:
"""Start agent process."""
Expand Down Expand Up @@ -267,21 +254,20 @@ def _start_tendermint(self) -> None:


class HostPythonHostDeploymentRunner(BaseDeploymentRunner):
"""Deployment runner for host installed python."""

@property
def _aea_bin(self) -> str:
"""Return aea_bin path."""
return str(self._venv_dir / "bin" / "aea")

def _start_agent(self) -> None:
"""Start agent process."""
working_dir = self._work_directory
env = json.loads((working_dir / "agent.json").read_text(encoding="utf-8"))
print(1111, "START AGENT", working_dir / "agent", (working_dir / "agent").exists(), flush=True)
print(1111, "START AGENT", working_dir / "agent", (working_dir / "agent").exists(), flush=True)
process = subprocess.Popen( # pylint: disable=consider-using-with # nosec
args=[self._aea_bin, "run"],
cwd=str(working_dir / "agent"),
#stdout=subprocess.STDOUT,
#stderr=subprocess.STDOUT,
env={**os.environ, **env},
creationflags=(
0x00000008 if platform.system() == "Windows" else 0
Expand Down Expand Up @@ -319,11 +305,12 @@ def _start_tendermint(self) -> None:
)

@property
def _venv_dir(self):
def _venv_dir(self) -> Path:
"""Get venv dir for aea."""
return self._work_directory / "venv"

def _setup_venv(self):
print("SETUP VENV", flush=True)
def _setup_venv(self) -> None:
"""Perform venv setup, install deps."""
self._venv_dir.mkdir(exist_ok=True)
venv_cli(args=[str(self._venv_dir)])
pbin = str(self._venv_dir / "bin" / "python")
Expand All @@ -343,18 +330,25 @@ def _setup_venv(self):
"requests",
],
)
print("SETUP VENV COMPLETE", flush=True)

def _setup_agent(self) -> None:
"""Prepare agent."""
self._setup_venv()
super()._setup_agent()
# Install agent dependencies
self._run_aea("-v", "debug", "install", "--timeout", "600",
self._run_aea(
"-v",
"debug",
"install",
"--timeout",
"600",
cwd=self._work_directory / "agent",
)


def _get_host_deployment_runner(build_dir: Path) -> BaseDeploymentRunner:
"""Return depoyment runner according to running env."""
deployment_runner: BaseDeploymentRunner
if getattr(sys, "frozen", False) and hasattr(sys, "_MEIPASS"):
deployment_runner = PyInstallerHostDeploymentRunner(build_dir)
else:
Expand Down
14 changes: 5 additions & 9 deletions operate/services/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,13 @@

import json
import os
import platform
import shutil
import signal
import subprocess # nosec
import sys
import time
import typing as t
from copy import copy, deepcopy
from dataclasses import dataclass
from pathlib import Path

import psutil
from aea.configurations.constants import (
DEFAULT_LEDGER,
LEDGER,
Expand Down Expand Up @@ -543,15 +538,16 @@ def _build_host(self, force: bool = True) -> None:
)

except Exception as e:
shutil.rmtree(build)
if build.exists():
shutil.rmtree(build)
raise e

# Mech price patch.
agent_vars = json.loads(Path(build, "agent.json").read_text(encoding="utf-8"))
if "SKILL_TRADER_ABCI_MODELS_PARAMS_ARGS_MECH_REQUEST_PRICE" in agent_vars:
agent_vars["SKILL_TRADER_ABCI_MODELS_PARAMS_ARGS_MECH_REQUEST_PRICE"] = (
"10000000000000000"
)
agent_vars[
"SKILL_TRADER_ABCI_MODELS_PARAMS_ARGS_MECH_REQUEST_PRICE"
] = "10000000000000000"
Path(build, "agent.json").write_text(
json.dumps(agent_vars, indent=4),
encoding="utf-8",
Expand Down

0 comments on commit 37239b5

Please sign in to comment.