"""Create a patched PE from a binary dump and a traceCFG file."""

import argparse
import json

import lief

import cfg_parser
import patch
import reginit
import utils

lief.disable_leak_warning()  # warnings to disable for the callback

# Export database, loaded once at import time. The code below reads the
# "dllname" and "exportname" keys of each entry.
with open("lib/WindowsDllsExport/win10-19043-exports.json", "rb") as f:
    api_info = json.load(f)


def get_used_dlls(calls: list[dict[str, str]]) -> set[str]:
    """Return all unique DLL names being imported.

    Each call's ``"name"`` is expected to look like ``DLL!function``; the part
    before the first ``!`` is taken as the DLL name.
    """
    return {call["name"].split("!")[0] for call in calls}


def get_used_functions_from_dll(dllname: str, calls: list[dict[str, str]]) -> set[str]:
    """Return all unique function names used for a single DLL name.

    Names are split on the first ``!`` only. Entries without a ``!`` carry no
    function name and are skipped instead of raising on unpacking.
    """
    res = set()
    for call in calls:
        dll, sep, func = call["name"].partition("!")
        if sep and dll == dllname:
            res.add(func)
    return res


def link_func_to_dll(func_list):
    """Attach a DLL name to each function of *func_list* using ``api_info``.

    Args:
        func_list: dicts providing at least the ``"name"`` and ``"addr"`` keys.

    Returns:
        A list of ``{"name", "dll", "addr"}`` dicts, one per function whose
        name is found in ``api_info``. Functions without a matching export
        are dropped.
    """
    # Index the exports by name once instead of rescanning api_info for every
    # function. Lists keep the api_info order so "first match" is unchanged.
    exports_by_name = {}
    for export in api_info:
        exports_by_name.setdefault(export["exportname"], []).append(export)

    res = []
    for func in func_list:
        candidates = exports_by_name.get(func["name"])
        if not candidates:
            continue
        # first only including imported dlls, then fall back to any dll
        # NOTE(review): `export["dllname"] in func` tests the *keys* of the
        # func dict, which looks unintended given the comment above; the
        # original behaviour is preserved here - confirm the intent.
        chosen = next(
            (export for export in candidates if export["dllname"] in func),
            candidates[0],
        )
        res.append(
            {
                "name": chosen["exportname"],
                "dll": chosen["dllname"],
                "addr": func["addr"],
            }
        )
    return res


def main():
    """Command-line entry point: rebuild the imports of a wave dump and write the patched PE."""
    parser = argparse.ArgumentParser(
        prog="iat.py",
        description="Create a patched PE from a binary dump and a traceCFG file.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    # Input arguments
    parser.add_argument("dump", type=str, help="The path to the wave dump file (usually ends with .dump)")
    parser.add_argument("trace", type=str, help="The path to the traceCFG file (.json)")

    # Additional arguments
    parser.add_argument("-v", "--verbose", action="store_true", help="Output additional debug info")
    parser.add_argument(
        "-o",
        "--output",
        type=str,
        default="patched.exe",
        help="Specify an output filepath for the patched PE.",
    )
    parser.add_argument(
        "-w",
        "--wave",
        type=int,
        help="Specify the wave number for the binary dump (if it can't be inferred from the filename)",
    )
    parser.add_argument(
        "--disable-reginit",
        action="store_true",
        help="Disable initialization of the registry before jumping to the wave start",
    )

    args = parser.parse_args()

    utils.set_verbose(args.verbose)

    # open wave dump file
    with open(args.dump, "rb") as f:
        pe = lief.parse(f)
    # Explicit check rather than `assert`, which is stripped under `python -O`.
    if not isinstance(pe, lief.PE.Binary):
        parser.error(f"{args.dump} could not be parsed as a PE binary")
    utils.print_debug(f"Opened file {args.dump} as the binary dump")

    # open traceCFG json
    with open(args.trace, "r") as f:
        cfg = json.load(f)
    utils.print_debug(f"Opened file {args.trace} as the TraceCFG JSON")

    # determine target wave
    if args.wave is not None:
        wave = args.wave
    elif args.dump.endswith(".dump"):
        # The four characters right before ".dump" are read as the wave number.
        try:
            wave = int(args.dump[-9:-5])
        except ValueError:
            parser.error(f"Could not infer the wave number from {args.dump!r}, please specify it with --wave")
    else:
        # Without this branch `wave` would be None and fail later on `wave - 1`.
        parser.error("Could not infer the wave number from the dump filename, please specify it with --wave")
    utils.print_debug(f"Determined wave to be {wave}")

    calls = cfg_parser.parse_syscalls(cfg, wave)
    wave_entry = cfg_parser.parse_wave_entrypoint(cfg, wave)

    # create new section
    iatpatch_section = lief.PE.Section(".iatpatch")
    iatpatch_content = []

    # registers initialization
    if not args.disable_reginit:
        iatpatch_content += reginit.generate_reg_init_code(cfg, pe, wave, wave_entry)

    # write patch section code
    if iatpatch_content:
        iatpatch_section.content = iatpatch_content  # pyright: ignore[reportAttributeAccessIssue]

    # add new section to PE
    pe.add_section(iatpatch_section)

    # patch entrypoint
    if args.disable_reginit:
        entry_address = wave_entry
    else:
        entry_address = pe.get_section(".iatpatch").virtual_address
    # Keep the low 16 bits, i.e. the last four hex digits. The previous
    # hex-string slicing raised ValueError for values in 0x100..0xFFF.
    # NOTE(review): this truncation presumably turns an address into an RVA;
    # verify it for images whose RVAs exceed 0xFFFF.
    pe.optional_header.addressof_entrypoint = entry_address & 0xFFFF

    # remove all current imports
    pe.remove_all_imports()

    # recreate all DLL imports from calls detected
    imports_by_dll = {}  # lowercase DLL name -> import object returned by pe.add_import
    imported_funcs = set()  # function names that already have an import entry
    for dll in get_used_dlls(calls):
        dll_key = dll.lower()
        imported_dll = imports_by_dll.get(dll_key)
        if imported_dll is None:
            imported_dll = pe.add_import(dll_key)
            imports_by_dll[dll_key] = imported_dll

        # recreate all function calls related to that dll import
        for func in get_used_functions_from_dll(dll, calls):
            imported_funcs.add(func)
            imported_dll.add_entry(func)

    # get list of functions called with getprocaddr in previous wave
    func_list = cfg_parser.parse_procaddr_calls(cfg, wave - 1)
    func_dll_list = link_func_to_dll(func_list)
    for func in func_dll_list:
        if func["name"] in imported_funcs:
            # call already added
            continue
        imported_dll = imports_by_dll.get(func["dll"])
        if imported_dll is None:
            # we need to import the new DLL
            imported_dll = pe.add_import(func["dll"])
            imports_by_dll[func["dll"]] = imported_dll
        # Record the name in both cases so a repeated function is not added
        # twice to an already imported DLL.
        imported_funcs.add(func["name"])
        imported_dll.add_entry(func["name"])

    # Define all sections as writeable, to prevent permission issues.
    # Ideally, we would like to have the actual permissions from Goatracer at some point in the future
    section_flags = lief.PE.Section.CHARACTERISTICS
    all_access = (
        section_flags.MEM_WRITE.value
        | section_flags.MEM_READ.value
        | section_flags.MEM_EXECUTE.value
        | section_flags.CNT_INITIALIZED_DATA.value
    )
    for section in pe.sections:
        section.characteristics = all_access

    # At this point, the new IAT will only be constructed when the PE is written.
    # We therefore need to make a callback function to patch calls afterwards.
    def patching_callback(pe: lief.PE.Binary, imp: lief.PE.Import, entry: lief.PE.ImportEntry, rva: int):
        """Patch every recorded reference to ``imp``/``entry`` with the IAT slot at *rva*."""
        utils.print_debug(f"Now trying to patch {imp.name}!{entry.name}...")
        # Call names in the trace use the "DLL!function" form with an uppercase DLL name.
        call_name = f"{imp.name.upper()}!{entry.name}"
        for call in calls:
            if call["name"] == call_name:
                patch.patch_instr_to_new_IAT_entry(pe, call, rva)
        # patch additional non-call related info
        for func in func_dll_list:
            if func["name"] == entry.name and func["dll"] == imp.name:
                patch.patch_addr_found_in_mem(pe, rva, func["addr"])
        utils.print_debug("Done!\n")

    # write result
    config = lief.PE.Builder.config_t()
    config.imports = True  # allows the config of the writer to write a new IAT
    config.resolved_iat_cbk = patching_callback  # Define the callback

    output_path = args.output
    pe.write(output_path, config)
    print(f"Wrote the patched executable as {output_path}")


if __name__ == "__main__":
    main()