Compare commits

..

28 Commits

Author SHA1 Message Date
8f7955543c use pathlib 2024-08-01 00:23:25 -07:00
5a82f04f9e flatten and simplify some code 2024-08-01 00:22:59 -07:00
5f2b1a432e improve type annotations 2024-08-01 00:21:55 -07:00
fec96b92a7 improve error handling 2024-08-01 00:20:53 -07:00
42c69867b8 using == on purpose here 2024-07-31 22:54:27 -07:00
db1dcc5e83 re-export using "import x as x" 2024-07-31 22:54:10 -07:00
1ecab08651 add ruff config 2024-07-31 22:53:55 -07:00
e26381a578 bump version to v0.8 2024-03-30 17:46:16 -07:00
e316322fbf Update reqs 2024-03-30 17:45:41 -07:00
b889ad8133 update readme and note github mirror 2024-03-30 17:44:19 -07:00
f10674e2b5 flake8 preferences 2024-03-30 17:43:25 -07:00
bdf0fb323e early bailout conditions caught by type check 2024-03-30 17:41:44 -07:00
f7c7496cfd get_path should return None on failure 2024-03-30 17:41:23 -07:00
e56ec88761 modernize some code style
indentation, type annotations, f-strings
2024-03-30 17:41:18 -07:00
f03ea6acad bump version to v0.7 2022-08-18 23:42:12 -07:00
303620b0a2 Move to hatch-based build 2022-08-18 23:41:19 -07:00
jan
d49555ad15 Merge pull request 'linux: wait for process before detach, and send SIGCONT' (#1) from XeroOl/mem_edit:master into master
Reviewed-on: jan/mem_edit#1
2022-08-18 23:38:31 -07:00
xerool
46e9456fd4 linux: wait for process before detach, and send SIGCONT
I had issues with the ptrace call failing because the process had not yet stopped
from SIGSTOP.
From this stackoverflow answer, it seems that you can use waitpid to
wait until the process is actually stopped. In python, this is exposed
as os.waitpid.

https://stackoverflow.com/questions/20510300/ptrace-detach-fails-after-ptrace-cont-with-errno-esrch#20525326

Additionally, the process was left frozen. I send a SIGCONT to continue
the process after the detach, so that it isn't left stopped.
2022-04-30 22:37:17 -05:00
f3154e443d update email 2021-07-11 17:25:00 -07:00
ef1a39152c bump version to v0.6 2021-04-08 19:50:22 -07:00
c29be9f429 strip newlines from version string 2021-04-08 19:49:55 -07:00
5a032da984 try to reduce log spam 2021-04-08 19:49:19 -07:00
6ab295fc26 bump version to v0.5 2020-11-01 20:21:35 -08:00
6913f73db4 cosmetic and typing-related changes 2020-11-01 20:16:46 -08:00
9759645f92 move version info into VERSION.py
This avoid needing custom spec files for pyinstaller, which doesn't
handle package_data by default
2020-11-01 20:16:06 -08:00
0632b205ab bump version number to v0.4: fixed on 64-bit python on windows 2020-04-15 02:03:13 -07:00
jan
5c75da31d5 support 64-bit python 2020-04-15 01:33:36 -07:00
jan
bd6c22ca1d windows: Don't fail search on unopenable process 2020-04-15 01:32:33 -07:00
14 changed files with 497 additions and 327 deletions

29
.flake8 Normal file
View File

@ -0,0 +1,29 @@
[flake8]
ignore =
# E501 line too long
E501,
# W391 newlines at EOF
W391,
# E241 multiple spaces after comma
E241,
# E302 expected 2 newlines
E302,
# W503 line break before binary operator (to be deprecated)
W503,
# E265 block comment should start with '# '
E265,
# E123 closing bracket does not match indentation of opening bracket's line
E123,
# E124 closing bracket does not match visual indentation
E124,
# E221 multiple spaces before operator
E221,
# E201 whitespace after '['
E201,
# E741 ambiguous variable name 'I'
E741,
per-file-ignores =
# F401 import without use
*/__init__.py: F401,

7
.gitignore vendored
View File

@ -1,8 +1,13 @@
.idea/ .idea/
__pycache__ __pycache__/
*.pyc *.pyc
*.egg-info/ *.egg-info/
build/ build/
dist/ dist/
.pytest_cache/
.mypy_cache/
*.pickle

View File

@ -1,3 +0,0 @@
include README.md
include LICENSE.md
include mem_edit/VERSION

View File

@ -1,8 +1,10 @@
# mem_edit # mem_edit
**mem_edit** is a multi-platform memory editing library written in Python. **mem_edit** is a multi-platform memory editing library written in Python.
**Homepage:** https://mpxd.net/code/jan/mem_edit **Homepage:** https://mpxd.net/code/jan/mem_edit
* PyPI: https://pypi.org/project/mem-edit/
* Github mirror: https://github.com/anewusername/mem_edit
**Capabilities:** **Capabilities:**
* Scan all readable memory used by a process. * Scan all readable memory used by a process.
@ -18,7 +20,7 @@
## Installation ## Installation
**Dependencies:** **Dependencies:**
* python 3 (written and tested with 3.7) * python >=3.11
* ctypes * ctypes
* typing (for type annotations) * typing (for type annotations)

1
mem_edit/LICENSE.md Symbolic link
View File

@ -0,0 +1 @@
../LICENSE.md

1
mem_edit/README.md Symbolic link
View File

@ -0,0 +1 @@
../README.md

View File

@ -1 +0,0 @@
0.3

View File

@ -12,22 +12,19 @@ To get started, try:
""" """
import platform import platform
import pathlib
from .utils import MemEditError from .utils import MemEditError
__author__ = 'Jan Petykiewicz' __author__ = 'Jan Petykiewicz'
__version__ = '0.8'
with open(pathlib.Path(__file__).parent / 'VERSION', 'r') as f: version = __version__ # legacy compatibility
__version__ = f.read().strip()
version = __version__
system = platform.system() system = platform.system()
if system == 'Windows': if system == 'Windows':
from .windows import Process from .windows import Process as Process
elif system == 'Linux': elif system == 'Linux':
from .linux import Process from .linux import Process as Process
else: else:
raise MemEditError('Only Linux and Windows are currently supported.') raise MemEditError('Only Linux and Windows are currently supported.')

View File

@ -1,8 +1,8 @@
""" """
Abstract class for cross-platform memory editing. Abstract class for cross-platform memory editing.
""" """
from typing import Self
from typing import List, Tuple from collections.abc import Generator
from abc import ABCMeta, abstractmethod from abc import ABCMeta, abstractmethod
from contextlib import contextmanager from contextlib import contextmanager
import copy import copy
@ -13,7 +13,6 @@ from . import utils
from .utils import ctypes_buffer_t from .utils import ctypes_buffer_t
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -23,8 +22,8 @@ class Process(metaclass=ABCMeta):
(i.e., by reading from or writing to the memory used by a given process). (i.e., by reading from or writing to the memory used by a given process).
The static methods The static methods
Process.list_available_pids() `Process.list_available_pids()`
Process.get_pid_by_name(executable_filename) `Process.get_pid_by_name(executable_filename)`
can be used to help find the process id (pid) of the target process. They are can be used to help find the process id (pid) of the target process. They are
provided for convenience only; it is probably better to use the tools built provided for convenience only; it is probably better to use the tools built
in to your operating system to discover the pid of the specific process you in to your operating system to discover the pid of the specific process you
@ -32,18 +31,19 @@ class Process(metaclass=ABCMeta):
Once you have found the pid, you are ready to construct an instance of Process Once you have found the pid, you are ready to construct an instance of Process
and use it to read and write to memory. Once you are done with the process, and use it to read and write to memory. Once you are done with the process,
use .close() to free up the process for access by other debuggers etc. use `.close()` to free up the process for access by other debuggers etc.
```
p = Process(1239) p = Process(1239)
p.close() p.close()
```
To read/write to memory, first create a buffer using ctypes: To read/write to memory, first create a buffer using ctypes:
```
buffer0 = (ctypes.c_byte * 5)(39, 50, 03, 40, 30) buffer0 = (ctypes.c_byte * 5)(39, 50, 03, 40, 30)
buffer1 = ctypes.c_ulong() buffer1 = ctypes.c_ulong()
```
and then use and then use
```
p.write_memory(0x2fe, buffer0) p.write_memory(0x2fe, buffer0)
val0 = p.read_memory(0x220, buffer0)[:] val0 = p.read_memory(0x220, buffer0)[:]
@ -51,52 +51,52 @@ class Process(metaclass=ABCMeta):
val1a = p.read_memory(0x149, buffer1).value val1a = p.read_memory(0x149, buffer1).value
val2b = buffer1.value val2b = buffer1.value
assert(val1a == val2b) assert(val1a == val2b)
```
Searching for a value can be done in a number of ways: Searching for a value can be done in a number of ways:
Search a list of addresses: Search a list of addresses:
found_addresses = p.search_addresses([0x1020, 0x1030], buffer0) `found_addresses = p.search_addresses([0x1020, 0x1030], buffer0)`
Search the entire memory space: Search the entire memory space:
found_addresses = p.search_all_memory(buffer0, writeable_only=False) `found_addresses = p.search_all_memory(buffer0, writeable_only=False)`
You can also get a list of which regions in memory are mapped (readable): You can also get a list of which regions in memory are mapped (readable):
regions = p.list_mapped_regions(writeable_only=False) `regions = p.list_mapped_regions(writeable_only=False)`
which can be used along with search_buffer(...) to re-create .search_all_memory(...):
which can be used along with search_buffer(...) to re-create .search_all_memory(...): ```
found = [] found = []
for region_start, region_stop in regions: for region_start, region_stop in regions:
region_buffer = (ctypes.c_byte * (region_stop - region_start))() region_buffer = (ctypes.c_byte * (region_stop - region_start))()
p.read_memory(region_start, region_buffer) p.read_memory(region_start, region_buffer)
found += utils.search_buffer(ctypes.c_ulong(123456790), region_buffer) found += utils.search_buffer(ctypes.c_ulong(123456790), region_buffer)
```
Other useful methods include the context manager, implemented as a static method: Other useful methods include the context manager, implemented as a static method:
```
with Process.open_process(pid) as p: with Process.open_process(pid) as p:
# use p here, no need to call p.close() # use p here, no need to call p.close()
```
.get_path(), which reports the path of the executable file which was used .get_path(), which reports the path of the executable file which was used
to start the process: to start the process:
```
executable_path = p.get_path() executable_path = p.get_path()
```
and deref_struct_pointer, which takes a pointer to a struct and reads out the struct members: and deref_struct_pointer, which takes a pointer to a struct and reads out the struct members:
```
# struct is a list of (offset, buffer) pairs # struct is a list of (offset, buffer) pairs
struct_defintion = [(0x0, ctypes.c_ulong()), struct_defintion = [(0x0, ctypes.c_ulong()),
(0x20, ctypes.c_byte())] (0x20, ctypes.c_byte())]
values = p.deref_struct_pointer(0x0feab4, struct_defintion) values = p.deref_struct_pointer(0x0feab4, struct_defintion)
```
which is shorthand for which is shorthand for
```
struct_addr = p.read_memory(0x0feab4, ctypes.c_void_p()) struct_addr = p.read_memory(0x0feab4, ctypes.c_void_p())
values = [p.read_memory(struct_addr + 0x0, ctypes.c_ulong()), values = [p.read_memory(struct_addr + 0x0, ctypes.c_ulong()),
p.read_memory(struct_addr + 0x20, ctypes.c_byte())] p.read_memory(struct_addr + 0x20, ctypes.c_byte())]
```
================= =================
Putting all this together, a simple program which alters a magic number in the only running Putting all this together, a simple program which alters a magic number in the only running
instance of 'magic.exe' might look like this: instance of 'magic.exe' might look like this:
```
import ctypes import ctypes
from mem_edit import Process from mem_edit import Process
@ -107,9 +107,9 @@ class Process(metaclass=ABCMeta):
addrs = p.search_all_memory(magic_number) addrs = p.search_all_memory(magic_number)
assert(len(addrs) == 1) assert(len(addrs) == 1)
p.write_memory(addrs[0], ctypes.c_ulong(42)) p.write_memory(addrs[0], ctypes.c_ulong(42))
```
Searching for a value which changes: Searching for a value which changes:
```
pid = Process.get_pid_by_name('monitor_me.exe') pid = Process.get_pid_by_name('monitor_me.exe')
with Process.open_process(pid) as p: with Process.open_process(pid) as p:
addrs = p.search_all_memory(ctypes.c_int(40)) addrs = p.search_all_memory(ctypes.c_int(40))
@ -118,93 +118,111 @@ class Process(metaclass=ABCMeta):
print('Found addresses:') print('Found addresses:')
for addr in filtered_addrs: for addr in filtered_addrs:
print(hex(addr)) print(hex(addr))
```
""" """
@abstractmethod @abstractmethod
def __init__(self, process_id: int): def __init__(self, process_id: int) -> None:
""" """
Constructing a Process object prepares the process with specified process_id for Constructing a Process object prepares the process with specified process_id for
memory editing. Finding the process_id for the process you want to edit is often memory editing. Finding the `process_id` for the process you want to edit is often
easiest using os-specific tools (or by launching the process yourself, e.g. with easiest using os-specific tools (or by launching the process yourself, e.g. with
subprocess.Popen(...)). `subprocess.Popen(...)`).
:param process_id: Process id (pid) of the target process Args:
process_id: Process id (pid) of the target process
""" """
pass pass
@abstractmethod @abstractmethod
def close(self): def close(self) -> None:
""" """
Detach from the process, removing our ability to edit it and Detach from the process, removing our ability to edit it and
letting other debuggers attach to it instead. letting other debuggers attach to it instead.
This function should be called after you are done working with the process This function should be called after you are done working with the process
and will no longer need it. See the Process.open_process(...) context and will no longer need it. See the `Process.open_process(...)` context
manager to avoid having to call this function yourself. manager to avoid having to call this function yourself.
""" """
pass pass
@abstractmethod @abstractmethod
def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t): def write_memory(
self,
base_address: int,
write_buffer: ctypes_buffer_t,
) -> None:
""" """
Write the given buffer to the process's address space, starting at base_address. Write the given buffer to the process's address space, starting at `base_address`.
:param base_address: The address to write at, in the process's address space. Args:
:param write_buffer: A ctypes object, for example, ctypes.c_ulong(48), base_address: The address to write at, in the process's address space.
(ctypes.c_byte * 3)(43, 21, 0xff), or a subclass of ctypes.Structure, write_buffer: A ctypes object, for example, `ctypes.c_ulong(48)`,
which will be written into memory starting at base_address. `(ctypes.c_byte * 3)(43, 21, 0xff)`, or a subclass of `ctypes.Structure`,
which will be written into memory starting at `base_address`.
""" """
pass pass
@abstractmethod @abstractmethod
def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t: def read_memory(
self,
base_address: int,
read_buffer: ctypes_buffer_t,
) -> ctypes_buffer_t:
""" """
Read into the given buffer from the process's address space, starting at base_address. Read into the given buffer from the process's address space, starting at `base_address`.
:param base_address: The address to read from, in the process's address space. Args:
:param read_buffer: A ctypes object, for example. ctypes.c_ulong(), base_address: The address to read from, in the process's address space.
(ctypes.c_byte * 3)(), or a subclass of ctypes.Structure, which will be read_buffer: A `ctypes` object, for example. `ctypes.c_ulong()`,
overwritten with the contents of the process's memory starting at base_address. `(ctypes.c_byte * 3)()`, or a subclass of `ctypes.Structure`, which will be
:returns: read_buffer is returned as well as being overwritten. overwritten with the contents of the process's memory starting at `base_address`.
Returns:
`read_buffer` is returned as well as being overwritten.
""" """
pass pass
@abstractmethod @abstractmethod
def list_mapped_regions(self, writeable_only=True) -> List[Tuple[int, int]]: def list_mapped_regions(self, writeable_only: bool = True) -> list[tuple[int, int]]:
""" """
Return a list of (start_address, stop_address) for the regions of the address space Return a list of `(start_address, stop_address)` for the regions of the address space
accessible to (readable and possibly writable by) the process. accessible to (readable and possibly writable by) the process.
By default, this function does not return non-writeable regions. By default, this function does not return non-writeable regions.
:param writeable_only: If True, only return regions which are also writeable. Args:
Default true. writeable_only: If `True`, only return regions which are also writeable.
:return: List of (start_address, stop_address) for each accessible memory region. Default `True`.
Returns:
List of `(start_address, stop_address)` for each accessible memory region.
""" """
pass pass
@abstractmethod @abstractmethod
def get_path(self) -> str: def get_path(self) -> str | None:
""" """
Return the path to the executable file which was run to start this process. Return the path to the executable file which was run to start this process.
:return: A string containing the path. Returns:
A string containing the path, or None if no path was found.
""" """
pass pass
@staticmethod @staticmethod
@abstractmethod @abstractmethod
def list_available_pids() -> List[int]: def list_available_pids() -> list[int]:
""" """
Return a list of all process ids (pids) accessible on this system. Return a list of all process ids (pids) accessible on this system.
:return: List of running process ids. Returns:
List of running process ids.
""" """
pass pass
@staticmethod @staticmethod
@abstractmethod @abstractmethod
def get_pid_by_name(target_name: str) -> int or None: def get_pid_by_name(target_name: str) -> int | None:
""" """
Attempt to return the process id (pid) of a process which was run with an executable Attempt to return the process id (pid) of a process which was run with an executable
file with the provided name. If no process is found, return None. file with the provided name. If no process is found, return None.
@ -215,44 +233,65 @@ class Process(metaclass=ABCMeta):
Don't rely on this method if you can possibly avoid it, since it makes no Don't rely on this method if you can possibly avoid it, since it makes no
attempt to confirm that it found a unique process and breaks trivially (e.g. if the attempt to confirm that it found a unique process and breaks trivially (e.g. if the
executable file is renamed). executable file is renamed).
:return: Process id (pid) of a process with the provided name, or None.
Args:
target_name: Name of the process to find the PID for
Returns:
Process id (pid) of a process with the provided name, or `None`.
""" """
pass pass
def deref_struct_pointer(self, def deref_struct_pointer(
base_address: int, self,
targets: List[Tuple[int, ctypes_buffer_t]], base_address: int,
) -> List[ctypes_buffer_t]: targets: list[tuple[int, ctypes_buffer_t]],
) -> list[ctypes_buffer_t]:
""" """
Take a pointer to a struct and read out the struct members: Take a pointer to a struct and read out the struct members:
```
struct_defintion = [(0x0, ctypes.c_ulong()), struct_defintion = [(0x0, ctypes.c_ulong()),
(0x20, ctypes.c_byte())] (0x20, ctypes.c_byte())]
values = p.deref_struct_pointer(0x0feab4, struct_defintion) values = p.deref_struct_pointer(0x0feab4, struct_defintion)
```
which is shorthand for which is shorthand for
```
struct_addr = p.read_memory(0x0feab4, ctypes.c_void_p()) struct_addr = p.read_memory(0x0feab4, ctypes.c_void_p())
values = [p.read_memory(struct_addr + 0x0, ctypes.c_ulong()), values = [p.read_memory(struct_addr + 0x0, ctypes.c_ulong()),
p.read_memory(struct_addr + 0x20, ctypes.c_byte())] p.read_memory(struct_addr + 0x20, ctypes.c_byte())]
```
:param base_address: Address at which the struct pointer is located. Args:
:param targets: List of (offset, read_buffer) pairs which will be read from the struct. base_address: Address at which the struct pointer is located.
:return: List of read values corresponding to the provided targets. targets: List of `(offset, read_buffer)` pairs which will be read from the struct.
Return:
List of read values corresponding to the provided targets.
""" """
base = self.read_memory(base_address, ctypes.c_void_p()).value base = self.read_memory(base_address, ctypes.c_void_p()).value
values = [self.read_memory(base + offset, buffer) for offset, buffer in targets] values = [self.read_memory(base + offset, buffer) for offset, buffer in targets]
return values return values
def search_addresses(self, addresses: List[int], needle_buffer: ctypes_buffer_t, verbatim: bool=True) -> List[int]: def search_addresses(
self,
addresses: list[int],
needle_buffer: ctypes_buffer_t,
verbatim: bool = True,
) -> list[int]:
""" """
Search for the provided value at each of the provided addresses, and return the addresses Search for the provided value at each of the provided addresses, and return the addresses
where it is found. where it is found.
:param addresses: List of addresses which should be probed. Args:
:param needle_buffer: The value to search for. This should be a ctypes object of the same addresses: List of addresses which should be probed.
sorts as used by .read_memory(...), which will be compared to the contents of needle_buffer: The value to search for. This should be a `ctypes` object of the same
sorts as used by `.read_memory(...)`, which will be compared to the contents of
memory at each of the given addresses. memory at each of the given addresses.
:param verbatim: If True, perform bitwise comparison when searching for needle_buffer. verbatim: If `True`, perform bitwise comparison when searching for `needle_buffer`.
If False, perform utils.ctypes_equal-based comparison. Default True. If `False`, perform `utils.ctypes_equal`-based comparison. Default `True`.
:return: List of addresses where the needle_buffer was found.
Returns:
List of addresses where the `needle_buffer` was found.
""" """
found = [] found = []
read_buffer = copy.copy(needle_buffer) read_buffer = copy.copy(needle_buffer)
@ -269,18 +308,26 @@ class Process(metaclass=ABCMeta):
found.append(address) found.append(address)
return found return found
def search_all_memory(self, needle_buffer: ctypes_buffer_t, writeable_only: bool=True, verbatim: bool=True) -> List[int]: def search_all_memory(
self,
needle_buffer: ctypes_buffer_t,
writeable_only: bool = True,
verbatim: bool = True,
) -> list[int]:
""" """
Search the entire memory space accessible to the process for the provided value. Search the entire memory space accessible to the process for the provided value.
:param needle_buffer: The value to search for. This should be a ctypes object of the same Args:
sorts as used by .read_memory(...), which will be compared to the contents of needle_buffer: The value to search for. This should be a ctypes object of the same
sorts as used by `.read_memory(...)`, which will be compared to the contents of
memory at each accessible address. memory at each accessible address.
:param writeable_only: If True, only search regions where the process has write access. writeable_only: If `True`, only search regions where the process has write access.
Default True. Default `True`.
:param verbatim: If True, perform bitwise comparison when searching for needle_buffer. verbatim: If `True`, perform bitwise comparison when searching for `needle_buffer`.
If False, perform utils.ctypes_equal-based comparison. Default True. If `False`, perform `utils.ctypes_equal-based` comparison. Default `True`.
:return: List of addresses where the needle_buffer was found.
Returns:
List of addresses where the `needle_buffer` was found.
""" """
found = [] found = []
if verbatim: if verbatim:
@ -294,20 +341,25 @@ class Process(metaclass=ABCMeta):
self.read_memory(start, region_buffer) self.read_memory(start, region_buffer)
found += [offset + start for offset in search(needle_buffer, region_buffer)] found += [offset + start for offset in search(needle_buffer, region_buffer)]
except OSError: except OSError:
logger.error('Failed to read in range 0x{} - 0x{}'.format(start, stop)) logger.exception(f'Failed to read in range 0x{start} - 0x{stop}')
return found return found
@classmethod @classmethod
@contextmanager @contextmanager
def open_process(cls, process_id: int) -> 'Process': def open_process(cls: type[Self], process_id: int) -> Generator[Self, None, None]:
""" """
Context manager which automatically closes the constructed Process: Context manager which automatically closes the constructed Process:
```
with Process.open_process(2394) as p: with Process.open_process(2394) as p:
# use p here # use p here
# no need to run p.close() # no need to run p.close()
```
:param process_id: Process id (pid), passed to the Process constructor. Args:
:return: Constructed Process object. process_id: Process id (pid), passed to the Process constructor.
Returns:
Constructed Process object.
""" """
process = cls(process_id) process = cls(process_id)
yield process yield process

View File

@ -2,7 +2,6 @@
Implementation of Process class for Linux Implementation of Process class for Linux
""" """
from typing import List, Tuple
from os import strerror from os import strerror
import os import os
import os.path import os.path
@ -10,23 +9,23 @@ import signal
import ctypes import ctypes
import ctypes.util import ctypes.util
import logging import logging
from pathlib import Path
from .abstract import Process as AbstractProcess from .abstract import Process as AbstractProcess
from .utils import ctypes_buffer_t, MemEditError from .utils import ctypes_buffer_t, MemEditError
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
ptrace_commands = { ptrace_commands = {
'PTRACE_GETREGS': 12, 'PTRACE_GETREGS': 12,
'PTRACE_SETREGS': 13, 'PTRACE_SETREGS': 13,
'PTRACE_ATTACH': 16, 'PTRACE_ATTACH': 16,
'PTRACE_DETACH': 17, 'PTRACE_DETACH': 17,
'PTRACE_SYSCALL': 24, 'PTRACE_SYSCALL': 24,
'PTRACE_SEIZE': 16902, 'PTRACE_SEIZE': 16902,
} }
# import ptrace() from libc # import ptrace() from libc
@ -36,52 +35,61 @@ _ptrace.argtypes = (ctypes.c_ulong,) * 4
_ptrace.restype = ctypes.c_long _ptrace.restype = ctypes.c_long
def ptrace(command: int, pid: int = 0, arg1: int = 0, arg2: int = 0) -> int: def ptrace(
command: int,
pid: int = 0,
arg1: int = 0,
arg2: int = 0,
) -> int:
""" """
Call ptrace() with the provided pid and arguments. See the ```man ptrace```. Call ptrace() with the provided pid and arguments. See `man ptrace`.
""" """
logger.debug('ptrace({}, {}, {}, {})'.format(command, pid, arg1, arg2)) logger.debug(f'ptrace({command}, {pid}, {arg1}, {arg2})')
result = _ptrace(command, pid, arg1, arg2) result = _ptrace(command, pid, arg1, arg2)
if result == -1: if result == -1:
err_no = ctypes.get_errno() err_no = ctypes.get_errno()
if err_no: if err_no:
raise MemEditError('ptrace({}, {}, {}, {})'.format(command, pid, arg1, arg2) + raise MemEditError(f'ptrace({command}, {pid}, {arg1}, {arg2})'
' failed with error {}: {}'.format(err_no, strerror(err_no))) + f' failed with error {err_no}: {strerror(err_no)}')
return result return result
class Process(AbstractProcess): class Process(AbstractProcess):
pid = None pid: int | None
def __init__(self, process_id: int): def __init__(self, process_id: int) -> None:
ptrace(ptrace_commands['PTRACE_SEIZE'], process_id) ptrace(ptrace_commands['PTRACE_SEIZE'], process_id)
self.pid = process_id self.pid = process_id
def close(self): def close(self) -> None:
if self.pid is None:
return
os.kill(self.pid, signal.SIGSTOP) os.kill(self.pid, signal.SIGSTOP)
os.waitpid(self.pid, 0)
ptrace(ptrace_commands['PTRACE_DETACH'], self.pid, 0, 0) ptrace(ptrace_commands['PTRACE_DETACH'], self.pid, 0, 0)
os.kill(self.pid, signal.SIGCONT)
self.pid = None self.pid = None
def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t): def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t) -> None:
with open('/proc/{}/mem'.format(self.pid), 'rb+') as mem: with Path(f'/proc/{self.pid}/mem').open('rb+') as mem:
mem.seek(base_address) mem.seek(base_address)
mem.write(write_buffer) mem.write(write_buffer)
def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t: def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t:
with open('/proc/{}/mem'.format(self.pid), 'rb+') as mem: with Path(f'/proc/{self.pid}/mem').open('rb+') as mem:
mem.seek(base_address) mem.seek(base_address)
mem.readinto(read_buffer) mem.readinto(read_buffer)
return read_buffer return read_buffer
def get_path(self) -> str: def get_path(self) -> str | None:
try: try:
with open('/proc/{}/cmdline', 'rb') as f: with Path(f'/proc/{self.pid}/cmdline').open('rb') as ff:
return f.read().decode().split('\x00')[0] return ff.read().decode().split('\x00')[0]
except FileNotFoundError: except FileNotFoundError:
return '' return None
@staticmethod @staticmethod
def list_available_pids() -> List[int]: def list_available_pids() -> list[int]:
pids = [] pids = []
for pid_str in os.listdir('/proc'): for pid_str in os.listdir('/proc'):
try: try:
@ -91,26 +99,26 @@ class Process(AbstractProcess):
return pids return pids
@staticmethod @staticmethod
def get_pid_by_name(target_name: str) -> int or None: def get_pid_by_name(target_name: str) -> int | None:
for pid in Process.list_available_pids(): for pid in Process.list_available_pids():
try: try:
logger.info('Checking name for pid {}'.format(pid)) logger.debug(f'Checking name for pid {pid}')
with open('/proc/{}/cmdline'.format(pid), 'rb') as cmdline: with Path(f'/proc/{pid}/cmdline').open('rb') as cmdline:
path = cmdline.read().decode().split('\x00')[0] path = cmdline.read().decode().split('\x00')[0]
except FileNotFoundError: except FileNotFoundError:
continue continue
name = os.path.basename(path) name = Path(path).name
logger.info('Name was "{}"'.format(name)) logger.debug(f'Name was "{name}"')
if path is not None and name == target_name: if path is not None and name == target_name:
return pid return pid
logger.info('Found no process with name {}'.format(target_name)) logger.info(f'Found no process with name {target_name}')
return None return None
def list_mapped_regions(self, writeable_only: bool = True) -> List[Tuple[int, int]]: def list_mapped_regions(self, writeable_only: bool = True) -> list[tuple[int, int]]:
regions = [] regions = []
with open('/proc/{}/maps'.format(self.pid), 'r') as maps: with Path(f'/proc/{self.pid}/maps').open('r') as maps:
for line in maps: for line in maps:
bounds, privileges = line.split()[0:2] bounds, privileges = line.split()[0:2]

View File

@ -11,25 +11,34 @@ Utility functions and types:
Check if two buffers (ctypes objects) store equal values: Check if two buffers (ctypes objects) store equal values:
ctypes_equal(a, b) ctypes_equal(a, b)
""" """
from typing import List
import ctypes import ctypes
ctypes_buffer_t = ctypes._SimpleCData or ctypes.Array or ctypes.Structure or ctypes.Union ctypes_buffer_t = (
ctypes._SimpleCData
| ctypes.Array
| ctypes.Structure
| ctypes.Union
)
class MemEditError(Exception): class MemEditError(Exception):
pass pass
def search_buffer_verbatim(needle_buffer: ctypes_buffer_t, haystack_buffer: ctypes_buffer_t) -> List[int]: def search_buffer_verbatim(
needle_buffer: ctypes_buffer_t,
haystack_buffer: ctypes_buffer_t,
) -> list[int]:
""" """
Search for a buffer inside another buffer, using a direct (bitwise) comparison Search for a buffer inside another buffer, using a direct (bitwise) comparison
:param needle_buffer: Buffer to search for. Args:
:param haystack_buffer: Buffer to search in. needle_buffer: Buffer to search for.
:return: List of offsets where the needle_buffer was found. haystack_buffer: Buffer to search in.
Returns:
List of offsets where the `needle_buffer` was found.
""" """
found = [] found = []
@ -45,14 +54,20 @@ def search_buffer_verbatim(needle_buffer: ctypes_buffer_t, haystack_buffer: ctyp
return found return found
def search_buffer(needle_buffer: ctypes_buffer_t, haystack_buffer: ctypes_buffer_t) -> List[int]: def search_buffer(
needle_buffer: ctypes_buffer_t,
haystack_buffer: ctypes_buffer_t,
) -> list[int]:
""" """
Search for a buffer inside another buffer, using ctypes_equal for comparison. Search for a buffer inside another buffer, using `ctypes_equal` for comparison.
Much slower than search_buffer_verbatim. Much slower than `search_buffer_verbatim`.
:param needle_buffer: Buffer to search for. Args:
:param haystack_buffer: Buffer to search in. needle_buffer: Buffer to search for.
:return: List of offsets where the needle_buffer was found. haystack_buffer: Buffer to search in.
Returns:
List of offsets where the needle_buffer was found.
""" """
found = [] found = []
read_type = type(needle_buffer) read_type = type(needle_buffer)
@ -63,11 +78,14 @@ def search_buffer(needle_buffer: ctypes_buffer_t, haystack_buffer: ctypes_buffer
return found return found
def ctypes_equal(a: ctypes_buffer_t, b: ctypes_buffer_t) -> bool: def ctypes_equal(
a: ctypes_buffer_t,
b: ctypes_buffer_t,
) -> bool:
""" """
Check if the values stored inside two ctypes buffers are equal. Check if the values stored inside two ctypes buffers are equal.
""" """
if not type(a) == type(b): if not type(a) == type(b): # noqa: E721
return False return False
if isinstance(a, ctypes.Array): if isinstance(a, ctypes.Array):
@ -75,10 +93,10 @@ def ctypes_equal(a: ctypes_buffer_t, b: ctypes_buffer_t) -> bool:
elif isinstance(a, ctypes.Structure) or isinstance(a, ctypes.Union): elif isinstance(a, ctypes.Structure) or isinstance(a, ctypes.Union):
for attr_name, attr_type in a._fields_: for attr_name, attr_type in a._fields_:
a_attr, b_attr = (getattr(x, attr_name) for x in (a, b)) a_attr, b_attr = (getattr(x, attr_name) for x in (a, b))
if isinstance(a, ctypes_buffer_t): if isinstance(a, (ctypes.Array, ctypes.Structure, ctypes.Union, ctypes._SimpleCData)):
if not ctypes_equal(a_attr, b_attr): if not ctypes_equal(a_attr, b_attr):
return False return False
elif not a_attr == b_attr: elif a_attr != b_attr:
return False return False
return True return True

View File

@ -2,10 +2,9 @@
Implementation of Process class for Windows Implementation of Process class for Windows
""" """
from typing import List, Tuple
from math import floor from math import floor
from os import strerror from os import strerror
import os.path from pathlib import Path
import ctypes import ctypes
import ctypes.wintypes import ctypes.wintypes
import logging import logging
@ -14,94 +13,130 @@ from .abstract import Process as AbstractProcess
from .utils import ctypes_buffer_t, MemEditError from .utils import ctypes_buffer_t, MemEditError
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Process handle privileges # Process handle privileges
privileges = { privileges = {
'PROCESS_QUERY_INFORMATION': 0x0400, 'PROCESS_QUERY_INFORMATION': 0x0400,
'PROCESS_VM_OPERATION': 0x0008, 'PROCESS_VM_OPERATION': 0x0008,
'PROCESS_VM_READ': 0x0010, 'PROCESS_VM_READ': 0x0010,
'PROCESS_VM_WRITE': 0x0020, 'PROCESS_VM_WRITE': 0x0020,
} }
privileges['PROCESS_RW'] = ( privileges['PROCESS_RW'] = (
privileges['PROCESS_QUERY_INFORMATION'] | privileges['PROCESS_QUERY_INFORMATION']
privileges['PROCESS_VM_OPERATION'] | | privileges['PROCESS_VM_OPERATION']
privileges['PROCESS_VM_READ'] | | privileges['PROCESS_VM_READ']
privileges['PROCESS_VM_WRITE'] | privileges['PROCESS_VM_WRITE']
) )
# Memory region states # Memory region states
mem_states = { mem_states = {
'MEM_COMMIT': 0x1000, 'MEM_COMMIT': 0x1000,
'MEM_FREE': 0x10000, 'MEM_FREE': 0x10000,
'MEM_RESERVE': 0x2000, 'MEM_RESERVE': 0x2000,
} }
# Memory region permissions # Memory region permissions
page_protections = { page_protections = {
'PAGE_EXECUTE': 0x10, 'PAGE_EXECUTE': 0x10,
'PAGE_EXECUTE_READ': 0x20, 'PAGE_EXECUTE_READ': 0x20,
'PAGE_EXECUTE_READWRITE': 0x40, 'PAGE_EXECUTE_READWRITE': 0x40,
'PAGE_EXECUTE_WRITECOPY': 0x80, 'PAGE_EXECUTE_WRITECOPY': 0x80,
'PAGE_NOACCESS': 0x01, 'PAGE_NOACCESS': 0x01,
'PAGE_READWRITE': 0x04, 'PAGE_READWRITE': 0x04,
'PAGE_WRITECOPY': 0x08, 'PAGE_WRITECOPY': 0x08,
} }
# Custom (combined) permissions # Custom (combined) permissions
page_protections['PAGE_READABLE'] = ( page_protections['PAGE_READABLE'] = (
page_protections['PAGE_EXECUTE_READ'] | page_protections['PAGE_EXECUTE_READ']
page_protections['PAGE_EXECUTE_READWRITE'] | | page_protections['PAGE_EXECUTE_READWRITE']
page_protections['PAGE_READWRITE'] | page_protections['PAGE_READWRITE']
) )
page_protections['PAGE_READWRITEABLE'] = ( page_protections['PAGE_READWRITEABLE'] = (
page_protections['PAGE_EXECUTE_READWRITE'] | page_protections['PAGE_EXECUTE_READWRITE']
page_protections['PAGE_READWRITE'] | page_protections['PAGE_READWRITE']
) )
# Memory types # Memory types
mem_types = { mem_types = {
'MEM_IMAGE': 0x1000000, 'MEM_IMAGE': 0x1000000,
'MEM_MAPPED': 0x40000, 'MEM_MAPPED': 0x40000,
'MEM_PRIVATE': 0x20000, 'MEM_PRIVATE': 0x20000,
} }
# C struct for VirtualQueryEx # C struct for VirtualQueryEx
class MEMORY_BASIC_INFORMATION(ctypes.Structure): class MEMORY_BASIC_INFORMATION32(ctypes.Structure):
_fields_ = [ _fields_ = [
('BaseAddress', ctypes.c_void_p), ('BaseAddress', ctypes.wintypes.DWORD),
('AllocationBase', ctypes.c_void_p), ('AllocationBase', ctypes.wintypes.DWORD),
('AllocationProtect', ctypes.wintypes.DWORD), ('AllocationProtect', ctypes.wintypes.DWORD),
('RegionSize', ctypes.wintypes.UINT), ('RegionSize', ctypes.wintypes.DWORD),
('State', ctypes.wintypes.DWORD), ('State', ctypes.wintypes.DWORD),
('Protect', ctypes.wintypes.DWORD), ('Protect', ctypes.wintypes.DWORD),
('Type', ctypes.wintypes.DWORD), ('Type', ctypes.wintypes.DWORD),
] ]
class MEMORY_BASIC_INFORMATION64(ctypes.Structure):
_fields_ = [
('BaseAddress', ctypes.c_ulonglong),
('AllocationBase', ctypes.c_ulonglong),
('AllocationProtect', ctypes.wintypes.DWORD),
('__alignment1', ctypes.wintypes.DWORD),
('RegionSize', ctypes.c_ulonglong),
('State', ctypes.wintypes.DWORD),
('Protect', ctypes.wintypes.DWORD),
('Type', ctypes.wintypes.DWORD),
('__alignment2', ctypes.wintypes.DWORD),
]
PTR_SIZE = ctypes.sizeof(ctypes.c_void_p)
MEMORY_BASIC_INFORMATION: type[ctypes.Structure]
if PTR_SIZE == 8: # 64-bit python
MEMORY_BASIC_INFORMATION = MEMORY_BASIC_INFORMATION64
elif PTR_SIZE == 4: # 32-bit python
MEMORY_BASIC_INFORMATION = MEMORY_BASIC_INFORMATION32
ctypes.windll.kernel32.VirtualQueryEx.argtypes = [
ctypes.wintypes.HANDLE,
ctypes.wintypes.LPCVOID,
ctypes.c_void_p,
ctypes.c_size_t]
ctypes.windll.kernel32.ReadProcessMemory.argtypes = [
ctypes.wintypes.HANDLE,
ctypes.wintypes.LPCVOID,
ctypes.c_void_p,
ctypes.c_size_t,
ctypes.c_void_p]
ctypes.windll.kernel32.WriteProcessMemory.argtypes = [
ctypes.wintypes.HANDLE,
ctypes.wintypes.LPCVOID,
ctypes.c_void_p,
ctypes.c_size_t,
ctypes.c_void_p]
# C struct for GetSystemInfo # C struct for GetSystemInfo
class SYSTEM_INFO(ctypes.Structure): class SYSTEM_INFO(ctypes.Structure):
_fields_ = [ _fields_ = [
('wProcessorArchitecture', ctypes.wintypes.WORD), ('wProcessorArchitecture', ctypes.wintypes.WORD),
('wReserved', ctypes.wintypes.WORD), ('wReserved', ctypes.wintypes.WORD),
('dwPageSize', ctypes.wintypes.DWORD), ('dwPageSize', ctypes.wintypes.DWORD),
('lpMinimumApplicationAddress', ctypes.c_void_p), ('lpMinimumApplicationAddress', ctypes.c_void_p),
('lpMaximumApplicationAddress', ctypes.c_void_p), ('lpMaximumApplicationAddress', ctypes.c_void_p),
('dwActiveProcessorMask', ctypes.wintypes.DWORD), ('dwActiveProcessorMask', ctypes.c_void_p),
('dwNumberOfProcessors', ctypes.wintypes.DWORD), ('dwNumberOfProcessors', ctypes.wintypes.DWORD),
('dwProcessorType', ctypes.wintypes.DWORD), ('dwProcessorType', ctypes.wintypes.DWORD),
('dwAllocationGranularity', ctypes.wintypes.DWORD), ('dwAllocationGranularity', ctypes.wintypes.DWORD),
('wProcessorLevel', ctypes.wintypes.WORD), ('wProcessorLevel', ctypes.wintypes.WORD),
('wProcessorRevision', ctypes.wintypes.WORD), ('wProcessorRevision', ctypes.wintypes.WORD),
] ]
class Process(AbstractProcess): class Process(AbstractProcess):
process_handle = None process_handle: int | None
def __init__(self, process_id: int): def __init__(self, process_id: int) -> None:
process_handle = ctypes.windll.kernel32.OpenProcess( process_handle = ctypes.windll.kernel32.OpenProcess(
privileges['PROCESS_RW'], privileges['PROCESS_RW'],
False, False,
@ -109,107 +144,108 @@ class Process(AbstractProcess):
) )
if not process_handle: if not process_handle:
raise MemEditError('Couldn\'t open process {}'.format(process_id)) raise MemEditError(f'Couldn\'t open process {process_id}')
self.process_handle = process_handle self.process_handle = process_handle
def close(self): def close(self) -> None:
ctypes.windll.kernel32.CloseHandle(self.process_handle) ctypes.windll.kernel32.CloseHandle(self.process_handle)
self.process_handle = None self.process_handle = None
def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t): def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t) -> None:
try: try:
ctypes.windll.kernel32.WriteProcessMemory( ctypes.windll.kernel32.WriteProcessMemory(
self.process_handle, self.process_handle,
base_address, base_address,
ctypes.byref(write_buffer), ctypes.byref(write_buffer),
ctypes.sizeof(write_buffer), ctypes.sizeof(write_buffer),
None None
) )
except (BufferError, ValueError, TypeError): except (BufferError, ValueError, TypeError) as err:
raise MemEditError('Error with handle {}: {}'.format(self.process_handle, self._get_last_error())) raise MemEditError(f'Error with handle {self.process_handle}: {self._get_last_error()}') from err
def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t: def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t:
try: try:
ctypes.windll.kernel32.ReadProcessMemory( ctypes.windll.kernel32.ReadProcessMemory(
self.process_handle, self.process_handle,
base_address, base_address,
ctypes.byref(read_buffer), ctypes.byref(read_buffer),
ctypes.sizeof(read_buffer), ctypes.sizeof(read_buffer),
None None
) )
except (BufferError, ValueError, TypeError): except (BufferError, ValueError, TypeError) as err:
raise MemEditError('Error with handle {}: {}'.format(self.process_handle, self._get_last_error())) raise MemEditError(f'Error with handle {self.process_handle}: {self._get_last_error()}') from err
return read_buffer return read_buffer
@staticmethod @staticmethod
def _get_last_error() -> Tuple[int, str]: def _get_last_error() -> tuple[int, str]:
err = ctypes.windll.kernel32.GetLastError() err = ctypes.windll.kernel32.GetLastError()
return err, strerror(err) return err, strerror(err)
def get_path(self) -> str: def get_path(self) -> str | None:
max_path_len = 260 max_path_len = 260
name_buffer = (ctypes.c_char * max_path_len)() name_buffer = (ctypes.c_char * max_path_len)()
rval = ctypes.windll.psapi.GetProcessImageFileNameA( rval = ctypes.windll.psapi.GetProcessImageFileNameA(
self.process_handle, self.process_handle,
name_buffer, name_buffer,
max_path_len max_path_len,
) )
if rval > 0: if rval <= 0:
return name_buffer.value.decode()
else:
return None return None
return name_buffer.value.decode()
@staticmethod @staticmethod
def list_available_pids() -> List[int]: def list_available_pids() -> list[int]:
# According to EnumProcesses docs, you can't find out how many processes there are before # According to EnumProcesses docs, you can't find out how many processes there are before
# fetching the list. As a result, we grab 100 on the first try, and if we get a full list # fetching the list. As a result, we grab 100 on the first try, and if we get a full list
# of 100, repeatedly double the number until we get fewer than we asked for. # of 100, repeatedly double the number until we get fewer than we asked for.
n = 100 nn = 100
returned_size = ctypes.wintypes.DWORD() returned_size = ctypes.wintypes.DWORD()
returned_size_ptr = ctypes.byref(returned_size) returned_size_ptr = ctypes.byref(returned_size)
while True: while True:
pids = (ctypes.wintypes.DWORD * n)() pids = (ctypes.wintypes.DWORD * nn)()
size = ctypes.sizeof(pids) size = ctypes.sizeof(pids)
pids_ptr = ctypes.byref(pids) pids_ptr = ctypes.byref(pids)
success = ctypes.windll.Psapi.EnumProcesses(pids_ptr, size, returned_size_ptr) success = ctypes.windll.Psapi.EnumProcesses(pids_ptr, size, returned_size_ptr)
if not success: if not success:
raise MemEditError('Failed to enumerate processes: n={}'.format(n)) raise MemEditError(f'Failed to enumerate processes: nn={nn}')
num_returned = floor(returned_size.value / ctypes.sizeof(ctypes.wintypes.DWORD)) num_returned = floor(returned_size.value / ctypes.sizeof(ctypes.wintypes.DWORD))
if n == num_returned: if nn != num_returned:
n *= 2
continue
else:
break break
nn *= 2
return pids[:num_returned] return pids[:num_returned]
@staticmethod @staticmethod
def get_pid_by_name(target_name: str) -> int or None: def get_pid_by_name(target_name: str) -> int | None:
for pid in Process.list_available_pids(): for pid in Process.list_available_pids():
try: try:
logger.info('Checking name for pid {}'.format(pid)) logger.debug(f'Checking name for pid {pid}')
with Process.open_process(pid) as process: with Process.open_process(pid) as process:
path = process.get_path() path = process.get_path()
if path is None:
continue
name = os.path.basename(path) name = Path(path).name
logger.info('Name was "{}"'.format(name)) logger.debug(f'Name was "{name}"')
if path is not None and name == target_name: if path is not None and name == target_name:
return pid return pid
except ValueError: except ValueError:
pass pass
except MemEditError as err:
logger.debug(repr(err))
logger.info('Found no process with name {}'.format(target_name)) logger.info(f'Found no process with name {target_name}')
return None return None
def list_mapped_regions(self, writeable_only: bool = True) -> List[Tuple[int, int]]: def list_mapped_regions(self, writeable_only: bool = True) -> list[tuple[int, int]]:
sys_info = SYSTEM_INFO() sys_info = SYSTEM_INFO()
sys_info_ptr = ctypes.byref(sys_info) sys_info_ptr = ctypes.byref(sys_info)
ctypes.windll.kernel32.GetSystemInfo(sys_info_ptr) ctypes.windll.kernel32.GetSystemInfo(sys_info_ptr)
@ -217,7 +253,7 @@ class Process(AbstractProcess):
start = sys_info.lpMinimumApplicationAddress start = sys_info.lpMinimumApplicationAddress
stop = sys_info.lpMaximumApplicationAddress stop = sys_info.lpMaximumApplicationAddress
def get_mem_info(address): def get_mem_info(address: int) -> MEMORY_BASIC_INFORMATION:
""" """
Query the memory region starting at or before 'address' to get its size/type/state/permissions. Query the memory region starting at or before 'address' to get its size/type/state/permissions.
""" """
@ -226,18 +262,16 @@ class Process(AbstractProcess):
mbi_size = ctypes.sizeof(mbi) mbi_size = ctypes.sizeof(mbi)
success = ctypes.windll.kernel32.VirtualQueryEx( success = ctypes.windll.kernel32.VirtualQueryEx(
self.process_handle, self.process_handle,
address, address,
mbi_ptr, mbi_ptr,
mbi_size, mbi_size)
)
if success != mbi_size: if success != mbi_size:
if success == 0: if success == 0:
raise MemEditError('Failed VirtualQueryEx with handle ' + raise MemEditError('Failed VirtualQueryEx with handle '
'{}: {}'.format(self.process_handle, self._get_last_error())) + f'{self.process_handle}: {self._get_last_error()}')
else: raise MemEditError('VirtualQueryEx output too short!')
raise MemEditError('VirtualQueryEx output too short!')
return mbi return mbi
@ -245,10 +279,11 @@ class Process(AbstractProcess):
page_ptr = start page_ptr = start
while page_ptr < stop: while page_ptr < stop:
page_info = get_mem_info(page_ptr) page_info = get_mem_info(page_ptr)
if page_info.Type == mem_types['MEM_PRIVATE'] and \ if (page_info.Type == mem_types['MEM_PRIVATE']
page_info.State == mem_states['MEM_COMMIT'] and \ and page_info.State == mem_states['MEM_COMMIT']
page_info.Protect & page_protections['PAGE_READABLE'] != 0 and \ and page_info.Protect & page_protections['PAGE_READABLE'] != 0
(page_info.Protect & page_protections['PAGE_READWRITEABLE'] != 0 or not writeable_only): and (page_info.Protect & page_protections['PAGE_READWRITEABLE'] != 0
or not writeable_only)):
regions.append((page_ptr, page_ptr + page_info.RegionSize)) regions.append((page_ptr, page_ptr + page_info.RegionSize))
page_ptr += page_info.RegionSize page_ptr += page_info.RegionSize

87
pyproject.toml Normal file
View File

@ -0,0 +1,87 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "mem_edit"
description = "Multi-platform library for memory editing"
readme = "README.md"
license = { file = "LICENSE.md" }
authors = [
{ name="Jan Petykiewicz", email="jan@mpxd.net" },
]
homepage = "https://mpxd.net/code/jan/mem_edit"
repository = "https://mpxd.net/code/jan/mem_edit"
keywords = [
"memory",
"edit",
"editing",
"ReadProcessMemory",
"WriteProcessMemory",
"proc",
"mem",
"ptrace",
"multiplatform",
"scan",
"scanner",
"search",
"debug",
"cheat",
"trainer",
]
classifiers = [
"Programming Language :: Python :: 3",
"Development Status :: 5 - Production/Stable",
"Environment :: Other Environment",
"Intended Audience :: Developers",
"License :: OSI Approved :: GNU Affero General Public License v3",
"Operating System :: POSIX :: Linux",
"Operating System :: Microsoft :: Windows",
"Topic :: Software Development",
"Topic :: Software Development :: Debuggers",
"Topic :: Software Development :: Testing",
"Topic :: System",
"Topic :: Games/Entertainment",
"Topic :: Utilities",
]
requires-python = ">=3.11"
dynamic = ["version"]
dependencies = [
]
[tool.hatch.version]
path = "mem_edit/__init__.py"
[tool.ruff]
exclude = [
".git",
"dist",
]
line-length = 145
indent-width = 4
lint.dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
lint.select = [
"NPY", "E", "F", "W", "B", "ANN", "UP", "SLOT", "SIM", "LOG",
"C4", "ISC", "PIE", "PT", "RET", "TCH", "PTH", "INT",
"ARG", "PL", "R", "TRY",
"G010", "G101", "G201", "G202",
"Q002", "Q003", "Q004",
]
lint.ignore = [
#"ANN001", # No annotation
"ANN002", # *args
"ANN003", # **kwargs
"ANN401", # Any
"ANN101", # self: Self
"SIM108", # single-line if / else assignment
"RET504", # x=y+z; return x
"PIE790", # unnecessary pass
"ISC003", # non-implicit string concatenation
"C408", # dict(x=y) instead of {'x': y}
"PLR09", # Too many xxx
"PLR2004", # magic number
"PLC0414", # import x as x
"TRY003", # Long exception message
]

View File

@ -1,61 +0,0 @@
#!/usr/bin/env python3
from setuptools import setup, find_packages
with open('README.md', 'r') as f:
long_description = f.read()
with open('mem_edit/VERSION', 'r') as f:
version = f.read().strip()
setup(name='mem_edit',
version=version,
description='Multi-platform library for memory editing',
long_description=long_description,
long_description_content_type='text/markdown',
author='Jan Petykiewicz',
author_email='anewusername@gmail.com',
url='https://mpxd.net/code/jan/mem_edit',
keywords=[
'memory',
'edit',
'editing',
'ReadProcessMemory',
'WriteProcessMemory',
'proc',
'mem',
'ptrace',
'multiplatform',
'scan',
'scanner',
'search',
'debug',
'cheat',
'trainer',
],
classifiers=[
'Programming Language :: Python :: 3',
'Development Status :: 4 - Beta',
'Environment :: Other Environment',
'Intended Audience :: Developers',
'License :: OSI Approved :: GNU Affero General Public License v3',
'Operating System :: POSIX :: Linux',
'Operating System :: Microsoft :: Windows',
'Topic :: Software Development',
'Topic :: Software Development :: Debuggers',
'Topic :: Software Development :: Testing',
'Topic :: System',
'Topic :: Games/Entertainment',
'Topic :: Utilities',
],
packages=find_packages(),
package_data={
'mem_edit': ['VERSION']
},
install_requires=[
'typing',
],
extras_require={
},
)