miasm2.jitter.jitcore_python module
import miasm2.jitter.jitcore as jitcore
import miasm2.expression.expression as m2_expr
import miasm2.jitter.csts as csts
from miasm2.expression.simplifications import ExpressionSimplifier, expr_simp_explicit
from miasm2.jitter.emulatedsymbexec import EmulatedSymbExec


################################################################################
#                              Python jitter Core                              #
################################################################################


class JitCore_Python(jitcore.JitCore):
    """JiT management, using Miasm2 Symbol Execution engine as backend

    Each group of IR blocks is turned into a Python closure which evaluates
    the IR through @self.symbexec; closures are stored in
    @self.offset_to_jitted_func, indexed by the offset of their first block.
    """

    # Symbolic execution engine class instantiated in __init__
    SymbExecClass = EmulatedSymbExec

    def __init__(self, ir_arch, bin_stream):
        """Create the IR CFG and the symbolic execution engine
        @ir_arch: IR architecture, forwarded to the parent constructor
        @bin_stream: binary stream, forwarded to the parent constructor
        """
        super(JitCore_Python, self).__init__(ir_arch, bin_stream)
        self.ir_arch = ir_arch
        self.ircfg = self.ir_arch.new_ircfg()

        # CPU & VM (None for now) will be set later
        self.symbexec = self.SymbExecClass(
            None, None,
            self.ir_arch, {},
            sb_expr_simp=expr_simp_explicit
        )
        self.symbexec.enable_emulated_simplifications()

    def set_cpu_vm(self, cpu, vm):
        """Attach @cpu and @vm to the symbolic execution engine
        @cpu: object stored as the engine's cpu
        @vm: object stored as the engine's vm
        """
        self.symbexec.cpu = cpu
        self.symbexec.vm = vm

    def load(self):
        "Preload symbols according to current architecture"
        self.symbexec.reset_regs()

    def jit_irblocks(self, loc_key, irblocks):
        """Create a python function corresponding to an irblocks' group and
        register it in @self.offset_to_jitted_func under the offset of
        @loc_key.
        @loc_key: the loc_key of the irblocks
        @irblocks: a group of irblocks
        """

        def myfunc(cpu):
            """Execute the function according to cpu and vmmngr states.
            Return the offset of the interrupted instruction when an
            exception is pending, else the integer destination reached.
            @cpu: JitCpu instance
            """
            # Get virtual memory handler
            vmmngr = cpu.vmmngr

            # Keep current location in irblocks
            cur_loc_key = loc_key

            # Required to detect new instructions
            offsets_jitted = set()

            # Get exec engine
            exec_engine = self.symbexec
            expr_simp = exec_engine.expr_simp

            # For each irbloc inside irblocks
            while True:
                # Get the current bloc
                for irb in irblocks:
                    if irb.loc_key == cur_loc_key:
                        break
                else:
                    raise RuntimeError("Irblocks must end with returning an "
                                       "ExprInt instance")

                # Refresh CPU values according to @cpu instance
                exec_engine.update_engine_from_cpu()

                # Execute current ir bloc
                for assignblk in irb:
                    instr = assignblk.instr
                    # For each new instruction (in assembly)
                    if instr is not None and instr.offset not in offsets_jitted:
                        # Test exceptions
                        vmmngr.check_invalid_code_blocs()
                        vmmngr.check_memory_breakpoint()
                        if vmmngr.get_exception():
                            exec_engine.update_cpu_from_engine()
                            return instr.offset

                        offsets_jitted.add(instr.offset)

                        # Log registers values
                        if self.log_regs:
                            exec_engine.update_cpu_from_engine()
                            exec_engine.cpu.dump_gpregs()

                        # Log instruction (single-argument form: same output
                        # with the print statement and the print function)
                        if self.log_mn:
                            print("%08x %s" % (instr.offset, instr))

                        # Check for exception
                        if (vmmngr.get_exception() != 0 or
                                cpu.get_exception() != 0):
                            exec_engine.update_cpu_from_engine()
                            return instr.offset

                    # Eval current instruction (in IR)
                    exec_engine.eval_updt_assignblk(assignblk)
                    # Check for exceptions which do not update PC
                    exec_engine.update_cpu_from_engine()
                    if (vmmngr.get_exception() & csts.EXCEPT_DO_NOT_UPDATE_PC != 0 or
                            cpu.get_exception() > csts.EXCEPT_NUM_UPDT_EIP):
                        return instr.offset

                vmmngr.check_invalid_code_blocs()
                vmmngr.check_memory_breakpoint()

                # Get next bloc address
                ad = expr_simp(exec_engine.eval_expr(self.ir_arch.IRDst))

                # Updates @cpu instance according to new CPU values
                exec_engine.update_cpu_from_engine()

                # Manage resulting address
                if isinstance(ad, m2_expr.ExprInt):
                    return ad.arg.arg
                elif isinstance(ad, m2_expr.ExprLoc):
                    # Destination is another irblock of the group: loop on it
                    cur_loc_key = ad.loc_key
                else:
                    raise NotImplementedError("Type not handled: %s" % ad)

        # Associate myfunc with current loc_key
        offset = self.ir_arch.loc_db.get_location_offset(loc_key)
        assert offset is not None
        self.offset_to_jitted_func[offset] = myfunc

    def exec_wrapper(self, loc_key, cpu, _offset_to_jitted_func, _stop_offsets,
                     _max_exec_per_call):
        """Call the jitted function registered under @loc_key with @cpu and
        return its result.
        @loc_key: key of the function in @self.offset_to_jitted_func; despite
                  its name, jit_irblocks registers functions by offset
        @cpu: JitCpu instance
        The remaining arguments are not used by this backend.
        """
        # Get Python function corresponding to @loc_key
        fc_ptr = self.offset_to_jitted_func[loc_key]

        # Execute the function
        return fc_ptr(cpu)
Classes
class JitCore_Python
JiT management, using Miasm2 Symbol Execution engine as backend
class JitCore_Python(jitcore.JitCore):
    """JiT management, using Miasm2 Symbol Execution engine as backend

    Each group of IR blocks is turned into a Python closure which evaluates
    the IR through @self.symbexec; closures are stored in
    @self.offset_to_jitted_func, indexed by the offset of their first block.
    """

    # Symbolic execution engine class instantiated in __init__
    SymbExecClass = EmulatedSymbExec

    def __init__(self, ir_arch, bin_stream):
        """Create the IR CFG and the symbolic execution engine
        @ir_arch: IR architecture, forwarded to the parent constructor
        @bin_stream: binary stream, forwarded to the parent constructor
        """
        super(JitCore_Python, self).__init__(ir_arch, bin_stream)
        self.ir_arch = ir_arch
        self.ircfg = self.ir_arch.new_ircfg()

        # CPU & VM (None for now) will be set later
        self.symbexec = self.SymbExecClass(
            None, None,
            self.ir_arch, {},
            sb_expr_simp=expr_simp_explicit
        )
        self.symbexec.enable_emulated_simplifications()

    def set_cpu_vm(self, cpu, vm):
        """Attach @cpu and @vm to the symbolic execution engine
        @cpu: object stored as the engine's cpu
        @vm: object stored as the engine's vm
        """
        self.symbexec.cpu = cpu
        self.symbexec.vm = vm

    def load(self):
        "Preload symbols according to current architecture"
        self.symbexec.reset_regs()

    def jit_irblocks(self, loc_key, irblocks):
        """Create a python function corresponding to an irblocks' group and
        register it in @self.offset_to_jitted_func under the offset of
        @loc_key.
        @loc_key: the loc_key of the irblocks
        @irblocks: a group of irblocks
        """

        def myfunc(cpu):
            """Execute the function according to cpu and vmmngr states.
            Return the offset of the interrupted instruction when an
            exception is pending, else the integer destination reached.
            @cpu: JitCpu instance
            """
            # Get virtual memory handler
            vmmngr = cpu.vmmngr

            # Keep current location in irblocks
            cur_loc_key = loc_key

            # Required to detect new instructions
            offsets_jitted = set()

            # Get exec engine
            exec_engine = self.symbexec
            expr_simp = exec_engine.expr_simp

            # For each irbloc inside irblocks
            while True:
                # Get the current bloc
                for irb in irblocks:
                    if irb.loc_key == cur_loc_key:
                        break
                else:
                    raise RuntimeError("Irblocks must end with returning an "
                                       "ExprInt instance")

                # Refresh CPU values according to @cpu instance
                exec_engine.update_engine_from_cpu()

                # Execute current ir bloc
                for assignblk in irb:
                    instr = assignblk.instr
                    # For each new instruction (in assembly)
                    if instr is not None and instr.offset not in offsets_jitted:
                        # Test exceptions
                        vmmngr.check_invalid_code_blocs()
                        vmmngr.check_memory_breakpoint()
                        if vmmngr.get_exception():
                            exec_engine.update_cpu_from_engine()
                            return instr.offset

                        offsets_jitted.add(instr.offset)

                        # Log registers values
                        if self.log_regs:
                            exec_engine.update_cpu_from_engine()
                            exec_engine.cpu.dump_gpregs()

                        # Log instruction (single-argument form: same output
                        # with the print statement and the print function)
                        if self.log_mn:
                            print("%08x %s" % (instr.offset, instr))

                        # Check for exception
                        if (vmmngr.get_exception() != 0 or
                                cpu.get_exception() != 0):
                            exec_engine.update_cpu_from_engine()
                            return instr.offset

                    # Eval current instruction (in IR)
                    exec_engine.eval_updt_assignblk(assignblk)
                    # Check for exceptions which do not update PC
                    exec_engine.update_cpu_from_engine()
                    if (vmmngr.get_exception() & csts.EXCEPT_DO_NOT_UPDATE_PC != 0 or
                            cpu.get_exception() > csts.EXCEPT_NUM_UPDT_EIP):
                        return instr.offset

                vmmngr.check_invalid_code_blocs()
                vmmngr.check_memory_breakpoint()

                # Get next bloc address
                ad = expr_simp(exec_engine.eval_expr(self.ir_arch.IRDst))

                # Updates @cpu instance according to new CPU values
                exec_engine.update_cpu_from_engine()

                # Manage resulting address
                if isinstance(ad, m2_expr.ExprInt):
                    return ad.arg.arg
                elif isinstance(ad, m2_expr.ExprLoc):
                    # Destination is another irblock of the group: loop on it
                    cur_loc_key = ad.loc_key
                else:
                    raise NotImplementedError("Type not handled: %s" % ad)

        # Associate myfunc with current loc_key
        offset = self.ir_arch.loc_db.get_location_offset(loc_key)
        assert offset is not None
        self.offset_to_jitted_func[offset] = myfunc

    def exec_wrapper(self, loc_key, cpu, _offset_to_jitted_func, _stop_offsets,
                     _max_exec_per_call):
        """Call the jitted function registered under @loc_key with @cpu and
        return its result.
        @loc_key: key of the function in @self.offset_to_jitted_func; despite
                  its name, jit_irblocks registers functions by offset
        @cpu: JitCpu instance
        The remaining arguments are not used by this backend.
        """
        # Get Python function corresponding to @loc_key
        fc_ptr = self.offset_to_jitted_func[loc_key]

        # Execute the function
        return fc_ptr(cpu)
Ancestors (in MRO)
- JitCore_Python
- miasm2.jitter.jitcore.JitCore
- __builtin__.object
Class variables
var FUNCNAME
var SymbExecClass
var jitted_block_delete_cb
var jitted_block_max_size
Instance variables
var disasm_cb
var ir_arch
var ircfg
var symbexec
Methods
def __init__(
self, ir_arch, bin_stream)
def __init__(self, ir_arch, bin_stream):
    """Set up the IR CFG and the symbolic execution engine of the jitter
    @ir_arch: IR architecture, also handed to the parent constructor
    @bin_stream: binary stream, also handed to the parent constructor
    """
    super(JitCore_Python, self).__init__(ir_arch, bin_stream)
    self.ir_arch = ir_arch
    self.ircfg = self.ir_arch.new_ircfg()

    # The engine starts without CPU nor VM: both are given as None here
    # and are expected to be attached afterwards
    engine = self.SymbExecClass(
        None,
        None,
        self.ir_arch,
        {},
        sb_expr_simp=expr_simp_explicit
    )
    self.symbexec = engine
    engine.enable_emulated_simplifications()
def add_block(
self, block)
Add a block to JiT and JiT it. @block: asm_bloc to add
def add_block(self, block):
    """Translate @block to IR, remember the result on it and JiT it.
    @block: asm_bloc to add
    """
    # IR blocks are generated with explicit PC updates (gen_pc_updt)
    generated = self.ir_arch.add_asmblock_to_ircfg(
        block,
        self.ircfg,
        gen_pc_updt=True
    )
    # Keep the IR blocks on the assembly block itself
    block.blocks = generated
    self.jit_irblocks(block.loc_key, generated)
def add_block_to_mem_interval(
self, vm, block)
Update vm to include block addresses in its memory range
def add_block_to_mem_interval(self, vm, block):
    """Extend the monitored interval with @block and rebuild the code
    block pool of @vm from it
    @vm: object receiving the code block ranges
    @block: block providing ad_min / ad_max
    """
    # ad_max is one past the last byte: the interval stores inclusive bounds
    block_range = interval([(block.ad_min, block.ad_max - 1)])
    self.blocks_mem_interval += block_range

    # Rebuild the whole pool, converting back to an exclusive upper bound
    vm.reset_code_bloc_pool()
    for start, stop in self.blocks_mem_interval:
        vm.add_code_bloc(start, stop + 1)
def add_disassembly_splits(
self, *args)
The disassembly engine will stop on the addresses in args if they are not at the beginning of a block
def add_disassembly_splits(self, *args):
    """Register every address of @args in the split set of the
    disassembly engine, which will stop on them if they are not at the
    block beginning"""
    new_splits = set(args)
    self.split_dis.update(new_splits)
def blocks_to_memrange(
self, blocks)
Return an interval instance standing for blocks addresses @blocks: list of AsmBlock instances
def blocks_to_memrange(self, blocks):
    """Build the interval covering the addresses of @blocks
    @blocks: list of AsmBlock instances
    """
    covered = interval()
    for blk in blocks:
        # ad_max is exclusive, the interval bounds are inclusive
        first, last = blk.ad_min, blk.ad_max - 1
        covered += interval([(first, last)])
    return covered
def clear_jitted_blocks(
self)
Reset all jitted blocks
def clear_jitted_blocks(self):
    "Forget every jitted function, every known block and the monitored interval"
    # Both mappings are emptied in place; the interval is replaced
    self.offset_to_jitted_func.clear()
    self.loc_key_to_block.clear()
    self.blocks_mem_interval = interval()
def del_block_in_range(
self, ad1, ad2)
Find and remove the jitted blocks in range [ad1, ad2]. Return the set of removed blocks. @ad1: First address @ad2: Last address
def del_block_in_range(self, ad1, ad2):
    """Find and remove the jitted blocks overlapping the range [ad1, ad2].
    Return the set of removed blocks.
    @ad1: First address
    @ad2: Last address
    """
    # Find concerned blocks: a block is left untouched when it has no line,
    # when it ends at or before @ad1 (ad_max <= ad1) or when it starts at or
    # after @ad2 (ad_min >= ad2)
    # NOTE(review): a block starting exactly at @ad2 is therefore kept,
    # although the range is documented as closed - confirm against callers
    modified_blocks = set(
        block for block in self.loc_key_to_block.values()
        if block.lines and block.ad_max > ad1 and block.ad_min < ad2
    )

    # Remove the interval of the modified blocks from the monitored one
    self.blocks_mem_interval -= self.blocks_to_memrange(modified_blocks)

    # Remove modified blocks
    for block in modified_blocks:
        # Only the missing 'blocks' attribute is a legitimate fallback: keep
        # the try body minimal so that unrelated AttributeError are not hidden
        try:
            irblocks = block.blocks
        except AttributeError:
            # The block has never been translated in IR
            loc_keys = [block.loc_key]
        else:
            loc_keys = [irblock.loc_key for irblock in irblocks]

        # Remove offset -> jitted block links
        for loc_key in loc_keys:
            offset = self.ir_arch.loc_db.get_location_offset(loc_key)
            if offset in self.offset_to_jitted_func:
                del self.offset_to_jitted_func[offset]

        # Remove label -> block link
        del self.loc_key_to_block[block.loc_key]

    return modified_blocks
def disasm_and_jit_block(
self, addr, vm)
Disassemble a new block and JiT it @addr: address of the block to disassemble (LocKey or int) @vm: VmMngr instance
def disasm_and_jit_block(self, addr, vm):
    """Disassemble the block located at @addr, JiT it and return it; a
    block which cannot be disassembled is returned as is, without being
    jitted
    @addr: address of the block to disassemble (LocKey or int)
    @vm: VmMngr instance
    """
    # Resolve a LocKey to its offset
    if isinstance(addr, LocKey):
        addr = self.ir_arch.loc_db.get_location_offset(addr)
        if addr is None:
            raise RuntimeError("Unknown offset for LocKey")

    # Bound the number of disassembled lines with the jitter option
    disassembler = self.mdis
    disassembler.lines_wd = self.options["jit_maxline"]

    # Disassemble
    new_block = disassembler.dis_block(addr)
    if isinstance(new_block, AsmBlockBad):
        return new_block

    # Optional logging of the new block
    if self.log_newbloc:
        print(new_block.to_string(self.mdis.loc_db))

    # Register label -> block
    self.loc_key_to_block[new_block.loc_key] = new_block

    # Min/max addresses are needed by the automod code handling
    self.set_block_min_max(new_block)

    # JiT the block
    self.add_block(new_block)

    # Update the jitted code memory range
    self.add_block_to_mem_interval(vm, new_block)
    return new_block
def exec_wrapper(
self, loc_key, cpu, _offset_to_jitted_func, _stop_offsets, _max_exec_per_call)
Call the jitted function registered under @loc_key with @cpu. @loc_key: key of the function in offset_to_jitted_func (run_at passes the block offset) @cpu: JitCpu instance
def exec_wrapper(self, loc_key, cpu, _offset_to_jitted_func, _stop_offsets,
                 _max_exec_per_call):
    """Call the jitted function registered under @loc_key with @cpu and
    return its result.
    @loc_key: key of the function in @self.offset_to_jitted_func; despite
              its name this is the value used to index that mapping (run_at
              passes the block offset)
    @cpu: JitCpu instance
    @_offset_to_jitted_func, @_stop_offsets, @_max_exec_per_call: not used
    by this backend
    """
    # Get Python function corresponding to @loc_key
    fc_ptr = self.offset_to_jitted_func[loc_key]

    # Execute the function
    return fc_ptr(cpu)
def hash_block(
self, block)
Build a hash of the block @block @block: asmblock
def hash_block(self, block):
    """
    Compute the md5 hex digest identifying @block
    @block: asmblock
    """
    # Raw content of every line of the block
    raw_bytes = "".join(line.b for line in block.lines)
    block_offset = self.ir_arch.loc_db.get_location_offset(block.loc_key)

    # The logging flags are part of the hashed description
    description = "%X_%s_%s_%s_%s" % (block_offset,
                                      self.arch_name,
                                      self.log_mn,
                                      self.log_regs,
                                      raw_bytes)
    return md5(description).hexdigest()
def jit_irblocks(
self, loc_key, irblocks)
Create a python function corresponding to an irblocks' group. @loc_key: the loc_key of the irblocks @irblocks: a group of irblocks
def jit_irblocks(self, loc_key, irblocks):
    """Create a python function corresponding to an irblocks' group and
    register it in @self.offset_to_jitted_func under the offset of @loc_key.
    @loc_key: the loc_key of the irblocks
    @irblocks: a group of irblocks
    """

    def myfunc(cpu):
        """Execute the function according to cpu and vmmngr states.
        Return the offset of the interrupted instruction when an exception
        is pending, else the integer destination reached.
        @cpu: JitCpu instance
        """
        # Get virtual memory handler
        vmmngr = cpu.vmmngr

        # Keep current location in irblocks
        cur_loc_key = loc_key

        # Required to detect new instructions
        offsets_jitted = set()

        # Get exec engine
        exec_engine = self.symbexec
        expr_simp = exec_engine.expr_simp

        # For each irbloc inside irblocks
        while True:
            # Get the current bloc
            for irb in irblocks:
                if irb.loc_key == cur_loc_key:
                    break
            else:
                raise RuntimeError("Irblocks must end with returning an "
                                   "ExprInt instance")

            # Refresh CPU values according to @cpu instance
            exec_engine.update_engine_from_cpu()

            # Execute current ir bloc
            for assignblk in irb:
                instr = assignblk.instr
                # For each new instruction (in assembly)
                if instr is not None and instr.offset not in offsets_jitted:
                    # Test exceptions
                    vmmngr.check_invalid_code_blocs()
                    vmmngr.check_memory_breakpoint()
                    if vmmngr.get_exception():
                        exec_engine.update_cpu_from_engine()
                        return instr.offset

                    offsets_jitted.add(instr.offset)

                    # Log registers values
                    if self.log_regs:
                        exec_engine.update_cpu_from_engine()
                        exec_engine.cpu.dump_gpregs()

                    # Log instruction (single-argument form: same output
                    # with the print statement and the print function)
                    if self.log_mn:
                        print("%08x %s" % (instr.offset, instr))

                    # Check for exception
                    if (vmmngr.get_exception() != 0 or
                            cpu.get_exception() != 0):
                        exec_engine.update_cpu_from_engine()
                        return instr.offset

                # Eval current instruction (in IR)
                exec_engine.eval_updt_assignblk(assignblk)
                # Check for exceptions which do not update PC
                exec_engine.update_cpu_from_engine()
                if (vmmngr.get_exception() & csts.EXCEPT_DO_NOT_UPDATE_PC != 0 or
                        cpu.get_exception() > csts.EXCEPT_NUM_UPDT_EIP):
                    return instr.offset

            vmmngr.check_invalid_code_blocs()
            vmmngr.check_memory_breakpoint()

            # Get next bloc address
            ad = expr_simp(exec_engine.eval_expr(self.ir_arch.IRDst))

            # Updates @cpu instance according to new CPU values
            exec_engine.update_cpu_from_engine()

            # Manage resulting address
            if isinstance(ad, m2_expr.ExprInt):
                return ad.arg.arg
            elif isinstance(ad, m2_expr.ExprLoc):
                # Destination is another irblock of the group: loop on it
                cur_loc_key = ad.loc_key
            else:
                raise NotImplementedError("Type not handled: %s" % ad)

    # Associate myfunc with current loc_key
    offset = self.ir_arch.loc_db.get_location_offset(loc_key)
    assert offset is not None
    self.offset_to_jitted_func[offset] = myfunc
def load(
self)
Preload symbols according to current architecture
def load(self):
    "Reset the registers of the symbolic execution engine"
    engine = self.symbexec
    engine.reset_regs()
def remove_disassembly_splits(
self, *args)
The disassembly engine will no longer stop on the addresses in args
def remove_disassembly_splits(self, *args):
    """Unregister every address of @args from the split set: the
    disassembly engine will no longer stop on them"""
    dropped = set(args)
    self.split_dis.difference_update(dropped)
def run_at(
self, cpu, offset, stop_offsets)
Run from the starting address @offset. Execution will stop if: (1) the max_exec_per_call option is reached; (2) a new, yet unknown, block is reached after the execution of the block at address @offset; (3) an address in @stop_offsets is reached. @cpu: JitCpu instance @offset: starting address (int) @stop_offsets: set of addresses on which the jitter must stop
def run_at(self, cpu, offset, stop_offsets):
    """Run from the starting address @offset and return the value given by
    exec_wrapper, or @offset itself when the block cannot be disassembled.
    Execution will stop if:
    - max_exec_per_call option is reached
    - a new, yet unknown, block is reached after the execution of block at
      address @offset
    - an address in @stop_offsets is reached
    @cpu: JitCpu instance
    @offset: starting address (int); None means the current PC of @cpu
    @stop_offsets: set of address on which the jitter must stop
    """
    if offset is None:
        # Start from the program counter register of @cpu
        pc_name = self.ir_arch.pc.name
        offset = getattr(cpu, pc_name)

    if offset not in self.offset_to_jitted_func:
        # Need to JiT the block
        new_block = self.disasm_and_jit_block(offset, cpu.vmmngr)
        if isinstance(new_block, AsmBlockBad):
            # Report the disassembly failure as an exception
            reason = new_block.errno
            if reason == AsmBlockBad.ERROR_IO:
                cpu.vmmngr.set_exception(EXCEPT_ACCESS_VIOL)
            elif reason == AsmBlockBad.ERROR_CANNOT_DISASM:
                cpu.set_exception(EXCEPT_UNK_MNEMO)
            else:
                raise RuntimeError("Unhandled disasm result %r" % reason)
            return offset

    # Run the block and update cpu/vmmngr state
    jitted_funcs = self.offset_to_jitted_func.data
    exec_budget = self.options["max_exec_per_call"]
    return self.exec_wrapper(offset, cpu, jitted_funcs, stop_offsets,
                             exec_budget)
def set_block_min_max(
self, cur_block)
Update cur_block to set min/max address
def set_block_min_max(self, cur_block):
    """Update cur_block to set min/max address
    @cur_block: block whose ad_min / ad_max attributes are (re)computed;
                ad_max is one past the last byte of the block
    """
    if cur_block.lines:
        first_line = cur_block.lines[0]
        last_line = cur_block.lines[-1]
        cur_block.ad_min = first_line.offset
        # End of the last line: its offset plus its length
        cur_block.ad_max = last_line.offset + last_line.l
    else:
        # 1 byte block for unknown mnemonic
        # The location database is reached through the instance: there is
        # no 'ir_arch' name in the scope of this method
        offset = self.ir_arch.loc_db.get_location_offset(cur_block.loc_key)
        cur_block.ad_min = offset
        cur_block.ad_max = offset + 1
def set_cpu_vm(
self, cpu, vm)
def set_cpu_vm(self, cpu, vm):
    """Attach @cpu and @vm to the symbolic execution engine
    @cpu: object stored as the engine's cpu
    @vm: object stored as the engine's vm
    """
    engine = self.symbexec
    engine.cpu = cpu
    engine.vm = vm
def set_options(
self, **kwargs)
Set options relative to the backend
def set_options(self, **kwargs):
    "Merge the keyword arguments into the backend options"
    backend_options = self.options
    backend_options.update(kwargs)
def updt_automod_code(
self, vm)
Remove jitted code updated by memory write @vm: VmMngr instance
def updt_automod_code(self, vm):
    """Drop the jitted code covering the memory written in @vm
    @vm: VmMngr instance
    """
    # Collect the (start, stop) couples reported by the VM
    written = [(start, stop) for start, stop in vm.get_memory_write()]
    self.updt_automod_code_range(vm, written)
def updt_automod_code_range(
self, vm, mem_range)
Remove jitted code in range @mem_range @vm: VmMngr instance @mem_range: list of start/stop addresses
def updt_automod_code_range(self, vm, mem_range):
    """Drop the jitted code located in @mem_range, then refresh the jitted
    code memory range and reset the memory accesses of @vm
    @vm: VmMngr instance
    @mem_range: list of start/stop addresses
    """
    for bounds in mem_range:
        start, stop = bounds
        self.del_block_in_range(start, stop)

    # Private helper defined outside of this listing
    self.__updt_jitcode_mem_range(vm)
    vm.reset_memory_access()