lief-iat-reconstruction/iat.py

178 lines
5.7 KiB
Python

import json
import lief
lief.disable_leak_warning() # warnings to disable for the callback
dump_path = "rsc/wave-0001.dump"
# dump_path = "rsc/wave-0002.dump"
iat_json_path = "rsc/upx-hostname.exe.bin_iat_wave1.json"
# iat_json_path = "rsc/000155f2e0360f6ff6cd.exe_iat_wave2.json"
def hex_address_to_memory_representation(
hex_addr: str, is_32b: bool, is_little_endian: bool
) -> list[int]:
adress_size = 4 if is_32b else 8
mem_value = [0x00] * adress_size
hex_addr = hex_addr[::-1][:-2] # reversing order and stripping zero
for i in range(0, adress_size):
byte_str = hex_addr[i * 2 : (i + 1) * 2][::-1]
mem_value[i] += int(byte_str, 16)
if not is_little_endian:
mem_value = mem_value[::-1] # reverse byte order for big endian
return mem_value
# Retrives all unique DLL names being imported
def get_used_dlls(calls: list[dict[str, str]]) -> set[str]:
res = set()
for call in calls:
res.add(call["name"].split("!")[0])
return res
# Retrieves all unique function names used for a single DLL name
def get_used_functions_from_dll(dllname, calls):
res = set()
for [dll, func] in map(lambda x: x["name"].split("!"), calls):
if dll == dllname:
res.add(func)
return res
def patch_direct_adress_call(pe: lief.PE.Binary, rva: int, instruction_offset: int):
# We can manually patch the instruction here: FF 15 08 10 00 01 represents `call [0x01001080]`
new_value = hex_address_to_memory_representation(
hex(rva + pe.imagebase),
pe.abstract.header.is_32,
pe.abstract.header.endianness == lief.Header.ENDIANNESS.LITTLE,
)
pe.patch_address(
instruction_offset, [0xFF, 0x15] + new_value, lief.Binary.VA_TYPES.RVA
)
def patch_direct_adress_jump(pe: lief.PE.Binary, rva: int, instruction_offset: int):
# We can manually patch the instruction here: FF 15 08 10 00 01 represents `call [0x01001080]`
new_value = hex_address_to_memory_representation(
hex(rva + pe.imagebase),
pe.abstract.header.is_32,
pe.abstract.header.endianness == lief.Header.ENDIANNESS.LITTLE,
)
pe.patch_address(
instruction_offset, [0xFF, 0x25] + new_value, lief.Binary.VA_TYPES.RVA
)
def patch_instr_to_new_IAT_entry(pe: lief.PE.Binary, call: dict[str, str], rva: int):
base = pe.imagebase
instruction_offset = int(call["adress"], 16) - base
memview = pe.get_content_from_virtual_address(instruction_offset, 2)
if [memview[0], memview[1]] == [0xFF, 0x15]:
patch_direct_adress_call(pe, rva, instruction_offset)
elif [memview[0], memview[1]] == [0xFF, 0x25]:
patch_direct_adress_jump(pe, rva, instruction_offset)
def patch_calls_to_new_IAT(
pe: lief.PE.Binary, imp: lief.PE.Import, entry: lief.PE.ImportEntry, rva: int
):
# print(f"{imp.name}!{entry.name}: 0x{rva:010x}")
for call in filter(
lambda x: x["name"] == f"{imp.name.upper()}!{entry.name}", calls
):
patch_instr_to_new_IAT_entry(pe, call, rva)
# wave dump file to patch
with open(dump_path, "rb") as f:
pe = lief.parse(f)
assert isinstance(pe, lief.PE.Binary)
# JSON generated with the python reader files
with open(iat_json_path, "r") as iat_json_input:
iat_data = json.load(iat_json_input)
calls: list[dict[str, str]] = iat_data["calls"]
wave_entry = int(iat_data["entry"], 16)
# create new section
patch_section = lief.PE.Section(".iatpatch")
content = []
# initiate registry values
reg_to_inst_code = {
"EAX": 0xC0,
"EBX": 0xC3,
"ECX": 0xC1,
"EDX": 0xC2,
"ESI": 0xC6,
"EDI": 0xC7,
"EBP": 0xC5,
# "ESP": 0xC4,
}
for reg in iat_data["entry_reg_values"].keys():
if reg not in reg_to_inst_code:
continue
new_instruction = [
0xC7,
reg_to_inst_code[reg],
] + hex_address_to_memory_representation(
iat_data["entry_reg_values"][reg].strip(),
pe.abstract.header.is_32,
pe.abstract.header.endianness == lief.Header.ENDIANNESS.LITTLE,
)
for byte in new_instruction:
content.append(byte)
# add ret to actual OEP
content += [0x68] + hex_address_to_memory_representation(
hex(wave_entry),
pe.abstract.header.is_32,
pe.abstract.header.endianness == lief.Header.ENDIANNESS.LITTLE,
)
content += [0xC3]
patch_section.content = content
# add new section to PE
pe.add_section(patch_section)
# patch entrypoint
# entrypoint_format = int(hex(pe.get_section(".iatpatch").virtual_address)[-4:], 16)
entrypoint_format = int(hex(pe.get_section(".iatpatch").virtual_address)[-4:], 16)
pe.optional_header.addressof_entrypoint = entrypoint_format
# remove all current imports
pe.remove_all_imports()
# recreate all DLL imports
for dll in get_used_dlls(calls):
imported_dll = pe.add_import(dll.lower())
# recreate all function calls related to that dll import
for func in get_used_functions_from_dll(dll, calls):
imported_dll.add_entry(func)
# At this point, the new IAT will only be constructed when the PE is written. We therefore need to make a callback function to patch calls afterwards.
# Define all sections as writeable, to help with some weird stuff we're seeing
for section in pe.sections:
section.characteristics = (
lief.PE.Section.CHARACTERISTICS.MEM_WRITE.value
+ lief.PE.Section.CHARACTERISTICS.MEM_READ.value
+ lief.PE.Section.CHARACTERISTICS.MEM_EXECUTE.value
+ lief.PE.Section.CHARACTERISTICS.CNT_INITIALIZED_DATA.value
)
# write result
config = lief.PE.Builder.config_t()
config.imports = True # allows the config of the writer to write a new IAT
config.resolved_iat_cbk = (
patch_calls_to_new_IAT # callback after the IAT has been written
)
pe.write("patched.exe", config)
print("Wrote the patched executable as patched.exe")