forked from jan/mem_edit
Compare commits
No commits in common. "master" and "0.1" have entirely different histories.
10 changed files with 194 additions and 342 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
|
@ -4,5 +4,4 @@ __pycache__
|
|||
*.pyc
|
||||
|
||||
*.egg-info/
|
||||
build/
|
||||
dist/
|
||||
|
|
|
|||
|
|
@ -1,3 +1,2 @@
|
|||
include README.md
|
||||
include LICENSE.md
|
||||
include mem_edit/VERSION
|
||||
|
|
|
|||
14
README.md
14
README.md
|
|
@ -18,19 +18,19 @@
|
|||
## Installation
|
||||
|
||||
**Dependencies:**
|
||||
* python 3 (written and tested with 3.7)
|
||||
* ctypes
|
||||
* typing (for type annotations)
|
||||
* python 3 (written and tested with 3.5)
|
||||
* ctypes
|
||||
* typing (for type annotations)
|
||||
|
||||
|
||||
Install with pip, from PyPI (preferred):
|
||||
```bash
|
||||
pip3 install mem_edit
|
||||
pip install mem_edit
|
||||
```
|
||||
|
||||
Install with pip from git repository
|
||||
```bash
|
||||
pip3 install git+https://mpxd.net/code/jan/mem_edit.git@release
|
||||
pip install git+https://mpxd.net/code/jan/mem_edit.git@release
|
||||
```
|
||||
|
||||
|
||||
|
|
@ -55,7 +55,7 @@ Increment a magic number (unsigned long 1234567890) found in 'magic.exe':
|
|||
pid = Process.get_pid_by_name('magic.exe')
|
||||
with Process.open_process(pid) as p:
|
||||
addrs = p.search_all_memory(magic_number)
|
||||
|
||||
|
||||
# We don't want to edit if there's more than one result...
|
||||
assert(len(addrs) == 1)
|
||||
|
||||
|
|
@ -104,7 +104,7 @@ Read and alter a structure:
|
|||
s = MyStruct()
|
||||
s.first_member = 1234567890
|
||||
s.second_member = 0x1234
|
||||
|
||||
|
||||
addrs = p.search_all_memory(s)
|
||||
print(addrs)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,4 +0,0 @@
|
|||
""" VERSION defintion. THIS FILE IS MANUALLY PARSED BY setup.py and REQUIRES A SPECIFIC FORMAT """
|
||||
__version__ = '''
|
||||
0.6
|
||||
'''.strip()
|
||||
|
|
@ -18,9 +18,6 @@ from .utils import MemEditError
|
|||
|
||||
__author__ = 'Jan Petykiewicz'
|
||||
|
||||
from .VERSION import __version__
|
||||
version = __version__ # legacy compatibility
|
||||
|
||||
|
||||
system = platform.system()
|
||||
if system == 'Windows':
|
||||
|
|
|
|||
|
|
@ -2,17 +2,17 @@
|
|||
Abstract class for cross-platform memory editing.
|
||||
"""
|
||||
|
||||
from typing import List, Tuple, Optional, Union, Generator
|
||||
from typing import List, Tuple
|
||||
from abc import ABCMeta, abstractmethod
|
||||
from contextlib import contextmanager
|
||||
import copy
|
||||
import ctypes
|
||||
import logging
|
||||
|
||||
from . import utils
|
||||
from .utils import ctypes_buffer_t
|
||||
from .utils import ctypes_buffer_t, search_buffer, ctypes_equal
|
||||
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
|
@ -22,8 +22,8 @@ class Process(metaclass=ABCMeta):
|
|||
(i.e., by reading from or writing to the memory used by a given process).
|
||||
|
||||
The static methods
|
||||
`Process.list_available_pids()`
|
||||
`Process.get_pid_by_name(executable_filename)`
|
||||
Process.list_available_pids()
|
||||
Process.get_pid_by_name(executable_filename)
|
||||
can be used to help find the process id (pid) of the target process. They are
|
||||
provided for convenience only; it is probably better to use the tools built
|
||||
in to your operating system to discover the pid of the specific process you
|
||||
|
|
@ -31,19 +31,18 @@ class Process(metaclass=ABCMeta):
|
|||
|
||||
Once you have found the pid, you are ready to construct an instance of Process
|
||||
and use it to read and write to memory. Once you are done with the process,
|
||||
use `.close()` to free up the process for access by other debuggers etc.
|
||||
```
|
||||
use .close() to free up the process for access by other debuggers etc.
|
||||
|
||||
p = Process(1239)
|
||||
p.close()
|
||||
```
|
||||
|
||||
To read/write to memory, first create a buffer using ctypes:
|
||||
```
|
||||
|
||||
buffer0 = (ctypes.c_byte * 5)(39, 50, 03, 40, 30)
|
||||
buffer1 = ctypes.c_ulong()
|
||||
```
|
||||
|
||||
and then use
|
||||
```
|
||||
|
||||
p.write_memory(0x2fe, buffer0)
|
||||
|
||||
val0 = p.read_memory(0x220, buffer0)[:]
|
||||
|
|
@ -51,52 +50,52 @@ class Process(metaclass=ABCMeta):
|
|||
val1a = p.read_memory(0x149, buffer1).value
|
||||
val2b = buffer1.value
|
||||
assert(val1a == val2b)
|
||||
```
|
||||
|
||||
Searching for a value can be done in a number of ways:
|
||||
Search a list of addresses:
|
||||
`found_addresses = p.search_addresses([0x1020, 0x1030], buffer0)`
|
||||
found_addresses = p.search_addresses([0x1020, 0x1030], buffer0)
|
||||
Search the entire memory space:
|
||||
`found_addresses = p.search_all_memory(buffer0, writeable_only=False)`
|
||||
found_addresses = p.search_all_memory(buffer0, writeable_only=False)
|
||||
|
||||
You can also get a list of which regions in memory are mapped (readable):
|
||||
`regions = p.list_mapped_regions(writeable_only=False)`
|
||||
which can be used along with search_buffer(...) to re-create .search_all_memory(...):
|
||||
```
|
||||
regions = p.list_mapped_regions(writeable_only=False)
|
||||
|
||||
which can be used along with search_buffer(...) to re-create .search_all_memory(...):
|
||||
|
||||
found = []
|
||||
for region_start, region_stop in regions:
|
||||
region_buffer = (ctypes.c_byte * (region_stop - region_start))()
|
||||
p.read_memory(region_start, region_buffer)
|
||||
found += utils.search_buffer(ctypes.c_ulong(123456790), region_buffer)
|
||||
```
|
||||
|
||||
Other useful methods include the context manager, implemented as a static method:
|
||||
```
|
||||
|
||||
with Process.open_process(pid) as p:
|
||||
# use p here, no need to call p.close()
|
||||
```
|
||||
|
||||
.get_path(), which reports the path of the executable file which was used
|
||||
to start the process:
|
||||
```
|
||||
|
||||
executable_path = p.get_path()
|
||||
```
|
||||
|
||||
and deref_struct_pointer, which takes a pointer to a struct and reads out the struct members:
|
||||
```
|
||||
|
||||
# struct is a list of (offset, buffer) pairs
|
||||
struct_defintion = [(0x0, ctypes.c_ulong()),
|
||||
(0x20, ctypes.c_byte())]
|
||||
values = p.deref_struct_pointer(0x0feab4, struct_defintion)
|
||||
```
|
||||
|
||||
which is shorthand for
|
||||
```
|
||||
|
||||
struct_addr = p.read_memory(0x0feab4, ctypes.c_void_p())
|
||||
values = [p.read_memory(struct_addr + 0x0, ctypes.c_ulong()),
|
||||
p.read_memory(struct_addr + 0x20, ctypes.c_byte())]
|
||||
```
|
||||
|
||||
=================
|
||||
|
||||
Putting all this together, a simple program which alters a magic number in the only running
|
||||
instance of 'magic.exe' might look like this:
|
||||
```
|
||||
|
||||
import ctypes
|
||||
from mem_edit import Process
|
||||
|
||||
|
|
@ -107,9 +106,9 @@ class Process(metaclass=ABCMeta):
|
|||
addrs = p.search_all_memory(magic_number)
|
||||
assert(len(addrs) == 1)
|
||||
p.write_memory(addrs[0], ctypes.c_ulong(42))
|
||||
```
|
||||
|
||||
Searching for a value which changes:
|
||||
```
|
||||
|
||||
pid = Process.get_pid_by_name('monitor_me.exe')
|
||||
with Process.open_process(pid) as p:
|
||||
addrs = p.search_all_memory(ctypes.c_int(40))
|
||||
|
|
@ -118,19 +117,18 @@ class Process(metaclass=ABCMeta):
|
|||
print('Found addresses:')
|
||||
for addr in filtered_addrs:
|
||||
print(hex(addr))
|
||||
```
|
||||
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def __init__(self, process_id: int):
|
||||
"""
|
||||
Constructing a Process object prepares the process with specified process_id for
|
||||
memory editing. Finding the `process_id` for the process you want to edit is often
|
||||
memory editing. Finding the process_id for the process you want to edit is often
|
||||
easiest using os-specific tools (or by launching the process yourself, e.g. with
|
||||
`subprocess.Popen(...)`).
|
||||
subprocess.Popen(...)).
|
||||
|
||||
Args:
|
||||
process_id: Process id (pid) of the target process
|
||||
:param process_id: Process id (pid) of the target process
|
||||
"""
|
||||
pass
|
||||
|
||||
|
|
@ -141,7 +139,7 @@ class Process(metaclass=ABCMeta):
|
|||
letting other debuggers attach to it instead.
|
||||
|
||||
This function should be called after you are done working with the process
|
||||
and will no longer need it. See the `Process.open_process(...)` context
|
||||
and will no longer need it. See the Process.open_process(...) context
|
||||
manager to avoid having to call this function yourself.
|
||||
"""
|
||||
pass
|
||||
|
|
@ -149,45 +147,38 @@ class Process(metaclass=ABCMeta):
|
|||
@abstractmethod
|
||||
def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t):
|
||||
"""
|
||||
Write the given buffer to the process's address space, starting at `base_address`.
|
||||
Write the given buffer to the process's address space, starting at base_address.
|
||||
|
||||
Args:
|
||||
base_address: The address to write at, in the process's address space.
|
||||
write_buffer: A ctypes object, for example, `ctypes.c_ulong(48)`,
|
||||
`(ctypes.c_byte * 3)(43, 21, 0xff)`, or a subclass of `ctypes.Structure`,
|
||||
which will be written into memory starting at `base_address`.
|
||||
:param base_address: The address to write at, in the process's address space.
|
||||
:param write_buffer: A ctypes object, for example, ctypes.c_ulong(48),
|
||||
(ctypes.c_byte * 3)(43, 21, 0xff), or a subclass of ctypes.Structure,
|
||||
which will be written into memory starting at base_address.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t:
|
||||
"""
|
||||
Read into the given buffer from the process's address space, starting at `base_address`.
|
||||
Read into the given buffer from the process's address space, starting at base_address.
|
||||
|
||||
Args:
|
||||
base_address: The address to read from, in the process's address space.
|
||||
read_buffer: A `ctypes` object, for example. `ctypes.c_ulong()`,
|
||||
`(ctypes.c_byte * 3)()`, or a subclass of `ctypes.Structure`, which will be
|
||||
overwritten with the contents of the process's memory starting at `base_address`.
|
||||
|
||||
Returns:
|
||||
`read_buffer` is returned as well as being overwritten.
|
||||
:param base_address: The address to read from, in the process's address space.
|
||||
:param read_buffer: A ctypes object, for example. ctypes.c_ulong(),
|
||||
(ctypes.c_byte * 3)(), or a subclass of ctypes.Structure, which will be
|
||||
overwritten with the contents of the process's memory starting at base_address.
|
||||
:returns: read_buffer is returned as well as being overwritten.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def list_mapped_regions(self, writeable_only=True) -> List[Tuple[int, int]]:
|
||||
"""
|
||||
Return a list of `(start_address, stop_address)` for the regions of the address space
|
||||
Return a list of (start_address, stop_address) for the regions of the address space
|
||||
accessible to (readable and possibly writable by) the process.
|
||||
By default, this function does not return non-writeable regions.
|
||||
|
||||
Args:
|
||||
writeable_only: If `True`, only return regions which are also writeable.
|
||||
Default `True`.
|
||||
|
||||
Returns:
|
||||
List of `(start_address, stop_address)` for each accessible memory region.
|
||||
:param writeable_only: If True, only return regions which are also writeable.
|
||||
Default true.
|
||||
:return: List of (start_address, stop_address) for each accessible memory region.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
|
@ -196,8 +187,7 @@ class Process(metaclass=ABCMeta):
|
|||
"""
|
||||
Return the path to the executable file which was run to start this process.
|
||||
|
||||
Returns:
|
||||
A string containing the path.
|
||||
:return: A string containing the path.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
|
@ -207,14 +197,13 @@ class Process(metaclass=ABCMeta):
|
|||
"""
|
||||
Return a list of all process ids (pids) accessible on this system.
|
||||
|
||||
Returns:
|
||||
List of running process ids.
|
||||
:return: List of running process ids.
|
||||
"""
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
@abstractmethod
|
||||
def get_pid_by_name(target_name: str) -> Optional[int]:
|
||||
def get_pid_by_name(target_name: str) -> int or None:
|
||||
"""
|
||||
Attempt to return the process id (pid) of a process which was run with an executable
|
||||
file with the provided name. If no process is found, return None.
|
||||
|
|
@ -225,12 +214,7 @@ class Process(metaclass=ABCMeta):
|
|||
Don't rely on this method if you can possibly avoid it, since it makes no
|
||||
attempt to confirm that it found a unique process and breaks trivially (e.g. if the
|
||||
executable file is renamed).
|
||||
|
||||
Args:
|
||||
target_name: Name of the process to find the PID for
|
||||
|
||||
Returns:
|
||||
Process id (pid) of a process with the provided name, or `None`.
|
||||
:return: Process id (pid) of a process with the provided name, or None.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
|
@ -240,115 +224,73 @@ class Process(metaclass=ABCMeta):
|
|||
) -> List[ctypes_buffer_t]:
|
||||
"""
|
||||
Take a pointer to a struct and read out the struct members:
|
||||
```
|
||||
struct_defintion = [(0x0, ctypes.c_ulong()),
|
||||
(0x20, ctypes.c_byte())]
|
||||
values = p.deref_struct_pointer(0x0feab4, struct_defintion)
|
||||
```
|
||||
which is shorthand for
|
||||
```
|
||||
struct_addr = p.read_memory(0x0feab4, ctypes.c_void_p())
|
||||
values = [p.read_memory(struct_addr + 0x0, ctypes.c_ulong()),
|
||||
p.read_memory(struct_addr + 0x20, ctypes.c_byte())]
|
||||
```
|
||||
|
||||
Args:
|
||||
base_address: Address at which the struct pointer is located.
|
||||
targets: List of `(offset, read_buffer)` pairs which will be read from the struct.
|
||||
|
||||
Return:
|
||||
List of read values corresponding to the provided targets.
|
||||
:param base_address: Address at which the struct pointer is located.
|
||||
:param targets: List of (offset, read_buffer) pairs which will be read from the struct.
|
||||
:return: List of read values corresponding to the provided targets.
|
||||
"""
|
||||
base = self.read_memory(base_address, ctypes.c_void_p()).value
|
||||
values = [self.read_memory(base + offset, buffer) for offset, buffer in targets]
|
||||
return values
|
||||
|
||||
def search_addresses(self,
|
||||
addresses: List[int],
|
||||
needle_buffer: ctypes_buffer_t,
|
||||
verbatim: bool = True,
|
||||
) -> List[int]:
|
||||
def search_addresses(self, addresses: List[int], needle_buffer: ctypes_buffer_t) -> List[int]:
|
||||
"""
|
||||
Search for the provided value at each of the provided addresses, and return the addresses
|
||||
where it is found.
|
||||
|
||||
Args:
|
||||
addresses: List of addresses which should be probed.
|
||||
needle_buffer: The value to search for. This should be a `ctypes` object of the same
|
||||
sorts as used by `.read_memory(...)`, which will be compared to the contents of
|
||||
:param addresses: List of addresses which should be probed.
|
||||
:param needle_buffer: The value to search for. This should be a ctypes object of the same
|
||||
sorts as used by .read_memory(...), which will be compared to the contents of
|
||||
memory at each of the given addresses.
|
||||
verbatim: If `True`, perform bitwise comparison when searching for `needle_buffer`.
|
||||
If `False`, perform `utils.ctypes_equal`-based comparison. Default `True`.
|
||||
|
||||
Returns:
|
||||
List of addresses where the `needle_buffer` was found.
|
||||
:return: List of addresses where the needle_buffer was found.
|
||||
"""
|
||||
found = []
|
||||
read_buffer = copy.copy(needle_buffer)
|
||||
|
||||
if verbatim:
|
||||
def compare(a, b):
|
||||
return bytes(read_buffer) == bytes(needle_buffer)
|
||||
else:
|
||||
compare = utils.ctypes_equal
|
||||
|
||||
for address in addresses:
|
||||
self.read_memory(address, read_buffer)
|
||||
if compare(needle_buffer, read_buffer):
|
||||
read = self.read_memory(address, read_buffer)
|
||||
if ctypes_equal(needle_buffer, read):
|
||||
found.append(address)
|
||||
return found
|
||||
|
||||
def search_all_memory(self,
|
||||
needle_buffer: ctypes_buffer_t,
|
||||
writeable_only: bool = True,
|
||||
verbatim: bool = True,
|
||||
) -> List[int]:
|
||||
def search_all_memory(self, needle_buffer, writeable_only=True) -> List[int]:
|
||||
"""
|
||||
Search the entire memory space accessible to the process for the provided value.
|
||||
|
||||
Args:
|
||||
needle_buffer: The value to search for. This should be a ctypes object of the same
|
||||
sorts as used by `.read_memory(...)`, which will be compared to the contents of
|
||||
:param needle_buffer: The value to search for. This should be a ctypes object of the same
|
||||
sorts as used by .read_memory(...), which will be compared to the contents of
|
||||
memory at each accessible address.
|
||||
writeable_only: If `True`, only search regions where the process has write access.
|
||||
Default `True`.
|
||||
verbatim: If `True`, perform bitwise comparison when searching for `needle_buffer`.
|
||||
If `False`, perform `utils.ctypes_equal-based` comparison. Default `True`.
|
||||
|
||||
Returns:
|
||||
List of addresses where the `needle_buffer` was found.
|
||||
:param writeable_only: If True, only search regions where the process has write access.
|
||||
:return: List of addresses where the needle_buffer was found.
|
||||
"""
|
||||
found = []
|
||||
if verbatim:
|
||||
search = utils.search_buffer_verbatim
|
||||
else:
|
||||
search = utils.search_buffer
|
||||
|
||||
for start, stop in self.list_mapped_regions(writeable_only):
|
||||
try:
|
||||
region_buffer = (ctypes.c_byte * (stop - start))()
|
||||
self.read_memory(start, region_buffer)
|
||||
found += [offset + start for offset in search(needle_buffer, region_buffer)]
|
||||
found += [offset + start for offset in search_buffer(needle_buffer, region_buffer)]
|
||||
except OSError:
|
||||
logger.error('Failed to read in range 0x{} - 0x{}'.format(start, stop))
|
||||
return found
|
||||
|
||||
@classmethod
|
||||
@contextmanager
|
||||
def open_process(cls, process_id: int) -> Generator['Process', None, None]:
|
||||
def open_process(cls, process_id: int) -> 'Process':
|
||||
"""
|
||||
Context manager which automatically closes the constructed Process:
|
||||
```
|
||||
with Process.open_process(2394) as p:
|
||||
# use p here
|
||||
# no need to run p.close()
|
||||
```
|
||||
|
||||
Args:
|
||||
process_id: Process id (pid), passed to the Process constructor.
|
||||
|
||||
Returns:
|
||||
Constructed Process object.
|
||||
:param process_id: Process id (pid), passed to the Process constructor.
|
||||
:return: Constructed Process object.
|
||||
"""
|
||||
process = cls(process_id)
|
||||
yield process
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
Implementation of Process class for Linux
|
||||
"""
|
||||
|
||||
from typing import List, Tuple, Optional
|
||||
from typing import List, Tuple
|
||||
from os import strerror
|
||||
import os
|
||||
import os.path
|
||||
|
|
@ -15,17 +15,18 @@ from .abstract import Process as AbstractProcess
|
|||
from .utils import ctypes_buffer_t, MemEditError
|
||||
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
ptrace_commands = {
|
||||
'PTRACE_GETREGS': 12,
|
||||
'PTRACE_SETREGS': 13,
|
||||
'PTRACE_ATTACH': 16,
|
||||
'PTRACE_DETACH': 17,
|
||||
'PTRACE_SYSCALL': 24,
|
||||
'PTRACE_SEIZE': 16902,
|
||||
}
|
||||
'PTRACE_GETREGS': 12,
|
||||
'PTRACE_SETREGS': 13,
|
||||
'PTRACE_ATTACH': 16,
|
||||
'PTRACE_DETACH': 17,
|
||||
'PTRACE_SYSCALL': 24,
|
||||
'PTRACE_SEIZE': 16902,
|
||||
}
|
||||
|
||||
|
||||
# import ptrace() from libc
|
||||
|
|
@ -39,7 +40,7 @@ def ptrace(command: int, pid: int = 0, arg1: int = 0, arg2: int = 0) -> int:
|
|||
"""
|
||||
Call ptrace() with the provided pid and arguments. See the ```man ptrace```.
|
||||
"""
|
||||
logger.debug('ptrace({}, {}, {}, {})'.format(command, pid, arg1, arg2))
|
||||
logger.debug('ptrace({}, {}, {}, {})'.format(command, pid, arg1, arg2))
|
||||
result = _ptrace(command, pid, arg1, arg2)
|
||||
if result == -1:
|
||||
err_no = ctypes.get_errno()
|
||||
|
|
@ -57,10 +58,8 @@ class Process(AbstractProcess):
|
|||
self.pid = process_id
|
||||
|
||||
def close(self):
|
||||
os.kill(self.pid, signal.SIGSTOP)
|
||||
os.waitpid(self.pid, 0)
|
||||
os.kill(self.pid, signal.SIGSTOP)
|
||||
ptrace(ptrace_commands['PTRACE_DETACH'], self.pid, 0, 0)
|
||||
os.kill(self.pid, signal.SIGCONT)
|
||||
self.pid = None
|
||||
|
||||
def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t):
|
||||
|
|
@ -79,7 +78,7 @@ class Process(AbstractProcess):
|
|||
with open('/proc/{}/cmdline', 'rb') as f:
|
||||
return f.read().decode().split('\x00')[0]
|
||||
except FileNotFoundError:
|
||||
return ''
|
||||
return ''
|
||||
|
||||
@staticmethod
|
||||
def list_available_pids() -> List[int]:
|
||||
|
|
@ -92,17 +91,17 @@ class Process(AbstractProcess):
|
|||
return pids
|
||||
|
||||
@staticmethod
|
||||
def get_pid_by_name(target_name: str) -> Optional[int]:
|
||||
def get_pid_by_name(target_name: str) -> int or None:
|
||||
for pid in Process.list_available_pids():
|
||||
try:
|
||||
logger.debug('Checking name for pid {}'.format(pid))
|
||||
logger.info('Checking name for pid {}'.format(pid))
|
||||
with open('/proc/{}/cmdline'.format(pid), 'rb') as cmdline:
|
||||
path = cmdline.read().decode().split('\x00')[0]
|
||||
except FileNotFoundError:
|
||||
continue
|
||||
|
||||
name = os.path.basename(path)
|
||||
logger.debug('Name was "{}"'.format(name))
|
||||
logger.info('Name was "{}"'.format(name))
|
||||
if path is not None and name == target_name:
|
||||
return pid
|
||||
|
||||
|
|
|
|||
|
|
@ -12,57 +12,24 @@ Utility functions and types:
|
|||
ctypes_equal(a, b)
|
||||
"""
|
||||
|
||||
from typing import List, Union
|
||||
from typing import List
|
||||
import ctypes
|
||||
|
||||
|
||||
ctypes_buffer_t = Union[ctypes._SimpleCData, ctypes.Array, ctypes.Structure, ctypes.Union]
|
||||
ctypes_buffer_t = ctypes._SimpleCData or ctypes.Array or ctypes.Structure or ctypes.Union
|
||||
|
||||
|
||||
class MemEditError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def search_buffer_verbatim(needle_buffer: ctypes_buffer_t,
|
||||
haystack_buffer: ctypes_buffer_t,
|
||||
) -> List[int]:
|
||||
def search_buffer(needle_buffer: ctypes_buffer_t, haystack_buffer: ctypes_buffer_t) -> List[int]:
|
||||
"""
|
||||
Search for a buffer inside another buffer, using a direct (bitwise) comparison
|
||||
Search for a buffer inside another buffer.
|
||||
|
||||
Args:
|
||||
needle_buffer: Buffer to search for.
|
||||
haystack_buffer: Buffer to search in.
|
||||
|
||||
Returns:
|
||||
List of offsets where the `needle_buffer` was found.
|
||||
"""
|
||||
found = []
|
||||
|
||||
haystack = bytes(haystack_buffer)
|
||||
needle = bytes(needle_buffer)
|
||||
|
||||
start = 0
|
||||
result = haystack.find(needle, start)
|
||||
while start < len(haystack) and result != -1:
|
||||
found.append(result)
|
||||
start = result + 1
|
||||
result = haystack.find(needle, start)
|
||||
return found
|
||||
|
||||
|
||||
def search_buffer(needle_buffer: ctypes_buffer_t,
|
||||
haystack_buffer: ctypes_buffer_t,
|
||||
) -> List[int]:
|
||||
"""
|
||||
Search for a buffer inside another buffer, using `ctypes_equal` for comparison.
|
||||
Much slower than `search_buffer_verbatim`.
|
||||
|
||||
Args:
|
||||
needle_buffer: Buffer to search for.
|
||||
haystack_buffer: Buffer to search in.
|
||||
|
||||
Returns:
|
||||
List of offsets where the needle_buffer was found.
|
||||
:param needle_buffer: Buffer to search for.
|
||||
:param haystack_buffer: Buffer to search in.
|
||||
:return: List of offsets where the needle_buffer was found.
|
||||
"""
|
||||
found = []
|
||||
read_type = type(needle_buffer)
|
||||
|
|
@ -73,15 +40,13 @@ def search_buffer(needle_buffer: ctypes_buffer_t,
|
|||
return found
|
||||
|
||||
|
||||
def ctypes_equal(a: ctypes_buffer_t,
|
||||
b: ctypes_buffer_t,
|
||||
) -> bool:
|
||||
def ctypes_equal(a: ctypes_buffer_t, b: ctypes_buffer_t) -> bool:
|
||||
"""
|
||||
Check if the values stored inside two ctypes buffers are equal.
|
||||
"""
|
||||
if not type(a) == type(b):
|
||||
return False
|
||||
|
||||
|
||||
if isinstance(a, ctypes.Array):
|
||||
return a[:] == b[:]
|
||||
elif isinstance(a, ctypes.Structure) or isinstance(a, ctypes.Union):
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
Implementation of Process class for Windows
|
||||
"""
|
||||
|
||||
from typing import List, Tuple, Optional
|
||||
from typing import List, Tuple
|
||||
from math import floor
|
||||
from os import strerror
|
||||
import os.path
|
||||
|
|
@ -14,122 +14,88 @@ from .abstract import Process as AbstractProcess
|
|||
from .utils import ctypes_buffer_t, MemEditError
|
||||
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Process handle privileges
|
||||
privileges = {
|
||||
'PROCESS_QUERY_INFORMATION': 0x0400,
|
||||
'PROCESS_VM_OPERATION': 0x0008,
|
||||
'PROCESS_VM_READ': 0x0010,
|
||||
'PROCESS_VM_WRITE': 0x0020,
|
||||
}
|
||||
'PROCESS_QUERY_INFORMATION': 0x0400,
|
||||
'PROCESS_VM_OPERATION': 0x0008,
|
||||
'PROCESS_VM_READ': 0x0010,
|
||||
'PROCESS_VM_WRITE': 0x0020,
|
||||
}
|
||||
privileges['PROCESS_RW'] = (
|
||||
privileges['PROCESS_QUERY_INFORMATION'] |
|
||||
privileges['PROCESS_VM_OPERATION'] |
|
||||
privileges['PROCESS_VM_READ'] |
|
||||
privileges['PROCESS_VM_WRITE']
|
||||
)
|
||||
privileges['PROCESS_QUERY_INFORMATION'] |
|
||||
privileges['PROCESS_VM_OPERATION'] |
|
||||
privileges['PROCESS_VM_READ'] |
|
||||
privileges['PROCESS_VM_WRITE']
|
||||
)
|
||||
|
||||
# Memory region states
|
||||
mem_states = {
|
||||
'MEM_COMMIT': 0x1000,
|
||||
'MEM_FREE': 0x10000,
|
||||
'MEM_RESERVE': 0x2000,
|
||||
}
|
||||
'MEM_COMMIT': 0x1000,
|
||||
'MEM_FREE': 0x10000,
|
||||
'MEM_RESERVE': 0x2000,
|
||||
}
|
||||
|
||||
# Memory region permissions
|
||||
page_protections = {
|
||||
'PAGE_EXECUTE': 0x10,
|
||||
'PAGE_EXECUTE_READ': 0x20,
|
||||
'PAGE_EXECUTE_READWRITE': 0x40,
|
||||
'PAGE_EXECUTE_WRITECOPY': 0x80,
|
||||
'PAGE_NOACCESS': 0x01,
|
||||
'PAGE_READWRITE': 0x04,
|
||||
'PAGE_WRITECOPY': 0x08,
|
||||
}
|
||||
'PAGE_EXECUTE': 0x10,
|
||||
'PAGE_EXECUTE_READ': 0x20,
|
||||
'PAGE_EXECUTE_READWRITE': 0x40,
|
||||
'PAGE_EXECUTE_WRITECOPY': 0x80,
|
||||
'PAGE_NOACCESS': 0x01,
|
||||
'PAGE_READWRITE': 0x04,
|
||||
'PAGE_WRITECOPY': 0x08,
|
||||
}
|
||||
# Custom (combined) permissions
|
||||
page_protections['PAGE_READABLE'] = (
|
||||
page_protections['PAGE_EXECUTE_READ'] |
|
||||
page_protections['PAGE_EXECUTE_READWRITE'] |
|
||||
page_protections['PAGE_READWRITE']
|
||||
)
|
||||
page_protections['PAGE_EXECUTE_READ'] |
|
||||
page_protections['PAGE_EXECUTE_READWRITE'] |
|
||||
page_protections['PAGE_READWRITE']
|
||||
)
|
||||
page_protections['PAGE_READWRITEABLE'] = (
|
||||
page_protections['PAGE_EXECUTE_READWRITE'] |
|
||||
page_protections['PAGE_READWRITE']
|
||||
)
|
||||
page_protections['PAGE_EXECUTE_READWRITE'] |
|
||||
page_protections['PAGE_READWRITE']
|
||||
)
|
||||
|
||||
# Memory types
|
||||
mem_types = {
|
||||
'MEM_IMAGE': 0x1000000,
|
||||
'MEM_MAPPED': 0x40000,
|
||||
'MEM_PRIVATE': 0x20000,
|
||||
}
|
||||
'MEM_IMAGE': 0x1000000,
|
||||
'MEM_MAPPED': 0x40000,
|
||||
'MEM_PRIVATE': 0x20000,
|
||||
}
|
||||
|
||||
|
||||
# C struct for VirtualQueryEx
|
||||
class MEMORY_BASIC_INFORMATION32(ctypes.Structure):
|
||||
class MEMORY_BASIC_INFORMATION(ctypes.Structure):
|
||||
_fields_ = [
|
||||
('BaseAddress', ctypes.wintypes.DWORD),
|
||||
('AllocationBase', ctypes.wintypes.DWORD),
|
||||
('AllocationProtect', ctypes.wintypes.DWORD),
|
||||
('RegionSize', ctypes.wintypes.DWORD),
|
||||
('State', ctypes.wintypes.DWORD),
|
||||
('Protect', ctypes.wintypes.DWORD),
|
||||
('Type', ctypes.wintypes.DWORD),
|
||||
]
|
||||
('BaseAddress', ctypes.c_void_p),
|
||||
('AllocationBase', ctypes.c_void_p),
|
||||
('AllocationProtect', ctypes.wintypes.DWORD),
|
||||
('RegionSize', ctypes.wintypes.UINT),
|
||||
('State', ctypes.wintypes.DWORD),
|
||||
('Protect', ctypes.wintypes.DWORD),
|
||||
('Type', ctypes.wintypes.DWORD),
|
||||
]
|
||||
|
||||
class MEMORY_BASIC_INFORMATION64(ctypes.Structure):
|
||||
_fields_ = [
|
||||
('BaseAddress', ctypes.c_ulonglong),
|
||||
('AllocationBase', ctypes.c_ulonglong),
|
||||
('AllocationProtect', ctypes.wintypes.DWORD),
|
||||
('__alignment1', ctypes.wintypes.DWORD),
|
||||
('RegionSize', ctypes.c_ulonglong),
|
||||
('State', ctypes.wintypes.DWORD),
|
||||
('Protect', ctypes.wintypes.DWORD),
|
||||
('Type', ctypes.wintypes.DWORD),
|
||||
('__alignment2', ctypes.wintypes.DWORD),
|
||||
]
|
||||
|
||||
PTR_SIZE = ctypes.sizeof(ctypes.c_void_p)
|
||||
if PTR_SIZE == 8: # 64-bit python
|
||||
MEMORY_BASIC_INFORMATION = MEMORY_BASIC_INFORMATION64
|
||||
elif PTR_SIZE == 4: # 32-bit python
|
||||
MEMORY_BASIC_INFORMATION = MEMORY_BASIC_INFORMATION32
|
||||
|
||||
ctypes.windll.kernel32.VirtualQueryEx.argtypes = [
|
||||
ctypes.wintypes.HANDLE,
|
||||
ctypes.wintypes.LPCVOID,
|
||||
ctypes.c_void_p,
|
||||
ctypes.c_size_t]
|
||||
ctypes.windll.kernel32.ReadProcessMemory.argtypes = [
|
||||
ctypes.wintypes.HANDLE,
|
||||
ctypes.wintypes.LPCVOID,
|
||||
ctypes.c_void_p,
|
||||
ctypes.c_size_t,
|
||||
ctypes.c_void_p]
|
||||
ctypes.windll.kernel32.WriteProcessMemory.argtypes = [
|
||||
ctypes.wintypes.HANDLE,
|
||||
ctypes.wintypes.LPCVOID,
|
||||
ctypes.c_void_p,
|
||||
ctypes.c_size_t,
|
||||
ctypes.c_void_p]
|
||||
|
||||
# C struct for GetSystemInfo
|
||||
class SYSTEM_INFO(ctypes.Structure):
|
||||
_fields_ = [
|
||||
('wProcessorArchitecture', ctypes.wintypes.WORD),
|
||||
('wReserved', ctypes.wintypes.WORD),
|
||||
('dwPageSize', ctypes.wintypes.DWORD),
|
||||
('lpMinimumApplicationAddress', ctypes.c_void_p),
|
||||
('lpMaximumApplicationAddress', ctypes.c_void_p),
|
||||
('dwActiveProcessorMask', ctypes.c_void_p),
|
||||
('dwNumberOfProcessors', ctypes.wintypes.DWORD),
|
||||
('dwProcessorType', ctypes.wintypes.DWORD),
|
||||
('dwAllocationGranularity', ctypes.wintypes.DWORD),
|
||||
('wProcessorLevel', ctypes.wintypes.WORD),
|
||||
('wProcessorRevision', ctypes.wintypes.WORD),
|
||||
]
|
||||
('wProcessorArchitecture', ctypes.wintypes.WORD),
|
||||
('wReserved', ctypes.wintypes.WORD),
|
||||
('dwPageSize', ctypes.wintypes.DWORD),
|
||||
('lpMinimumApplicationAddress', ctypes.c_void_p),
|
||||
('lpMaximumApplicationAddress', ctypes.c_void_p),
|
||||
('dwActiveProcessorMask', ctypes.wintypes.DWORD),
|
||||
('dwNumberOfProcessors', ctypes.wintypes.DWORD),
|
||||
('dwProcessorType', ctypes.wintypes.DWORD),
|
||||
('dwAllocationGranularity', ctypes.wintypes.DWORD),
|
||||
('wProcessorLevel', ctypes.wintypes.WORD),
|
||||
('wProcessorRevision', ctypes.wintypes.WORD),
|
||||
]
|
||||
|
||||
|
||||
class Process(AbstractProcess):
|
||||
|
|
@ -154,24 +120,24 @@ class Process(AbstractProcess):
|
|||
def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t):
|
||||
try:
|
||||
ctypes.windll.kernel32.WriteProcessMemory(
|
||||
self.process_handle,
|
||||
base_address,
|
||||
ctypes.byref(write_buffer),
|
||||
ctypes.sizeof(write_buffer),
|
||||
None
|
||||
)
|
||||
self.process_handle,
|
||||
base_address,
|
||||
ctypes.byref(write_buffer),
|
||||
ctypes.sizeof(write_buffer),
|
||||
None
|
||||
)
|
||||
except (BufferError, ValueError, TypeError):
|
||||
raise MemEditError('Error with handle {}: {}'.format(self.process_handle, self._get_last_error()))
|
||||
|
||||
def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t:
|
||||
try:
|
||||
ctypes.windll.kernel32.ReadProcessMemory(
|
||||
self.process_handle,
|
||||
base_address,
|
||||
ctypes.byref(read_buffer),
|
||||
ctypes.sizeof(read_buffer),
|
||||
None
|
||||
)
|
||||
self.process_handle,
|
||||
base_address,
|
||||
ctypes.byref(read_buffer),
|
||||
ctypes.sizeof(read_buffer),
|
||||
None
|
||||
)
|
||||
except (BufferError, ValueError, TypeError):
|
||||
raise MemEditError('Error with handle {}: {}'.format(self.process_handle, self._get_last_error()))
|
||||
|
||||
|
|
@ -186,9 +152,10 @@ class Process(AbstractProcess):
|
|||
max_path_len = 260
|
||||
name_buffer = (ctypes.c_char * max_path_len)()
|
||||
rval = ctypes.windll.psapi.GetProcessImageFileNameA(
|
||||
self.process_handle,
|
||||
name_buffer,
|
||||
max_path_len)
|
||||
self.process_handle,
|
||||
name_buffer,
|
||||
max_path_len
|
||||
)
|
||||
|
||||
if rval > 0:
|
||||
return name_buffer.value.decode()
|
||||
|
|
@ -200,7 +167,7 @@ class Process(AbstractProcess):
|
|||
# According to EnumProcesses docs, you can't find out how many processes there are before
|
||||
# fetching the list. As a result, we grab 100 on the first try, and if we get a full list
|
||||
# of 100, repeatedly double the number until we get fewer than we asked for.
|
||||
|
||||
|
||||
n = 100
|
||||
returned_size = ctypes.wintypes.DWORD()
|
||||
returned_size_ptr = ctypes.byref(returned_size)
|
||||
|
|
@ -225,21 +192,19 @@ class Process(AbstractProcess):
|
|||
return pids[:num_returned]
|
||||
|
||||
@staticmethod
|
||||
def get_pid_by_name(target_name: str) -> Optional[int]:
|
||||
def get_pid_by_name(target_name: str) -> int or None:
|
||||
for pid in Process.list_available_pids():
|
||||
try:
|
||||
logger.debug('Checking name for pid {}'.format(pid))
|
||||
logger.info('Checking name for pid {}'.format(pid))
|
||||
with Process.open_process(pid) as process:
|
||||
path = process.get_path()
|
||||
|
||||
name = os.path.basename(path)
|
||||
logger.debug('Name was "{}"'.format(name))
|
||||
logger.info('Name was "{}"'.format(name))
|
||||
if path is not None and name == target_name:
|
||||
return pid
|
||||
except ValueError:
|
||||
pass
|
||||
except MemEditError as err:
|
||||
logger.debug(repr(err))
|
||||
|
||||
logger.info('Found no process with name {}'.format(target_name))
|
||||
return None
|
||||
|
|
@ -261,10 +226,11 @@ class Process(AbstractProcess):
|
|||
mbi_size = ctypes.sizeof(mbi)
|
||||
|
||||
success = ctypes.windll.kernel32.VirtualQueryEx(
|
||||
self.process_handle,
|
||||
address,
|
||||
mbi_ptr,
|
||||
mbi_size)
|
||||
self.process_handle,
|
||||
address,
|
||||
mbi_ptr,
|
||||
mbi_size,
|
||||
)
|
||||
|
||||
if success != mbi_size:
|
||||
if success == 0:
|
||||
|
|
@ -279,11 +245,10 @@ class Process(AbstractProcess):
|
|||
page_ptr = start
|
||||
while page_ptr < stop:
|
||||
page_info = get_mem_info(page_ptr)
|
||||
if (page_info.Type == mem_types['MEM_PRIVATE']
|
||||
and page_info.State == mem_states['MEM_COMMIT']
|
||||
and page_info.Protect & page_protections['PAGE_READABLE'] != 0
|
||||
and (page_info.Protect & page_protections['PAGE_READWRITEABLE'] != 0
|
||||
or not writeable_only)):
|
||||
if page_info.Type == mem_types['MEM_PRIVATE'] and \
|
||||
page_info.State == mem_states['MEM_COMMIT'] and \
|
||||
page_info.Protect & page_protections['PAGE_READABLE'] != 0 and \
|
||||
(page_info.Protect & page_protections['PAGE_READWRITEABLE'] != 0 or not writeable_only):
|
||||
regions.append((page_ptr, page_ptr + page_info.RegionSize))
|
||||
page_ptr += page_info.RegionSize
|
||||
|
||||
|
|
|
|||
20
setup.py
20
setup.py
|
|
@ -1,21 +1,12 @@
|
|||
#!/usr/bin/env python3
|
||||
#!/usr/bin/env python
|
||||
|
||||
from setuptools import setup, find_packages
|
||||
|
||||
|
||||
with open('README.md', 'rt') as f:
|
||||
long_description = f.read()
|
||||
|
||||
with open('mem_edit/VERSION.py', 'rt') as f:
|
||||
version = f.readlines()[2].strip()
|
||||
|
||||
setup(name='mem_edit',
|
||||
version=version,
|
||||
version='0.1',
|
||||
description='Multi-platform library for memory editing',
|
||||
long_description=long_description,
|
||||
long_description_content_type='text/markdown',
|
||||
author='Jan Petykiewicz',
|
||||
author_email='jan@mpxd.net',
|
||||
author_email='anewusername@gmail.com',
|
||||
url='https://mpxd.net/code/jan/mem_edit',
|
||||
keywords=[
|
||||
'memory',
|
||||
|
|
@ -35,6 +26,7 @@ setup(name='mem_edit',
|
|||
'trainer',
|
||||
],
|
||||
classifiers=[
|
||||
'Programming Language :: Python',
|
||||
'Programming Language :: Python :: 3',
|
||||
'Development Status :: 4 - Beta',
|
||||
'Environment :: Other Environment',
|
||||
|
|
@ -50,10 +42,8 @@ setup(name='mem_edit',
|
|||
'Topic :: Utilities',
|
||||
],
|
||||
packages=find_packages(),
|
||||
package_data={
|
||||
'mem_edit': []
|
||||
},
|
||||
install_requires=[
|
||||
'ctypes',
|
||||
'typing',
|
||||
],
|
||||
extras_require={
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue