Compare commits

..

No commits in common. "8f7955543c10962548a4f23395f451de6da9a6f0" and "42c69867b8fbe283dbe9614c86de5fcb125cb335" have entirely different histories.

4 changed files with 35 additions and 31 deletions

View File

@ -1,8 +1,8 @@
""" """
Abstract class for cross-platform memory editing. Abstract class for cross-platform memory editing.
""" """
from typing import Self
from collections.abc import Generator from typing import Generator
from abc import ABCMeta, abstractmethod from abc import ABCMeta, abstractmethod
from contextlib import contextmanager from contextlib import contextmanager
import copy import copy
@ -341,12 +341,12 @@ class Process(metaclass=ABCMeta):
self.read_memory(start, region_buffer) self.read_memory(start, region_buffer)
found += [offset + start for offset in search(needle_buffer, region_buffer)] found += [offset + start for offset in search(needle_buffer, region_buffer)]
except OSError: except OSError:
logger.exception(f'Failed to read in range 0x{start} - 0x{stop}') logger.error('Failed to read in range 0x{} - 0x{}'.format(start, stop))
return found return found
@classmethod @classmethod
@contextmanager @contextmanager
def open_process(cls: type[Self], process_id: int) -> Generator[Self, None, None]: def open_process(cls, process_id: int) -> Generator['Process', None, None]:
""" """
Context manager which automatically closes the constructed Process: Context manager which automatically closes the constructed Process:
``` ```

View File

@ -9,7 +9,6 @@ import signal
import ctypes import ctypes
import ctypes.util import ctypes.util
import logging import logging
from pathlib import Path
from .abstract import Process as AbstractProcess from .abstract import Process as AbstractProcess
from .utils import ctypes_buffer_t, MemEditError from .utils import ctypes_buffer_t, MemEditError
@ -71,19 +70,19 @@ class Process(AbstractProcess):
self.pid = None self.pid = None
def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t) -> None: def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t) -> None:
with Path(f'/proc/{self.pid}/mem').open('rb+') as mem: with open(f'/proc/{self.pid}/mem', 'rb+') as mem:
mem.seek(base_address) mem.seek(base_address)
mem.write(write_buffer) mem.write(write_buffer)
def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t: def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t:
with Path(f'/proc/{self.pid}/mem').open('rb+') as mem: with open(f'/proc/{self.pid}/mem', 'rb+') as mem:
mem.seek(base_address) mem.seek(base_address)
mem.readinto(read_buffer) mem.readinto(read_buffer)
return read_buffer return read_buffer
def get_path(self) -> str | None: def get_path(self) -> str | None:
try: try:
with Path(f'/proc/{self.pid}/cmdline').open('rb') as ff: with open(f'/proc/{self.pid}/cmdline', 'rb') as ff:
return ff.read().decode().split('\x00')[0] return ff.read().decode().split('\x00')[0]
except FileNotFoundError: except FileNotFoundError:
return None return None
@ -103,12 +102,12 @@ class Process(AbstractProcess):
for pid in Process.list_available_pids(): for pid in Process.list_available_pids():
try: try:
logger.debug(f'Checking name for pid {pid}') logger.debug(f'Checking name for pid {pid}')
with Path(f'/proc/{pid}/cmdline').open('rb') as cmdline: with open(f'/proc/{pid}/cmdline', 'rb') as cmdline:
path = cmdline.read().decode().split('\x00')[0] path = cmdline.read().decode().split('\x00')[0]
except FileNotFoundError: except FileNotFoundError:
continue continue
name = Path(path).name name = os.path.basename(path)
logger.debug(f'Name was "{name}"') logger.debug(f'Name was "{name}"')
if path is not None and name == target_name: if path is not None and name == target_name:
return pid return pid
@ -118,7 +117,7 @@ class Process(AbstractProcess):
def list_mapped_regions(self, writeable_only: bool = True) -> list[tuple[int, int]]: def list_mapped_regions(self, writeable_only: bool = True) -> list[tuple[int, int]]:
regions = [] regions = []
with Path(f'/proc/{self.pid}/maps').open('r') as maps: with open(f'/proc/{self.pid}/maps', 'r') as maps:
for line in maps: for line in maps:
bounds, privileges = line.split()[0:2] bounds, privileges = line.split()[0:2]

View File

@ -11,15 +11,16 @@ Utility functions and types:
Check if two buffers (ctypes objects) store equal values: Check if two buffers (ctypes objects) store equal values:
ctypes_equal(a, b) ctypes_equal(a, b)
""" """
from typing import Union
import ctypes import ctypes
ctypes_buffer_t = ( ctypes_buffer_t = Union[
ctypes._SimpleCData ctypes._SimpleCData,
| ctypes.Array ctypes.Array,
| ctypes.Structure ctypes.Structure,
| ctypes.Union ctypes.Union,
) ]
class MemEditError(Exception): class MemEditError(Exception):
@ -96,7 +97,7 @@ def ctypes_equal(
if isinstance(a, (ctypes.Array, ctypes.Structure, ctypes.Union, ctypes._SimpleCData)): if isinstance(a, (ctypes.Array, ctypes.Structure, ctypes.Union, ctypes._SimpleCData)):
if not ctypes_equal(a_attr, b_attr): if not ctypes_equal(a_attr, b_attr):
return False return False
elif a_attr != b_attr: elif not a_attr == b_attr:
return False return False
return True return True

View File

@ -4,7 +4,7 @@ Implementation of Process class for Windows
from math import floor from math import floor
from os import strerror from os import strerror
from pathlib import Path import os.path
import ctypes import ctypes
import ctypes.wintypes import ctypes.wintypes
import logging import logging
@ -92,7 +92,7 @@ class MEMORY_BASIC_INFORMATION64(ctypes.Structure):
PTR_SIZE = ctypes.sizeof(ctypes.c_void_p) PTR_SIZE = ctypes.sizeof(ctypes.c_void_p)
MEMORY_BASIC_INFORMATION: type[ctypes.Structure] MEMORY_BASIC_INFORMATION: ctypes.Structure
if PTR_SIZE == 8: # 64-bit python if PTR_SIZE == 8: # 64-bit python
MEMORY_BASIC_INFORMATION = MEMORY_BASIC_INFORMATION64 MEMORY_BASIC_INFORMATION = MEMORY_BASIC_INFORMATION64
elif PTR_SIZE == 4: # 32-bit python elif PTR_SIZE == 4: # 32-bit python
@ -161,8 +161,8 @@ class Process(AbstractProcess):
ctypes.sizeof(write_buffer), ctypes.sizeof(write_buffer),
None None
) )
except (BufferError, ValueError, TypeError) as err: except (BufferError, ValueError, TypeError):
raise MemEditError(f'Error with handle {self.process_handle}: {self._get_last_error()}') from err raise MemEditError(f'Error with handle {self.process_handle}: {self._get_last_error()}')
def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t: def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t:
try: try:
@ -173,8 +173,8 @@ class Process(AbstractProcess):
ctypes.sizeof(read_buffer), ctypes.sizeof(read_buffer),
None None
) )
except (BufferError, ValueError, TypeError) as err: except (BufferError, ValueError, TypeError):
raise MemEditError(f'Error with handle {self.process_handle}: {self._get_last_error()}') from err raise MemEditError(f'Error with handle {self.process_handle}: {self._get_last_error()}')
return read_buffer return read_buffer
@ -192,9 +192,10 @@ class Process(AbstractProcess):
max_path_len, max_path_len,
) )
if rval <= 0: if rval > 0:
return name_buffer.value.decode()
else:
return None return None
return name_buffer.value.decode()
@staticmethod @staticmethod
def list_available_pids() -> list[int]: def list_available_pids() -> list[int]:
@ -217,9 +218,11 @@ class Process(AbstractProcess):
num_returned = floor(returned_size.value / ctypes.sizeof(ctypes.wintypes.DWORD)) num_returned = floor(returned_size.value / ctypes.sizeof(ctypes.wintypes.DWORD))
if nn != num_returned: if nn == num_returned:
nn *= 2
continue
else:
break break
nn *= 2
return pids[:num_returned] return pids[:num_returned]
@ -233,7 +236,7 @@ class Process(AbstractProcess):
if path is None: if path is None:
continue continue
name = Path(path).name name = os.path.basename(path)
logger.debug(f'Name was "{name}"') logger.debug(f'Name was "{name}"')
if path is not None and name == target_name: if path is not None and name == target_name:
return pid return pid
@ -253,7 +256,7 @@ class Process(AbstractProcess):
start = sys_info.lpMinimumApplicationAddress start = sys_info.lpMinimumApplicationAddress
stop = sys_info.lpMaximumApplicationAddress stop = sys_info.lpMaximumApplicationAddress
def get_mem_info(address: int) -> MEMORY_BASIC_INFORMATION: def get_mem_info(address):
""" """
Query the memory region starting at or before 'address' to get its size/type/state/permissions. Query the memory region starting at or before 'address' to get its size/type/state/permissions.
""" """
@ -271,7 +274,8 @@ class Process(AbstractProcess):
if success == 0: if success == 0:
raise MemEditError('Failed VirtualQueryEx with handle ' raise MemEditError('Failed VirtualQueryEx with handle '
+ f'{self.process_handle}: {self._get_last_error()}') + f'{self.process_handle}: {self._get_last_error()}')
raise MemEditError('VirtualQueryEx output too short!') else:
raise MemEditError('VirtualQueryEx output too short!')
return mbi return mbi