Compare commits
30 Commits
Author | SHA1 | Date | |
---|---|---|---|
8f7955543c | |||
5a82f04f9e | |||
5f2b1a432e | |||
fec96b92a7 | |||
42c69867b8 | |||
db1dcc5e83 | |||
1ecab08651 | |||
e26381a578 | |||
e316322fbf | |||
b889ad8133 | |||
f10674e2b5 | |||
bdf0fb323e | |||
f7c7496cfd | |||
e56ec88761 | |||
f03ea6acad | |||
303620b0a2 | |||
d49555ad15 | |||
|
46e9456fd4 | ||
f3154e443d | |||
ef1a39152c | |||
c29be9f429 | |||
5a032da984 | |||
6ab295fc26 | |||
6913f73db4 | |||
9759645f92 | |||
0632b205ab | |||
|
5c75da31d5 | ||
|
bd6c22ca1d | ||
260d67bf81 | |||
4deaa41d7e |
29
.flake8
Normal file
29
.flake8
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
[flake8]
|
||||||
|
ignore =
|
||||||
|
# E501 line too long
|
||||||
|
E501,
|
||||||
|
# W391 newlines at EOF
|
||||||
|
W391,
|
||||||
|
# E241 multiple spaces after comma
|
||||||
|
E241,
|
||||||
|
# E302 expected 2 newlines
|
||||||
|
E302,
|
||||||
|
# W503 line break before binary operator (to be deprecated)
|
||||||
|
W503,
|
||||||
|
# E265 block comment should start with '# '
|
||||||
|
E265,
|
||||||
|
# E123 closing bracket does not match indentation of opening bracket's line
|
||||||
|
E123,
|
||||||
|
# E124 closing bracket does not match visual indentation
|
||||||
|
E124,
|
||||||
|
# E221 multiple spaces before operator
|
||||||
|
E221,
|
||||||
|
# E201 whitespace after '['
|
||||||
|
E201,
|
||||||
|
# E741 ambiguous variable name 'I'
|
||||||
|
E741,
|
||||||
|
|
||||||
|
|
||||||
|
per-file-ignores =
|
||||||
|
# F401 import without use
|
||||||
|
*/__init__.py: F401,
|
8
.gitignore
vendored
8
.gitignore
vendored
@ -1,7 +1,13 @@
|
|||||||
.idea/
|
.idea/
|
||||||
|
|
||||||
__pycache__
|
__pycache__/
|
||||||
*.pyc
|
*.pyc
|
||||||
|
|
||||||
*.egg-info/
|
*.egg-info/
|
||||||
|
build/
|
||||||
dist/
|
dist/
|
||||||
|
|
||||||
|
.pytest_cache/
|
||||||
|
.mypy_cache/
|
||||||
|
|
||||||
|
*.pickle
|
||||||
|
@ -1,3 +0,0 @@
|
|||||||
include README.md
|
|
||||||
include LICENSE.md
|
|
||||||
include mem_edit/VERSION
|
|
12
README.md
12
README.md
@ -3,6 +3,8 @@
|
|||||||
**mem_edit** is a multi-platform memory editing library written in Python.
|
**mem_edit** is a multi-platform memory editing library written in Python.
|
||||||
|
|
||||||
**Homepage:** https://mpxd.net/code/jan/mem_edit
|
**Homepage:** https://mpxd.net/code/jan/mem_edit
|
||||||
|
* PyPI: https://pypi.org/project/mem-edit/
|
||||||
|
* Github mirror: https://github.com/anewusername/mem_edit
|
||||||
|
|
||||||
**Capabilities:**
|
**Capabilities:**
|
||||||
* Scan all readable memory used by a process.
|
* Scan all readable memory used by a process.
|
||||||
@ -18,7 +20,7 @@
|
|||||||
## Installation
|
## Installation
|
||||||
|
|
||||||
**Dependencies:**
|
**Dependencies:**
|
||||||
* python 3 (written and tested with 3.7)
|
* python >=3.11
|
||||||
* ctypes
|
* ctypes
|
||||||
* typing (for type annotations)
|
* typing (for type annotations)
|
||||||
|
|
||||||
@ -38,7 +40,7 @@ pip3 install git+https://mpxd.net/code/jan/mem_edit.git@release
|
|||||||
|
|
||||||
Most functions and classes are documented inline.
|
Most functions and classes are documented inline.
|
||||||
To read the inline help,
|
To read the inline help,
|
||||||
```python
|
```python3
|
||||||
import mem_edit
|
import mem_edit
|
||||||
help(mem_edit.Process)
|
help(mem_edit.Process)
|
||||||
```
|
```
|
||||||
@ -46,7 +48,7 @@ help(mem_edit.Process)
|
|||||||
## Examples
|
## Examples
|
||||||
|
|
||||||
Increment a magic number (unsigned long 1234567890) found in 'magic.exe':
|
Increment a magic number (unsigned long 1234567890) found in 'magic.exe':
|
||||||
```python
|
```python3
|
||||||
import ctypes
|
import ctypes
|
||||||
from mem_edit import Process
|
from mem_edit import Process
|
||||||
|
|
||||||
@ -67,7 +69,7 @@ Increment a magic number (unsigned long 1234567890) found in 'magic.exe':
|
|||||||
```
|
```
|
||||||
|
|
||||||
Narrow down a search after a value changes:
|
Narrow down a search after a value changes:
|
||||||
```python
|
```python3
|
||||||
import ctypes
|
import ctypes
|
||||||
from mem_edit import Process
|
from mem_edit import Process
|
||||||
|
|
||||||
@ -88,7 +90,7 @@ Narrow down a search after a value changes:
|
|||||||
```
|
```
|
||||||
|
|
||||||
Read and alter a structure:
|
Read and alter a structure:
|
||||||
```python
|
```python3
|
||||||
import ctypes
|
import ctypes
|
||||||
from mem_edit import Process
|
from mem_edit import Process
|
||||||
|
|
||||||
|
1
mem_edit/LICENSE.md
Symbolic link
1
mem_edit/LICENSE.md
Symbolic link
@ -0,0 +1 @@
|
|||||||
|
../LICENSE.md
|
1
mem_edit/README.md
Symbolic link
1
mem_edit/README.md
Symbolic link
@ -0,0 +1 @@
|
|||||||
|
../README.md
|
@ -1 +0,0 @@
|
|||||||
0.3
|
|
@ -12,22 +12,19 @@ To get started, try:
|
|||||||
|
|
||||||
"""
|
"""
|
||||||
import platform
|
import platform
|
||||||
import pathlib
|
|
||||||
|
|
||||||
from .utils import MemEditError
|
from .utils import MemEditError
|
||||||
|
|
||||||
|
|
||||||
__author__ = 'Jan Petykiewicz'
|
__author__ = 'Jan Petykiewicz'
|
||||||
|
__version__ = '0.8'
|
||||||
with open(pathlib.Path(__file__).parent / 'VERSION', 'r') as f:
|
version = __version__ # legacy compatibility
|
||||||
__version__ = f.read().strip()
|
|
||||||
version = __version__
|
|
||||||
|
|
||||||
|
|
||||||
system = platform.system()
|
system = platform.system()
|
||||||
if system == 'Windows':
|
if system == 'Windows':
|
||||||
from .windows import Process
|
from .windows import Process as Process
|
||||||
elif system == 'Linux':
|
elif system == 'Linux':
|
||||||
from .linux import Process
|
from .linux import Process as Process
|
||||||
else:
|
else:
|
||||||
raise MemEditError('Only Linux and Windows are currently supported.')
|
raise MemEditError('Only Linux and Windows are currently supported.')
|
||||||
|
@ -1,8 +1,8 @@
|
|||||||
"""
|
"""
|
||||||
Abstract class for cross-platform memory editing.
|
Abstract class for cross-platform memory editing.
|
||||||
"""
|
"""
|
||||||
|
from typing import Self
|
||||||
from typing import List, Tuple
|
from collections.abc import Generator
|
||||||
from abc import ABCMeta, abstractmethod
|
from abc import ABCMeta, abstractmethod
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
import copy
|
import copy
|
||||||
@ -13,7 +13,6 @@ from . import utils
|
|||||||
from .utils import ctypes_buffer_t
|
from .utils import ctypes_buffer_t
|
||||||
|
|
||||||
|
|
||||||
logging.basicConfig(level=logging.INFO)
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@ -23,8 +22,8 @@ class Process(metaclass=ABCMeta):
|
|||||||
(i.e., by reading from or writing to the memory used by a given process).
|
(i.e., by reading from or writing to the memory used by a given process).
|
||||||
|
|
||||||
The static methods
|
The static methods
|
||||||
Process.list_available_pids()
|
`Process.list_available_pids()`
|
||||||
Process.get_pid_by_name(executable_filename)
|
`Process.get_pid_by_name(executable_filename)`
|
||||||
can be used to help find the process id (pid) of the target process. They are
|
can be used to help find the process id (pid) of the target process. They are
|
||||||
provided for convenience only; it is probably better to use the tools built
|
provided for convenience only; it is probably better to use the tools built
|
||||||
in to your operating system to discover the pid of the specific process you
|
in to your operating system to discover the pid of the specific process you
|
||||||
@ -32,18 +31,19 @@ class Process(metaclass=ABCMeta):
|
|||||||
|
|
||||||
Once you have found the pid, you are ready to construct an instance of Process
|
Once you have found the pid, you are ready to construct an instance of Process
|
||||||
and use it to read and write to memory. Once you are done with the process,
|
and use it to read and write to memory. Once you are done with the process,
|
||||||
use .close() to free up the process for access by other debuggers etc.
|
use `.close()` to free up the process for access by other debuggers etc.
|
||||||
|
```
|
||||||
p = Process(1239)
|
p = Process(1239)
|
||||||
p.close()
|
p.close()
|
||||||
|
```
|
||||||
|
|
||||||
To read/write to memory, first create a buffer using ctypes:
|
To read/write to memory, first create a buffer using ctypes:
|
||||||
|
```
|
||||||
buffer0 = (ctypes.c_byte * 5)(39, 50, 03, 40, 30)
|
buffer0 = (ctypes.c_byte * 5)(39, 50, 03, 40, 30)
|
||||||
buffer1 = ctypes.c_ulong()
|
buffer1 = ctypes.c_ulong()
|
||||||
|
```
|
||||||
and then use
|
and then use
|
||||||
|
```
|
||||||
p.write_memory(0x2fe, buffer0)
|
p.write_memory(0x2fe, buffer0)
|
||||||
|
|
||||||
val0 = p.read_memory(0x220, buffer0)[:]
|
val0 = p.read_memory(0x220, buffer0)[:]
|
||||||
@ -51,52 +51,52 @@ class Process(metaclass=ABCMeta):
|
|||||||
val1a = p.read_memory(0x149, buffer1).value
|
val1a = p.read_memory(0x149, buffer1).value
|
||||||
val2b = buffer1.value
|
val2b = buffer1.value
|
||||||
assert(val1a == val2b)
|
assert(val1a == val2b)
|
||||||
|
```
|
||||||
|
|
||||||
Searching for a value can be done in a number of ways:
|
Searching for a value can be done in a number of ways:
|
||||||
Search a list of addresses:
|
Search a list of addresses:
|
||||||
found_addresses = p.search_addresses([0x1020, 0x1030], buffer0)
|
`found_addresses = p.search_addresses([0x1020, 0x1030], buffer0)`
|
||||||
Search the entire memory space:
|
Search the entire memory space:
|
||||||
found_addresses = p.search_all_memory(buffer0, writeable_only=False)
|
`found_addresses = p.search_all_memory(buffer0, writeable_only=False)`
|
||||||
|
|
||||||
You can also get a list of which regions in memory are mapped (readable):
|
You can also get a list of which regions in memory are mapped (readable):
|
||||||
regions = p.list_mapped_regions(writeable_only=False)
|
`regions = p.list_mapped_regions(writeable_only=False)`
|
||||||
|
|
||||||
which can be used along with search_buffer(...) to re-create .search_all_memory(...):
|
which can be used along with search_buffer(...) to re-create .search_all_memory(...):
|
||||||
|
```
|
||||||
found = []
|
found = []
|
||||||
for region_start, region_stop in regions:
|
for region_start, region_stop in regions:
|
||||||
region_buffer = (ctypes.c_byte * (region_stop - region_start))()
|
region_buffer = (ctypes.c_byte * (region_stop - region_start))()
|
||||||
p.read_memory(region_start, region_buffer)
|
p.read_memory(region_start, region_buffer)
|
||||||
found += utils.search_buffer(ctypes.c_ulong(123456790), region_buffer)
|
found += utils.search_buffer(ctypes.c_ulong(123456790), region_buffer)
|
||||||
|
```
|
||||||
Other useful methods include the context manager, implemented as a static method:
|
Other useful methods include the context manager, implemented as a static method:
|
||||||
|
```
|
||||||
with Process.open_process(pid) as p:
|
with Process.open_process(pid) as p:
|
||||||
# use p here, no need to call p.close()
|
# use p here, no need to call p.close()
|
||||||
|
```
|
||||||
.get_path(), which reports the path of the executable file which was used
|
.get_path(), which reports the path of the executable file which was used
|
||||||
to start the process:
|
to start the process:
|
||||||
|
```
|
||||||
executable_path = p.get_path()
|
executable_path = p.get_path()
|
||||||
|
```
|
||||||
and deref_struct_pointer, which takes a pointer to a struct and reads out the struct members:
|
and deref_struct_pointer, which takes a pointer to a struct and reads out the struct members:
|
||||||
|
```
|
||||||
# struct is a list of (offset, buffer) pairs
|
# struct is a list of (offset, buffer) pairs
|
||||||
struct_defintion = [(0x0, ctypes.c_ulong()),
|
struct_defintion = [(0x0, ctypes.c_ulong()),
|
||||||
(0x20, ctypes.c_byte())]
|
(0x20, ctypes.c_byte())]
|
||||||
values = p.deref_struct_pointer(0x0feab4, struct_defintion)
|
values = p.deref_struct_pointer(0x0feab4, struct_defintion)
|
||||||
|
```
|
||||||
which is shorthand for
|
which is shorthand for
|
||||||
|
```
|
||||||
struct_addr = p.read_memory(0x0feab4, ctypes.c_void_p())
|
struct_addr = p.read_memory(0x0feab4, ctypes.c_void_p())
|
||||||
values = [p.read_memory(struct_addr + 0x0, ctypes.c_ulong()),
|
values = [p.read_memory(struct_addr + 0x0, ctypes.c_ulong()),
|
||||||
p.read_memory(struct_addr + 0x20, ctypes.c_byte())]
|
p.read_memory(struct_addr + 0x20, ctypes.c_byte())]
|
||||||
|
```
|
||||||
=================
|
=================
|
||||||
|
|
||||||
Putting all this together, a simple program which alters a magic number in the only running
|
Putting all this together, a simple program which alters a magic number in the only running
|
||||||
instance of 'magic.exe' might look like this:
|
instance of 'magic.exe' might look like this:
|
||||||
|
```
|
||||||
import ctypes
|
import ctypes
|
||||||
from mem_edit import Process
|
from mem_edit import Process
|
||||||
|
|
||||||
@ -107,9 +107,9 @@ class Process(metaclass=ABCMeta):
|
|||||||
addrs = p.search_all_memory(magic_number)
|
addrs = p.search_all_memory(magic_number)
|
||||||
assert(len(addrs) == 1)
|
assert(len(addrs) == 1)
|
||||||
p.write_memory(addrs[0], ctypes.c_ulong(42))
|
p.write_memory(addrs[0], ctypes.c_ulong(42))
|
||||||
|
```
|
||||||
Searching for a value which changes:
|
Searching for a value which changes:
|
||||||
|
```
|
||||||
pid = Process.get_pid_by_name('monitor_me.exe')
|
pid = Process.get_pid_by_name('monitor_me.exe')
|
||||||
with Process.open_process(pid) as p:
|
with Process.open_process(pid) as p:
|
||||||
addrs = p.search_all_memory(ctypes.c_int(40))
|
addrs = p.search_all_memory(ctypes.c_int(40))
|
||||||
@ -118,93 +118,111 @@ class Process(metaclass=ABCMeta):
|
|||||||
print('Found addresses:')
|
print('Found addresses:')
|
||||||
for addr in filtered_addrs:
|
for addr in filtered_addrs:
|
||||||
print(hex(addr))
|
print(hex(addr))
|
||||||
|
```
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def __init__(self, process_id: int):
|
def __init__(self, process_id: int) -> None:
|
||||||
"""
|
"""
|
||||||
Constructing a Process object prepares the process with specified process_id for
|
Constructing a Process object prepares the process with specified process_id for
|
||||||
memory editing. Finding the process_id for the process you want to edit is often
|
memory editing. Finding the `process_id` for the process you want to edit is often
|
||||||
easiest using os-specific tools (or by launching the process yourself, e.g. with
|
easiest using os-specific tools (or by launching the process yourself, e.g. with
|
||||||
subprocess.Popen(...)).
|
`subprocess.Popen(...)`).
|
||||||
|
|
||||||
:param process_id: Process id (pid) of the target process
|
Args:
|
||||||
|
process_id: Process id (pid) of the target process
|
||||||
"""
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def close(self):
|
def close(self) -> None:
|
||||||
"""
|
"""
|
||||||
Detach from the process, removing our ability to edit it and
|
Detach from the process, removing our ability to edit it and
|
||||||
letting other debuggers attach to it instead.
|
letting other debuggers attach to it instead.
|
||||||
|
|
||||||
This function should be called after you are done working with the process
|
This function should be called after you are done working with the process
|
||||||
and will no longer need it. See the Process.open_process(...) context
|
and will no longer need it. See the `Process.open_process(...)` context
|
||||||
manager to avoid having to call this function yourself.
|
manager to avoid having to call this function yourself.
|
||||||
"""
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t):
|
def write_memory(
|
||||||
|
self,
|
||||||
|
base_address: int,
|
||||||
|
write_buffer: ctypes_buffer_t,
|
||||||
|
) -> None:
|
||||||
"""
|
"""
|
||||||
Write the given buffer to the process's address space, starting at base_address.
|
Write the given buffer to the process's address space, starting at `base_address`.
|
||||||
|
|
||||||
:param base_address: The address to write at, in the process's address space.
|
Args:
|
||||||
:param write_buffer: A ctypes object, for example, ctypes.c_ulong(48),
|
base_address: The address to write at, in the process's address space.
|
||||||
(ctypes.c_byte * 3)(43, 21, 0xff), or a subclass of ctypes.Structure,
|
write_buffer: A ctypes object, for example, `ctypes.c_ulong(48)`,
|
||||||
which will be written into memory starting at base_address.
|
`(ctypes.c_byte * 3)(43, 21, 0xff)`, or a subclass of `ctypes.Structure`,
|
||||||
|
which will be written into memory starting at `base_address`.
|
||||||
"""
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t:
|
def read_memory(
|
||||||
|
self,
|
||||||
|
base_address: int,
|
||||||
|
read_buffer: ctypes_buffer_t,
|
||||||
|
) -> ctypes_buffer_t:
|
||||||
"""
|
"""
|
||||||
Read into the given buffer from the process's address space, starting at base_address.
|
Read into the given buffer from the process's address space, starting at `base_address`.
|
||||||
|
|
||||||
:param base_address: The address to read from, in the process's address space.
|
Args:
|
||||||
:param read_buffer: A ctypes object, for example. ctypes.c_ulong(),
|
base_address: The address to read from, in the process's address space.
|
||||||
(ctypes.c_byte * 3)(), or a subclass of ctypes.Structure, which will be
|
read_buffer: A `ctypes` object, for example. `ctypes.c_ulong()`,
|
||||||
overwritten with the contents of the process's memory starting at base_address.
|
`(ctypes.c_byte * 3)()`, or a subclass of `ctypes.Structure`, which will be
|
||||||
:returns: read_buffer is returned as well as being overwritten.
|
overwritten with the contents of the process's memory starting at `base_address`.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
`read_buffer` is returned as well as being overwritten.
|
||||||
"""
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def list_mapped_regions(self, writeable_only=True) -> List[Tuple[int, int]]:
|
def list_mapped_regions(self, writeable_only: bool = True) -> list[tuple[int, int]]:
|
||||||
"""
|
"""
|
||||||
Return a list of (start_address, stop_address) for the regions of the address space
|
Return a list of `(start_address, stop_address)` for the regions of the address space
|
||||||
accessible to (readable and possibly writable by) the process.
|
accessible to (readable and possibly writable by) the process.
|
||||||
By default, this function does not return non-writeable regions.
|
By default, this function does not return non-writeable regions.
|
||||||
|
|
||||||
:param writeable_only: If True, only return regions which are also writeable.
|
Args:
|
||||||
Default true.
|
writeable_only: If `True`, only return regions which are also writeable.
|
||||||
:return: List of (start_address, stop_address) for each accessible memory region.
|
Default `True`.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of `(start_address, stop_address)` for each accessible memory region.
|
||||||
"""
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def get_path(self) -> str:
|
def get_path(self) -> str | None:
|
||||||
"""
|
"""
|
||||||
Return the path to the executable file which was run to start this process.
|
Return the path to the executable file which was run to start this process.
|
||||||
|
|
||||||
:return: A string containing the path.
|
Returns:
|
||||||
|
A string containing the path, or None if no path was found.
|
||||||
"""
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def list_available_pids() -> List[int]:
|
def list_available_pids() -> list[int]:
|
||||||
"""
|
"""
|
||||||
Return a list of all process ids (pids) accessible on this system.
|
Return a list of all process ids (pids) accessible on this system.
|
||||||
|
|
||||||
:return: List of running process ids.
|
Returns:
|
||||||
|
List of running process ids.
|
||||||
"""
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def get_pid_by_name(target_name: str) -> int or None:
|
def get_pid_by_name(target_name: str) -> int | None:
|
||||||
"""
|
"""
|
||||||
Attempt to return the process id (pid) of a process which was run with an executable
|
Attempt to return the process id (pid) of a process which was run with an executable
|
||||||
file with the provided name. If no process is found, return None.
|
file with the provided name. If no process is found, return None.
|
||||||
@ -215,44 +233,65 @@ class Process(metaclass=ABCMeta):
|
|||||||
Don't rely on this method if you can possibly avoid it, since it makes no
|
Don't rely on this method if you can possibly avoid it, since it makes no
|
||||||
attempt to confirm that it found a unique process and breaks trivially (e.g. if the
|
attempt to confirm that it found a unique process and breaks trivially (e.g. if the
|
||||||
executable file is renamed).
|
executable file is renamed).
|
||||||
:return: Process id (pid) of a process with the provided name, or None.
|
|
||||||
|
Args:
|
||||||
|
target_name: Name of the process to find the PID for
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Process id (pid) of a process with the provided name, or `None`.
|
||||||
"""
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def deref_struct_pointer(self,
|
def deref_struct_pointer(
|
||||||
|
self,
|
||||||
base_address: int,
|
base_address: int,
|
||||||
targets: List[Tuple[int, ctypes_buffer_t]],
|
targets: list[tuple[int, ctypes_buffer_t]],
|
||||||
) -> List[ctypes_buffer_t]:
|
) -> list[ctypes_buffer_t]:
|
||||||
"""
|
"""
|
||||||
Take a pointer to a struct and read out the struct members:
|
Take a pointer to a struct and read out the struct members:
|
||||||
|
```
|
||||||
struct_defintion = [(0x0, ctypes.c_ulong()),
|
struct_defintion = [(0x0, ctypes.c_ulong()),
|
||||||
(0x20, ctypes.c_byte())]
|
(0x20, ctypes.c_byte())]
|
||||||
values = p.deref_struct_pointer(0x0feab4, struct_defintion)
|
values = p.deref_struct_pointer(0x0feab4, struct_defintion)
|
||||||
|
```
|
||||||
which is shorthand for
|
which is shorthand for
|
||||||
|
```
|
||||||
struct_addr = p.read_memory(0x0feab4, ctypes.c_void_p())
|
struct_addr = p.read_memory(0x0feab4, ctypes.c_void_p())
|
||||||
values = [p.read_memory(struct_addr + 0x0, ctypes.c_ulong()),
|
values = [p.read_memory(struct_addr + 0x0, ctypes.c_ulong()),
|
||||||
p.read_memory(struct_addr + 0x20, ctypes.c_byte())]
|
p.read_memory(struct_addr + 0x20, ctypes.c_byte())]
|
||||||
|
```
|
||||||
|
|
||||||
:param base_address: Address at which the struct pointer is located.
|
Args:
|
||||||
:param targets: List of (offset, read_buffer) pairs which will be read from the struct.
|
base_address: Address at which the struct pointer is located.
|
||||||
:return: List of read values corresponding to the provided targets.
|
targets: List of `(offset, read_buffer)` pairs which will be read from the struct.
|
||||||
|
|
||||||
|
Return:
|
||||||
|
List of read values corresponding to the provided targets.
|
||||||
"""
|
"""
|
||||||
base = self.read_memory(base_address, ctypes.c_void_p()).value
|
base = self.read_memory(base_address, ctypes.c_void_p()).value
|
||||||
values = [self.read_memory(base + offset, buffer) for offset, buffer in targets]
|
values = [self.read_memory(base + offset, buffer) for offset, buffer in targets]
|
||||||
return values
|
return values
|
||||||
|
|
||||||
def search_addresses(self, addresses: List[int], needle_buffer: ctypes_buffer_t, verbatim: bool=True) -> List[int]:
|
def search_addresses(
|
||||||
|
self,
|
||||||
|
addresses: list[int],
|
||||||
|
needle_buffer: ctypes_buffer_t,
|
||||||
|
verbatim: bool = True,
|
||||||
|
) -> list[int]:
|
||||||
"""
|
"""
|
||||||
Search for the provided value at each of the provided addresses, and return the addresses
|
Search for the provided value at each of the provided addresses, and return the addresses
|
||||||
where it is found.
|
where it is found.
|
||||||
|
|
||||||
:param addresses: List of addresses which should be probed.
|
Args:
|
||||||
:param needle_buffer: The value to search for. This should be a ctypes object of the same
|
addresses: List of addresses which should be probed.
|
||||||
sorts as used by .read_memory(...), which will be compared to the contents of
|
needle_buffer: The value to search for. This should be a `ctypes` object of the same
|
||||||
|
sorts as used by `.read_memory(...)`, which will be compared to the contents of
|
||||||
memory at each of the given addresses.
|
memory at each of the given addresses.
|
||||||
:param verbatim: If True, perform bitwise comparison when searching for needle_buffer.
|
verbatim: If `True`, perform bitwise comparison when searching for `needle_buffer`.
|
||||||
If False, perform utils.ctypes_equal-based comparison. Default True.
|
If `False`, perform `utils.ctypes_equal`-based comparison. Default `True`.
|
||||||
:return: List of addresses where the needle_buffer was found.
|
|
||||||
|
Returns:
|
||||||
|
List of addresses where the `needle_buffer` was found.
|
||||||
"""
|
"""
|
||||||
found = []
|
found = []
|
||||||
read_buffer = copy.copy(needle_buffer)
|
read_buffer = copy.copy(needle_buffer)
|
||||||
@ -269,18 +308,26 @@ class Process(metaclass=ABCMeta):
|
|||||||
found.append(address)
|
found.append(address)
|
||||||
return found
|
return found
|
||||||
|
|
||||||
def search_all_memory(self, needle_buffer: ctypes_buffer_t, writeable_only: bool=True, verbatim: bool=True) -> List[int]:
|
def search_all_memory(
|
||||||
|
self,
|
||||||
|
needle_buffer: ctypes_buffer_t,
|
||||||
|
writeable_only: bool = True,
|
||||||
|
verbatim: bool = True,
|
||||||
|
) -> list[int]:
|
||||||
"""
|
"""
|
||||||
Search the entire memory space accessible to the process for the provided value.
|
Search the entire memory space accessible to the process for the provided value.
|
||||||
|
|
||||||
:param needle_buffer: The value to search for. This should be a ctypes object of the same
|
Args:
|
||||||
sorts as used by .read_memory(...), which will be compared to the contents of
|
needle_buffer: The value to search for. This should be a ctypes object of the same
|
||||||
|
sorts as used by `.read_memory(...)`, which will be compared to the contents of
|
||||||
memory at each accessible address.
|
memory at each accessible address.
|
||||||
:param writeable_only: If True, only search regions where the process has write access.
|
writeable_only: If `True`, only search regions where the process has write access.
|
||||||
Default True.
|
Default `True`.
|
||||||
:param verbatim: If True, perform bitwise comparison when searching for needle_buffer.
|
verbatim: If `True`, perform bitwise comparison when searching for `needle_buffer`.
|
||||||
If False, perform utils.ctypes_equal-based comparison. Default True.
|
If `False`, perform `utils.ctypes_equal-based` comparison. Default `True`.
|
||||||
:return: List of addresses where the needle_buffer was found.
|
|
||||||
|
Returns:
|
||||||
|
List of addresses where the `needle_buffer` was found.
|
||||||
"""
|
"""
|
||||||
found = []
|
found = []
|
||||||
if verbatim:
|
if verbatim:
|
||||||
@ -294,20 +341,25 @@ class Process(metaclass=ABCMeta):
|
|||||||
self.read_memory(start, region_buffer)
|
self.read_memory(start, region_buffer)
|
||||||
found += [offset + start for offset in search(needle_buffer, region_buffer)]
|
found += [offset + start for offset in search(needle_buffer, region_buffer)]
|
||||||
except OSError:
|
except OSError:
|
||||||
logger.error('Failed to read in range 0x{} - 0x{}'.format(start, stop))
|
logger.exception(f'Failed to read in range 0x{start} - 0x{stop}')
|
||||||
return found
|
return found
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def open_process(cls, process_id: int) -> 'Process':
|
def open_process(cls: type[Self], process_id: int) -> Generator[Self, None, None]:
|
||||||
"""
|
"""
|
||||||
Context manager which automatically closes the constructed Process:
|
Context manager which automatically closes the constructed Process:
|
||||||
|
```
|
||||||
with Process.open_process(2394) as p:
|
with Process.open_process(2394) as p:
|
||||||
# use p here
|
# use p here
|
||||||
# no need to run p.close()
|
# no need to run p.close()
|
||||||
|
```
|
||||||
|
|
||||||
:param process_id: Process id (pid), passed to the Process constructor.
|
Args:
|
||||||
:return: Constructed Process object.
|
process_id: Process id (pid), passed to the Process constructor.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Constructed Process object.
|
||||||
"""
|
"""
|
||||||
process = cls(process_id)
|
process = cls(process_id)
|
||||||
yield process
|
yield process
|
||||||
|
@ -2,7 +2,6 @@
|
|||||||
Implementation of Process class for Linux
|
Implementation of Process class for Linux
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from typing import List, Tuple
|
|
||||||
from os import strerror
|
from os import strerror
|
||||||
import os
|
import os
|
||||||
import os.path
|
import os.path
|
||||||
@ -10,12 +9,12 @@ import signal
|
|||||||
import ctypes
|
import ctypes
|
||||||
import ctypes.util
|
import ctypes.util
|
||||||
import logging
|
import logging
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
from .abstract import Process as AbstractProcess
|
from .abstract import Process as AbstractProcess
|
||||||
from .utils import ctypes_buffer_t, MemEditError
|
from .utils import ctypes_buffer_t, MemEditError
|
||||||
|
|
||||||
|
|
||||||
logging.basicConfig(level=logging.INFO)
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@ -36,52 +35,61 @@ _ptrace.argtypes = (ctypes.c_ulong,) * 4
|
|||||||
_ptrace.restype = ctypes.c_long
|
_ptrace.restype = ctypes.c_long
|
||||||
|
|
||||||
|
|
||||||
def ptrace(command: int, pid: int = 0, arg1: int = 0, arg2: int = 0) -> int:
|
def ptrace(
|
||||||
|
command: int,
|
||||||
|
pid: int = 0,
|
||||||
|
arg1: int = 0,
|
||||||
|
arg2: int = 0,
|
||||||
|
) -> int:
|
||||||
"""
|
"""
|
||||||
Call ptrace() with the provided pid and arguments. See the ```man ptrace```.
|
Call ptrace() with the provided pid and arguments. See `man ptrace`.
|
||||||
"""
|
"""
|
||||||
logger.debug('ptrace({}, {}, {}, {})'.format(command, pid, arg1, arg2))
|
logger.debug(f'ptrace({command}, {pid}, {arg1}, {arg2})')
|
||||||
result = _ptrace(command, pid, arg1, arg2)
|
result = _ptrace(command, pid, arg1, arg2)
|
||||||
if result == -1:
|
if result == -1:
|
||||||
err_no = ctypes.get_errno()
|
err_no = ctypes.get_errno()
|
||||||
if err_no:
|
if err_no:
|
||||||
raise MemEditError('ptrace({}, {}, {}, {})'.format(command, pid, arg1, arg2) +
|
raise MemEditError(f'ptrace({command}, {pid}, {arg1}, {arg2})'
|
||||||
' failed with error {}: {}'.format(err_no, strerror(err_no)))
|
+ f' failed with error {err_no}: {strerror(err_no)}')
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
class Process(AbstractProcess):
|
class Process(AbstractProcess):
|
||||||
pid = None
|
pid: int | None
|
||||||
|
|
||||||
def __init__(self, process_id: int):
|
def __init__(self, process_id: int) -> None:
|
||||||
ptrace(ptrace_commands['PTRACE_SEIZE'], process_id)
|
ptrace(ptrace_commands['PTRACE_SEIZE'], process_id)
|
||||||
self.pid = process_id
|
self.pid = process_id
|
||||||
|
|
||||||
def close(self):
|
def close(self) -> None:
|
||||||
|
if self.pid is None:
|
||||||
|
return
|
||||||
os.kill(self.pid, signal.SIGSTOP)
|
os.kill(self.pid, signal.SIGSTOP)
|
||||||
|
os.waitpid(self.pid, 0)
|
||||||
ptrace(ptrace_commands['PTRACE_DETACH'], self.pid, 0, 0)
|
ptrace(ptrace_commands['PTRACE_DETACH'], self.pid, 0, 0)
|
||||||
|
os.kill(self.pid, signal.SIGCONT)
|
||||||
self.pid = None
|
self.pid = None
|
||||||
|
|
||||||
def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t):
|
def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t) -> None:
|
||||||
with open('/proc/{}/mem'.format(self.pid), 'rb+') as mem:
|
with Path(f'/proc/{self.pid}/mem').open('rb+') as mem:
|
||||||
mem.seek(base_address)
|
mem.seek(base_address)
|
||||||
mem.write(write_buffer)
|
mem.write(write_buffer)
|
||||||
|
|
||||||
def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t:
|
def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t:
|
||||||
with open('/proc/{}/mem'.format(self.pid), 'rb+') as mem:
|
with Path(f'/proc/{self.pid}/mem').open('rb+') as mem:
|
||||||
mem.seek(base_address)
|
mem.seek(base_address)
|
||||||
mem.readinto(read_buffer)
|
mem.readinto(read_buffer)
|
||||||
return read_buffer
|
return read_buffer
|
||||||
|
|
||||||
def get_path(self) -> str:
|
def get_path(self) -> str | None:
|
||||||
try:
|
try:
|
||||||
with open('/proc/{}/cmdline', 'rb') as f:
|
with Path(f'/proc/{self.pid}/cmdline').open('rb') as ff:
|
||||||
return f.read().decode().split('\x00')[0]
|
return ff.read().decode().split('\x00')[0]
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
return ''
|
return None
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def list_available_pids() -> List[int]:
|
def list_available_pids() -> list[int]:
|
||||||
pids = []
|
pids = []
|
||||||
for pid_str in os.listdir('/proc'):
|
for pid_str in os.listdir('/proc'):
|
||||||
try:
|
try:
|
||||||
@ -91,26 +99,26 @@ class Process(AbstractProcess):
|
|||||||
return pids
|
return pids
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_pid_by_name(target_name: str) -> int or None:
|
def get_pid_by_name(target_name: str) -> int | None:
|
||||||
for pid in Process.list_available_pids():
|
for pid in Process.list_available_pids():
|
||||||
try:
|
try:
|
||||||
logger.info('Checking name for pid {}'.format(pid))
|
logger.debug(f'Checking name for pid {pid}')
|
||||||
with open('/proc/{}/cmdline'.format(pid), 'rb') as cmdline:
|
with Path(f'/proc/{pid}/cmdline').open('rb') as cmdline:
|
||||||
path = cmdline.read().decode().split('\x00')[0]
|
path = cmdline.read().decode().split('\x00')[0]
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
name = os.path.basename(path)
|
name = Path(path).name
|
||||||
logger.info('Name was "{}"'.format(name))
|
logger.debug(f'Name was "{name}"')
|
||||||
if path is not None and name == target_name:
|
if path is not None and name == target_name:
|
||||||
return pid
|
return pid
|
||||||
|
|
||||||
logger.info('Found no process with name {}'.format(target_name))
|
logger.info(f'Found no process with name {target_name}')
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def list_mapped_regions(self, writeable_only: bool = True) -> List[Tuple[int, int]]:
|
def list_mapped_regions(self, writeable_only: bool = True) -> list[tuple[int, int]]:
|
||||||
regions = []
|
regions = []
|
||||||
with open('/proc/{}/maps'.format(self.pid), 'r') as maps:
|
with Path(f'/proc/{self.pid}/maps').open('r') as maps:
|
||||||
for line in maps:
|
for line in maps:
|
||||||
bounds, privileges = line.split()[0:2]
|
bounds, privileges = line.split()[0:2]
|
||||||
|
|
||||||
|
@ -11,25 +11,34 @@ Utility functions and types:
|
|||||||
Check if two buffers (ctypes objects) store equal values:
|
Check if two buffers (ctypes objects) store equal values:
|
||||||
ctypes_equal(a, b)
|
ctypes_equal(a, b)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from typing import List
|
|
||||||
import ctypes
|
import ctypes
|
||||||
|
|
||||||
|
|
||||||
ctypes_buffer_t = ctypes._SimpleCData or ctypes.Array or ctypes.Structure or ctypes.Union
|
ctypes_buffer_t = (
|
||||||
|
ctypes._SimpleCData
|
||||||
|
| ctypes.Array
|
||||||
|
| ctypes.Structure
|
||||||
|
| ctypes.Union
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class MemEditError(Exception):
|
class MemEditError(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def search_buffer_verbatim(needle_buffer: ctypes_buffer_t, haystack_buffer: ctypes_buffer_t) -> List[int]:
|
def search_buffer_verbatim(
|
||||||
|
needle_buffer: ctypes_buffer_t,
|
||||||
|
haystack_buffer: ctypes_buffer_t,
|
||||||
|
) -> list[int]:
|
||||||
"""
|
"""
|
||||||
Search for a buffer inside another buffer, using a direct (bitwise) comparison
|
Search for a buffer inside another buffer, using a direct (bitwise) comparison
|
||||||
|
|
||||||
:param needle_buffer: Buffer to search for.
|
Args:
|
||||||
:param haystack_buffer: Buffer to search in.
|
needle_buffer: Buffer to search for.
|
||||||
:return: List of offsets where the needle_buffer was found.
|
haystack_buffer: Buffer to search in.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of offsets where the `needle_buffer` was found.
|
||||||
"""
|
"""
|
||||||
found = []
|
found = []
|
||||||
|
|
||||||
@ -45,14 +54,20 @@ def search_buffer_verbatim(needle_buffer: ctypes_buffer_t, haystack_buffer: ctyp
|
|||||||
return found
|
return found
|
||||||
|
|
||||||
|
|
||||||
def search_buffer(needle_buffer: ctypes_buffer_t, haystack_buffer: ctypes_buffer_t) -> List[int]:
|
def search_buffer(
|
||||||
|
needle_buffer: ctypes_buffer_t,
|
||||||
|
haystack_buffer: ctypes_buffer_t,
|
||||||
|
) -> list[int]:
|
||||||
"""
|
"""
|
||||||
Search for a buffer inside another buffer, using ctypes_equal for comparison.
|
Search for a buffer inside another buffer, using `ctypes_equal` for comparison.
|
||||||
Much slower than search_buffer_verbatim.
|
Much slower than `search_buffer_verbatim`.
|
||||||
|
|
||||||
:param needle_buffer: Buffer to search for.
|
Args:
|
||||||
:param haystack_buffer: Buffer to search in.
|
needle_buffer: Buffer to search for.
|
||||||
:return: List of offsets where the needle_buffer was found.
|
haystack_buffer: Buffer to search in.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of offsets where the needle_buffer was found.
|
||||||
"""
|
"""
|
||||||
found = []
|
found = []
|
||||||
read_type = type(needle_buffer)
|
read_type = type(needle_buffer)
|
||||||
@ -63,11 +78,14 @@ def search_buffer(needle_buffer: ctypes_buffer_t, haystack_buffer: ctypes_buffer
|
|||||||
return found
|
return found
|
||||||
|
|
||||||
|
|
||||||
def ctypes_equal(a: ctypes_buffer_t, b: ctypes_buffer_t) -> bool:
|
def ctypes_equal(
|
||||||
|
a: ctypes_buffer_t,
|
||||||
|
b: ctypes_buffer_t,
|
||||||
|
) -> bool:
|
||||||
"""
|
"""
|
||||||
Check if the values stored inside two ctypes buffers are equal.
|
Check if the values stored inside two ctypes buffers are equal.
|
||||||
"""
|
"""
|
||||||
if not type(a) == type(b):
|
if not type(a) == type(b): # noqa: E721
|
||||||
return False
|
return False
|
||||||
|
|
||||||
if isinstance(a, ctypes.Array):
|
if isinstance(a, ctypes.Array):
|
||||||
@ -75,10 +93,10 @@ def ctypes_equal(a: ctypes_buffer_t, b: ctypes_buffer_t) -> bool:
|
|||||||
elif isinstance(a, ctypes.Structure) or isinstance(a, ctypes.Union):
|
elif isinstance(a, ctypes.Structure) or isinstance(a, ctypes.Union):
|
||||||
for attr_name, attr_type in a._fields_:
|
for attr_name, attr_type in a._fields_:
|
||||||
a_attr, b_attr = (getattr(x, attr_name) for x in (a, b))
|
a_attr, b_attr = (getattr(x, attr_name) for x in (a, b))
|
||||||
if isinstance(a, ctypes_buffer_t):
|
if isinstance(a, (ctypes.Array, ctypes.Structure, ctypes.Union, ctypes._SimpleCData)):
|
||||||
if not ctypes_equal(a_attr, b_attr):
|
if not ctypes_equal(a_attr, b_attr):
|
||||||
return False
|
return False
|
||||||
elif not a_attr == b_attr:
|
elif a_attr != b_attr:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
@ -2,10 +2,9 @@
|
|||||||
Implementation of Process class for Windows
|
Implementation of Process class for Windows
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from typing import List, Tuple
|
|
||||||
from math import floor
|
from math import floor
|
||||||
from os import strerror
|
from os import strerror
|
||||||
import os.path
|
from pathlib import Path
|
||||||
import ctypes
|
import ctypes
|
||||||
import ctypes.wintypes
|
import ctypes.wintypes
|
||||||
import logging
|
import logging
|
||||||
@ -14,7 +13,6 @@ from .abstract import Process as AbstractProcess
|
|||||||
from .utils import ctypes_buffer_t, MemEditError
|
from .utils import ctypes_buffer_t, MemEditError
|
||||||
|
|
||||||
|
|
||||||
logging.basicConfig(level=logging.INFO)
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@ -26,10 +24,10 @@ privileges = {
|
|||||||
'PROCESS_VM_WRITE': 0x0020,
|
'PROCESS_VM_WRITE': 0x0020,
|
||||||
}
|
}
|
||||||
privileges['PROCESS_RW'] = (
|
privileges['PROCESS_RW'] = (
|
||||||
privileges['PROCESS_QUERY_INFORMATION'] |
|
privileges['PROCESS_QUERY_INFORMATION']
|
||||||
privileges['PROCESS_VM_OPERATION'] |
|
| privileges['PROCESS_VM_OPERATION']
|
||||||
privileges['PROCESS_VM_READ'] |
|
| privileges['PROCESS_VM_READ']
|
||||||
privileges['PROCESS_VM_WRITE']
|
| privileges['PROCESS_VM_WRITE']
|
||||||
)
|
)
|
||||||
|
|
||||||
# Memory region states
|
# Memory region states
|
||||||
@ -51,13 +49,13 @@ page_protections = {
|
|||||||
}
|
}
|
||||||
# Custom (combined) permissions
|
# Custom (combined) permissions
|
||||||
page_protections['PAGE_READABLE'] = (
|
page_protections['PAGE_READABLE'] = (
|
||||||
page_protections['PAGE_EXECUTE_READ'] |
|
page_protections['PAGE_EXECUTE_READ']
|
||||||
page_protections['PAGE_EXECUTE_READWRITE'] |
|
| page_protections['PAGE_EXECUTE_READWRITE']
|
||||||
page_protections['PAGE_READWRITE']
|
| page_protections['PAGE_READWRITE']
|
||||||
)
|
)
|
||||||
page_protections['PAGE_READWRITEABLE'] = (
|
page_protections['PAGE_READWRITEABLE'] = (
|
||||||
page_protections['PAGE_EXECUTE_READWRITE'] |
|
page_protections['PAGE_EXECUTE_READWRITE']
|
||||||
page_protections['PAGE_READWRITE']
|
| page_protections['PAGE_READWRITE']
|
||||||
)
|
)
|
||||||
|
|
||||||
# Memory types
|
# Memory types
|
||||||
@ -67,19 +65,56 @@ mem_types = {
|
|||||||
'MEM_PRIVATE': 0x20000,
|
'MEM_PRIVATE': 0x20000,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
# C struct for VirtualQueryEx
|
# C struct for VirtualQueryEx
|
||||||
class MEMORY_BASIC_INFORMATION(ctypes.Structure):
|
class MEMORY_BASIC_INFORMATION32(ctypes.Structure):
|
||||||
_fields_ = [
|
_fields_ = [
|
||||||
('BaseAddress', ctypes.c_void_p),
|
('BaseAddress', ctypes.wintypes.DWORD),
|
||||||
('AllocationBase', ctypes.c_void_p),
|
('AllocationBase', ctypes.wintypes.DWORD),
|
||||||
('AllocationProtect', ctypes.wintypes.DWORD),
|
('AllocationProtect', ctypes.wintypes.DWORD),
|
||||||
('RegionSize', ctypes.wintypes.UINT),
|
('RegionSize', ctypes.wintypes.DWORD),
|
||||||
('State', ctypes.wintypes.DWORD),
|
('State', ctypes.wintypes.DWORD),
|
||||||
('Protect', ctypes.wintypes.DWORD),
|
('Protect', ctypes.wintypes.DWORD),
|
||||||
('Type', ctypes.wintypes.DWORD),
|
('Type', ctypes.wintypes.DWORD),
|
||||||
]
|
]
|
||||||
|
|
||||||
|
class MEMORY_BASIC_INFORMATION64(ctypes.Structure):
|
||||||
|
_fields_ = [
|
||||||
|
('BaseAddress', ctypes.c_ulonglong),
|
||||||
|
('AllocationBase', ctypes.c_ulonglong),
|
||||||
|
('AllocationProtect', ctypes.wintypes.DWORD),
|
||||||
|
('__alignment1', ctypes.wintypes.DWORD),
|
||||||
|
('RegionSize', ctypes.c_ulonglong),
|
||||||
|
('State', ctypes.wintypes.DWORD),
|
||||||
|
('Protect', ctypes.wintypes.DWORD),
|
||||||
|
('Type', ctypes.wintypes.DWORD),
|
||||||
|
('__alignment2', ctypes.wintypes.DWORD),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
PTR_SIZE = ctypes.sizeof(ctypes.c_void_p)
|
||||||
|
MEMORY_BASIC_INFORMATION: type[ctypes.Structure]
|
||||||
|
if PTR_SIZE == 8: # 64-bit python
|
||||||
|
MEMORY_BASIC_INFORMATION = MEMORY_BASIC_INFORMATION64
|
||||||
|
elif PTR_SIZE == 4: # 32-bit python
|
||||||
|
MEMORY_BASIC_INFORMATION = MEMORY_BASIC_INFORMATION32
|
||||||
|
|
||||||
|
ctypes.windll.kernel32.VirtualQueryEx.argtypes = [
|
||||||
|
ctypes.wintypes.HANDLE,
|
||||||
|
ctypes.wintypes.LPCVOID,
|
||||||
|
ctypes.c_void_p,
|
||||||
|
ctypes.c_size_t]
|
||||||
|
ctypes.windll.kernel32.ReadProcessMemory.argtypes = [
|
||||||
|
ctypes.wintypes.HANDLE,
|
||||||
|
ctypes.wintypes.LPCVOID,
|
||||||
|
ctypes.c_void_p,
|
||||||
|
ctypes.c_size_t,
|
||||||
|
ctypes.c_void_p]
|
||||||
|
ctypes.windll.kernel32.WriteProcessMemory.argtypes = [
|
||||||
|
ctypes.wintypes.HANDLE,
|
||||||
|
ctypes.wintypes.LPCVOID,
|
||||||
|
ctypes.c_void_p,
|
||||||
|
ctypes.c_size_t,
|
||||||
|
ctypes.c_void_p]
|
||||||
|
|
||||||
# C struct for GetSystemInfo
|
# C struct for GetSystemInfo
|
||||||
class SYSTEM_INFO(ctypes.Structure):
|
class SYSTEM_INFO(ctypes.Structure):
|
||||||
@ -89,7 +124,7 @@ class SYSTEM_INFO(ctypes.Structure):
|
|||||||
('dwPageSize', ctypes.wintypes.DWORD),
|
('dwPageSize', ctypes.wintypes.DWORD),
|
||||||
('lpMinimumApplicationAddress', ctypes.c_void_p),
|
('lpMinimumApplicationAddress', ctypes.c_void_p),
|
||||||
('lpMaximumApplicationAddress', ctypes.c_void_p),
|
('lpMaximumApplicationAddress', ctypes.c_void_p),
|
||||||
('dwActiveProcessorMask', ctypes.wintypes.DWORD),
|
('dwActiveProcessorMask', ctypes.c_void_p),
|
||||||
('dwNumberOfProcessors', ctypes.wintypes.DWORD),
|
('dwNumberOfProcessors', ctypes.wintypes.DWORD),
|
||||||
('dwProcessorType', ctypes.wintypes.DWORD),
|
('dwProcessorType', ctypes.wintypes.DWORD),
|
||||||
('dwAllocationGranularity', ctypes.wintypes.DWORD),
|
('dwAllocationGranularity', ctypes.wintypes.DWORD),
|
||||||
@ -99,9 +134,9 @@ class SYSTEM_INFO(ctypes.Structure):
|
|||||||
|
|
||||||
|
|
||||||
class Process(AbstractProcess):
|
class Process(AbstractProcess):
|
||||||
process_handle = None
|
process_handle: int | None
|
||||||
|
|
||||||
def __init__(self, process_id: int):
|
def __init__(self, process_id: int) -> None:
|
||||||
process_handle = ctypes.windll.kernel32.OpenProcess(
|
process_handle = ctypes.windll.kernel32.OpenProcess(
|
||||||
privileges['PROCESS_RW'],
|
privileges['PROCESS_RW'],
|
||||||
False,
|
False,
|
||||||
@ -109,15 +144,15 @@ class Process(AbstractProcess):
|
|||||||
)
|
)
|
||||||
|
|
||||||
if not process_handle:
|
if not process_handle:
|
||||||
raise MemEditError('Couldn\'t open process {}'.format(process_id))
|
raise MemEditError(f'Couldn\'t open process {process_id}')
|
||||||
|
|
||||||
self.process_handle = process_handle
|
self.process_handle = process_handle
|
||||||
|
|
||||||
def close(self):
|
def close(self) -> None:
|
||||||
ctypes.windll.kernel32.CloseHandle(self.process_handle)
|
ctypes.windll.kernel32.CloseHandle(self.process_handle)
|
||||||
self.process_handle = None
|
self.process_handle = None
|
||||||
|
|
||||||
def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t):
|
def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t) -> None:
|
||||||
try:
|
try:
|
||||||
ctypes.windll.kernel32.WriteProcessMemory(
|
ctypes.windll.kernel32.WriteProcessMemory(
|
||||||
self.process_handle,
|
self.process_handle,
|
||||||
@ -126,8 +161,8 @@ class Process(AbstractProcess):
|
|||||||
ctypes.sizeof(write_buffer),
|
ctypes.sizeof(write_buffer),
|
||||||
None
|
None
|
||||||
)
|
)
|
||||||
except (BufferError, ValueError, TypeError):
|
except (BufferError, ValueError, TypeError) as err:
|
||||||
raise MemEditError('Error with handle {}: {}'.format(self.process_handle, self._get_last_error()))
|
raise MemEditError(f'Error with handle {self.process_handle}: {self._get_last_error()}') from err
|
||||||
|
|
||||||
def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t:
|
def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t:
|
||||||
try:
|
try:
|
||||||
@ -138,78 +173,79 @@ class Process(AbstractProcess):
|
|||||||
ctypes.sizeof(read_buffer),
|
ctypes.sizeof(read_buffer),
|
||||||
None
|
None
|
||||||
)
|
)
|
||||||
except (BufferError, ValueError, TypeError):
|
except (BufferError, ValueError, TypeError) as err:
|
||||||
raise MemEditError('Error with handle {}: {}'.format(self.process_handle, self._get_last_error()))
|
raise MemEditError(f'Error with handle {self.process_handle}: {self._get_last_error()}') from err
|
||||||
|
|
||||||
return read_buffer
|
return read_buffer
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _get_last_error() -> Tuple[int, str]:
|
def _get_last_error() -> tuple[int, str]:
|
||||||
err = ctypes.windll.kernel32.GetLastError()
|
err = ctypes.windll.kernel32.GetLastError()
|
||||||
return err, strerror(err)
|
return err, strerror(err)
|
||||||
|
|
||||||
def get_path(self) -> str:
|
def get_path(self) -> str | None:
|
||||||
max_path_len = 260
|
max_path_len = 260
|
||||||
name_buffer = (ctypes.c_char * max_path_len)()
|
name_buffer = (ctypes.c_char * max_path_len)()
|
||||||
rval = ctypes.windll.psapi.GetProcessImageFileNameA(
|
rval = ctypes.windll.psapi.GetProcessImageFileNameA(
|
||||||
self.process_handle,
|
self.process_handle,
|
||||||
name_buffer,
|
name_buffer,
|
||||||
max_path_len
|
max_path_len,
|
||||||
)
|
)
|
||||||
|
|
||||||
if rval > 0:
|
if rval <= 0:
|
||||||
return name_buffer.value.decode()
|
|
||||||
else:
|
|
||||||
return None
|
return None
|
||||||
|
return name_buffer.value.decode()
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def list_available_pids() -> List[int]:
|
def list_available_pids() -> list[int]:
|
||||||
# According to EnumProcesses docs, you can't find out how many processes there are before
|
# According to EnumProcesses docs, you can't find out how many processes there are before
|
||||||
# fetching the list. As a result, we grab 100 on the first try, and if we get a full list
|
# fetching the list. As a result, we grab 100 on the first try, and if we get a full list
|
||||||
# of 100, repeatedly double the number until we get fewer than we asked for.
|
# of 100, repeatedly double the number until we get fewer than we asked for.
|
||||||
|
|
||||||
n = 100
|
nn = 100
|
||||||
returned_size = ctypes.wintypes.DWORD()
|
returned_size = ctypes.wintypes.DWORD()
|
||||||
returned_size_ptr = ctypes.byref(returned_size)
|
returned_size_ptr = ctypes.byref(returned_size)
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
pids = (ctypes.wintypes.DWORD * n)()
|
pids = (ctypes.wintypes.DWORD * nn)()
|
||||||
size = ctypes.sizeof(pids)
|
size = ctypes.sizeof(pids)
|
||||||
pids_ptr = ctypes.byref(pids)
|
pids_ptr = ctypes.byref(pids)
|
||||||
|
|
||||||
success = ctypes.windll.Psapi.EnumProcesses(pids_ptr, size, returned_size_ptr)
|
success = ctypes.windll.Psapi.EnumProcesses(pids_ptr, size, returned_size_ptr)
|
||||||
if not success:
|
if not success:
|
||||||
raise MemEditError('Failed to enumerate processes: n={}'.format(n))
|
raise MemEditError(f'Failed to enumerate processes: nn={nn}')
|
||||||
|
|
||||||
num_returned = floor(returned_size.value / ctypes.sizeof(ctypes.wintypes.DWORD))
|
num_returned = floor(returned_size.value / ctypes.sizeof(ctypes.wintypes.DWORD))
|
||||||
|
|
||||||
if n == num_returned:
|
if nn != num_returned:
|
||||||
n *= 2
|
|
||||||
continue
|
|
||||||
else:
|
|
||||||
break
|
break
|
||||||
|
nn *= 2
|
||||||
|
|
||||||
return pids[:num_returned]
|
return pids[:num_returned]
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_pid_by_name(target_name: str) -> int or None:
|
def get_pid_by_name(target_name: str) -> int | None:
|
||||||
for pid in Process.list_available_pids():
|
for pid in Process.list_available_pids():
|
||||||
try:
|
try:
|
||||||
logger.info('Checking name for pid {}'.format(pid))
|
logger.debug(f'Checking name for pid {pid}')
|
||||||
with Process.open_process(pid) as process:
|
with Process.open_process(pid) as process:
|
||||||
path = process.get_path()
|
path = process.get_path()
|
||||||
|
if path is None:
|
||||||
|
continue
|
||||||
|
|
||||||
name = os.path.basename(path)
|
name = Path(path).name
|
||||||
logger.info('Name was "{}"'.format(name))
|
logger.debug(f'Name was "{name}"')
|
||||||
if path is not None and name == target_name:
|
if path is not None and name == target_name:
|
||||||
return pid
|
return pid
|
||||||
except ValueError:
|
except ValueError:
|
||||||
pass
|
pass
|
||||||
|
except MemEditError as err:
|
||||||
|
logger.debug(repr(err))
|
||||||
|
|
||||||
logger.info('Found no process with name {}'.format(target_name))
|
logger.info(f'Found no process with name {target_name}')
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def list_mapped_regions(self, writeable_only: bool = True) -> List[Tuple[int, int]]:
|
def list_mapped_regions(self, writeable_only: bool = True) -> list[tuple[int, int]]:
|
||||||
sys_info = SYSTEM_INFO()
|
sys_info = SYSTEM_INFO()
|
||||||
sys_info_ptr = ctypes.byref(sys_info)
|
sys_info_ptr = ctypes.byref(sys_info)
|
||||||
ctypes.windll.kernel32.GetSystemInfo(sys_info_ptr)
|
ctypes.windll.kernel32.GetSystemInfo(sys_info_ptr)
|
||||||
@ -217,7 +253,7 @@ class Process(AbstractProcess):
|
|||||||
start = sys_info.lpMinimumApplicationAddress
|
start = sys_info.lpMinimumApplicationAddress
|
||||||
stop = sys_info.lpMaximumApplicationAddress
|
stop = sys_info.lpMaximumApplicationAddress
|
||||||
|
|
||||||
def get_mem_info(address):
|
def get_mem_info(address: int) -> MEMORY_BASIC_INFORMATION:
|
||||||
"""
|
"""
|
||||||
Query the memory region starting at or before 'address' to get its size/type/state/permissions.
|
Query the memory region starting at or before 'address' to get its size/type/state/permissions.
|
||||||
"""
|
"""
|
||||||
@ -229,14 +265,12 @@ class Process(AbstractProcess):
|
|||||||
self.process_handle,
|
self.process_handle,
|
||||||
address,
|
address,
|
||||||
mbi_ptr,
|
mbi_ptr,
|
||||||
mbi_size,
|
mbi_size)
|
||||||
)
|
|
||||||
|
|
||||||
if success != mbi_size:
|
if success != mbi_size:
|
||||||
if success == 0:
|
if success == 0:
|
||||||
raise MemEditError('Failed VirtualQueryEx with handle ' +
|
raise MemEditError('Failed VirtualQueryEx with handle '
|
||||||
'{}: {}'.format(self.process_handle, self._get_last_error()))
|
+ f'{self.process_handle}: {self._get_last_error()}')
|
||||||
else:
|
|
||||||
raise MemEditError('VirtualQueryEx output too short!')
|
raise MemEditError('VirtualQueryEx output too short!')
|
||||||
|
|
||||||
return mbi
|
return mbi
|
||||||
@ -245,10 +279,11 @@ class Process(AbstractProcess):
|
|||||||
page_ptr = start
|
page_ptr = start
|
||||||
while page_ptr < stop:
|
while page_ptr < stop:
|
||||||
page_info = get_mem_info(page_ptr)
|
page_info = get_mem_info(page_ptr)
|
||||||
if page_info.Type == mem_types['MEM_PRIVATE'] and \
|
if (page_info.Type == mem_types['MEM_PRIVATE']
|
||||||
page_info.State == mem_states['MEM_COMMIT'] and \
|
and page_info.State == mem_states['MEM_COMMIT']
|
||||||
page_info.Protect & page_protections['PAGE_READABLE'] != 0 and \
|
and page_info.Protect & page_protections['PAGE_READABLE'] != 0
|
||||||
(page_info.Protect & page_protections['PAGE_READWRITEABLE'] != 0 or not writeable_only):
|
and (page_info.Protect & page_protections['PAGE_READWRITEABLE'] != 0
|
||||||
|
or not writeable_only)):
|
||||||
regions.append((page_ptr, page_ptr + page_info.RegionSize))
|
regions.append((page_ptr, page_ptr + page_info.RegionSize))
|
||||||
page_ptr += page_info.RegionSize
|
page_ptr += page_info.RegionSize
|
||||||
|
|
||||||
|
87
pyproject.toml
Normal file
87
pyproject.toml
Normal file
@ -0,0 +1,87 @@
|
|||||||
|
[build-system]
|
||||||
|
requires = ["hatchling"]
|
||||||
|
build-backend = "hatchling.build"
|
||||||
|
|
||||||
|
[project]
|
||||||
|
name = "mem_edit"
|
||||||
|
description = "Multi-platform library for memory editing"
|
||||||
|
readme = "README.md"
|
||||||
|
license = { file = "LICENSE.md" }
|
||||||
|
authors = [
|
||||||
|
{ name="Jan Petykiewicz", email="jan@mpxd.net" },
|
||||||
|
]
|
||||||
|
homepage = "https://mpxd.net/code/jan/mem_edit"
|
||||||
|
repository = "https://mpxd.net/code/jan/mem_edit"
|
||||||
|
keywords = [
|
||||||
|
"memory",
|
||||||
|
"edit",
|
||||||
|
"editing",
|
||||||
|
"ReadProcessMemory",
|
||||||
|
"WriteProcessMemory",
|
||||||
|
"proc",
|
||||||
|
"mem",
|
||||||
|
"ptrace",
|
||||||
|
"multiplatform",
|
||||||
|
"scan",
|
||||||
|
"scanner",
|
||||||
|
"search",
|
||||||
|
"debug",
|
||||||
|
"cheat",
|
||||||
|
"trainer",
|
||||||
|
]
|
||||||
|
classifiers = [
|
||||||
|
"Programming Language :: Python :: 3",
|
||||||
|
"Development Status :: 5 - Production/Stable",
|
||||||
|
"Environment :: Other Environment",
|
||||||
|
"Intended Audience :: Developers",
|
||||||
|
"License :: OSI Approved :: GNU Affero General Public License v3",
|
||||||
|
"Operating System :: POSIX :: Linux",
|
||||||
|
"Operating System :: Microsoft :: Windows",
|
||||||
|
"Topic :: Software Development",
|
||||||
|
"Topic :: Software Development :: Debuggers",
|
||||||
|
"Topic :: Software Development :: Testing",
|
||||||
|
"Topic :: System",
|
||||||
|
"Topic :: Games/Entertainment",
|
||||||
|
"Topic :: Utilities",
|
||||||
|
]
|
||||||
|
requires-python = ">=3.11"
|
||||||
|
dynamic = ["version"]
|
||||||
|
dependencies = [
|
||||||
|
]
|
||||||
|
|
||||||
|
[tool.hatch.version]
|
||||||
|
path = "mem_edit/__init__.py"
|
||||||
|
|
||||||
|
|
||||||
|
[tool.ruff]
|
||||||
|
exclude = [
|
||||||
|
".git",
|
||||||
|
"dist",
|
||||||
|
]
|
||||||
|
line-length = 145
|
||||||
|
indent-width = 4
|
||||||
|
lint.dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
|
||||||
|
lint.select = [
|
||||||
|
"NPY", "E", "F", "W", "B", "ANN", "UP", "SLOT", "SIM", "LOG",
|
||||||
|
"C4", "ISC", "PIE", "PT", "RET", "TCH", "PTH", "INT",
|
||||||
|
"ARG", "PL", "R", "TRY",
|
||||||
|
"G010", "G101", "G201", "G202",
|
||||||
|
"Q002", "Q003", "Q004",
|
||||||
|
]
|
||||||
|
lint.ignore = [
|
||||||
|
#"ANN001", # No annotation
|
||||||
|
"ANN002", # *args
|
||||||
|
"ANN003", # **kwargs
|
||||||
|
"ANN401", # Any
|
||||||
|
"ANN101", # self: Self
|
||||||
|
"SIM108", # single-line if / else assignment
|
||||||
|
"RET504", # x=y+z; return x
|
||||||
|
"PIE790", # unnecessary pass
|
||||||
|
"ISC003", # non-implicit string concatenation
|
||||||
|
"C408", # dict(x=y) instead of {'x': y}
|
||||||
|
"PLR09", # Too many xxx
|
||||||
|
"PLR2004", # magic number
|
||||||
|
"PLC0414", # import x as x
|
||||||
|
"TRY003", # Long exception message
|
||||||
|
]
|
||||||
|
|
61
setup.py
61
setup.py
@ -1,61 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
|
|
||||||
from setuptools import setup, find_packages
|
|
||||||
|
|
||||||
with open('README.md', 'r') as f:
|
|
||||||
long_description = f.read()
|
|
||||||
|
|
||||||
with open('mem_edit/VERSION', 'r') as f:
|
|
||||||
version = f.read().strip()
|
|
||||||
|
|
||||||
setup(name='mem_edit',
|
|
||||||
version=version,
|
|
||||||
description='Multi-platform library for memory editing',
|
|
||||||
long_description=long_description,
|
|
||||||
long_description_content_type='text/markdown',
|
|
||||||
author='Jan Petykiewicz',
|
|
||||||
author_email='anewusername@gmail.com',
|
|
||||||
url='https://mpxd.net/code/jan/mem_edit',
|
|
||||||
keywords=[
|
|
||||||
'memory',
|
|
||||||
'edit',
|
|
||||||
'editing',
|
|
||||||
'ReadProcessMemory',
|
|
||||||
'WriteProcessMemory',
|
|
||||||
'proc',
|
|
||||||
'mem',
|
|
||||||
'ptrace',
|
|
||||||
'multiplatform',
|
|
||||||
'scan',
|
|
||||||
'scanner',
|
|
||||||
'search',
|
|
||||||
'debug',
|
|
||||||
'cheat',
|
|
||||||
'trainer',
|
|
||||||
],
|
|
||||||
classifiers=[
|
|
||||||
'Programming Language :: Python :: 3',
|
|
||||||
'Development Status :: 4 - Beta',
|
|
||||||
'Environment :: Other Environment',
|
|
||||||
'Intended Audience :: Developers',
|
|
||||||
'License :: OSI Approved :: GNU Affero General Public License v3',
|
|
||||||
'Operating System :: POSIX :: Linux',
|
|
||||||
'Operating System :: Microsoft :: Windows',
|
|
||||||
'Topic :: Software Development',
|
|
||||||
'Topic :: Software Development :: Debuggers',
|
|
||||||
'Topic :: Software Development :: Testing',
|
|
||||||
'Topic :: System',
|
|
||||||
'Topic :: Games/Entertainment',
|
|
||||||
'Topic :: Utilities',
|
|
||||||
],
|
|
||||||
packages=find_packages(),
|
|
||||||
package_data={
|
|
||||||
'mem_edit': ['VERSION']
|
|
||||||
},
|
|
||||||
install_requires=[
|
|
||||||
'typing',
|
|
||||||
],
|
|
||||||
extras_require={
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user