code formatting and newly created enums to clean up code

This commit is contained in:
Aéna Aria 2026-04-02 14:02:07 +02:00
parent f46cc2438f
commit 6f4fccd350
5 changed files with 111 additions and 84 deletions

View file

@ -1,38 +1,43 @@
import utils
def parse_wave_nodes(cfg,wave: int) -> list:
return list(filter(lambda node: node["wave"] == wave,cfg["nodes"]))
def parse_procaddr_calls(cfg,wave:int):
def parse_wave_nodes(cfg, wave: int) -> list:
return list(filter(lambda node: node["wave"] == wave, cfg["nodes"]))
def parse_procaddr_calls(cfg, wave: int) -> list:
res = []
wave_nodes:list[dict] = parse_wave_nodes(cfg,wave)
wave_nodes: list[dict] = parse_wave_nodes(cfg, wave)
for node in wave_nodes:
if "syscalls" in node.keys():
for syscall in node["syscalls"]:
if syscall["name"] == "KERNEL32.DLL!GetProcAddress":
funcname = syscall["arguments"][-1].split("\"")[1]
funcname = syscall["arguments"][-1].split('"')[1]
func_addr = syscall["return"]
res.append({"name": funcname, "addr": func_addr})
return res
def parse_syscalls(cfg,wave: int) -> list[dict[str, str]]:
res: list[dict[str,str]] = []
wave_nodes:list[dict] = parse_wave_nodes(cfg,wave)
def parse_syscalls(cfg, wave: int) -> list[dict[str, str]]:
res: list[dict[str, str]] = []
wave_nodes: list[dict] = parse_wave_nodes(cfg, wave)
no_repeat = []
for node in wave_nodes:
if "syscalls" in node.keys():
for syscall in node["syscalls"]:
if node["last_instr"] in no_repeat:
continue
adress = node["last_instr"] # call is at the end of the basic block
adress = node["last_instr"] # call is at the end of the basic block
name = syscall["name"]
current_instruction = node["instructions"][-1]["mnemonic"]
no_repeat.append(adress)
res.append({"adress":adress,"name":name})
res.append({"adress": adress, "name": name})
return res
def parse_wave_entrypoint(cfg,wave: int) -> int:
return int(parse_wave_nodes(cfg,wave)[0]["start"],16)
def parse_bb_registers(cfg,wave:int,n_bb:int) -> dict[str,str]:
return parse_wave_nodes(cfg,wave)[n_bb]["registers"]
def parse_wave_entrypoint(cfg, wave: int) -> int:
return int(parse_wave_nodes(cfg, wave)[0]["start"], 16)
def parse_bb_registers(cfg, wave: int, n_bb: int) -> dict[str, str]:
return parse_wave_nodes(cfg, wave)[n_bb]["registers"]

28
iat.py
View file

@ -1,8 +1,10 @@
import argparse
import json
import lief
import patch
import cfg_parser
import patch
import reginit
import utils
@ -12,8 +14,6 @@ with open("lib/WindowsDllsExport/win10-19043-exports.json", "rb") as f:
api_info = json.load(f)
# Retrives all unique DLL names being imported
def get_used_dlls(calls: list[dict[str, str]]) -> set[str]:
res = set()
@ -58,8 +58,13 @@ def link_func_to_dll(func_list):
res.append(res_new)
return res
def main():
parser = argparse.ArgumentParser(prog="iat.py", description="Create a patched PE from a binary dump and a traceCFG file.", formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser = argparse.ArgumentParser(
prog="iat.py",
description="Create a patched PE from a binary dump and a traceCFG file.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
# Input arguments
parser.add_argument("dump", type=str, help="The path to the wave dump file (usually ends with .dump)")
@ -68,7 +73,7 @@ def main():
# Additional arguments
parser.add_argument("-o", "--output", type=str, default="patched.exe", help="Specify an output filepath for the patched PE.")
parser.add_argument("-w", "--wave", type=int, help="Specify the wave number for the binary dump (if it can't be inferred from the filename)")
parser.add_argument("-v", '--verbose', action='store_true', help="Output additional debug info")
parser.add_argument("-v", "--verbose", action="store_true", help="Output additional debug info")
args = parser.parse_args()
utils.set_verbose(args.verbose)
@ -85,24 +90,24 @@ def main():
utils.print_debug(f"Opened file {args.trace} as the TraceCFG JSON")
# determine target wave
if args.wave == None and args.dump[-5:] == ".dump":
if args.wave is None and args.dump[-5:] == ".dump":
wave = int(args.dump[-9:-5])
else:
wave = args.wave
utils.print_debug(f"Determined wave to be {wave}")
calls = cfg_parser.parse_syscalls(cfg,wave)
wave_entry = cfg_parser.parse_wave_entrypoint(cfg,wave)
calls = cfg_parser.parse_syscalls(cfg, wave)
wave_entry = cfg_parser.parse_wave_entrypoint(cfg, wave)
# create new section
iatpatch_section = lief.PE.Section(".iatpatch")
iatpatch_content = []
# registers initiation
iatpatch_content += reginit.generate_reg_init_code(cfg,pe,wave,wave_entry)
iatpatch_content += reginit.generate_reg_init_code(cfg, pe, wave, wave_entry)
# write patch section code
iatpatch_section.content = iatpatch_content # pyright: ignore[reportAttributeAccessIssue]
iatpatch_section.content = iatpatch_content # pyright: ignore[reportAttributeAccessIssue]
# add new section to PE
pe.add_section(iatpatch_section)
@ -160,7 +165,7 @@ def main():
# patch additional non-call related info
for func in filter(lambda x: x["name"] == entry.name and x["dll"] == imp.name, func_dll_list):
patch.patch_addr_found_in_mem(pe, rva, func["addr"])
utils.print_debug(f"Done!\n")
utils.print_debug("Done!\n")
# write result
config = lief.PE.Builder.config_t()
@ -170,5 +175,6 @@ def main():
pe.write(output_path, config)
print(f"Wrote the patched executable as {output_path}")
if __name__ == "__main__":
main()

View file

@ -1,37 +1,34 @@
from utils import hex_address_to_memory_representation
import lief
import utils
from utils import Instructions, hex_address_to_memory_representation, is_32b, is_little_endian
def patch_direct_adress_call(pe: lief.PE.Binary, rva: int, instruction_offset: int):
# We can manually patch the instruction here: FF 15 08 10 00 01 represents `call [0x01001080]`
new_value = hex_address_to_memory_representation(
hex(rva + pe.imagebase),
pe.abstract.header.is_32,
pe.abstract.header.endianness == lief.Header.ENDIANNESS.LITTLE,
is_32b(pe),
is_little_endian(pe),
)
pe.patch_address(instruction_offset, [0xFF, 0x15] + new_value, lief.Binary.VA_TYPES.RVA)
utils.print_debug(f" Patched a call at addr {hex(pe.imagebase+instruction_offset)}")
pe.patch_address(instruction_offset, Instructions.CALL_ADDR + new_value, lief.Binary.VA_TYPES.RVA)
utils.print_debug(f" Patched a call at addr {hex(pe.imagebase + instruction_offset)}")
def patch_direct_adress_jump(pe: lief.PE.Binary, rva: int, instruction_offset: int):
# We can manually patch the instruction here: FF 15 08 10 00 01 represents `call [0x01001080]`
new_value = hex_address_to_memory_representation(
hex(rva + pe.imagebase),
pe.abstract.header.is_32,
pe.abstract.header.endianness == lief.Header.ENDIANNESS.LITTLE,
)
pe.patch_address(instruction_offset, [0xFF, 0x25] + new_value, lief.Binary.VA_TYPES.RVA)
utils.print_debug(f" Patched a jump at addr {hex(pe.imagebase+instruction_offset)}")
new_value = hex_address_to_memory_representation(hex(rva + pe.imagebase), is_32b(pe), is_little_endian(pe))
pe.patch_address(instruction_offset, Instructions.JUMP_ADDR + new_value, lief.Binary.VA_TYPES.RVA)
utils.print_debug(f" Patched a jump at addr {hex(pe.imagebase + instruction_offset)}")
def patch_instr_to_new_IAT_entry(pe: lief.PE.Binary, call: dict[str, str], rva: int):
base = pe.imagebase
instruction_offset = int(call["adress"], 16) - base
memview = pe.get_content_from_virtual_address(instruction_offset, 2)
if [memview[0], memview[1]] == [0xFF, 0x15]:
if [memview[0], memview[1]] == Instructions.CALL_ADDR:
patch_direct_adress_call(pe, rva, instruction_offset)
elif [memview[0], memview[1]] == [0xFF, 0x25]:
elif [memview[0], memview[1]] == Instructions.JUMP_ADDR:
patch_direct_adress_jump(pe, rva, instruction_offset)
@ -39,16 +36,8 @@ def patch_addr_found_in_mem(pe: lief.PE.Binary, rva: int, old_addr: str):
is_32 = pe.abstract.header.is_32
little_endian = pe.abstract.header.endianness == lief.Header.ENDIANNESS.LITTLE
# scan memory for reference to old addr
old_addr_mem_repr = hex_address_to_memory_representation(
old_addr,
is_32,
pe.abstract.header.endianness == lief.Header.ENDIANNESS.LITTLE,
)
new_addr = hex_address_to_memory_representation(
hex(rva + pe.imagebase),
is_32,
little_endian,
)
old_addr_mem_repr = hex_address_to_memory_representation(old_addr, is_32b(pe), is_little_endian(pe))
new_addr = hex_address_to_memory_representation(hex(rva + pe.imagebase), is_32, little_endian)
found_ref_addr = []
found_xref_addr = []
for section in pe.sections:
@ -70,7 +59,7 @@ def patch_addr_found_in_mem(pe: lief.PE.Binary, rva: int, old_addr: str):
for section in pe.sections:
for ref_addr in found_ref_addr:
for k in range(len(section.content)-len(ref_addr)):
for k in range(len(section.content) - len(ref_addr)):
foundxref = True
for L in range(len(ref_addr)):
if section.content[k + L] != ref_addr[L]:
@ -80,4 +69,4 @@ def patch_addr_found_in_mem(pe: lief.PE.Binary, rva: int, old_addr: str):
found_xref_addr.append(section.virtual_address + k)
for addr in found_xref_addr:
pe.patch_address(addr, new_addr, lief.Binary.VA_TYPES.RVA)
utils.print_debug(f" Patched an xref to old IAT at {hex(pe.imagebase+addr)}")
utils.print_debug(f" Patched an xref to old IAT at {hex(pe.imagebase + addr)}")

View file

@ -1,42 +1,43 @@
import lief
import cfg_parser
from utils import hex_address_to_memory_representation
from enum import IntEnum
def generate_reg_init_code(cfg, pe: lief.PE.Binary,wave:int, wave_entry: int) -> list[int]:
import lief
import cfg_parser
from utils import Instructions, hex_address_to_memory_representation, is_32b, is_little_endian
class Registers(IntEnum):
EAX = 0xC0
EBX = 0xC3
ECX = 0xC1
EDX = 0xC2
ESI = 0xC6
EDI = 0xC7
EBP = 0xC5
# ESP = 0xC4
def generate_reg_init_code(cfg, pe: lief.PE.Binary, wave: int, wave_entry: int) -> list[int]:
code = []
# initiate registry values
reg_to_inst_code = {
"EAX": 0xC0,
"EBX": 0xC3,
"ECX": 0xC1,
"EDX": 0xC2,
"ESI": 0xC6,
"EDI": 0xC7,
"EBP": 0xC5,
# "ESP": 0xC4,
}
reg_values = cfg_parser.parse_bb_registers(cfg, wave, 0)
for reg in reg_values:
if reg not in reg_to_inst_code:
if reg not in Registers.__members__:
continue
new_instruction = [
0xC7,
reg_to_inst_code[reg],
] + hex_address_to_memory_representation(
reg_values[reg].strip(),
pe.abstract.header.is_32,
pe.abstract.header.endianness == lief.Header.ENDIANNESS.LITTLE,
new_instruction = (
Instructions.MOV_REG
+ [Registers[reg]]
+ hex_address_to_memory_representation(
reg_values[reg].strip(),
is_32b(pe),
is_little_endian(pe),
)
)
for byte in new_instruction:
code.append(byte)
# add ret to actual OEP
code += [0x68] + hex_address_to_memory_representation(
hex(wave_entry),
pe.abstract.header.is_32,
pe.abstract.header.endianness == lief.Header.ENDIANNESS.LITTLE,
) # push addr
code += Instructions.PUSH + hex_address_to_memory_representation(hex(wave_entry), is_32b(pe), is_little_endian(pe)) # push addr
code += [0xC3] # ret
code += Instructions.RET
return code

View file

@ -1,3 +1,24 @@
from enum import Enum
import lief
class Instructions(list[int], Enum):
RET = [0xC3]
PUSH = [0x68]
MOV_REG = [0xC7]
CALL_ADDR = [0xFF, 0x15]
JUMP_ADDR = [0xFF, 0x25]
def is_32b(pe: lief.PE.Binary):
return pe.abstract.header.is_32
def is_little_endian(pe: lief.PE.Binary):
return pe.abstract.header.endianness == lief.Header.ENDIANNESS.LITTLE
def hex_address_to_memory_representation(hex_addr: str, is_32b: bool, is_little_endian: bool) -> list[int]:
adress_size = 4 if is_32b else 8
mem_value = [0x00] * adress_size
@ -9,10 +30,15 @@ def hex_address_to_memory_representation(hex_addr: str, is_32b: bool, is_little_
mem_value = mem_value[::-1] # reverse byte order for big endian
return mem_value
verbose = False
def print_debug(msg:str):
if(verbose): print(msg)
def set_verbose(value:bool):
verbose = False
def print_debug(msg: str):
if verbose:
print(msg)
def set_verbose(value: bool):
global verbose
verbose = value