diff --git a/.gitignore b/.gitignore index 74cecaf..5525039 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,5 @@ __pycache__/ tests/docs/api tests/docs/build .tox/ + +*.pyi \ No newline at end of file diff --git a/dissect/executable/__init__.py b/dissect/executable/__init__.py index 43d2a88..564a688 100644 --- a/dissect/executable/__init__.py +++ b/dissect/executable/__init__.py @@ -1,5 +1,7 @@ from dissect.executable.elf import ELF +from dissect.executable.pe import PE __all__ = [ "ELF", + "PE", ] diff --git a/dissect/executable/elf/elf.py b/dissect/executable/elf/elf.py index f2b827f..8a8963c 100644 --- a/dissect/executable/elf/elf.py +++ b/dissect/executable/elf/elf.py @@ -287,7 +287,14 @@ def patch(self, new_data: bytes) -> None: class SegmentTable(Table[Segment]): - def __init__(self, fh: BinaryIO, offset: int, entries: int, size: int, c_elf: cstruct = c_elf_64): + def __init__( + self, + fh: BinaryIO, + offset: int, + entries: int, + size: int, + c_elf: cstruct = c_elf_64, + ): super().__init__(entries) self.fh = fh self.offset = offset diff --git a/dissect/executable/exception.py b/dissect/executable/exception.py index b0fb678..0043c51 100644 --- a/dissect/executable/exception.py +++ b/dissect/executable/exception.py @@ -4,3 +4,28 @@ class Error(Exception): class InvalidSignatureError(Error): """Exception that occurs if the magic in the header does not match.""" + + +class InvalidPE(Error): + """Exception that occurs if the PE signature does not match.""" + + +class InvalidVA(Error): + """Exception that occurs when a virtual address is not found within the PE sections.""" + + +class InvalidAddress(Error): + """Exception that occurs when a raw address is not found within the PE file when translating from a virtual + address.""" + + +class InvalidArchitecture(Error): + """Exception that occurs when an invalid value is encountered for the PE architecture types.""" + + +class BuildSectionException(Error): + """Exception that occurs when the section to be 
built contains an error.""" + + +class ResourceException(Error): + """Exception that occurs when an error is thrown parsing the resources.""" diff --git a/dissect/executable/pe/__init__.py b/dissect/executable/pe/__init__.py index e69de29..66eb098 100644 --- a/dissect/executable/pe/__init__.py +++ b/dissect/executable/pe/__init__.py @@ -0,0 +1,25 @@ +from dissect.executable.pe.builder import Builder +from dissect.executable.pe.patcher import Patcher +from dissect.executable.pe.pe import PE +from dissect.executable.pe.sections.exports import ExportFunction, ExportManager +from dissect.executable.pe.sections.imports import ( + ImportFunction, + ImportManager, + ImportModule, +) +from dissect.executable.pe.sections.resources import Resource, ResourceManager +from dissect.executable.pe.sections.sections import PESection + +__all__ = [ + "PE", + "Builder", + "ExportFunction", + "ExportManager", + "ImportFunction", + "ImportManager", + "ImportModule", + "PESection", + "Patcher", + "Resource", + "ResourceManager", +] diff --git a/dissect/executable/pe/builder.py b/dissect/executable/pe/builder.py new file mode 100644 index 0000000..d1015c0 --- /dev/null +++ b/dissect/executable/pe/builder.py @@ -0,0 +1,451 @@ +from __future__ import annotations + +from datetime import datetime, timezone +from io import BytesIO + +from dissect.executable import utils +from dissect.executable.exception import BuildSectionException +from dissect.executable.pe.c_pe import c_pe +from dissect.executable.pe.pe import PE + +STUB = b"\x0e\x1f\xba\x0e\x00\xb4\t\xcd!\xb8\x01L\xcd!This program is made with dissect.pe <3 kusjesvanSRT <3.\x0d\x0d\x0a$\x00\x00" # noqa: E501 + + +class Builder: + """Base class for building the PE file with the user applied patches. + + Args: + pe: A `PE` object. + arch: The architecture to use for the new PE. + dll: Whether the new PE should be a DLL or not. + subsystem: The subsystem to use for the new PE; defaults to IMAGE_SUBSYSTEM_WINDOWS_GUI. 
+ """ + + def __init__( + self, + arch: str = "x64", + dll: bool = False, + subsystem: int = 0x2, + ): + self.arch = ( + c_pe.MachineType.IMAGE_FILE_MACHINE_AMD64 if arch == "x64" else c_pe.MachineType.IMAGE_FILE_MACHINE_I386 + ) + self.dll = dll + self.subsystem = subsystem + + self.pe: PE = None + + def new(self) -> None: + """Build the PE file from scratch. + + This function will build a new PE that consists of a single dummy section. It will not contain any imports, + exports, code, etc. + """ + + new_pe = BytesIO() + + # Generate the MZ header + self.mz_header = self.gen_mz_header() + + image_characteristics = self.get_characteristics() + # Generate the file header + self.file_header = self.gen_file_header(machine=self.arch, characteristics=image_characteristics) + + # Generate the optional header + self.optional_header = self.gen_optional_header() + + # Add a dummy section header to the new PE, we need at least 1 section to parse the PE + dummy_data = b"<3kusjesvanSRT<3" + dummy_multiplier = 0x400 // len(dummy_data) + + section_header_offset = self.optional_header.SizeOfHeaders + pointer_to_raw_data = utils.align_int( + integer=section_header_offset + c_pe.IMAGE_SECTION_HEADER.size, + blocksize=self.file_alignment, + ) + dummy_section = self.section( + pointer_to_raw_data=pointer_to_raw_data, + virtual_address=self.optional_header.BaseOfCode, + virtual_size=dummy_multiplier, + raw_size=dummy_multiplier, + characteristics=c_pe.SectionFlags.IMAGE_SCN_CNT_CODE + | c_pe.SectionFlags.IMAGE_SCN_MEM_EXECUTE + | c_pe.SectionFlags.IMAGE_SCN_MEM_READ + | c_pe.SectionFlags.IMAGE_SCN_MEM_NOT_PAGED, + ) + # Update the number of sections in the file header + self.file_header.NumberOfSections += 1 + + # Write the headers into the new PE + new_pe.write(self.mz_header.dumps()) + new_pe.write(STUB) + new_pe.seek(self.mz_header.e_lfanew) + new_pe.write(b"PE\x00\x00") + new_pe.write(self.file_header.dumps()) + new_pe.write(self.optional_header.dumps()) + + # Write the dummy 
section header + new_pe.write(dummy_section.dumps()) + + # Write the data of the section + new_pe.seek(dummy_section.PointerToRawData) + new_pe.write(dummy_data * dummy_multiplier) + + self.pe = PE(pe_file=new_pe) + + # Fix our SizeOfImage field in the optional header + self.pe.optional_header.SizeOfImage = self.pe_size + + def gen_mz_header( + self, + e_magic: int = 0x5A4D, + e_cblp: int = 0, + e_cp: int = 1, + e_crlc: int = 0, + e_cparhdr: int = 4, + e_minalloc: int = 0, + e_maxalloc: int = 0, + e_ss: int = 0, + e_sp: int = 0, + e_csum: int = 0, + e_ip: int = 0, + e_cs: int = 0, + e_lfarlc: int = 64, + e_ovno: int = 0, + e_res: list[int] | None = None, + e_oemid: int = 0, + e_oeminfo: int = 0, + e_res2: list[int] | None = None, + e_lfanew: int = 0, + ) -> c_pe.IMAGE_DOS_HEADER: + """Generate the MZ header for the new PE file. + + Args: + e_magic: The magic number for the MZ header. + e_cblp: The number of bytes on the last page of the file. + e_cp: The number of pages in the file. + e_crlc: The number of relocations. + e_cparhdr: The number of paragraphs in the header. + e_minalloc: The minimum number of paragraphs in the program. + e_maxalloc: The maximum number of paragraphs in the program. + e_ss: The relative value of the stack segment. + e_sp: The initial value of the stack pointer. + e_csum: The checksum. + e_ip: The initial value of the instruction pointer. + e_cs: The relative value of the code segment. + e_lfarlc: The file address of the relocation table. + e_ovno: The overlay number. + e_res: The reserved words. + e_oemid: The OEM identifier. + e_oeminfo: The OEM information. + e_res2: The reserved words. + e_lfanew: The file address of the new exe header. + + Returns: + The MZ header as a `cstruct` object. 
+ """ + + mz_header = c_pe.IMAGE_DOS_HEADER() + + mz_header.e_magic = e_magic + mz_header.e_cblp = e_cblp + mz_header.e_cp = e_cp + mz_header.e_crlc = e_crlc + mz_header.e_cparhdr = e_cparhdr + mz_header.e_minalloc = e_minalloc + mz_header.e_maxalloc = e_maxalloc + mz_header.e_ss = e_ss + mz_header.e_sp = e_sp + mz_header.e_csum = e_csum + mz_header.e_ip = e_ip + mz_header.e_cs = e_cs + mz_header.e_lfarlc = e_lfarlc + mz_header.e_ovno = e_ovno + mz_header.e_res = e_res or [0, 0, 0, 0] + mz_header.e_oemid = e_oemid + mz_header.e_oeminfo = e_oeminfo + mz_header.e_res2 = e_res2 or [0, 0, 0, 0, 0, 0, 0, 0, 0, 0] + + # Calculate the start of the NT headers by checking the location and size of the relocation table + # within the MZ header + start_of_nt_header = (mz_header.e_lfarlc + (mz_header.e_crlc * 4)) + len(STUB) + mz_header.e_lfanew = e_lfanew if e_lfanew else start_of_nt_header + # Align the e_lfanew value + mz_header.e_lfanew = mz_header.e_lfanew + (mz_header.e_lfanew % 2) + + return mz_header + + def get_characteristics(self) -> int: + """Function to retrieve the characteristics that are set based on the kind of PE file that needs to be + generated. + + For now it will only contain the main characteristics of a PE file, like if it's an executable image and/or a + DLL. + + Returns: + The characteristics of the PE file. + """ + + output = c_pe.ImageCharacteristics.IMAGE_FILE_EXECUTABLE_IMAGE + if self.arch != c_pe.MachineType.IMAGE_FILE_MACHINE_AMD64: + output |= c_pe.ImageCharacteristics.IMAGE_FILE_32BIT_MACHINE + + if self.dll: + output |= c_pe.ImageCharacteristics.IMAGE_FILE_DLL + + return output + + def gen_file_header( + self, + time_date_stamp: int = 0, + pointer_to_symbol_table: int = 0, + number_of_symbols: int = 0, + size_of_optional_header: int = 0, + characteristics: int = 0, + machine: int = 0x8664, + number_of_sections: int = 0, + ) -> c_pe.IMAGE_FILE_HEADER: + """Generate the file header for the new PE file. 
+ + Args: + time_date_stamp: The time and date the file was created. + pointer_to_symbol_table: The file pointer to the COFF symbol table. + number_of_symbols: The number of entries in the symbol table. + size_of_optional_header: The size of the optional header. + characteristics: The characteristics of the file. + machine: The machine type. + number_of_sections: The number of sections. + + Returns: + The file header as a `cstruct` object. + """ + + # Set the size of the optional header if not given + if not size_of_optional_header: + if machine == 0x8664: + size_of_optional_header = len(c_pe.IMAGE_OPTIONAL_HEADER64) + self.machine = 0x8664 + else: + size_of_optional_header = len(c_pe.IMAGE_OPTIONAL_HEADER) + self.machine = 0x14C + + # Set the timestamp to now if not given + if not time_date_stamp: + time_date_stamp = int(datetime.now(tz=timezone.utc).timestamp()) + + file_header = c_pe.IMAGE_FILE_HEADER() + file_header.Machine = c_pe.MachineType(machine) + file_header.NumberOfSections = number_of_sections + file_header.TimeDateStamp = time_date_stamp + file_header.PointerToSymbolTable = pointer_to_symbol_table + file_header.NumberOfSymbols = number_of_symbols + file_header.SizeOfOptionalHeader = size_of_optional_header + file_header.Characteristics = c_pe.ImageCharacteristics(characteristics) + + return file_header + + def gen_optional_header( + self, + magic: int = 0, + major_linker_version: int = 0xE, + minor_linker_version: int = 0, + size_of_code: int = 0, + size_of_initialized_data: int = 0, + size_of_uninitialized_data: int = 0, + address_of_entrypoint: int = 0, + base_of_code: int = 0x1000, + imagebase: int = 0x69000, + section_alignment: int = 0x1000, + file_alignment: int = 0x200, + major_os_version: int = 0x5, + minor_os_version: int = 0x2, + major_image_version: int = 0, + minor_image_version: int = 0, + major_subsystem_version: int = 0x5, + minor_subsystem_version: int = 0x2, + win32_version_value: int = 0, + size_of_image: int = 0, + size_of_headers: 
int = 0x400, + checksum: int = 0, + subsystem: int = 0x2, + dll_characteristics: int = 0, + size_of_stack_reserve: int = 0x1000, + size_of_stack_commit: int = 0x1000, + size_of_heap_reserve: int = 0x1000, + size_of_heap_commit: int = 0x1000, + loaderflags: int = 0, + number_of_rva_and_sizes: int = c_pe.IMAGE_NUMBEROF_DIRECTORY_ENTRIES, + datadirectory: list[c_pe.IMAGE_DATA_DIRECTORY] | None = None, + ) -> c_pe.IMAGE_OPTIONAL_HEADER | c_pe.IMAGE_OPTIONAL_HEADER64: + """Generate the optional header for the new PE file. + + Args: + magic: The magic number for the optional header, this indicates the architecture for the PE. + major_linker_version: The major version of the linker. + minor_linker_version: The minor version of the linker. + size_of_code: The size of the code section. + size_of_initialized_data: The size of the initialized data section. + size_of_uninitialized_data: The size of the uninitialized data section. + address_of_entrypoint: The address of the entry point. + base_of_code: The base of the code section. + imagebase: The base address of the image. + section_alignment: The alignment of sections in memory. + file_alignment: The alignment of sections in the file. + major_os_version: The major version of the operating system. + minor_os_version: The minor version of the operating system. + major_image_version: The major version of the image. + minor_image_version: The minor version of the image. + major_subsystem_version: The major version of the subsystem. + minor_subsystem_version: The minor version of the subsystem. + win32_version_value: The Win32 version value. + size_of_image: The size of the image. + size_of_headers: The size of the headers. + checksum: The checksum of the image. + subsystem: The subsystem of the image. + dll_characteristics: The DLL characteristics of the image. + size_of_stack_reserve: The size of the stack to reserve. + size_of_stack_commit: The size of the stack to commit. 
+ size_of_heap_reserve: The size of the heap to reserve. + size_of_heap_commit: The size of the heap to commit. + loaderflags: The loader flags. + number_of_rva_and_sizes: The number of RVA and sizes. + datadirectory: The data directory entries, initialized as nullbyte directories. + + Returns: + The optional header as a `cstruct` object. + """ + + if self.machine == 0x8664: + optional_header = c_pe.IMAGE_OPTIONAL_HEADER64() + _magic = 0x20B + else: + optional_header = c_pe.IMAGE_OPTIONAL_HEADER() + _magic = 0x10B + + optional_header.Magic = magic or _magic + self.file_alignment = file_alignment + self.section_alignment = section_alignment + + # Calculate the SizeOfHeaders field, we add the length of a section header because we know there's going to be + # at least 1 section header + size_of_headers = utils.align_int( + integer=len(self.mz_header) + + len(STUB) + + len(b"PE\x00\x00") + + len(self.file_header) + + len(optional_header) + + len(c_pe.IMAGE_SECTION_HEADER), + blocksize=file_alignment, + ) + + optional_header.MajorLinkerVersion = major_linker_version + optional_header.MinorLinkerVersion = minor_linker_version + optional_header.SizeOfCode = size_of_code + optional_header.SizeOfInitializedData = size_of_initialized_data + optional_header.SizeOfUninitializedData = size_of_uninitialized_data + optional_header.AddressOfEntryPoint = address_of_entrypoint + optional_header.BaseOfCode = base_of_code + optional_header.ImageBase = imagebase + optional_header.SectionAlignment = section_alignment + optional_header.FileAlignment = file_alignment + optional_header.MajorOperatingSystemVersion = major_os_version + optional_header.MinorOperatingSystemVersion = minor_os_version + optional_header.MajorImageVersion = major_image_version + optional_header.MinorImageVersion = minor_image_version + optional_header.MajorSubsystemVersion = major_subsystem_version + optional_header.MinorSubsystemVersion = minor_subsystem_version + optional_header.Win32VersionValue = 
win32_version_value + optional_header.SizeOfImage = size_of_image + optional_header.SizeOfHeaders = size_of_headers + optional_header.CheckSum = checksum + optional_header.Subsystem = c_pe.WindowsSubsystem(subsystem) + optional_header.DllCharacteristics = c_pe.DLLCharacteristics(dll_characteristics) + optional_header.SizeOfStackReserve = size_of_stack_reserve + optional_header.SizeOfStackCommit = size_of_stack_commit + optional_header.SizeOfHeapReserve = size_of_heap_reserve + optional_header.SizeOfHeapCommit = size_of_heap_commit + optional_header.LoaderFlags = loaderflags + optional_header.NumberOfRvaAndSizes = number_of_rva_and_sizes + optional_header.DataDirectory = datadirectory or [ + c_pe.IMAGE_DATA_DIRECTORY(BytesIO(b"\x00" * len(c_pe.IMAGE_DATA_DIRECTORY))) + for _ in range(c_pe.IMAGE_NUMBEROF_DIRECTORY_ENTRIES) + ] + + return optional_header + + def section( + self, + pointer_to_raw_data: int, + name: str | bytes = b".dissect", + virtual_size: int = 0x1000, + virtual_address: int = 0x1000, + raw_size: int = 0x200, + pointer_to_relocations: int = 0, + pointer_to_linenumbers: int = 0, + number_of_relocations: int = 0, + number_of_linenumbers: int = 0, + characteristics: int = 0x68000020, + ) -> c_pe.IMAGE_SECTION_HEADER: + """Build a new section for the PE. + + The default characteristics of the new section will be: + - IMAGE_SCN_CNT_CODE + - IMAGE_SCN_MEM_EXECUTE + - IMAGE_SCN_MEM_READ + - IMAGE_SCN_MEM_NOT_PAGED + + Args: + pointer_to_raw_data: The file pointer to the raw data of the new section. + name: The new section name, default: .dissect + virtual_size: The virtual size of the new section data. + virtual_address: The virtual address where the new section is located. + raw_size: The size of the section data. + pointer_to_relocations: The file pointer to the relocation table. + pointer_to_linenumbers: The file pointer to the line number table. + number_of_relocations: The number of relocations. + number_of_linenumbers: The number of line numbers. 
+ characteristics: The characteristics of the new section. + + Returns: + The new section header as a `cstruct` object. + """ + + if len(name) > 8: + raise BuildSectionException("section names can't be longer than 8 characters") + + if isinstance(name, str): + name = name.encode() + + section_header = c_pe.IMAGE_SECTION_HEADER() + + pointer_to_raw_data = utils.align_int(integer=pointer_to_raw_data, blocksize=self.file_alignment) + + section_header.Name = name + utils.pad(size=8 - len(name)) + section_header.VirtualSize = virtual_size + section_header.VirtualAddress = virtual_address + section_header.SizeOfRawData = raw_size + section_header.PointerToRawData = pointer_to_raw_data + section_header.PointerToRelocations = pointer_to_relocations + section_header.PointerToLinenumbers = pointer_to_linenumbers + section_header.NumberOfRelocations = number_of_relocations + section_header.NumberOfLinenumbers = number_of_linenumbers + section_header.Characteristics = c_pe.SectionFlags(characteristics) + + return section_header + + @property + def pe_size(self) -> int: + """Calculate the new PE size. + + We can calculate the new size of the PE by adding the virtual address and virtual size of the last section + together. + + Returns: + The size of the PE. 
+ """ + + last_section = self.pe.sections.last_section(patch=True) + va = last_section.virtual_address + size = last_section.virtual_size + + return utils.align_int(integer=(va + size), blocksize=self.section_alignment) diff --git a/dissect/executable/pe/c_pe.py b/dissect/executable/pe/c_pe.py new file mode 100755 index 0000000..ff5e449 --- /dev/null +++ b/dissect/executable/pe/c_pe.py @@ -0,0 +1,462 @@ +from dissect.cstruct import cstruct + +c_pe_def = """ +#define IMAGE_NUMBEROF_DIRECTORY_ENTRIES 16 +#define IMAGE_SIZEOF_SHORT_NAME 8 + +#define IMAGE_DIRECTORY_ENTRY_EXPORT 0 // Export Directory +#define IMAGE_DIRECTORY_ENTRY_IMPORT 1 // Import Directory +#define IMAGE_DIRECTORY_ENTRY_RESOURCE 2 // Resource Directory +#define IMAGE_DIRECTORY_ENTRY_EXCEPTION 3 // Exception Directory +#define IMAGE_DIRECTORY_ENTRY_SECURITY 4 // Security Directory +#define IMAGE_DIRECTORY_ENTRY_BASERELOC 5 // Base Relocation Table +#define IMAGE_DIRECTORY_ENTRY_DEBUG 6 // Debug Directory +// IMAGE_DIRECTORY_ENTRY_COPYRIGHT 7 // (X86 usage) +#define IMAGE_DIRECTORY_ENTRY_ARCHITECTURE 7 // Architecture Specific Data +#define IMAGE_DIRECTORY_ENTRY_GLOBALPTR 8 // RVA of GP +#define IMAGE_DIRECTORY_ENTRY_TLS 9 // TLS Directory +#define IMAGE_DIRECTORY_ENTRY_LOAD_CONFIG 10 // Load Configuration Directory +#define IMAGE_DIRECTORY_ENTRY_BOUND_IMPORT 11 // Bound Import Directory in headers +#define IMAGE_DIRECTORY_ENTRY_IAT 12 // Import Address Table +#define IMAGE_DIRECTORY_ENTRY_DELAY_IMPORT 13 // Delay Load Import Descriptors +#define IMAGE_DIRECTORY_ENTRY_COM_DESCRIPTOR 14 // COM Runtime descriptor + +// --- PE HEADERS --- + +typedef struct IMAGE_DOS_HEADER { + WORD e_magic; + WORD e_cblp; + WORD e_cp; + WORD e_crlc; + WORD e_cparhdr; + WORD e_minalloc; + WORD e_maxalloc; + WORD e_ss; + WORD e_sp; + WORD e_csum; + WORD e_ip; + WORD e_cs; + WORD e_lfarlc; + WORD e_ovno; + WORD e_res[4]; + WORD e_oemid; + WORD e_oeminfo; + WORD e_res2[10]; + LONG e_lfanew; +}; + +typedef enum MachineType : 
WORD { + IMAGE_FILE_MACHINE_UNKNOWN = 0x0, + IMAGE_FILE_MACHINE_AM33 = 0x1d3, + IMAGE_FILE_MACHINE_AMD64 = 0x8664, + IMAGE_FILE_MACHINE_ARM = 0x1c0, + IMAGE_FILE_MACHINE_ARM64 = 0xaa64, + IMAGE_FILE_MACHINE_ARMNT = 0x1c4, + IMAGE_FILE_MACHINE_EBC = 0xebc, + IMAGE_FILE_MACHINE_I386 = 0x14c, + IMAGE_FILE_MACHINE_IA64 = 0x200, + IMAGE_FILE_MACHINE_M32R = 0x9041, + IMAGE_FILE_MACHINE_MIPS16 = 0x266, + IMAGE_FILE_MACHINE_MIPSFPU = 0x366, + IMAGE_FILE_MACHINE_MIPSFPU16 = 0x466, + IMAGE_FILE_MACHINE_POWERPC = 0x1f0, + IMAGE_FILE_MACHINE_POWERPCFP = 0x1f1, + IMAGE_FILE_MACHINE_R4000 = 0x166, + IMAGE_FILE_MACHINE_RISCV32 = 0x5032, + IMAGE_FILE_MACHINE_RISCV64 = 0x5064, + IMAGE_FILE_MACHINE_RISCV128 = 0x5128, + IMAGE_FILE_MACHINE_SH3 = 0x1a2, + IMAGE_FILE_MACHINE_SH3DSP = 0x1a3, + IMAGE_FILE_MACHINE_SH4 = 0x1a6, + IMAGE_FILE_MACHINE_SH5 = 0x1a8, + IMAGE_FILE_MACHINE_THUMB = 0x1c2, + IMAGE_FILE_MACHINE_WCEMIPSV2 = 0x169, +}; + +flag ImageCharacteristics : WORD { + IMAGE_FILE_RELOCS_STRIPPED = 0x0001, + IMAGE_FILE_EXECUTABLE_IMAGE = 0x0002, + IMAGE_FILE_LINE_NUMS_STRIPPED = 0x0004, + IMAGE_FILE_LOCAL_SYMS_STRIPPED = 0x0008, + IMAGE_FILE_AGGRESSIVE_WS_TRIM = 0x0010, + IMAGE_FILE_LARGE_ADDRESS_AWARE = 0x0020, + Reserved = 0x0040, + IMAGE_FILE_BYTES_REVERSED_LO = 0x0080, + IMAGE_FILE_32BIT_MACHINE = 0x0100, + IMAGE_FILE_DEBUG_STRIPPED = 0x0200, + IMAGE_FILE_REMOVABLE_RUN_FROM_SWAP = 0x0400, + IMAGE_FILE_NET_RUN_FROM_SWAP = 0x0800, + IMAGE_FILE_SYSTEM = 0x1000, + IMAGE_FILE_DLL = 0x2000, + IMAGE_FILE_UP_SYSTEM_ONLY = 0x4000, + IMAGE_FILE_BYTES_REVERSED_HI = 0x8000, +}; + +typedef struct IMAGE_FILE_HEADER { + MachineType Machine; + WORD NumberOfSections; + DWORD TimeDateStamp; + DWORD PointerToSymbolTable; + DWORD NumberOfSymbols; + WORD SizeOfOptionalHeader; + ImageCharacteristics Characteristics; +}; + +typedef struct IMAGE_DATA_DIRECTORY { + ULONG VirtualAddress; + ULONG Size; +}; + +typedef enum WindowsSubsystem : WORD { + IMAGE_SUBSYSTEM_UNKNOWN = 0, + IMAGE_SUBSYSTEM_NATIVE = 
1, + IMAGE_SUBSYSTEM_WINDOWS_GUI = 2, + IMAGE_SUBSYSTEM_WINDOWS_CUI = 3, + IMAGE_SUBSYSTEM_OS2_CUI = 5, + IMAGE_SUBSYSTEM_POSIX_CUI = 7, + IMAGE_SUBSYSTEM_NATIVE_WINDOWS = 8, + IMAGE_SUBSYSTEM_WINDOWS_CE_GUI = 9, + IMAGE_SUBSYSTEM_EFI_APPLICATION = 10, + IMAGE_SUBSYSTEM_EFI_BOOT_SERVICE_DRIVER = 11, + IMAGE_SUBSYSTEM_EFI_RUNTIME_DRIVER = 12, + IMAGE_SUBSYSTEM_EFI_ROM = 13, + IMAGE_SUBSYSTEM_XBOX = 14, + IMAGE_SUBSYSTEM_WINDOWS_BOOT_APPLICATION = 16, +}; + +typedef enum DLLCharacteristics : WORD { + IMAGE_DLLCHARACTERISTICS_HIGH_ENTROPY_VA = 0x0020, + IMAGE_DLLCHARACTERISTICS_DYNAMIC_BASE = 0x0040, + IMAGE_DLLCHARACTERISTICS_FORCE_INTEGRITY = 0x0080, + IMAGE_DLLCHARACTERISTICS_NX_COMPAT = 0x0100, + IMAGE_DLLCHARACTERISTICS_NO_ISOLATION = 0x0200, + IMAGE_DLLCHARACTERISTICS_NO_SEH = 0x0400, + IMAGE_DLLCHARACTERISTICS_NO_BIND = 0x0800, + IMAGE_DLLCHARACTERISTICS_APPCONTAINER = 0x1000, + IMAGE_DLLCHARACTERISTICS_WDM_DRIVER = 0x2000, + IMAGE_DLLCHARACTERISTICS_GUARD_CF = 0x4000, + IMAGE_DLLCHARACTERISTICS_TERMINAL_SERVER_AWARE = 0x8000, +}; + +typedef struct IMAGE_OPTIONAL_HEADER { + WORD Magic; + BYTE MajorLinkerVersion; + BYTE MinorLinkerVersion; + DWORD SizeOfCode; + DWORD SizeOfInitializedData; + DWORD SizeOfUninitializedData; + DWORD AddressOfEntryPoint; + DWORD BaseOfCode; + DWORD BaseOfData; + DWORD ImageBase; + DWORD SectionAlignment; + DWORD FileAlignment; + WORD MajorOperatingSystemVersion; + WORD MinorOperatingSystemVersion; + WORD MajorImageVersion; + WORD MinorImageVersion; + WORD MajorSubsystemVersion; + WORD MinorSubsystemVersion; + DWORD Win32VersionValue; + DWORD SizeOfImage; + DWORD SizeOfHeaders; + DWORD CheckSum; + WindowsSubsystem Subsystem; + DLLCharacteristics DllCharacteristics; + DWORD SizeOfStackReserve; + DWORD SizeOfStackCommit; + DWORD SizeOfHeapReserve; + DWORD SizeOfHeapCommit; + DWORD LoaderFlags; + DWORD NumberOfRvaAndSizes; + IMAGE_DATA_DIRECTORY DataDirectory[IMAGE_NUMBEROF_DIRECTORY_ENTRIES]; +}; + +typedef struct 
IMAGE_OPTIONAL_HEADER64 { + WORD Magic; + BYTE MajorLinkerVersion; + BYTE MinorLinkerVersion; + DWORD SizeOfCode; + DWORD SizeOfInitializedData; + DWORD SizeOfUninitializedData; + DWORD AddressOfEntryPoint; + DWORD BaseOfCode; + ULONGLONG ImageBase; + DWORD SectionAlignment; + DWORD FileAlignment; + WORD MajorOperatingSystemVersion; + WORD MinorOperatingSystemVersion; + WORD MajorImageVersion; + WORD MinorImageVersion; + WORD MajorSubsystemVersion; + WORD MinorSubsystemVersion; + DWORD Win32VersionValue; + DWORD SizeOfImage; + DWORD SizeOfHeaders; + DWORD CheckSum; + WORD Subsystem; + WORD DllCharacteristics; + ULONGLONG SizeOfStackReserve; + ULONGLONG SizeOfStackCommit; + ULONGLONG SizeOfHeapReserve; + ULONGLONG SizeOfHeapCommit; + DWORD LoaderFlags; + DWORD NumberOfRvaAndSizes; + IMAGE_DATA_DIRECTORY DataDirectory[IMAGE_NUMBEROF_DIRECTORY_ENTRIES]; +}; + +typedef struct IMAGE_NT_HEADERS { + DWORD Signature; + IMAGE_FILE_HEADER FileHeader; + IMAGE_OPTIONAL_HEADER OptionalHeader; +}; + +typedef struct IMAGE_NT_HEADERS64 { + DWORD Signature; + IMAGE_FILE_HEADER FileHeader; + IMAGE_OPTIONAL_HEADER64 OptionalHeader; +}; + +flag SectionFlags : DWORD { + IMAGE_SCN_TYPE_NO_PAD = 0x00000008, + IMAGE_SCN_CNT_CODE = 0x00000020, + IMAGE_SCN_CNT_INITIALIZED_DATA = 0x00000040, + IMAGE_SCN_CNT_UNINITIALIZED_DATA = 0x00000080, + IMAGE_SCN_LNK_OTHER = 0x00000100, + IMAGE_SCN_LNK_INFO = 0x00000200, + IMAGE_SCN_LNK_REMOVE = 0x00000800, + IMAGE_SCN_LNK_COMDAT = 0x00001000, + IMAGE_SCN_NO_DEFER_SPEC_EXC = 0x00004000, + IMAGE_SCN_GPREL = 0x00008000, + IMAGE_SCN_MEM_FARDATA = 0x00008000, + IMAGE_SCN_MEM_PURGEABLE = 0x00020000, + IMAGE_SCN_MEM_16BIT = 0x00020000, + IMAGE_SCN_MEM_LOCKED = 0x00040000, + IMAGE_SCN_MEM_PRELOAD = 0x00080000, + IMAGE_SCN_ALIGN_1BYTES = 0x00100000, + IMAGE_SCN_ALIGN_2BYTES = 0x00200000, + IMAGE_SCN_ALIGN_4BYTES = 0x00300000, + IMAGE_SCN_ALIGN_8BYTES = 0x00400000, + IMAGE_SCN_ALIGN_16BYTES = 0x00500000, + IMAGE_SCN_ALIGN_32BYTES = 0x00600000, + 
IMAGE_SCN_ALIGN_64BYTES = 0x00700000, + IMAGE_SCN_ALIGN_128BYTES = 0x00800000, + IMAGE_SCN_ALIGN_256BYTES = 0x00900000, + IMAGE_SCN_ALIGN_512BYTES = 0x00A00000, + IMAGE_SCN_ALIGN_1024BYTES = 0x00B00000, + IMAGE_SCN_ALIGN_2048BYTES = 0x00C00000, + IMAGE_SCN_ALIGN_4096BYTES = 0x00D00000, + IMAGE_SCN_ALIGN_8192BYTES = 0x00E00000, + IMAGE_SCN_LNK_NRELOC_OVFL = 0x01000000, + IMAGE_SCN_MEM_DISCARDABLE = 0x02000000, + IMAGE_SCN_MEM_NOT_CACHED = 0x04000000, + IMAGE_SCN_MEM_NOT_PAGED = 0x08000000, + IMAGE_SCN_MEM_SHARED = 0x10000000, + IMAGE_SCN_MEM_EXECUTE = 0x20000000, + IMAGE_SCN_MEM_READ = 0x40000000, + IMAGE_SCN_MEM_WRITE = 0x80000000 +}; + +typedef struct IMAGE_SECTION_HEADER { + char Name[IMAGE_SIZEOF_SHORT_NAME]; + ULONG VirtualSize; + ULONG VirtualAddress; + ULONG SizeOfRawData; + ULONG PointerToRawData; + ULONG PointerToRelocations; + ULONG PointerToLinenumbers; + USHORT NumberOfRelocations; + USHORT NumberOfLinenumbers; + SectionFlags Characteristics; +}; + +// --- END OF PE HEADERS + +// --- EXPORTS + +typedef struct IMAGE_EXPORT_DIRECTORY { + ULONG Characteristics; + ULONG TimeDateStamp; + USHORT MajorVersion; + USHORT MinorVersion; + ULONG Name; + ULONG Base; + ULONG NumberOfFunctions; + ULONG NumberOfNames; + ULONG AddressOfFunctions; // RVA from base of image + ULONG AddressOfNames; // RVA from base of image + ULONG AddressOfNameOrdinals; // RVA from base of image +}; + +// --- END OF EXPORTS + +// --- IMPORTS + +typedef struct IMAGE_IMPORT_DESCRIPTOR { + DWORD OriginalFirstThunk; + DWORD TimeDateStamp; + DWORD ForwarderChain; + DWORD Name; + DWORD FirstThunk; +}; + +typedef struct IMAGE_IMPORT_BY_NAME { + uint16 Hint; + // char Name; +}; + +typedef struct IMAGE_THUNK_DATA32 { + union { + DWORD ForwarderString; + DWORD Function; + DWORD Ordinal; + DWORD AddressOfData; + } u1; +}; + +typedef struct IMAGE_THUNK_DATA64 { + union { + ULONGLONG ForwarderString; + ULONGLONG Function; + ULONGLONG Ordinal; + ULONGLONG AddressOfData; + } u1; +} + +// --- END OF 
IMPORTS + +// --- RESOURCE DIRECTORY + +enum ResourceID : WORD { + Cursor = 0x1, + Bitmap = 0x2, + Icon = 0x3, + Menu = 0x4, + Dialog = 0x5, + String = 0x6, + FontDirectory = 0x7, + Font = 0x8, + Accelerator = 0x9, + RcData = 0xA, + MessageTable = 0xB, + Version = 0x10, + DlgInclude = 0x11, + PlugAndPlay = 0x13, + VXD = 0x14, + AnimatedCursor = 0x15, + AnimatedIcon = 0x16, + HTML = 0x17, + Manifest = 0x18, +}; + +struct IMAGE_RESOURCE_DIRECTORY_ENTRY { + union { + struct { + DWORD NameOffset:31; + DWORD NameIsString:1; + }; + DWORD Name; + WORD Id; + }; + union { + DWORD OffsetToData; + struct { + DWORD OffsetToDirectory:31; + DWORD DataIsDirectory:1; + }; + }; +} + +struct IMAGE_RESOURCE_DIRECTORY { + uint32 Characteristics; + uint32 TimeDateStamp; + ushort MajorVersion; + ushort MinorVersion; + ushort NumberOfNamedEntries; + ushort NumberOfIdEntries; +}; + +/* +struct IMAGE_RESOURCE_DIRECTORY_ENTRY { + uint32 Name; + uint32 OffsetToDirectory:31; + uint32 DataIsDirectory:1; +}; +*/ + +typedef struct IMAGE_RESOURCE_DATA_ENTRY { + uint32 OffsetToData; + uint32 Size; + uint32 CodePage; + uint32 Reserved; +}; + +// --- END OF RESOURCE DIRECTORY + +// --- DEBUG DIRECTORY + +typedef struct IMAGE_DEBUG_DIRECTORY { + DWORD Characteristics; + DWORD TimeDateStamp; + WORD MajorVersion; + WORD MinorVersion; + DWORD Type; + DWORD SizeOfData; + DWORD AddressOfRawData; + DWORD PointerToRawData; +}; + +// --- END OF DEBUG DIRECTORY + +// --- RELOCATION DIRECTORY + +typedef struct _IMAGE_BASE_RELOCATION { + DWORD VirtualAddress; + DWORD SizeOfBlock; +// WORD TypeOffset[1]; +} IMAGE_BASE_RELOCATION; + +// --- END OF RELOCATION DIRECTORY + +// --- TLS DIRECTORY + +typedef struct _IMAGE_TLS_DIRECTORY32 { + DWORD StartAddressOfRawData; + DWORD EndAddressOfRawData; + DWORD AddressOfIndex; // PDWORD + DWORD AddressOfCallBacks; // PIMAGE_TLS_CALLBACK * + DWORD SizeOfZeroFill; + DWORD Characteristics; +} IMAGE_TLS_DIRECTORY32; + +typedef struct _IMAGE_TLS_DIRECTORY64 { + ULONGLONG 
def build(self) -> BytesIO:
    """Assemble the patched PE and hand back the stream that holds it.

    The steps run in a fixed order: the optional header's ``SizeOfImage`` is refreshed from ``pe_size``, the
    section table and section data are written, the RVA patches are applied, and finally the DOS/NT headers are
    written at the start of the stream.

    Returns:
        ``self.patched_pe``, rewound to offset 0.
    """

    # Refresh SizeOfImage first so the headers written below carry the new value
    new_image_size = self.pe_size
    self.pe.optional_header.SizeOfImage = new_image_size

    output = self.patched_pe
    output.seek(0)

    # Section headers and section data, followed by the RVA fix-ups
    self._build_section_table()
    self._patch_rvas()

    # The MZ, File and NT headers live at the very beginning of the file
    output.seek(0)
    self._build_dos_header()

    # Leave the stream ready for reading by the caller
    output.seek(0)
    return output
def _patch_import_rvas(self) -> None:
    """Function to patch the RVAs of the import directory and the thunkdata entries.

    For every parsed import module the descriptor's ``FirstThunk``, ``OriginalFirstThunk`` and ``Name`` fields are
    rewritten relative to the current import directory address, the module's thunk array is rebuilt, and both are
    written into ``self.patched_pe``. The function returns without writing anything when the import directory
    address is 0 or when no patched section contains it.

    Note that the ``import_descriptor`` objects held by the import modules are modified in place.
    """

    # Accumulates the dumped (patched) import descriptors in module order
    patched_import_data = bytearray()

    # Get the directory entry virtual address, this is the updated address if it has been patched.
    directory_va = self.pe.directory_entry_rva(c_pe.IMAGE_DIRECTORY_ENTRY_IMPORT)
    if not directory_va:
        return

    # Get the original VA of the section the import directory is residing in, this value is used to calculate the
    # new RVA's
    section = self._section_manager.get(va=directory_va, patch=True)
    if section is None:
        return
    # Offset of the directory inside its section; the same offset is applied to the unpatched section of the same
    # name to recover where the directory used to live
    directory_offset = directory_va - section.virtual_address
    original_directory_va = self._section_manager.get(name=section.name).virtual_address + directory_offset

    # Loop over the imports of the PE to patch the RVA's of the import descriptors and the associated thunkdata
    # entries
    for module in self.pe.imports.values():
        import_descriptor: c_pe.IMAGE_IMPORT_DESCRIPTOR = module.import_descriptor
        patched_thunkdata = bytearray()

        # Skip descriptors whose Name is 0 or 0xFFFFF800
        # NOTE(review): the import parser filters on the same two values; the meaning of 0xFFFFF800 is not
        # visible here - confirm
        if import_descriptor.Name in [0xFFFFF800, 0x0]:
            continue

        old_first_thunk = import_descriptor.FirstThunk

        # Keep the thunk array at the same distance from the import directory as before
        # NOTE(review): abs() presumably guards against a negative result - confirm it is intended
        first_thunk_offset = old_first_thunk - original_directory_va
        import_descriptor.FirstThunk = abs(directory_va + first_thunk_offset)

        # OriginalFirstThunk is made to point at the same array as FirstThunk
        import_descriptor.OriginalFirstThunk = import_descriptor.FirstThunk

        name_offset = import_descriptor.Name - original_directory_va
        import_descriptor.Name = abs(directory_va + name_offset)

        for function in module.functions:
            # Check if we're dealing with an ordinal entry, if it's an ordinal entry we don't need
            # to patch since it's not an RVA
            if function.ordinal:
                patched_thunkdata += function.thunkdata.dumps()
                continue

            # Check the original RVA associated with the AddressOfData field in the thunkdata, retrieve the
            # original VA
            # and use it to also select the patched virtual address of this section that the RVA is located in
            # (this rebinds `section`; the value looked up before the loop is not used again below)
            # A thunk whose address is not found in any original section is left out of the rebuilt array
            if (section := self._section_manager.get(va=function.data_address)) is None:
                continue

            virtual_address = section.virtual_address
            new_virtual_address = self._section_manager.get(name=section.name, patch=True).virtual_address

            # Calculate the offset using the VA of the section and update the thunkdata
            va_offset = function.data_address - virtual_address
            new_address = new_virtual_address + va_offset

            # Avoid overwriting the original data
            tmp_thunkdata = self.pe.imports.thunk_struct()
            tmp_thunkdata.u1.AddressOfData = new_address
            tmp_thunkdata.u1.ForwarderString = new_address
            tmp_thunkdata.u1.Function = new_address
            tmp_thunkdata.u1.Ordinal = new_address

            patched_thunkdata += tmp_thunkdata.dumps()

        # Write the thunk data into the patched PE
        self.seek(import_descriptor.FirstThunk)
        self.patched_pe.write(patched_thunkdata)

        patched_import_data += import_descriptor.dumps()

    # Finally write all patched descriptors back to the import directory itself
    self.seek(directory_va)
    self.patched_pe.write(patched_import_data)
export_directory.AddressOfNameOrdinals = directory_va + address_of_name_ordinals + + # Write the new export directory + self.seek(directory_va) + self.patched_pe.write(export_directory.dumps()) + + # Patch the addresses of the functions + new_function_rvas = [] + function_rvas = bytearray() + self.seek(export_directory.AddressOfFunctions) + export_addresses = c_pe.uint32[export_directory.NumberOfFunctions].read(self.patched_pe) + for address in export_addresses: + if not (section := self._section_manager.get(va=address)): + continue + address_offset = address - section.virtual_address + new_address = self._section_manager.get(name=section.name, patch=True).virtual_address + address_offset + new_function_rvas.append(new_address) + + rva_struct = utils.create_struct(" None: + """Function to patch the RVAs of the resource directory and the associated resource data RVA's.""" + + directory_va = self.pe.directory_entry_rva(c_pe.IMAGE_DIRECTORY_ENTRY_RESOURCE) + if not directory_va: + return + + section_data = BytesIO() + self.seek(directory_va) + + for rsrc_entry in self.pe.resources.raw(lambda rsrc: rsrc.data_offset): + entry_offset = rsrc_entry.offset + entry = rsrc_entry.entry + + if isinstance(entry, c_pe.IMAGE_RESOURCE_DATA_ENTRY): + rsrc_obj = rsrc_entry.resource + data_offset = rsrc_entry.data_offset + + # Update the offset of the entry to match with the new directory VA + rsrc_obj.offset = directory_va + data_offset + + # Write the resource entry data into the section + section_data.seek(data_offset) + section_data.write(rsrc_obj.data) + + # Write the resource entry into the section + section_data.seek(entry_offset) + section_data.write(entry.dumps()) + + section_data.seek(0) + self.seek(directory_va) + self.patched_pe.write(section_data.read()) + + def _patch_tls_rvas(self) -> None: + """Function to patch the RVAs of the TLS directory and the associated TLS callbacks.""" + + directory_va = self.pe.directory_entry_rva(c_pe.IMAGE_DIRECTORY_ENTRY_TLS) + if not 
directory_va: + return + + self.seek(directory_va) + tls_directory = self.pe.tls._tls_directory(self.patched_pe) + + image_base = self.pe.optional_header.ImageBase + + # Patch the TLS StartAddressOfRawData and EndAddressOfRawData + if section := self._section_manager.get(va=tls_directory.StartAddressOfRawData - image_base): + start_address_offset = tls_directory.StartAddressOfRawData - section.virtual_address + tls_directory.StartAddressOfRawData = ( + self._section_manager.get(name=section.name, patch=True).virtual_address + start_address_offset + ) + end_address_offset = tls_directory.EndAddressOfRawData - tls_directory.StartAddressOfRawData + tls_directory.EndAddressOfRawData = tls_directory.StartAddressOfRawData + end_address_offset + + # Patch the TLS callbacks address + if section := self._section_manager.get(va=tls_directory.AddressOfCallBacks - image_base): + address_of_callbacks_offset = tls_directory.AddressOfCallBacks - section.virtual_address + tls_directory.AddressOfCallBacks = ( + self._section_manager.get(name=section.name, patch=True).virtual_address + address_of_callbacks_offset + ) + + # Patch the TLS AddressOfIndex + if section := self._section_manager.get(va=tls_directory.AddressOfIndex - image_base, patch=True): + address_of_index_offset = tls_directory.AddressOfIndex - section.virtual_address + tls_directory.AddressOfIndex = section.virtual_address + address_of_index_offset + + # Write the patched TLS directory to the new PE + self.seek(directory_va) + self.patched_pe.write(tls_directory.dumps()) diff --git a/dissect/executable/pe/pe.py b/dissect/executable/pe/pe.py new file mode 100644 index 0000000..be70c4d --- /dev/null +++ b/dissect/executable/pe/pe.py @@ -0,0 +1,485 @@ +from __future__ import annotations + +from datetime import datetime, timezone +from io import BytesIO +from pathlib import Path +from typing import TYPE_CHECKING, BinaryIO + +from dissect.executable import utils +from dissect.executable.exception import ( + InvalidAddress, 
class PE:
    """Base class for parsing PE files.

    The complete input is copied into a private ``BytesIO`` on construction, after which the DOS/NT headers, the
    section table and the supported data directories are parsed.

    Args:
        pe_file: A file-like object of an executable.
        virtual: Indicate whether the PE file exists within a memory image. When set, virtual addresses are used
            as offsets into the data without any section translation.
    """

    def __init__(self, pe_file: BinaryIO, virtual: bool = False):
        pe_file.seek(0)
        self.pe_file = BytesIO(pe_file.read())
        self.virtual = virtual

        # Make sure we reset any kind of pointers within the PE file before continuing
        self.pe_file.seek(0)

        self.mz_header: c_pe.IMAGE_DOS_HEADER = None
        self.nt_headers: c_pe.IMAGE_NT_HEADERS | c_pe.IMAGE_NT_HEADERS64 = None
        self.file_header: c_pe.IMAGE_FILE_HEADER = None
        self.optional_header: c_pe.IMAGE_OPTIONAL_HEADER | c_pe.IMAGE_OPTIONAL_HEADER64 = None

        self.section_header_offset = 0

        # The directory managers stay ``None`` when the PE does not contain the associated data directory
        self.imports: imports.ImportManager = None
        self.exports: exports.ExportManager = None
        self.resources: resources.ResourceManager = None
        self.relocations: relocations.RelocationManager = None
        self.tls: tls.TLSManager = None

        # We always want to parse the DOS header and NT headers
        self.parse_headers()

        # The section headers are read from the position the stream is at once the NT headers have been parsed
        self.section_header_offset = self.pe_file.tell()

        self.imagebase = self.nt_headers.OptionalHeader.ImageBase
        self.file_alignment = self.nt_headers.OptionalHeader.FileAlignment
        self.section_alignment = self.nt_headers.OptionalHeader.SectionAlignment
        self.timestamp = datetime.fromtimestamp(self.file_header.TimeDateStamp, tz=timezone.utc)

        self.sections = sections.PESectionManager(self.file_alignment, self.section_alignment)
        # Parse the section header
        self.parse_section_header()

        # Parsing the directories present in the PE
        self.parse_directories()

    def is64bit(self) -> bool:
        """Return whether the machine type in the file header is AMD64."""
        return self.file_header.Machine == c_pe.MachineType.IMAGE_FILE_MACHINE_AMD64

    def parse_headers(self) -> None:
        """Function to parse the basic PE headers:
            - DOS header
            - File header (part of NT header)
            - Optional header (part of NT header)

        Function also sets some architecture dependent variables.

        Raises:
            InvalidPE if the PE file is not a valid PE file.
            InvalidArchitecture if the architecture is not supported or unknown.
        """

        self.mz_header = c_pe.IMAGE_DOS_HEADER(self.pe_file)
        if self.mz_header.e_magic != 0x5A4D:
            raise InvalidPE("File is not a valid PE file, MZ header has wrong signature.")

        self.pe_file.seek(self.mz_header.e_lfanew)
        nt_signature = c_pe.uint32(self.pe_file)

        if nt_signature != 0x4550:
            raise InvalidPE("file is not a valid PE file")

        # The file header is read on its own first because the machine type decides which NT header layout to use
        self.file_header = c_pe.IMAGE_FILE_HEADER(self.pe_file)

        image_nt_headers_offset = self.mz_header.e_lfanew
        self.pe_file.seek(image_nt_headers_offset)

        # Set the architecture specific settings
        self._check_architecture()
        if self.is64bit():
            self.nt_headers = c_pe.IMAGE_NT_HEADERS64(self.pe_file)
        else:
            self.nt_headers = c_pe.IMAGE_NT_HEADERS(self.pe_file)

        self.optional_header = self.nt_headers.OptionalHeader

    def _check_architecture(self) -> None:
        """Check whether the architecture belonging to the binary is one we support.

        Raises:
            InvalidArchitecture if the architecture is not supported or unknown.
        """
        if self.file_header.Machine not in [
            c_pe.MachineType.IMAGE_FILE_MACHINE_AMD64,
            c_pe.MachineType.IMAGE_FILE_MACHINE_I386,
        ]:
            raise InvalidArchitecture(f"Invalid architecture found: {self.file_header.Machine:02x}")

    def parse_section_header(self) -> None:
        """Parse the sections within the PE file."""

        self.pe_file.seek(self.section_header_offset)

        for _ in range(self.file_header.NumberOfSections):
            # Keep track of the last section offset
            offset = self.pe_file.tell()
            section_header = c_pe.IMAGE_SECTION_HEADER(self.pe_file)
            section_name = section_header.Name.decode().strip("\x00")
            # Take note of the sections, keep track of any patches separately
            self.sections.add(section_name, sections.PESection(pe=self, section=section_header, offset=offset))

        self.last_section_offset = self.sections.last_section().offset

    def datadirectory_section(self, index: int) -> sections.PESection:
        """Return the section that contains the data directory with the given index.

        Args:
            index: The index of the data directory to find the associated section for.

        Returns:
            The (patched) section that contains the virtual address of the data directory.

        Raises:
            InvalidVA if the virtual address of the data directory is not within any section.
        """

        va = self.directory_entry_rva(index=index)
        if section := self.sections.in_range(va, patch=True):
            return section

        raise InvalidVA(f"VA not found in sections: {va:#04x}")

    def parse_directories(self) -> None:
        """Parse the different data directories in the PE file and initialize their associated managers.

        For now the following data directories are implemented:
            - Import Address Table (IAT)
            - Export Directory
            - Resources
            - Base Relocations
            - Thread Local Storage (TLS) Callbacks
        """

        for idx in range(c_pe.IMAGE_NUMBEROF_DIRECTORY_ENTRIES):
            if not self.has_directory(index=idx):
                continue

            # Take note of the current directory VA so we can dynamically update it when resizing sections
            section = self.datadirectory_section(index=idx)
            section_dir = self.optional_header.DataDirectory[idx]

            section.add_directory(idx, section_dir)

            # Parse the Import Address Table (IAT)
            if idx == c_pe.IMAGE_DIRECTORY_ENTRY_IMPORT:
                self.imports = imports.ImportManager(pe=self, section=section)

            # Parse the export directory entry of the PE file
            if idx == c_pe.IMAGE_DIRECTORY_ENTRY_EXPORT:
                self.exports = exports.ExportManager(pe=self, section=section)

            # Parse the resources directory entry of the PE file
            if idx == c_pe.IMAGE_DIRECTORY_ENTRY_RESOURCE:
                self.resources = resources.ResourceManager(pe=self, section=section)

            # Parse the relocation directory entry of the PE file
            if idx == c_pe.IMAGE_DIRECTORY_ENTRY_BASERELOC:
                self.relocations = relocations.RelocationManager(pe=self, section=section)

            # Parse the TLS directory entry of the PE file
            if idx == c_pe.IMAGE_DIRECTORY_ENTRY_TLS:
                self.tls = tls.TLSManager(pe=self, section=section)

    def virtual_address(self, physical_address: int) -> int:
        """Translate a virtual address into the matching offset within the file data.

        Despite its name, this method takes a *virtual* address (the parameter is called ``physical_address`` for
        historical reasons) and returns the raw file offset, computed from the ``pointer_to_raw_data`` of the
        section that contains the address.

        Args:
            physical_address: The virtual address to translate.

        Returns:
            The raw offset as an `int`. For a virtual (memory image) PE the address is returned unchanged.

        Raises:
            InvalidVA if the address is not within any section.
        """

        if self.virtual:
            return physical_address

        if section := self.sections.in_range(physical_address, patch=True):
            return section.pointer_to_raw_data + (physical_address - section.virtual_address)

        raise InvalidVA(f"VA not found in sections: {physical_address:#04x}")

    def raw_address(self, virtual_address: int) -> int:
        """Translate a raw file offset into the matching virtual address.

        Despite its name, this method takes a *raw* offset (the parameter is called ``virtual_address`` for
        historical reasons) and returns the virtual address, computed from the ``virtual_address`` of the section
        whose raw data range contains the offset.

        Args:
            virtual_address: The raw offset to translate into a virtual address.

        Returns:
            The virtual address as an `int`.

        Raises:
            InvalidAddress if the offset is not within the raw data of any section.
        """
        if section := self.sections.in_raw_range(virtual_address, patch=True):
            return section.virtual_address + (virtual_address - section.pointer_to_raw_data)

        raise InvalidAddress(f"Raw address not found in sections: {virtual_address:#04x}")

    def virtual_read(self, address: int, size: int) -> bytes:
        """Wrapper for reading virtual address offsets within a PE file.

        Args:
            address: The virtual address to read from.
            size: The amount of bytes to read from the given virtual address.

        Returns:
            The bytes that were read.
        """

        # ``self.pe_file`` is always a ``BytesIO`` (see ``__init__``), so the same seek + read works for both
        # virtual and on-disk images; ``virtual_address`` already returns the address unchanged for virtual images.
        self.pe_file.seek(self.virtual_address(physical_address=address))
        return self.pe_file.read(size)

    def raw_read(self, offset: int, size: int) -> bytes:
        """Read the amount of bytes denoted by the size argument within the PE file at the given offset.

        The position of the underlying stream is restored afterwards.

        Args:
            offset: The offset within the file to start reading.
            size: The amount of bytes to read within the PE file.

        Returns:
            The bytes that were read from the given offset.
        """

        old_offset = self.pe_file.tell()
        try:
            self.pe_file.seek(offset)
            return self.pe_file.read(size)
        finally:
            # Restore the position even when the seek or read fails
            self.pe_file.seek(old_offset)

    def seek(self, address: int) -> None:
        """Seek to the given virtual address within a PE file.

        Args:
            address: The virtual address to seek to.
        """

        raw_address = self.virtual_address(physical_address=address)
        self.pe_file.seek(raw_address)

    def tell(self) -> int:
        """Returns the current position within the PE file as a virtual address.

        Returns:
            The virtual address that corresponds to the current raw position of the underlying stream.

        Raises:
            InvalidAddress if the current position is not within the raw data of any section.
        """

        offset = self.pe_file.tell()
        return self.raw_address(virtual_address=offset)

    def read(self, size: int) -> bytes:
        """Read x amount of bytes of the PE file.

        Args:
            size: The amount of bytes to read.

        Returns:
            The bytes that were read.
        """

        return self.pe_file.read(size)

    def write(self, data: bytes) -> None:
        """Write the data to the PE file.

        This write function will also make sure to update the section data.

        Args:
            data: The data to write to the PE file.
        """

        # Virtual address of the current position, needed to find the section that is being written to
        offset = self.tell()

        # Write the data to the PE file so we can do a raw_read on this data in the section
        self.pe_file.write(data)

        # Update the section data
        if section := self.sections.in_range(offset, patch=True):
            self.seek(address=section.virtual_address)
            section.data = self.read(size=section.virtual_size)

    def read_image_directory(self, index: int) -> bytes:
        """Read the PE file image directory entry of a given index.

        Args:
            index: The index of the data directory to read.

        Returns:
            The bytes of the directory that was read.
        """

        directory_entry = self.optional_header.DataDirectory[index]
        return self.virtual_read(address=directory_entry.VirtualAddress, size=directory_entry.Size)

    def directory_entry_rva(self, index: int) -> int:
        """Returns the virtual address of a directory given its index.

        Args:
            index: The index of the data directory to read.

        Returns:
            The virtual address of the data directory at the given index.
        """

        return self.optional_header.DataDirectory[index].VirtualAddress

    def has_directory(self, index: int) -> bool:
        """Check if a certain data directory exists within the PE file given its index.

        Args:
            index: The index of the data directory to check.

        Returns:
            `True` if the data directory has a size associated with it, indicating it exists, `False` otherwise.
        """

        return self.optional_header.DataDirectory[index].Size != 0

    def debug(self) -> c_cv_info.CV_INFO_PDB70 | None:
        """Return the CodeView (PDB 7.0) debug information of the given PE file.

        Every ``IMAGE_DEBUG_DIRECTORY`` entry in the debug data directory is inspected in order; the raw data of
        the first entry with type ``0x2`` (CodeView) is parsed and returned.

        Returns:
            A `cstruct` object of the debug entry within the PE file, or `None` if the PE has no debug directory
            or no entry of type ``0x2``.
        """

        if not self.has_directory(index=c_pe.IMAGE_DIRECTORY_ENTRY_DEBUG):
            return None

        debug_directory = self.read_image_directory(index=c_pe.IMAGE_DIRECTORY_ENTRY_DEBUG)
        entry_size = c_pe.IMAGE_DEBUG_DIRECTORY.size

        # Walk the directory one fixed-size entry at a time, ignoring an incomplete trailing entry
        for entry_offset in range(0, len(debug_directory) - entry_size + 1, entry_size):
            entry = c_pe.IMAGE_DEBUG_DIRECTORY(debug_directory[entry_offset : entry_offset + entry_size])

            # Only read the raw data of entries we can actually parse
            if entry.Type == 0x2:
                dbg_entry = self.virtual_read(address=entry.AddressOfRawData, size=entry.SizeOfData)
                return c_cv_info.CV_INFO_PDB70(dbg_entry)
        return None

    def symbol_data(self, symbol: cstruct, size: int) -> bytes:
        """Retrieve data from the PE using a PDB symbol.

        Args:
            symbol: A `cstruct` object of a symbol.
            size: The size to read from the symbol offset.

        Returns:
            The bytes that were read from the offset within the PE.
        """

        _section = self.sections.from_index(segment_index=symbol.seg)
        # NOTE(review): this seeks the underlying stream to ``imagebase + section VA + symbol offset`` without any
        # section translation - confirm that this is the intended offset for non-virtual files.
        address = self.imagebase + _section.virtual_address + symbol.off

        self.pe_file.seek(address)
        return self.pe_file.read(size)

    def add_section(
        self,
        name: str,
        data: bytes,
        va: int | None = None,
        datadirectory: int | None = None,
        datadirectory_rva: int | None = None,
        datadirectory_size: int | None = None,
        size: int | None = None,
    ) -> None:
        """Add a new section to the PE file.

        The section is placed directly after the current last section, both in the section table and in the raw
        data. ``NumberOfSections`` and ``SizeOfImage`` are updated and the data directories are parsed again.

        Args:
            name: The name of the new section.
            data: The data to add to the new section.
            va: The virtual address of the new section, defaults to the first section-aligned address after the
                last section.
            datadirectory: The index of the data directory entry this section should be registered as, if any.
            datadirectory_rva: The RVA of the directory entry if this is different than the virtual address of the
                section.
            datadirectory_size: The size of the directory entry, defaults to the length of the data.
            size: The virtual size of the section, defaults to the length of the data.
        """

        # Take note of the last section
        last_section = self.sections.last_section(patch=True)

        # Calculate the new section size
        raw_size = utils.align_int(integer=len(data), blocksize=self.file_alignment)
        virtual_size = size or len(data)

        # Use the provided VA or calculate the new section virtual address
        virtual_address = va or utils.align_int(
            integer=last_section.virtual_address + last_section.virtual_size,
            blocksize=self.section_alignment,
        )

        # Calculate the new section raw address
        pointer_to_raw_data = last_section.pointer_to_raw_data + last_section.size_of_raw_data

        # Build the new section
        new_section = sections.build_section(
            virtual_size=virtual_size,
            virtual_address=virtual_address,
            raw_size=raw_size,
            pointer_to_raw_data=pointer_to_raw_data,
            name=name.encode(),
        )

        # Update the last section offset
        offset = last_section.offset + c_pe.IMAGE_SECTION_HEADER.size

        # Increment the NumberOfSections field
        self.file_header.NumberOfSections += 1

        # Set the VA and size of the datadirectory entry if this was marked as being such
        if datadirectory is not None:
            self.optional_header.DataDirectory[datadirectory].VirtualAddress = datadirectory_rva or virtual_address
            self.optional_header.DataDirectory[datadirectory].Size = datadirectory_size or len(data)

        # Add the new section to the PE
        self.sections.add(name, sections.PESection(pe=self, section=new_section, offset=offset, data=data))

        # Update the SizeOfImage field
        last_section = self.sections.last_section(patch=True)
        last_va = last_section.virtual_address
        last_size = last_section.virtual_size

        pe_size = utils.align_int(integer=(last_va + last_size), blocksize=self.section_alignment)
        self.optional_header.SizeOfImage = pe_size

        # Write the data to the PE file
        self.pe_file.seek(pointer_to_raw_data)
        if virtual_size > raw_size:
            data += utils.pad(virtual_size - raw_size)

        # Pad the data to align the section
        # NOTE(review): this appends ``padsize`` bytes of padding, where ``padsize`` is the result of aligning the
        # data length - it looks like only the difference with ``len(data)`` is needed; confirm against
        # ``utils.align_int`` / ``utils.pad`` before changing the amount of data written.
        padsize = utils.align_int(integer=len(data), blocksize=self.section_alignment)
        data += utils.pad(size=padsize)
        self.pe_file.write(data)

        # Reparse the directories
        self.parse_directories()

    def write_pe(self, filename: str = "out.exe") -> None:
        """Write the contents of the PE to a new file.

        This will use the patcher that is part of the project to make sure any kind of relative addressing is also
        corrected for the supported data directories.

        Args:
            filename: The filename to write the PE to, default out.exe.
        """

        pepatcher = patcher.Patcher(pe=self)
        new_pe = pepatcher.build()
        Path(filename).write_bytes(new_pe.read())
class ExportManager(DictManager[ExportFunction]):
    """Manager that parses the export directory of a PE file into `ExportFunction` objects.

    Args:
        pe: A `PE` object.
        section: The `PESection` object that contains the export directory.
    """

    def __init__(self, pe: PE, section: PESection):
        super().__init__(pe, section)

        self.parse()

    def parse(self) -> None:
        """Parse the export directory of the PE file.

        This function will store every export function within the PE file as an `ExportFunction` object containing
        the name (if available), the call ordinal, and the function address. Named functions are stored under their
        name, functions without a name under their 1-based position in the address table (as a string).

        The ordinal of a function is the ordinal base of the export directory plus the zero-based index of the
        function in the address table.
        """

        export_entry_va = self.pe.directory_entry_rva(c_pe.IMAGE_DIRECTORY_ENTRY_EXPORT)
        export_entry = BytesIO(self.section.directory_data(index=c_pe.IMAGE_DIRECTORY_ENTRY_EXPORT))
        export_directory = c_pe.IMAGE_EXPORT_DIRECTORY(export_entry)

        # All RVAs in the directory are made relative to the start of the directory data by subtracting its VA

        # Seek to the offset of the export name
        export_entry.seek(export_directory.Name - export_entry_va)
        self.export_name = c_pe.char[None](export_entry)

        # Create a list of addresses for the exported functions
        export_entry.seek(export_directory.AddressOfFunctions - export_entry_va)
        export_addresses: list[int] = c_pe.uint32[export_directory.NumberOfFunctions].read(export_entry)
        # Create a list of addresses for the exported functions that have associated names
        export_entry.seek(export_directory.AddressOfNames - export_entry_va)
        export_names: list[int] = c_pe.uint32[export_directory.NumberOfNames].read(export_entry)
        # Create a list of addresses for the ordinals associated with the functions
        export_entry.seek(export_directory.AddressOfNameOrdinals - export_entry_va)
        export_ordinals: list[int] = c_pe.uint16[export_directory.NumberOfNames].read(export_entry)

        # Map every function index to the position of its first name, so the lookup per function is constant time
        name_positions: dict[int, int] = {}
        for position, function_index in enumerate(export_ordinals):
            name_positions.setdefault(function_index, position)

        # Iterate over the export functions and store the information
        for idx, address in enumerate(export_addresses):
            key = str(idx + 1)
            export_name: bytes | None = None

            if (position := name_positions.get(idx)) is not None:
                export_entry.seek(export_names[position] - export_entry_va)
                export_name = c_pe.char[None](export_entry)
                key = export_name.decode()

            # The ordinal base already accounts for the first ordinal, so only the zero-based index is added
            self.elements[key] = ExportFunction(ordinal=export_directory.Base + idx, address=address, name=export_name)
@property
def name(self) -> str:
    """Return the name of the import function if available, otherwise return the ordinal of the function.

    An explicitly provided name always wins. Without thunk data (such as while a new import is being added) an
    empty string is returned. For an import by ordinal, indicated by the high bit of the thunk value, the ordinal
    number stored in the low 16 bits is returned as a decimal string. Otherwise the name is read from the PE, two
    bytes past the address the thunk points to.

    Returns:
        The name or ordinal of the import function.
    """

    if self._name:
        return self._name

    if self.thunkdata is None:
        # For the case thunkdata is not defined, such as during the `add`
        return ""

    if self.ordinal:
        # ``self.ordinal`` is only the masked high bit (a flag); the ordinal number itself is the low 16 bits
        return str(self.data_address & 0xFFFF)

    # Skip the two bytes that precede the name at the address the thunk points to
    self.pe.seek(self.data_address + 2)
    return c_pe.char[None](self.pe).decode()
def __init__(self, pe: PE, section: PESection):
    """Set up the bookkeeping for the import table and parse the existing imports.

    Args:
        pe: A `PE` object.
        section: The associated `PESection` object.
    """
    super().__init__(pe, section)

    # References into the PE that are needed when the import table is rebuilt
    self._section_manager = pe.sections
    self.image_size: int = pe.optional_header.SizeOfImage

    # Scratch state, all starting out empty
    self.import_directory_rva = 0
    self.import_data = bytearray()
    self.new_size_of_image = 0
    self.section_data = bytearray()

    # Architecture dependent members, assigned by `set_architecture`
    self.thunk_struct: type[c_pe.IMAGE_THUNK_DATA32 | c_pe.IMAGE_THUNK_DATA64] = None
    self.thunk_pack_struct: Struct = None
    self._high_bit: int = 0

    self.set_architecture(pe)
    self.parse()
def parse_thunks(self, offset: int) -> Iterator[c_pe.IMAGE_THUNK_DATA32 | c_pe.IMAGE_THUNK_DATA64]:
    """Walk the thunk table of a module, starting at ``offset``.

    Args:
        offset: The offset to the first thunk.

    Yields:
        Each thunk as a cstruct object, stopping at (and not yielding) the first thunk whose
        ``u1.Function`` value is zero.
    """

    self.pe.seek(offset)

    thunk = self.thunk_struct(self.pe)
    while thunk.u1.Function:
        yield thunk
        # The next thunk is only read once the consumer asks for it
        thunk = self.thunk_struct(self.pe)
+ + Currently we're using the .idata section to store the imports, there might be a better way to do this but for + now this will do. + """ + + import_descriptors: list[c_pe.IMAGE_IMPORT_DESCRIPTOR] = [] + import_data = bytearray() + + for name, module in self.elements.items(): + # Take note of the current offset to store the modulename + name_offset = len(import_data) + import_data += name.encode() + b"\x00" + + # Build the module imports and get the RVA of the first thunk to generate an import descriptor + module_offset = len(import_data) + module_bytes, offsets = self._build_module_imports(module_offset, module.functions) + thunkdata = self._build_thunkdata(offsets) + import_data += module_bytes + thunkdata + + import_descriptor = self._build_import_descriptor( + first_thunk_rva=self.image_size + module_offset + len(module_bytes), + name_rva=self.image_size + name_offset, + ) + import_descriptors.append(import_descriptor) + + # Take note of the RVA of the first import descriptor + import_rva = self.image_size + len(import_data) + descriptor_data = b"".join(descriptor.dumps() for descriptor in import_descriptors) + directory_size = len(descriptor_data) + import_data += descriptor_data + + return import_data, import_rva, directory_size + + def _build_module_imports(self, function_offset: int, functions: list[ImportFunction]) -> tuple[bytes, list]: + """Function to build the imports for a module. + + This function is responsible for building the functions by name, as well as the associated thunkdata that is + used to parse the imports at a later stage. + + Args: + function_offset: The start offset of the functions for this module + functions: A `list` of `ImportFunction` objects. + + Returns: + The relative virtual address of the first thunk. + """ + + function_offsets = [] + hint = create_struct(" bytes: + """Function to build the thunkdata for the new import table. + + Args: + import_rvas: A `list` of relative virtual addresses. 
def _build_import_descriptor(self, first_thunk_rva: int, name_rva: int) -> c_pe.IMAGE_IMPORT_DESCRIPTOR:
    """Create a fresh import descriptor for the rebuilt import table.

    Args:
        first_thunk_rva: The relative address of the first piece of thunkdata, stored in both
            ``OriginalFirstThunk`` and ``FirstThunk``.
        name_rva: The relative address of the module name, stored in ``Name``.

    Returns:
        The image import descriptor as a `cstruct` object.
    """

    descriptor = c_pe.IMAGE_IMPORT_DESCRIPTOR()

    # Assigned in the same order as the fields are laid out below
    field_values = (
        ("OriginalFirstThunk", first_thunk_rva),
        ("TimeDateStamp", 0),
        ("ForwarderChain", 0),
        ("Name", name_rva),
        ("FirstThunk", first_thunk_rva),
    )
    for field, value in field_values:
        setattr(descriptor, field, value)

    return descriptor
def parse(self) -> None:
    """Parse the base relocation directory of the PE file into `Relocation` elements.

    Blocks are read back to back until the directory data is exhausted or a block with a zero
    ``VirtualAddress`` is encountered.
    """

    stream = BytesIO(self.section.directory_data(c_pe.IMAGE_DIRECTORY_ENTRY_BASERELOC))
    total_size = stream.getbuffer().nbytes

    while stream.tell() < total_size:
        block = c_pe._IMAGE_BASE_RELOCATION(stream)
        if not block.VirtualAddress:
            # End of relocation entries
            break

        # Whatever follows the block header consists of 2 byte entries
        header_size = len(block.dumps())
        entry_count = (block.SizeOfBlock - header_size) // 2

        # Every entry is consumed from the stream, but only non-zero values are kept
        entries = []
        for _ in range(entry_count):
            value = c_pe.uint16(stream)
            if value:
                entries.append(value)

        self.elements.append(
            Relocation(
                rva=block.VirtualAddress,
                number_of_entries=entry_count,
                entries=entries,
            )
        )
def patch(self, name: str, data: bytes) -> None:
    """Set the new data of a resource and rebuild the resource section around it.

    The first resource found for the type ``name`` receives ``data``. All raw resource records
    are then written into a fresh buffer; resource data is placed directly behind the data of
    the previous resource, so the offsets of the resources that follow are updated as well.

    Resource looks like this:

    | Resource headers (1*...) |
    | ------------------------ |
    | Resource data (1*...)    |

    So it is not important in what order the metadata of the entry gets written.

    Args:
        name: The resource type to look up through `by_type`.
        data: The new data of the resource.

    Raises:
        ValueError: If the type exists but does not contain any resource.
        ResourceException: Raised by `by_type` when the type is not present at all.
    """
    try:
        resource = next(self.by_type(name))
    except StopIteration as err:
        raise ValueError(f"Could not find a resource by type for {name}") from err

    # TODO: Still rewrites the data to the original instance. Maybe we should change that.
    resource._data = data
    resource.size = len(data)

    output = BytesIO()
    prev_offset = prev_size = 0

    for rsrc_entry in self.raw(lambda rsrc: rsrc.data_offset):
        entry = rsrc_entry.entry
        is_data_entry = isinstance(entry, c_pe.IMAGE_RESOURCE_DATA_ENTRY)

        if is_data_entry:
            rsrc_obj = rsrc_entry.resource
            data_offset = rsrc_entry.data_offset

            # The data is placed directly behind the data of the previous resource
            new_data_offset = prev_offset + prev_size
            if new_data_offset and new_data_offset != data_offset:
                data_offset = new_data_offset
                rsrc_entry.data_offset = data_offset
                # Setting the offset also updates `OffsetToData` of the data entry, so it has to
                # happen before the entry is serialized below. Otherwise the written header keeps
                # pointing at the old location of the data.
                rsrc_obj.offset = self.section.virtual_address + data_offset

        # Write the resource entry into the section
        output.seek(rsrc_entry.offset)
        output.write(entry.dumps())

        if not is_data_entry:
            continue

        # Write the resource entry data into the section
        output.seek(data_offset)
        output.write(rsrc_obj.data)

        # Take note of the offset and size so we can update any of these values after changing the data within
        # the resource
        prev_offset = data_offset
        prev_size = rsrc_obj.size

    output.seek(0)
    _data = output.read()

    self.pe.sections.patch(self.section.name, _data)
    # The field of the data directory is called `Size`; assigning to `size` left it untouched
    self.pe.optional_header.DataDirectory[c_pe.IMAGE_DIRECTORY_ENTRY_RESOURCE].Size = len(_data)
def _handle_data_entry(self, data: BinaryIO, entry: c_pe.IMAGE_RESOURCE_DIRECTORY_ENTRY, rc_type: str) -> Resource:
    """Turn a directory entry that points at a data entry into a `Resource`.

    Besides creating the `Resource`, the data entry and the bytes it refers to are recorded in
    ``raw_resources``.

    Args:
        data: The data of the resource directory.
        entry: The resource directory entry.
        rc_type: The type name that is handed to the `Resource`.

    Returns:
        The `Resource` object created for the entry.
    """

    record_offset = entry.OffsetToDirectory

    data.seek(record_offset)
    data_entry = c_pe.IMAGE_RESOURCE_DATA_ENTRY(data)

    # Fetch the bytes the data entry refers to from the PE itself
    self.pe.seek(data_entry.OffsetToData)
    payload = self.pe.read(data_entry.Size)

    resource = Resource(
        pe=self.pe,
        section=self.section,
        name=entry.Name,
        entry_offset=entry.OffsetToData,
        data_entry=data_entry,
        rc_type=rc_type,
    )

    # `data_offset` is kept relative to the start of the section
    raw_record = RawResource(
        offset=record_offset,
        entry=data_entry,
        data=payload,
        data_offset=data_entry.OffsetToData - self.section.virtual_address,
        resource=resource,
    )
    self.raw_resources.append(raw_record)

    return resource
def by_type(self, rsrc_id: str | c_pe.ResourceID) -> Iterator[Resource]:
    """Yield all of the nodes within the resources that belong to the requested type.

    The type can be given either by name or as a `c_pe.ResourceID` member.

    Args:
        rsrc_id: The resource type to find, this can be a cstruct enum member or a `str`.

    Yields:
        All of the nodes that contain the requested type.

    Raises:
        ResourceException: If the type is not present in the PE. As this is a generator the
            exception is raised once iteration starts.
    """

    # The resource tree is keyed by the *name* of the type. An enum member never compares equal
    # to such a key, so it is translated to its name first; a plain string is used as is.
    type_name = getattr(rsrc_id, "name", rsrc_id)

    if type_name not in self.elements:
        raise ResourceException(f"Resource with ID {rsrc_id} not found in PE!")

    yield from self._resources(resources=self.elements[type_name])
def show_resource_tree(self, resources: OrderedDict[str, OrderedDict | Resource], indentation: int = 0) -> None:
    """Print the resources within the PE as a tree.

    Directories are printed with a ``+`` marker and followed by their children, resources are
    printed with a ``-`` marker.

    Args:
        resources: A `dict` containing the different resources that were found.
        indentation: The nesting depth of ``resources``, every level adds one prefix character.
    """

    prefix = " " * indentation

    for name, resource in resources.items():
        if isinstance(resource, OrderedDict):
            print(indent(f"+ name: {name}", prefix=prefix))
            self.show_resource_tree(resources=resource, indentation=indentation + 1)
        else:
            # `Resource` keeps its type in `rc_type`; it has no `rsrc_id` attribute
            print(indent(f"- name: {name} ID: {resource.rc_type}", prefix=prefix))
class Resource:
    """A resource data entry in the PE file together with its data.

    Args:
        pe: A `PE` object.
        section: The section object that contains the resource table.
        name: The name of the resource.
        entry_offset: The offset of the resource entry.
        data_entry: The data entry of the resource.
        rc_type: The type of the resource.
        data: The data of the resource if there was data provided by the user.
    """

    def __init__(
        self,
        pe: PE,
        section: PESection,
        name: str | int,
        entry_offset: int,
        data_entry: c_pe.IMAGE_RESOURCE_DATA_ENTRY,
        rc_type: str,
        data: bytes = b"",
    ):
        self.pe = pe
        self.section = section
        self.name = name
        self.entry_offset = entry_offset
        self.entry = data_entry
        self.rc_type = rc_type
        self.offset = data_entry.OffsetToData
        self._size = data_entry.Size
        self.codepage = data_entry.CodePage
        # Only read from the PE when the caller did not hand in (non-empty) data
        self._data = data or self.read_data()

    def read_data(self) -> bytes:
        """Read the data within the resource.

        Returns:
            The resource data.
        """

        return self.pe.virtual_read(address=self.offset, size=self._size)

    @property
    def size(self) -> int:
        """Return the size of the resource.

        This needs to be done dynamically in the case that the data is patched by the user.

        Returns:
            The size of the data within the resource.
        """

        return len(self.data)

    @size.setter
    def size(self, value: int) -> None:
        """Set the size of the resource, both locally and in the data entry.

        Args:
            value: The size of the resource.
        """

        self._size = value
        self.entry.Size = value

    @property
    def offset(self) -> int:
        """Return the offset of the resource as stored in the data entry."""
        return self.entry.OffsetToData

    @offset.setter
    def offset(self, value: int) -> None:
        """Set the offset of the resource in the data entry.

        Args:
            value: The offset of the resource.
        """

        self.entry.OffsetToData = value

    @property
    def data(self) -> bytes:
        """Return the data within the resource."""
        return self._data

    def __str__(self) -> str:
        return str(self.name)

    def __repr__(self) -> str:
        """Return a debug representation with the name, type, offset and size of the resource."""
        return f"<Resource name={self.name} rc_type={self.rc_type} offset=0x{self.offset:02x} size=0x{self.size:02x}>"
def in_raw_range(self, offset: int, *, patch: bool = False) -> PESection | None:
    """Retrieve the section whose raw data contains the given file offset.

    Args:
        offset: The raw offset to look for.
        patch: Whether it should look through the patched sections.

    Returns:
        The first section for which ``offset`` lies between ``pointer_to_raw_data`` (inclusive)
        and ``pointer_to_raw_data + size_of_raw_data`` (exclusive), or `None` if there is none.
    """
    candidates = self.sections(patch=patch).values()

    return next(
        (
            candidate
            for candidate in candidates
            if candidate.pointer_to_raw_data <= offset < candidate.pointer_to_raw_data + candidate.size_of_raw_data
        ),
        None,
    )
+ """ + sections = self.sections(patch=patch) + + sections_items = list(sections.items()) + + idx = 0 if segment_index - 1 == -1 else segment_index + section_name = sections_items[idx - 1][0] + + return sections[section_name] + + def _in_virtual_range(self, va: int, sections: Iterable[PESection]) -> PESection | None: + for section in sections: + if section.virtual_address <= va < section.virtual_address + section.virtual_size: + return section + + return None + + def patch(self, name: str, data: bytes) -> None: + """Sets the new data of the resource and dynamically updates the other patched sections. + + Args: + name: The section to patch + data: The data to patch it with + """ + patched_section: PESection = self._patched_sections[name] + + # Update the patched section data and size + patched_section._data = data + patched_section.size = len(data) + + if patched_section.size_of_raw_data < patched_section.virtual_size: + patched_section._data += utils.pad(size=patched_section.virtual_size - patched_section.size_of_raw_data) + + iterator = iter(self.sections(patch=True).values()) + first_section = next(iterator) + + prev_ptr = first_section.pointer_to_raw_data + prev_size = first_section.size_of_raw_data + prev_va = first_section.virtual_address + prev_vsize = first_section.virtual_size + + for section in chain([first_section], iterator): + if section.virtual_address == prev_va: + continue + + pointer_to_raw_data = utils.align_int(integer=prev_ptr + prev_size, blocksize=self._file_alignment) + virtual_address = utils.align_int(integer=prev_va + prev_vsize, blocksize=self._section_alignment) + + if section.virtual_address < virtual_address: + """Set the virtual address and raw pointer of the section to the new values, but only do so if the + section virtual address is lower than the previous section. 
def directory_data(self, index: int) -> bytes:
    """Return the part of the section data that belongs to a data directory.

    Args:
        index: The index of the data directory, as used when it was registered through
            `add_directory`.

    Returns:
        The slice of the section data covered by the directory.

    Raises:
        ValueError: If no directory with this index is registered for the section.
    """
    location = self.directories.get(index)
    if location is None:
        raise ValueError("Directory not found in PE Section")

    # Directories are stored as (offset within the section, size)
    start, length = location
    end = start + length
    return self.data[start:end]
@property
def virtual_address(self) -> int:
    """Return the virtual address of the section."""
    return self._virtual_address


@virtual_address.setter
def virtual_address(self, value: int) -> None:
    """Move the section to a new virtual address.

    The section header is updated, and so is the virtual address of every data directory that
    is registered as residing within this section.

    Args:
        value: The virtual address of the section.
    """

    self._virtual_address = value
    self.section.VirtualAddress = value

    # Directories keep their offset relative to the start of the section
    for directory_index, location in self.directories.items():
        relative_offset, _ = location
        self.pe.optional_header.DataDirectory[directory_index].VirtualAddress = value + relative_offset
def build_section(
    virtual_size: int,
    virtual_address: int,
    raw_size: int,
    pointer_to_raw_data: int,
    name: str | bytes = b".dissect",
    characteristics: int = 0xC0000040,
) -> c_pe.IMAGE_SECTION_HEADER:
    """Build a new section header for the PE.

    Args:
        virtual_size: The virtual size of the new section data.
        virtual_address: The virtual address where the new section is located.
        raw_size: The size of the section data.
        pointer_to_raw_data: The pointer to the raw data of the new section.
        name: The new section name, default: .dissect
        characteristics: The characteristics of the new section, default: 0xC0000040

    Returns:
        The new section header as a `cstruct` object.

    Raises:
        BuildSectionException: If the encoded section name is longer than 8 bytes.
    """

    # Encode first, so the limit is checked against the bytes that actually end up in the header.
    # A `str` containing multi-byte characters can be 8 characters long and still not fit.
    if isinstance(name, str):
        name = name.encode()

    if len(name) > 8:
        raise BuildSectionException("section names can't be longer than 8 characters")

    section_header = c_pe.IMAGE_SECTION_HEADER()

    section_header.Name = name + utils.pad(size=8 - len(name))
    section_header.VirtualSize = virtual_size
    section_header.VirtualAddress = virtual_address
    section_header.SizeOfRawData = raw_size
    section_header.PointerToRawData = pointer_to_raw_data
    section_header.PointerToRelocations = 0
    section_header.PointerToLinenumbers = 0
    section_header.NumberOfRelocations = 0
    section_header.NumberOfLinenumbers = 0
    section_header.Characteristics = c_pe.SectionFlags(characteristics)

    return section_header
+ """ + + def __init__(self, pe: PE, section: PESection): + super().__init__(pe, section) + + self.tls: c_pe._IMAGE_TLS_DIRECTORY32 | c_pe._IMAGE_TLS_DIRECTORY64 = None + + self._read_address: type[c_pe.uint64 | c_pe.uint32] = None + self._tls_directory: type[c_pe._IMAGE_TLS_DIRECTORY32 | c_pe._IMAGE_TLS_DIRECTORY64] = None + self._data: bytes = b"" + self._base_address = pe.optional_header.ImageBase + + self.set_architecture(pe) + self.parse() + + def set_architecture(self, pe: PE) -> None: + if pe.is64bit(): + self._read_address = c_pe.uint64 + self._tls_directory = c_pe._IMAGE_TLS_DIRECTORY64 + else: + self._read_address = c_pe.uint32 + self._tls_directory = c_pe._IMAGE_TLS_DIRECTORY32 + + def parse(self) -> None: + """Parse the TLS directory entry of the PE file when present.""" + + tls_data = BytesIO(self.section.directory_data(c_pe.IMAGE_DIRECTORY_ENTRY_TLS)) + self.tls = self._tls_directory(tls_data) + + self.pe.seek(self.tls.AddressOfCallBacks - self._base_address) + + # Parse the TLS callback addresses if present + while True: + callback_address = self._read_address(self.pe) + if not callback_address: + break + self.elements.append(callback_address) + + # Read the TLS data + self._data = self.read_data() + + @property + def size(self) -> int: + """Return the size of the TLS data. + + Returns: + The size of the TLS data in bytes. + """ + + return self.tls.EndAddressOfRawData - self.tls.StartAddressOfRawData + + @size.setter + def size(self, value: int) -> None: + """Setter to set the size of the TLS data to the specified value. + + Args: + value: The new size of the TLS data in bytes. + """ + + self.tls.EndAddressOfRawData = self.tls.StartAddressOfRawData + value + + def read_data(self) -> bytes: + """Read the TLS data from the PE file. + + Returns: + The TLS data in bytes. 
@lru_cache
def create_struct(packing: str) -> struct.Struct:
    """Return a compiled `struct.Struct` for the given format string, cached per format."""
    return struct.Struct(packing)


def _validate_blocksize(blocksize: int) -> None:
    """Reject block sizes for which alignment is undefined (zero or negative)."""
    if blocksize <= 0:
        raise ValueError(f"blocksize must be a positive integer, got {blocksize!r}")


def align_data(data: bytes, blocksize: int) -> bytes:
    """Align the new data according to the file alignment as specified in the PE header.

    Args:
        data: The raw data that needs to be aligned.
        blocksize: The alignment to adhere to.

    Returns:
        Padded data if the data was not aligned to the blocksize.

    Raises:
        ValueError: If the blocksize is not a positive integer.
    """

    _validate_blocksize(blocksize)

    needs_alignment = len(data) % blocksize
    return data if not needs_alignment else data + pad(blocksize - needs_alignment)


def align_int(integer: int, blocksize: int) -> int:
    """Align integer values to the specified section alignment described in the PE header.

    Args:
        integer: The address or value that needs to have an aligned value.
        blocksize: The alignment to adhere to.

    Returns:
        An aligned integer if the integer itself was not aligned yet.

    Raises:
        ValueError: If the blocksize is not a positive integer.
    """

    _validate_blocksize(blocksize)

    needs_alignment = integer % blocksize
    return integer if not needs_alignment else integer + (blocksize - needs_alignment)


def pad(size: int) -> bytes:
    """Pad the data with null bytes.

    Args:
        size: The amount of null bytes to return.

    Returns:
        The null bytes as `bytes`.
    """
    return size * b"\x00"


T = TypeVar("T")


class Manager(Generic[T]):
    """Common container interface for the elements parsed from a PE section.

    Container behaviour (`in`, indexing, iteration, `len`) is forwarded to ``elements``. The element specific
    operations raise `NotImplementedError` here and are meant to be provided by subclasses.

    Args:
        pe: The PE object the elements belong to.
        section: The section the elements are parsed from.
    """

    elements: list[T] | OrderedDict[str, T]

    def __init__(self, pe: PE, section: PESection) -> None:
        self.pe = pe
        self.section = section

    def __contains__(self, item: str | T) -> bool:
        return item in self.elements

    def __getitem__(self, item: str | int) -> T:
        return self.elements[item]

    def __iter__(self):
        yield from self.elements

    def __len__(self) -> int:
        return len(self.elements)

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__} {self.elements}>"

    def values(self) -> Iterable[T]:
        raise NotImplementedError

    def parse(self) -> None:
        raise NotImplementedError

    def add(self, *args, **kwargs) -> None:
        raise NotImplementedError

    def delete(self, elem: int | str) -> None:
        raise NotImplementedError

    def patch(self, elem: int | str, data: bytes) -> None:
        raise NotImplementedError


class DictManager(Manager[T]):
    """Manager that keeps its elements in an `OrderedDict` keyed by name."""

    def __init__(self, pe: PE, section: PESection) -> None:
        super().__init__(pe, section)
        self.elements: OrderedDict[str, T] = OrderedDict()

    def values(self) -> Iterable[T]:
        """Return the managed elements, without their keys."""
        # Looked up on every call so a replaced `elements` mapping is reflected as well.
        return self.elements.values()


class ListManager(Manager[T]):
    """Manager that keeps its elements in a plain list."""

    def __init__(self, pe: PE, section: PESection) -> None:
        super().__init__(pe, section)
        self.elements: list[T] = []

    def values(self) -> Iterable[T]:
        """Return the managed elements."""
        return self.elements
def test_pe_invalid_signature() -> None:
    """A buffer that only carries the ``MZ`` magic followed by null bytes must be rejected."""
    not_a_pe = BytesIO(b"MZ" + b"\x00" * 400)

    with pytest.raises(InvalidPE):
        PE(not_a_pe)


def test_pe_sections() -> None:
    """The section names of the test executable are listed in the expected order."""
    expected = [".dissect", ".text", ".rdata", ".idata", ".rsrc", ".reloc", ".tls"]

    with data_file("testexe.exe").open("rb") as fh:
        parsed = PE(pe_file=fh)
        assert list(parsed.sections.sections()) == expected


def test_pe_imports() -> None:
    """The imported modules of the test executable are listed in the expected order."""
    expected = [
        "SHELL32.dll",
        "ole32.dll",
        "OLEAUT32.dll",
        "ADVAPI32.dll",
        "WTSAPI32.dll",
        "SHLWAPI.dll",
        "VERSION.dll",
        "KERNEL32.dll",
        "USER32.dll",
    ]

    with data_file("testexe.exe").open("rb") as fh:
        parsed = PE(pe_file=fh)
        assert list(parsed.imports.elements) == expected


def test_pe_exports() -> None:
    """The exports of the test executable are listed in the expected order."""
    expected = [
        "1",
        "2",
        "CreateOverlayApiInterface",
        "CreateShadowPlayApiInterface",
        "ShadowPlayOnSystemStart",
    ]

    with data_file("testexe.exe").open("rb") as fh:
        parsed = PE(pe_file=fh)
        assert list(parsed.exports.elements) == expected


def test_pe_resources() -> None:
    """Iterating the resources yields the resource type names."""
    expected = ["RcData", "Manifest"]

    with data_file("testexe.exe").open("rb") as fh:
        parsed = PE(pe_file=fh)
        assert list(parsed.resources) == expected


def test_pe_relocations() -> None:
    """The test executable has nine relocation entries."""
    with data_file("testexe.exe").open("rb") as fh:
        parsed = PE(pe_file=fh)
        assert len(parsed.relocations) == 9


def test_pe_tls_callbacks() -> None:
    """The TLS callbacks are ten addresses, 0x1000 apart, starting at 430080 (0x69000)."""
    expected = [430080 + 4096 * index for index in range(10)]

    with data_file("testexe.exe").open("rb") as fh:
        parsed = PE(pe_file=fh)
        assert parsed.tls.elements == expected
_STUB_MESSAGE = b"This program is made with dissect.pe <3 kusjesvanSRT <3"


def _build_pe(**builder_kwargs) -> PE:
    """Create a new PE through `Builder` with the given keyword arguments and return it."""
    builder = Builder(**builder_kwargs)
    builder.new()
    return builder.pe


def _dos_stub(pe: PE) -> bytes:
    """Read the bytes between the end of the MZ header and ``e_lfanew``."""
    mz_size = len(pe.mz_header)
    pe.pe_file.seek(mz_size)
    return pe.pe_file.read(pe.mz_header.e_lfanew - mz_size)


def test_build_new_pe_lfanew() -> None:
    """A default build places the PE header at offset 0x8C."""
    assert _build_pe().mz_header.e_lfanew == 0x8C


def test_build_new_x86_pe_exe() -> None:
    """An x86 executable carries the dissect stub message and the 32 bit machine flag."""
    pe = _build_pe(arch="x86")

    assert _dos_stub(pe)[14 : 73 - 4] == _STUB_MESSAGE
    assert pe.file_header.Characteristics & c_pe.ImageCharacteristics.IMAGE_FILE_32BIT_MACHINE


def test_build_new_x64_pe_exe() -> None:
    """An x64 executable carries the dissect stub message but not the 32 bit machine flag."""
    pe = _build_pe(arch="x64")

    assert _dos_stub(pe)[14 : 73 - 4] == _STUB_MESSAGE
    assert not (pe.file_header.Characteristics & c_pe.ImageCharacteristics.IMAGE_FILE_32BIT_MACHINE)


def test_build_new_x86_pe_dll() -> None:
    """An x86 DLL has both the 32 bit machine flag and the DLL flag set."""
    flags = _build_pe(arch="x86", dll=True).file_header.Characteristics

    assert flags & c_pe.ImageCharacteristics.IMAGE_FILE_32BIT_MACHINE
    assert flags & c_pe.ImageCharacteristics.IMAGE_FILE_DLL


def test_build_new_x64_pe_dll() -> None:
    """An x64 DLL has the DLL flag set but not the 32 bit machine flag."""
    flags = _build_pe(arch="x64", dll=True).file_header.Characteristics

    assert not (flags & c_pe.ImageCharacteristics.IMAGE_FILE_32BIT_MACHINE)
    assert flags & c_pe.ImageCharacteristics.IMAGE_FILE_DLL


def test_build_new_pe_with_custom_section() -> None:
    """A section added to a freshly built PE survives a rebuild through `Patcher`."""
    pe = _build_pe()
    pe.add_section(name=".SRT", data=b"kusjesvanSRT")

    rebuilt = PE(pe_file=Patcher(pe=pe).build())
    section = rebuilt.sections.get(name=".SRT")

    assert section.name == ".SRT"
    assert section.size == 12
    assert section.data == b"kusjesvanSRT"
_PATCH = b"kusjesvanSRT, patched with dissect"


def _rebuild(pe: PE) -> PE:
    """Run the PE through `Patcher` and parse the result as a new PE."""
    return PE(pe_file=Patcher(pe=pe).build())


def test_add_imports() -> None:
    """An added import module and its functions are present after a rebuild."""
    dll_name = "kusjesvanSRT.dll"
    function_names = ["PressButtons", "LooseLips"]

    with data_file("testexe.exe").open("rb") as fh:
        pe = PE(pe_file=fh)
        pe.imports.add(dllname=dll_name, functions=function_names)

        rebuilt = _rebuild(pe)

        assert "kusjesvanSRT.dll" in rebuilt.imports

        imported = [function.name for function in rebuilt.imports["kusjesvanSRT.dll"].functions]
        assert "PressButtons" in imported
        assert "LooseLips" in imported


def test_resize_section_smaller() -> None:
    """Patching `.text` with less data shrinks the section to the size of the patch."""
    with data_file("testexe.exe").open("rb") as fh:
        pe = PE(pe_file=fh)
        pe.sections.patch(name=".text", data=_PATCH)

        text = _rebuild(pe).sections.get(name=".text")

        assert text.size == len(_PATCH)
        assert text.data[: len(_PATCH)] == _PATCH


def test_resize_section_bigger() -> None:
    """Appending data to `.rdata` grows the section by exactly the appended amount."""
    appended = _PATCH * 100

    with data_file("testexe.exe").open("rb") as fh:
        pe = PE(pe_file=fh)

        rdata = pe.sections.get(name=".rdata")
        size_before = rdata.size
        pe.sections.patch(name=".rdata", data=rdata.data + appended)

        rebuilt = _rebuild(pe)

        assert rebuilt.sections.get(name=".rdata").size == size_before + len(appended)


def test_resize_resource_smaller() -> None:
    """Patching the manifest resource with less data replaces its content."""
    with data_file("testexe.exe").open("rb") as fh:
        pe = PE(pe_file=fh)
        pe.resources.patch("Manifest", _PATCH)

        rebuilt = _rebuild(pe)

        manifests = [resource.data for resource in rebuilt.resources.by_type(rsrc_id="Manifest")]
        assert manifests == [_PATCH]


def test_resize_resource_bigger() -> None:
    """Prepending data to the manifest resource is reflected at the start of the rebuilt resource."""
    with data_file("testexe.exe").open("rb") as fh:
        pe = PE(pe_file=fh)

        manifest = next(pe.resources.by_type(rsrc_id="Manifest"))
        pe.resources.patch("Manifest", _PATCH + manifest.data)

        rebuilt = _rebuild(pe)

        prefixes = [resource.data[: len(_PATCH)] for resource in rebuilt.resources.by_type(rsrc_id="Manifest")]
        assert prefixes == [_PATCH]


def test_add_section() -> None:
    """A section added to an existing PE is present, with its data, after a rebuild."""
    with data_file("testexe.exe").open("rb") as fh:
        pe = PE(pe_file=fh)
        pe.add_section(name=".SRT", data=b"kusjesvanSRT")

        rebuilt = _rebuild(pe)

        assert ".SRT" in rebuilt.sections.sections()
        assert rebuilt.sections.get(name=".SRT").data == b"kusjesvanSRT"
b/tests/test_section.py @@ -25,7 +25,11 @@ def section_table(entries: int) -> SectionTable: def mock_section_table(section_data: bytes) -> Mock: - shdr = c_elf_64.Shdr(sh_offset=len(c_elf_64.Shdr), sh_size=len(section_data), sh_entsize=len(section_data)) + shdr = c_elf_64.Shdr( + sh_offset=len(c_elf_64.Shdr), + sh_size=len(section_data), + sh_entsize=len(section_data), + ) mocked_table = Mock() mocked_table.fh = BytesIO(shdr.dumps() + section_data) mocked_table.offset = 0 diff --git a/tests/test_segment.py b/tests/test_segment.py index 1d2d945..3dda28a 100644 --- a/tests/test_segment.py +++ b/tests/test_segment.py @@ -6,7 +6,9 @@ def create_segment(segment_data: bytes) -> Segment: - c_segment = c_elf_64.Phdr(p_offset=len(c_elf_64.Phdr), p_filesz=len(segment_data)).dumps() + c_segment = c_elf_64.Phdr( + p_offset=len(c_elf_64.Phdr), p_filesz=len(segment_data) + ).dumps() fh = BytesIO(c_segment + segment_data) return Segment(fh, 0) diff --git a/tests/test_segment_table.py b/tests/test_segment_table.py index e666c80..8e8ac0b 100644 --- a/tests/test_segment_table.py +++ b/tests/test_segment_table.py @@ -34,7 +34,9 @@ def create_segment_table(amount: int, random_data: bytes) -> SegmentTable: data_size = len(random_data) segments_data = [] for idx in range(amount): - data = c_elf_64.Phdr(p_offset=len(c_elf_64.Phdr) * amount + idx * data_size, p_filesz=data_size).dumps() + data = c_elf_64.Phdr( + p_offset=len(c_elf_64.Phdr) * amount + idx * data_size, p_filesz=data_size + ).dumps() segments_data.append(data) segments_data.append(random_data * amount)