Restructuration of the code to use cfg files, arguments, and multiple
smaller modules
This commit is contained in:
parent
3e665cd11a
commit
9cc805fb9d
10 changed files with 3235 additions and 256 deletions
299
iat.py
299
iat.py
|
|
@ -1,30 +1,15 @@
|
|||
import argparse
|
||||
import json
|
||||
|
||||
import lief
|
||||
import patch
|
||||
import cfg_parser
|
||||
import reginit
|
||||
|
||||
lief.disable_leak_warning() # warnings to disable for the callback
|
||||
|
||||
with open("lib/WindowsDllsExport/win10-19043-exports.json", "rb") as f:
|
||||
api_info = json.load(f)
|
||||
|
||||
dump_path = "rsc/wave-0001.dump"
|
||||
# dump_path = "rsc/wave-0002.dump"
|
||||
iat_json_path = "rsc/upx-hostname.exe.bin_iat_wave1.json"
|
||||
# iat_json_path = "rsc/000155f2e0360f6ff6cd.exe_iat_wave2.json"
|
||||
|
||||
|
||||
def hex_address_to_memory_representation(hex_addr: str, is_32b: bool, is_little_endian: bool) -> list[int]:
|
||||
adress_size = 4 if is_32b else 8
|
||||
mem_value = [0x00] * adress_size
|
||||
hex_addr = hex_addr[::-1][:-2] # reversing order and stripping zero
|
||||
for i in range(0, adress_size):
|
||||
byte_str = hex_addr[i * 2 : (i + 1) * 2][::-1]
|
||||
mem_value[i] += int(byte_str, 16)
|
||||
if not is_little_endian:
|
||||
mem_value = mem_value[::-1] # reverse byte order for big endian
|
||||
return mem_value
|
||||
|
||||
|
||||
# Retrives all unique DLL names being imported
|
||||
def get_used_dlls(calls: list[dict[str, str]]) -> set[str]:
|
||||
res = set()
|
||||
|
|
@ -42,112 +27,27 @@ def get_used_functions_from_dll(dllname, calls):
|
|||
return res
|
||||
|
||||
|
||||
def patch_direct_adress_call(pe: lief.PE.Binary, rva: int, instruction_offset: int):
|
||||
# We can manually patch the instruction here: FF 15 08 10 00 01 represents `call [0x01001080]`
|
||||
new_value = hex_address_to_memory_representation(
|
||||
hex(rva + pe.imagebase),
|
||||
pe.abstract.header.is_32,
|
||||
pe.abstract.header.endianness == lief.Header.ENDIANNESS.LITTLE,
|
||||
)
|
||||
pe.patch_address(instruction_offset, [0xFF, 0x15] + new_value, lief.Binary.VA_TYPES.RVA)
|
||||
|
||||
|
||||
def patch_direct_adress_jump(pe: lief.PE.Binary, rva: int, instruction_offset: int):
|
||||
# We can manually patch the instruction here: FF 15 08 10 00 01 represents `call [0x01001080]`
|
||||
new_value = hex_address_to_memory_representation(
|
||||
hex(rva + pe.imagebase),
|
||||
pe.abstract.header.is_32,
|
||||
pe.abstract.header.endianness == lief.Header.ENDIANNESS.LITTLE,
|
||||
)
|
||||
pe.patch_address(instruction_offset, [0xFF, 0x25] + new_value, lief.Binary.VA_TYPES.RVA)
|
||||
|
||||
|
||||
def patch_instr_to_new_IAT_entry(pe: lief.PE.Binary, call: dict[str, str], rva: int):
|
||||
base = pe.imagebase
|
||||
instruction_offset = int(call["adress"], 16) - base
|
||||
memview = pe.get_content_from_virtual_address(instruction_offset, 2)
|
||||
if [memview[0], memview[1]] == [0xFF, 0x15]:
|
||||
patch_direct_adress_call(pe, rva, instruction_offset)
|
||||
elif [memview[0], memview[1]] == [0xFF, 0x25]:
|
||||
patch_direct_adress_jump(pe, rva, instruction_offset)
|
||||
|
||||
|
||||
def patch_addr_found_in_mem(pe: lief.PE.Binary, rva: int, old_addr: str):
|
||||
is_32 = pe.abstract.header.is_32
|
||||
little_endian = pe.abstract.header.endianness == lief.Header.ENDIANNESS.LITTLE
|
||||
# scan memory for reference to old addr
|
||||
old_addr_mem_repr = hex_address_to_memory_representation(
|
||||
old_addr,
|
||||
is_32,
|
||||
pe.abstract.header.endianness == lief.Header.ENDIANNESS.LITTLE,
|
||||
)
|
||||
new_addr = hex_address_to_memory_representation(
|
||||
hex(rva + pe.imagebase),
|
||||
is_32,
|
||||
little_endian,
|
||||
)
|
||||
adresses_to_patch = []
|
||||
for section in pe.sections:
|
||||
for i in range(len(section.content)):
|
||||
found = True
|
||||
for j in range(len(old_addr_mem_repr)):
|
||||
if i + j >= len(section.content) or section.content[i + j] != old_addr_mem_repr[j]:
|
||||
found = False
|
||||
break
|
||||
if found:
|
||||
old_addr_ref = hex_address_to_memory_representation(
|
||||
hex(
|
||||
section.virtual_address + i + pe.imagebase,
|
||||
),
|
||||
is_32,
|
||||
little_endian,
|
||||
)
|
||||
for section in pe.sections:
|
||||
for k in range(len(section.content)):
|
||||
foundxref = True
|
||||
for L in range(len(old_addr_ref)):
|
||||
if k + L < len(section.content) and section.content[k + L] != old_addr_ref[L]:
|
||||
foundxref = False
|
||||
break
|
||||
if foundxref:
|
||||
adresses_to_patch.append(section.virtual_address + k)
|
||||
for addr in adresses_to_patch:
|
||||
print(f"patched {hex(addr)}")
|
||||
pe.patch_address(addr, new_addr, lief.Binary.VA_TYPES.RVA)
|
||||
|
||||
|
||||
def patch_to_new_IAT(pe: lief.PE.Binary, imp: lief.PE.Import, entry: lief.PE.ImportEntry, rva: int):
|
||||
# print(f"{imp.name}!{entry.name}: 0x{rva:010x}")
|
||||
for call in filter(lambda x: x["name"] == f"{imp.name.upper()}!{entry.name}", calls):
|
||||
patch_instr_to_new_IAT_entry(pe, call, rva)
|
||||
# patch additional non-call related info
|
||||
print(entry.name)
|
||||
for func in filter(lambda x: x["name"] == entry.name and x["dll"] == imp.name, procaddr_list):
|
||||
# print(func["name"])
|
||||
patch_addr_found_in_mem(pe, rva, func["addr"])
|
||||
|
||||
|
||||
def get_list_of_procaddr_functions(prevwave_info):
|
||||
def link_func_to_dll(func_list):
|
||||
res = []
|
||||
for call in prevwave_info:
|
||||
for func in func_list:
|
||||
# first only including imported dlls
|
||||
res_new = {}
|
||||
for export in api_info:
|
||||
if export["dllname"] in dll_calls_list and export["exportname"] == call["function"]:
|
||||
if export["dllname"] in func and export["exportname"] == func["name"]:
|
||||
res_new = {
|
||||
"name": export["exportname"],
|
||||
"dll": export["dllname"],
|
||||
"addr": call["func_addr"],
|
||||
"addr": func["addr"],
|
||||
}
|
||||
break
|
||||
if res_new == {}:
|
||||
# try adding a new dll
|
||||
for export in api_info:
|
||||
if export["exportname"] == call["function"]:
|
||||
if export["exportname"] == func["name"]:
|
||||
res_new = {
|
||||
"name": export["exportname"],
|
||||
"dll": export["dllname"],
|
||||
"addr": call["func_addr"],
|
||||
"addr": func["addr"],
|
||||
}
|
||||
break
|
||||
if res_new != {}:
|
||||
|
|
@ -155,111 +55,108 @@ def get_list_of_procaddr_functions(prevwave_info):
|
|||
return res
|
||||
|
||||
|
||||
# wave dump file to patch
|
||||
with open(dump_path, "rb") as f:
|
||||
pe = lief.parse(f)
|
||||
assert isinstance(pe, lief.PE.Binary)
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(prog="iat.py", description="Create a patched PE from a binary dump and a traceCFG file.")
|
||||
|
||||
# JSON generated with the python reader files
|
||||
with open(iat_json_path, "r") as iat_json_input:
|
||||
iat_data = json.load(iat_json_input)
|
||||
calls: list[dict[str, str]] = iat_data["calls"]
|
||||
wave_entry = int(iat_data["entry"], 16)
|
||||
# Input arguments
|
||||
parser.add_argument("dump", type=str, help="The path to the wave dump file")
|
||||
parser.add_argument("trace", type=str, help="The path to the traceCFG file")
|
||||
|
||||
# create new section
|
||||
patch_section = lief.PE.Section(".iatpatch")
|
||||
content = []
|
||||
# Additional arguments
|
||||
parser.add_argument("-o", "--output", type=str, help="Specify an output filepath for the patched PE.")
|
||||
parser.add_argument("-w", "--wave", type=int, help="Specify the wave number for the binary dump (if it can't be inferred from the filename)")
|
||||
|
||||
# initiate registry values
|
||||
reg_to_inst_code = {
|
||||
"EAX": 0xC0,
|
||||
"EBX": 0xC3,
|
||||
"ECX": 0xC1,
|
||||
"EDX": 0xC2,
|
||||
"ESI": 0xC6,
|
||||
"EDI": 0xC7,
|
||||
"EBP": 0xC5,
|
||||
# "ESP": 0xC4,
|
||||
}
|
||||
for reg in iat_data["entry_reg_values"].keys():
|
||||
if reg not in reg_to_inst_code:
|
||||
continue
|
||||
new_instruction = [
|
||||
0xC7,
|
||||
reg_to_inst_code[reg],
|
||||
] + hex_address_to_memory_representation(
|
||||
iat_data["entry_reg_values"][reg].strip(),
|
||||
pe.abstract.header.is_32,
|
||||
pe.abstract.header.endianness == lief.Header.ENDIANNESS.LITTLE,
|
||||
)
|
||||
for byte in new_instruction:
|
||||
content.append(byte)
|
||||
args = parser.parse_args()
|
||||
|
||||
# open wave dump file
|
||||
with open(args.dump, "rb") as f:
|
||||
pe = lief.parse(f)
|
||||
assert isinstance(pe, lief.PE.Binary)
|
||||
|
||||
# add ret to actual OEP
|
||||
# open traceCFG json
|
||||
with open(args.trace, "r") as f:
|
||||
cfg = json.load(f)
|
||||
|
||||
content += [0x68] + hex_address_to_memory_representation(
|
||||
hex(wave_entry),
|
||||
pe.abstract.header.is_32,
|
||||
pe.abstract.header.endianness == lief.Header.ENDIANNESS.LITTLE,
|
||||
)
|
||||
if args.wave == None and args.dump[-5:] == ".dump":
|
||||
wave = int(args.dump[-9:-5])
|
||||
else:
|
||||
wave = args.wave
|
||||
|
||||
content += [0xC3]
|
||||
calls = cfg_parser.parse_syscalls(cfg,wave)
|
||||
wave_entry = cfg_parser.parse_wave_entrypoint(cfg,wave)
|
||||
|
||||
patch_section.content = content
|
||||
# create new section
|
||||
iatpatch_section = lief.PE.Section(".iatpatch")
|
||||
iatpatch_content = []
|
||||
|
||||
# add new section to PE
|
||||
pe.add_section(patch_section)
|
||||
# registers initiation
|
||||
iatpatch_content += reginit.generate_reg_init_code(cfg,pe,wave,wave_entry)
|
||||
|
||||
# patch entrypoint
|
||||
# entrypoint_format = int(hex(pe.get_section(".iatpatch").virtual_address)[-4:], 16)
|
||||
entrypoint_format = int(hex(pe.get_section(".iatpatch").virtual_address)[-4:], 16)
|
||||
pe.optional_header.addressof_entrypoint = entrypoint_format
|
||||
# write patch section code
|
||||
iatpatch_section.content = iatpatch_content # pyright: ignore[reportAttributeAccessIssue]
|
||||
|
||||
# remove all current imports
|
||||
pe.remove_all_imports()
|
||||
# add new section to PE
|
||||
pe.add_section(iatpatch_section)
|
||||
|
||||
# recreate all DLL imports from calls detected
|
||||
dll_calls_list = []
|
||||
imported_dll_list = []
|
||||
func_calls_list = []
|
||||
for dll in get_used_dlls(calls):
|
||||
dll_calls_list.append(dll.lower())
|
||||
imported_dll = pe.add_import(dll.lower())
|
||||
imported_dll_list.append(imported_dll)
|
||||
# recreate all function calls related to that dll import
|
||||
for func in get_used_functions_from_dll(dll, calls):
|
||||
func_calls_list.append(func)
|
||||
imported_dll.add_entry(func)
|
||||
# patch entrypoint
|
||||
entrypoint_format = int(hex(pe.get_section(".iatpatch").virtual_address)[-4:], 16)
|
||||
pe.optional_header.addressof_entrypoint = entrypoint_format
|
||||
|
||||
# get list of functions called with getprocaddr
|
||||
procaddr_list = get_list_of_procaddr_functions(iat_data["prevwave_getprocaddr"])
|
||||
for func in procaddr_list:
|
||||
if func["name"] in func_calls_list: # call already added
|
||||
continue
|
||||
if func["dll"] in dll_calls_list: # dll already added
|
||||
imported_dll_list[dll_calls_list.index(func["dll"])].add_entry(func["name"])
|
||||
else: # we need to import the new DLL
|
||||
dll_calls_list.append(func["dll"])
|
||||
imported_dll = pe.add_import(func["dll"])
|
||||
# remove all current imports
|
||||
pe.remove_all_imports()
|
||||
|
||||
# recreate all DLL imports from calls detected
|
||||
dll_calls_list = []
|
||||
imported_dll_list = []
|
||||
func_calls_list = []
|
||||
for dll in get_used_dlls(calls):
|
||||
dll_calls_list.append(dll.lower())
|
||||
imported_dll = pe.add_import(dll.lower())
|
||||
imported_dll_list.append(imported_dll)
|
||||
func_calls_list.append(func["name"])
|
||||
imported_dll.add_entry(func["name"])
|
||||
# recreate all function calls related to that dll import
|
||||
for func in get_used_functions_from_dll(dll, calls):
|
||||
func_calls_list.append(func)
|
||||
imported_dll.add_entry(func)
|
||||
|
||||
# At this point, the new IAT will only be constructed when the PE is written. We therefore need to make a callback function to patch calls afterwards.
|
||||
# get list of functions called with getprocaddr in previous wave
|
||||
func_list = cfg_parser.parse_procaddr_calls(cfg, wave - 1)
|
||||
func_dll_list = link_func_to_dll(func_list)
|
||||
for func in func_dll_list:
|
||||
if func["name"] in func_calls_list: # call already added
|
||||
continue
|
||||
if func["dll"] in dll_calls_list: # dll already added
|
||||
imported_dll_list[dll_calls_list.index(func["dll"])].add_entry(func["name"])
|
||||
else: # we need to import the new DLL
|
||||
dll_calls_list.append(func["dll"])
|
||||
imported_dll = pe.add_import(func["dll"])
|
||||
imported_dll_list.append(imported_dll)
|
||||
func_calls_list.append(func["name"])
|
||||
imported_dll.add_entry(func["name"])
|
||||
|
||||
# Define all sections as writeable, to help with some weird stuff we're seeing
|
||||
for section in pe.sections:
|
||||
section.characteristics = (
|
||||
lief.PE.Section.CHARACTERISTICS.MEM_WRITE.value
|
||||
+ lief.PE.Section.CHARACTERISTICS.MEM_READ.value
|
||||
+ lief.PE.Section.CHARACTERISTICS.MEM_EXECUTE.value
|
||||
+ lief.PE.Section.CHARACTERISTICS.CNT_INITIALIZED_DATA.value
|
||||
)
|
||||
# At this point, the new IAT will only be constructed when the PE is written. We therefore need to make a callback function to patch calls afterwards.
|
||||
|
||||
# write result
|
||||
config = lief.PE.Builder.config_t()
|
||||
config.imports = True # allows the config of the writer to write a new IAT
|
||||
config.resolved_iat_cbk = patch_to_new_IAT # callback after the IAT has been written
|
||||
pe.write("patched.exe", config)
|
||||
print("Wrote the patched executable as patched.exe")
|
||||
# Define all sections as writeable, to help with some weird stuff we're seeing
|
||||
for section in pe.sections:
|
||||
section.characteristics = (
|
||||
lief.PE.Section.CHARACTERISTICS.MEM_WRITE.value
|
||||
+ lief.PE.Section.CHARACTERISTICS.MEM_READ.value
|
||||
+ lief.PE.Section.CHARACTERISTICS.MEM_EXECUTE.value
|
||||
+ lief.PE.Section.CHARACTERISTICS.CNT_INITIALIZED_DATA.value
|
||||
)
|
||||
|
||||
# write result
|
||||
config = lief.PE.Builder.config_t()
|
||||
config.imports = True # allows the config of the writer to write a new IAT
|
||||
|
||||
def patching_callback(pe: lief.PE.Binary, imp: lief.PE.Import, entry: lief.PE.ImportEntry, rva: int):
|
||||
for call in filter(lambda x: x["name"] == f"{imp.name.upper()}!{entry.name}", calls):
|
||||
patch.patch_instr_to_new_IAT_entry(pe, call, rva)
|
||||
# patch additional non-call related info
|
||||
for func in filter(lambda x: x["name"] == entry.name and x["dll"] == imp.name, func_dll_list):
|
||||
patch.patch_addr_found_in_mem(pe, rva, func["addr"])
|
||||
config.resolved_iat_cbk = patching_callback # callback after the IAT has been written
|
||||
pe.write("patched.exe" if args.output == None else args.output, config)
|
||||
print("Wrote the patched executable as patched.exe")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue