From 1467d200316b19d05961da299f405cc0dbe43434 Mon Sep 17 00:00:00 2001
From: Shane Canon
Date: Thu, 24 Oct 2024 16:09:45 -0700
Subject: [PATCH] Add nsenter support

This adds an option to use nsenter for shared run support.
In testing this can be significantly faster to start up especially
when there are a high number of tasks per node.
---
 podman_hpc/nsenter.py    | 117 ++++++++++++++++++++++++++++
 podman_hpc/podman_hpc.py | 139 +++++++++++++++++++++++----------------
 podman_hpc/siteconfig.py |  13 +++-
 3 files changed, 208 insertions(+), 61 deletions(-)
 create mode 100644 podman_hpc/nsenter.py

diff --git a/podman_hpc/nsenter.py b/podman_hpc/nsenter.py
new file mode 100644
index 0000000..a822705
--- /dev/null
+++ b/podman_hpc/nsenter.py
@@ -0,0 +1,117 @@
+"""
+This provides a method to use nsenter to spawn the
+mpi tasks.
+"""
+
+from subprocess import Popen, PIPE
+import json
+import os
+import time
+
+
+# Map each namespace type reported by lsns to the nsenter flag
+# that enters it.
+ns2flag = {
+    'cgroup': '-C',
+    'ipc': '-i',
+    'mnt': '-m',
+    'net': '-n',
+    'pid': '-p',
+    'time': '-T',
+    'user': '-U',
+    'uts': '-u',
+}
+
+
+def get_env(pid, conf):
+    """
+    Construct the environment for the exec command
+
+    Starts from the environment of process pid (read from
+    /proc/<pid>/environ) and overlays every value that follows
+    a "-e" flag in conf.shared_run_exec_args:
+      PREFIX*     copy each variable of this process that starts
+                  with PREFIX
+      NAME=VALUE  set NAME to VALUE
+      NAME        copy NAME from this process, if it is set
+
+    Returns the new environment as a dict
+    """
+    # Gather the environment from the run command
+    with open(f"/proc/{pid}/environ") as f:
+        data = f.read().split('\x00')[0:-1]
+    new_env = dict()
+    for e in data:
+        k, v = e.split("=", maxsplit=1)
+        new_env[k] = v
+    take_next = False
+
+    # Find any environments that should be
+    # passed
+    for arg in conf.shared_run_exec_args:
+        # Find environment flags
+        if arg == "-e":
+            take_next = True
+            continue
+        if not take_next:
+            continue
+        take_next = False
+        if arg.endswith('*'):
+            patt = arg[0:-1]
+            for env in os.environ:
+                if env.startswith(patt):
+                    new_env[env] = os.environ[env]
+        elif '=' in arg:
+            k, v = arg.split("=", maxsplit=1)
+            new_env[k] = v
+        elif arg in os.environ:
+            # A bare name is taken from this process; skip it when
+            # unset rather than fail
+            new_env[arg] = os.environ[arg]
+    return new_env
+
+
+def nsenter(conf, timer, args):
+    """
+    Run a command in the namespaces of the shared-run process.
+
+    Polls "lsns -J" until it lists a namespace whose command is
+    conf.shared_run_command, calling timer.check() before each
+    poll, then runs args through nsenter with the environment
+    built by get_env().
+    Returns the exit code
+    """
+
+    cmd = ["lsns", "-J"]
+    shared_run_command = " ".join(conf.shared_run_command)
+    pid = None
+
+    while not pid:
+        timer.check()
+        proc = Popen(cmd, stdout=PIPE, stderr=PIPE)
+        out, err = proc.communicate()
+        data = None
+        try:
+            data = json.loads(out.decode())
+        except json.JSONDecodeError:
+            time.sleep(conf.wait_poll_interval)
+            continue
+        for ns in data['namespaces']:
+            if ns['command'] == shared_run_command:
+                pid = ns['pid']
+        if not pid:
+            time.sleep(conf.wait_poll_interval)
+            continue
+    cmd = ["/usr/bin/nsenter",
+           '-t', str(pid), '-U',
+           "--preserve-credentials"
+           ]
+    for ns in data['namespaces']:
+        if ns['pid'] == pid:
+            cmd.append(ns2flag[ns['type']])
+
+    cmd.extend(args)
+    new_env = get_env(pid, conf)
+    proc = Popen(cmd, env=new_env)
+    proc.communicate()
+    return proc.returncode
diff --git a/podman_hpc/podman_hpc.py b/podman_hpc/podman_hpc.py
index 72fba94..1c079a9 100755
--- a/podman_hpc/podman_hpc.py
+++ b/podman_hpc/podman_hpc.py
@@ -12,6 +12,7 @@
 from .siteconfig import SiteConfig
 from multiprocessing import Process
 from subprocess import Popen, PIPE
+from .nsenter import nsenter
 
 
 def podman_devnull(cmd, conf):
@@ -47,6 +48,50 @@ def pmi_fd():
     return ["--preserve-fds", "1"]
 
 
+class Timer():
+    def __init__(self, conf, run_thread):
+        start_time = time.time()
+        self.end_time = start_time + conf.wait_timeout
+        self.run_thread = run_thread
+
+    def check(self):
+        if time.time() > self.end_time:
+            msg = "Timeout waiting for shared-run start"
+            raise OSError(msg)
+        elif self.run_thread and self.run_thread.exitcode:
+            raise OSError("Failed to start container")
+
+
+def _podman_exec(conf, timer, cli_opts,
+                 options, container_cmd):
+    # wait for container to exist
+    comm = ["container", "exists", conf.container_name]
+    time.sleep(conf.wait_poll_interval)
+    while podman_devnull(comm, conf) != 0:
+        time.sleep(conf.wait_poll_interval)
+        timer.check()
+    comm = ["wait", "--condition", "running", conf.container_name]
+    podman_devnull(comm, conf)
+    exec_cmd = [
+        conf.podman_bin,
+        "exec",
+    ]
+    exec_cmd.extend(cli_opts)
+    exec_cmd.extend(pmi_fd())
+    exec_cmd.extend(conf.shared_run_exec_args)
+    exec_cmd.extend(
+        cpt.filterValidOptions(options, [conf.podman_bin, "exec", "--help"])
+    )
+    exec_cmd.extend([conf.container_name] + list(container_cmd))
+    fds = [0, 1, 2]
+    if 'PMI_FD' in os.environ:
+        fds.append(int(os.environ['PMI_FD']))
+        conf.env["PMI_FD"] = os.environ["PMI_FD"]
+    proc = Popen(exec_cmd, env=conf.env, pass_fds=fds)
+    proc.communicate()
+    return proc.returncode
+
+
 # function to specify help message formatting to mimic the podman help page.
 # follows the style of click.Command.format_help()
 # this will be inherited by subcommands created with @podhpc.command()
@@ -90,6 +135,9 @@ def podman_format(self, ctx, formatter):
     invoke_without_command=True,
 )
 @click.pass_context
+@click.option(
+    "--nsenter", is_flag=True, help="Use nsenter for shared run", default=False
+)
 @click.option(
     "--additional-stores", type=str, help="Specify other storage locations"
 )
@@ -98,8 +146,8 @@ def podman_format(self, ctx, formatter):
     type=str,
     help="Specify alternate squash directory location",
 )
-@click.option("--log-level", type=str, default="fatal", hidden=True)
-def podhpc(ctx, additional_stores, squash_dir, log_level):
+@click.option("--log-level", type=str, default="error", hidden=True)
+def podhpc(ctx, nsenter, additional_stores, squash_dir, log_level):
     """Manage pods, containers and images ... on HPC!
 
     The podman-hpc utility is a wrapper script around the podman
@@ -114,7 +162,9 @@ def podhpc(ctx, additional_stores, squash_dir, log_level):
 
     # set up site configuration object
     try:
-        conf = SiteConfig(squash_dir=squash_dir, log_level=log_level)
+        conf = SiteConfig(squash_dir=squash_dir,
+                          log_level=log_level,
+                          nsenter=nsenter)
     except Exception as ex:
         sys.stderr.write(f"Error: {ex}... Exiting\n")
         sys.exit(1)
@@ -171,6 +221,7 @@ def rmsqi(siteconf, image):
     mu = MigrateUtils(conf=siteconf)
     mu.remove_image(image)
 
+
 # podman-hpc images subcommand #############################################
 @pass_siteconf
 @click.pass_context
@@ -181,6 +232,7 @@ def images(ctx, siteconf, image, podman_args, **site_opts):
     cmd.extend(podman_args)
     cmd.extend(siteconf.get_cmd_extensions("images", site_opts))
 
+
 # podman-hpc pull subcommand (modified) ####################################
 @podhpc.command(
     context_settings=dict(
@@ -208,6 +260,7 @@ def pull(ctx, siteconf, image, podman_args, **site_opts):
         sys.stderr.write("Pull failed.\n")
         sys.exit(proc.returncode)
 
+
 # podman-hpc shared-run subcommand #########################################
 @podhpc.command(
     context_settings=dict(
@@ -242,18 +295,15 @@ def shared_run(conf, run_args, **site_opts):
     # click.echo(f"Launching a shared-run with args: {sys.argv}")
     _shared_run(conf, run_args, **site_opts)
 
+
 def _shared_run(conf, run_args, **site_opts):
     """
     Internal function for the shared_run.  This is so we can also
     call it when the user does run but enabled a module
-    that has shared_run set to True. 
+    that has shared_run set to True.
     """
 
     localid = os.environ.get(conf.localid_var)
-    ntasks_raw = os.environ.get(conf.tasks_per_node_var, "1")
-    ntasks = int(re.search(conf.ntasks_pattern, ntasks_raw)[0])
-    container_name = f"uid-{os.getuid()}-pid-{os.getppid()}"
-    sock_name = f"/tmp/uid-{os.getuid()}-pid-{os.getppid()}"
 
     # construct run and exec commands from user options
     # We need to filter out any run args in the run_args
@@ -273,7 +323,8 @@ def _shared_run(conf, run_args, **site_opts):
         sys.argv.index("shared-run") + 1: sys.argv.index(image)
     ]
 
-    run_cmd = [conf.podman_bin, "run", "--rm", "-d", "--name", container_name]
+    run_cmd = [conf.podman_bin, "run", "--rm", "-d", "--name",
+               conf.container_name]
     run_cmd.extend(
         cpt.filterValidOptions(options, [conf.podman_bin, "run", "--help"])
     )
@@ -281,51 +332,25 @@ def _shared_run(conf, run_args, **site_opts):
     run_cmd.append(image)
     run_cmd.extend(conf.shared_run_command)
 
-    exec_cmd = [
-        conf.podman_bin,
-        "exec",
-    ]
-    exec_cmd.extend(conf.get_cmd_extensions("exec", site_opts))
-    exec_cmd.extend(pmi_fd())
-    exec_cmd.extend(conf.shared_run_exec_args)
-    exec_cmd.extend(
-        cpt.filterValidOptions(options, [conf.podman_bin, "exec", "--help"])
-    )
-    exec_cmd.extend([container_name] + list(container_cmd))
-    # click.echo(f"run_cmd is: {run_cmd}")
-    # click.echo(f"exec_cmd is: {exec_cmd}")
-
     # Start monitor and run threads
     monitor_thread = None
     run_thread = None
-    proc = None
     if (localid is None or int(localid) == 0):
-        monitor_thread = Process(target=monitor, args=(sock_name, ntasks,
-                                 container_name, conf))
+        monitor_thread = Process(target=monitor, args=(conf,))
         monitor_thread.start()
         run_thread = Process(target=shared_run_exec, args=(run_cmd, conf.env))
         run_thread.start()
-        
+    timer = Timer(conf, run_thread)
+    cli_opts = conf.get_cmd_extensions("exec", site_opts)
     try:
-        # wait for container to exist
-        comm = ["container", "exists", container_name]
-        start_time = time.time()
-        while podman_devnull(comm, conf) != 0:
-            time.sleep(conf.wait_poll_interval)
-            if time.time() - start_time > conf.wait_timeout:
-                msg = "Timeout waiting for shared-run start"
-                raise OSError(msg)
-            if run_thread and run_thread.exitcode:
-                raise OSError("Failed to start container")
-        comm = ["wait", "--condition", "running", container_name]
-        podman_devnull(comm, conf)
-        fds = [0, 1, 2]
-        if 'PMI_FD' in os.environ:
-            fds.append(int(os.environ['PMI_FD']))
-            conf.env["PMI_FD"] = os.environ["PMI_FD"]
-        proc = Popen(exec_cmd, env=conf.env, pass_fds=fds)
-        proc.communicate()
-        send_complete(sock_name, localid)
+        exit_code = 1
+        if conf.nsenter:
+            exit_code = nsenter(conf, timer, container_cmd)
+        else:
+            exit_code = _podman_exec(conf, timer, cli_opts,
+                                     options, container_cmd)
+
+        send_complete(conf.sock_name, localid)
         # Close out threads
         if monitor_thread:
             monitor_thread.join()
@@ -338,12 +363,9 @@ def _shared_run(conf, run_args, **site_opts):
             monitor_thread.kill()
         if run_thread:
             run_thread.kill()
-        if os.path.exists(sock_name):
-            os.remove(sock_name)
+        if os.path.exists(conf.sock_name):
+            os.remove(conf.sock_name)
     finally:
-        exit_code = 1
-        if proc:
-            exit_code = proc.returncode
         sys.exit(exit_code)
 
 
@@ -404,27 +426,28 @@ def shared_run_exec(run_cmd, env):
     out, err = proc.communicate()
     if proc.returncode != 0:
         sys.stderr.write(err.decode())
+        sys.stderr.write(out.decode())
 
 
-def monitor(sockfile, ntasks, container_name, conf):
+def monitor(conf):
     s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
     try:
-        os.remove(sockfile)
+        os.remove(conf.sock_name)
     except OSError:
         pass
-    s.bind(sockfile)
+    s.bind(conf.sock_name)
     ct = 0
     while True:
         s.listen()
         conn, addr = s.accept()
         ct += 1
-        if ct == ntasks:
+        if ct == conf.ntasks:
             break
         conn.close()
-    os.remove(sockfile)
+    os.remove(conf.sock_name)
     # cleanup
-    podman_devnull(["kill", container_name], conf)
-    podman_devnull(["rm", container_name], conf)
+    podman_devnull(["kill", conf.container_name], conf)
+    podman_devnull(["rm", conf.container_name], conf)
 
 
 def send_complete(sockfile, lid):
diff --git a/podman_hpc/siteconfig.py b/podman_hpc/siteconfig.py
index 918faa5..a946158 100755
--- a/podman_hpc/siteconfig.py
+++ b/podman_hpc/siteconfig.py
@@ -66,7 +66,7 @@ class SiteConfig:
     shared_run = False
     source = dict()
 
-    def __init__(self, squash_dir=None, log_level=None):
+    def __init__(self, squash_dir=None, log_level=None, nsenter=False):
 
         # getlogin may fail on a compute node
         try:
@@ -81,6 +81,7 @@ def __init__(self, squash_dir=None, log_level=None):
         except OSError:
             self.runtime = self.trywhich("runc")
         # self.options = []
+        self.nsenter = nsenter
         if squash_dir:
             self.squash_dir = squash_dir
         self.conf_file_data = {}
@@ -90,6 +91,12 @@ def __init__(self, squash_dir=None, log_level=None):
         for param in self._valid_params:
             self._check_and_set(param)
 
+        # These are used for shared run support
+        ntasks_raw = os.environ.get(self.tasks_per_node_var, "1")
+        self.ntasks = int(re.search(self.ntasks_pattern, ntasks_raw)[0])
+        self.sock_name = f"/tmp/uid-{os.getuid()}-pid-{os.getppid()}"
+        self.container_name = f"uid-{os.getuid()}-pid-{os.getppid()}"
+
         self.read_site_modules()
 
         if isinstance(self.wait_poll_interval, str):
@@ -108,7 +115,7 @@ def __init__(self, squash_dir=None, log_level=None):
         ]
         self.default_run_args = [
             "--storage-opt",
-            "ignore_chown_errors=true", 
+            "ignore_chown_errors=true",
             "--storage-opt",
             f"additionalimagestore={self.additionalimagestore()}",
             "--hooks-dir", self.hooks_dir,
@@ -135,7 +142,7 @@ def __init__(self, squash_dir=None, log_level=None):
         self.default_build_args = []
         self.default_pull_args = []
         self.default_images_args = []
-        
+
         self.log_level = log_level
 
     def dump_config(self):