Compare commits

...

8 commits

10 changed files with 3290 additions and 256 deletions

40
cfg_parser.py Normal file
View file

@ -0,0 +1,40 @@
def parse_wave_nodes(cfg, wave: int) -> list:
    """Return the CFG nodes that belong to the given wave.

    Args:
        cfg: Parsed trace CFG; must expose a ``"nodes"`` iterable of dicts,
            each carrying a ``"wave"`` entry.
        wave: Wave number to select.

    Returns:
        The matching nodes, in the order they appear in ``cfg["nodes"]``.
    """
    return [node for node in cfg["nodes"] if node["wave"] == wave]
def parse_procaddr_calls(cfg, wave: int) -> list:
    """Collect the functions resolved through ``GetProcAddress`` during a wave.

    Args:
        cfg: Parsed trace CFG, as accepted by ``parse_wave_nodes``.
        wave: Wave number whose nodes are inspected.

    Returns:
        One ``{"name": ..., "addr": ...}`` dict per recorded
        ``KERNEL32.DLL!GetProcAddress`` call whose last argument contains a
        double-quoted function name; ``addr`` is the call's ``"return"``
        value. Calls without a quoted name are skipped.
    """
    res = []
    for node in parse_wave_nodes(cfg, wave):
        for syscall in node.get("syscalls", ()):
            if syscall["name"] != "KERNEL32.DLL!GetProcAddress":
                continue
            arguments = syscall["arguments"]
            # The function name is taken from between the first pair of double
            # quotes of the last argument. A lookup by ordinal (or a call with
            # no recorded arguments) has no quoted name; skip it instead of
            # raising IndexError and losing every other resolved function.
            pieces = arguments[-1].split('"') if arguments else []
            if len(pieces) < 2:
                continue
            res.append({"name": pieces[1], "addr": syscall["return"]})
    return res
def parse_syscalls(cfg, wave: int) -> list[dict[str, str]]:
    """List the API calls recorded for a wave, one per calling instruction.

    Args:
        cfg: Parsed trace CFG, as accepted by ``parse_wave_nodes``.
        wave: Wave number whose nodes are inspected.

    Returns:
        ``{"adress": ..., "name": ...}`` dicts, in node order. ``adress`` is
        the node's ``"last_instr"`` and ``name`` is the name of the first
        syscall recorded on that node. A given ``last_instr`` is reported
        only once.
    """
    res: list[dict[str, str]] = []
    # A set gives O(1) duplicate detection; the check depends only on the
    # node, so it is done once per node rather than once per syscall.
    seen: set[str] = set()
    for node in parse_wave_nodes(cfg, wave):
        syscalls = node.get("syscalls")
        if not syscalls:
            continue
        adress = node["last_instr"]  # call is at the end of the basic block
        if adress in seen:
            continue
        seen.add(adress)
        res.append({"adress": adress, "name": syscalls[0]["name"]})
    return res
def parse_wave_entrypoint(cfg, wave: int) -> int:
    """Return the start address of the wave's first node as an integer.

    Args:
        cfg: Parsed trace CFG, as accepted by ``parse_wave_nodes``.
        wave: Wave number to look up.

    Returns:
        The first matching node's ``"start"`` value, parsed as hexadecimal.

    Raises:
        IndexError: If the wave has no nodes.
    """
    first_node = parse_wave_nodes(cfg, wave)[0]
    start_hex = first_node["start"]
    return int(start_hex, 16)
def parse_bb_registers(cfg, wave: int, n_bb: int) -> dict[str, str]:
    """Return the ``"registers"`` entry of the ``n_bb``-th node of a wave.

    Args:
        cfg: Parsed trace CFG, as accepted by ``parse_wave_nodes``.
        wave: Wave number to look up.
        n_bb: Index of the node within the wave's node list.

    Raises:
        IndexError: If the wave has no node at index ``n_bb``.
    """
    wave_nodes = parse_wave_nodes(cfg, wave)
    basic_block = wave_nodes[n_bb]
    return basic_block["registers"]

317
iat.py
View file

@ -1,29 +1,18 @@
import argparse
import json
import lief
import cfg_parser
import patch
import reginit
import utils
lief.disable_leak_warning() # warnings to disable for the callback
with open("lib/WindowsDllsExport/win10-19043-exports.json", "rb") as f:
api_info = json.load(f)
dump_path = "rsc/wave-0001.dump"
# dump_path = "rsc/wave-0002.dump"
iat_json_path = "rsc/upx-hostname.exe.bin_iat_wave1.json"
# iat_json_path = "rsc/000155f2e0360f6ff6cd.exe_iat_wave2.json"
def hex_address_to_memory_representation(hex_addr: str, is_32b: bool, is_little_endian: bool) -> list[int]:
    """Encode a hexadecimal address string as the bytes stored in memory.

    Args:
        hex_addr: Address in hexadecimal, with or without a ``0x`` prefix
            (for instance the output of ``hex()``). Surrounding whitespace
            is ignored.
        is_32b: ``True`` for a 4-byte address, ``False`` for an 8-byte one.
        is_little_endian: ``True`` to emit the least significant byte first.

    Returns:
        A list of 4 or 8 byte values. Digits beyond the address size are
        dropped, keeping the low-order bytes.

    Raises:
        ValueError: If ``hex_addr`` is not a hexadecimal number.
    """
    adress_size = 4 if is_32b else 8
    # Parse numerically instead of slicing the string: short inputs such as
    # "0x0" or an unpadded 64-bit address used to fail on an empty slice, and
    # input without the "0x" prefix silently lost its two leading digits.
    value = int(hex_addr, 16) & ((1 << (8 * adress_size)) - 1)
    byte_order = "little" if is_little_endian else "big"
    return list(value.to_bytes(adress_size, byte_order))
# Retrieves all unique DLL names being imported
def get_used_dlls(calls: list[dict[str, str]]) -> set[str]:
@ -42,112 +31,27 @@ def get_used_functions_from_dll(dllname, calls):
return res
def patch_direct_adress_call(pe: lief.PE.Binary, rva: int, instruction_offset: int):
    """Overwrite an instruction with an indirect call through the entry at ``rva``.

    Args:
        pe: Binary being patched.
        rva: RVA of the new IAT entry; ``pe.imagebase`` is added to form the
            operand.
        instruction_offset: RVA of the instruction to overwrite.
    """
    header = pe.abstract.header
    little_endian = header.endianness == lief.Header.ENDIANNESS.LITTLE
    operand = hex_address_to_memory_representation(hex(rva + pe.imagebase), header.is_32, little_endian)
    # Hand-assembled instruction: FF 15 08 10 00 01 represents `call [0x01001008]`.
    # NOTE(review): the operand is 8 bytes when the header is not 32-bit, while
    # FF 15 takes a 4-byte operand - confirm 64-bit dumps are out of scope.
    pe.patch_address(instruction_offset, [0xFF, 0x15] + operand, lief.Binary.VA_TYPES.RVA)
def patch_direct_adress_jump(pe: lief.PE.Binary, rva: int, instruction_offset: int):
    """Overwrite an instruction with an indirect jump through the entry at ``rva``.

    Args:
        pe: Binary being patched.
        rva: RVA of the new IAT entry; ``pe.imagebase`` is added to form the
            operand.
        instruction_offset: RVA of the instruction to overwrite.
    """
    header = pe.abstract.header
    little_endian = header.endianness == lief.Header.ENDIANNESS.LITTLE
    operand = hex_address_to_memory_representation(hex(rva + pe.imagebase), header.is_32, little_endian)
    # Hand-assembled instruction: FF 25 08 10 00 01 represents `jmp [0x01001008]`.
    pe.patch_address(instruction_offset, [0xFF, 0x25] + operand, lief.Binary.VA_TYPES.RVA)
def patch_instr_to_new_IAT_entry(pe: lief.PE.Binary, call: dict[str, str], rva: int):
    """Redirect one recorded call site to the IAT entry at ``rva``.

    Only instructions starting with ``FF 15`` or ``FF 25`` are rewritten;
    any other instruction is left untouched.

    Args:
        pe: Binary being patched.
        call: Call record whose ``"adress"`` is the hexadecimal address of
            the instruction.
        rva: RVA of the new IAT entry.
    """
    instruction_offset = int(call["adress"], 16) - pe.imagebase
    memview = pe.get_content_from_virtual_address(instruction_offset, 2)
    opcode = [memview[0], memview[1]]
    if opcode == [0xFF, 0x15]:
        patch_direct_adress_call(pe, rva, instruction_offset)
        return
    if opcode == [0xFF, 0x25]:
        patch_direct_adress_jump(pe, rva, instruction_offset)
def patch_addr_found_in_mem(pe: lief.PE.Binary, rva: int, old_addr: str):
    """Point every reference to a slot holding ``old_addr`` at the entry at ``rva``.

    Section contents are scanned twice: first for the bytes of ``old_addr``
    (each hit is a slot that stores the old address), then for the address
    of each such slot (each hit is a cross-reference to it). Every
    cross-reference is overwritten with the bytes of ``rva + pe.imagebase``.

    Args:
        pe: Binary being patched.
        rva: RVA of the new IAT entry.
        old_addr: Hexadecimal string of the address to look for.
    """
    is_32 = pe.abstract.header.is_32
    little_endian = pe.abstract.header.endianness == lief.Header.ENDIANNESS.LITTLE

    def encode(hex_addr: str) -> bytes:
        # Byte pattern of an address as it is laid out in the image.
        return bytes(hex_address_to_memory_representation(hex_addr, is_32, little_endian))

    def find_all(haystack: bytes, needle: bytes) -> list[int]:
        # Every start offset of `needle`, overlapping matches included.
        offsets = []
        pos = haystack.find(needle)
        while pos != -1:
            offsets.append(pos)
            pos = haystack.find(needle, pos + 1)
        return offsets

    new_addr = hex_address_to_memory_representation(hex(rva + pe.imagebase), is_32, little_endian)

    # Copy each section once. Working on (rva, bytes) pairs keeps the two
    # scans independent; previously the inner loop rebound `section` while
    # the outer loop was still indexing it.
    sections = [(section.virtual_address, bytes(section.content)) for section in pe.sections]

    # scan memory for reference to old addr
    old_addr_mem_repr = encode(old_addr)
    ref_patterns = []
    for section_rva, content in sections:
        for offset in find_all(content, old_addr_mem_repr):
            ref_patterns.append(encode(hex(section_rva + offset + pe.imagebase)))

    # bytes.find only reports complete matches, so a partial match running
    # past the end of a section is no longer counted as a hit.
    adresses_to_patch = []
    for pattern in ref_patterns:
        for section_rva, content in sections:
            for offset in find_all(content, pattern):
                adresses_to_patch.append(section_rva + offset)

    for addr in adresses_to_patch:
        print(f"patched {hex(addr)}")
        pe.patch_address(addr, new_addr, lief.Binary.VA_TYPES.RVA)
def patch_to_new_IAT(pe: lief.PE.Binary, imp: lief.PE.Import, entry: lief.PE.ImportEntry, rva: int):
    """Patch everything that should use the rebuilt import ``entry`` at ``rva``.

    Reads the module-level ``calls`` and ``procaddr_list`` collections.

    Args:
        pe: Binary being written.
        imp: Import (DLL) the entry belongs to.
        entry: Imported function that was just placed in the new IAT.
        rva: RVA of that entry in the new IAT.
    """
    qualified_name = f"{imp.name.upper()}!{entry.name}"
    for call in calls:
        if call["name"] == qualified_name:
            patch_instr_to_new_IAT_entry(pe, call, rva)
    # patch additional non-call related info
    print(entry.name)
    for func in procaddr_list:
        if func["name"] == entry.name and func["dll"] == imp.name:
            patch_addr_found_in_mem(pe, rva, func["addr"])
def get_list_of_procaddr_functions(prevwave_info):
def link_func_to_dll(func_list):
res = []
for call in prevwave_info:
for func in func_list:
# first only including imported dlls
res_new = {}
for export in api_info:
if export["dllname"] in dll_calls_list and export["exportname"] == call["function"]:
if export["dllname"] in func and export["exportname"] == func["name"]:
res_new = {
"name": export["exportname"],
"dll": export["dllname"],
"addr": call["func_addr"],
"addr": func["addr"],
}
break
if res_new == {}:
# try adding a new dll
for export in api_info:
if export["exportname"] == call["function"]:
if export["exportname"] == func["name"]:
res_new = {
"name": export["exportname"],
"dll": export["dllname"],
"addr": call["func_addr"],
"addr": func["addr"],
}
break
if res_new != {}:
@ -155,111 +59,122 @@ def get_list_of_procaddr_functions(prevwave_info):
return res
# wave dump file to patch
with open(dump_path, "rb") as f:
pe = lief.parse(f)
assert isinstance(pe, lief.PE.Binary)
# JSON generated with the python reader files
with open(iat_json_path, "r") as iat_json_input:
iat_data = json.load(iat_json_input)
calls: list[dict[str, str]] = iat_data["calls"]
wave_entry = int(iat_data["entry"], 16)
# create new section
patch_section = lief.PE.Section(".iatpatch")
content = []
# initiate registry values
reg_to_inst_code = {
"EAX": 0xC0,
"EBX": 0xC3,
"ECX": 0xC1,
"EDX": 0xC2,
"ESI": 0xC6,
"EDI": 0xC7,
"EBP": 0xC5,
# "ESP": 0xC4,
}
for reg in iat_data["entry_reg_values"].keys():
if reg not in reg_to_inst_code:
continue
new_instruction = [
0xC7,
reg_to_inst_code[reg],
] + hex_address_to_memory_representation(
iat_data["entry_reg_values"][reg].strip(),
pe.abstract.header.is_32,
pe.abstract.header.endianness == lief.Header.ENDIANNESS.LITTLE,
def main():
parser = argparse.ArgumentParser(
prog="iat.py",
description="Create a patched PE from a binary dump and a traceCFG file.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
for byte in new_instruction:
content.append(byte)
# Input arguments
parser.add_argument("dump", type=str, help="The path to the wave dump file (usually ends with .dump)")
parser.add_argument("trace", type=str, help="The path to the traceCFG file (.json)")
# add ret to actual OEP
# Additional arguments
parser.add_argument("-o", "--output", type=str, default="patched.exe", help="Specify an output filepath for the patched PE.")
parser.add_argument("-w", "--wave", type=int, help="Specify the wave number for the binary dump (if it can't be inferred from the filename)")
parser.add_argument("-v", "--verbose", action="store_true", help="Output additional debug info")
content += [0x68] + hex_address_to_memory_representation(
hex(wave_entry),
pe.abstract.header.is_32,
pe.abstract.header.endianness == lief.Header.ENDIANNESS.LITTLE,
)
args = parser.parse_args()
utils.set_verbose(args.verbose)
content += [0xC3]
# open wave dump file
with open(args.dump, "rb") as f:
pe = lief.parse(f)
assert isinstance(pe, lief.PE.Binary)
utils.print_debug(f"Opened file {args.dump} as the binary dump")
patch_section.content = content
# open traceCFG json
with open(args.trace, "r") as f:
cfg = json.load(f)
utils.print_debug(f"Opened file {args.trace} as the TraceCFG JSON")
# add new section to PE
pe.add_section(patch_section)
# determine target wave
if args.wave is None and args.dump[-5:] == ".dump":
wave = int(args.dump[-9:-5])
else:
wave = args.wave
utils.print_debug(f"Determined wave to be {wave}")
# patch entrypoint
# entrypoint_format = int(hex(pe.get_section(".iatpatch").virtual_address)[-4:], 16)
entrypoint_format = int(hex(pe.get_section(".iatpatch").virtual_address)[-4:], 16)
pe.optional_header.addressof_entrypoint = entrypoint_format
calls = cfg_parser.parse_syscalls(cfg, wave)
wave_entry = cfg_parser.parse_wave_entrypoint(cfg, wave)
# remove all current imports
pe.remove_all_imports()
# create new section
iatpatch_section = lief.PE.Section(".iatpatch")
iatpatch_content = []
# recreate all DLL imports from calls detected
dll_calls_list = []
imported_dll_list = []
func_calls_list = []
for dll in get_used_dlls(calls):
dll_calls_list.append(dll.lower())
imported_dll = pe.add_import(dll.lower())
imported_dll_list.append(imported_dll)
# recreate all function calls related to that dll import
for func in get_used_functions_from_dll(dll, calls):
func_calls_list.append(func)
imported_dll.add_entry(func)
# registers initiation
iatpatch_content += reginit.generate_reg_init_code(cfg, pe, wave, wave_entry)
# get list of functions called with getprocaddr
procaddr_list = get_list_of_procaddr_functions(iat_data["prevwave_getprocaddr"])
for func in procaddr_list:
if func["name"] in func_calls_list: # call already added
continue
if func["dll"] in dll_calls_list: # dll already added
imported_dll_list[dll_calls_list.index(func["dll"])].add_entry(func["name"])
else: # we need to import the new DLL
dll_calls_list.append(func["dll"])
imported_dll = pe.add_import(func["dll"])
# write patch section code
iatpatch_section.content = iatpatch_content # pyright: ignore[reportAttributeAccessIssue]
# add new section to PE
pe.add_section(iatpatch_section)
# patch entrypoint
entrypoint_format = int(hex(pe.get_section(".iatpatch").virtual_address)[-4:], 16)
pe.optional_header.addressof_entrypoint = entrypoint_format
# remove all current imports
pe.remove_all_imports()
# recreate all DLL imports from calls detected
dll_calls_list = []
imported_dll_list = []
func_calls_list = []
for dll in get_used_dlls(calls):
dll_calls_list.append(dll.lower())
imported_dll = pe.add_import(dll.lower())
imported_dll_list.append(imported_dll)
func_calls_list.append(func["name"])
imported_dll.add_entry(func["name"])
# recreate all function calls related to that dll import
for func in get_used_functions_from_dll(dll, calls):
func_calls_list.append(func)
imported_dll.add_entry(func)
# At this point, the new IAT will only be constructed when the PE is written. We therefore need to make a callback function to patch calls afterwards.
# get list of functions called with getprocaddr in previous wave
func_list = cfg_parser.parse_procaddr_calls(cfg, wave - 1)
func_dll_list = link_func_to_dll(func_list)
for func in func_dll_list:
if func["name"] in func_calls_list: # call already added
continue
if func["dll"] in dll_calls_list: # dll already added
imported_dll_list[dll_calls_list.index(func["dll"])].add_entry(func["name"])
else: # we need to import the new DLL
dll_calls_list.append(func["dll"])
imported_dll = pe.add_import(func["dll"])
imported_dll_list.append(imported_dll)
func_calls_list.append(func["name"])
imported_dll.add_entry(func["name"])
# Define all sections as writeable, to help with some weird stuff we're seeing
for section in pe.sections:
section.characteristics = (
lief.PE.Section.CHARACTERISTICS.MEM_WRITE.value
+ lief.PE.Section.CHARACTERISTICS.MEM_READ.value
+ lief.PE.Section.CHARACTERISTICS.MEM_EXECUTE.value
+ lief.PE.Section.CHARACTERISTICS.CNT_INITIALIZED_DATA.value
)
# Define all sections as writeable, to prevent permission issues.
# Ideally, we would like to have the actual permissions from Goatracer at some point in the future
for section in pe.sections:
section.characteristics = (
lief.PE.Section.CHARACTERISTICS.MEM_WRITE.value
+ lief.PE.Section.CHARACTERISTICS.MEM_READ.value
+ lief.PE.Section.CHARACTERISTICS.MEM_EXECUTE.value
+ lief.PE.Section.CHARACTERISTICS.CNT_INITIALIZED_DATA.value
)
# write result
config = lief.PE.Builder.config_t()
config.imports = True # allows the config of the writer to write a new IAT
config.resolved_iat_cbk = patch_to_new_IAT # callback after the IAT has been written
pe.write("patched.exe", config)
print("Wrote the patched executable as patched.exe")
# At this point, the new IAT will only be constructed when the PE is written. We therefore need to make a callback function to patch calls afterwards.
def patching_callback(pe: lief.PE.Binary, imp: lief.PE.Import, entry: lief.PE.ImportEntry, rva: int):
utils.print_debug(f"Now trying to patch {entry.name}!{imp.name}...")
for call in filter(lambda x: x["name"] == f"{imp.name.upper()}!{entry.name}", calls):
patch.patch_instr_to_new_IAT_entry(pe, call, rva)
# patch additional non-call related info
for func in filter(lambda x: x["name"] == entry.name and x["dll"] == imp.name, func_dll_list):
patch.patch_addr_found_in_mem(pe, rva, func["addr"])
utils.print_debug("Done!\n")
# write result
config = lief.PE.Builder.config_t()
config.imports = True # allows the config of the writer to write a new IAT
config.resolved_iat_cbk = patching_callback # Define the callback
output_path = args.output
pe.write(output_path, config)
print(f"Wrote the patched executable as {output_path}")
if __name__ == "__main__":
main()

72
patch.py Normal file
View file

@ -0,0 +1,72 @@
import lief
import utils
from utils import Instructions, hex_address_to_memory_representation, is_32b, is_little_endian
def patch_direct_adress_call(pe: lief.PE.Binary, rva: int, instruction_offset: int):
    """Overwrite an instruction with an indirect call through the entry at ``rva``.

    Args:
        pe: Binary being patched.
        rva: RVA of the new IAT entry; ``pe.imagebase`` is added to form the
            operand.
        instruction_offset: RVA of the instruction to overwrite.
    """
    operand = hex_address_to_memory_representation(hex(rva + pe.imagebase), is_32b(pe), is_little_endian(pe))
    # Hand-assembled instruction: FF 15 08 10 00 01 represents `call [0x01001008]`.
    patched_bytes = Instructions.CALL_ADDR + operand
    pe.patch_address(instruction_offset, patched_bytes, lief.Binary.VA_TYPES.RVA)
    utils.print_debug(f" Patched a call at addr {hex(pe.imagebase + instruction_offset)}")
def patch_direct_adress_jump(pe: lief.PE.Binary, rva: int, instruction_offset: int):
    """Overwrite an instruction with an indirect jump through the entry at ``rva``.

    Args:
        pe: Binary being patched.
        rva: RVA of the new IAT entry; ``pe.imagebase`` is added to form the
            operand.
        instruction_offset: RVA of the instruction to overwrite.
    """
    operand = hex_address_to_memory_representation(
        hex(rva + pe.imagebase),
        is_32b(pe),
        is_little_endian(pe),
    )
    # Hand-assembled instruction: FF 25 08 10 00 01 represents `jmp [0x01001008]`.
    patched_bytes = Instructions.JUMP_ADDR + operand
    pe.patch_address(instruction_offset, patched_bytes, lief.Binary.VA_TYPES.RVA)
    utils.print_debug(f" Patched a jump at addr {hex(pe.imagebase + instruction_offset)}")
def patch_instr_to_new_IAT_entry(pe: lief.PE.Binary, call: dict[str, str], rva: int):
    """Redirect one recorded call site to the IAT entry at ``rva``.

    Only instructions starting with ``Instructions.CALL_ADDR`` or
    ``Instructions.JUMP_ADDR`` are rewritten; any other instruction is left
    untouched.

    Args:
        pe: Binary being patched.
        call: Call record whose ``"adress"`` is the hexadecimal address of
            the instruction.
        rva: RVA of the new IAT entry.
    """
    instruction_offset = int(call["adress"], 16) - pe.imagebase
    memview = pe.get_content_from_virtual_address(instruction_offset, 2)
    opcode = [memview[0], memview[1]]
    if opcode == Instructions.CALL_ADDR:
        patch_direct_adress_call(pe, rva, instruction_offset)
        return
    if opcode == Instructions.JUMP_ADDR:
        patch_direct_adress_jump(pe, rva, instruction_offset)
def patch_addr_found_in_mem(pe: lief.PE.Binary, rva: int, old_addr: str):
    """Point every reference to a slot holding ``old_addr`` at the entry at ``rva``.

    Section contents are scanned twice: first for the bytes of ``old_addr``
    (each hit is a slot that stores the old address), then for the address
    of each such slot (each hit is a cross-reference to it). Every
    cross-reference is overwritten with the bytes of ``rva + pe.imagebase``.

    Args:
        pe: Binary being patched.
        rva: RVA of the new IAT entry.
        old_addr: Hexadecimal string of the address to look for.
    """
    is_32 = is_32b(pe)
    little_endian = is_little_endian(pe)

    def encode(hex_addr: str) -> bytes:
        # Byte pattern of an address as it is laid out in the image.
        return bytes(hex_address_to_memory_representation(hex_addr, is_32, little_endian))

    def find_all(haystack: bytes, needle: bytes) -> list[int]:
        # Every start offset of `needle`, overlapping matches included.
        offsets = []
        pos = haystack.find(needle)
        while pos != -1:
            offsets.append(pos)
            pos = haystack.find(needle, pos + 1)
        return offsets

    new_addr = hex_address_to_memory_representation(hex(rva + pe.imagebase), is_32, little_endian)

    # Copy each section once instead of re-reading `section.content` for
    # every byte comparison.
    sections = [(section.virtual_address, bytes(section.content)) for section in pe.sections]

    # scan memory for reference to old addr
    old_addr_mem_repr = encode(old_addr)
    found_ref_addr = []
    for section_rva, content in sections:
        for offset in find_all(content, old_addr_mem_repr):
            found_ref_addr.append(encode(hex(section_rva + offset + pe.imagebase)))

    found_xref_addr = []
    for section_rva, content in sections:
        for ref_addr in found_ref_addr:
            # bytes.find also reports a match that ends exactly at the end of
            # the section; the former `range(len(content) - len(ref_addr))`
            # bound stopped one position short of it.
            for offset in find_all(content, ref_addr):
                found_xref_addr.append(section_rva + offset)

    for addr in found_xref_addr:
        pe.patch_address(addr, new_addr, lief.Binary.VA_TYPES.RVA)
        utils.print_debug(f" Patched an xref to old IAT at {hex(pe.imagebase + addr)}")

43
reginit.py Normal file
View file

@ -0,0 +1,43 @@
from enum import IntEnum
import lief
import cfg_parser
from utils import Instructions, hex_address_to_memory_representation, is_32b, is_little_endian
class Registers(IntEnum):
    """Per-register byte emitted right after ``Instructions.MOV_REG``.

    ``generate_reg_init_code`` builds each register load as
    ``MOV_REG + [Registers[name]] + immediate``, and looks members up by
    their name (``"EAX"``, ``"EBX"``, ...).
    """

    # Each value is the x86 ModRM byte for a register-direct operand
    # (0xC0 + register number).
    EAX = 0xC0
    EBX = 0xC3
    ECX = 0xC1
    EDX = 0xC2
    ESI = 0xC6
    EDI = 0xC7
    EBP = 0xC5
    # ESP is deliberately not a member, so it is never initialised by the
    # generated code - presumably because the stub's own push/ret needs a
    # valid stack pointer; TODO confirm.
    # ESP = 0xC4
def generate_reg_init_code(cfg, pe: lief.PE.Binary, wave: int, wave_entry: int) -> list[int]:
    """Build the stub that restores registers and transfers control to the wave entry.

    Args:
        cfg: Parsed trace CFG, passed on to ``cfg_parser.parse_bb_registers``.
        pe: Binary being patched; only used to pick address size and byte
            order.
        wave: Wave number whose first node provides the register values.
        wave_entry: Address pushed before the final ``ret``.

    Returns:
        The stub as a list of byte values: one register load per register
        known to ``Registers``, then a push of ``wave_entry`` and a ``ret``.
    """
    reg_values = cfg_parser.parse_bb_registers(cfg, wave, 0)
    address_is_32b = is_32b(pe)
    little_endian = is_little_endian(pe)

    code = []
    for reg, raw_value in reg_values.items():
        if reg not in Registers.__members__:
            continue
        immediate = hex_address_to_memory_representation(raw_value.strip(), address_is_32b, little_endian)
        code.extend(Instructions.MOV_REG + [Registers[reg]] + immediate)

    # add ret to actual OEP
    entry_bytes = hex_address_to_memory_representation(hex(wave_entry), address_is_32b, little_endian)
    code.extend(Instructions.PUSH + entry_bytes)  # push addr
    code.extend(Instructions.RET)
    return code

File diff suppressed because one or more lines are too long

View file

@ -1,54 +0,0 @@
{
"entry": "0x10011d7",
"calls": [
{ "adress": "0x10011e6", "name": "KERNEL32.DLL!GetModuleHandleA" },
{ "adress": "0x1001243", "name": "MSVCRT.DLL!__set_app_type" },
{ "adress": "0x1001258", "name": "MSVCRT.DLL!__p__fmode" },
{ "adress": "0x1001266", "name": "MSVCRT.DLL!__p__commode" },
{ "adress": "0x10013be", "name": "MSVCRT.DLL!_controlfp" },
{ "adress": "0x1001358", "name": "MSVCRT.DLL!_initterm" },
{ "adress": "0x10012cb", "name": "MSVCRT.DLL!__getmainargs" },
{ "adress": "0x10010f2", "name": "WS2_32.DLL!WSAStartup" },
{ "adress": "0x1001160", "name": "WS2_32.DLL!gethostname" },
{ "adress": "0x10011ba", "name": "USER32.DLL!CharToOemBuffA" },
{ "adress": "0x10011c7", "name": "MSVCRT.DLL!puts" },
{ "adress": "0x10011d0", "name": "MSVCRT.DLL!exit" }
],
"entry_reg_values": {
"EAX": "0x000cff0c ",
"EBX": "0x7efde000 ",
"ECX": "0x00000000 ",
"EDX": "0x010058c0",
"ESI": "0x00000000 ",
"EDI": "0x00000000 ",
"EBP": "0x000cff94 ",
"ESP": "0x000cff8c",
"eflags": "0x00000203"
},
"prevwave_getprocaddr": [
{ "function": "FormatMessageA", "func_addr": "0x75985fbd" },
{ "function": "LocalFree", "func_addr": "0x75962d3c" },
{ "function": "GetModuleHandleA", "func_addr": "0x75961245" },
{ "function": "GetLastError", "func_addr": "0x759611c0" },
{ "function": "__p__commode", "func_addr": "0x752c27c3" },
{ "function": "__p__fmode", "func_addr": "0x752c27ce" },
{ "function": "__set_app_type", "func_addr": "0x752c2804" },
{ "function": "_controlfp", "func_addr": "0x752be1e1" },
{ "function": "_cexit", "func_addr": "0x752c37d4" },
{ "function": "_adjust_fdiv", "func_addr": "0x753532ec" },
{ "function": "_except_handler3", "func_addr": "0x752dd770" },
{ "function": "_XcptFilter", "func_addr": "0x752ddc75" },
{ "function": "_exit", "func_addr": "0x7531b2c0" },
{ "function": "_c_exit", "func_addr": "0x7531b2db" },
{ "function": "__setusermatherr", "func_addr": "0x753477ad" },
{ "function": "_initterm", "func_addr": "0x752bc151" },
{ "function": "__getmainargs", "func_addr": "0x752c2bc0" },
{ "function": "__initenv", "func_addr": "0x753504e8" },
{ "function": "_write", "func_addr": "0x752c4078" },
{ "function": "strchr", "func_addr": "0x752bdbeb" },
{ "function": "puts", "func_addr": "0x75328d04" },
{ "function": "exit", "func_addr": "0x752c36aa" },
{ "function": "s_perror", "func_addr": "0x6c8a1be4" },
{ "function": "CharToOemBuffA", "func_addr": "0x76aeb1b0" }
]
}

Binary file not shown.

File diff suppressed because it is too large Load diff

44
utils.py Normal file
View file

@ -0,0 +1,44 @@
from enum import Enum
import lief
class Instructions(list[int], Enum):
    """Opcode byte sequences used when hand-assembling patch code.

    Every member is also a ``list[int]``, so it can be concatenated with
    operand bytes using ``+`` and compared with a plain list using ``==``.
    """

    RET = [0xC3]  # ret
    PUSH = [0x68]  # push <immediate>
    MOV_REG = [0xC7]  # mov <register>, <immediate>; a register byte follows
    CALL_ADDR = [0xFF, 0x15]  # call [<address>] (32-bit encoding)
    JUMP_ADDR = [0xFF, 0x25]  # jmp [<address>] (32-bit encoding)
def is_32b(pe: lief.PE.Binary):
    """Return the ``is_32`` flag of the binary's abstract header."""
    header = pe.abstract.header
    return header.is_32
def is_little_endian(pe: lief.PE.Binary):
    """Return ``True`` when the binary's abstract header reports little-endian."""
    byte_order = pe.abstract.header.endianness
    return byte_order == lief.Header.ENDIANNESS.LITTLE
def hex_address_to_memory_representation(hex_addr: str, is_32b: bool, is_little_endian: bool) -> list[int]:
    """Encode a hexadecimal address string as the bytes stored in memory.

    Args:
        hex_addr: Address in hexadecimal, with or without a ``0x`` prefix
            (for instance the output of ``hex()``). Surrounding whitespace
            is ignored.
        is_32b: ``True`` for a 4-byte address, ``False`` for an 8-byte one.
        is_little_endian: ``True`` to emit the least significant byte first.

    Returns:
        A list of 4 or 8 byte values. Digits beyond the address size are
        dropped, keeping the low-order bytes.

    Raises:
        ValueError: If ``hex_addr`` is not a hexadecimal number.
    """
    adress_size = 4 if is_32b else 8
    # Parse numerically instead of slicing the string: short inputs such as
    # "0x0" or an unpadded 64-bit address used to fail on an empty slice, and
    # input without the "0x" prefix silently lost its two leading digits.
    value = int(hex_addr, 16) & ((1 << (8 * adress_size)) - 1)
    byte_order = "little" if is_little_endian else "big"
    return list(value.to_bytes(adress_size, byte_order))
# Module-wide switch read by print_debug and changed through set_verbose.
verbose = False


def print_debug(msg: str):
    """Print ``msg`` only while verbose mode is enabled."""
    if not verbose:
        return
    print(msg)


def set_verbose(value: bool):
    """Enable or disable the output of later ``print_debug`` calls."""
    global verbose
    verbose = value