From 37239b5a6466435387e277530b16bcbe67f6832f Mon Sep 17 00:00:00 2001 From: "Yuri (solarw) Turchenkov" Date: Tue, 2 Jul 2024 14:14:43 +0300 Subject: [PATCH] kill process and its children, small fixes --- operate/cli.py | 2 +- operate/services/deployment_runner.py | 140 ++++++++++++-------------- operate/services/service.py | 14 +-- 3 files changed, 73 insertions(+), 83 deletions(-) diff --git a/operate/cli.py b/operate/cli.py index 64c8b50b..e31ea9ef 100644 --- a/operate/cli.py +++ b/operate/cli.py @@ -233,7 +233,7 @@ def cancel_healthcheck_job(service: str) -> None: def with_retries(f: t.Callable) -> t.Callable: """Retries decorator.""" - return f + async def _call(request: Request) -> JSONResponse: """Call the endpoint.""" logger.info(f"Calling `{f.__name__}` with retries enabled") diff --git a/operate/services/deployment_runner.py b/operate/services/deployment_runner.py index 26607b11..8235b81e 100644 --- a/operate/services/deployment_runner.py +++ b/operate/services/deployment_runner.py @@ -1,14 +1,16 @@ +"""Source code to run and stop deployments created.""" import json import os import platform -import shutil -import signal -import subprocess -import sys +import shutil # nosec +import signal # nosec +import subprocess # nosec +import sys # nosec import time import typing as t from abc import ABC, ABCMeta, abstractmethod from pathlib import Path +from typing import Any from venv import main as venv_cli import psutil @@ -17,16 +19,19 @@ class AbstractDeploymentRunner(ABC): - def __init__(self, work_directory: Path): + """Abstract deployment runner.""" + + def __init__(self, work_directory: Path) -> None: + """Init the deployment runner.""" self._work_directory = work_directory @abstractmethod - def start(self): - pass + def start(self) -> None: + """Start the deployment.""" @abstractmethod - def stop(self): - pass + def stop(self) -> None: + """Stop the deployment.""" def _kill_process(pid: int) -> None: @@ -35,7 +40,8 @@ def _kill_process(pid: int) -> None: while True: if not psutil.pid_exists(pid=pid): return - if psutil.Process(pid=pid).status() in ( + process = psutil.Process(pid=pid) + if process.status() in ( psutil.STATUS_DEAD, psutil.STATUS_ZOMBIE, ): @@ -54,15 +60,28 @@ def _kill_process(pid: int) -> None: time.sleep(1) +def kill_process(pid: int) -> None: + """Kill the process and all children first.""" + if not psutil.pid_exists(pid=pid): + return + current_process = psutil.Process(pid=pid) + children = list(reversed(current_process.children(recursive=True))) + for child in children: + _kill_process(child.pid) + _kill_process(pid) + + class BaseDeploymentRunner(AbstractDeploymentRunner, metaclass=ABCMeta): - def _run_aea(self, *args: t.List[str], cwd: Path): + """Base deployment with aea support.""" + + def _run_aea(self, *args: str, cwd: Path) -> Any: + """Run aea command.""" return self._run_cmd(args=[self._aea_bin, *args], cwd=cwd) @staticmethod def _run_cmd(args: t.List[str], cwd: t.Optional[Path] = None) -> None: """Run command in a subprocess.""" print(f"Running: {' '.join(args)}") - # print working dir print(f"Working dir: {os.getcwd()}") result = subprocess.run( # pylint: disable=subprocess-run-check # nosec args=args, @@ -75,7 +94,8 @@ def _run_cmd(args: t.List[str], cwd: t.Optional[Path] = None) -> None: f"Error running: {args} @ {cwd}\n{result.stderr.decode()}" ) - def _prepare_agent_env(self): + def _prepare_agent_env(self) -> Any: + """Prepare agent env, add keys, run aea commands.""" working_dir = self._work_directory env = json.loads((working_dir / "agent.json").read_text(encoding="utf-8")) # Patch for trader agent @@ -112,26 +132,9 @@ def _prepare_agent_env(self): def _setup_agent(self) -> None: """Setup agent.""" - print(1111111, "SETUP AGENT", flush=True) working_dir = self._work_directory env = self._prepare_agent_env() - # abin = self._aea_bin - # Fetch agent - # self._run_cmd( - # args=[ - # abin, - # "init", - # "--reset", - # "--author", - # "valory", - # "--remote", - # "--ipfs", - # "--ipfs-node", - # "/dns/registry.autonolas.tech/tcp/443/https", - # ], - # cwd=working_dir, - # ) self._run_aea( "init", "--reset", @@ -144,16 +147,6 @@ def _setup_agent(self) -> None: cwd=working_dir, ) - # self._run_cmd( - # args=[ - # abin, - # "fetch", - # env["AEA_AGENT"], - # "--alias", - # "agent", - # ], - # cwd=working_dir, - # ) self._run_aea("fetch", env["AEA_AGENT"], "--alias", "agent", cwd=working_dir) # Add keys @@ -162,69 +155,63 @@ def _setup_agent(self) -> None: working_dir / "agent" / "ethereum_private_key.txt", ) - # self._run_cmd( - # args=[abin, "add-key", "ethereum"], - # cwd=working_dir / "agent", - # ) self._run_aea("add-key", "ethereum", cwd=working_dir / "agent") - # self._run_cmd( - # args=[abin, "issue-certificates"], - # cwd=working_dir / "agent", - # ) self._run_aea("issue-certificates", cwd=working_dir / "agent") - print(1111111, "SETUP AGENT complete", flush=True) - def start(self): - print(1111111111111111, "START _DEPLOYMENT", flush=True) + def start(self) -> None: + """Start the deployment.""" self._setup_agent() self._start_tendermint() self._start_agent() - print(1111111111111111, "START _DEPLOYMENT complete", flush=True) - def stop(self): - print(1111111111111111, "STOP _DEPLOYMENT", flush=True) + def stop(self) -> None: + """Stop the deployment.""" self._stop_agent() self._stop_tendermint() - print(1111111111111111, "STOP _DEPLOYMENT complete", flush=True) def _stop_agent(self) -> None: """Start process.""" pid = self._work_directory / "agent.pid" if not pid.exists(): return - _kill_process(int(pid.read_text(encoding="utf-8"))) + kill_process(int(pid.read_text(encoding="utf-8"))) def _stop_tendermint(self) -> None: """Start tendermint process.""" pid = self._work_directory / "tendermint.pid" if not pid.exists(): return - _kill_process(int(pid.read_text(encoding="utf-8"))) + kill_process(int(pid.read_text(encoding="utf-8"))) @abstractmethod - def _start_tendermint(self): - pass + def _start_tendermint(self) -> None: + """Start tendermint process.""" @abstractmethod - def _start_agent(self): - pass + def _start_agent(self) -> None: + """Start aea process.""" @property @abstractmethod - def _aea_bin(self): - pass + def _aea_bin(self) -> str: + """Return aea_bin path.""" + raise NotImplementedError class PyInstallerHostDeploymentRunner(BaseDeploymentRunner): + """Deployment runner within pyinstaller env.""" + @property def _aea_bin(self) -> str: + """Return aea_bin path.""" abin = str(Path(sys._MEIPASS) / "aea_bin") # type: ignore # pylint: disable=protected-access return abin @property def _tendermint_bin(self) -> str: - return str(Path(sys._MEIPASS) / "tendermint") + """Return tendermint path.""" + return str(Path(sys._MEIPASS) / "tendermint") # type: ignore # pylint: disable=protected-access def _start_agent(self) -> None: """Start agent process.""" @@ -267,21 +254,20 @@ def _start_tendermint(self) -> None: class HostPythonHostDeploymentRunner(BaseDeploymentRunner): + """Deployment runner for host installed python.""" + @property def _aea_bin(self) -> str: + """Return aea_bin path.""" return str(self._venv_dir / "bin" / "aea") def _start_agent(self) -> None: """Start agent process.""" working_dir = self._work_directory env = json.loads((working_dir / "agent.json").read_text(encoding="utf-8")) - print(1111, "START AGENT", working_dir / "agent", (working_dir / "agent").exists(), flush=True) - print(1111, "START AGENT", working_dir / "agent", (working_dir / "agent").exists(), flush=True) process = subprocess.Popen( # pylint: disable=consider-using-with # nosec args=[self._aea_bin, "run"], cwd=str(working_dir / "agent"), - #stdout=subprocess.STDOUT, - #stderr=subprocess.STDOUT, env={**os.environ, **env}, creationflags=( 0x00000008 if platform.system() == "Windows" else 0 @@ -319,11 +305,12 @@ def _start_tendermint(self) -> None: ) @property - def _venv_dir(self): + def _venv_dir(self) -> Path: + """Get venv dir for aea.""" return self._work_directory / "venv" - def _setup_venv(self): - print("SETUP VENV", flush=True) + def _setup_venv(self) -> None: + """Perform venv setup, install deps.""" self._venv_dir.mkdir(exist_ok=True) venv_cli(args=[str(self._venv_dir)]) pbin = str(self._venv_dir / "bin" / "python") @@ -343,18 +330,25 @@ def _setup_venv(self): "requests", ], ) - print("SETUP VENV COMPLETE", flush=True) def _setup_agent(self) -> None: + """Prepare agent.""" self._setup_venv() super()._setup_agent() # Install agent dependencies - self._run_aea("-v", "debug", "install", "--timeout", "600", + self._run_aea( + "-v", + "debug", + "install", + "--timeout", + "600", cwd=self._work_directory / "agent", ) def _get_host_deployment_runner(build_dir: Path) -> BaseDeploymentRunner: + """Return depoyment runner according to running env.""" + deployment_runner: BaseDeploymentRunner if getattr(sys, "frozen", False) and hasattr(sys, "_MEIPASS"): deployment_runner = PyInstallerHostDeploymentRunner(build_dir) else: diff --git a/operate/services/service.py b/operate/services/service.py index 4bd03596..193ba756 100644 --- a/operate/services/service.py +++ b/operate/services/service.py @@ -21,18 +21,13 @@ import json import os -import platform import shutil -import signal import subprocess # nosec -import sys -import time import typing as t from copy import copy, deepcopy from dataclasses import dataclass from pathlib import Path -import psutil from aea.configurations.constants import ( DEFAULT_LEDGER, LEDGER, @@ -543,15 +538,16 @@ def _build_host(self, force: bool = True) -> None: ) except Exception as e: - shutil.rmtree(build) + if build.exists(): + shutil.rmtree(build) raise e # Mech price patch. agent_vars = json.loads(Path(build, "agent.json").read_text(encoding="utf-8")) if "SKILL_TRADER_ABCI_MODELS_PARAMS_ARGS_MECH_REQUEST_PRICE" in agent_vars: - agent_vars["SKILL_TRADER_ABCI_MODELS_PARAMS_ARGS_MECH_REQUEST_PRICE"] = ( - "10000000000000000" - ) + agent_vars[ + "SKILL_TRADER_ABCI_MODELS_PARAMS_ARGS_MECH_REQUEST_PRICE" + ] = "10000000000000000" Path(build, "agent.json").write_text( json.dumps(agent_vars, indent=4), encoding="utf-8",