Compare commits
4 Commits
42c69867b8
...
8f7955543c
Author | SHA1 | Date | |
---|---|---|---|
8f7955543c | |||
5a82f04f9e | |||
5f2b1a432e | |||
fec96b92a7 |
@ -1,8 +1,8 @@
|
|||||||
"""
|
"""
|
||||||
Abstract class for cross-platform memory editing.
|
Abstract class for cross-platform memory editing.
|
||||||
"""
|
"""
|
||||||
|
from typing import Self
|
||||||
from typing import Generator
|
from collections.abc import Generator
|
||||||
from abc import ABCMeta, abstractmethod
|
from abc import ABCMeta, abstractmethod
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
import copy
|
import copy
|
||||||
@ -341,12 +341,12 @@ class Process(metaclass=ABCMeta):
|
|||||||
self.read_memory(start, region_buffer)
|
self.read_memory(start, region_buffer)
|
||||||
found += [offset + start for offset in search(needle_buffer, region_buffer)]
|
found += [offset + start for offset in search(needle_buffer, region_buffer)]
|
||||||
except OSError:
|
except OSError:
|
||||||
logger.error('Failed to read in range 0x{} - 0x{}'.format(start, stop))
|
logger.exception(f'Failed to read in range 0x{start} - 0x{stop}')
|
||||||
return found
|
return found
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def open_process(cls, process_id: int) -> Generator['Process', None, None]:
|
def open_process(cls: type[Self], process_id: int) -> Generator[Self, None, None]:
|
||||||
"""
|
"""
|
||||||
Context manager which automatically closes the constructed Process:
|
Context manager which automatically closes the constructed Process:
|
||||||
```
|
```
|
||||||
|
@ -9,6 +9,7 @@ import signal
|
|||||||
import ctypes
|
import ctypes
|
||||||
import ctypes.util
|
import ctypes.util
|
||||||
import logging
|
import logging
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
from .abstract import Process as AbstractProcess
|
from .abstract import Process as AbstractProcess
|
||||||
from .utils import ctypes_buffer_t, MemEditError
|
from .utils import ctypes_buffer_t, MemEditError
|
||||||
@ -70,19 +71,19 @@ class Process(AbstractProcess):
|
|||||||
self.pid = None
|
self.pid = None
|
||||||
|
|
||||||
def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t) -> None:
|
def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t) -> None:
|
||||||
with open(f'/proc/{self.pid}/mem', 'rb+') as mem:
|
with Path(f'/proc/{self.pid}/mem').open('rb+') as mem:
|
||||||
mem.seek(base_address)
|
mem.seek(base_address)
|
||||||
mem.write(write_buffer)
|
mem.write(write_buffer)
|
||||||
|
|
||||||
def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t:
|
def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t:
|
||||||
with open(f'/proc/{self.pid}/mem', 'rb+') as mem:
|
with Path(f'/proc/{self.pid}/mem').open('rb+') as mem:
|
||||||
mem.seek(base_address)
|
mem.seek(base_address)
|
||||||
mem.readinto(read_buffer)
|
mem.readinto(read_buffer)
|
||||||
return read_buffer
|
return read_buffer
|
||||||
|
|
||||||
def get_path(self) -> str | None:
|
def get_path(self) -> str | None:
|
||||||
try:
|
try:
|
||||||
with open(f'/proc/{self.pid}/cmdline', 'rb') as ff:
|
with Path(f'/proc/{self.pid}/cmdline').open('rb') as ff:
|
||||||
return ff.read().decode().split('\x00')[0]
|
return ff.read().decode().split('\x00')[0]
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
return None
|
return None
|
||||||
@ -102,12 +103,12 @@ class Process(AbstractProcess):
|
|||||||
for pid in Process.list_available_pids():
|
for pid in Process.list_available_pids():
|
||||||
try:
|
try:
|
||||||
logger.debug(f'Checking name for pid {pid}')
|
logger.debug(f'Checking name for pid {pid}')
|
||||||
with open(f'/proc/{pid}/cmdline', 'rb') as cmdline:
|
with Path(f'/proc/{pid}/cmdline').open('rb') as cmdline:
|
||||||
path = cmdline.read().decode().split('\x00')[0]
|
path = cmdline.read().decode().split('\x00')[0]
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
name = os.path.basename(path)
|
name = Path(path).name
|
||||||
logger.debug(f'Name was "{name}"')
|
logger.debug(f'Name was "{name}"')
|
||||||
if path is not None and name == target_name:
|
if path is not None and name == target_name:
|
||||||
return pid
|
return pid
|
||||||
@ -117,7 +118,7 @@ class Process(AbstractProcess):
|
|||||||
|
|
||||||
def list_mapped_regions(self, writeable_only: bool = True) -> list[tuple[int, int]]:
|
def list_mapped_regions(self, writeable_only: bool = True) -> list[tuple[int, int]]:
|
||||||
regions = []
|
regions = []
|
||||||
with open(f'/proc/{self.pid}/maps', 'r') as maps:
|
with Path(f'/proc/{self.pid}/maps').open('r') as maps:
|
||||||
for line in maps:
|
for line in maps:
|
||||||
bounds, privileges = line.split()[0:2]
|
bounds, privileges = line.split()[0:2]
|
||||||
|
|
||||||
|
@ -11,16 +11,15 @@ Utility functions and types:
|
|||||||
Check if two buffers (ctypes objects) store equal values:
|
Check if two buffers (ctypes objects) store equal values:
|
||||||
ctypes_equal(a, b)
|
ctypes_equal(a, b)
|
||||||
"""
|
"""
|
||||||
from typing import Union
|
|
||||||
import ctypes
|
import ctypes
|
||||||
|
|
||||||
|
|
||||||
ctypes_buffer_t = Union[
|
ctypes_buffer_t = (
|
||||||
ctypes._SimpleCData,
|
ctypes._SimpleCData
|
||||||
ctypes.Array,
|
| ctypes.Array
|
||||||
ctypes.Structure,
|
| ctypes.Structure
|
||||||
ctypes.Union,
|
| ctypes.Union
|
||||||
]
|
)
|
||||||
|
|
||||||
|
|
||||||
class MemEditError(Exception):
|
class MemEditError(Exception):
|
||||||
@ -97,7 +96,7 @@ def ctypes_equal(
|
|||||||
if isinstance(a, (ctypes.Array, ctypes.Structure, ctypes.Union, ctypes._SimpleCData)):
|
if isinstance(a, (ctypes.Array, ctypes.Structure, ctypes.Union, ctypes._SimpleCData)):
|
||||||
if not ctypes_equal(a_attr, b_attr):
|
if not ctypes_equal(a_attr, b_attr):
|
||||||
return False
|
return False
|
||||||
elif not a_attr == b_attr:
|
elif a_attr != b_attr:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
@ -4,7 +4,7 @@ Implementation of Process class for Windows
|
|||||||
|
|
||||||
from math import floor
|
from math import floor
|
||||||
from os import strerror
|
from os import strerror
|
||||||
import os.path
|
from pathlib import Path
|
||||||
import ctypes
|
import ctypes
|
||||||
import ctypes.wintypes
|
import ctypes.wintypes
|
||||||
import logging
|
import logging
|
||||||
@ -92,7 +92,7 @@ class MEMORY_BASIC_INFORMATION64(ctypes.Structure):
|
|||||||
|
|
||||||
|
|
||||||
PTR_SIZE = ctypes.sizeof(ctypes.c_void_p)
|
PTR_SIZE = ctypes.sizeof(ctypes.c_void_p)
|
||||||
MEMORY_BASIC_INFORMATION: ctypes.Structure
|
MEMORY_BASIC_INFORMATION: type[ctypes.Structure]
|
||||||
if PTR_SIZE == 8: # 64-bit python
|
if PTR_SIZE == 8: # 64-bit python
|
||||||
MEMORY_BASIC_INFORMATION = MEMORY_BASIC_INFORMATION64
|
MEMORY_BASIC_INFORMATION = MEMORY_BASIC_INFORMATION64
|
||||||
elif PTR_SIZE == 4: # 32-bit python
|
elif PTR_SIZE == 4: # 32-bit python
|
||||||
@ -161,8 +161,8 @@ class Process(AbstractProcess):
|
|||||||
ctypes.sizeof(write_buffer),
|
ctypes.sizeof(write_buffer),
|
||||||
None
|
None
|
||||||
)
|
)
|
||||||
except (BufferError, ValueError, TypeError):
|
except (BufferError, ValueError, TypeError) as err:
|
||||||
raise MemEditError(f'Error with handle {self.process_handle}: {self._get_last_error()}')
|
raise MemEditError(f'Error with handle {self.process_handle}: {self._get_last_error()}') from err
|
||||||
|
|
||||||
def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t:
|
def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t:
|
||||||
try:
|
try:
|
||||||
@ -173,8 +173,8 @@ class Process(AbstractProcess):
|
|||||||
ctypes.sizeof(read_buffer),
|
ctypes.sizeof(read_buffer),
|
||||||
None
|
None
|
||||||
)
|
)
|
||||||
except (BufferError, ValueError, TypeError):
|
except (BufferError, ValueError, TypeError) as err:
|
||||||
raise MemEditError(f'Error with handle {self.process_handle}: {self._get_last_error()}')
|
raise MemEditError(f'Error with handle {self.process_handle}: {self._get_last_error()}') from err
|
||||||
|
|
||||||
return read_buffer
|
return read_buffer
|
||||||
|
|
||||||
@ -192,10 +192,9 @@ class Process(AbstractProcess):
|
|||||||
max_path_len,
|
max_path_len,
|
||||||
)
|
)
|
||||||
|
|
||||||
if rval > 0:
|
if rval <= 0:
|
||||||
return name_buffer.value.decode()
|
|
||||||
else:
|
|
||||||
return None
|
return None
|
||||||
|
return name_buffer.value.decode()
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def list_available_pids() -> list[int]:
|
def list_available_pids() -> list[int]:
|
||||||
@ -218,11 +217,9 @@ class Process(AbstractProcess):
|
|||||||
|
|
||||||
num_returned = floor(returned_size.value / ctypes.sizeof(ctypes.wintypes.DWORD))
|
num_returned = floor(returned_size.value / ctypes.sizeof(ctypes.wintypes.DWORD))
|
||||||
|
|
||||||
if nn == num_returned:
|
if nn != num_returned:
|
||||||
nn *= 2
|
|
||||||
continue
|
|
||||||
else:
|
|
||||||
break
|
break
|
||||||
|
nn *= 2
|
||||||
|
|
||||||
return pids[:num_returned]
|
return pids[:num_returned]
|
||||||
|
|
||||||
@ -236,7 +233,7 @@ class Process(AbstractProcess):
|
|||||||
if path is None:
|
if path is None:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
name = os.path.basename(path)
|
name = Path(path).name
|
||||||
logger.debug(f'Name was "{name}"')
|
logger.debug(f'Name was "{name}"')
|
||||||
if path is not None and name == target_name:
|
if path is not None and name == target_name:
|
||||||
return pid
|
return pid
|
||||||
@ -256,7 +253,7 @@ class Process(AbstractProcess):
|
|||||||
start = sys_info.lpMinimumApplicationAddress
|
start = sys_info.lpMinimumApplicationAddress
|
||||||
stop = sys_info.lpMaximumApplicationAddress
|
stop = sys_info.lpMaximumApplicationAddress
|
||||||
|
|
||||||
def get_mem_info(address):
|
def get_mem_info(address: int) -> MEMORY_BASIC_INFORMATION:
|
||||||
"""
|
"""
|
||||||
Query the memory region starting at or before 'address' to get its size/type/state/permissions.
|
Query the memory region starting at or before 'address' to get its size/type/state/permissions.
|
||||||
"""
|
"""
|
||||||
@ -274,7 +271,6 @@ class Process(AbstractProcess):
|
|||||||
if success == 0:
|
if success == 0:
|
||||||
raise MemEditError('Failed VirtualQueryEx with handle '
|
raise MemEditError('Failed VirtualQueryEx with handle '
|
||||||
+ f'{self.process_handle}: {self._get_last_error()}')
|
+ f'{self.process_handle}: {self._get_last_error()}')
|
||||||
else:
|
|
||||||
raise MemEditError('VirtualQueryEx output too short!')
|
raise MemEditError('VirtualQueryEx output too short!')
|
||||||
|
|
||||||
return mbi
|
return mbi
|
||||||
|
Loading…
Reference in New Issue
Block a user