Compare commits
	
		
			4 Commits
		
	
	
		
			42c69867b8
			...
			8f7955543c
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 8f7955543c | |||
| 5a82f04f9e | |||
| 5f2b1a432e | |||
| fec96b92a7 | 
@ -1,8 +1,8 @@
 | 
			
		||||
"""
 | 
			
		||||
Abstract class for cross-platform memory editing.
 | 
			
		||||
"""
 | 
			
		||||
 | 
			
		||||
from typing import Generator
 | 
			
		||||
from typing import Self
 | 
			
		||||
from collections.abc import Generator
 | 
			
		||||
from abc import ABCMeta, abstractmethod
 | 
			
		||||
from contextlib import contextmanager
 | 
			
		||||
import copy
 | 
			
		||||
@ -341,12 +341,12 @@ class Process(metaclass=ABCMeta):
 | 
			
		||||
                self.read_memory(start, region_buffer)
 | 
			
		||||
                found += [offset + start for offset in search(needle_buffer, region_buffer)]
 | 
			
		||||
            except OSError:
 | 
			
		||||
                logger.error('Failed to read in range  0x{} - 0x{}'.format(start, stop))
 | 
			
		||||
                logger.exception(f'Failed to read in range  0x{start} - 0x{stop}')
 | 
			
		||||
        return found
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    @contextmanager
 | 
			
		||||
    def open_process(cls, process_id: int) -> Generator['Process', None, None]:
 | 
			
		||||
    def open_process(cls: type[Self], process_id: int) -> Generator[Self, None, None]:
 | 
			
		||||
        """
 | 
			
		||||
        Context manager which automatically closes the constructed Process:
 | 
			
		||||
        ```
 | 
			
		||||
 | 
			
		||||
@ -9,6 +9,7 @@ import signal
 | 
			
		||||
import ctypes
 | 
			
		||||
import ctypes.util
 | 
			
		||||
import logging
 | 
			
		||||
from pathlib import Path
 | 
			
		||||
 | 
			
		||||
from .abstract import Process as AbstractProcess
 | 
			
		||||
from .utils import ctypes_buffer_t, MemEditError
 | 
			
		||||
@ -70,19 +71,19 @@ class Process(AbstractProcess):
 | 
			
		||||
        self.pid = None
 | 
			
		||||
 | 
			
		||||
    def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t) -> None:
 | 
			
		||||
        with open(f'/proc/{self.pid}/mem', 'rb+') as mem:
 | 
			
		||||
        with Path(f'/proc/{self.pid}/mem').open('rb+') as mem:
 | 
			
		||||
            mem.seek(base_address)
 | 
			
		||||
            mem.write(write_buffer)
 | 
			
		||||
 | 
			
		||||
    def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t:
 | 
			
		||||
        with open(f'/proc/{self.pid}/mem', 'rb+') as mem:
 | 
			
		||||
        with Path(f'/proc/{self.pid}/mem').open('rb+') as mem:
 | 
			
		||||
            mem.seek(base_address)
 | 
			
		||||
            mem.readinto(read_buffer)
 | 
			
		||||
        return read_buffer
 | 
			
		||||
 | 
			
		||||
    def get_path(self) -> str | None:
 | 
			
		||||
        try:
 | 
			
		||||
            with open(f'/proc/{self.pid}/cmdline', 'rb') as ff:
 | 
			
		||||
            with Path(f'/proc/{self.pid}/cmdline').open('rb') as ff:
 | 
			
		||||
                return ff.read().decode().split('\x00')[0]
 | 
			
		||||
        except FileNotFoundError:
 | 
			
		||||
            return None
 | 
			
		||||
@ -102,12 +103,12 @@ class Process(AbstractProcess):
 | 
			
		||||
        for pid in Process.list_available_pids():
 | 
			
		||||
            try:
 | 
			
		||||
                logger.debug(f'Checking name for pid {pid}')
 | 
			
		||||
                with open(f'/proc/{pid}/cmdline', 'rb') as cmdline:
 | 
			
		||||
                with Path(f'/proc/{pid}/cmdline').open('rb') as cmdline:
 | 
			
		||||
                    path = cmdline.read().decode().split('\x00')[0]
 | 
			
		||||
            except FileNotFoundError:
 | 
			
		||||
                continue
 | 
			
		||||
 | 
			
		||||
            name = os.path.basename(path)
 | 
			
		||||
            name = Path(path).name
 | 
			
		||||
            logger.debug(f'Name was "{name}"')
 | 
			
		||||
            if path is not None and name == target_name:
 | 
			
		||||
                return pid
 | 
			
		||||
@ -117,7 +118,7 @@ class Process(AbstractProcess):
 | 
			
		||||
 | 
			
		||||
    def list_mapped_regions(self, writeable_only: bool = True) -> list[tuple[int, int]]:
 | 
			
		||||
        regions = []
 | 
			
		||||
        with open(f'/proc/{self.pid}/maps', 'r') as maps:
 | 
			
		||||
        with Path(f'/proc/{self.pid}/maps').open('r') as maps:
 | 
			
		||||
            for line in maps:
 | 
			
		||||
                bounds, privileges = line.split()[0:2]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -11,16 +11,15 @@ Utility functions and types:
 | 
			
		||||
  Check if two buffers (ctypes objects) store equal values:
 | 
			
		||||
    ctypes_equal(a, b)
 | 
			
		||||
"""
 | 
			
		||||
from typing import Union
 | 
			
		||||
import ctypes
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
ctypes_buffer_t = Union[
 | 
			
		||||
    ctypes._SimpleCData,
 | 
			
		||||
    ctypes.Array,
 | 
			
		||||
    ctypes.Structure,
 | 
			
		||||
    ctypes.Union,
 | 
			
		||||
    ]
 | 
			
		||||
ctypes_buffer_t = (
 | 
			
		||||
    ctypes._SimpleCData
 | 
			
		||||
    | ctypes.Array
 | 
			
		||||
    | ctypes.Structure
 | 
			
		||||
    | ctypes.Union
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class MemEditError(Exception):
 | 
			
		||||
@ -97,7 +96,7 @@ def ctypes_equal(
 | 
			
		||||
            if isinstance(a, (ctypes.Array, ctypes.Structure, ctypes.Union, ctypes._SimpleCData)):
 | 
			
		||||
                if not ctypes_equal(a_attr, b_attr):
 | 
			
		||||
                    return False
 | 
			
		||||
            elif not a_attr == b_attr:
 | 
			
		||||
            elif a_attr != b_attr:
 | 
			
		||||
                return False
 | 
			
		||||
 | 
			
		||||
        return True
 | 
			
		||||
 | 
			
		||||
@ -4,7 +4,7 @@ Implementation of Process class for Windows
 | 
			
		||||
 | 
			
		||||
from math import floor
 | 
			
		||||
from os import strerror
 | 
			
		||||
import os.path
 | 
			
		||||
from pathlib import Path
 | 
			
		||||
import ctypes
 | 
			
		||||
import ctypes.wintypes
 | 
			
		||||
import logging
 | 
			
		||||
@ -92,7 +92,7 @@ class MEMORY_BASIC_INFORMATION64(ctypes.Structure):
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
PTR_SIZE = ctypes.sizeof(ctypes.c_void_p)
 | 
			
		||||
MEMORY_BASIC_INFORMATION: ctypes.Structure
 | 
			
		||||
MEMORY_BASIC_INFORMATION: type[ctypes.Structure]
 | 
			
		||||
if PTR_SIZE == 8:       # 64-bit python
 | 
			
		||||
    MEMORY_BASIC_INFORMATION = MEMORY_BASIC_INFORMATION64
 | 
			
		||||
elif PTR_SIZE == 4:     # 32-bit python
 | 
			
		||||
@ -161,8 +161,8 @@ class Process(AbstractProcess):
 | 
			
		||||
                ctypes.sizeof(write_buffer),
 | 
			
		||||
                None
 | 
			
		||||
                )
 | 
			
		||||
        except (BufferError, ValueError, TypeError):
 | 
			
		||||
            raise MemEditError(f'Error with handle {self.process_handle}:  {self._get_last_error()}')
 | 
			
		||||
        except (BufferError, ValueError, TypeError) as err:
 | 
			
		||||
            raise MemEditError(f'Error with handle {self.process_handle}:  {self._get_last_error()}') from err
 | 
			
		||||
 | 
			
		||||
    def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t:
 | 
			
		||||
        try:
 | 
			
		||||
@ -173,8 +173,8 @@ class Process(AbstractProcess):
 | 
			
		||||
                ctypes.sizeof(read_buffer),
 | 
			
		||||
                None
 | 
			
		||||
                )
 | 
			
		||||
        except (BufferError, ValueError, TypeError):
 | 
			
		||||
            raise MemEditError(f'Error with handle {self.process_handle}: {self._get_last_error()}')
 | 
			
		||||
        except (BufferError, ValueError, TypeError) as err:
 | 
			
		||||
            raise MemEditError(f'Error with handle {self.process_handle}: {self._get_last_error()}') from err
 | 
			
		||||
 | 
			
		||||
        return read_buffer
 | 
			
		||||
 | 
			
		||||
@ -192,10 +192,9 @@ class Process(AbstractProcess):
 | 
			
		||||
            max_path_len,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        if rval > 0:
 | 
			
		||||
            return name_buffer.value.decode()
 | 
			
		||||
        else:
 | 
			
		||||
        if rval <= 0:
 | 
			
		||||
            return None
 | 
			
		||||
        return name_buffer.value.decode()
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def list_available_pids() -> list[int]:
 | 
			
		||||
@ -218,11 +217,9 @@ class Process(AbstractProcess):
 | 
			
		||||
 | 
			
		||||
            num_returned = floor(returned_size.value / ctypes.sizeof(ctypes.wintypes.DWORD))
 | 
			
		||||
 | 
			
		||||
            if nn == num_returned:
 | 
			
		||||
                nn *= 2
 | 
			
		||||
                continue
 | 
			
		||||
            else:
 | 
			
		||||
            if nn != num_returned:
 | 
			
		||||
                break
 | 
			
		||||
            nn *= 2
 | 
			
		||||
 | 
			
		||||
        return pids[:num_returned]
 | 
			
		||||
 | 
			
		||||
@ -236,7 +233,7 @@ class Process(AbstractProcess):
 | 
			
		||||
                if path is None:
 | 
			
		||||
                    continue
 | 
			
		||||
 | 
			
		||||
                name = os.path.basename(path)
 | 
			
		||||
                name = Path(path).name
 | 
			
		||||
                logger.debug(f'Name was "{name}"')
 | 
			
		||||
                if path is not None and name == target_name:
 | 
			
		||||
                    return pid
 | 
			
		||||
@ -256,7 +253,7 @@ class Process(AbstractProcess):
 | 
			
		||||
        start = sys_info.lpMinimumApplicationAddress
 | 
			
		||||
        stop = sys_info.lpMaximumApplicationAddress
 | 
			
		||||
 | 
			
		||||
        def get_mem_info(address):
 | 
			
		||||
        def get_mem_info(address: int) -> MEMORY_BASIC_INFORMATION:
 | 
			
		||||
            """
 | 
			
		||||
            Query the memory region starting at or before 'address' to get its size/type/state/permissions.
 | 
			
		||||
            """
 | 
			
		||||
@ -274,7 +271,6 @@ class Process(AbstractProcess):
 | 
			
		||||
                if success == 0:
 | 
			
		||||
                    raise MemEditError('Failed VirtualQueryEx with handle '
 | 
			
		||||
                                       + f'{self.process_handle}: {self._get_last_error()}')
 | 
			
		||||
                else:
 | 
			
		||||
                raise MemEditError('VirtualQueryEx output too short!')
 | 
			
		||||
 | 
			
		||||
            return mbi
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user