Compare commits

..

4 Commits

Author SHA1 Message Date
8f7955543c use pathlib 2024-08-01 00:23:25 -07:00
5a82f04f9e flatten and simplify some code 2024-08-01 00:22:59 -07:00
5f2b1a432e improve type annotations 2024-08-01 00:21:55 -07:00
fec96b92a7 improve error handling 2024-08-01 00:20:53 -07:00
4 changed files with 31 additions and 35 deletions

View File

@ -1,8 +1,8 @@
""" """
Abstract class for cross-platform memory editing. Abstract class for cross-platform memory editing.
""" """
from typing import Self
from typing import Generator from collections.abc import Generator
from abc import ABCMeta, abstractmethod from abc import ABCMeta, abstractmethod
from contextlib import contextmanager from contextlib import contextmanager
import copy import copy
@ -341,12 +341,12 @@ class Process(metaclass=ABCMeta):
self.read_memory(start, region_buffer) self.read_memory(start, region_buffer)
found += [offset + start for offset in search(needle_buffer, region_buffer)] found += [offset + start for offset in search(needle_buffer, region_buffer)]
except OSError: except OSError:
logger.error('Failed to read in range 0x{} - 0x{}'.format(start, stop)) logger.exception(f'Failed to read in range 0x{start} - 0x{stop}')
return found return found
@classmethod @classmethod
@contextmanager @contextmanager
def open_process(cls, process_id: int) -> Generator['Process', None, None]: def open_process(cls: type[Self], process_id: int) -> Generator[Self, None, None]:
""" """
Context manager which automatically closes the constructed Process: Context manager which automatically closes the constructed Process:
``` ```

View File

@ -9,6 +9,7 @@ import signal
import ctypes import ctypes
import ctypes.util import ctypes.util
import logging import logging
from pathlib import Path
from .abstract import Process as AbstractProcess from .abstract import Process as AbstractProcess
from .utils import ctypes_buffer_t, MemEditError from .utils import ctypes_buffer_t, MemEditError
@ -70,19 +71,19 @@ class Process(AbstractProcess):
self.pid = None self.pid = None
def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t) -> None: def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t) -> None:
with open(f'/proc/{self.pid}/mem', 'rb+') as mem: with Path(f'/proc/{self.pid}/mem').open('rb+') as mem:
mem.seek(base_address) mem.seek(base_address)
mem.write(write_buffer) mem.write(write_buffer)
def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t: def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t:
with open(f'/proc/{self.pid}/mem', 'rb+') as mem: with Path(f'/proc/{self.pid}/mem').open('rb+') as mem:
mem.seek(base_address) mem.seek(base_address)
mem.readinto(read_buffer) mem.readinto(read_buffer)
return read_buffer return read_buffer
def get_path(self) -> str | None: def get_path(self) -> str | None:
try: try:
with open(f'/proc/{self.pid}/cmdline', 'rb') as ff: with Path(f'/proc/{self.pid}/cmdline').open('rb') as ff:
return ff.read().decode().split('\x00')[0] return ff.read().decode().split('\x00')[0]
except FileNotFoundError: except FileNotFoundError:
return None return None
@ -102,12 +103,12 @@ class Process(AbstractProcess):
for pid in Process.list_available_pids(): for pid in Process.list_available_pids():
try: try:
logger.debug(f'Checking name for pid {pid}') logger.debug(f'Checking name for pid {pid}')
with open(f'/proc/{pid}/cmdline', 'rb') as cmdline: with Path(f'/proc/{pid}/cmdline').open('rb') as cmdline:
path = cmdline.read().decode().split('\x00')[0] path = cmdline.read().decode().split('\x00')[0]
except FileNotFoundError: except FileNotFoundError:
continue continue
name = os.path.basename(path) name = Path(path).name
logger.debug(f'Name was "{name}"') logger.debug(f'Name was "{name}"')
if path is not None and name == target_name: if path is not None and name == target_name:
return pid return pid
@ -117,7 +118,7 @@ class Process(AbstractProcess):
def list_mapped_regions(self, writeable_only: bool = True) -> list[tuple[int, int]]: def list_mapped_regions(self, writeable_only: bool = True) -> list[tuple[int, int]]:
regions = [] regions = []
with open(f'/proc/{self.pid}/maps', 'r') as maps: with Path(f'/proc/{self.pid}/maps').open('r') as maps:
for line in maps: for line in maps:
bounds, privileges = line.split()[0:2] bounds, privileges = line.split()[0:2]

View File

@ -11,16 +11,15 @@ Utility functions and types:
Check if two buffers (ctypes objects) store equal values: Check if two buffers (ctypes objects) store equal values:
ctypes_equal(a, b) ctypes_equal(a, b)
""" """
from typing import Union
import ctypes import ctypes
ctypes_buffer_t = Union[ ctypes_buffer_t = (
ctypes._SimpleCData, ctypes._SimpleCData
ctypes.Array, | ctypes.Array
ctypes.Structure, | ctypes.Structure
ctypes.Union, | ctypes.Union
] )
class MemEditError(Exception): class MemEditError(Exception):
@ -97,7 +96,7 @@ def ctypes_equal(
if isinstance(a, (ctypes.Array, ctypes.Structure, ctypes.Union, ctypes._SimpleCData)): if isinstance(a, (ctypes.Array, ctypes.Structure, ctypes.Union, ctypes._SimpleCData)):
if not ctypes_equal(a_attr, b_attr): if not ctypes_equal(a_attr, b_attr):
return False return False
elif not a_attr == b_attr: elif a_attr != b_attr:
return False return False
return True return True

View File

@ -4,7 +4,7 @@ Implementation of Process class for Windows
from math import floor from math import floor
from os import strerror from os import strerror
import os.path from pathlib import Path
import ctypes import ctypes
import ctypes.wintypes import ctypes.wintypes
import logging import logging
@ -92,7 +92,7 @@ class MEMORY_BASIC_INFORMATION64(ctypes.Structure):
PTR_SIZE = ctypes.sizeof(ctypes.c_void_p) PTR_SIZE = ctypes.sizeof(ctypes.c_void_p)
MEMORY_BASIC_INFORMATION: ctypes.Structure MEMORY_BASIC_INFORMATION: type[ctypes.Structure]
if PTR_SIZE == 8: # 64-bit python if PTR_SIZE == 8: # 64-bit python
MEMORY_BASIC_INFORMATION = MEMORY_BASIC_INFORMATION64 MEMORY_BASIC_INFORMATION = MEMORY_BASIC_INFORMATION64
elif PTR_SIZE == 4: # 32-bit python elif PTR_SIZE == 4: # 32-bit python
@ -161,8 +161,8 @@ class Process(AbstractProcess):
ctypes.sizeof(write_buffer), ctypes.sizeof(write_buffer),
None None
) )
except (BufferError, ValueError, TypeError): except (BufferError, ValueError, TypeError) as err:
raise MemEditError(f'Error with handle {self.process_handle}: {self._get_last_error()}') raise MemEditError(f'Error with handle {self.process_handle}: {self._get_last_error()}') from err
def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t: def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t:
try: try:
@ -173,8 +173,8 @@ class Process(AbstractProcess):
ctypes.sizeof(read_buffer), ctypes.sizeof(read_buffer),
None None
) )
except (BufferError, ValueError, TypeError): except (BufferError, ValueError, TypeError) as err:
raise MemEditError(f'Error with handle {self.process_handle}: {self._get_last_error()}') raise MemEditError(f'Error with handle {self.process_handle}: {self._get_last_error()}') from err
return read_buffer return read_buffer
@ -192,10 +192,9 @@ class Process(AbstractProcess):
max_path_len, max_path_len,
) )
if rval > 0: if rval <= 0:
return name_buffer.value.decode()
else:
return None return None
return name_buffer.value.decode()
@staticmethod @staticmethod
def list_available_pids() -> list[int]: def list_available_pids() -> list[int]:
@ -218,11 +217,9 @@ class Process(AbstractProcess):
num_returned = floor(returned_size.value / ctypes.sizeof(ctypes.wintypes.DWORD)) num_returned = floor(returned_size.value / ctypes.sizeof(ctypes.wintypes.DWORD))
if nn == num_returned: if nn != num_returned:
nn *= 2
continue
else:
break break
nn *= 2
return pids[:num_returned] return pids[:num_returned]
@ -236,7 +233,7 @@ class Process(AbstractProcess):
if path is None: if path is None:
continue continue
name = os.path.basename(path) name = Path(path).name
logger.debug(f'Name was "{name}"') logger.debug(f'Name was "{name}"')
if path is not None and name == target_name: if path is not None and name == target_name:
return pid return pid
@ -256,7 +253,7 @@ class Process(AbstractProcess):
start = sys_info.lpMinimumApplicationAddress start = sys_info.lpMinimumApplicationAddress
stop = sys_info.lpMaximumApplicationAddress stop = sys_info.lpMaximumApplicationAddress
def get_mem_info(address): def get_mem_info(address: int) -> MEMORY_BASIC_INFORMATION:
""" """
Query the memory region starting at or before 'address' to get its size/type/state/permissions. Query the memory region starting at or before 'address' to get its size/type/state/permissions.
""" """
@ -274,8 +271,7 @@ class Process(AbstractProcess):
if success == 0: if success == 0:
raise MemEditError('Failed VirtualQueryEx with handle ' raise MemEditError('Failed VirtualQueryEx with handle '
+ f'{self.process_handle}: {self._get_last_error()}') + f'{self.process_handle}: {self._get_last_error()}')
else: raise MemEditError('VirtualQueryEx output too short!')
raise MemEditError('VirtualQueryEx output too short!')
return mbi return mbi