Compare commits

...

7 Commits

Author SHA1 Message Date
e26381a578 bump version to v0.8 2024-03-30 17:46:16 -07:00
e316322fbf Update reqs 2024-03-30 17:45:41 -07:00
b889ad8133 update readme and note github mirror 2024-03-30 17:44:19 -07:00
f10674e2b5 flake8 preferences 2024-03-30 17:43:25 -07:00
bdf0fb323e early bailout conditions caught by type check 2024-03-30 17:41:44 -07:00
f7c7496cfd get_path should return None on failure 2024-03-30 17:41:23 -07:00
e56ec88761 modernize some code style
indentation, type annotations, f-strings
2024-03-30 17:41:18 -07:00
8 changed files with 160 additions and 101 deletions

29
.flake8 Normal file
View File

@ -0,0 +1,29 @@
[flake8]
ignore =
# E501 line too long
E501,
# W391 newlines at EOF
W391,
# E241 multiple spaces after comma
E241,
# E302 expected 2 newlines
E302,
# W503 line break before binary operator (to be deprecated)
W503,
# E265 block comment should start with '# '
E265,
# E123 closing bracket does not match indentation of opening bracket's line
E123,
# E124 closing bracket does not match visual indentation
E124,
# E221 multiple spaces before operator
E221,
# E201 whitespace after '['
E201,
# E741 ambiguous variable name 'I'
E741,
per-file-ignores =
# F401 import without use
*/__init__.py: F401,

View File

@ -1,8 +1,10 @@
# mem_edit
# mem_edit
**mem_edit** is a multi-platform memory editing library written in Python.
**Homepage:** https://mpxd.net/code/jan/mem_edit
* PyPI: https://pypi.org/project/mem-edit/
* Github mirror: https://github.com/anewusername/mem_edit
**Capabilities:**
* Scan all readable memory used by a process.
@ -18,7 +20,7 @@
## Installation
**Dependencies:**
* python 3 (written and tested with 3.7)
* python >=3.11
* ctypes
* typing (for type annotations)

View File

@ -17,7 +17,7 @@ from .utils import MemEditError
__author__ = 'Jan Petykiewicz'
__version__ = '0.7'
__version__ = '0.8'
version = __version__ # legacy compatibility

View File

@ -2,7 +2,7 @@
Abstract class for cross-platform memory editing.
"""
from typing import List, Tuple, Optional, Union, Generator
from typing import Generator
from abc import ABCMeta, abstractmethod
from contextlib import contextmanager
import copy
@ -122,7 +122,7 @@ class Process(metaclass=ABCMeta):
"""
@abstractmethod
def __init__(self, process_id: int):
def __init__(self, process_id: int) -> None:
"""
Constructing a Process object prepares the process with specified process_id for
memory editing. Finding the `process_id` for the process you want to edit is often
@ -135,7 +135,7 @@ class Process(metaclass=ABCMeta):
pass
@abstractmethod
def close(self):
def close(self) -> None:
"""
Detach from the process, removing our ability to edit it and
letting other debuggers attach to it instead.
@ -147,7 +147,11 @@ class Process(metaclass=ABCMeta):
pass
@abstractmethod
def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t):
def write_memory(
self,
base_address: int,
write_buffer: ctypes_buffer_t,
) -> None:
"""
Write the given buffer to the process's address space, starting at `base_address`.
@ -160,7 +164,11 @@ class Process(metaclass=ABCMeta):
pass
@abstractmethod
def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t:
def read_memory(
self,
base_address: int,
read_buffer: ctypes_buffer_t,
) -> ctypes_buffer_t:
"""
Read into the given buffer from the process's address space, starting at `base_address`.
@ -176,7 +184,7 @@ class Process(metaclass=ABCMeta):
pass
@abstractmethod
def list_mapped_regions(self, writeable_only=True) -> List[Tuple[int, int]]:
def list_mapped_regions(self, writeable_only: bool = True) -> list[tuple[int, int]]:
"""
Return a list of `(start_address, stop_address)` for the regions of the address space
accessible to (readable and possibly writable by) the process.
@ -192,18 +200,18 @@ class Process(metaclass=ABCMeta):
pass
@abstractmethod
def get_path(self) -> str:
def get_path(self) -> str | None:
"""
Return the path to the executable file which was run to start this process.
Returns:
A string containing the path.
A string containing the path, or None if no path was found.
"""
pass
@staticmethod
@abstractmethod
def list_available_pids() -> List[int]:
def list_available_pids() -> list[int]:
"""
Return a list of all process ids (pids) accessible on this system.
@ -214,7 +222,7 @@ class Process(metaclass=ABCMeta):
@staticmethod
@abstractmethod
def get_pid_by_name(target_name: str) -> Optional[int]:
def get_pid_by_name(target_name: str) -> int | None:
"""
Attempt to return the process id (pid) of a process which was run with an executable
file with the provided name. If no process is found, return None.
@ -234,10 +242,11 @@ class Process(metaclass=ABCMeta):
"""
pass
def deref_struct_pointer(self,
base_address: int,
targets: List[Tuple[int, ctypes_buffer_t]],
) -> List[ctypes_buffer_t]:
def deref_struct_pointer(
self,
base_address: int,
targets: list[tuple[int, ctypes_buffer_t]],
) -> list[ctypes_buffer_t]:
"""
Take a pointer to a struct and read out the struct members:
```
@ -263,11 +272,12 @@ class Process(metaclass=ABCMeta):
values = [self.read_memory(base + offset, buffer) for offset, buffer in targets]
return values
def search_addresses(self,
addresses: List[int],
needle_buffer: ctypes_buffer_t,
verbatim: bool = True,
) -> List[int]:
def search_addresses(
self,
addresses: list[int],
needle_buffer: ctypes_buffer_t,
verbatim: bool = True,
) -> list[int]:
"""
Search for the provided value at each of the provided addresses, and return the addresses
where it is found.
@ -298,11 +308,12 @@ class Process(metaclass=ABCMeta):
found.append(address)
return found
def search_all_memory(self,
needle_buffer: ctypes_buffer_t,
writeable_only: bool = True,
verbatim: bool = True,
) -> List[int]:
def search_all_memory(
self,
needle_buffer: ctypes_buffer_t,
writeable_only: bool = True,
verbatim: bool = True,
) -> list[int]:
"""
Search the entire memory space accessible to the process for the provided value.

View File

@ -2,7 +2,6 @@
Implementation of Process class for Linux
"""
from typing import List, Tuple, Optional
from os import strerror
import os
import os.path
@ -35,54 +34,61 @@ _ptrace.argtypes = (ctypes.c_ulong,) * 4
_ptrace.restype = ctypes.c_long
def ptrace(command: int, pid: int = 0, arg1: int = 0, arg2: int = 0) -> int:
def ptrace(
command: int,
pid: int = 0,
arg1: int = 0,
arg2: int = 0,
) -> int:
"""
Call ptrace() with the provided pid and arguments. See the ```man ptrace```.
Call ptrace() with the provided pid and arguments. See `man ptrace`.
"""
logger.debug('ptrace({}, {}, {}, {})'.format(command, pid, arg1, arg2))
logger.debug(f'ptrace({command}, {pid}, {arg1}, {arg2})')
result = _ptrace(command, pid, arg1, arg2)
if result == -1:
err_no = ctypes.get_errno()
if err_no:
raise MemEditError('ptrace({}, {}, {}, {})'.format(command, pid, arg1, arg2) +
' failed with error {}: {}'.format(err_no, strerror(err_no)))
raise MemEditError(f'ptrace({command}, {pid}, {arg1}, {arg2})'
+ f' failed with error {err_no}: {strerror(err_no)}')
return result
class Process(AbstractProcess):
pid = None
pid: int | None
def __init__(self, process_id: int):
def __init__(self, process_id: int) -> None:
ptrace(ptrace_commands['PTRACE_SEIZE'], process_id)
self.pid = process_id
def close(self):
def close(self) -> None:
if self.pid is None:
return
os.kill(self.pid, signal.SIGSTOP)
os.waitpid(self.pid, 0)
ptrace(ptrace_commands['PTRACE_DETACH'], self.pid, 0, 0)
os.kill(self.pid, signal.SIGCONT)
self.pid = None
def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t):
with open('/proc/{}/mem'.format(self.pid), 'rb+') as mem:
def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t) -> None:
with open(f'/proc/{self.pid}/mem', 'rb+') as mem:
mem.seek(base_address)
mem.write(write_buffer)
def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t:
with open('/proc/{}/mem'.format(self.pid), 'rb+') as mem:
with open(f'/proc/{self.pid}/mem', 'rb+') as mem:
mem.seek(base_address)
mem.readinto(read_buffer)
return read_buffer
def get_path(self) -> str:
def get_path(self) -> str | None:
try:
with open('/proc/{}/cmdline', 'rb') as f:
return f.read().decode().split('\x00')[0]
with open(f'/proc/{self.pid}/cmdline', 'rb') as ff:
return ff.read().decode().split('\x00')[0]
except FileNotFoundError:
return ''
return None
@staticmethod
def list_available_pids() -> List[int]:
def list_available_pids() -> list[int]:
pids = []
for pid_str in os.listdir('/proc'):
try:
@ -92,26 +98,26 @@ class Process(AbstractProcess):
return pids
@staticmethod
def get_pid_by_name(target_name: str) -> Optional[int]:
def get_pid_by_name(target_name: str) -> int | None:
for pid in Process.list_available_pids():
try:
logger.debug('Checking name for pid {}'.format(pid))
with open('/proc/{}/cmdline'.format(pid), 'rb') as cmdline:
logger.debug(f'Checking name for pid {pid}')
with open(f'/proc/{pid}/cmdline', 'rb') as cmdline:
path = cmdline.read().decode().split('\x00')[0]
except FileNotFoundError:
continue
name = os.path.basename(path)
logger.debug('Name was "{}"'.format(name))
logger.debug(f'Name was "{name}"')
if path is not None and name == target_name:
return pid
logger.info('Found no process with name {}'.format(target_name))
logger.info(f'Found no process with name {target_name}')
return None
def list_mapped_regions(self, writeable_only: bool = True) -> List[Tuple[int, int]]:
def list_mapped_regions(self, writeable_only: bool = True) -> list[tuple[int, int]]:
regions = []
with open('/proc/{}/maps'.format(self.pid), 'r') as maps:
with open(f'/proc/{self.pid}/maps', 'r') as maps:
for line in maps:
bounds, privileges = line.split()[0:2]

View File

@ -11,21 +11,26 @@ Utility functions and types:
Check if two buffers (ctypes objects) store equal values:
ctypes_equal(a, b)
"""
from typing import List, Union
from typing import Union
import ctypes
ctypes_buffer_t = Union[ctypes._SimpleCData, ctypes.Array, ctypes.Structure, ctypes.Union]
ctypes_buffer_t = Union[
ctypes._SimpleCData,
ctypes.Array,
ctypes.Structure,
ctypes.Union,
]
class MemEditError(Exception):
pass
def search_buffer_verbatim(needle_buffer: ctypes_buffer_t,
haystack_buffer: ctypes_buffer_t,
) -> List[int]:
def search_buffer_verbatim(
needle_buffer: ctypes_buffer_t,
haystack_buffer: ctypes_buffer_t,
) -> list[int]:
"""
Search for a buffer inside another buffer, using a direct (bitwise) comparison
@ -50,9 +55,10 @@ def search_buffer_verbatim(needle_buffer: ctypes_buffer_t,
return found
def search_buffer(needle_buffer: ctypes_buffer_t,
haystack_buffer: ctypes_buffer_t,
) -> List[int]:
def search_buffer(
needle_buffer: ctypes_buffer_t,
haystack_buffer: ctypes_buffer_t,
) -> list[int]:
"""
Search for a buffer inside another buffer, using `ctypes_equal` for comparison.
Much slower than `search_buffer_verbatim`.
@ -73,9 +79,10 @@ def search_buffer(needle_buffer: ctypes_buffer_t,
return found
def ctypes_equal(a: ctypes_buffer_t,
b: ctypes_buffer_t,
) -> bool:
def ctypes_equal(
a: ctypes_buffer_t,
b: ctypes_buffer_t,
) -> bool:
"""
Check if the values stored inside two ctypes buffers are equal.
"""
@ -87,7 +94,7 @@ def ctypes_equal(a: ctypes_buffer_t,
elif isinstance(a, ctypes.Structure) or isinstance(a, ctypes.Union):
for attr_name, attr_type in a._fields_:
a_attr, b_attr = (getattr(x, attr_name) for x in (a, b))
if isinstance(a, ctypes_buffer_t):
if isinstance(a, (ctypes.Array, ctypes.Structure, ctypes.Union, ctypes._SimpleCData)):
if not ctypes_equal(a_attr, b_attr):
return False
elif not a_attr == b_attr:

View File

@ -2,7 +2,6 @@
Implementation of Process class for Windows
"""
from typing import List, Tuple, Optional
from math import floor
from os import strerror
import os.path
@ -25,10 +24,10 @@ privileges = {
'PROCESS_VM_WRITE': 0x0020,
}
privileges['PROCESS_RW'] = (
privileges['PROCESS_QUERY_INFORMATION'] |
privileges['PROCESS_VM_OPERATION'] |
privileges['PROCESS_VM_READ'] |
privileges['PROCESS_VM_WRITE']
privileges['PROCESS_QUERY_INFORMATION']
| privileges['PROCESS_VM_OPERATION']
| privileges['PROCESS_VM_READ']
| privileges['PROCESS_VM_WRITE']
)
# Memory region states
@ -50,13 +49,13 @@ page_protections = {
}
# Custom (combined) permissions
page_protections['PAGE_READABLE'] = (
page_protections['PAGE_EXECUTE_READ'] |
page_protections['PAGE_EXECUTE_READWRITE'] |
page_protections['PAGE_READWRITE']
page_protections['PAGE_EXECUTE_READ']
| page_protections['PAGE_EXECUTE_READWRITE']
| page_protections['PAGE_READWRITE']
)
page_protections['PAGE_READWRITEABLE'] = (
page_protections['PAGE_EXECUTE_READWRITE'] |
page_protections['PAGE_READWRITE']
page_protections['PAGE_EXECUTE_READWRITE']
| page_protections['PAGE_READWRITE']
)
# Memory types
@ -91,7 +90,9 @@ class MEMORY_BASIC_INFORMATION64(ctypes.Structure):
('__alignment2', ctypes.wintypes.DWORD),
]
PTR_SIZE = ctypes.sizeof(ctypes.c_void_p)
MEMORY_BASIC_INFORMATION: ctypes.Structure
if PTR_SIZE == 8: # 64-bit python
MEMORY_BASIC_INFORMATION = MEMORY_BASIC_INFORMATION64
elif PTR_SIZE == 4: # 32-bit python
@ -133,9 +134,9 @@ class SYSTEM_INFO(ctypes.Structure):
class Process(AbstractProcess):
process_handle = None
process_handle: int | None
def __init__(self, process_id: int):
def __init__(self, process_id: int) -> None:
process_handle = ctypes.windll.kernel32.OpenProcess(
privileges['PROCESS_RW'],
False,
@ -143,15 +144,15 @@ class Process(AbstractProcess):
)
if not process_handle:
raise MemEditError('Couldn\'t open process {}'.format(process_id))
raise MemEditError(f'Couldn\'t open process {process_id}')
self.process_handle = process_handle
def close(self):
def close(self) -> None:
ctypes.windll.kernel32.CloseHandle(self.process_handle)
self.process_handle = None
def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t):
def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t) -> None:
try:
ctypes.windll.kernel32.WriteProcessMemory(
self.process_handle,
@ -161,7 +162,7 @@ class Process(AbstractProcess):
None
)
except (BufferError, ValueError, TypeError):
raise MemEditError('Error with handle {}: {}'.format(self.process_handle, self._get_last_error()))
raise MemEditError(f'Error with handle {self.process_handle}: {self._get_last_error()}')
def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t:
try:
@ -173,22 +174,23 @@ class Process(AbstractProcess):
None
)
except (BufferError, ValueError, TypeError):
raise MemEditError('Error with handle {}: {}'.format(self.process_handle, self._get_last_error()))
raise MemEditError(f'Error with handle {self.process_handle}: {self._get_last_error()}')
return read_buffer
@staticmethod
def _get_last_error() -> Tuple[int, str]:
def _get_last_error() -> tuple[int, str]:
err = ctypes.windll.kernel32.GetLastError()
return err, strerror(err)
def get_path(self) -> str:
def get_path(self) -> str | None:
max_path_len = 260
name_buffer = (ctypes.c_char * max_path_len)()
rval = ctypes.windll.psapi.GetProcessImageFileNameA(
self.process_handle,
name_buffer,
max_path_len)
self.process_handle,
name_buffer,
max_path_len,
)
if rval > 0:
return name_buffer.value.decode()
@ -196,28 +198,28 @@ class Process(AbstractProcess):
return None
@staticmethod
def list_available_pids() -> List[int]:
def list_available_pids() -> list[int]:
# According to EnumProcesses docs, you can't find out how many processes there are before
# fetching the list. As a result, we grab 100 on the first try, and if we get a full list
# of 100, repeatedly double the number until we get fewer than we asked for.
n = 100
nn = 100
returned_size = ctypes.wintypes.DWORD()
returned_size_ptr = ctypes.byref(returned_size)
while True:
pids = (ctypes.wintypes.DWORD * n)()
pids = (ctypes.wintypes.DWORD * nn)()
size = ctypes.sizeof(pids)
pids_ptr = ctypes.byref(pids)
success = ctypes.windll.Psapi.EnumProcesses(pids_ptr, size, returned_size_ptr)
if not success:
raise MemEditError('Failed to enumerate processes: n={}'.format(n))
raise MemEditError(f'Failed to enumerate processes: nn={nn}')
num_returned = floor(returned_size.value / ctypes.sizeof(ctypes.wintypes.DWORD))
if n == num_returned:
n *= 2
if nn == num_returned:
nn *= 2
continue
else:
break
@ -225,15 +227,17 @@ class Process(AbstractProcess):
return pids[:num_returned]
@staticmethod
def get_pid_by_name(target_name: str) -> Optional[int]:
def get_pid_by_name(target_name: str) -> int | None:
for pid in Process.list_available_pids():
try:
logger.debug('Checking name for pid {}'.format(pid))
logger.debug(f'Checking name for pid {pid}')
with Process.open_process(pid) as process:
path = process.get_path()
if path is None:
continue
name = os.path.basename(path)
logger.debug('Name was "{}"'.format(name))
logger.debug(f'Name was "{name}"')
if path is not None and name == target_name:
return pid
except ValueError:
@ -241,10 +245,10 @@ class Process(AbstractProcess):
except MemEditError as err:
logger.debug(repr(err))
logger.info('Found no process with name {}'.format(target_name))
logger.info(f'Found no process with name {target_name}')
return None
def list_mapped_regions(self, writeable_only: bool = True) -> List[Tuple[int, int]]:
def list_mapped_regions(self, writeable_only: bool = True) -> list[tuple[int, int]]:
sys_info = SYSTEM_INFO()
sys_info_ptr = ctypes.byref(sys_info)
ctypes.windll.kernel32.GetSystemInfo(sys_info_ptr)
@ -268,8 +272,8 @@ class Process(AbstractProcess):
if success != mbi_size:
if success == 0:
raise MemEditError('Failed VirtualQueryEx with handle ' +
'{}: {}'.format(self.process_handle, self._get_last_error()))
raise MemEditError('Failed VirtualQueryEx with handle '
+ f'{self.process_handle}: {self._get_last_error()}')
else:
raise MemEditError('VirtualQueryEx output too short!')

View File

@ -31,7 +31,7 @@ keywords = [
]
classifiers = [
"Programming Language :: Python :: 3",
"Development Status :: 4 - Beta",
"Development Status :: 5 - Production/Stable",
"Environment :: Other Environment",
"Intended Audience :: Developers",
"License :: OSI Approved :: GNU Affero General Public License v3",
@ -44,7 +44,7 @@ classifiers = [
"Topic :: Games/Entertainment",
"Topic :: Utilities",
]
requires-python = ">=3.7"
requires-python = ">=3.11"
dynamic = ["version"]
dependencies = [
]