From a27766176cee69f3c3a8d5418eda725ca3b763dc Mon Sep 17 00:00:00 2001 From: hugsy Date: Sun, 3 Nov 2024 11:45:36 -0800 Subject: [PATCH 1/4] checkpoint: added new remote modes, gdbserver & gdbserver-multi work --- gef.py | 194 ++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 122 insertions(+), 72 deletions(-) diff --git a/gef.py b/gef.py index 08bed01fd..41cb61da6 100644 --- a/gef.py +++ b/gef.py @@ -273,11 +273,13 @@ def wrapper(*args: Any, **kwargs: Any) -> Any: return wrapper -class ValidationError(Exception): pass # # Helpers # +class ValidationError(Exception): pass + +class InitializationError(Exception): pass class ObsoleteException(Exception): pass @@ -347,10 +349,10 @@ def is_alive() -> bool: return False -def calling_function() -> str | None: +def calling_function(frame: int = 3) -> str | None: """Return the name of the calling function""" try: - stack_info = traceback.extract_stack()[-3] + stack_info = traceback.extract_stack()[-frame] return stack_info.name except Exception as e: dbg(f"traceback failed with {str(e)}") @@ -3512,30 +3514,57 @@ def get_os() -> str: return gef.session.os -@lru_cache() -def is_qemu() -> bool: - if not is_remote_debug(): +def is_target_remote(conn: gdb.TargetConnection | None = None) -> bool: + "Returns True for `extended-remote` only." + _conn = conn or gdb.selected_inferior().connection + return isinstance(_conn, gdb.RemoteTargetConnection) and _conn.type == "remote" + + +def is_target_extended_remote(conn: gdb.TargetConnection | None = None) -> bool: + "Returns True for `extended-remote` only." + _conn = conn or gdb.selected_inferior().connection + return isinstance(_conn, gdb.RemoteTargetConnection) and _conn.type == "extended-remote" + + +def is_target_remote_or_extended(conn: gdb.TargetConnection | None = None) -> bool: + return is_target_remote(conn) or is_target_extended_remote(conn) + + +def is_running_under_qemu() -> bool: + "See https://www.qemu.org/docs/master/system/gdb.html " + if not is_target_remote(): return False response = gdb.execute("maintenance packet Qqemu.sstepbits", to_string=True, from_tty=False) or "" return "ENABLE=" in response -@lru_cache() -def is_qemu_usermode() -> bool: - if not is_qemu(): +def is_running_under_qemu_user() -> bool: + if not is_running_under_qemu(): return False - response = gdb.execute("maintenance packet qOffsets", to_string=True, from_tty=False) or "" + response = gdb.execute("maintenance packet qOffsets", to_string=True, from_tty=False) or "" # Use `qAttached`? return "Text=" in response -@lru_cache() -def is_qemu_system() -> bool: - if not is_qemu(): +def is_running_under_qemu_system() -> bool: + if not is_running_under_qemu(): return False + # Use "maintenance packet qqemu.PhyMemMode"? response = gdb.execute("maintenance packet qOffsets", to_string=True, from_tty=False) or "" return "received: \"\"" in response +def is_running_in_gdbserver() -> bool: + if is_running_under_qemu(): + return False + return not is_running_under_qemu() + + +def is_running_in_rr() -> bool: + if not is_running_in_gdbserver(): + return False + return os.environ.get("GDB_UNDER_RR", None) == "1" + + def get_filepath() -> str | None: """Return the local absolute path of the file currently debugged.""" if gef.session.remote: @@ -3960,6 +3989,7 @@ def is_in_x86_kernel(address: int) -> bool: return (address >> memalign) == 0xF +@deprecated("Use `is_target_remote()`") def is_remote_debug() -> bool: """"Return True is the current debugging session is running through GDB remote session.""" return gef.session.remote_initializing or gef.session.remote is not None @@ -6294,7 +6324,10 @@ def do_invoke(self, _: list[str], **kwargs: Any) -> None: # calls `is_remote_debug` which checks if `remote_initializing` is True or `.remote` is None # This prevents some spurious errors being thrown during startup gef.session.remote_initializing = True - session = GefRemoteSessionManager(args.host, args.port, args.pid, qemu_binary) + # session = GefRemoteSessionManager(args.host, args.port, args.pid, qemu_binary) + conn = gdb.selected_inferior().connection + assert isinstance(conn, gdb.RemoteTargetConnection) + session = GefRemoteSessionManager(conn) dbg(f"[remote] initializing remote session with {session.target} under {session.root}") if not session.connect(args.pid) or not session.setup(): @@ -7414,7 +7447,7 @@ def __init__(self) -> None: def do_invoke(self, _: list[str], **kwargs: Any) -> None: args : argparse.Namespace = kwargs["arguments"] - if is_qemu_system(): + if is_running_under_qemu_system(): err("Unsupported") return @@ -11312,7 +11345,7 @@ def __repr__(self) -> str: def auxiliary_vector(self) -> dict[str, int] | None: if not is_alive(): return None - if is_qemu_system(): + if is_running_under_qemu_system(): return None if not self._auxiliary_vector: auxiliary_vector = {} @@ -11431,6 +11464,9 @@ class RemoteMode(enum.IntEnum): GDBSERVER = 0 QEMU = 1 RR = 2 + GDBSERVER_MULTI = 3 + QEMU_USER = 4 + QEMU_SYSTEM = 5 def __str__(self): return self.name @@ -11440,30 +11476,48 @@ def __repr__(self): def prompt_string(self) -> str: match self: - case GefRemoteSessionManager.RemoteMode.QEMU: + case ( + GefRemoteSessionManager.RemoteMode.QEMU | + GefRemoteSessionManager.RemoteMode.QEMU_USER | + GefRemoteSessionManager.RemoteMode.QEMU_SYSTEM + ): return Color.boldify("(qemu) ") case GefRemoteSessionManager.RemoteMode.RR: return Color.boldify("(rr) ") - case GefRemoteSessionManager.RemoteMode.GDBSERVER: + case ( + GefRemoteSessionManager.RemoteMode.GDBSERVER | + GefRemoteSessionManager.RemoteMode.GDBSERVER_MULTI + ): return Color.boldify("(remote) ") raise AttributeError("Unknown value") - def __init__(self, host: str, port: int, pid: int =-1, qemu: pathlib.Path | None = None) -> None: + @staticmethod + def init() -> "GefRemoteSessionManager.RemoteMode": + if is_running_under_qemu_system(): + return GefRemoteSessionManager.RemoteMode.QEMU_SYSTEM + if is_running_under_qemu_user(): + return GefRemoteSessionManager.RemoteMode.QEMU_USER + if is_running_in_rr(): + return GefRemoteSessionManager.RemoteMode.RR + if is_running_in_gdbserver(): + if is_target_extended_remote(): + return GefRemoteSessionManager.RemoteMode.GDBSERVER_MULTI + return GefRemoteSessionManager.RemoteMode.GDBSERVER + raise AttributeError + + def __init__(self, conn: gdb.RemoteTargetConnection) -> None: super().__init__() - self.__host = host - self.__port = port + assert is_target_remote_or_extended() + remote_host = conn.details + assert remote_host + host, port = remote_host.split(":", 1) + self.__host = host or "localhost" + self.__port = int(port) self.__local_root_fd = tempfile.TemporaryDirectory() self.__local_root_path = pathlib.Path(self.__local_root_fd.name) - self.__qemu = qemu - if pid > 0: - self._pid = pid + self._mode = GefRemoteSessionManager.RemoteMode.init() - if self.__qemu is not None: - self._mode = GefRemoteSessionManager.RemoteMode.QEMU - elif os.environ.get("GDB_UNDER_RR", None) == "1": - self._mode = GefRemoteSessionManager.RemoteMode.RR - else: - self._mode = GefRemoteSessionManager.RemoteMode.GDBSERVER + self.setup() def close(self) -> None: self.__local_root_fd.cleanup() @@ -11475,7 +11529,11 @@ def close(self) -> None: raise def __str__(self) -> str: - return f"RemoteSession(target='{self.target}', local='{self.root}', pid={self.pid}, mode={self.mode})" + msg = f"RemoteSession(target='{self.target}', local='{self.root}', mode={self.mode}" + if self.mode == GefRemoteSessionManager.RemoteMode.GDBSERVER: + msg += f", pid={self.pid}" + msg += ")" + return msg def __repr__(self) -> str: return str(self) @@ -11561,16 +11619,18 @@ def connect(self, pid: int) -> bool: def setup(self) -> bool: # setup remote adequately depending on remote or qemu mode + dbg(f"Setting up the {self._mode} session") match self.mode: - case GefRemoteSessionManager.RemoteMode.QEMU: - dbg(f"Setting up as qemu session, target={self.__qemu}") - self.__setup_qemu() + case GefRemoteSessionManager.RemoteMode.QEMU_USER: + self.__setup_qemu_user() + case GefRemoteSessionManager.RemoteMode.QEMU_SYSTEM: + raise Exception("TODO") case GefRemoteSessionManager.RemoteMode.RR: - dbg("Setting up as rr session") self.__setup_rr() case GefRemoteSessionManager.RemoteMode.GDBSERVER: - dbg("Setting up as remote session") self.__setup_remote() + case GefRemoteSessionManager.RemoteMode.GDBSERVER_MULTI: + pass case _: raise ValueError @@ -11580,13 +11640,13 @@ def setup(self) -> bool: reset_architecture() return True - def __setup_qemu(self) -> bool: + def __setup_qemu_user(self) -> bool: # setup emulated file in the chroot - assert self.__qemu - target = self.root / str(self.__qemu.parent).lstrip("/") + __qemu_target = pathlib.Path("foo") # TODO + target = self.root / str(__qemu_target.parent).lstrip("/") target.mkdir(parents=True, exist_ok=False) - shutil.copy2(self.__qemu, target) - self._file = self.__qemu + shutil.copy2(__qemu_target, target) + self._file = __qemu_target assert self.lfile.exists() # create a procfs @@ -11767,12 +11827,20 @@ def reset_caches(self) -> None: def target_remote_posthook(): - if gef.session.remote_initializing: - return + print(f"{is_target_remote()=}") + print(f"{is_target_remote_or_extended()=}") + print(f"{is_target_extended_remote()=}") + print(f"{is_running_under_qemu()=}") + print(f"{is_running_under_qemu_system()=}") + print(f"{is_running_under_qemu_user()=}") + print(f"{is_running_in_gdbserver()=}") + print(f"{is_running_in_rr()=}") + conn = gdb.selected_inferior().connection + if not isinstance(conn, gdb.RemoteTargetConnection): + raise TypeError("Expected type gdb.RemoteTargetConnection") + assert is_target_remote_or_extended(conn), "Target is not remote" + gef.session.remote = GefRemoteSessionManager(conn) - gef.session.remote = GefRemoteSessionManager("", 0) - if not gef.session.remote.setup(): - raise EnvironmentError(f"Failed to create a proper environment for {gef.session.remote}") if __name__ == "__main__": if sys.version_info[0] == 2: @@ -11835,32 +11903,14 @@ def target_remote_posthook(): GefTmuxSetup() - if GDB_VERSION > (9, 0): - disable_tr_overwrite_setting = "gef.disable_target_remote_overwrite" - - if not gef.config[disable_tr_overwrite_setting]: - warnmsg = ("Using `target remote` with GEF should work in most cases, " - "but use `gef-remote` if you can. You can disable the " - "overwrite of the `target remote` command by toggling " - f"`{disable_tr_overwrite_setting}` in the config.") - hook = f""" - define target hookpost-{{}} - pi target_remote_posthook() - context - pi if calling_function() != "connect": warn("{warnmsg}") - end - """ - - # Register a post-hook for `target remote` that initialize the remote session - gdb.execute(hook.format("remote")) - gdb.execute(hook.format("extended-remote")) - else: - errmsg = ("Using `target remote` does not work, use `gef-remote` " - f"instead. You can toggle `{disable_tr_overwrite_setting}` " - "if this is not desired.") - hook = f"""pi if calling_function() != "connect": err("{errmsg}")""" - gdb.execute(f"define target hook-remote\n{hook}\nend") - gdb.execute(f"define target hook-extended-remote\n{hook}\nend") + # Initialize `target *remote` post hooks + hook = """ + define target hookpost-{0} + pi target_remote_posthook() + end + """ + gdb.execute(hook.format("remote")) + gdb.execute(hook.format("extended-remote")) # restore saved breakpoints (if any) bkp_fpath = pathlib.Path(gef.config["gef.autosave_breakpoints_file"]).expanduser().absolute() From 6d166f0a088f0098c8703e042ee0ec7af64b6311 Mon Sep 17 00:00:00 2001 From: hugsy Date: Sun, 3 Nov 2024 18:24:41 -0800 Subject: [PATCH 2/4] checkpoint: added very basic pytests --- gef.py | 57 +++++++++++++++------------ tests/api/gef_remote.py | 87 +++++++++++++++++++++++++++++++++++++++++ tests/base.py | 4 +- tests/utils.py | 30 ++++++++++++++ 4 files changed, 150 insertions(+), 28 deletions(-) create mode 100644 tests/api/gef_remote.py diff --git a/gef.py b/gef.py index 41cb61da6..4799c5e69 100644 --- a/gef.py +++ b/gef.py @@ -3530,7 +3530,7 @@ def is_target_remote_or_extended(conn: gdb.TargetConnection | None = None) -> bo return is_target_remote(conn) or is_target_extended_remote(conn) -def is_running_under_qemu() -> bool: +def is_running_in_qemu() -> bool: "See https://www.qemu.org/docs/master/system/gdb.html " if not is_target_remote(): return False @@ -3538,15 +3538,15 @@ def is_running_under_qemu() -> bool: return "ENABLE=" in response -def is_running_under_qemu_user() -> bool: - if not is_running_under_qemu(): +def is_running_in_qemu_user() -> bool: + if not is_running_in_qemu(): return False response = gdb.execute("maintenance packet qOffsets", to_string=True, from_tty=False) or "" # Use `qAttached`? return "Text=" in response -def is_running_under_qemu_system() -> bool: - if not is_running_under_qemu(): +def is_running_in_qemu_system() -> bool: + if not is_running_in_qemu(): return False # Use "maintenance packet qqemu.PhyMemMode"? response = gdb.execute("maintenance packet qOffsets", to_string=True, from_tty=False) or "" @@ -3554,9 +3554,7 @@ def is_running_under_qemu_system() -> bool: def is_running_in_gdbserver() -> bool: - if is_running_under_qemu(): - return False - return not is_running_under_qemu() + return not is_running_in_qemu() def is_running_in_rr() -> bool: @@ -7447,7 +7445,7 @@ def __init__(self) -> None: def do_invoke(self, _: list[str], **kwargs: Any) -> None: args : argparse.Namespace = kwargs["arguments"] - if is_running_under_qemu_system(): + if is_running_in_qemu_system(): err("Unsupported") return @@ -11345,7 +11343,7 @@ def __repr__(self) -> str: def auxiliary_vector(self) -> dict[str, int] | None: if not is_alive(): return None - if is_running_under_qemu_system(): + if is_running_in_qemu_system(): return None if not self._auxiliary_vector: auxiliary_vector = {} @@ -11493,9 +11491,9 @@ def prompt_string(self) -> str: @staticmethod def init() -> "GefRemoteSessionManager.RemoteMode": - if is_running_under_qemu_system(): + if is_running_in_qemu_system(): return GefRemoteSessionManager.RemoteMode.QEMU_SYSTEM - if is_running_under_qemu_user(): + if is_running_in_qemu_user(): return GefRemoteSessionManager.RemoteMode.QEMU_USER if is_running_in_rr(): return GefRemoteSessionManager.RemoteMode.RR @@ -11619,7 +11617,7 @@ def connect(self, pid: int) -> bool: def setup(self) -> bool: # setup remote adequately depending on remote or qemu mode - dbg(f"Setting up the {self._mode} session") + info(f"Setting up remote session as '{self._mode}'") match self.mode: case GefRemoteSessionManager.RemoteMode.QEMU_USER: self.__setup_qemu_user() @@ -11826,21 +11824,24 @@ def reset_caches(self) -> None: return +def target_remote_hook(): + # disable the context until the session has been fully established + gef.config["context.enable"] = False + + def target_remote_posthook(): - print(f"{is_target_remote()=}") - print(f"{is_target_remote_or_extended()=}") - print(f"{is_target_extended_remote()=}") - print(f"{is_running_under_qemu()=}") - print(f"{is_running_under_qemu_system()=}") - print(f"{is_running_under_qemu_user()=}") - print(f"{is_running_in_gdbserver()=}") - print(f"{is_running_in_rr()=}") conn = gdb.selected_inferior().connection if not isinstance(conn, gdb.RemoteTargetConnection): raise TypeError("Expected type gdb.RemoteTargetConnection") assert is_target_remote_or_extended(conn), "Target is not remote" gef.session.remote = GefRemoteSessionManager(conn) + # re-enable context + gef.config["context.enable"] = True + + # if here, no exception was thrown, print context + gdb.execute("context") + if __name__ == "__main__": if sys.version_info[0] == 2: @@ -11903,14 +11904,18 @@ def target_remote_posthook(): GefTmuxSetup() - # Initialize `target *remote` post hooks + # Initialize `target *remote` pre/post hooks hook = """ - define target hookpost-{0} - pi target_remote_posthook() + define target hook{1}-{0} + pi target_remote_{1}hook() end """ - gdb.execute(hook.format("remote")) - gdb.execute(hook.format("extended-remote")) + # pre-hooks + gdb.execute(hook.format("remote", "")) + gdb.execute(hook.format("extended-remote", "")) + # post-hooks + gdb.execute(hook.format("remote", "post")) + gdb.execute(hook.format("extended-remote", "post")) # restore saved breakpoints (if any) bkp_fpath = pathlib.Path(gef.config["gef.autosave_breakpoints_file"]).expanduser().absolute() diff --git a/tests/api/gef_remote.py b/tests/api/gef_remote.py new file mode 100644 index 000000000..fceb96546 --- /dev/null +++ b/tests/api/gef_remote.py @@ -0,0 +1,87 @@ +""" +`target remote/extended-remote` test module. +""" + + +from tests.base import RemoteGefUnitTestGeneric +from tests.utils import debug_target, gdbserver_session, gdbserver_multi_session, get_random_port, qemuuser_session + + +class GefRemoteApi(RemoteGefUnitTestGeneric): + + def setUp(self) -> None: + self._target = debug_target("default") + return super().setUp() + + def test_gef_remote_test_gdbserver(self): + """Test `gdbserver file`""" + _gdb = self._gdb + _root = self._conn.root + port = get_random_port() + + with gdbserver_session(port=port): + assert not _root.eval("is_target_remote()") + assert not _root.eval("is_target_remote_or_extended()") + assert not _root.eval("is_running_in_gdbserver()") + + _gdb.execute(f"target remote :{port}") + + assert _root.eval("is_target_remote()") + assert _root.eval("is_target_remote_or_extended()") + assert _root.eval("is_running_in_gdbserver()") + + assert not _root.eval("is_target_extended_remote()") + assert not _root.eval("is_running_under_qemu()") + assert not _root.eval("is_running_under_qemu_system()") + assert not _root.eval("is_running_under_qemu_user()") + assert not _root.eval("is_running_in_rr()") + + def test_gef_remote_test_gdbserver_multi(self): + """Test `gdbserver --multi file`""" + _gdb = self._gdb + _root = self._conn.root + port = get_random_port() + + with gdbserver_multi_session(port=port): + assert not _root.eval("is_target_remote()") + assert not _root.eval("is_target_remote_or_extended()") + assert not _root.eval("is_running_in_gdbserver()") + + _gdb.execute(f"target extended-remote :{port}") + + assert _root.eval("is_target_remote()") + assert _root.eval("is_target_remote_or_extended()") + assert _root.eval("is_target_extended_remote()") + assert _root.eval("is_running_in_gdbserver()") + + assert not _root.eval("is_running_under_qemu()") + assert not _root.eval("is_running_under_qemu_system()") + assert not _root.eval("is_running_under_qemu_user()") + assert not _root.eval("is_running_in_rr()") + + def test_gef_remote_test_qemuuser(self): + """Test `qemu-user -g`""" + _gdb = self._gdb + _root = self._conn.root + port = get_random_port() + + with qemuuser_session(port=port): + assert not _root.eval("is_target_remote()") + assert not _root.eval("is_target_remote_or_extended()") + assert not _root.eval("is_running_in_gdbserver()") + + _gdb.execute(f"target remote :{port}") + + assert _root.eval("is_target_remote()") + assert _root.eval("is_target_remote_or_extended()") + assert _root.eval("is_running_under_qemu()") + assert _root.eval("is_running_under_qemu_user()") + + assert not _root.eval("is_target_extended_remote()") + assert not _root.eval("is_running_under_qemu_system()") + assert not _root.eval("is_running_in_gdbserver()") + assert not _root.eval("is_running_in_rr()") + + # TODO add tests for + # - [ ] qemu-system + # - [ ] rr diff --git a/tests/base.py b/tests/base.py index 868923d3a..e1584eaf7 100644 --- a/tests/base.py +++ b/tests/base.py @@ -10,7 +10,7 @@ import rpyc -from .utils import debug_target +from .utils import debug_target, get_random_port COVERAGE_DIR = os.getenv("COVERAGE_DIR", "") GEF_PATH = pathlib.Path(os.getenv("GEF_PATH", "gef.py")).absolute() @@ -58,7 +58,7 @@ def __setup(self): # # Select a random tcp port for rpyc # - self._port = random.randint(1025, 65535) + self._port = get_random_port() self._commands = "" if COVERAGE_DIR: diff --git a/tests/utils.py b/tests/utils.py index f88d3b304..8b9247758 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -12,6 +12,7 @@ import subprocess import tempfile import time +import random from typing import Iterable, List, Optional, Union from urllib.request import urlopen @@ -112,6 +113,13 @@ def start_gdbserver( logging.debug(f"Starting {cmd}") return subprocess.Popen(cmd) +def start_gdbserver_multi( + host: str = GDBSERVER_DEFAULT_HOST, + port: int = GDBSERVER_DEFAULT_PORT, +) -> subprocess.Popen: + cmd = [GDBSERVER_BINARY, "--multi", f"{host}:{port}"] + logging.debug(f"Starting {cmd}") + return subprocess.Popen(cmd) def stop_gdbserver(gdbserver: subprocess.Popen) -> None: """Stop the gdbserver and wait until it is terminated if it was @@ -138,6 +146,17 @@ def gdbserver_session( finally: stop_gdbserver(sess) +@contextlib.contextmanager +def gdbserver_multi_session( + port: int = GDBSERVER_DEFAULT_PORT, + host: str = GDBSERVER_DEFAULT_HOST, +): + sess = start_gdbserver_multi(host, port) + try: + time.sleep(1) # forced delay to allow gdbserver to start listening + yield sess + finally: + stop_gdbserver(sess) def start_qemuuser( exe: Union[str, pathlib.Path] = debug_target("default"), @@ -301,3 +320,14 @@ def p32(x: int) -> bytes: def p64(x: int) -> bytes: return struct.pack(" int: + global __available_ports + if len(__available_ports) < 2: + __available_ports = list( range(1024, 65535) ) + idx = random.choice(range(len(__available_ports))) + port = __available_ports[idx] + __available_ports.pop(idx) + return port From 771a598f649e8162477902759dbc968cc079c2e2 Mon Sep 17 00:00:00 2001 From: hugsy Date: Mon, 4 Nov 2024 17:19:29 -0800 Subject: [PATCH 3/4] removed all obsolete code --- gef.py | 128 +++++------------------------------ tests/api/gef_remote.py | 41 ++++++++--- tests/commands/gef_remote.py | 45 +++++------- tests/utils.py | 7 +- 4 files changed, 70 insertions(+), 151 deletions(-) diff --git a/gef.py b/gef.py index 4799c5e69..092257cff 100644 --- a/gef.py +++ b/gef.py @@ -3554,13 +3554,12 @@ def is_running_in_qemu_system() -> bool: def is_running_in_gdbserver() -> bool: - return not is_running_in_qemu() + return is_target_remote_or_extended() and not is_running_in_qemu() def is_running_in_rr() -> bool: - if not is_running_in_gdbserver(): - return False - return os.environ.get("GDB_UNDER_RR", None) == "1" + return is_running_in_gdbserver() and \ + os.environ.get("GDB_UNDER_RR", None) == "1" def get_filepath() -> str | None: @@ -3774,7 +3773,7 @@ def exit_handler(_: "gdb.ExitedEvent") -> None: with bkp_fpath.open("w") as fd: for bp in list(gdb.breakpoints()): - if not bp.enabled or not bp.is_valid: + if not bp.enabled or not bp.is_valid(): continue fd.write(f"{'t' if bp.temporary else ''}break {bp.location}\n") return @@ -11519,12 +11518,6 @@ def __init__(self, conn: gdb.RemoteTargetConnection) -> None: def close(self) -> None: self.__local_root_fd.cleanup() - try: - gef_on_new_unhook(self.remote_objfile_event_handler) - gef_on_new_hook(new_objfile_handler) - except Exception as e: - warn(f"Exception while restoring local context: {str(e)}") - raise def __str__(self) -> str: msg = f"RemoteSession(target='{self.target}', local='{self.root}', mode={self.mode}" @@ -11574,46 +11567,10 @@ def mode(self) -> RemoteMode: return self._mode def sync(self, src: str, dst: str | None = None) -> bool: - """Copy the `src` into the temporary chroot. If `dst` is provided, that path will be - used instead of `src`.""" - if not dst: - dst = src - tgt = self.root / dst.lstrip("/") - if tgt.exists(): - return True - tgt.parent.mkdir(parents=True, exist_ok=True) - dbg(f"[remote] downloading '{src}' -> '{tgt}'") - gdb.execute(f"remote get '{src}' '{tgt.absolute()}'") - return tgt.exists() + raise DeprecationWarning def connect(self, pid: int) -> bool: - """Connect to remote target. If in extended mode, also attach to the given PID.""" - # before anything, register our new hook to download files from the remote target - dbg("[remote] Installing new objfile handlers") - try: - gef_on_new_unhook(new_objfile_handler) - except SystemError: - # the default objfile handler might already have been removed, ignore failure - pass - - gef_on_new_hook(self.remote_objfile_event_handler) - - # then attempt to connect - is_extended_mode = (pid > -1) - dbg(f"[remote] Enabling extended remote: {bool(is_extended_mode)}") - try: - with DisableContextOutputContext(): - cmd = f"target {'extended-' if is_extended_mode else ''}remote {self.target}" - dbg(f"[remote] Executing '{cmd}'") - gdb.execute(cmd) - if is_extended_mode: - gdb.execute(f"attach {pid:d}") - return True - except Exception as e: - err(f"Failed to connect to {self.target}: {e}") - - # a failure will trigger the cleanup, deleting our hook anyway - return False + raise DeprecationWarning def setup(self) -> bool: # setup remote adequately depending on remote or qemu mode @@ -11622,13 +11579,13 @@ def setup(self) -> bool: case GefRemoteSessionManager.RemoteMode.QEMU_USER: self.__setup_qemu_user() case GefRemoteSessionManager.RemoteMode.QEMU_SYSTEM: - raise Exception("TODO") + self.__setup_qemu_system() case GefRemoteSessionManager.RemoteMode.RR: self.__setup_rr() case GefRemoteSessionManager.RemoteMode.GDBSERVER: self.__setup_remote() case GefRemoteSessionManager.RemoteMode.GDBSERVER_MULTI: - pass + self.__setup_remote_multi() case _: raise ValueError @@ -11638,77 +11595,26 @@ def setup(self) -> bool: reset_architecture() return True + def __setup_qemu_system(self) -> bool: + raise Exception("TODO") + return True + def __setup_qemu_user(self) -> bool: - # setup emulated file in the chroot - __qemu_target = pathlib.Path("foo") # TODO - target = self.root / str(__qemu_target.parent).lstrip("/") - target.mkdir(parents=True, exist_ok=False) - shutil.copy2(__qemu_target, target) - self._file = __qemu_target - assert self.lfile.exists() - - # create a procfs - procfs = self.root / f"proc/{self.pid}/" - procfs.mkdir(parents=True, exist_ok=True) - - ## /proc/pid/cmdline - cmdline = procfs / "cmdline" - if not cmdline.exists(): - with cmdline.open("w") as fd: - fd.write("") - - ## /proc/pid/environ - environ = procfs / "environ" - if not environ.exists(): - with environ.open("wb") as fd: - fd.write(b"PATH=/bin\x00HOME=/tmp\x00") - - ## /proc/pid/maps - maps = procfs / "maps" - if not maps.exists(): - with maps.open("w") as fd: - fname = self.file.absolute() - mem_range = "00000000-ffffffff" if is_32bit() else "0000000000000000-ffffffffffffffff" - fd.write(f"{mem_range} rwxp 00000000 00:00 0 {fname}\n") + self.__local_root_path = pathlib.Path("/") return True def __setup_remote(self) -> bool: - # get the file - fpath = f"/proc/{self.pid}/exe" - if not self.sync(fpath, str(self.file)): - err(f"'{fpath}' could not be fetched on the remote system.") - return False - - # pseudo procfs - for _file in ("maps", "environ", "cmdline"): - fpath = f"/proc/{self.pid}/{_file}" - if not self.sync(fpath): - err(f"'{fpath}' could not be fetched on the remote system.") - return False + self.__local_root_path = pathlib.Path("/") + return True + def __setup_remote_multi(self) -> bool: + self.__local_root_path = pathlib.Path("/") return True def __setup_rr(self) -> bool: - # - # Simply override the local root path, the binary must exist - # on the host. - # self.__local_root_path = pathlib.Path("/") return True - def remote_objfile_event_handler(self, evt: "gdb.NewObjFileEvent") -> None: - dbg(f"[remote] in remote_objfile_handler({evt.new_objfile.filename if evt else 'None'}))") - if not evt or not evt.new_objfile.filename: - return - if not evt.new_objfile.filename.startswith("target:") and not evt.new_objfile.filename.startswith("/"): - warn(f"[remote] skipping '{evt.new_objfile.filename}'") - return - if evt.new_objfile.filename.startswith("target:"): - src: str = evt.new_objfile.filename[len("target:"):] - if not self.sync(src): - raise FileNotFoundError(f"Failed to sync '{src}'") - return - class GefUiManager(GefManager): """Class managing UI settings.""" diff --git a/tests/api/gef_remote.py b/tests/api/gef_remote.py index fceb96546..35e673f6a 100644 --- a/tests/api/gef_remote.py +++ b/tests/api/gef_remote.py @@ -16,6 +16,7 @@ def setUp(self) -> None: def test_gef_remote_test_gdbserver(self): """Test `gdbserver file`""" _gdb = self._gdb + _gef = self._gef _root = self._conn.root port = get_random_port() @@ -23,22 +24,30 @@ def test_gef_remote_test_gdbserver(self): assert not _root.eval("is_target_remote()") assert not _root.eval("is_target_remote_or_extended()") assert not _root.eval("is_running_in_gdbserver()") + assert not _root.eval("is_running_in_rr()") + assert not _root.eval("is_running_in_qemu()") _gdb.execute(f"target remote :{port}") assert _root.eval("is_target_remote()") assert _root.eval("is_target_remote_or_extended()") assert _root.eval("is_running_in_gdbserver()") + assert _root.eval("is_running_in_gdbserver()") assert not _root.eval("is_target_extended_remote()") - assert not _root.eval("is_running_under_qemu()") - assert not _root.eval("is_running_under_qemu_system()") - assert not _root.eval("is_running_under_qemu_user()") + assert not _root.eval("is_running_in_qemu()") + assert not _root.eval("is_running_in_qemu_system()") + assert not _root.eval("is_running_in_qemu_user()") assert not _root.eval("is_running_in_rr()") + assert hasattr(_gef.session, "remote") + assert "GDBSERVER" in str(_gef.session.remote) + assert "GDBSERVER_MULTI" not in str(_gef.session.remote) + def test_gef_remote_test_gdbserver_multi(self): """Test `gdbserver --multi file`""" _gdb = self._gdb + _gef = self._gef _root = self._conn.root port = get_random_port() @@ -46,22 +55,31 @@ def test_gef_remote_test_gdbserver_multi(self): assert not _root.eval("is_target_remote()") assert not _root.eval("is_target_remote_or_extended()") assert not _root.eval("is_running_in_gdbserver()") + assert not _root.eval("is_running_in_rr()") + assert not _root.eval("is_running_in_qemu()") _gdb.execute(f"target extended-remote :{port}") + _gdb.execute(f"set remote exec-file {self._target}") + _gdb.execute(f"file {self._target}") + _gdb.execute(f"start {self._target}") - assert _root.eval("is_target_remote()") assert _root.eval("is_target_remote_or_extended()") assert _root.eval("is_target_extended_remote()") assert _root.eval("is_running_in_gdbserver()") + assert not _root.eval("is_target_remote()") - assert not _root.eval("is_running_under_qemu()") - assert not _root.eval("is_running_under_qemu_system()") - assert not _root.eval("is_running_under_qemu_user()") + assert not _root.eval("is_running_in_qemu()") + assert not _root.eval("is_running_in_qemu_system()") + assert not _root.eval("is_running_in_qemu_user()") assert not _root.eval("is_running_in_rr()") + assert hasattr(_gef.session, "remote") + assert "GDBSERVER_MULTI" in str(_gef.session.remote) + def test_gef_remote_test_qemuuser(self): """Test `qemu-user -g`""" _gdb = self._gdb + _gef = self._gef _root = self._conn.root port = get_random_port() @@ -74,14 +92,17 @@ def test_gef_remote_test_qemuuser(self): assert _root.eval("is_target_remote()") assert _root.eval("is_target_remote_or_extended()") - assert _root.eval("is_running_under_qemu()") - assert _root.eval("is_running_under_qemu_user()") + assert _root.eval("is_running_in_qemu()") + assert _root.eval("is_running_in_qemu_user()") assert not _root.eval("is_target_extended_remote()") - assert not _root.eval("is_running_under_qemu_system()") + assert not _root.eval("is_running_in_qemu_system()") assert not _root.eval("is_running_in_gdbserver()") assert not _root.eval("is_running_in_rr()") + assert hasattr(_gef.session, "remote") + assert "QEMU_USER" in str(_gef.session.remote) + # TODO add tests for # - [ ] qemu-system # - [ ] rr diff --git a/tests/commands/gef_remote.py b/tests/commands/gef_remote.py index 34fd0ab60..e5e1e54b2 100644 --- a/tests/commands/gef_remote.py +++ b/tests/commands/gef_remote.py @@ -11,6 +11,7 @@ ARCH, debug_target, gdbserver_session, + get_random_port, qemuuser_session, GDBSERVER_DEFAULT_HOST, ) @@ -26,50 +27,40 @@ def setUp(self) -> None: def test_cmd_gef_remote_gdbserver(self): gdb = self._gdb gef = self._gef - root = self._conn.root + port = get_random_port() gdbserver_mode = "GDBSERVER" - while True: - port = random.randint(1025, 65535) - if port != self._port: - break with gdbserver_session(port=port): - gdb.execute(f"gef-remote {GDBSERVER_DEFAULT_HOST} {port}") - res: str = root.eval("str(gef.session.remote)") - assert res.startswith(f"RemoteSession(target='{GDBSERVER_DEFAULT_HOST}:{port}', local='/tmp/") - assert res.endswith(f"pid={gef.session.pid}, mode={gdbserver_mode})") + gdb.execute(f"target remote {GDBSERVER_DEFAULT_HOST}:{port}") + res: str = str(gef.session.remote) + assert res.startswith(f"RemoteSession(target='{GDBSERVER_DEFAULT_HOST}:{port}', local='/") + assert res.endswith(f"mode={gdbserver_mode}, pid={gef.session.pid})") @pytest.mark.slow @pytest.mark.skipif(ARCH not in ("x86_64",), reason=f"Skipped for {ARCH}") def test_cmd_gef_remote_qemu_user(self): gdb = self._gdb gef = self._gef - root = self._conn.root - qemu_mode = "QEMU" - while True: - port = random.randint(1025, 65535) - if port != self._port: - break + qemu_mode = "QEMU_USER" + port = get_random_port() with qemuuser_session(port=port): - cmd = f"gef-remote --qemu-user --qemu-binary {self._target} {GDBSERVER_DEFAULT_HOST} {port}" + cmd = f"target remote {GDBSERVER_DEFAULT_HOST}:{port}" gdb.execute(cmd) - res = root.eval("str(gef.session.remote)") - assert res.startswith(f"RemoteSession(target='{GDBSERVER_DEFAULT_HOST}:{port}', local='/tmp/") - assert res.endswith(f"pid={gef.session.pid}, mode={qemu_mode})") + res = str(gef.session.remote) + assert res.startswith(f"RemoteSession(target='{GDBSERVER_DEFAULT_HOST}:{port}', local='/") + assert res.endswith(f"mode={qemu_mode})") def test_cmd_target_remote(self): gdb = self._gdb gef = self._gef - root = self._conn.root gdbserver_mode = "GDBSERVER" - while True: - port = random.randint(1025, 65535) - if port != self._port: - break + port = get_random_port() with gdbserver_session(port=port) as _: gdb.execute(f"target remote {GDBSERVER_DEFAULT_HOST}:{port}") - res: str = root.eval("str(gef.session.remote)") - assert res.startswith("RemoteSession(target=':0', local='/tmp/") - assert res.endswith(f"pid={gef.session.pid}, mode={gdbserver_mode})") + res: str = str(gef.session.remote) + assert res.startswith( + f"RemoteSession(target='{GDBSERVER_DEFAULT_HOST}:{port}', local='/" + ) + assert res.endswith(f"mode={gdbserver_mode}, pid={gef.session.pid})") diff --git a/tests/utils.py b/tests/utils.py index 8b9247758..ee15db5b8 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -42,7 +42,8 @@ def which(program: str) -> pathlib.Path: STRIP_ANSI_DEFAULT = True GDBSERVER_DEFAULT_HOST = "localhost" GDBSERVER_DEFAULT_PORT = 1234 -GDBSERVER_BINARY = which("gdbserver") +GDBSERVER_BINARY: pathlib.Path = which("gdbserver") +GDBSERVER_STARTUP_DELAY_SEC : float = 0.5 assert GDBSERVER_BINARY.exists() QEMU_USER_X64_BINARY = which("qemu-x86_64") @@ -141,7 +142,7 @@ def gdbserver_session( ): sess = start_gdbserver(exe, host, port) try: - time.sleep(1) # forced delay to allow gdbserver to start listening + time.sleep(GDBSERVER_STARTUP_DELAY_SEC) yield sess finally: stop_gdbserver(sess) @@ -153,7 +154,7 @@ def gdbserver_multi_session( ): sess = start_gdbserver_multi(host, port) try: - time.sleep(1) # forced delay to allow gdbserver to start listening + time.sleep(GDBSERVER_STARTUP_DELAY_SEC) yield sess finally: stop_gdbserver(sess) From 79447be3f1879a63eabb659010ec0f9e6f29bffe Mon Sep 17 00:00:00 2001 From: hugsy Date: Mon, 4 Nov 2024 17:40:26 -0800 Subject: [PATCH 4/4] fixed `gef-remote` from tests, using only `target remote` --- tests/api/gef_memory.py | 30 +++++++---------------- tests/api/gef_session.py | 15 +++++------- tests/regressions/gdbserver_connection.py | 4 +-- 3 files changed, 17 insertions(+), 32 deletions(-) diff --git a/tests/api/gef_memory.py b/tests/api/gef_memory.py index 6b0b6760d..d77ec0f1a 100644 --- a/tests/api/gef_memory.py +++ b/tests/api/gef_memory.py @@ -12,6 +12,7 @@ ARCH, debug_target, gdbserver_session, + get_random_port, qemuuser_session, GDBSERVER_DEFAULT_HOST, ) @@ -121,48 +122,35 @@ def test_func_parse_maps_local_procfs(self): @pytest.mark.slow def test_func_parse_maps_remote_gdbserver(self): gef, gdb = self._gef, self._gdb - # When in a gef-remote session `parse_gdb_info_proc_maps` should work to + # When in a remote session `parse_gdb_info_proc_maps` should work to # query the memory maps - while True: - port = random.randint(1025, 65535) - if port != self._port: - break + port = get_random_port() with pytest.raises(Exception): - gdb.execute(f"gef-remote {GDBSERVER_DEFAULT_HOST} {port}") + gdb.execute(f"target remote :{port}") with gdbserver_session(port=port) as _: - gdb.execute(f"gef-remote {GDBSERVER_DEFAULT_HOST} {port}") + gdb.execute(f"target remote :{port}") sections = gef.memory.maps assert len(sections) > 0 @pytest.mark.skipif(ARCH not in ("x86_64",), reason=f"Skipped for {ARCH}") def test_func_parse_maps_remote_qemu(self): gdb, gef = self._gdb, self._gef - # When in a gef-remote qemu-user session `parse_gdb_info_proc_maps` - # should work to query the memory maps - while True: - port = random.randint(1025, 65535) - if port != self._port: - break + port = get_random_port() with qemuuser_session(port=port) as _: - cmd = f"gef-remote --qemu-user --qemu-binary {self._target} {GDBSERVER_DEFAULT_HOST} {port}" + cmd = f"target remote :{port}" gdb.execute(cmd) sections = gef.memory.maps assert len(sections) > 0 def test_func_parse_maps_realpath(self): gef, gdb = self._gef, self._gdb - # When in a gef-remote session `parse_gdb_info_proc_maps` should work to - # query the memory maps - while True: - port = random.randint(1025, 65535) - if port != self._port: - break + port = get_random_port() with gdbserver_session(port=port) as _: - gdb.execute(f"gef-remote {GDBSERVER_DEFAULT_HOST} {port}") + gdb.execute(f"target remote :{port}") gdb.execute("b main") gdb.execute("continue") sections = gef.memory.maps diff --git a/tests/api/gef_session.py b/tests/api/gef_session.py index a24c04370..5ebc8a0c1 100644 --- a/tests/api/gef_session.py +++ b/tests/api/gef_session.py @@ -14,6 +14,7 @@ ARCH, debug_target, gdbserver_session, + get_random_port, qemuuser_session, GDBSERVER_DEFAULT_HOST, ) @@ -63,12 +64,11 @@ def test_root_dir_local(self): def test_root_dir_remote(self): gdb = self._gdb gdb.execute("start") - expected = os.stat("/") - host = GDBSERVER_DEFAULT_HOST - port = random.randint(1025, 65535) + port = get_random_port() + with gdbserver_session(port=port): - gdb.execute(f"gef-remote {host} {port}") + gdb.execute(f"target remote :{port}") result = self._conn.root.eval("os.stat(gef.session.root)") assert (expected.st_dev == result.st_dev) and ( expected.st_ino == result.st_ino @@ -77,11 +77,8 @@ def test_root_dir_remote(self): @pytest.mark.skipif(ARCH not in ("x86_64",), reason=f"Skipped for {ARCH}") def test_root_dir_qemu(self): gdb, gef = self._gdb, self._gef + port = get_random_port() - host = GDBSERVER_DEFAULT_HOST - port = random.randint(1025, 65535) with qemuuser_session(port=port): - gdb.execute( - f"gef-remote --qemu-user --qemu-binary {self._target} {host} {port}" - ) + gdb.execute(f"target remote :{port}") assert re.search(r"\/proc\/[0-9]+/root", str(gef.session.root)) diff --git a/tests/regressions/gdbserver_connection.py b/tests/regressions/gdbserver_connection.py index 6c46d9659..244b4e55b 100644 --- a/tests/regressions/gdbserver_connection.py +++ b/tests/regressions/gdbserver_connection.py @@ -10,8 +10,8 @@ def test_can_establish_connection_to_gdbserver_again_after_disconnect(self): gdb = self._gdb with gdbserver_session(port=5001) as _, gdbserver_session(port=5002) as _: - gdb.execute("gef-remote 127.0.0.1 5001") + gdb.execute("target remote :5001") gdb.execute("detach") - gdb.execute("gef-remote 127.0.0.1 5002") + gdb.execute("target remote :5002") gdb.execute("continue")