They are + provided for convenience only; it is probably better to use the tools built + in to your operating system to discover the pid of the specific process you + would like to edit. + + Once you have found the pid, you are ready to construct an instance of Process + and use it to read and write to memory. Once you are done with the process, + use .close() to free up the process for access by other debuggers etc. + + p = + p.close() + + To read/write to memory, first create a buffer using ctypes: + + buffer0 = (ctypes.c_byte * 5)(39, 50, 03, 40, 30) + buffer1 = ctypes.c_ulong() + + and then use + + p.write_memory(0x2fe, buffer0) + + val0 = p.read_memory(0x220, buffer0)[:] + + val1a = p.read_memory(0x149, buffer1).value + val2b = buffer1.value + assert(val1a == val2b) + + Searching for a value can be done in a number of ways: + Search a list of addresses: + found_addresses = p.search_addresses([0x1020, 0x1030], buffer0) + Search the entire memory space: + found_addresses = p.search_all_memory(buffer0, writeable_only=False) + + You can also get a list of which regions in memory are mapped (readable): + regions = p.list_mapped_regions(writeable_only=False) + + which can be used along with search_buffer(...) to re-create .search_all_memory(...): + + found = [] + for region_start, region_stop in regions: + region_buffer = (ctypes.c_byte * (region_stop - region_start))() + p.read_memory(region_start, region_buffer) + found += utils.search_buffer(ctypes.c_ulong(123456790), region_buffer) + + Other useful methods include the context manager, implemented as a static method: + + with Process.open_process(pid) as p: + # use p here, no need to call p.close() + + .get_path(), which reports the path of the executable file which was used + to start the process: + + executable_path = p.get_path() + + and deref_struct_pointer, which takes a pointer to a struct and reads out the struct members: + + # struct is a list of (offset, buffer) pairs + struct_defintion = [(0x0, ctypes.c_ulong()), + (0x20, ctypes.c_byte())] + values = p.deref_struct_pointer(0x0feab4, struct_defintion) + + which is shorthand for + + struct_addr = p.read_memory(0x0feab4, ctypes.c_void_p()) + values = [p.read_memory(struct_addr + 0x0, ctypes.c_ulong()), + p.read_memory(struct_addr + 0x20, ctypes.c_byte())] + + ================= + + Putting all this together, a simple program which alters a magic number in the only running + instance of 'magic.exe' might look like this: + + import ctypes + from mem_edit import Process + + magic_number = ctypes.ulong(1234567890) + + pid = Process.get_pid_by_name('magic.exe') + with Process.open_process(pid) as p: + addrs = p.search_all_memory(magic_number) + assert(len(addrs) == 1) + p.write_memory(addrs[0], ctypes.c_ulong(42)) + + Searching for a value which changes: + + pid = Process.get_pid_by_name('monitor_me.exe') + with Process.open_process(pid) as p: + addrs = p.search_all_memory(ctypes.c_int(40)) + input('Press enter when value has changed to 55') + filtered_addrs = p.search_addresses(addrs, ctypes.c_int(55)) + print('Found addresses:') + for addr in filtered_addrs: + print(hex(addr)) + + """ + + @abstractmethod + def __init__(self, process_id: int): + """ + Constructing a Process object prepares the process with specified process_id for + memory editing. Finding the process_id for the process you want to edit is often + easiest using os-specific tools (or by launching the process yourself, e.g. with + subprocess.Popen(...)). + + :param process_id: Process id (pid) of the target process + """ + pass + + @abstractmethod + def close(self): + """ + Detach from the process, removing our ability to edit it and + letting other debuggers attach to it instead. + + This function should be called after you are done working with the process + and will no longer need it. See the Process.open_process(...) context + manager to avoid having to call this function yourself. + """ + pass + + @abstractmethod + def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t): + """ + Write the given buffer to the process's address space, starting at base_address. + + :param base_address: The address to write at, in the process's address space. + :param write_buffer: A ctypes object, for example, ctypes.c_ulong(48), + (ctypes.c_byte * 3)(43, 21, 0xff), or a subclass of ctypes.Structure, + which will be written into memory starting at base_address. + """ + pass + + @abstractmethod + def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t: + """ + Read into the given buffer from the process's address space, starting at base_address. + + :param base_address: The address to read from, in the process's address space. + :param read_buffer: A ctypes object, for example. ctypes.c_ulong(), + (ctypes.c_byte * 3)(), or a subclass of ctypes.Structure, which will be + overwritten with the contents of the process's memory starting at base_address. + :returns: read_buffer is returned as well as being overwritten. + """ + pass + + @abstractmethod + def list_mapped_regions(self, writeable_only=True) -> List[Tuple[int, int]]: + """ + Return a list of (start_address, stop_address) for the regions of the address space + accessible to (readable and possibly writable by) the process. + By default, this function does not return non-writeable regions. + + :param writeable_only: If True, only return regions which are also writeable. + Default true. + :return: List of (start_address, stop_address) for each accessible memory region. + """ + pass + + @abstractmethod + def get_path(self) -> str: + """ + Return the path to the executable file which was run to start this process. + + :return: A string containing the path. + """ + pass + + @staticmethod + @abstractmethod + def list_available_pids() -> List[int]: + """ + Return a list of all process ids (pids) accessible on this system. + + :return: List of running process ids. + """ + pass + + @staticmethod + @abstractmethod + def get_pid_by_name(target_name: str) -> int or None: + """ + Attempt to return the process id (pid) of a process which was run with an executable + file with the provided name. + + This is a convenience method for quickly finding a process which is already known + to be unique and has a well-defined executable name. + + Don't rely on this method if you can possibly avoid it, since it makes no + attempt to confirm that it found a unique process and breaks trivially (e.g. if the + executable file is renamed). + """ + pass + + def deref_struct_pointer(self, + base_address: int, + targets: List[Tuple[int, ctypes_buffer_t]], + ) -> List[ctypes_buffer_t]: + """ + Take a pointer to a struct and read out the struct members: + struct_defintion = [(0x0, ctypes.c_ulong()), + (0x20, ctypes.c_byte())] + values = p.deref_struct_pointer(0x0feab4, struct_defintion) + which is shorthand for + struct_addr = p.read_memory(0x0feab4, ctypes.c_void_p()) + values = [p.read_memory(struct_addr + 0x0, ctypes.c_ulong()), + p.read_memory(struct_addr + 0x20, ctypes.c_byte())] + + :param base_address: Address at which the struct pointer is located. + :param targets: List of (offset, read_buffer) pairs which will be read from the struct. + :return: List of read values corresponding to the provided targets. + """ + base = self.read_memory(base_address, ctypes.c_void_p()) + vals = [self.read_memory(base + offset, buffer) for offset, buffer in targets] + return vals + + def search_addresses(self, addresses: List[int], needle_buffer: ctypes_buffer_t) -> List[int]: + """ + Search for the provided value at each of the provided addresses, and return the addresses + where it is found. + + :param addresses: List of addresses which should be probed. + :param needle_buffer: The value to search for. This should be a ctypes object of the same + sorts as used by .read_memory(...), which will be compared to the contents of + memory at each of the given addresses. + :return: List of addresses where the needle_buffer was found. + """ + found = [] + read_buffer = copy.copy(needle_buffer) + + for addr in addresses: + read = self.read_memory(addr, read_buffer) + if ctypes_equal(needle_buffer, read): + found.append(addr) + return found + + def search_all_memory(self, needle_buffer, writeable_only=True) -> List[int]: + """ + Search the entire memory space accessible to the process for the provided value. + + :param needle_buffer: The value to search for. This should be a ctypes object of the same + sorts as used by .read_memory(...), which will be compared to the contents of + memory at each acessible address. + :param writeable_only: If True, only search regions where the process has write access. + :return: List of addresses where the needle_buffer was found. + """ + found = [] + for start, stop in self.list_mapped_regions(writeable_only): + try: + region_buffer = (ctypes.c_byte * (stop - start))() + self.read_memory(start, region_buffer) + found += [offset + start for offset in search_buffer(needle_buffer, region_buffer)] + except OSError: + logger.error('Failed to read in range 0x{} - 0x{}'.format(start, stop)) + return found + + @classmethod + @contextmanager + def open_process(cls, process_id: int) -> 'Process': + """ + Context manager which automatically closes the constructed Process: + with Process.open_process(2394) as p: + # use p here + # no need to run p.close() + + :param process_id: Process id (pid), passed to the Process constructor. + :return: Constructed Process object. + """ + process = cls(process_id) + yield process + process.close() diff --git a/mem_edit/ b/mem_edit/ new file mode 100644 index 0000000..acf2998 --- /dev/null +++ b/mem_edit/ @@ -0,0 +1,127 @@ +""" +Implementation of Process class for Linux +""" + +from typing import List, Tuple +from os import strerror +import os +import os.path +import signal +import ctypes +import ctypes.util +import logging + +from .abstract import Process as AbstractProcess +from .utils import ctypes_buffer_t, MemEditError + + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +ptrace_commands = { + 'PTRACE_GETREGS': 12, + 'PTRACE_SETREGS': 13, + 'PTRACE_ATTACH': 16, + 'PTRACE_DETACH': 17, + 'PTRACE_SYSCALL': 24, + 'PTRACE_SEIZE': 16902, + } + + +# import ptrace() from libc +_libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True) +_ptrace = _libc.ptrace +_ptrace.argtypes = (ctypes.c_ulong,) * 4 +_ptrace.restype = ctypes.c_long + + +def ptrace(command: int, pid: int = 0, arg1: int = 0, arg2: int = 0) -> int: + """ + Call ptrace() with the provided pid and arguments. See the ```man ptrace```. + """ + logger.debug('ptrace({}, {}, {}, {})'.format(command, pid, arg1, arg2)) + result = _ptrace(command, pid, arg1, arg2) + if result == -1: + errno = ctypes.get_errno() + if errno: + raise MemEditError('ptrace({}, {}, {}, {})'.format(command, pid, arg1, arg2) + + ' failed with error {}: {}'.format(errno, strerror(errno))) + return result + + +class Process(AbstractProcess): + pid = None + + def __init__(self, process_id: int): + ptrace(ptrace_commands['PTRACE_SEIZE'], process_id) + = process_id + + def close(self): + os.kill(, signal.SIGSTOP) + ptrace(ptrace_commands['PTRACE_DETACH'],, 0, 0) + = None + + def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t): + with open('/proc/{}/mem'.format(, 'rb+') as mem: + + mem.write(write_buffer) + + def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t: + with open('/proc/{}/mem'.format(, 'rb+') as mem: + + mem.readinto(read_buffer) + return read_buffer + + def get_path(self) -> str: + try: + with open('/proc/{}/cmdline', 'rb') as f: + return'\x00')[0] + except FileNotFoundError: + return '' + + @staticmethod + def list_available_pids() -> List[int]: + pids = [] + for pid_str in os.listdir('/proc'): + try: + pids.append(int(pid_str)) + except ValueError: + continue + return pids + + @staticmethod + def get_pid_by_name(target_name: str) -> int: + for pid in Process.list_available_pids(): + try: +'Checking name for pid {}'.format(pid)) + with open('/proc/{}/cmdline'.format(pid), 'rb') as cmdline: + path ='\x00')[0] + except FileNotFoundError: + continue + + name = os.path.basename(path) +'Name was "{}"'.format(name)) + if path is not None and name == target_name: + return pid + +'Found no process with name {}'.format(target_name)) + return None + + def list_mapped_regions(self, writeable_only: bool = True) -> List[Tuple[int, int]]: + regions = [] + with open('/proc/{}/maps'.format(, 'r') as maps: + for line in maps: + bounds, privileges = line.split()[0:2] + + if 'r' not in privileges: + continue + + if writeable_only and 'w' not in privileges: + continue + + start, stop = (int(bound, 16) for bound in bounds.split('-')) + regions.append((start, stop)) + return regions + + diff --git a/mem_edit/ b/mem_edit/ new file mode 100644 index 0000000..c2e691f --- /dev/null +++ b/mem_edit/ @@ -0,0 +1,63 @@ +""" +Utility functions and types: + + Type definition for buffers: + ctypes_buffer_t + Custom exception type: + MemEditError + + Search for a buffer inside another buffer: + search_buffer(needle_buffer, haystack_buffer) + Check if two buffers (ctypes objects) store equal values: + ctypes_equal(a, b) +""" + +from typing import List +import ctypes + + +ctypes_buffer_t = ctypes._SimpleCData or ctypes.Array or ctypes.Structure or ctypes.Union + + +class MemEditError(Exception): + pass + + +def search_buffer(needle_buffer: ctypes_buffer_t, haystack_buffer: ctypes_buffer_t) -> List[int]: + """ + Search for a buffer inside another buffer. + + :param needle_buffer: Buffer to search for. + :param haystack_buffer: Buffer to search in. + :return: List of offsets where the needle_buffer was found. + """ + found = [] + read_type = type(needle_buffer) + for offset in range(0, len(haystack_buffer) - ctypes.sizeof(needle_buffer)): + v = read_type.from_buffer(haystack_buffer, offset) + if ctypes_equal(needle_buffer, v): + found.append(offset) + return found + + +def ctypes_equal(a: ctypes_buffer_t, b: ctypes_buffer_t) -> bool: + """ + Check if the values stored inside two ctypes buffers are equal. + """ + if not type(a) == type(b): + return False + + if isinstance(a, ctypes.Array): + return a[:] == b[:] + elif isinstance(a, ctypes.Structure) or isinstance(a, ctypes.Union): + for attr_name, attr_type in a._fields_: + a_attr, b_attr = (getattr(x, attr_name) for x in (a, b)) + if isinstance(a, ctypes_buffer_t): + if not ctypes_equal(a_attr, b_attr): + return False + elif not a_attr == b_attr: + return False + + return True + else: + return a.value == b.value diff --git a/mem_edit/ b/mem_edit/ new file mode 100644 index 0000000..4117d33 --- /dev/null +++ b/mem_edit/ @@ -0,0 +1,256 @@ +""" +Implementation of Process class for Windows +""" + +from typing import List, Tuple +from math import floor +from os import strerror +import os.path +import ctypes +import ctypes.wintypes +import logging + +from .abstract import Process as AbstractProcess +from .utils import ctypes_buffer_t, MemEditError + + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +# Process handle privileges +privileges = { + 'PROCESS_QUERY_INFORMATION': 0x0400, + 'PROCESS_VM_OPERATION': 0x0008, + 'PROCESS_VM_READ': 0x0010, + 'PROCESS_VM_WRITE': 0x0020, + } +privileges['PROCESS_RW'] = ( + privileges['PROCESS_QUERY_INFORMATION'] | + privileges['PROCESS_VM_OPERATION'] | + privileges['PROCESS_VM_READ'] | + privileges['PROCESS_VM_WRITE'] + ) + +# Memory region states +mem_states = { + 'MEM_COMMIT': 0x1000, + 'MEM_FREE': 0x10000, + 'MEM_RESERVE': 0x2000, + } + +# Memory region permissions +page_protections = { + 'PAGE_EXECUTE': 0x10, + 'PAGE_EXECUTE_READ': 0x20, + 'PAGE_EXECUTE_READWRITE': 0x40, + 'PAGE_EXECUTE_WRITECOPY': 0x80, + 'PAGE_NOACCESS': 0x01, + 'PAGE_READWRITE': 0x04, + 'PAGE_WRITECOPY': 0x08, + } +# Custom (combined) permissions +page_protections['PAGE_READABLE'] = ( + page_protections['PAGE_EXECUTE_READ'] | + page_protections['PAGE_EXECUTE_READWRITE'] | + page_protections['PAGE_READWRITE'] + ) +page_protections['PAGE_READWRITEABLE'] = ( + page_protections['PAGE_EXECUTE_READWRITE'] | + page_protections['PAGE_READWRITE'] + ) + +# Memory types +mem_types = { + 'MEM_IMAGE': 0x1000000, + 'MEM_MAPPED': 0x40000, + 'MEM_PRIVATE': 0x20000, + } + + +# C struct for VirtualQueryEx +class MEMORY_BASIC_INFORMATION(ctypes.Structure): + _fields_ = [ + ('BaseAddress', ctypes.c_void_p), + ('AllocationBase', ctypes.c_void_p), + ('AllocationProtect', ctypes.wintypes.DWORD), + ('RegionSize', ctypes.wintypes.UINT), + ('State', ctypes.wintypes.DWORD), + ('Protect', ctypes.wintypes.DWORD), + ('Type', ctypes.wintypes.DWORD), + ] + +# C struct for GetSystemInfo +class SYSTEM_INFO(ctypes.Structure): + _fields_ = [ + ('wProcessorArchitecture', ctypes.wintypes.WORD), + ('wReserved', ctypes.wintypes.WORD), + ('dwPageSize', ctypes.wintypes.DWORD), + ('lpMinimumApplicationAddress', ctypes.c_void_p), + ('lpMaximumApplicationAddress', ctypes.c_void_p), + ('dwActiveProcessorMask', ctypes.wintypes.DWORD), + ('dwNumberOfProcessors', ctypes.wintypes.DWORD), + ('dwProcessorType', ctypes.wintypes.DWORD), + ('dwAllocationGranularity', ctypes.wintypes.DWORD), + ('wProcessorLevel', ctypes.wintypes.WORD), + ('wProcessorRevision', ctypes.wintypes.WORD), + ] + + +class Process(AbstractProcess): + process_handle = None + + def __init__(self, process_id: int): + process_handle = ctypes.windll.kernel32.OpenProcess( + privileges['PROCESS_RW'], + False, + process_id + ) + + if not process_handle: + raise MemEditError('Couldn\'t open process {}'.format(process_id)) + + self.process_handle = process_handle + + def close(self): + ctypes.windll.kernel32.CloseHandle(self.process_handle) + self.process_handle = None + + def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t): + try: + ctypes.windll.kernel32.WriteProcessMemory( + self.process_handle, + base_address, + ctypes.byref(write_buffer), + ctypes.sizeof(write_buffer), + None + ) + except (BufferError, ValueError, TypeError): + raise MemEditError('Error with handle {}: {}'.format(self.process_handle, self._get_last_error())) + + def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t: + try: + ctypes.windll.kernel32.ReadProcessMemory( + self.process_handle, + base_address, + ctypes.byref(read_buffer), + ctypes.sizeof(read_buffer), + None + ) + except (BufferError, ValueError, TypeError): + raise MemEditError('Error with handle {}: {}'.format(self.process_handle, self._get_last_error())) + + return read_buffer + + @staticmethod + def _get_last_error() -> Tuple[int, str]: + err = ctypes.windll.kernel32.GetLastError() + return err, strerror(err) + + def get_path(self) -> str: + max_path_len = 260 + name_buffer = (ctypes.c_char * max_path_len)() + rval = ctypes.windll.psapi.GetProcessImageFileNameA( + self.process_handle, + name_buffer, + max_path_len + ) + + if rval > 0: + return name_buffer.value.decode() + else: + return None + + @staticmethod + def list_available_pids() -> List[int]: + # According to EnumProcesses docs, you can't find out how many processes there are before + # fetching the list. As a result, we grab 100 on the first try, and if we get a full list + # of 100, repeatedly double the number until we get fewer than we asked for. + + n = 100 + num_returned = n + + returned_size = ctypes.wintypes.DWORD() + returned_size_ptr = ctypes.byref(returned_size) + + while True: + pids = (ctypes.wintypes.DWORD * n)() + size = ctypes.sizeof(pids) + pids_ptr = ctypes.byref(pids) + + success = ctypes.windll.Psapi.EnumProcesses(pids_ptr, size, returned_size_ptr) + if not success: + raise MemEditError('Failed to enumerate processes: n={}'.format(n)) + + num_returned = floor(returned_size.value / ctypes.sizeof(ctypes.wintypes.DWORD)) + + if n == num_returned: + n *= 2 + continue + else: + break + + return pids[:num_returned] + + @staticmethod + def get_pid_by_name(target_name: str) -> int: + for pid in Process.list_available_pids(): + try: +'Checking name for pid {}'.format(pid)) + with Process.open_process(pid) as process: + path = process.get_path() + + name = os.path.basename(path) +'Name was "{}"'.format(name)) + if path is not None and name == target_name: + return pid + except ValueError: + pass + +'Found no process with name {}'.format(target_name)) + return None + + def list_mapped_regions(self, writeable_only: bool = True) -> List[Tuple[int, int]]: + sysinfo = SYSTEM_INFO() + sysinfo_ptr = ctypes.byref(sysinfo) + ctypes.windll.kernel32.GetSystemInfo(sysinfo_ptr) + + start = sysinfo.lpMinimumApplicationAddress + stop = sysinfo.lpMaximumApplicationAddress + + def get_mem_info(address): + """ + Query the memory region starting at or before 'address' to get its size/type/state/permissions. + """ + mbi = MEMORY_BASIC_INFORMATION() + mbi_ptr = ctypes.byref(mbi) + mbi_size = ctypes.sizeof(mbi) + + success = ctypes.windll.kernel32.VirtualQueryEx( + self.process_handle, + address, + mbi_ptr, + mbi_size, + ) + + if success != mbi_size: + if success == 0: + raise MemEditError('Failed VirtualQueryEx with handle ' + + '{}: {}'.format(self.process_handle, self._get_last_error())) + else: + raise MemEditError('VirtualQueryEx output too short!') + + return mbi + + regions = [] + page_ptr = start + while page_ptr < stop: + page_info = get_mem_info(page_ptr) + if page_info.Type == mem_types['MEM_PRIVATE'] and \ + page_info.State == mem_states['MEM_COMMIT'] and \ + page_info.Protect & page_protections['PAGE_READABLE'] != 0 and \ + (page_info.Protect & page_protections['PAGE_READWRITEABLE'] != 0 or not writeable_only): + regions.append((page_ptr, page_ptr + page_info.RegionSize)) + page_ptr += page_info.RegionSize + + return regions diff --git a/ b/ new file mode 100644 index 0000000..c38b8b0 --- /dev/null +++ b/ @@ -0,0 +1,52 @@ +#!/usr/bin/env python + +from setuptools import setup, find_packages + +setup(name='mem_edit', + version='0.1', + description='Multiplatform library for memory editing', + author='Jan Petykiewicz', + author_email='', + url='', + keywords = [ + 'memory', + 'edit', + 'editing', + 'ReadProcessMemory', + 'WriteProcessMemory', + 'proc', + 'mem', + 'ptrace', + 'multiplatform', + 'scan', + 'scanner', + 'search', + 'debug', + 'cheat', + 'trainer', + ], + classifiers = [ + 'Programming Language :: Python', + 'Programming Language :: Python :: 3', + 'Development Status :: 4 - Beta', + 'Environment :: Other Environment', + 'Intended Audience :: Developers', + 'License :: OSI Approved :: GNU Affero General Public License v3', + 'Operating System :: POSIX :: Linux', + 'Operating System :: Microsoft :: Windows', + 'Topic :: Software Development', + 'Topic :: Software Development :: Debuggers', + 'Topic :: Software Development :: Testing', + 'Topic :: System', + 'Topic :: Games/Entertainment', + 'Topic :: Utilities', + ], + packages=find_packages(), + install_requires=[ + 'ctypes', + 'typing', + ], + extras_require={ + }, + ) +