|
|
|
@ -2,7 +2,6 @@
|
|
|
|
|
Implementation of Process class for Windows
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
from typing import List, Tuple, Optional
|
|
|
|
|
from math import floor
|
|
|
|
|
from os import strerror
|
|
|
|
|
import os.path
|
|
|
|
@ -25,10 +24,10 @@ privileges = {
|
|
|
|
|
'PROCESS_VM_WRITE': 0x0020,
|
|
|
|
|
}
|
|
|
|
|
privileges['PROCESS_RW'] = (
|
|
|
|
|
privileges['PROCESS_QUERY_INFORMATION'] |
|
|
|
|
|
privileges['PROCESS_VM_OPERATION'] |
|
|
|
|
|
privileges['PROCESS_VM_READ'] |
|
|
|
|
|
privileges['PROCESS_VM_WRITE']
|
|
|
|
|
privileges['PROCESS_QUERY_INFORMATION']
|
|
|
|
|
| privileges['PROCESS_VM_OPERATION']
|
|
|
|
|
| privileges['PROCESS_VM_READ']
|
|
|
|
|
| privileges['PROCESS_VM_WRITE']
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# Memory region states
|
|
|
|
@ -50,13 +49,13 @@ page_protections = {
|
|
|
|
|
}
|
|
|
|
|
# Custom (combined) permissions
|
|
|
|
|
page_protections['PAGE_READABLE'] = (
|
|
|
|
|
page_protections['PAGE_EXECUTE_READ'] |
|
|
|
|
|
page_protections['PAGE_EXECUTE_READWRITE'] |
|
|
|
|
|
page_protections['PAGE_READWRITE']
|
|
|
|
|
page_protections['PAGE_EXECUTE_READ']
|
|
|
|
|
| page_protections['PAGE_EXECUTE_READWRITE']
|
|
|
|
|
| page_protections['PAGE_READWRITE']
|
|
|
|
|
)
|
|
|
|
|
page_protections['PAGE_READWRITEABLE'] = (
|
|
|
|
|
page_protections['PAGE_EXECUTE_READWRITE'] |
|
|
|
|
|
page_protections['PAGE_READWRITE']
|
|
|
|
|
page_protections['PAGE_EXECUTE_READWRITE']
|
|
|
|
|
| page_protections['PAGE_READWRITE']
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# Memory types
|
|
|
|
@ -91,7 +90,9 @@ class MEMORY_BASIC_INFORMATION64(ctypes.Structure):
|
|
|
|
|
('__alignment2', ctypes.wintypes.DWORD),
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
PTR_SIZE = ctypes.sizeof(ctypes.c_void_p)
|
|
|
|
|
MEMORY_BASIC_INFORMATION: ctypes.Structure
|
|
|
|
|
if PTR_SIZE == 8: # 64-bit python
|
|
|
|
|
MEMORY_BASIC_INFORMATION = MEMORY_BASIC_INFORMATION64
|
|
|
|
|
elif PTR_SIZE == 4: # 32-bit python
|
|
|
|
@ -133,9 +134,9 @@ class SYSTEM_INFO(ctypes.Structure):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Process(AbstractProcess):
|
|
|
|
|
process_handle = None
|
|
|
|
|
process_handle: int | None
|
|
|
|
|
|
|
|
|
|
def __init__(self, process_id: int):
|
|
|
|
|
def __init__(self, process_id: int) -> None:
|
|
|
|
|
process_handle = ctypes.windll.kernel32.OpenProcess(
|
|
|
|
|
privileges['PROCESS_RW'],
|
|
|
|
|
False,
|
|
|
|
@ -143,15 +144,15 @@ class Process(AbstractProcess):
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if not process_handle:
|
|
|
|
|
raise MemEditError('Couldn\'t open process {}'.format(process_id))
|
|
|
|
|
raise MemEditError(f'Couldn\'t open process {process_id}')
|
|
|
|
|
|
|
|
|
|
self.process_handle = process_handle
|
|
|
|
|
|
|
|
|
|
def close(self):
|
|
|
|
|
def close(self) -> None:
|
|
|
|
|
ctypes.windll.kernel32.CloseHandle(self.process_handle)
|
|
|
|
|
self.process_handle = None
|
|
|
|
|
|
|
|
|
|
def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t):
|
|
|
|
|
def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t) -> None:
|
|
|
|
|
try:
|
|
|
|
|
ctypes.windll.kernel32.WriteProcessMemory(
|
|
|
|
|
self.process_handle,
|
|
|
|
@ -161,7 +162,7 @@ class Process(AbstractProcess):
|
|
|
|
|
None
|
|
|
|
|
)
|
|
|
|
|
except (BufferError, ValueError, TypeError):
|
|
|
|
|
raise MemEditError('Error with handle {}: {}'.format(self.process_handle, self._get_last_error()))
|
|
|
|
|
raise MemEditError(f'Error with handle {self.process_handle}: {self._get_last_error()}')
|
|
|
|
|
|
|
|
|
|
def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t:
|
|
|
|
|
try:
|
|
|
|
@ -173,22 +174,23 @@ class Process(AbstractProcess):
|
|
|
|
|
None
|
|
|
|
|
)
|
|
|
|
|
except (BufferError, ValueError, TypeError):
|
|
|
|
|
raise MemEditError('Error with handle {}: {}'.format(self.process_handle, self._get_last_error()))
|
|
|
|
|
raise MemEditError(f'Error with handle {self.process_handle}: {self._get_last_error()}')
|
|
|
|
|
|
|
|
|
|
return read_buffer
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def _get_last_error() -> Tuple[int, str]:
|
|
|
|
|
def _get_last_error() -> tuple[int, str]:
|
|
|
|
|
err = ctypes.windll.kernel32.GetLastError()
|
|
|
|
|
return err, strerror(err)
|
|
|
|
|
|
|
|
|
|
def get_path(self) -> str:
|
|
|
|
|
def get_path(self) -> str | None:
|
|
|
|
|
max_path_len = 260
|
|
|
|
|
name_buffer = (ctypes.c_char * max_path_len)()
|
|
|
|
|
rval = ctypes.windll.psapi.GetProcessImageFileNameA(
|
|
|
|
|
self.process_handle,
|
|
|
|
|
name_buffer,
|
|
|
|
|
max_path_len)
|
|
|
|
|
self.process_handle,
|
|
|
|
|
name_buffer,
|
|
|
|
|
max_path_len,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if rval > 0:
|
|
|
|
|
return name_buffer.value.decode()
|
|
|
|
@ -196,28 +198,28 @@ class Process(AbstractProcess):
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def list_available_pids() -> List[int]:
|
|
|
|
|
def list_available_pids() -> list[int]:
|
|
|
|
|
# According to EnumProcesses docs, you can't find out how many processes there are before
|
|
|
|
|
# fetching the list. As a result, we grab 100 on the first try, and if we get a full list
|
|
|
|
|
# of 100, repeatedly double the number until we get fewer than we asked for.
|
|
|
|
|
|
|
|
|
|
n = 100
|
|
|
|
|
nn = 100
|
|
|
|
|
returned_size = ctypes.wintypes.DWORD()
|
|
|
|
|
returned_size_ptr = ctypes.byref(returned_size)
|
|
|
|
|
|
|
|
|
|
while True:
|
|
|
|
|
pids = (ctypes.wintypes.DWORD * n)()
|
|
|
|
|
pids = (ctypes.wintypes.DWORD * nn)()
|
|
|
|
|
size = ctypes.sizeof(pids)
|
|
|
|
|
pids_ptr = ctypes.byref(pids)
|
|
|
|
|
|
|
|
|
|
success = ctypes.windll.Psapi.EnumProcesses(pids_ptr, size, returned_size_ptr)
|
|
|
|
|
if not success:
|
|
|
|
|
raise MemEditError('Failed to enumerate processes: n={}'.format(n))
|
|
|
|
|
raise MemEditError(f'Failed to enumerate processes: nn={nn}')
|
|
|
|
|
|
|
|
|
|
num_returned = floor(returned_size.value / ctypes.sizeof(ctypes.wintypes.DWORD))
|
|
|
|
|
|
|
|
|
|
if n == num_returned:
|
|
|
|
|
n *= 2
|
|
|
|
|
if nn == num_returned:
|
|
|
|
|
nn *= 2
|
|
|
|
|
continue
|
|
|
|
|
else:
|
|
|
|
|
break
|
|
|
|
@ -225,15 +227,17 @@ class Process(AbstractProcess):
|
|
|
|
|
return pids[:num_returned]
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def get_pid_by_name(target_name: str) -> Optional[int]:
|
|
|
|
|
def get_pid_by_name(target_name: str) -> int | None:
|
|
|
|
|
for pid in Process.list_available_pids():
|
|
|
|
|
try:
|
|
|
|
|
logger.debug('Checking name for pid {}'.format(pid))
|
|
|
|
|
logger.debug(f'Checking name for pid {pid}')
|
|
|
|
|
with Process.open_process(pid) as process:
|
|
|
|
|
path = process.get_path()
|
|
|
|
|
if path is None:
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
name = os.path.basename(path)
|
|
|
|
|
logger.debug('Name was "{}"'.format(name))
|
|
|
|
|
logger.debug(f'Name was "{name}"')
|
|
|
|
|
if path is not None and name == target_name:
|
|
|
|
|
return pid
|
|
|
|
|
except ValueError:
|
|
|
|
@ -241,10 +245,10 @@ class Process(AbstractProcess):
|
|
|
|
|
except MemEditError as err:
|
|
|
|
|
logger.debug(repr(err))
|
|
|
|
|
|
|
|
|
|
logger.info('Found no process with name {}'.format(target_name))
|
|
|
|
|
logger.info(f'Found no process with name {target_name}')
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def list_mapped_regions(self, writeable_only: bool = True) -> List[Tuple[int, int]]:
|
|
|
|
|
def list_mapped_regions(self, writeable_only: bool = True) -> list[tuple[int, int]]:
|
|
|
|
|
sys_info = SYSTEM_INFO()
|
|
|
|
|
sys_info_ptr = ctypes.byref(sys_info)
|
|
|
|
|
ctypes.windll.kernel32.GetSystemInfo(sys_info_ptr)
|
|
|
|
@ -268,8 +272,8 @@ class Process(AbstractProcess):
|
|
|
|
|
|
|
|
|
|
if success != mbi_size:
|
|
|
|
|
if success == 0:
|
|
|
|
|
raise MemEditError('Failed VirtualQueryEx with handle ' +
|
|
|
|
|
'{}: {}'.format(self.process_handle, self._get_last_error()))
|
|
|
|
|
raise MemEditError('Failed VirtualQueryEx with handle '
|
|
|
|
|
+ f'{self.process_handle}: {self._get_last_error()}')
|
|
|
|
|
else:
|
|
|
|
|
raise MemEditError('VirtualQueryEx output too short!')
|
|
|
|
|
|
|
|
|
|