lief-iat-reconstruction/iat.py

186 lines
7 KiB
Python

import argparse
import json
import lief
import cfg_parser
import patch
import reginit
import utils
# Turn off LIEF's leak warnings; they would otherwise be emitted because of
# the Python callback handed to the PE builder in main().
lief.disable_leak_warning()

# Export database, loaded once at import time. link_func_to_dll() reads the
# "dllname" and "exportname" keys of each entry.
# NOTE: the path is resolved against the current working directory.
with open("lib/WindowsDllsExport/win10-19043-exports.json", "rb") as exports_file:
    api_info = json.load(exports_file)
def get_used_dlls(calls: list[dict[str, str]]) -> set[str]:
    """Return the set of distinct DLL names referenced by ``calls``.

    The DLL name of a call is whatever precedes the first ``"!"`` in its
    ``"name"`` value; it is returned as-is, without case normalisation.
    """
    return {entry["name"].split("!")[0] for entry in calls}
def get_used_functions_from_dll(dllname, calls):
    """Return the set of distinct function names called from ``dllname``.

    Each call's ``"name"`` value is split on ``"!"`` and must yield exactly
    two parts (DLL, function); any other shape raises ``ValueError`` from the
    unpacking. The DLL comparison is exact (case-sensitive).
    """
    split_names = (entry["name"].split("!") for entry in calls)
    return {function for dll, function in split_names if dll == dllname}
def link_func_to_dll(func_list, exports=None):
    """Attach an exporting DLL to every function record that can be resolved.

    Args:
        func_list: Iterable of dicts providing at least the ``"name"`` and
            ``"addr"`` keys.
        exports: Optional export database, an iterable of dicts with the
            ``"dllname"`` and ``"exportname"`` keys. Defaults to the
            module-level ``api_info``.

    Returns:
        A list of ``{"name", "dll", "addr"}`` dicts in the order of
        ``func_list``. Records whose name matches no export are dropped.
    """
    if exports is None:
        exports = api_info

    # Index the export database by name once, instead of scanning it (up to
    # twice) for every function. Per-name lists keep the database order so
    # that "first match wins" is preserved.
    exports_by_name = {}
    for export in exports:
        exports_by_name.setdefault(export["exportname"], []).append(export)

    res = []
    for func in func_list:
        candidates = exports_by_name.get(func["name"])
        if not candidates:
            continue
        # First preference: an export whose DLL name is a key of the record.
        # NOTE(review): ``in func`` tests the record's dict keys; the original
        # comment ("first only including imported dlls") suggests a lookup
        # against the already-imported DLLs was intended -- confirm.
        # Fallback: the first export with that name, from any DLL.
        chosen = next(
            (export for export in candidates if export["dllname"] in func),
            candidates[0],
        )
        res.append(
            {
                "name": chosen["exportname"],
                "dll": chosen["dllname"],
                "addr": func["addr"],
            }
        )
    return res
def main():
    """Command-line entry point: rebuild the imports of a dumped PE.

    Reads a wave dump and a traceCFG JSON file, optionally prepends a
    register-initialisation stub in a new ``.iatpatch`` section, replaces the
    import table with the DLLs/functions found in the trace (plus the
    functions resolved through GetProcAddress in the previous wave), marks
    every section read/write/execute and writes the patched executable.

    Exits through ``parser.error`` (status 2) when the dump is not a PE or
    when the wave number is neither given nor inferable from the filename.
    """
    parser = argparse.ArgumentParser(
        prog="iat.py",
        description="Create a patched PE from a binary dump and a traceCFG file.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    # Input arguments
    parser.add_argument("dump", type=str, help="The path to the wave dump file (usually ends with .dump)")
    parser.add_argument("trace", type=str, help="The path to the traceCFG file (.json)")
    # Additional arguments
    parser.add_argument("-v", "--verbose", action="store_true", help="Output additional debug info")
    parser.add_argument("-o", "--output", type=str, default="patched.exe", help="Specify an output filepath for the patched PE.")
    parser.add_argument("-w", "--wave", type=int, help="Specify the wave number for the binary dump (if it can't be inferred from the filename)")
    parser.add_argument("--disable-reginit", action="store_true", help="Disable initialization of the registry before jumping to the wave start")
    args = parser.parse_args()
    utils.set_verbose(args.verbose)

    # open wave dump file
    with open(args.dump, "rb") as f:
        pe = lief.parse(f)
    # Explicit check instead of ``assert``: asserts vanish under ``python -O``.
    if not isinstance(pe, lief.PE.Binary):
        parser.error(f"{args.dump} could not be parsed as a PE binary")
    utils.print_debug(f"Opened file {args.dump} as the binary dump")

    # open traceCFG json
    with open(args.trace, "r") as f:
        cfg = json.load(f)
    utils.print_debug(f"Opened file {args.trace} as the TraceCFG JSON")

    # determine target wave: an explicit --wave wins, otherwise use the four
    # characters that precede the ".dump" suffix of the dump path.
    wave = args.wave
    if wave is None:
        if not args.dump.endswith(".dump"):
            parser.error("cannot infer the wave number from the dump filename; please pass --wave")
        try:
            wave = int(args.dump[-9:-5])
        except ValueError:
            parser.error(
                f"cannot infer the wave number from {args.dump!r}; please pass --wave"
            )
    utils.print_debug(f"Determined wave to be {wave}")

    calls = cfg_parser.parse_syscalls(cfg, wave)
    wave_entry = cfg_parser.parse_wave_entrypoint(cfg, wave)

    # create new section
    iatpatch_section = lief.PE.Section(".iatpatch")
    iatpatch_content = []
    # registers initialization
    if not args.disable_reginit:
        iatpatch_content += reginit.generate_reg_init_code(cfg, pe, wave, wave_entry)

    # patch entrypoint
    if iatpatch_content:
        # write patch section code and add the new section to the PE
        iatpatch_section.content = iatpatch_content  # pyright: ignore[reportAttributeAccessIssue]
        pe.add_section(iatpatch_section)
        # The section's virtual address is already an RVA, which is what the
        # entry-point field expects: use it untruncated so that sections
        # placed at RVA >= 0x10000 still work.
        entrypoint_rva = pe.get_section(".iatpatch").virtual_address
    else:
        # No stub was generated (reginit disabled or empty): start directly at
        # the wave entry. Only the low 16 bits of the traced address are kept.
        # NOTE(review): this presumes the entry RVA fits in 16 bits and the
        # image base is 64 KiB aligned -- confirm for larger binaries.
        entrypoint_rva = wave_entry & 0xFFFF
    pe.optional_header.addressof_entrypoint = entrypoint_rva

    # remove all current imports
    pe.remove_all_imports()

    # recreate all DLL imports from calls detected
    dll_calls_list = []
    imported_dll_list = []
    func_calls_list = []
    for dll in get_used_dlls(calls):
        dll_calls_list.append(dll.lower())
        imported_dll = pe.add_import(dll.lower())
        imported_dll_list.append(imported_dll)
        # recreate all function calls related to that dll import
        for func in get_used_functions_from_dll(dll, calls):
            func_calls_list.append(func)
            imported_dll.add_entry(func)

    # get list of functions called with getprocaddr in previous wave
    func_list = cfg_parser.parse_procaddr_calls(cfg, wave - 1)
    func_dll_list = link_func_to_dll(func_list)
    for func in func_dll_list:
        if func["name"] in func_calls_list:  # call already added
            continue
        if func["dll"] in dll_calls_list:  # dll already added
            imported_dll = imported_dll_list[dll_calls_list.index(func["dll"])]
        else:  # we need to import the new DLL
            dll_calls_list.append(func["dll"])
            imported_dll = pe.add_import(func["dll"])
            imported_dll_list.append(imported_dll)
        # Record the name in both branches so that a function resolved several
        # times yields a single import entry.
        func_calls_list.append(func["name"])
        imported_dll.add_entry(func["name"])

    # Define all sections as writeable, to prevent permission issues.
    # Ideally, we would like to have the actual permissions from Goatracer at
    # some point in the future.
    section_flags = (
        lief.PE.Section.CHARACTERISTICS.MEM_WRITE.value
        | lief.PE.Section.CHARACTERISTICS.MEM_READ.value
        | lief.PE.Section.CHARACTERISTICS.MEM_EXECUTE.value
        | lief.PE.Section.CHARACTERISTICS.CNT_INITIALIZED_DATA.value
    )
    for section in pe.sections:
        section.characteristics = section_flags

    # At this point, the new IAT will only be constructed when the PE is
    # written. We therefore need a callback to patch calls afterwards.
    def patching_callback(pe: lief.PE.Binary, imp: lief.PE.Import, entry: lief.PE.ImportEntry, rva: int):
        utils.print_debug(f"Now trying to patch {entry.name}!{imp.name}...")
        # DLL names are matched case-insensitively (imports were added
        # lower-cased, traces may use any case); function names are exact.
        import_dll = imp.name.lower()
        for call in calls:
            call_dll, _, call_func = call["name"].partition("!")
            if call_dll.lower() == import_dll and call_func == entry.name:
                patch.patch_instr_to_new_IAT_entry(pe, call, rva)
        # patch additional non-call related info
        for func in func_dll_list:
            if func["name"] == entry.name and func["dll"] == imp.name:
                patch.patch_addr_found_in_mem(pe, rva, func["addr"])
        utils.print_debug("Done!\n")

    # write result
    config = lief.PE.Builder.config_t()
    config.imports = True  # allows the config of the writer to write a new IAT
    config.resolved_iat_cbk = patching_callback  # Define the callback
    output_path = args.output
    pe.write(output_path, config)
    print(f"Wrote the patched executable as {output_path}")
# Run the command-line tool only when this file is executed as a script.
if __name__ == "__main__":
    main()