From 6f4fccd3506dcb0f489cd394343168d7a5ae5ed7 Mon Sep 17 00:00:00 2001 From: Seliaste Date: Thu, 2 Apr 2026 14:02:07 +0200 Subject: [PATCH] code formatting and newly created enums to clean up code --- cfg_parser.py | 33 ++++++++++++++++------------ iat.py | 28 ++++++++++++++---------- patch.py | 41 +++++++++++++---------------------- reginit.py | 59 ++++++++++++++++++++++++++------------------------- utils.py | 34 +++++++++++++++++++++++++---- 5 files changed, 111 insertions(+), 84 deletions(-) diff --git a/cfg_parser.py b/cfg_parser.py index 2ea23a9..c55dce2 100644 --- a/cfg_parser.py +++ b/cfg_parser.py @@ -1,38 +1,43 @@ import utils -def parse_wave_nodes(cfg,wave: int) -> list: - return list(filter(lambda node: node["wave"] == wave,cfg["nodes"])) -def parse_procaddr_calls(cfg,wave:int): +def parse_wave_nodes(cfg, wave: int) -> list: + return list(filter(lambda node: node["wave"] == wave, cfg["nodes"])) + + +def parse_procaddr_calls(cfg, wave: int) -> list: res = [] - wave_nodes:list[dict] = parse_wave_nodes(cfg,wave) + wave_nodes: list[dict] = parse_wave_nodes(cfg, wave) for node in wave_nodes: if "syscalls" in node.keys(): for syscall in node["syscalls"]: if syscall["name"] == "KERNEL32.DLL!GetProcAddress": - funcname = syscall["arguments"][-1].split("\"")[1] + funcname = syscall["arguments"][-1].split('"')[1] func_addr = syscall["return"] res.append({"name": funcname, "addr": func_addr}) return res -def parse_syscalls(cfg,wave: int) -> list[dict[str, str]]: - res: list[dict[str,str]] = [] - wave_nodes:list[dict] = parse_wave_nodes(cfg,wave) + +def parse_syscalls(cfg, wave: int) -> list[dict[str, str]]: + res: list[dict[str, str]] = [] + wave_nodes: list[dict] = parse_wave_nodes(cfg, wave) no_repeat = [] for node in wave_nodes: if "syscalls" in node.keys(): for syscall in node["syscalls"]: if node["last_instr"] in no_repeat: continue - adress = node["last_instr"] # call is at the end of the basic block + adress = node["last_instr"] # call is at the end of the basic block name = syscall["name"] current_instruction = node["instructions"][-1]["mnemonic"] no_repeat.append(adress) - res.append({"adress":adress,"name":name}) + res.append({"adress": adress, "name": name}) return res -def parse_wave_entrypoint(cfg,wave: int) -> int: - return int(parse_wave_nodes(cfg,wave)[0]["start"],16) -def parse_bb_registers(cfg,wave:int,n_bb:int) -> dict[str,str]: - return parse_wave_nodes(cfg,wave)[n_bb]["registers"] +def parse_wave_entrypoint(cfg, wave: int) -> int: + return int(parse_wave_nodes(cfg, wave)[0]["start"], 16) + + +def parse_bb_registers(cfg, wave: int, n_bb: int) -> dict[str, str]: + return parse_wave_nodes(cfg, wave)[n_bb]["registers"] diff --git a/iat.py b/iat.py index d5fa9cb..94da5a8 100644 --- a/iat.py +++ b/iat.py @@ -1,8 +1,10 @@ import argparse import json + import lief -import patch + import cfg_parser +import patch import reginit import utils @@ -12,8 +14,6 @@ with open("lib/WindowsDllsExport/win10-19043-exports.json", "rb") as f: api_info = json.load(f) - - # Retrives all unique DLL names being imported def get_used_dlls(calls: list[dict[str, str]]) -> set[str]: res = set() @@ -58,8 +58,13 @@ def link_func_to_dll(func_list): res.append(res_new) return res + def main(): - parser = argparse.ArgumentParser(prog="iat.py", description="Create a patched PE from a binary dump and a traceCFG file.", formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser = argparse.ArgumentParser( + prog="iat.py", + description="Create a patched PE from a binary dump and a traceCFG file.", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) # Input arguments parser.add_argument("dump", type=str, help="The path to the wave dump file (usually ends with .dump)") @@ -68,7 +73,7 @@ def main(): # Additional arguments parser.add_argument("-o", "--output", type=str, default="patched.exe", help="Specify an output filepath for the patched PE.") parser.add_argument("-w", "--wave", type=int, help="Specify the wave number for the binary dump (if it can't be inferred from the filename)") - parser.add_argument("-v", '--verbose', action='store_true', help="Output additional debug info") + parser.add_argument("-v", "--verbose", action="store_true", help="Output additional debug info") args = parser.parse_args() utils.set_verbose(args.verbose) @@ -85,24 +90,24 @@ def main(): utils.print_debug(f"Opened file {args.trace} as the TraceCFG JSON") # determine target wave - if args.wave == None and args.dump[-5:] == ".dump": + if args.wave is None and args.dump[-5:] == ".dump": wave = int(args.dump[-9:-5]) else: wave = args.wave utils.print_debug(f"Determined wave to be {wave}") - calls = cfg_parser.parse_syscalls(cfg,wave) - wave_entry = cfg_parser.parse_wave_entrypoint(cfg,wave) + calls = cfg_parser.parse_syscalls(cfg, wave) + wave_entry = cfg_parser.parse_wave_entrypoint(cfg, wave) # create new section iatpatch_section = lief.PE.Section(".iatpatch") iatpatch_content = [] # registers initiation - iatpatch_content += reginit.generate_reg_init_code(cfg,pe,wave,wave_entry) + iatpatch_content += reginit.generate_reg_init_code(cfg, pe, wave, wave_entry) # write patch section code - iatpatch_section.content = iatpatch_content # pyright: ignore[reportAttributeAccessIssue] + iatpatch_section.content = iatpatch_content # pyright: ignore[reportAttributeAccessIssue] # add new section to PE pe.add_section(iatpatch_section) @@ -160,7 +165,7 @@ def main(): # patch additional non-call related info for func in filter(lambda x: x["name"] == entry.name and x["dll"] == imp.name, func_dll_list): patch.patch_addr_found_in_mem(pe, rva, func["addr"]) - utils.print_debug(f"Done!\n") + utils.print_debug("Done!\n") # write result config = lief.PE.Builder.config_t() @@ -170,5 +175,6 @@ def main(): pe.write(output_path, config) print(f"Wrote the patched executable as {output_path}") + if __name__ == "__main__": main() diff --git a/patch.py b/patch.py index 97d4c6c..be4f17a 100644 --- a/patch.py +++ b/patch.py @@ -1,37 +1,34 @@ -from utils import hex_address_to_memory_representation import lief import utils +from utils import Instructions, hex_address_to_memory_representation, is_32b, is_little_endian + def patch_direct_adress_call(pe: lief.PE.Binary, rva: int, instruction_offset: int): # We can manually patch the instruction here: FF 15 08 10 00 01 represents `call [0x01001080]` new_value = hex_address_to_memory_representation( hex(rva + pe.imagebase), - pe.abstract.header.is_32, - pe.abstract.header.endianness == lief.Header.ENDIANNESS.LITTLE, + is_32b(pe), + is_little_endian(pe), ) - pe.patch_address(instruction_offset, [0xFF, 0x15] + new_value, lief.Binary.VA_TYPES.RVA) - utils.print_debug(f" Patched a call at addr {hex(pe.imagebase+instruction_offset)}") + pe.patch_address(instruction_offset, Instructions.CALL_ADDR + new_value, lief.Binary.VA_TYPES.RVA) + utils.print_debug(f" Patched a call at addr {hex(pe.imagebase + instruction_offset)}") def patch_direct_adress_jump(pe: lief.PE.Binary, rva: int, instruction_offset: int): # We can manually patch the instruction here: FF 15 08 10 00 01 represents `call [0x01001080]` - new_value = hex_address_to_memory_representation( - hex(rva + pe.imagebase), - pe.abstract.header.is_32, - pe.abstract.header.endianness == lief.Header.ENDIANNESS.LITTLE, - ) - pe.patch_address(instruction_offset, [0xFF, 0x25] + new_value, lief.Binary.VA_TYPES.RVA) - utils.print_debug(f" Patched a jump at addr {hex(pe.imagebase+instruction_offset)}") + new_value = hex_address_to_memory_representation(hex(rva + pe.imagebase), is_32b(pe), is_little_endian(pe)) + pe.patch_address(instruction_offset, Instructions.JUMP_ADDR + new_value, lief.Binary.VA_TYPES.RVA) + utils.print_debug(f" Patched a jump at addr {hex(pe.imagebase + instruction_offset)}") def patch_instr_to_new_IAT_entry(pe: lief.PE.Binary, call: dict[str, str], rva: int): base = pe.imagebase instruction_offset = int(call["adress"], 16) - base memview = pe.get_content_from_virtual_address(instruction_offset, 2) - if [memview[0], memview[1]] == [0xFF, 0x15]: + if [memview[0], memview[1]] == Instructions.CALL_ADDR: patch_direct_adress_call(pe, rva, instruction_offset) - elif [memview[0], memview[1]] == [0xFF, 0x25]: + elif [memview[0], memview[1]] == Instructions.JUMP_ADDR: patch_direct_adress_jump(pe, rva, instruction_offset) @@ -39,16 +36,8 @@ def patch_addr_found_in_mem(pe: lief.PE.Binary, rva: int, old_addr: str): is_32 = pe.abstract.header.is_32 little_endian = pe.abstract.header.endianness == lief.Header.ENDIANNESS.LITTLE # scan memory for reference to old addr - old_addr_mem_repr = hex_address_to_memory_representation( - old_addr, - is_32, - pe.abstract.header.endianness == lief.Header.ENDIANNESS.LITTLE, - ) - new_addr = hex_address_to_memory_representation( - hex(rva + pe.imagebase), - is_32, - little_endian, - ) + old_addr_mem_repr = hex_address_to_memory_representation(old_addr, is_32b(pe), is_little_endian(pe)) + new_addr = hex_address_to_memory_representation(hex(rva + pe.imagebase), is_32, little_endian) found_ref_addr = [] found_xref_addr = [] for section in pe.sections: @@ -70,7 +59,7 @@ def patch_addr_found_in_mem(pe: lief.PE.Binary, rva: int, old_addr: str): for section in pe.sections: for ref_addr in found_ref_addr: - for k in range(len(section.content)-len(ref_addr)): + for k in range(len(section.content) - len(ref_addr)): foundxref = True for L in range(len(ref_addr)): if section.content[k + L] != ref_addr[L]: @@ -80,4 +69,4 @@ def patch_addr_found_in_mem(pe: lief.PE.Binary, rva: int, old_addr: str): found_xref_addr.append(section.virtual_address + k) for addr in found_xref_addr: pe.patch_address(addr, new_addr, lief.Binary.VA_TYPES.RVA) - utils.print_debug(f" Patched an xref to old IAT at {hex(pe.imagebase+addr)}") + utils.print_debug(f" Patched an xref to old IAT at {hex(pe.imagebase + addr)}") diff --git a/reginit.py b/reginit.py index c846d6c..0d253ec 100644 --- a/reginit.py +++ b/reginit.py @@ -1,42 +1,43 @@ -import lief -import cfg_parser -from utils import hex_address_to_memory_representation +from enum import IntEnum -def generate_reg_init_code(cfg, pe: lief.PE.Binary,wave:int, wave_entry: int) -> list[int]: +import lief + +import cfg_parser +from utils import Instructions, hex_address_to_memory_representation, is_32b, is_little_endian + + +class Registers(IntEnum): + EAX = 0xC0 + EBX = 0xC3 + ECX = 0xC1 + EDX = 0xC2 + ESI = 0xC6 + EDI = 0xC7 + EBP = 0xC5 + # ESP = 0xC4 + + +def generate_reg_init_code(cfg, pe: lief.PE.Binary, wave: int, wave_entry: int) -> list[int]: code = [] - # initiate registry values - reg_to_inst_code = { - "EAX": 0xC0, - "EBX": 0xC3, - "ECX": 0xC1, - "EDX": 0xC2, - "ESI": 0xC6, - "EDI": 0xC7, - "EBP": 0xC5, - # "ESP": 0xC4, - } reg_values = cfg_parser.parse_bb_registers(cfg, wave, 0) for reg in reg_values: - if reg not in reg_to_inst_code: + if reg not in Registers.__members__: continue - new_instruction = [ - 0xC7, - reg_to_inst_code[reg], - ] + hex_address_to_memory_representation( - reg_values[reg].strip(), - pe.abstract.header.is_32, - pe.abstract.header.endianness == lief.Header.ENDIANNESS.LITTLE, + new_instruction = ( + Instructions.MOV_REG + + [Registers[reg]] + + hex_address_to_memory_representation( + reg_values[reg].strip(), + is_32b(pe), + is_little_endian(pe), + ) ) for byte in new_instruction: code.append(byte) # add ret to actual OEP - code += [0x68] + hex_address_to_memory_representation( - hex(wave_entry), - pe.abstract.header.is_32, - pe.abstract.header.endianness == lief.Header.ENDIANNESS.LITTLE, - ) # push addr + code += Instructions.PUSH + hex_address_to_memory_representation(hex(wave_entry), is_32b(pe), is_little_endian(pe)) # push addr - code += [0xC3] # ret + code += Instructions.RET return code diff --git a/utils.py b/utils.py index 3c0e069..4f45633 100644 --- a/utils.py +++ b/utils.py @@ -1,3 +1,24 @@ +from enum import Enum + +import lief + + +class Instructions(list[int], Enum): + RET = [0xC3] + PUSH = [0x68] + MOV_REG = [0xC7] + CALL_ADDR = [0xFF, 0x15] + JUMP_ADDR = [0xFF, 0x25] + + +def is_32b(pe: lief.PE.Binary): + return pe.abstract.header.is_32 + + +def is_little_endian(pe: lief.PE.Binary): + return pe.abstract.header.endianness == lief.Header.ENDIANNESS.LITTLE + + def hex_address_to_memory_representation(hex_addr: str, is_32b: bool, is_little_endian: bool) -> list[int]: adress_size = 4 if is_32b else 8 mem_value = [0x00] * adress_size @@ -9,10 +30,15 @@ def hex_address_to_memory_representation(hex_addr: str, is_32b: bool, is_little_ mem_value = mem_value[::-1] # reverse byte order for big endian return mem_value -verbose = False -def print_debug(msg:str): - if(verbose): print(msg) -def set_verbose(value:bool): +verbose = False + + +def print_debug(msg: str): + if verbose: + print(msg) + + +def set_verbose(value: bool): global verbose verbose = value