186 lines
7 KiB
Python
186 lines
7 KiB
Python
import argparse
|
|
import json
|
|
|
|
import lief
|
|
|
|
import cfg_parser
|
|
import patch
|
|
import reginit
|
|
import utils
|
|
|
|
lief.disable_leak_warning() # warnings to disable for the callback
|
|
|
|
with open("lib/WindowsDllsExport/win10-19043-exports.json", "rb") as f:
|
|
api_info = json.load(f)
|
|
|
|
|
|
# Retrives all unique DLL names being imported
|
|
def get_used_dlls(calls: list[dict[str, str]]) -> set[str]:
|
|
res = set()
|
|
for call in calls:
|
|
res.add(call["name"].split("!")[0])
|
|
return res
|
|
|
|
|
|
# Retrieves all unique function names used for a single DLL name
|
|
def get_used_functions_from_dll(dllname, calls):
|
|
res = set()
|
|
for [dll, func] in map(lambda x: x["name"].split("!"), calls):
|
|
if dll == dllname:
|
|
res.add(func)
|
|
return res
|
|
|
|
|
|
def link_func_to_dll(func_list):
|
|
res = []
|
|
for func in func_list:
|
|
# first only including imported dlls
|
|
res_new = {}
|
|
for export in api_info:
|
|
if export["dllname"] in func and export["exportname"] == func["name"]:
|
|
res_new = {
|
|
"name": export["exportname"],
|
|
"dll": export["dllname"],
|
|
"addr": func["addr"],
|
|
}
|
|
break
|
|
if res_new == {}:
|
|
# try adding a new dll
|
|
for export in api_info:
|
|
if export["exportname"] == func["name"]:
|
|
res_new = {
|
|
"name": export["exportname"],
|
|
"dll": export["dllname"],
|
|
"addr": func["addr"],
|
|
}
|
|
break
|
|
if res_new != {}:
|
|
res.append(res_new)
|
|
return res
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
prog="iat.py",
|
|
description="Create a patched PE from a binary dump and a traceCFG file.",
|
|
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
|
|
)
|
|
|
|
# Input arguments
|
|
parser.add_argument("dump", type=str, help="The path to the wave dump file (usually ends with .dump)")
|
|
parser.add_argument("trace", type=str, help="The path to the traceCFG file (.json)")
|
|
|
|
# Additional arguments
|
|
parser.add_argument("-v", "--verbose", action="store_true", help="Output additional debug info")
|
|
parser.add_argument("-o", "--output", type=str, default="patched.exe", help="Specify an output filepath for the patched PE.")
|
|
parser.add_argument("-w", "--wave", type=int, help="Specify the wave number for the binary dump (if it can't be inferred from the filename)")
|
|
parser.add_argument("--disable-reginit", action="store_true", help="Disable initialization of the registry before jumping to the wave start")
|
|
|
|
args = parser.parse_args()
|
|
utils.set_verbose(args.verbose)
|
|
|
|
# open wave dump file
|
|
with open(args.dump, "rb") as f:
|
|
pe = lief.parse(f)
|
|
assert isinstance(pe, lief.PE.Binary)
|
|
utils.print_debug(f"Opened file {args.dump} as the binary dump")
|
|
|
|
# open traceCFG json
|
|
with open(args.trace, "r") as f:
|
|
cfg = json.load(f)
|
|
utils.print_debug(f"Opened file {args.trace} as the TraceCFG JSON")
|
|
|
|
# determine target wave
|
|
if args.wave is None and args.dump[-5:] == ".dump":
|
|
wave = int(args.dump[-9:-5])
|
|
else:
|
|
wave = args.wave
|
|
utils.print_debug(f"Determined wave to be {wave}")
|
|
|
|
calls = cfg_parser.parse_syscalls(cfg, wave)
|
|
wave_entry = cfg_parser.parse_wave_entrypoint(cfg, wave)
|
|
|
|
# create new section
|
|
iatpatch_section = lief.PE.Section(".iatpatch")
|
|
iatpatch_content = []
|
|
|
|
# registers initialization
|
|
if not args.disable_reginit:
|
|
iatpatch_content += reginit.generate_reg_init_code(cfg, pe, wave, wave_entry)
|
|
|
|
# write patch section code
|
|
if iatpatch_content != []:
|
|
iatpatch_section.content = iatpatch_content # pyright: ignore[reportAttributeAccessIssue]
|
|
|
|
# add new section to PE
|
|
pe.add_section(iatpatch_section)
|
|
|
|
# patch entrypoint
|
|
if args.disable_reginit:
|
|
entrypoint_format = int(hex(cfg_parser.parse_wave_entrypoint(cfg, wave))[-4:], 16)
|
|
else:
|
|
entrypoint_format = int(hex(pe.get_section(".iatpatch").virtual_address)[-4:], 16)
|
|
pe.optional_header.addressof_entrypoint = entrypoint_format
|
|
|
|
# remove all current imports
|
|
pe.remove_all_imports()
|
|
|
|
# recreate all DLL imports from calls detected
|
|
dll_calls_list = []
|
|
imported_dll_list = []
|
|
func_calls_list = []
|
|
for dll in get_used_dlls(calls):
|
|
dll_calls_list.append(dll.lower())
|
|
imported_dll = pe.add_import(dll.lower())
|
|
imported_dll_list.append(imported_dll)
|
|
# recreate all function calls related to that dll import
|
|
for func in get_used_functions_from_dll(dll, calls):
|
|
func_calls_list.append(func)
|
|
imported_dll.add_entry(func)
|
|
|
|
# get list of functions called with getprocaddr in previous wave
|
|
func_list = cfg_parser.parse_procaddr_calls(cfg, wave - 1)
|
|
func_dll_list = link_func_to_dll(func_list)
|
|
for func in func_dll_list:
|
|
if func["name"] in func_calls_list: # call already added
|
|
continue
|
|
if func["dll"] in dll_calls_list: # dll already added
|
|
imported_dll_list[dll_calls_list.index(func["dll"])].add_entry(func["name"])
|
|
else: # we need to import the new DLL
|
|
dll_calls_list.append(func["dll"])
|
|
imported_dll = pe.add_import(func["dll"])
|
|
imported_dll_list.append(imported_dll)
|
|
func_calls_list.append(func["name"])
|
|
imported_dll.add_entry(func["name"])
|
|
|
|
# Define all sections as writeable, to prevent permission issues.
|
|
# Ideally, we would like to have the actual permitions from Goatracer at some point in the future
|
|
for section in pe.sections:
|
|
section.characteristics = (
|
|
lief.PE.Section.CHARACTERISTICS.MEM_WRITE.value
|
|
+ lief.PE.Section.CHARACTERISTICS.MEM_READ.value
|
|
+ lief.PE.Section.CHARACTERISTICS.MEM_EXECUTE.value
|
|
+ lief.PE.Section.CHARACTERISTICS.CNT_INITIALIZED_DATA.value
|
|
)
|
|
|
|
# At this point, the new IAT will only be constructed when the PE is written. We therefore need to make a callback function to patch calls afterwards.
|
|
def patching_callback(pe: lief.PE.Binary, imp: lief.PE.Import, entry: lief.PE.ImportEntry, rva: int):
|
|
utils.print_debug(f"Now trying to patch {entry.name}!{imp.name}...")
|
|
for call in filter(lambda x: x["name"] == f"{imp.name.upper()}!{entry.name}", calls):
|
|
patch.patch_instr_to_new_IAT_entry(pe, call, rva)
|
|
# patch additional non-call related info
|
|
for func in filter(lambda x: x["name"] == entry.name and x["dll"] == imp.name, func_dll_list):
|
|
patch.patch_addr_found_in_mem(pe, rva, func["addr"])
|
|
utils.print_debug("Done!\n")
|
|
|
|
# write result
|
|
config = lief.PE.Builder.config_t()
|
|
config.imports = True # allows the config of the writer to write a new IAT
|
|
config.resolved_iat_cbk = patching_callback # Define the callback
|
|
output_path = args.output
|
|
pe.write(output_path, config)
|
|
print(f"Wrote the patched executable as {output_path}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|