forked from jan/mem_edit
		
	cosmetic and typing-related changes
This commit is contained in:
		
							parent
							
								
									9759645f92
								
							
						
					
					
						commit
						6913f73db4
					
				@ -2,7 +2,7 @@
 | 
			
		||||
Abstract class for cross-platform memory editing.
 | 
			
		||||
"""
 | 
			
		||||
 | 
			
		||||
from typing import List, Tuple
 | 
			
		||||
from typing import List, Tuple, Optional, Union, Generator
 | 
			
		||||
from abc import ABCMeta, abstractmethod
 | 
			
		||||
from contextlib import contextmanager
 | 
			
		||||
import copy
 | 
			
		||||
@ -23,8 +23,8 @@ class Process(metaclass=ABCMeta):
 | 
			
		||||
      (i.e., by reading from or writing to the memory used by a given process).
 | 
			
		||||
 | 
			
		||||
    The static methods
 | 
			
		||||
        Process.list_available_pids()
 | 
			
		||||
        Process.get_pid_by_name(executable_filename)
 | 
			
		||||
        `Process.list_available_pids()`
 | 
			
		||||
        `Process.get_pid_by_name(executable_filename)`
 | 
			
		||||
      can be used to help find the process id (pid) of the target process. They are
 | 
			
		||||
      provided for convenience only; it is probably better to use the tools built
 | 
			
		||||
      in to your operating system to discover the pid of the specific process you
 | 
			
		||||
@ -32,18 +32,19 @@ class Process(metaclass=ABCMeta):
 | 
			
		||||
 | 
			
		||||
    Once you have found the pid, you are ready to construct an instance of Process
 | 
			
		||||
      and use it to read and write to memory. Once you are done with the process,
 | 
			
		||||
      use .close() to free up the process for access by other debuggers etc.
 | 
			
		||||
 | 
			
		||||
      use `.close()` to free up the process for access by other debuggers etc.
 | 
			
		||||
    ```
 | 
			
		||||
        p = Process(1239)
 | 
			
		||||
        p.close()
 | 
			
		||||
    ```
 | 
			
		||||
 | 
			
		||||
    To read/write to memory, first create a buffer using ctypes:
 | 
			
		||||
 | 
			
		||||
    ```
 | 
			
		||||
        buffer0 = (ctypes.c_byte * 5)(39, 50, 03, 40, 30)
 | 
			
		||||
        buffer1 = ctypes.c_ulong()
 | 
			
		||||
 | 
			
		||||
    ```
 | 
			
		||||
      and then use
 | 
			
		||||
 | 
			
		||||
    ```
 | 
			
		||||
        p.write_memory(0x2fe, buffer0)
 | 
			
		||||
 | 
			
		||||
        val0 = p.read_memory(0x220, buffer0)[:]
 | 
			
		||||
@ -51,52 +52,52 @@ class Process(metaclass=ABCMeta):
 | 
			
		||||
        val1a = p.read_memory(0x149, buffer1).value
 | 
			
		||||
        val2b = buffer1.value
 | 
			
		||||
        assert(val1a == val2b)
 | 
			
		||||
    ```
 | 
			
		||||
 | 
			
		||||
    Searching for a value can be done in a number of ways:
 | 
			
		||||
      Search a list of addresses:
 | 
			
		||||
        found_addresses = p.search_addresses([0x1020, 0x1030], buffer0)
 | 
			
		||||
        `found_addresses = p.search_addresses([0x1020, 0x1030], buffer0)`
 | 
			
		||||
      Search the entire memory space:
 | 
			
		||||
        found_addresses = p.search_all_memory(buffer0, writeable_only=False)
 | 
			
		||||
        `found_addresses = p.search_all_memory(buffer0, writeable_only=False)`
 | 
			
		||||
 | 
			
		||||
    You can also get a list of which regions in memory are mapped (readable):
 | 
			
		||||
        regions = p.list_mapped_regions(writeable_only=False)
 | 
			
		||||
 | 
			
		||||
      which can be used along with search_buffer(...) to re-create .search_all_memory(...):
 | 
			
		||||
 | 
			
		||||
        `regions = p.list_mapped_regions(writeable_only=False)`
 | 
			
		||||
     which can be used along with search_buffer(...) to re-create .search_all_memory(...):
 | 
			
		||||
    ```
 | 
			
		||||
        found = []
 | 
			
		||||
        for region_start, region_stop in regions:
 | 
			
		||||
            region_buffer = (ctypes.c_byte * (region_stop - region_start))()
 | 
			
		||||
            p.read_memory(region_start, region_buffer)
 | 
			
		||||
            found += utils.search_buffer(ctypes.c_ulong(123456790), region_buffer)
 | 
			
		||||
 | 
			
		||||
    ```
 | 
			
		||||
    Other useful methods include the context manager, implemented as a static method:
 | 
			
		||||
 | 
			
		||||
    ```
 | 
			
		||||
        with Process.open_process(pid) as p:
 | 
			
		||||
            # use p here, no need to call p.close()
 | 
			
		||||
 | 
			
		||||
    ```
 | 
			
		||||
      .get_path(), which reports the path of the executable file which was used
 | 
			
		||||
      to start the process:
 | 
			
		||||
 | 
			
		||||
    ```
 | 
			
		||||
        executable_path = p.get_path()
 | 
			
		||||
 | 
			
		||||
    ```
 | 
			
		||||
      and deref_struct_pointer, which takes a pointer to a struct and reads out the struct members:
 | 
			
		||||
 | 
			
		||||
    ```
 | 
			
		||||
        # struct is a list of (offset, buffer) pairs
 | 
			
		||||
        struct_defintion = [(0x0, ctypes.c_ulong()),
 | 
			
		||||
                            (0x20, ctypes.c_byte())]
 | 
			
		||||
        values = p.deref_struct_pointer(0x0feab4, struct_defintion)
 | 
			
		||||
 | 
			
		||||
    ```
 | 
			
		||||
      which is shorthand for
 | 
			
		||||
 | 
			
		||||
    ```
 | 
			
		||||
        struct_addr = p.read_memory(0x0feab4, ctypes.c_void_p())
 | 
			
		||||
        values = [p.read_memory(struct_addr + 0x0, ctypes.c_ulong()),
 | 
			
		||||
                  p.read_memory(struct_addr + 0x20, ctypes.c_byte())]
 | 
			
		||||
 | 
			
		||||
    ```
 | 
			
		||||
    =================
 | 
			
		||||
 | 
			
		||||
    Putting all this together, a simple program which alters a magic number in the only running
 | 
			
		||||
      instance of 'magic.exe' might look like this:
 | 
			
		||||
 | 
			
		||||
    ```
 | 
			
		||||
        import ctypes
 | 
			
		||||
        from mem_edit import Process
 | 
			
		||||
 | 
			
		||||
@ -107,9 +108,9 @@ class Process(metaclass=ABCMeta):
 | 
			
		||||
            addrs = p.search_all_memory(magic_number)
 | 
			
		||||
            assert(len(addrs) == 1)
 | 
			
		||||
            p.write_memory(addrs[0], ctypes.c_ulong(42))
 | 
			
		||||
 | 
			
		||||
    ```
 | 
			
		||||
    Searching for a value which changes:
 | 
			
		||||
 | 
			
		||||
    ```
 | 
			
		||||
        pid = Process.get_pid_by_name('monitor_me.exe')
 | 
			
		||||
        with Process.open_process(pid) as p:
 | 
			
		||||
            addrs = p.search_all_memory(ctypes.c_int(40))
 | 
			
		||||
@ -118,18 +119,19 @@ class Process(metaclass=ABCMeta):
 | 
			
		||||
            print('Found addresses:')
 | 
			
		||||
            for addr in filtered_addrs:
 | 
			
		||||
                print(hex(addr))
 | 
			
		||||
 | 
			
		||||
    ```
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    @abstractmethod
 | 
			
		||||
    def __init__(self, process_id: int):
 | 
			
		||||
        """
 | 
			
		||||
        Constructing a Process object prepares the process with specified process_id for
 | 
			
		||||
          memory editing. Finding the process_id for the process you want to edit is often
 | 
			
		||||
          memory editing. Finding the `process_id` for the process you want to edit is often
 | 
			
		||||
          easiest using os-specific tools (or by launching the process yourself, e.g. with
 | 
			
		||||
          subprocess.Popen(...)).
 | 
			
		||||
          `subprocess.Popen(...)`).
 | 
			
		||||
 | 
			
		||||
        :param process_id: Process id (pid) of the target process
 | 
			
		||||
        Args:
 | 
			
		||||
            process_id: Process id (pid) of the target process
 | 
			
		||||
        """
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
@ -140,7 +142,7 @@ class Process(metaclass=ABCMeta):
 | 
			
		||||
          letting other debuggers attach to it instead.
 | 
			
		||||
 | 
			
		||||
        This function should be called after you are done working with the process
 | 
			
		||||
          and will no longer need it. See the Process.open_process(...) context
 | 
			
		||||
          and will no longer need it. See the `Process.open_process(...)` context
 | 
			
		||||
          manager to avoid having to call this function yourself.
 | 
			
		||||
        """
 | 
			
		||||
        pass
 | 
			
		||||
@ -148,38 +150,45 @@ class Process(metaclass=ABCMeta):
 | 
			
		||||
    @abstractmethod
 | 
			
		||||
    def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t):
 | 
			
		||||
        """
 | 
			
		||||
        Write the given buffer to the process's address space, starting at base_address.
 | 
			
		||||
        Write the given buffer to the process's address space, starting at `base_address`.
 | 
			
		||||
 | 
			
		||||
        :param base_address: The address to write at, in the process's address space.
 | 
			
		||||
        :param write_buffer: A ctypes object, for example, ctypes.c_ulong(48),
 | 
			
		||||
                (ctypes.c_byte * 3)(43, 21, 0xff), or a subclass of ctypes.Structure,
 | 
			
		||||
                which will be written into memory starting at base_address.
 | 
			
		||||
        Args:
 | 
			
		||||
            base_address: The address to write at, in the process's address space.
 | 
			
		||||
            write_buffer: A ctypes object, for example, `ctypes.c_ulong(48)`,
 | 
			
		||||
                `(ctypes.c_byte * 3)(43, 21, 0xff)`, or a subclass of `ctypes.Structure`,
 | 
			
		||||
                which will be written into memory starting at `base_address`.
 | 
			
		||||
        """
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    @abstractmethod
 | 
			
		||||
    def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t:
 | 
			
		||||
        """
 | 
			
		||||
        Read into the given buffer from the process's address space, starting at base_address.
 | 
			
		||||
        Read into the given buffer from the process's address space, starting at `base_address`.
 | 
			
		||||
 | 
			
		||||
        :param base_address: The address to read from, in the process's address space.
 | 
			
		||||
        :param read_buffer: A ctypes object, for example. ctypes.c_ulong(),
 | 
			
		||||
                (ctypes.c_byte * 3)(), or a subclass of ctypes.Structure,  which will be
 | 
			
		||||
                overwritten with the contents of the process's memory starting at base_address.
 | 
			
		||||
        :returns: read_buffer is returned as well as being overwritten.
 | 
			
		||||
        Args:
 | 
			
		||||
            base_address: The address to read from, in the process's address space.
 | 
			
		||||
            read_buffer: A `ctypes` object, for example. `ctypes.c_ulong()`,
 | 
			
		||||
                `(ctypes.c_byte * 3)()`, or a subclass of `ctypes.Structure`, which will be
 | 
			
		||||
                overwritten with the contents of the process's memory starting at `base_address`.
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
            `read_buffer` is returned as well as being overwritten.
 | 
			
		||||
        """
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    @abstractmethod
 | 
			
		||||
    def list_mapped_regions(self, writeable_only=True) -> List[Tuple[int, int]]:
 | 
			
		||||
        """
 | 
			
		||||
        Return a list of (start_address, stop_address) for the regions of the address space
 | 
			
		||||
        Return a list of `(start_address, stop_address)` for the regions of the address space
 | 
			
		||||
          accessible to (readable and possibly writable by) the process.
 | 
			
		||||
        By default, this function does not return non-writeable regions.
 | 
			
		||||
 | 
			
		||||
        :param writeable_only: If True, only return regions which are also writeable.
 | 
			
		||||
                Default true.
 | 
			
		||||
        :return: List of (start_address, stop_address) for each accessible memory region.
 | 
			
		||||
        Args:
 | 
			
		||||
            writeable_only: If `True`, only return regions which are also writeable.
 | 
			
		||||
                Default `True`.
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
            List of `(start_address, stop_address)` for each accessible memory region.
 | 
			
		||||
        """
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
@ -188,7 +197,8 @@ class Process(metaclass=ABCMeta):
 | 
			
		||||
        """
 | 
			
		||||
        Return the path to the executable file which was run to start this process.
 | 
			
		||||
 | 
			
		||||
        :return: A string containing the path.
 | 
			
		||||
        Returns:
 | 
			
		||||
            A string containing the path.
 | 
			
		||||
        """
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
@ -198,13 +208,14 @@ class Process(metaclass=ABCMeta):
 | 
			
		||||
        """
 | 
			
		||||
        Return a list of all process ids (pids) accessible on this system.
 | 
			
		||||
 | 
			
		||||
        :return: List of running process ids.
 | 
			
		||||
        Returns:
 | 
			
		||||
            List of running process ids.
 | 
			
		||||
        """
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    @abstractmethod
 | 
			
		||||
    def get_pid_by_name(target_name: str) -> int or None:
 | 
			
		||||
    def get_pid_by_name(target_name: str) -> Optional[int]:
 | 
			
		||||
        """
 | 
			
		||||
        Attempt to return the process id (pid) of a process which was run with an executable
 | 
			
		||||
          file with the provided name. If no process is found, return None.
 | 
			
		||||
@ -215,7 +226,12 @@ class Process(metaclass=ABCMeta):
 | 
			
		||||
        Don't rely on this method if you can possibly avoid it, since it makes no
 | 
			
		||||
          attempt to confirm that it found a unique process and breaks trivially (e.g. if the
 | 
			
		||||
          executable file is renamed).
 | 
			
		||||
        :return: Process id (pid) of a process with the provided name, or None.
 | 
			
		||||
 | 
			
		||||
        Args:
 | 
			
		||||
            target_name: Name of the process to find the PID for
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
            Process id (pid) of a process with the provided name, or `None`.
 | 
			
		||||
        """
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
@ -225,34 +241,48 @@ class Process(metaclass=ABCMeta):
 | 
			
		||||
                             ) -> List[ctypes_buffer_t]:
 | 
			
		||||
        """
 | 
			
		||||
        Take a pointer to a struct and read out the struct members:
 | 
			
		||||
        ```
 | 
			
		||||
            struct_defintion = [(0x0, ctypes.c_ulong()),
 | 
			
		||||
                                (0x20, ctypes.c_byte())]
 | 
			
		||||
            values = p.deref_struct_pointer(0x0feab4, struct_defintion)
 | 
			
		||||
        ```
 | 
			
		||||
        which is shorthand for
 | 
			
		||||
        ```
 | 
			
		||||
            struct_addr = p.read_memory(0x0feab4, ctypes.c_void_p())
 | 
			
		||||
            values = [p.read_memory(struct_addr + 0x0, ctypes.c_ulong()),
 | 
			
		||||
                      p.read_memory(struct_addr + 0x20, ctypes.c_byte())]
 | 
			
		||||
        ```
 | 
			
		||||
 | 
			
		||||
        :param base_address: Address at which the struct pointer is located.
 | 
			
		||||
        :param targets: List of (offset, read_buffer) pairs which will be read from the struct.
 | 
			
		||||
        :return: List of read values corresponding to the provided targets.
 | 
			
		||||
        Args:
 | 
			
		||||
            base_address: Address at which the struct pointer is located.
 | 
			
		||||
            targets: List of `(offset, read_buffer)` pairs which will be read from the struct.
 | 
			
		||||
 | 
			
		||||
        Return:
 | 
			
		||||
            List of read values corresponding to the provided targets.
 | 
			
		||||
        """
 | 
			
		||||
        base = self.read_memory(base_address, ctypes.c_void_p()).value
 | 
			
		||||
        values = [self.read_memory(base + offset, buffer) for offset, buffer in targets]
 | 
			
		||||
        return values
 | 
			
		||||
 | 
			
		||||
    def search_addresses(self, addresses: List[int], needle_buffer: ctypes_buffer_t, verbatim: bool=True) -> List[int]:
 | 
			
		||||
    def search_addresses(self,
 | 
			
		||||
                         addresses: List[int],
 | 
			
		||||
                         needle_buffer: ctypes_buffer_t,
 | 
			
		||||
                         verbatim: bool = True,
 | 
			
		||||
                         ) -> List[int]:
 | 
			
		||||
        """
 | 
			
		||||
        Search for the provided value at each of the provided addresses, and return the addresses
 | 
			
		||||
          where it is found.
 | 
			
		||||
 | 
			
		||||
        :param addresses: List of addresses which should be probed.
 | 
			
		||||
        :param needle_buffer: The value to search for. This should be a ctypes object of the same
 | 
			
		||||
                sorts as used by .read_memory(...), which will be compared to the contents of
 | 
			
		||||
        Args:
 | 
			
		||||
            addresses: List of addresses which should be probed.
 | 
			
		||||
            needle_buffer: The value to search for. This should be a `ctypes` object of the same
 | 
			
		||||
                sorts as used by `.read_memory(...)`, which will be compared to the contents of
 | 
			
		||||
                memory at each of the given addresses.
 | 
			
		||||
        :param verbatim: If True, perform bitwise comparison when searching for needle_buffer.
 | 
			
		||||
                If False, perform utils.ctypes_equal-based comparison. Default True.
 | 
			
		||||
        :return: List of addresses where the needle_buffer was found.
 | 
			
		||||
            verbatim: If `True`, perform bitwise comparison when searching for `needle_buffer`.
 | 
			
		||||
                If `False`, perform `utils.ctypes_equal`-based comparison. Default `True`.
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
            List of addresses where the `needle_buffer` was found.
 | 
			
		||||
        """
 | 
			
		||||
        found = []
 | 
			
		||||
        read_buffer = copy.copy(needle_buffer)
 | 
			
		||||
@ -269,18 +299,25 @@ class Process(metaclass=ABCMeta):
 | 
			
		||||
                found.append(address)
 | 
			
		||||
        return found
 | 
			
		||||
 | 
			
		||||
    def search_all_memory(self, needle_buffer: ctypes_buffer_t, writeable_only: bool=True, verbatim: bool=True) -> List[int]:
 | 
			
		||||
    def search_all_memory(self,
 | 
			
		||||
                          needle_buffer: ctypes_buffer_t,
 | 
			
		||||
                          writeable_only: bool = True,
 | 
			
		||||
                          verbatim: bool = True,
 | 
			
		||||
                          ) -> List[int]:
 | 
			
		||||
        """
 | 
			
		||||
        Search the entire memory space accessible to the process for the provided value.
 | 
			
		||||
 | 
			
		||||
        :param needle_buffer: The value to search for. This should be a ctypes object of the same
 | 
			
		||||
                sorts as used by .read_memory(...), which will be compared to the contents of
 | 
			
		||||
        Args:
 | 
			
		||||
            needle_buffer: The value to search for. This should be a ctypes object of the same
 | 
			
		||||
                sorts as used by `.read_memory(...)`, which will be compared to the contents of
 | 
			
		||||
                memory at each accessible address.
 | 
			
		||||
        :param writeable_only: If True, only search regions where the process has write access.
 | 
			
		||||
                Default True.
 | 
			
		||||
        :param verbatim: If True, perform bitwise comparison when searching for needle_buffer.
 | 
			
		||||
                If False, perform utils.ctypes_equal-based comparison. Default True.
 | 
			
		||||
        :return: List of addresses where the needle_buffer was found.
 | 
			
		||||
            writeable_only: If `True`, only search regions where the process has write access.
 | 
			
		||||
                Default `True`.
 | 
			
		||||
            verbatim: If `True`, perform bitwise comparison when searching for `needle_buffer`.
 | 
			
		||||
                If `False`, perform `utils.ctypes_equal-based` comparison. Default `True`.
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
            List of addresses where the `needle_buffer` was found.
 | 
			
		||||
        """
 | 
			
		||||
        found = []
 | 
			
		||||
        if verbatim:
 | 
			
		||||
@ -299,15 +336,20 @@ class Process(metaclass=ABCMeta):
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    @contextmanager
 | 
			
		||||
    def open_process(cls, process_id: int) -> 'Process':
 | 
			
		||||
    def open_process(cls, process_id: int) -> Generator['Process', None, None]:
 | 
			
		||||
        """
 | 
			
		||||
        Context manager which automatically closes the constructed Process:
 | 
			
		||||
        ```
 | 
			
		||||
            with Process.open_process(2394) as p:
 | 
			
		||||
                # use p here
 | 
			
		||||
                # no need to run p.close()
 | 
			
		||||
        ```
 | 
			
		||||
 | 
			
		||||
        :param process_id: Process id (pid), passed to the Process constructor.
 | 
			
		||||
        :return: Constructed Process object.
 | 
			
		||||
        Args:
 | 
			
		||||
            process_id: Process id (pid), passed to the Process constructor.
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
            Constructed Process object.
 | 
			
		||||
        """
 | 
			
		||||
        process = cls(process_id)
 | 
			
		||||
        yield process
 | 
			
		||||
 | 
			
		||||
@ -2,7 +2,7 @@
 | 
			
		||||
Implementation of Process class for Linux
 | 
			
		||||
"""
 | 
			
		||||
 | 
			
		||||
from typing import List, Tuple
 | 
			
		||||
from typing import List, Tuple, Optional
 | 
			
		||||
from os import strerror
 | 
			
		||||
import os
 | 
			
		||||
import os.path
 | 
			
		||||
@ -20,13 +20,13 @@ logger = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
ptrace_commands = {
 | 
			
		||||
        'PTRACE_GETREGS': 12,
 | 
			
		||||
        'PTRACE_SETREGS': 13,
 | 
			
		||||
        'PTRACE_ATTACH': 16,
 | 
			
		||||
        'PTRACE_DETACH': 17,
 | 
			
		||||
        'PTRACE_SYSCALL': 24,
 | 
			
		||||
        'PTRACE_SEIZE': 16902,
 | 
			
		||||
        }
 | 
			
		||||
    'PTRACE_GETREGS': 12,
 | 
			
		||||
    'PTRACE_SETREGS': 13,
 | 
			
		||||
    'PTRACE_ATTACH': 16,
 | 
			
		||||
    'PTRACE_DETACH': 17,
 | 
			
		||||
    'PTRACE_SYSCALL': 24,
 | 
			
		||||
    'PTRACE_SEIZE': 16902,
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# import ptrace() from libc
 | 
			
		||||
@ -91,7 +91,7 @@ class Process(AbstractProcess):
 | 
			
		||||
        return pids
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def get_pid_by_name(target_name: str) -> int or None:
 | 
			
		||||
    def get_pid_by_name(target_name: str) -> Optional[int]:
 | 
			
		||||
        for pid in Process.list_available_pids():
 | 
			
		||||
            try:
 | 
			
		||||
                logger.info('Checking name for pid {}'.format(pid))
 | 
			
		||||
 | 
			
		||||
@ -12,24 +12,29 @@ Utility functions and types:
 | 
			
		||||
    ctypes_equal(a, b)
 | 
			
		||||
"""
 | 
			
		||||
 | 
			
		||||
from typing import List
 | 
			
		||||
from typing import List, Union
 | 
			
		||||
import ctypes
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
ctypes_buffer_t = ctypes._SimpleCData or ctypes.Array or ctypes.Structure or ctypes.Union
 | 
			
		||||
ctypes_buffer_t = Union[ctypes._SimpleCData, ctypes.Array, ctypes.Structure, ctypes.Union]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class MemEditError(Exception):
 | 
			
		||||
    pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def search_buffer_verbatim(needle_buffer: ctypes_buffer_t, haystack_buffer: ctypes_buffer_t) -> List[int]:
 | 
			
		||||
def search_buffer_verbatim(needle_buffer: ctypes_buffer_t,
 | 
			
		||||
                           haystack_buffer: ctypes_buffer_t,
 | 
			
		||||
                           ) -> List[int]:
 | 
			
		||||
    """
 | 
			
		||||
    Search for a buffer inside another buffer, using a direct (bitwise) comparison
 | 
			
		||||
 | 
			
		||||
    :param needle_buffer: Buffer to search for.
 | 
			
		||||
    :param haystack_buffer: Buffer to search in.
 | 
			
		||||
    :return: List of offsets where the needle_buffer was found.
 | 
			
		||||
    Args:
 | 
			
		||||
        needle_buffer: Buffer to search for.
 | 
			
		||||
        haystack_buffer: Buffer to search in.
 | 
			
		||||
 | 
			
		||||
    Returns:
 | 
			
		||||
        List of offsets where the `needle_buffer` was found.
 | 
			
		||||
    """
 | 
			
		||||
    found = []
 | 
			
		||||
 | 
			
		||||
@ -45,14 +50,19 @@ def search_buffer_verbatim(needle_buffer: ctypes_buffer_t, haystack_buffer: ctyp
 | 
			
		||||
    return found
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def search_buffer(needle_buffer: ctypes_buffer_t, haystack_buffer: ctypes_buffer_t) -> List[int]:
 | 
			
		||||
def search_buffer(needle_buffer: ctypes_buffer_t,
 | 
			
		||||
                  haystack_buffer: ctypes_buffer_t,
 | 
			
		||||
                  ) -> List[int]:
 | 
			
		||||
    """
 | 
			
		||||
    Search for a buffer inside another buffer, using ctypes_equal for comparison.
 | 
			
		||||
    Much slower than search_buffer_verbatim.
 | 
			
		||||
    Search for a buffer inside another buffer, using `ctypes_equal` for comparison.
 | 
			
		||||
    Much slower than `search_buffer_verbatim`.
 | 
			
		||||
 | 
			
		||||
    :param needle_buffer: Buffer to search for.
 | 
			
		||||
    :param haystack_buffer: Buffer to search in.
 | 
			
		||||
    :return: List of offsets where the needle_buffer was found.
 | 
			
		||||
    Args:
 | 
			
		||||
        needle_buffer: Buffer to search for.
 | 
			
		||||
        haystack_buffer: Buffer to search in.
 | 
			
		||||
 | 
			
		||||
    Returns:
 | 
			
		||||
        List of offsets where the needle_buffer was found.
 | 
			
		||||
    """
 | 
			
		||||
    found = []
 | 
			
		||||
    read_type = type(needle_buffer)
 | 
			
		||||
@ -63,7 +73,9 @@ def search_buffer(needle_buffer: ctypes_buffer_t, haystack_buffer: ctypes_buffer
 | 
			
		||||
    return found
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def ctypes_equal(a: ctypes_buffer_t, b: ctypes_buffer_t) -> bool:
 | 
			
		||||
def ctypes_equal(a: ctypes_buffer_t,
 | 
			
		||||
                 b: ctypes_buffer_t,
 | 
			
		||||
                 ) -> bool:
 | 
			
		||||
    """
 | 
			
		||||
    Check if the values stored inside two ctypes buffers are equal.
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
@ -2,7 +2,7 @@
 | 
			
		||||
Implementation of Process class for Windows
 | 
			
		||||
"""
 | 
			
		||||
 | 
			
		||||
from typing import List, Tuple
 | 
			
		||||
from typing import List, Tuple, Optional
 | 
			
		||||
from math import floor
 | 
			
		||||
from os import strerror
 | 
			
		||||
import os.path
 | 
			
		||||
@ -20,77 +20,77 @@ logger = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
# Process handle privileges
 | 
			
		||||
privileges = {
 | 
			
		||||
        'PROCESS_QUERY_INFORMATION': 0x0400,
 | 
			
		||||
        'PROCESS_VM_OPERATION': 0x0008,
 | 
			
		||||
        'PROCESS_VM_READ': 0x0010,
 | 
			
		||||
        'PROCESS_VM_WRITE': 0x0020,
 | 
			
		||||
        }
 | 
			
		||||
    'PROCESS_QUERY_INFORMATION': 0x0400,
 | 
			
		||||
    'PROCESS_VM_OPERATION': 0x0008,
 | 
			
		||||
    'PROCESS_VM_READ': 0x0010,
 | 
			
		||||
    'PROCESS_VM_WRITE': 0x0020,
 | 
			
		||||
    }
 | 
			
		||||
privileges['PROCESS_RW'] = (
 | 
			
		||||
            privileges['PROCESS_QUERY_INFORMATION'] |
 | 
			
		||||
            privileges['PROCESS_VM_OPERATION'] |
 | 
			
		||||
            privileges['PROCESS_VM_READ'] |
 | 
			
		||||
            privileges['PROCESS_VM_WRITE']
 | 
			
		||||
            )
 | 
			
		||||
    privileges['PROCESS_QUERY_INFORMATION'] |
 | 
			
		||||
    privileges['PROCESS_VM_OPERATION'] |
 | 
			
		||||
    privileges['PROCESS_VM_READ'] |
 | 
			
		||||
    privileges['PROCESS_VM_WRITE']
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
# Memory region states
 | 
			
		||||
mem_states = {
 | 
			
		||||
        'MEM_COMMIT': 0x1000,
 | 
			
		||||
        'MEM_FREE': 0x10000,
 | 
			
		||||
        'MEM_RESERVE': 0x2000,
 | 
			
		||||
        }
 | 
			
		||||
    'MEM_COMMIT': 0x1000,
 | 
			
		||||
    'MEM_FREE': 0x10000,
 | 
			
		||||
    'MEM_RESERVE': 0x2000,
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
# Memory region permissions
 | 
			
		||||
page_protections = {
 | 
			
		||||
        'PAGE_EXECUTE': 0x10,
 | 
			
		||||
        'PAGE_EXECUTE_READ': 0x20,
 | 
			
		||||
        'PAGE_EXECUTE_READWRITE': 0x40,
 | 
			
		||||
        'PAGE_EXECUTE_WRITECOPY': 0x80,
 | 
			
		||||
        'PAGE_NOACCESS': 0x01,
 | 
			
		||||
        'PAGE_READWRITE': 0x04,
 | 
			
		||||
        'PAGE_WRITECOPY': 0x08,
 | 
			
		||||
        }
 | 
			
		||||
    'PAGE_EXECUTE': 0x10,
 | 
			
		||||
    'PAGE_EXECUTE_READ': 0x20,
 | 
			
		||||
    'PAGE_EXECUTE_READWRITE': 0x40,
 | 
			
		||||
    'PAGE_EXECUTE_WRITECOPY': 0x80,
 | 
			
		||||
    'PAGE_NOACCESS': 0x01,
 | 
			
		||||
    'PAGE_READWRITE': 0x04,
 | 
			
		||||
    'PAGE_WRITECOPY': 0x08,
 | 
			
		||||
    }
 | 
			
		||||
# Custom (combined) permissions
 | 
			
		||||
page_protections['PAGE_READABLE'] = (
 | 
			
		||||
        page_protections['PAGE_EXECUTE_READ'] |
 | 
			
		||||
        page_protections['PAGE_EXECUTE_READWRITE'] |
 | 
			
		||||
        page_protections['PAGE_READWRITE']
 | 
			
		||||
        )
 | 
			
		||||
    page_protections['PAGE_EXECUTE_READ'] |
 | 
			
		||||
    page_protections['PAGE_EXECUTE_READWRITE'] |
 | 
			
		||||
    page_protections['PAGE_READWRITE']
 | 
			
		||||
    )
 | 
			
		||||
page_protections['PAGE_READWRITEABLE'] = (
 | 
			
		||||
        page_protections['PAGE_EXECUTE_READWRITE'] |
 | 
			
		||||
        page_protections['PAGE_READWRITE']
 | 
			
		||||
        )
 | 
			
		||||
    page_protections['PAGE_EXECUTE_READWRITE'] |
 | 
			
		||||
    page_protections['PAGE_READWRITE']
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
# Memory types
 | 
			
		||||
mem_types = {
 | 
			
		||||
        'MEM_IMAGE': 0x1000000,
 | 
			
		||||
        'MEM_MAPPED': 0x40000,
 | 
			
		||||
        'MEM_PRIVATE': 0x20000,
 | 
			
		||||
        }
 | 
			
		||||
    'MEM_IMAGE': 0x1000000,
 | 
			
		||||
    'MEM_MAPPED': 0x40000,
 | 
			
		||||
    'MEM_PRIVATE': 0x20000,
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
# C struct for VirtualQueryEx
 | 
			
		||||
class MEMORY_BASIC_INFORMATION32(ctypes.Structure):
 | 
			
		||||
    _fields_ = [
 | 
			
		||||
            ('BaseAddress', ctypes.wintypes.DWORD),
 | 
			
		||||
            ('AllocationBase', ctypes.wintypes.DWORD),
 | 
			
		||||
            ('AllocationProtect', ctypes.wintypes.DWORD),
 | 
			
		||||
            ('RegionSize', ctypes.wintypes.DWORD),
 | 
			
		||||
            ('State', ctypes.wintypes.DWORD),
 | 
			
		||||
            ('Protect', ctypes.wintypes.DWORD),
 | 
			
		||||
            ('Type', ctypes.wintypes.DWORD),
 | 
			
		||||
            ]
 | 
			
		||||
        ('BaseAddress', ctypes.wintypes.DWORD),
 | 
			
		||||
        ('AllocationBase', ctypes.wintypes.DWORD),
 | 
			
		||||
        ('AllocationProtect', ctypes.wintypes.DWORD),
 | 
			
		||||
        ('RegionSize', ctypes.wintypes.DWORD),
 | 
			
		||||
        ('State', ctypes.wintypes.DWORD),
 | 
			
		||||
        ('Protect', ctypes.wintypes.DWORD),
 | 
			
		||||
        ('Type', ctypes.wintypes.DWORD),
 | 
			
		||||
        ]
 | 
			
		||||
 | 
			
		||||
class MEMORY_BASIC_INFORMATION64(ctypes.Structure):
 | 
			
		||||
    _fields_ = [
 | 
			
		||||
            ('BaseAddress', ctypes.c_ulonglong),
 | 
			
		||||
            ('AllocationBase', ctypes.c_ulonglong),
 | 
			
		||||
            ('AllocationProtect', ctypes.wintypes.DWORD),
 | 
			
		||||
			('__alignment1', ctypes.wintypes.DWORD),
 | 
			
		||||
            ('RegionSize', ctypes.c_ulonglong),
 | 
			
		||||
            ('State', ctypes.wintypes.DWORD),
 | 
			
		||||
            ('Protect', ctypes.wintypes.DWORD),
 | 
			
		||||
            ('Type', ctypes.wintypes.DWORD),
 | 
			
		||||
			('__alignment2', ctypes.wintypes.DWORD),
 | 
			
		||||
            ]
 | 
			
		||||
        ('BaseAddress', ctypes.c_ulonglong),
 | 
			
		||||
        ('AllocationBase', ctypes.c_ulonglong),
 | 
			
		||||
        ('AllocationProtect', ctypes.wintypes.DWORD),
 | 
			
		||||
        ('__alignment1', ctypes.wintypes.DWORD),
 | 
			
		||||
        ('RegionSize', ctypes.c_ulonglong),
 | 
			
		||||
        ('State', ctypes.wintypes.DWORD),
 | 
			
		||||
        ('Protect', ctypes.wintypes.DWORD),
 | 
			
		||||
        ('Type', ctypes.wintypes.DWORD),
 | 
			
		||||
        ('__alignment2', ctypes.wintypes.DWORD),
 | 
			
		||||
        ]
 | 
			
		||||
 | 
			
		||||
PTR_SIZE = ctypes.sizeof(ctypes.c_void_p)
 | 
			
		||||
if PTR_SIZE == 8:       # 64-bit python
 | 
			
		||||
@ -98,36 +98,39 @@ if PTR_SIZE == 8:       # 64-bit python
 | 
			
		||||
elif PTR_SIZE == 4:     # 32-bit python
 | 
			
		||||
    MEMORY_BASIC_INFORMATION = MEMORY_BASIC_INFORMATION32
 | 
			
		||||
 | 
			
		||||
ctypes.windll.kernel32.VirtualQueryEx.argtypes = [ctypes.wintypes.HANDLE,
 | 
			
		||||
												  ctypes.wintypes.LPCVOID,
 | 
			
		||||
												  ctypes.c_void_p,
 | 
			
		||||
												  ctypes.c_size_t]
 | 
			
		||||
ctypes.windll.kernel32.ReadProcessMemory.argtypes = [ctypes.wintypes.HANDLE,
 | 
			
		||||
                                                     ctypes.wintypes.LPCVOID,
 | 
			
		||||
                                                     ctypes.c_void_p,
 | 
			
		||||
                                                     ctypes.c_size_t,
 | 
			
		||||
                                                     ctypes.c_void_p]
 | 
			
		||||
ctypes.windll.kernel32.WriteProcessMemory.argtypes = [ctypes.wintypes.HANDLE,
 | 
			
		||||
                                                      ctypes.wintypes.LPCVOID,
 | 
			
		||||
                                                      ctypes.c_void_p,
 | 
			
		||||
                                                      ctypes.c_size_t,
 | 
			
		||||
                                                      ctypes.c_void_p]
 | 
			
		||||
ctypes.windll.kernel32.VirtualQueryEx.argtypes = [
 | 
			
		||||
    ctypes.wintypes.HANDLE,
 | 
			
		||||
    ctypes.wintypes.LPCVOID,
 | 
			
		||||
    ctypes.c_void_p,
 | 
			
		||||
    ctypes.c_size_t]
 | 
			
		||||
ctypes.windll.kernel32.ReadProcessMemory.argtypes = [
 | 
			
		||||
    ctypes.wintypes.HANDLE,
 | 
			
		||||
    ctypes.wintypes.LPCVOID,
 | 
			
		||||
    ctypes.c_void_p,
 | 
			
		||||
    ctypes.c_size_t,
 | 
			
		||||
    ctypes.c_void_p]
 | 
			
		||||
ctypes.windll.kernel32.WriteProcessMemory.argtypes = [
 | 
			
		||||
    ctypes.wintypes.HANDLE,
 | 
			
		||||
    ctypes.wintypes.LPCVOID,
 | 
			
		||||
    ctypes.c_void_p,
 | 
			
		||||
    ctypes.c_size_t,
 | 
			
		||||
    ctypes.c_void_p]
 | 
			
		||||
 | 
			
		||||
# C struct for GetSystemInfo
 | 
			
		||||
class SYSTEM_INFO(ctypes.Structure):
 | 
			
		||||
    _fields_ = [
 | 
			
		||||
            ('wProcessorArchitecture', ctypes.wintypes.WORD),
 | 
			
		||||
            ('wReserved', ctypes.wintypes.WORD),
 | 
			
		||||
            ('dwPageSize', ctypes.wintypes.DWORD),
 | 
			
		||||
            ('lpMinimumApplicationAddress', ctypes.c_void_p),
 | 
			
		||||
            ('lpMaximumApplicationAddress', ctypes.c_void_p),
 | 
			
		||||
            ('dwActiveProcessorMask', ctypes.c_void_p),
 | 
			
		||||
            ('dwNumberOfProcessors', ctypes.wintypes.DWORD),
 | 
			
		||||
            ('dwProcessorType', ctypes.wintypes.DWORD),
 | 
			
		||||
            ('dwAllocationGranularity', ctypes.wintypes.DWORD),
 | 
			
		||||
            ('wProcessorLevel', ctypes.wintypes.WORD),
 | 
			
		||||
            ('wProcessorRevision', ctypes.wintypes.WORD),
 | 
			
		||||
            ]
 | 
			
		||||
        ('wProcessorArchitecture', ctypes.wintypes.WORD),
 | 
			
		||||
        ('wReserved', ctypes.wintypes.WORD),
 | 
			
		||||
        ('dwPageSize', ctypes.wintypes.DWORD),
 | 
			
		||||
        ('lpMinimumApplicationAddress', ctypes.c_void_p),
 | 
			
		||||
        ('lpMaximumApplicationAddress', ctypes.c_void_p),
 | 
			
		||||
        ('dwActiveProcessorMask', ctypes.c_void_p),
 | 
			
		||||
        ('dwNumberOfProcessors', ctypes.wintypes.DWORD),
 | 
			
		||||
        ('dwProcessorType', ctypes.wintypes.DWORD),
 | 
			
		||||
        ('dwAllocationGranularity', ctypes.wintypes.DWORD),
 | 
			
		||||
        ('wProcessorLevel', ctypes.wintypes.WORD),
 | 
			
		||||
        ('wProcessorRevision', ctypes.wintypes.WORD),
 | 
			
		||||
        ]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Process(AbstractProcess):
 | 
			
		||||
@ -152,24 +155,24 @@ class Process(AbstractProcess):
 | 
			
		||||
    def write_memory(self, base_address: int, write_buffer: ctypes_buffer_t):
 | 
			
		||||
        try:
 | 
			
		||||
            ctypes.windll.kernel32.WriteProcessMemory(
 | 
			
		||||
                    self.process_handle,
 | 
			
		||||
                    base_address,
 | 
			
		||||
                    ctypes.byref(write_buffer),
 | 
			
		||||
                    ctypes.sizeof(write_buffer),
 | 
			
		||||
                    None
 | 
			
		||||
                    )
 | 
			
		||||
                self.process_handle,
 | 
			
		||||
                base_address,
 | 
			
		||||
                ctypes.byref(write_buffer),
 | 
			
		||||
                ctypes.sizeof(write_buffer),
 | 
			
		||||
                None
 | 
			
		||||
                )
 | 
			
		||||
        except (BufferError, ValueError, TypeError):
 | 
			
		||||
            raise MemEditError('Error with handle {}:  {}'.format(self.process_handle, self._get_last_error()))
 | 
			
		||||
 | 
			
		||||
    def read_memory(self, base_address: int, read_buffer: ctypes_buffer_t) -> ctypes_buffer_t:
 | 
			
		||||
        try:
 | 
			
		||||
            ctypes.windll.kernel32.ReadProcessMemory(
 | 
			
		||||
                    self.process_handle,
 | 
			
		||||
                    base_address,
 | 
			
		||||
                    ctypes.byref(read_buffer),
 | 
			
		||||
                    ctypes.sizeof(read_buffer),
 | 
			
		||||
                    None
 | 
			
		||||
                    )
 | 
			
		||||
                self.process_handle,
 | 
			
		||||
                base_address,
 | 
			
		||||
                ctypes.byref(read_buffer),
 | 
			
		||||
                ctypes.sizeof(read_buffer),
 | 
			
		||||
                None
 | 
			
		||||
                )
 | 
			
		||||
        except (BufferError, ValueError, TypeError):
 | 
			
		||||
            raise MemEditError('Error with handle {}: {}'.format(self.process_handle, self._get_last_error()))
 | 
			
		||||
 | 
			
		||||
@ -184,10 +187,9 @@ class Process(AbstractProcess):
 | 
			
		||||
        max_path_len = 260
 | 
			
		||||
        name_buffer = (ctypes.c_char * max_path_len)()
 | 
			
		||||
        rval = ctypes.windll.psapi.GetProcessImageFileNameA(
 | 
			
		||||
                    self.process_handle,
 | 
			
		||||
                    name_buffer,
 | 
			
		||||
                    max_path_len
 | 
			
		||||
                    )
 | 
			
		||||
             self.process_handle,
 | 
			
		||||
             name_buffer,
 | 
			
		||||
             max_path_len)
 | 
			
		||||
 | 
			
		||||
        if rval > 0:
 | 
			
		||||
            return name_buffer.value.decode()
 | 
			
		||||
@ -199,7 +201,7 @@ class Process(AbstractProcess):
 | 
			
		||||
        # According to EnumProcesses docs, you can't find out how many processes there are before
 | 
			
		||||
        #  fetching the list. As a result, we grab 100 on the first try, and if we get a full list
 | 
			
		||||
        #  of 100, repeatedly double the number until we get fewer than we asked for.
 | 
			
		||||
        
 | 
			
		||||
 | 
			
		||||
        n = 100
 | 
			
		||||
        returned_size = ctypes.wintypes.DWORD()
 | 
			
		||||
        returned_size_ptr = ctypes.byref(returned_size)
 | 
			
		||||
@ -224,7 +226,7 @@ class Process(AbstractProcess):
 | 
			
		||||
        return pids[:num_returned]
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def get_pid_by_name(target_name: str) -> int or None:
 | 
			
		||||
    def get_pid_by_name(target_name: str) -> Optional[int]:
 | 
			
		||||
        for pid in Process.list_available_pids():
 | 
			
		||||
            try:
 | 
			
		||||
                logger.info('Checking name for pid {}'.format(pid))
 | 
			
		||||
@ -260,11 +262,10 @@ class Process(AbstractProcess):
 | 
			
		||||
            mbi_size = ctypes.sizeof(mbi)
 | 
			
		||||
 | 
			
		||||
            success = ctypes.windll.kernel32.VirtualQueryEx(
 | 
			
		||||
                    self.process_handle,
 | 
			
		||||
                    address,
 | 
			
		||||
                    mbi_ptr,
 | 
			
		||||
                    mbi_size,
 | 
			
		||||
                    )
 | 
			
		||||
                self.process_handle,
 | 
			
		||||
                address,
 | 
			
		||||
                mbi_ptr,
 | 
			
		||||
                mbi_size)
 | 
			
		||||
 | 
			
		||||
            if success != mbi_size:
 | 
			
		||||
                if success == 0:
 | 
			
		||||
@ -279,10 +280,11 @@ class Process(AbstractProcess):
 | 
			
		||||
        page_ptr = start
 | 
			
		||||
        while page_ptr < stop:
 | 
			
		||||
            page_info = get_mem_info(page_ptr)
 | 
			
		||||
            if page_info.Type == mem_types['MEM_PRIVATE'] and \
 | 
			
		||||
                    page_info.State == mem_states['MEM_COMMIT'] and \
 | 
			
		||||
                    page_info.Protect & page_protections['PAGE_READABLE'] != 0 and \
 | 
			
		||||
                    (page_info.Protect & page_protections['PAGE_READWRITEABLE'] != 0 or not writeable_only):
 | 
			
		||||
            if (page_info.Type == mem_types['MEM_PRIVATE']
 | 
			
		||||
                    and page_info.State == mem_states['MEM_COMMIT']
 | 
			
		||||
                    and page_info.Protect & page_protections['PAGE_READABLE'] != 0
 | 
			
		||||
                    and (page_info.Protect & page_protections['PAGE_READWRITEABLE'] != 0
 | 
			
		||||
                         or not writeable_only)):
 | 
			
		||||
                regions.append((page_ptr, page_ptr + page_info.RegionSize))
 | 
			
		||||
            page_ptr += page_info.RegionSize
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user