From e56ec887610954fd64e507f9f7ab4157ae0eee39 Mon Sep 17 00:00:00 2001 From: Jan Petykiewicz Date: Sat, 30 Mar 2024 17:40:18 -0700 Subject: [PATCH] modernize some code style indentation, type annotations, f-strings --- mem_edit/abstract.py | 59 +++++++++++++++++++++--------------- mem_edit/linux.py | 44 +++++++++++++++------------ mem_edit/utils.py | 33 ++++++++++++-------- mem_edit/windows.py | 72 +++++++++++++++++++++++--------------------- 4 files changed, 116 insertions(+), 92 deletions(-) diff --git a/mem_edit/abstract.py b/mem_edit/abstract.py index e086d4e..5305116 100644 --- a/mem_edit/abstract.py +++ b/mem_edit/abstract.py @@ -2,7 +2,7 @@ Abstract class for cross-platform memory editing. """ -from typing import List, Tuple, Optional, Union, Generator +from typing import Generator from abc import ABCMeta, abstractmethod from contextlib import contextmanager import copy @@ -122,7 +122,7 @@ class Process(metaclass=ABCMeta): """ @abstractmethod - def __init__(self, process_id: int): + def __init__(self, process_id: int) -> None: """ Constructing a Process object prepares the process with specified process_id for memory editing. Finding the `process_id` for the process you want to edit is often @@ -135,7 +135,7 @@ class Process(metaclass=ABCMeta): pass @abstractmethod - def close(self): + def close(self) -> None: """ Detach from the process, removing our ability to edit it and letting other debuggers attach to it instead. @@ -147,7 +147,11 @@ class Process(metaclass=ABCMeta): pass @abstractmethod - def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t): + def write_memory( + self, + base_address: int, + write_buffer: ctypes_buffer_t, + ) -> None: """ Write the given buffer to the process's address space, starting at `base_address`. @@ -160,7 +164,11 @@ class Process(metaclass=ABCMeta): pass @abstractmethod - def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t: + def read_memory( + self, + base_address: int, + read_buffer: ctypes_buffer_t, + ) -> ctypes_buffer_t: """ Read into the given buffer from the process's address space, starting at `base_address`. @@ -176,7 +184,7 @@ class Process(metaclass=ABCMeta): pass @abstractmethod - def list_mapped_regions(self, writeable_only=True) -> List[Tuple[int, int]]: + def list_mapped_regions(self, writeable_only: bool = True) -> list[tuple[int, int]]: """ Return a list of `(start_address, stop_address)` for the regions of the address space accessible to (readable and possibly writable by) the process. @@ -192,18 +200,18 @@ class Process(metaclass=ABCMeta): pass @abstractmethod - def get_path(self) -> str: + def get_path(self) -> str | None: """ Return the path to the executable file which was run to start this process. Returns: - A string containing the path. + A string containing the path, or None if no path was found. """ pass @staticmethod @abstractmethod - def list_available_pids() -> List[int]: + def list_available_pids() -> list[int]: """ Return a list of all process ids (pids) accessible on this system. @@ -214,7 +222,7 @@ class Process(metaclass=ABCMeta): @staticmethod @abstractmethod - def get_pid_by_name(target_name: str) -> Optional[int]: + def get_pid_by_name(target_name: str) -> int | None: """ Attempt to return the process id (pid) of a process which was run with an executable file with the provided name. If no process is found, return None. @@ -234,10 +242,11 @@ class Process(metaclass=ABCMeta): """ pass - def deref_struct_pointer(self, - base_address: int, - targets: List[Tuple[int, ctypes_buffer_t]], - ) -> List[ctypes_buffer_t]: + def deref_struct_pointer( + self, + base_address: int, + targets: list[tuple[int, ctypes_buffer_t]], + ) -> list[ctypes_buffer_t]: """ Take a pointer to a struct and read out the struct members: ``` @@ -263,11 +272,12 @@ class Process(metaclass=ABCMeta): values = [self.read_memory(base + offset, buffer) for offset, buffer in targets] return values - def search_addresses(self, - addresses: List[int], - needle_buffer: ctypes_buffer_t, - verbatim: bool = True, - ) -> List[int]: + def search_addresses( + self, + addresses: list[int], + needle_buffer: ctypes_buffer_t, + verbatim: bool = True, + ) -> list[int]: """ Search for the provided value at each of the provided addresses, and return the addresses where it is found. @@ -298,11 +308,12 @@ class Process(metaclass=ABCMeta): found.append(address) return found - def search_all_memory(self, - needle_buffer: ctypes_buffer_t, - writeable_only: bool = True, - verbatim: bool = True, - ) -> List[int]: + def search_all_memory( + self, + needle_buffer: ctypes_buffer_t, + writeable_only: bool = True, + verbatim: bool = True, + ) -> list[int]: """ Search the entire memory space accessible to the process for the provided value. diff --git a/mem_edit/linux.py b/mem_edit/linux.py index aa25a49..e6e92ad 100644 --- a/mem_edit/linux.py +++ b/mem_edit/linux.py @@ -2,7 +2,6 @@ Implementation of Process class for Linux """ -from typing import List, Tuple, Optional from os import strerror import os import os.path @@ -35,41 +34,46 @@ _ptrace.argtypes = (ctypes.c_ulong,) * 4 _ptrace.restype = ctypes.c_long -def ptrace(command: int, pid: int = 0, arg1: int = 0, arg2: int = 0) -> int: +def ptrace( + command: int, + pid: int = 0, + arg1: int = 0, + arg2: int = 0, + ) -> int: """ - Call ptrace() with the provided pid and arguments. See the ```man ptrace```. + Call ptrace() with the provided pid and arguments. See `man ptrace`. """ - logger.debug('ptrace({}, {}, {}, {})'.format(command, pid, arg1, arg2)) + logger.debug(f'ptrace({command}, {pid}, {arg1}, {arg2})') result = _ptrace(command, pid, arg1, arg2) if result == -1: err_no = ctypes.get_errno() if err_no: - raise MemEditError('ptrace({}, {}, {}, {})'.format(command, pid, arg1, arg2) + - ' failed with error {}: {}'.format(err_no, strerror(err_no))) + raise MemEditError(f'ptrace({command}, {pid}, {arg1}, {arg2})' + + f' failed with error {err_no}: {strerror(err_no)}') return result class Process(AbstractProcess): - pid = None + pid: int | None - def __init__(self, process_id: int): + def __init__(self, process_id: int) -> None: ptrace(ptrace_commands['PTRACE_SEIZE'], process_id) self.pid = process_id - def close(self): + def close(self) -> None: os.kill(self.pid, signal.SIGSTOP) os.waitpid(self.pid, 0) ptrace(ptrace_commands['PTRACE_DETACH'], self.pid, 0, 0) os.kill(self.pid, signal.SIGCONT) self.pid = None - def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t): - with open('/proc/{}/mem'.format(self.pid), 'rb+') as mem: + def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t) -> None: + with open(f'/proc/{self.pid}/mem', 'rb+') as mem: mem.seek(base_address) mem.write(write_buffer) def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t: - with open('/proc/{}/mem'.format(self.pid), 'rb+') as mem: + with open(f'/proc/{self.pid}/mem', 'rb+') as mem: mem.seek(base_address) mem.readinto(read_buffer) return read_buffer @@ -82,7 +86,7 @@ class Process(AbstractProcess): return '' @staticmethod - def list_available_pids() -> List[int]: + def list_available_pids() -> list[int]: pids = [] for pid_str in os.listdir('/proc'): try: @@ -92,26 +96,26 @@ class Process(AbstractProcess): return pids @staticmethod - def get_pid_by_name(target_name: str) -> Optional[int]: + def get_pid_by_name(target_name: str) -> int | None: for pid in Process.list_available_pids(): try: - logger.debug('Checking name for pid {}'.format(pid)) - with open('/proc/{}/cmdline'.format(pid), 'rb') as cmdline: + logger.debug(f'Checking name for pid {pid}') + with open(f'/proc/{pid}/cmdline', 'rb') as cmdline: path = cmdline.read().decode().split('\x00')[0] except FileNotFoundError: continue name = os.path.basename(path) - logger.debug('Name was "{}"'.format(name)) + logger.debug(f'Name was "{name}"') if path is not None and name == target_name: return pid - logger.info('Found no process with name {}'.format(target_name)) + logger.info(f'Found no process with name {target_name}') return None - def list_mapped_regions(self, writeable_only: bool = True) -> List[Tuple[int, int]]: + def list_mapped_regions(self, writeable_only: bool = True) -> list[tuple[int, int]]: regions = [] - with open('/proc/{}/maps'.format(self.pid), 'r') as maps: + with open(f'/proc/{self.pid}/maps', 'r') as maps: for line in maps: bounds, privileges = line.split()[0:2] diff --git a/mem_edit/utils.py b/mem_edit/utils.py index 2c9c022..a1f0bc3 100644 --- a/mem_edit/utils.py +++ b/mem_edit/utils.py @@ -11,21 +11,26 @@ Utility functions and types: Check if two buffers (ctypes objects) store equal values: ctypes_equal(a, b) """ - -from typing import List, Union +from typing import Union import ctypes -ctypes_buffer_t = Union[ctypes._SimpleCData, ctypes.Array, ctypes.Structure, ctypes.Union] +ctypes_buffer_t = Union[ + ctypes._SimpleCData, + ctypes.Array, + ctypes.Structure, + ctypes.Union, + ] class MemEditError(Exception): pass -def search_buffer_verbatim(needle_buffer: ctypes_buffer_t, - haystack_buffer: ctypes_buffer_t, - ) -> List[int]: +def search_buffer_verbatim( + needle_buffer: ctypes_buffer_t, + haystack_buffer: ctypes_buffer_t, + ) -> list[int]: """ Search for a buffer inside another buffer, using a direct (bitwise) comparison @@ -50,9 +55,10 @@ def search_buffer_verbatim(needle_buffer: ctypes_buffer_t, return found -def search_buffer(needle_buffer: ctypes_buffer_t, - haystack_buffer: ctypes_buffer_t, - ) -> List[int]: +def search_buffer( + needle_buffer: ctypes_buffer_t, + haystack_buffer: ctypes_buffer_t, + ) -> list[int]: """ Search for a buffer inside another buffer, using `ctypes_equal` for comparison. Much slower than `search_buffer_verbatim`. @@ -73,9 +79,10 @@ def search_buffer(needle_buffer: ctypes_buffer_t, return found -def ctypes_equal(a: ctypes_buffer_t, - b: ctypes_buffer_t, - ) -> bool: +def ctypes_equal( + a: ctypes_buffer_t, + b: ctypes_buffer_t, + ) -> bool: """ Check if the values stored inside two ctypes buffers are equal. """ @@ -87,7 +94,7 @@ def ctypes_equal(a: ctypes_buffer_t, elif isinstance(a, ctypes.Structure) or isinstance(a, ctypes.Union): for attr_name, attr_type in a._fields_: a_attr, b_attr = (getattr(x, attr_name) for x in (a, b)) - if isinstance(a, ctypes_buffer_t): + if isinstance(a, (ctypes.Array, ctypes.Structure, ctypes.Union, ctypes._SimpleCData)): if not ctypes_equal(a_attr, b_attr): return False elif not a_attr == b_attr: diff --git a/mem_edit/windows.py b/mem_edit/windows.py index b945058..dc03521 100644 --- a/mem_edit/windows.py +++ b/mem_edit/windows.py @@ -2,7 +2,6 @@ Implementation of Process class for Windows """ -from typing import List, Tuple, Optional from math import floor from os import strerror import os.path @@ -25,10 +24,10 @@ privileges = { 'PROCESS_VM_WRITE': 0x0020, } privileges['PROCESS_RW'] = ( - privileges['PROCESS_QUERY_INFORMATION'] | - privileges['PROCESS_VM_OPERATION'] | - privileges['PROCESS_VM_READ'] | - privileges['PROCESS_VM_WRITE'] + privileges['PROCESS_QUERY_INFORMATION'] + | privileges['PROCESS_VM_OPERATION'] + | privileges['PROCESS_VM_READ'] + | privileges['PROCESS_VM_WRITE'] ) # Memory region states @@ -50,13 +49,13 @@ page_protections = { } # Custom (combined) permissions page_protections['PAGE_READABLE'] = ( - page_protections['PAGE_EXECUTE_READ'] | - page_protections['PAGE_EXECUTE_READWRITE'] | - page_protections['PAGE_READWRITE'] + page_protections['PAGE_EXECUTE_READ'] + | page_protections['PAGE_EXECUTE_READWRITE'] + | page_protections['PAGE_READWRITE'] ) page_protections['PAGE_READWRITEABLE'] = ( - page_protections['PAGE_EXECUTE_READWRITE'] | - page_protections['PAGE_READWRITE'] + page_protections['PAGE_EXECUTE_READWRITE'] + | page_protections['PAGE_READWRITE'] ) # Memory types @@ -91,7 +90,9 @@ class MEMORY_BASIC_INFORMATION64(ctypes.Structure): ('__alignment2', ctypes.wintypes.DWORD), ] + PTR_SIZE = ctypes.sizeof(ctypes.c_void_p) +MEMORY_BASIC_INFORMATION: ctypes.Structure if PTR_SIZE == 8: # 64-bit python MEMORY_BASIC_INFORMATION = MEMORY_BASIC_INFORMATION64 elif PTR_SIZE == 4: # 32-bit python @@ -133,9 +134,9 @@ class SYSTEM_INFO(ctypes.Structure): class Process(AbstractProcess): - process_handle = None + process_handle: int | None - def __init__(self, process_id: int): + def __init__(self, process_id: int) -> None: process_handle = ctypes.windll.kernel32.OpenProcess( privileges['PROCESS_RW'], False, @@ -143,15 +144,15 @@ class Process(AbstractProcess): ) if not process_handle: - raise MemEditError('Couldn\'t open process {}'.format(process_id)) + raise MemEditError(f'Couldn\'t open process {process_id}') self.process_handle = process_handle - def close(self): + def close(self) -> None: ctypes.windll.kernel32.CloseHandle(self.process_handle) self.process_handle = None - def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t): + def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t) -> None: try: ctypes.windll.kernel32.WriteProcessMemory( self.process_handle, @@ -161,7 +162,7 @@ class Process(AbstractProcess): None ) except (BufferError, ValueError, TypeError): - raise MemEditError('Error with handle {}: {}'.format(self.process_handle, self._get_last_error())) + raise MemEditError(f'Error with handle {self.process_handle}: {self._get_last_error()}') def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t: try: @@ -173,22 +174,23 @@ class Process(AbstractProcess): None ) except (BufferError, ValueError, TypeError): - raise MemEditError('Error with handle {}: {}'.format(self.process_handle, self._get_last_error())) + raise MemEditError(f'Error with handle {self.process_handle}: {self._get_last_error()}') return read_buffer @staticmethod - def _get_last_error() -> Tuple[int, str]: + def _get_last_error() -> tuple[int, str]: err = ctypes.windll.kernel32.GetLastError() return err, strerror(err) - def get_path(self) -> str: + def get_path(self) -> str | None: max_path_len = 260 name_buffer = (ctypes.c_char * max_path_len)() rval = ctypes.windll.psapi.GetProcessImageFileNameA( - self.process_handle, - name_buffer, - max_path_len) + self.process_handle, + name_buffer, + max_path_len, + ) if rval > 0: return name_buffer.value.decode() @@ -196,28 +198,28 @@ class Process(AbstractProcess): return None @staticmethod - def list_available_pids() -> List[int]: + def list_available_pids() -> list[int]: # According to EnumProcesses docs, you can't find out how many processes there are before # fetching the list. As a result, we grab 100 on the first try, and if we get a full list # of 100, repeatedly double the number until we get fewer than we asked for. - n = 100 + nn = 100 returned_size = ctypes.wintypes.DWORD() returned_size_ptr = ctypes.byref(returned_size) while True: - pids = (ctypes.wintypes.DWORD * n)() + pids = (ctypes.wintypes.DWORD * nn)() size = ctypes.sizeof(pids) pids_ptr = ctypes.byref(pids) success = ctypes.windll.Psapi.EnumProcesses(pids_ptr, size, returned_size_ptr) if not success: - raise MemEditError('Failed to enumerate processes: n={}'.format(n)) + raise MemEditError(f'Failed to enumerate processes: nn={nn}') num_returned = floor(returned_size.value / ctypes.sizeof(ctypes.wintypes.DWORD)) - if n == num_returned: - n *= 2 + if nn == num_returned: + nn *= 2 continue else: break @@ -225,15 +227,15 @@ class Process(AbstractProcess): return pids[:num_returned] @staticmethod - def get_pid_by_name(target_name: str) -> Optional[int]: + def get_pid_by_name(target_name: str) -> int | None: for pid in Process.list_available_pids(): try: - logger.debug('Checking name for pid {}'.format(pid)) + logger.debug(f'Checking name for pid {pid}') with Process.open_process(pid) as process: path = process.get_path() name = os.path.basename(path) - logger.debug('Name was "{}"'.format(name)) + logger.debug(f'Name was "{name}"') if path is not None and name == target_name: return pid except ValueError: @@ -241,10 +243,10 @@ class Process(AbstractProcess): except MemEditError as err: logger.debug(repr(err)) - logger.info('Found no process with name {}'.format(target_name)) + logger.info(f'Found no process with name {target_name}') return None - def list_mapped_regions(self, writeable_only: bool = True) -> List[Tuple[int, int]]: + def list_mapped_regions(self, writeable_only: bool = True) -> list[tuple[int, int]]: sys_info = SYSTEM_INFO() sys_info_ptr = ctypes.byref(sys_info) ctypes.windll.kernel32.GetSystemInfo(sys_info_ptr) @@ -268,8 +270,8 @@ class Process(AbstractProcess): if success != mbi_size: if success == 0: - raise MemEditError('Failed VirtualQueryEx with handle ' + - '{}: {}'.format(self.process_handle, self._get_last_error())) + raise MemEditError('Failed VirtualQueryEx with handle ' + + f'{self.process_handle}: {self._get_last_error()}') else: raise MemEditError('VirtualQueryEx output too short!')