"""Rebuild the import table of a PE memory dump and re-point its API calls.

The script loads a dumped PE image together with a JSON description of the API
calls observed in it, then:

1. appends a ``.iatpatch`` section holding a small stub that loads the recorded
   register values and transfers control to the recorded entry address;
2. makes that stub the new entry point;
3. drops the existing imports and recreates them from the observed calls and
   from the functions recorded under ``prevwave_getprocaddr``;
4. writes ``patched.exe``; while LIEF lays out the new IAT, ``patch_to_new_IAT``
   is invoked per entry to re-point the recorded call/jmp sites.
"""
import json

import lief

lief.disable_leak_warning()  # warnings to disable for the callback

with open("lib/WindowsDllsExport/win10-19043-exports.json", "rb") as f:
    api_info = json.load(f)

dump_path = "rsc/wave-0001.dump"
# dump_path = "rsc/wave-0002.dump"
iat_json_path = "rsc/upx-hostname.exe.bin_iat_wave1.json"
# iat_json_path = "rsc/000155f2e0360f6ff6cd.exe_iat_wave2.json"


def hex_address_to_memory_representation(hex_addr: str, is_32b: bool, is_little_endian: bool) -> list[int]:
    """Return the in-memory byte sequence of an address given as a hex string.

    Args:
        hex_addr: Address as hexadecimal text, with or without a ``0x`` prefix
            (surrounding whitespace is accepted).
        is_32b: Encode on 4 bytes when true, on 8 bytes otherwise.
        is_little_endian: Emit the least significant byte first when true.

    Returns:
        A list of ``4`` or ``8`` integers in ``0..255``.

    Raises:
        ValueError: If ``hex_addr`` is not valid hexadecimal text.
    """
    address_size = 4 if is_32b else 8
    # Values wider than the pointer keep only their low bytes; values with few
    # digits (e.g. "0x0" or "0x402000") are zero-padded instead of failing.
    value = int(hex_addr, 16) & ((1 << (8 * address_size)) - 1)
    return list(value.to_bytes(address_size, "little" if is_little_endian else "big"))


def _encode_va(pe: lief.PE.Binary, hex_addr: str) -> list[int]:
    """Encode ``hex_addr`` with the pointer size and byte order from ``pe``'s header."""
    header = pe.abstract.header
    return hex_address_to_memory_representation(
        hex_addr,
        header.is_32,
        header.endianness == lief.Header.ENDIANNESS.LITTLE,
    )


# Retrieves all unique DLL names being imported
def get_used_dlls(calls: list[dict[str, str]]) -> set[str]:
    """Return the set of DLL names, i.e. the part before ``!`` in each call name."""
    return {call["name"].split("!")[0] for call in calls}


# Retrieves all unique function names used for a single DLL name
def get_used_functions_from_dll(dllname: str, calls: list[dict[str, str]]) -> set[str]:
    """Return the function names (the part after ``!``) recorded for ``dllname``.

    Entries whose name has no ``!`` separator are ignored.
    """
    res = set()
    for call in calls:
        dll, separator, func = call["name"].partition("!")
        if separator and dll == dllname:
            res.add(func)
    return res


def _patch_indirect_branch(pe: lief.PE.Binary, rva: int, instruction_offset: int, opcode: list[int]):
    """Overwrite the instruction at RVA ``instruction_offset`` with ``opcode`` + VA of ``rva``."""
    operand = _encode_va(pe, hex(rva + pe.imagebase))
    pe.patch_address(instruction_offset, opcode + operand, lief.Binary.VA_TYPES.RVA)


def patch_direct_adress_call(pe: lief.PE.Binary, rva: int, instruction_offset: int):
    """Rewrite the instruction at ``instruction_offset`` as ``call [imagebase + rva]``.

    Example encoding: FF 15 08 10 00 01 represents `call [0x01001008]`.
    """
    _patch_indirect_branch(pe, rva, instruction_offset, [0xFF, 0x15])


def patch_direct_adress_jump(pe: lief.PE.Binary, rva: int, instruction_offset: int):
    """Rewrite the instruction at ``instruction_offset`` as ``jmp [imagebase + rva]``.

    Example encoding: FF 25 08 10 00 01 represents `jmp [0x01001008]`.
    """
    _patch_indirect_branch(pe, rva, instruction_offset, [0xFF, 0x25])


def patch_instr_to_new_IAT_entry(pe: lief.PE.Binary, call: dict[str, str], rva: int):
    """Re-point one recorded call site to the IAT slot located at ``rva``.

    ``call["adress"]`` holds the virtual address of the instruction as hex
    text. Only ``FF 15`` (call) and ``FF 25`` (jmp) instructions are rewritten;
    any other instruction is left untouched.
    """
    instruction_offset = int(call["adress"], 16) - pe.imagebase
    memview = pe.get_content_from_virtual_address(instruction_offset, 2)
    opcode = [memview[0], memview[1]]
    if opcode == [0xFF, 0x15]:
        patch_direct_adress_call(pe, rva, instruction_offset)
    elif opcode == [0xFF, 0x25]:
        patch_direct_adress_jump(pe, rva, instruction_offset)


def _find_all(haystack: bytes, needle: bytes) -> list[int]:
    """Return every offset (overlapping ones included) where ``needle`` occurs."""
    offsets = []
    position = haystack.find(needle)
    while position != -1:
        offsets.append(position)
        position = haystack.find(needle, position + 1)
    return offsets


def patch_addr_found_in_mem(pe: lief.PE.Binary, rva: int, old_addr: str):
    """Redirect references to stored copies of ``old_addr`` to the IAT slot at ``rva``.

    The section contents are searched for the bytes of ``old_addr``. For each
    location holding it, every occurrence of that location's own virtual
    address is then overwritten with the virtual address ``imagebase + rva``.

    Args:
        pe: Binary being patched.
        rva: RVA of the new IAT slot.
        old_addr: Previously recorded function address, as hex text.
    """
    old_addr_pattern = bytes(_encode_va(pe, old_addr))
    new_addr = _encode_va(pe, hex(rva + pe.imagebase))

    # Snapshot each section once: reading ``section.content`` per byte is very
    # slow, and nothing is modified until the scan is complete.
    images = [(section.virtual_address, bytes(section.content)) for section in pe.sections]

    adresses_to_patch = []
    for slot_base, slot_data in images:
        for slot_offset in _find_all(slot_data, old_addr_pattern):
            slot_va_pattern = bytes(_encode_va(pe, hex(slot_base + slot_offset + pe.imagebase)))
            for ref_base, ref_data in images:
                for ref_offset in _find_all(ref_data, slot_va_pattern):
                    adresses_to_patch.append(ref_base + ref_offset)

    for addr in adresses_to_patch:
        print(f"patched {hex(addr)}")
        pe.patch_address(addr, new_addr, lief.Binary.VA_TYPES.RVA)


def patch_to_new_IAT(pe: lief.PE.Binary, imp: lief.PE.Import, entry: lief.PE.ImportEntry, rva: int):
    """Builder callback: re-point everything that should use the IAT slot at ``rva``.

    Reads the module-level ``calls`` and ``procaddr_list``. Recorded calls are
    matched on ``"<DLL NAME UPPERCASED>!<function>"``; ``procaddr_list``
    entries are matched on function name and DLL name.
    """
    # print(f"{imp.name}!{entry.name}: 0x{rva:010x}")
    call_name = f"{imp.name.upper()}!{entry.name}"
    for call in calls:
        if call["name"] == call_name:
            patch_instr_to_new_IAT_entry(pe, call, rva)

    # patch additional non-call related info
    print(entry.name)
    for func in procaddr_list:
        if func["name"] == entry.name and func["dll"] == imp.name:
            # print(func["name"])
            patch_addr_found_in_mem(pe, rva, func["addr"])


def get_list_of_procaddr_functions(prevwave_info):
    """Map each recorded function to an export known in ``api_info``.

    For every item of ``prevwave_info`` (a dict with ``"function"`` and
    ``"func_addr"``), the first export with that name coming from a DLL already
    listed in the module-level ``dll_calls_list`` is preferred; otherwise the
    first export with that name from any DLL is used. Items with no matching
    export are skipped.

    Returns:
        A list of ``{"name", "dll", "addr"}`` dicts.
    """
    # Index the exports once, keeping their order so "first match" is preserved.
    exports_by_name = {}
    for export in api_info:
        exports_by_name.setdefault(export["exportname"], []).append(export)

    res = []
    for call in prevwave_info:
        candidates = exports_by_name.get(call["function"], [])
        if not candidates:
            continue
        chosen = next(
            (export for export in candidates if export["dllname"] in dll_calls_list),
            candidates[0],  # fall back to importing a new DLL
        )
        res.append(
            {
                "name": chosen["exportname"],
                "dll": chosen["dllname"],
                "addr": call["func_addr"],
            }
        )
    return res


# wave dump file to patch
with open(dump_path, "rb") as f:
    pe = lief.parse(f)
if not isinstance(pe, lief.PE.Binary):
    raise ValueError(f"{dump_path} could not be parsed as a PE binary")

# JSON generated with the python reader files
with open(iat_json_path, "r") as iat_json_input:
    iat_data = json.load(iat_json_input)
    calls: list[dict[str, str]] = iat_data["calls"]
    wave_entry = int(iat_data["entry"], 16)

# create new section
patch_section = lief.PE.Section(".iatpatch")
content = []

# initiate registry values
# Second byte of the `C7 /0` (mov r32, imm32) encoding for each register.
# NOTE(review): these encodings take a 4-byte immediate; for a non-32-bit image
# the 8-byte values emitted below would not form valid instructions.
reg_to_inst_code = {
    "EAX": 0xC0,
    "EBX": 0xC3,
    "ECX": 0xC1,
    "EDX": 0xC2,
    "ESI": 0xC6,
    "EDI": 0xC7,
    "EBP": 0xC5,
    # "ESP": 0xC4,
}
for reg, reg_value in iat_data["entry_reg_values"].items():
    if reg not in reg_to_inst_code:
        continue
    content.extend([0xC7, reg_to_inst_code[reg]] + _encode_va(pe, reg_value.strip()))

# push the actual OEP, then `ret` to jump to it
content += [0x68] + _encode_va(pe, hex(wave_entry))
content += [0xC3]
patch_section.content = content

# add new section to PE
pe.add_section(patch_section)

# patch entrypoint
# The section RVA is used as-is: keeping only its last four hex digits would
# give a wrong entry point as soon as the section sits at RVA >= 0x10000.
pe.optional_header.addressof_entrypoint = pe.get_section(".iatpatch").virtual_address

# remove all current imports
pe.remove_all_imports()

# recreate all DLL imports from calls detected
dll_calls_list = []
imported_dll_list = []
func_calls_list = []
for dll in get_used_dlls(calls):
    dll_calls_list.append(dll.lower())
    imported_dll = pe.add_import(dll.lower())
    imported_dll_list.append(imported_dll)

    # recreate all function calls related to that dll import
    for func in get_used_functions_from_dll(dll, calls):
        func_calls_list.append(func)
        imported_dll.add_entry(func)

# get list of functions called with getprocaddr
procaddr_list = get_list_of_procaddr_functions(iat_data["prevwave_getprocaddr"])
for func in procaddr_list:
    if func["name"] in func_calls_list:  # call already added
        continue
    if func["dll"] in dll_calls_list:  # dll already added
        imported_dll = imported_dll_list[dll_calls_list.index(func["dll"])]
    else:  # we need to import the new DLL
        dll_calls_list.append(func["dll"])
        imported_dll = pe.add_import(func["dll"])
        imported_dll_list.append(imported_dll)
    # Record the name in both cases so a repeated entry is not imported twice.
    func_calls_list.append(func["name"])
    imported_dll.add_entry(func["name"])

# At this point, the new IAT will only be constructed when the PE is written.
# We therefore need to make a callback function to patch calls afterwards.

# Define all sections as writeable, to help with some weird stuff we're seeing
section_flags = (
    lief.PE.Section.CHARACTERISTICS.MEM_WRITE.value
    | lief.PE.Section.CHARACTERISTICS.MEM_READ.value
    | lief.PE.Section.CHARACTERISTICS.MEM_EXECUTE.value
    | lief.PE.Section.CHARACTERISTICS.CNT_INITIALIZED_DATA.value
)
for section in pe.sections:
    section.characteristics = section_flags

# write result
config = lief.PE.Builder.config_t()
config.imports = True  # allows the config of the writer to write a new IAT
config.resolved_iat_cbk = patch_to_new_IAT  # callback after the IAT has been written
pe.write("patched.exe", config)
print("Wrote the patched executable as patched.exe")