modernize some code style

indentation, type annotations, f-strings
This commit is contained in:
Jan Petykiewicz 2024-03-30 17:40:18 -07:00
parent f03ea6acad
commit e56ec88761
4 changed files with 116 additions and 92 deletions

View File

@ -2,7 +2,7 @@
Abstract class for cross-platform memory editing. Abstract class for cross-platform memory editing.
""" """
from typing import List, Tuple, Optional, Union, Generator from typing import Generator
from abc import ABCMeta, abstractmethod from abc import ABCMeta, abstractmethod
from contextlib import contextmanager from contextlib import contextmanager
import copy import copy
@ -122,7 +122,7 @@ class Process(metaclass=ABCMeta):
""" """
@abstractmethod @abstractmethod
def __init__(self, process_id: int): def __init__(self, process_id: int) -> None:
""" """
Constructing a Process object prepares the process with specified process_id for Constructing a Process object prepares the process with specified process_id for
memory editing. Finding the `process_id` for the process you want to edit is often memory editing. Finding the `process_id` for the process you want to edit is often
@ -135,7 +135,7 @@ class Process(metaclass=ABCMeta):
pass pass
@abstractmethod @abstractmethod
def close(self): def close(self) -> None:
""" """
Detach from the process, removing our ability to edit it and Detach from the process, removing our ability to edit it and
letting other debuggers attach to it instead. letting other debuggers attach to it instead.
@ -147,7 +147,11 @@ class Process(metaclass=ABCMeta):
pass pass
@abstractmethod @abstractmethod
def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t): def write_memory(
self,
base_address: int,
write_buffer: ctypes_buffer_t,
) -> None:
""" """
Write the given buffer to the process's address space, starting at `base_address`. Write the given buffer to the process's address space, starting at `base_address`.
@ -160,7 +164,11 @@ class Process(metaclass=ABCMeta):
pass pass
@abstractmethod @abstractmethod
def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t: def read_memory(
self,
base_address: int,
read_buffer: ctypes_buffer_t,
) -> ctypes_buffer_t:
""" """
Read into the given buffer from the process's address space, starting at `base_address`. Read into the given buffer from the process's address space, starting at `base_address`.
@ -176,7 +184,7 @@ class Process(metaclass=ABCMeta):
pass pass
@abstractmethod @abstractmethod
def list_mapped_regions(self, writeable_only=True) -> List[Tuple[int, int]]: def list_mapped_regions(self, writeable_only: bool = True) -> list[tuple[int, int]]:
""" """
Return a list of `(start_address, stop_address)` for the regions of the address space Return a list of `(start_address, stop_address)` for the regions of the address space
accessible to (readable and possibly writable by) the process. accessible to (readable and possibly writable by) the process.
@ -192,18 +200,18 @@ class Process(metaclass=ABCMeta):
pass pass
@abstractmethod @abstractmethod
def get_path(self) -> str: def get_path(self) -> str | None:
""" """
Return the path to the executable file which was run to start this process. Return the path to the executable file which was run to start this process.
Returns: Returns:
A string containing the path. A string containing the path, or None if no path was found.
""" """
pass pass
@staticmethod @staticmethod
@abstractmethod @abstractmethod
def list_available_pids() -> List[int]: def list_available_pids() -> list[int]:
""" """
Return a list of all process ids (pids) accessible on this system. Return a list of all process ids (pids) accessible on this system.
@ -214,7 +222,7 @@ class Process(metaclass=ABCMeta):
@staticmethod @staticmethod
@abstractmethod @abstractmethod
def get_pid_by_name(target_name: str) -> Optional[int]: def get_pid_by_name(target_name: str) -> int | None:
""" """
Attempt to return the process id (pid) of a process which was run with an executable Attempt to return the process id (pid) of a process which was run with an executable
file with the provided name. If no process is found, return None. file with the provided name. If no process is found, return None.
@ -234,10 +242,11 @@ class Process(metaclass=ABCMeta):
""" """
pass pass
def deref_struct_pointer(self, def deref_struct_pointer(
self,
base_address: int, base_address: int,
targets: List[Tuple[int, ctypes_buffer_t]], targets: list[tuple[int, ctypes_buffer_t]],
) -> List[ctypes_buffer_t]: ) -> list[ctypes_buffer_t]:
""" """
Take a pointer to a struct and read out the struct members: Take a pointer to a struct and read out the struct members:
``` ```
@ -263,11 +272,12 @@ class Process(metaclass=ABCMeta):
values = [self.read_memory(base + offset, buffer) for offset, buffer in targets] values = [self.read_memory(base + offset, buffer) for offset, buffer in targets]
return values return values
def search_addresses(self, def search_addresses(
addresses: List[int], self,
addresses: list[int],
needle_buffer: ctypes_buffer_t, needle_buffer: ctypes_buffer_t,
verbatim: bool = True, verbatim: bool = True,
) -> List[int]: ) -> list[int]:
""" """
Search for the provided value at each of the provided addresses, and return the addresses Search for the provided value at each of the provided addresses, and return the addresses
where it is found. where it is found.
@ -298,11 +308,12 @@ class Process(metaclass=ABCMeta):
found.append(address) found.append(address)
return found return found
def search_all_memory(self, def search_all_memory(
self,
needle_buffer: ctypes_buffer_t, needle_buffer: ctypes_buffer_t,
writeable_only: bool = True, writeable_only: bool = True,
verbatim: bool = True, verbatim: bool = True,
) -> List[int]: ) -> list[int]:
""" """
Search the entire memory space accessible to the process for the provided value. Search the entire memory space accessible to the process for the provided value.

View File

@ -2,7 +2,6 @@
Implementation of Process class for Linux Implementation of Process class for Linux
""" """
from typing import List, Tuple, Optional
from os import strerror from os import strerror
import os import os
import os.path import os.path
@ -35,41 +34,46 @@ _ptrace.argtypes = (ctypes.c_ulong,) * 4
_ptrace.restype = ctypes.c_long _ptrace.restype = ctypes.c_long
def ptrace(command: int, pid: int = 0, arg1: int = 0, arg2: int = 0) -> int: def ptrace(
command: int,
pid: int = 0,
arg1: int = 0,
arg2: int = 0,
) -> int:
""" """
Call ptrace() with the provided pid and arguments. See the ```man ptrace```. Call ptrace() with the provided pid and arguments. See `man ptrace`.
""" """
logger.debug('ptrace({}, {}, {}, {})'.format(command, pid, arg1, arg2)) logger.debug(f'ptrace({command}, {pid}, {arg1}, {arg2})')
result = _ptrace(command, pid, arg1, arg2) result = _ptrace(command, pid, arg1, arg2)
if result == -1: if result == -1:
err_no = ctypes.get_errno() err_no = ctypes.get_errno()
if err_no: if err_no:
raise MemEditError('ptrace({}, {}, {}, {})'.format(command, pid, arg1, arg2) + raise MemEditError(f'ptrace({command}, {pid}, {arg1}, {arg2})'
' failed with error {}: {}'.format(err_no, strerror(err_no))) + f' failed with error {err_no}: {strerror(err_no)}')
return result return result
class Process(AbstractProcess): class Process(AbstractProcess):
pid = None pid: int | None
def __init__(self, process_id: int): def __init__(self, process_id: int) -> None:
ptrace(ptrace_commands['PTRACE_SEIZE'], process_id) ptrace(ptrace_commands['PTRACE_SEIZE'], process_id)
self.pid = process_id self.pid = process_id
def close(self): def close(self) -> None:
os.kill(self.pid, signal.SIGSTOP) os.kill(self.pid, signal.SIGSTOP)
os.waitpid(self.pid, 0) os.waitpid(self.pid, 0)
ptrace(ptrace_commands['PTRACE_DETACH'], self.pid, 0, 0) ptrace(ptrace_commands['PTRACE_DETACH'], self.pid, 0, 0)
os.kill(self.pid, signal.SIGCONT) os.kill(self.pid, signal.SIGCONT)
self.pid = None self.pid = None
def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t): def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t) -> None:
with open('/proc/{}/mem'.format(self.pid), 'rb+') as mem: with open(f'/proc/{self.pid}/mem', 'rb+') as mem:
mem.seek(base_address) mem.seek(base_address)
mem.write(write_buffer) mem.write(write_buffer)
def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t: def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t:
with open('/proc/{}/mem'.format(self.pid), 'rb+') as mem: with open(f'/proc/{self.pid}/mem', 'rb+') as mem:
mem.seek(base_address) mem.seek(base_address)
mem.readinto(read_buffer) mem.readinto(read_buffer)
return read_buffer return read_buffer
@ -82,7 +86,7 @@ class Process(AbstractProcess):
return '' return ''
@staticmethod @staticmethod
def list_available_pids() -> List[int]: def list_available_pids() -> list[int]:
pids = [] pids = []
for pid_str in os.listdir('/proc'): for pid_str in os.listdir('/proc'):
try: try:
@ -92,26 +96,26 @@ class Process(AbstractProcess):
return pids return pids
@staticmethod @staticmethod
def get_pid_by_name(target_name: str) -> Optional[int]: def get_pid_by_name(target_name: str) -> int | None:
for pid in Process.list_available_pids(): for pid in Process.list_available_pids():
try: try:
logger.debug('Checking name for pid {}'.format(pid)) logger.debug(f'Checking name for pid {pid}')
with open('/proc/{}/cmdline'.format(pid), 'rb') as cmdline: with open(f'/proc/{pid}/cmdline', 'rb') as cmdline:
path = cmdline.read().decode().split('\x00')[0] path = cmdline.read().decode().split('\x00')[0]
except FileNotFoundError: except FileNotFoundError:
continue continue
name = os.path.basename(path) name = os.path.basename(path)
logger.debug('Name was "{}"'.format(name)) logger.debug(f'Name was "{name}"')
if path is not None and name == target_name: if path is not None and name == target_name:
return pid return pid
logger.info('Found no process with name {}'.format(target_name)) logger.info(f'Found no process with name {target_name}')
return None return None
def list_mapped_regions(self, writeable_only: bool = True) -> List[Tuple[int, int]]: def list_mapped_regions(self, writeable_only: bool = True) -> list[tuple[int, int]]:
regions = [] regions = []
with open('/proc/{}/maps'.format(self.pid), 'r') as maps: with open(f'/proc/{self.pid}/maps', 'r') as maps:
for line in maps: for line in maps:
bounds, privileges = line.split()[0:2] bounds, privileges = line.split()[0:2]

View File

@ -11,21 +11,26 @@ Utility functions and types:
Check if two buffers (ctypes objects) store equal values: Check if two buffers (ctypes objects) store equal values:
ctypes_equal(a, b) ctypes_equal(a, b)
""" """
from typing import Union
from typing import List, Union
import ctypes import ctypes
ctypes_buffer_t = Union[ctypes._SimpleCData, ctypes.Array, ctypes.Structure, ctypes.Union] ctypes_buffer_t = Union[
ctypes._SimpleCData,
ctypes.Array,
ctypes.Structure,
ctypes.Union,
]
class MemEditError(Exception): class MemEditError(Exception):
pass pass
def search_buffer_verbatim(needle_buffer: ctypes_buffer_t, def search_buffer_verbatim(
needle_buffer: ctypes_buffer_t,
haystack_buffer: ctypes_buffer_t, haystack_buffer: ctypes_buffer_t,
) -> List[int]: ) -> list[int]:
""" """
Search for a buffer inside another buffer, using a direct (bitwise) comparison Search for a buffer inside another buffer, using a direct (bitwise) comparison
@ -50,9 +55,10 @@ def search_buffer_verbatim(needle_buffer: ctypes_buffer_t,
return found return found
def search_buffer(needle_buffer: ctypes_buffer_t, def search_buffer(
needle_buffer: ctypes_buffer_t,
haystack_buffer: ctypes_buffer_t, haystack_buffer: ctypes_buffer_t,
) -> List[int]: ) -> list[int]:
""" """
Search for a buffer inside another buffer, using `ctypes_equal` for comparison. Search for a buffer inside another buffer, using `ctypes_equal` for comparison.
Much slower than `search_buffer_verbatim`. Much slower than `search_buffer_verbatim`.
@ -73,7 +79,8 @@ def search_buffer(needle_buffer: ctypes_buffer_t,
return found return found
def ctypes_equal(a: ctypes_buffer_t, def ctypes_equal(
a: ctypes_buffer_t,
b: ctypes_buffer_t, b: ctypes_buffer_t,
) -> bool: ) -> bool:
""" """
@ -87,7 +94,7 @@ def ctypes_equal(a: ctypes_buffer_t,
elif isinstance(a, ctypes.Structure) or isinstance(a, ctypes.Union): elif isinstance(a, ctypes.Structure) or isinstance(a, ctypes.Union):
for attr_name, attr_type in a._fields_: for attr_name, attr_type in a._fields_:
a_attr, b_attr = (getattr(x, attr_name) for x in (a, b)) a_attr, b_attr = (getattr(x, attr_name) for x in (a, b))
if isinstance(a, ctypes_buffer_t): if isinstance(a, (ctypes.Array, ctypes.Structure, ctypes.Union, ctypes._SimpleCData)):
if not ctypes_equal(a_attr, b_attr): if not ctypes_equal(a_attr, b_attr):
return False return False
elif not a_attr == b_attr: elif not a_attr == b_attr:

View File

@ -2,7 +2,6 @@
Implementation of Process class for Windows Implementation of Process class for Windows
""" """
from typing import List, Tuple, Optional
from math import floor from math import floor
from os import strerror from os import strerror
import os.path import os.path
@ -25,10 +24,10 @@ privileges = {
'PROCESS_VM_WRITE': 0x0020, 'PROCESS_VM_WRITE': 0x0020,
} }
privileges['PROCESS_RW'] = ( privileges['PROCESS_RW'] = (
privileges['PROCESS_QUERY_INFORMATION'] | privileges['PROCESS_QUERY_INFORMATION']
privileges['PROCESS_VM_OPERATION'] | | privileges['PROCESS_VM_OPERATION']
privileges['PROCESS_VM_READ'] | | privileges['PROCESS_VM_READ']
privileges['PROCESS_VM_WRITE'] | privileges['PROCESS_VM_WRITE']
) )
# Memory region states # Memory region states
@ -50,13 +49,13 @@ page_protections = {
} }
# Custom (combined) permissions # Custom (combined) permissions
page_protections['PAGE_READABLE'] = ( page_protections['PAGE_READABLE'] = (
page_protections['PAGE_EXECUTE_READ'] | page_protections['PAGE_EXECUTE_READ']
page_protections['PAGE_EXECUTE_READWRITE'] | | page_protections['PAGE_EXECUTE_READWRITE']
page_protections['PAGE_READWRITE'] | page_protections['PAGE_READWRITE']
) )
page_protections['PAGE_READWRITEABLE'] = ( page_protections['PAGE_READWRITEABLE'] = (
page_protections['PAGE_EXECUTE_READWRITE'] | page_protections['PAGE_EXECUTE_READWRITE']
page_protections['PAGE_READWRITE'] | page_protections['PAGE_READWRITE']
) )
# Memory types # Memory types
@ -91,7 +90,9 @@ class MEMORY_BASIC_INFORMATION64(ctypes.Structure):
('__alignment2', ctypes.wintypes.DWORD), ('__alignment2', ctypes.wintypes.DWORD),
] ]
PTR_SIZE = ctypes.sizeof(ctypes.c_void_p) PTR_SIZE = ctypes.sizeof(ctypes.c_void_p)
MEMORY_BASIC_INFORMATION: ctypes.Structure
if PTR_SIZE == 8: # 64-bit python if PTR_SIZE == 8: # 64-bit python
MEMORY_BASIC_INFORMATION = MEMORY_BASIC_INFORMATION64 MEMORY_BASIC_INFORMATION = MEMORY_BASIC_INFORMATION64
elif PTR_SIZE == 4: # 32-bit python elif PTR_SIZE == 4: # 32-bit python
@ -133,9 +134,9 @@ class SYSTEM_INFO(ctypes.Structure):
class Process(AbstractProcess): class Process(AbstractProcess):
process_handle = None process_handle: int | None
def __init__(self, process_id: int): def __init__(self, process_id: int) -> None:
process_handle = ctypes.windll.kernel32.OpenProcess( process_handle = ctypes.windll.kernel32.OpenProcess(
privileges['PROCESS_RW'], privileges['PROCESS_RW'],
False, False,
@ -143,15 +144,15 @@ class Process(AbstractProcess):
) )
if not process_handle: if not process_handle:
raise MemEditError('Couldn\'t open process {}'.format(process_id)) raise MemEditError(f'Couldn\'t open process {process_id}')
self.process_handle = process_handle self.process_handle = process_handle
def close(self): def close(self) -> None:
ctypes.windll.kernel32.CloseHandle(self.process_handle) ctypes.windll.kernel32.CloseHandle(self.process_handle)
self.process_handle = None self.process_handle = None
def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t): def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t) -> None:
try: try:
ctypes.windll.kernel32.WriteProcessMemory( ctypes.windll.kernel32.WriteProcessMemory(
self.process_handle, self.process_handle,
@ -161,7 +162,7 @@ class Process(AbstractProcess):
None None
) )
except (BufferError, ValueError, TypeError): except (BufferError, ValueError, TypeError):
raise MemEditError('Error with handle {}: {}'.format(self.process_handle, self._get_last_error())) raise MemEditError(f'Error with handle {self.process_handle}: {self._get_last_error()}')
def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t: def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t:
try: try:
@ -173,22 +174,23 @@ class Process(AbstractProcess):
None None
) )
except (BufferError, ValueError, TypeError): except (BufferError, ValueError, TypeError):
raise MemEditError('Error with handle {}: {}'.format(self.process_handle, self._get_last_error())) raise MemEditError(f'Error with handle {self.process_handle}: {self._get_last_error()}')
return read_buffer return read_buffer
@staticmethod @staticmethod
def _get_last_error() -> Tuple[int, str]: def _get_last_error() -> tuple[int, str]:
err = ctypes.windll.kernel32.GetLastError() err = ctypes.windll.kernel32.GetLastError()
return err, strerror(err) return err, strerror(err)
def get_path(self) -> str: def get_path(self) -> str | None:
max_path_len = 260 max_path_len = 260
name_buffer = (ctypes.c_char * max_path_len)() name_buffer = (ctypes.c_char * max_path_len)()
rval = ctypes.windll.psapi.GetProcessImageFileNameA( rval = ctypes.windll.psapi.GetProcessImageFileNameA(
self.process_handle, self.process_handle,
name_buffer, name_buffer,
max_path_len) max_path_len,
)
if rval > 0: if rval > 0:
return name_buffer.value.decode() return name_buffer.value.decode()
@ -196,28 +198,28 @@ class Process(AbstractProcess):
return None return None
@staticmethod @staticmethod
def list_available_pids() -> List[int]: def list_available_pids() -> list[int]:
# According to EnumProcesses docs, you can't find out how many processes there are before # According to EnumProcesses docs, you can't find out how many processes there are before
# fetching the list. As a result, we grab 100 on the first try, and if we get a full list # fetching the list. As a result, we grab 100 on the first try, and if we get a full list
# of 100, repeatedly double the number until we get fewer than we asked for. # of 100, repeatedly double the number until we get fewer than we asked for.
n = 100 nn = 100
returned_size = ctypes.wintypes.DWORD() returned_size = ctypes.wintypes.DWORD()
returned_size_ptr = ctypes.byref(returned_size) returned_size_ptr = ctypes.byref(returned_size)
while True: while True:
pids = (ctypes.wintypes.DWORD * n)() pids = (ctypes.wintypes.DWORD * nn)()
size = ctypes.sizeof(pids) size = ctypes.sizeof(pids)
pids_ptr = ctypes.byref(pids) pids_ptr = ctypes.byref(pids)
success = ctypes.windll.Psapi.EnumProcesses(pids_ptr, size, returned_size_ptr) success = ctypes.windll.Psapi.EnumProcesses(pids_ptr, size, returned_size_ptr)
if not success: if not success:
raise MemEditError('Failed to enumerate processes: n={}'.format(n)) raise MemEditError(f'Failed to enumerate processes: nn={nn}')
num_returned = floor(returned_size.value / ctypes.sizeof(ctypes.wintypes.DWORD)) num_returned = floor(returned_size.value / ctypes.sizeof(ctypes.wintypes.DWORD))
if n == num_returned: if nn == num_returned:
n *= 2 nn *= 2
continue continue
else: else:
break break
@ -225,15 +227,15 @@ class Process(AbstractProcess):
return pids[:num_returned] return pids[:num_returned]
@staticmethod @staticmethod
def get_pid_by_name(target_name: str) -> Optional[int]: def get_pid_by_name(target_name: str) -> int | None:
for pid in Process.list_available_pids(): for pid in Process.list_available_pids():
try: try:
logger.debug('Checking name for pid {}'.format(pid)) logger.debug(f'Checking name for pid {pid}')
with Process.open_process(pid) as process: with Process.open_process(pid) as process:
path = process.get_path() path = process.get_path()
name = os.path.basename(path) name = os.path.basename(path)
logger.debug('Name was "{}"'.format(name)) logger.debug(f'Name was "{name}"')
if path is not None and name == target_name: if path is not None and name == target_name:
return pid return pid
except ValueError: except ValueError:
@ -241,10 +243,10 @@ class Process(AbstractProcess):
except MemEditError as err: except MemEditError as err:
logger.debug(repr(err)) logger.debug(repr(err))
logger.info('Found no process with name {}'.format(target_name)) logger.info(f'Found no process with name {target_name}')
return None return None
def list_mapped_regions(self, writeable_only: bool = True) -> List[Tuple[int, int]]: def list_mapped_regions(self, writeable_only: bool = True) -> list[tuple[int, int]]:
sys_info = SYSTEM_INFO() sys_info = SYSTEM_INFO()
sys_info_ptr = ctypes.byref(sys_info) sys_info_ptr = ctypes.byref(sys_info)
ctypes.windll.kernel32.GetSystemInfo(sys_info_ptr) ctypes.windll.kernel32.GetSystemInfo(sys_info_ptr)
@ -268,8 +270,8 @@ class Process(AbstractProcess):
if success != mbi_size: if success != mbi_size:
if success == 0: if success == 0:
raise MemEditError('Failed VirtualQueryEx with handle ' + raise MemEditError('Failed VirtualQueryEx with handle '
'{}: {}'.format(self.process_handle, self._get_last_error())) + f'{self.process_handle}: {self._get_last_error()}')
else: else:
raise MemEditError('VirtualQueryEx output too short!') raise MemEditError('VirtualQueryEx output too short!')