From 25f6ad7ea90406b431a02c3f48e003ea1613c473 Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Thu, 1 Sep 2022 10:57:04 +0800 Subject: [PATCH] refactor!: ql own mem as bytearray, ref passed to uc and r2 BREAKING CHANGE: MapInfoEntry now has 6 elements instead of 5 BREAKING CHANGE: mem is managed in Python instead of uc BREAKING CHANGE: r2 map io from ql.mem, no full binary, now missing symbols BREAKING CHANGE: del_mapinfo and change_mapinfo recreate and remap mem also fix potential bug in syscall_munmap --- examples/extensions/r2/hello_r2.py | 2 +- qiling/arch/utils.py | 2 +- qiling/extensions/r2/r2.py | 33 ++++++------- qiling/os/memory.py | 78 ++++++++++++++++++++---------- qiling/os/posix/syscall/mman.py | 4 +- 5 files changed, 72 insertions(+), 47 deletions(-) diff --git a/examples/extensions/r2/hello_r2.py b/examples/extensions/r2/hello_r2.py index 3b02293ea..9d2b9ee51 100644 --- a/examples/extensions/r2/hello_r2.py +++ b/examples/extensions/r2/hello_r2.py @@ -35,7 +35,7 @@ def my_sandbox(path, rootfs): ql.hook_address(func, r2.functions['main'].offset) # enable trace powered by r2 symsmap # r2.enable_trace() - r2.bt(0x401906) + r2.set_backtrace(0x401906) ql.run() if __name__ == "__main__": diff --git a/qiling/arch/utils.py b/qiling/arch/utils.py index 7c6bcd9c8..b4c0ed13c 100644 --- a/qiling/arch/utils.py +++ b/qiling/arch/utils.py @@ -27,7 +27,7 @@ def __init__(self, ql: Qiling): @lru_cache(maxsize=64) def get_base_and_name(self, addr: int) -> Tuple[int, str]: - for begin, end, _, name, _ in self.ql.mem.map_info: + for begin, end, _, name, _, _ in self.ql.mem.map_info: if begin <= addr < end: return begin, basename(name) diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index 60fb6444f..78aff6087 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -142,10 +142,8 @@ def __init__(self, ql: "Qiling", baseaddr=(1 << 64) - 1, loadaddr=0): self.loadaddr = loadaddr # r2 -m [addr] map file at given address self.analyzed = False self._r2c = libr.r_core.r_core_new() - if ql.code: - self._setup_code(ql.code) - else: - self._setup_file(ql.path) + self._r2i = ctypes.cast(self._r2c.contents.io, ctypes.POINTER(libr.r_io.struct_r_io_t)) + self._setup_mem(ql) def _qlarch2r(self, archtype: QL_ARCH) -> str: return { @@ -162,20 +160,21 @@ def _qlarch2r(self, archtype: QL_ARCH) -> str: QL_ARCH.PPC: "ppc", }[archtype] - def _setup_code(self, code: bytes): - path = f'malloc://{len(code)}'.encode() - fh = libr.r_core.r_core_file_open(self._r2c, path, UC_PROT_ALL, self.loadaddr) - libr.r_core.r_core_bin_load(self._r2c, path, self.baseaddr) - self._cmd(f'wx {code.hex()}') + def _rbuf_map(self, buf: bytearray, perm: int = UC_PROT_ALL, addr: int = 0, delta: int = 0): + rbuf = libr.r_buf_new_with_pointers(ctypes.c_ubyte.from_buffer(buf), len(buf), False) + rbuf = ctypes.cast(rbuf, ctypes.POINTER(libr.r_io.struct_r_buf_t)) + desc = libr.r_io_open_buffer(self._r2i, rbuf, perm, 0) # last arg `mode` is always 0 in r2 code + libr.r_io.r_io_map_add(self._r2i, desc.contents.fd, desc.contents.perm, delta, addr, len(buf)) + + def _setup_mem(self, ql: 'Qiling'): + if not hasattr(ql, '_mem'): + return + for start, end, perms, _label, _mmio, buf in ql.mem.map_info: + self._rbuf_map(buf, perms, start) # set architecture and bits for r2 asm - arch = self._qlarch2r(self.ql.arch.type) - self._cmd(f"e,asm.arch={arch},asm.bits={self.ql.arch.bits}") - - def _setup_file(self, path: str): - path = path.encode() - fh = libr.r_core.r_core_file_open(self._r2c, path, UC_PROT_READ | UC_PROT_EXEC, self.loadaddr) - libr.r_core.r_core_bin_load(self._r2c, path, self.baseaddr) - + arch = self._qlarch2r(ql.arch.type) + self._cmd(f"e,asm.arch={arch},asm.bits={ql.arch.bits}") + def _cmd(self, cmd: str) -> str: r = libr.r_core.r_core_cmd_str( self._r2c, ctypes.create_string_buffer(cmd.encode("utf-8"))) diff --git a/qiling/os/memory.py b/qiling/os/memory.py index 21d461e65..e23e1de14 100644 --- a/qiling/os/memory.py +++ b/qiling/os/memory.py @@ -3,6 +3,7 @@ # Cross Platform and Multi Architecture Advanced Binary Emulation Framework # +import ctypes import os, re from typing import Any, Callable, Iterator, List, Mapping, MutableSequence, Optional, Pattern, Sequence, Tuple, Union @@ -11,8 +12,8 @@ from qiling import Qiling from qiling.exception import * -# tuple: range start, range end, permissions mask, range label, is mmio? -MapInfoEntry = Tuple[int, int, int, str, bool] +# tuple: range start, range end, permissions mask, range label, is mmio?, bytearray +MapInfoEntry = Tuple[int, int, int, str, bool, bytearray] MmioReadCallback = Callable[[Qiling, int, int], int] MmioWriteCallback = Callable[[Qiling, int, int, int], None] @@ -80,7 +81,7 @@ def string(self, addr: int, value=None, encoding='utf-8') -> Optional[str]: self.__write_string(addr, value, encoding) - def add_mapinfo(self, mem_s: int, mem_e: int, mem_p: int, mem_info: str, is_mmio: bool = False): + def add_mapinfo(self, mem_s: int, mem_e: int, mem_p: int, mem_info: str, is_mmio: bool = False, data : bytearray = None): """Add a new memory range to map. Args: @@ -90,12 +91,11 @@ def add_mapinfo(self, mem_s: int, mem_e: int, mem_p: int, mem_info: str, is_mmio mem_info: map entry label is_mmio: memory range is mmio """ - - self.map_info.append((mem_s, mem_e, mem_p, mem_info, is_mmio)) - self.map_info = sorted(self.map_info, key=lambda tp: tp[0]) + self.map_info.append((mem_s, mem_e, mem_p, mem_info, is_mmio, data)) + self.map_info.sort(key=lambda tp: tp[0]) def del_mapinfo(self, mem_s: int, mem_e: int): - """Subtract a memory range from map. + """Subtract a memory range from map, will destroy data and unmap uc mem in the range. Args: mem_s: memory range start @@ -104,30 +104,37 @@ def del_mapinfo(self, mem_s: int, mem_e: int): tmp_map_info: MutableSequence[MapInfoEntry] = [] - for s, e, p, info, mmio in self.map_info: + for s, e, p, info, mmio, data in self.map_info: if e <= mem_s: - tmp_map_info.append((s, e, p, info, mmio)) + tmp_map_info.append((s, e, p, info, mmio, data)) continue if s >= mem_e: - tmp_map_info.append((s, e, p, info, mmio)) + tmp_map_info.append((s, e, p, info, mmio, data)) continue if s < mem_s: - tmp_map_info.append((s, mem_s, p, info, mmio)) + self.ql.uc.mem_unmap(s, mem_s - s) + self.map_ptr(s, mem_s - s, p, data[:mem_s - s]) + tmp_map_info.append((s, mem_s, p, info, mmio, data[:mem_s - s])) if s == mem_s: pass if e > mem_e: - tmp_map_info.append((mem_e, e, p, info, mmio)) + self.ql.uc.mem_unmap(mem_e, e - mem_e) + self.map_ptr(mem_e, e - mem_e, p, data[mem_e - e:]) + tmp_map_info.append((mem_e, e, p, info, mmio, data[mem_e - e:])) if e == mem_e: pass + del data[mem_s - s:mem_e - s] + self.ql.uc.mem_unmap(mem_s, mem_e - mem_s) + self.map_info = tmp_map_info - def change_mapinfo(self, mem_s: int, mem_e: int, mem_p: Optional[int] = None, mem_info: Optional[str] = None): + def change_mapinfo(self, mem_s: int, mem_e: int, mem_p: Optional[int] = None, mem_info: Optional[str] = None, data: Optional[bytearray] = None): tmp_map_info: Optional[MapInfoEntry] = None info_idx: int = None @@ -143,11 +150,14 @@ def change_mapinfo(self, mem_s: int, mem_e: int, mem_p: Optional[int] = None, me if mem_p is not None: self.del_mapinfo(mem_s, mem_e) - self.add_mapinfo(mem_s, mem_e, mem_p, mem_info if mem_info else tmp_map_info[3]) + data = data or bytearray(mem_e - mem_s) + assert(len(data) == mem_e - mem_s) + self.map_ptr(mem_s, mem_e - mem_s, mem_p, data) + self.add_mapinfo(mem_s, mem_e, mem_p, mem_info or tmp_map_info[3], tmp_map_info[4], data) return if mem_info is not None: - self.map_info[info_idx] = (tmp_map_info[0], tmp_map_info[1], tmp_map_info[2], mem_info, tmp_map_info[4]) + self.map_info[info_idx] = (tmp_map_info[0], tmp_map_info[1], tmp_map_info[2], mem_info, tmp_map_info[4], tmp_map_info[5]) def get_mapinfo(self) -> Sequence[Tuple[int, int, str, str, str]]: """Get memory map info. @@ -166,7 +176,7 @@ def __perms_mapping(ps: int) -> str: return ''.join(val if idx & ps else '-' for idx, val in perms_d.items()) - def __process(lbound: int, ubound: int, perms: int, label: str, is_mmio: bool) -> Tuple[int, int, str, str, str]: + def __process(lbound: int, ubound: int, perms: int, label: str, is_mmio: bool, _data: bytearray) -> Tuple[int, int, str, str, str]: perms_str = __perms_mapping(perms) if hasattr(self.ql, 'loader'): @@ -211,7 +221,7 @@ def get_lib_base(self, filename: str) -> Optional[int]: # some info labels may be prefixed by boxed label which breaks the search by basename. # iterate through all info labels and remove all boxed prefixes, if any - stripped = ((lbound, p.sub('', info)) for lbound, _, _, info, _ in self.map_info) + stripped = ((lbound, p.sub('', info)) for lbound, _, _, info, _, _ in self.map_info) return next((lbound for lbound, info in stripped if os.path.basename(info) == filename), None) @@ -268,11 +278,10 @@ def save(self): "mmio" : [] } - for lbound, ubound, perm, label, is_mmio in self.map_info: + for lbound, ubound, perm, label, is_mmio, data in self.map_info: if is_mmio: mem_dict['mmio'].append((lbound, ubound, perm, label, *self.mmio_cbs[(lbound, ubound)])) else: - data = self.read(lbound, ubound - lbound) mem_dict['ram'].append((lbound, ubound, perm, label, bytes(data))) return mem_dict @@ -393,7 +402,7 @@ def search(self, needle: Union[bytes, Pattern[bytes]], begin: Optional[int] = No assert begin < end, 'search arguments do not make sense' # narrow the search down to relevant ranges; mmio ranges are excluded due to potential read size effects - ranges = [(max(begin, lbound), min(ubound, end)) for lbound, ubound, _, _, is_mmio in self.map_info if not (end < lbound or ubound < begin or is_mmio)] + ranges = [(max(begin, lbound), min(ubound, end)) for lbound, ubound, _, _, is_mmio, _data in self.map_info if not (end < lbound or ubound < begin or is_mmio)] results = [] # if needle is a bytes sequence use it verbatim, not as a pattern @@ -439,10 +448,10 @@ def __mapped_regions(self) -> Iterator[Tuple[int, int]]: iter_memmap = iter(self.map_info) - p_lbound, p_ubound, _, _, _ = next(iter_memmap) + p_lbound, p_ubound, _, _, _, _ = next(iter_memmap) # map_info is assumed to contain non-overlapping regions sorted by lbound - for lbound, ubound, _, _, _ in iter_memmap: + for lbound, ubound, _, _, _, _ in iter_memmap: if lbound == p_ubound: p_ubound = ubound else: @@ -514,8 +523,8 @@ def find_free_space(self, size: int, minaddr: Optional[int] = None, maxaddr: Opt assert minaddr < maxaddr # get gap ranges between mapped ones and memory bounds - gaps_ubounds = tuple(lbound for lbound, _, _, _, _ in self.map_info) + (mem_ubound,) - gaps_lbounds = (mem_lbound,) + tuple(ubound for _, ubound, _, _, _ in self.map_info) + gaps_ubounds = tuple(lbound for lbound, _, _, _, _, _ in self.map_info) + (mem_ubound,) + gaps_lbounds = (mem_lbound,) + tuple(ubound for _, ubound, _, _, _, _ in self.map_info) gaps = zip(gaps_lbounds, gaps_ubounds) for lbound, ubound in gaps: @@ -582,8 +591,25 @@ def map(self, addr: int, size: int, perms: int = UC_PROT_ALL, info: Optional[str if not self.is_available(addr, size): raise QlMemoryMappedError('Requested memory is unavailable') - self.ql.uc.mem_map(addr, size, perms) - self.add_mapinfo(addr, addr + size, perms, info or '[mapped]', is_mmio=False) + buf = self.map_ptr(addr, size, perms) + self.add_mapinfo(addr, addr + size, perms, info or '[mapped]', is_mmio=False, data=buf) + + def map_ptr(self, addr: int, size: int, perms: int = UC_PROT_ALL, buf: Optional[bytearray] = None) -> bytearray: + """Map a new memory range allocated as Python bytearray, will not affect map_info + + Args: + addr: memory range base address + size: memory range size (in bytes) + perms: requested permissions mask + buf: bytearray already allocated (if any) + + Returns: + bytearray with size, should be added to map_info by caller + """ + buf = buf or bytearray(size) + buf_type = ctypes.c_byte * size + self.ql.uc.mem_map_ptr(addr, size, perms, buf_type.from_buffer(buf)) + return buf def map_mmio(self, addr: int, size: int, read_cb: Optional[MmioReadCallback], write_cb: Optional[MmioWriteCallback], info: str = '[mmio]'): # TODO: mmio memory overlap with ram? Is that possible? diff --git a/qiling/os/posix/syscall/mman.py b/qiling/os/posix/syscall/mman.py index 5e3eae0bf..0848a5b36 100755 --- a/qiling/os/posix/syscall/mman.py +++ b/qiling/os/posix/syscall/mman.py @@ -16,7 +16,7 @@ def ql_syscall_munmap(ql: Qiling, addr: int, length: int): mapped_fd = [fd for fd in ql.os.fd if fd != 0 and isinstance(fd, ql_file) and fd._is_map_shared and not (fd.name.endswith(".so") or fd.name.endswith(".dylib"))] if mapped_fd: - all_mem_info = [_mem_info for _, _, _, _mem_info in ql.mem.map_info if _mem_info not in ("[mapped]", "[stack]", "[hook_mem]")] + all_mem_info = [_mem_info for _, _, _, _mem_info, _mmio, _data in ql.mem.map_info if _mem_info not in ("[mapped]", "[stack]", "[hook_mem]")] for _fd in mapped_fd: if _fd.name in [each.split()[-1] for each in all_mem_info]: @@ -110,7 +110,7 @@ def syscall_mmap_impl(ql: Qiling, addr: int, mlen: int, prot: int, flags: int, f if mmap_base == 0: mmap_base = ql.loader.mmap_address ql.loader.mmap_address = mmap_base + mmap_size - ql.log.debug("%s - mapping needed for 0x%x" % (api_name, mmap_base)) + ql.log.debug("%s - mapping needed at 0x%x with size 0x%x" % (api_name, mmap_base, mmap_size)) try: ql.mem.map(mmap_base, mmap_size, prot, "[syscall_%s]" % api_name) except Exception as e: