Compare commits

..

No commits in common. "8f7955543c10962548a4f23395f451de6da9a6f0" and "42c69867b8fbe283dbe9614c86de5fcb125cb335" have entirely different histories.

4 changed files with 35 additions and 31 deletions

View File

@ -1,8 +1,8 @@
"""
Abstract class for cross-platform memory editing.
"""
from typing import Self
from collections.abc import Generator
from typing import Generator
from abc import ABCMeta, abstractmethod
from contextlib import contextmanager
import copy
@ -341,12 +341,12 @@ class Process(metaclass=ABCMeta):
self.read_memory(start, region_buffer)
found += [offset + start for offset in search(needle_buffer, region_buffer)]
except OSError:
logger.exception(f'Failed to read in range 0x{start} - 0x{stop}')
logger.error('Failed to read in range 0x{} - 0x{}'.format(start, stop))
return found
@classmethod
@contextmanager
def open_process(cls: type[Self], process_id: int) -> Generator[Self, None, None]:
def open_process(cls, process_id: int) -> Generator['Process', None, None]:
"""
Context manager which automatically closes the constructed Process:
```

View File

@ -9,7 +9,6 @@ import signal
import ctypes
import ctypes.util
import logging
from pathlib import Path
from .abstract import Process as AbstractProcess
from .utils import ctypes_buffer_t, MemEditError
@ -71,19 +70,19 @@ class Process(AbstractProcess):
self.pid = None
def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t) -> None:
with Path(f'/proc/{self.pid}/mem').open('rb+') as mem:
with open(f'/proc/{self.pid}/mem', 'rb+') as mem:
mem.seek(base_address)
mem.write(write_buffer)
def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t:
with Path(f'/proc/{self.pid}/mem').open('rb+') as mem:
with open(f'/proc/{self.pid}/mem', 'rb+') as mem:
mem.seek(base_address)
mem.readinto(read_buffer)
return read_buffer
def get_path(self) -> str | None:
try:
with Path(f'/proc/{self.pid}/cmdline').open('rb') as ff:
with open(f'/proc/{self.pid}/cmdline', 'rb') as ff:
return ff.read().decode().split('\x00')[0]
except FileNotFoundError:
return None
@ -103,12 +102,12 @@ class Process(AbstractProcess):
for pid in Process.list_available_pids():
try:
logger.debug(f'Checking name for pid {pid}')
with Path(f'/proc/{pid}/cmdline').open('rb') as cmdline:
with open(f'/proc/{pid}/cmdline', 'rb') as cmdline:
path = cmdline.read().decode().split('\x00')[0]
except FileNotFoundError:
continue
name = Path(path).name
name = os.path.basename(path)
logger.debug(f'Name was "{name}"')
if path is not None and name == target_name:
return pid
@ -118,7 +117,7 @@ class Process(AbstractProcess):
def list_mapped_regions(self, writeable_only: bool = True) -> list[tuple[int, int]]:
regions = []
with Path(f'/proc/{self.pid}/maps').open('r') as maps:
with open(f'/proc/{self.pid}/maps', 'r') as maps:
for line in maps:
bounds, privileges = line.split()[0:2]

View File

@ -11,15 +11,16 @@ Utility functions and types:
Check if two buffers (ctypes objects) store equal values:
ctypes_equal(a, b)
"""
from typing import Union
import ctypes
ctypes_buffer_t = (
ctypes._SimpleCData
| ctypes.Array
| ctypes.Structure
| ctypes.Union
)
ctypes_buffer_t = Union[
ctypes._SimpleCData,
ctypes.Array,
ctypes.Structure,
ctypes.Union,
]
class MemEditError(Exception):
@ -96,7 +97,7 @@ def ctypes_equal(
if isinstance(a, (ctypes.Array, ctypes.Structure, ctypes.Union, ctypes._SimpleCData)):
if not ctypes_equal(a_attr, b_attr):
return False
elif a_attr != b_attr:
elif not a_attr == b_attr:
return False
return True

View File

@ -4,7 +4,7 @@ Implementation of Process class for Windows
from math import floor
from os import strerror
from pathlib import Path
import os.path
import ctypes
import ctypes.wintypes
import logging
@ -92,7 +92,7 @@ class MEMORY_BASIC_INFORMATION64(ctypes.Structure):
PTR_SIZE = ctypes.sizeof(ctypes.c_void_p)
MEMORY_BASIC_INFORMATION: type[ctypes.Structure]
MEMORY_BASIC_INFORMATION: ctypes.Structure
if PTR_SIZE == 8: # 64-bit python
MEMORY_BASIC_INFORMATION = MEMORY_BASIC_INFORMATION64
elif PTR_SIZE == 4: # 32-bit python
@ -161,8 +161,8 @@ class Process(AbstractProcess):
ctypes.sizeof(write_buffer),
None
)
except (BufferError, ValueError, TypeError) as err:
raise MemEditError(f'Error with handle {self.process_handle}: {self._get_last_error()}') from err
except (BufferError, ValueError, TypeError):
raise MemEditError(f'Error with handle {self.process_handle}: {self._get_last_error()}')
def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t:
try:
@ -173,8 +173,8 @@ class Process(AbstractProcess):
ctypes.sizeof(read_buffer),
None
)
except (BufferError, ValueError, TypeError) as err:
raise MemEditError(f'Error with handle {self.process_handle}: {self._get_last_error()}') from err
except (BufferError, ValueError, TypeError):
raise MemEditError(f'Error with handle {self.process_handle}: {self._get_last_error()}')
return read_buffer
@ -192,9 +192,10 @@ class Process(AbstractProcess):
max_path_len,
)
if rval <= 0:
if rval > 0:
return name_buffer.value.decode()
else:
return None
return name_buffer.value.decode()
@staticmethod
def list_available_pids() -> list[int]:
@ -217,9 +218,11 @@ class Process(AbstractProcess):
num_returned = floor(returned_size.value / ctypes.sizeof(ctypes.wintypes.DWORD))
if nn != num_returned:
if nn == num_returned:
nn *= 2
continue
else:
break
nn *= 2
return pids[:num_returned]
@ -233,7 +236,7 @@ class Process(AbstractProcess):
if path is None:
continue
name = Path(path).name
name = os.path.basename(path)
logger.debug(f'Name was "{name}"')
if path is not None and name == target_name:
return pid
@ -253,7 +256,7 @@ class Process(AbstractProcess):
start = sys_info.lpMinimumApplicationAddress
stop = sys_info.lpMaximumApplicationAddress
def get_mem_info(address: int) -> MEMORY_BASIC_INFORMATION:
def get_mem_info(address):
"""
Query the memory region starting at or before 'address' to get its size/type/state/permissions.
"""
@ -271,7 +274,8 @@ class Process(AbstractProcess):
if success == 0:
raise MemEditError('Failed VirtualQueryEx with handle '
+ f'{self.process_handle}: {self._get_last_error()}')
raise MemEditError('VirtualQueryEx output too short!')
else:
raise MemEditError('VirtualQueryEx output too short!')
return mbi