miasm2.analysis.cst_propag module
import logging

from miasm2.ir.symbexec import SymbolicExecutionEngine
from miasm2.expression.expression import ExprMem
from miasm2.expression.expression_helper import possible_values
from miasm2.expression.simplifications import expr_simp
from miasm2.ir.ir import IRBlock, AssignBlock

LOG_CST_PROPAG = logging.getLogger("cst_propag")
CONSOLE_HANDLER = logging.StreamHandler()
CONSOLE_HANDLER.setFormatter(logging.Formatter("%(levelname)-5s: %(message)s"))
LOG_CST_PROPAG.addHandler(CONSOLE_HANDLER)
LOG_CST_PROPAG.setLevel(logging.WARNING)


class SymbExecState(SymbolicExecutionEngine):
    """
    State manager for SymbolicExecution
    """
    def __init__(self, ir_arch, ircfg, state):
        """
        @ir_arch: forwarded to the SymbolicExecutionEngine constructor
        @ircfg: accepted for signature symmetry; not used here
        @state: state loaded through set_state()
        """
        super(SymbExecState, self).__init__(ir_arch, {})
        self.set_state(state)


def add_state(ircfg, todo, states, addr, state):
    """
    Add or merge the computed @state for the block at @addr. Update @todo
    @ircfg: object used to convert @addr into a loc_key (get_loc_key)
    @todo: modified block set
    @states: dictionary linking a label to its entering state.
    @addr: address of the considered block
    @state: computed state
    """
    addr = ircfg.get_loc_key(addr)
    todo.add(addr)
    if addr not in states:
        states[addr] = state
    else:
        states[addr] = states[addr].merge(state)


def is_expr_cst(ir_arch, expr):
    """Return true if @expr is only composed of ExprInt and init_regs
    (memory elements returned by get_r are accepted as well)
    @ir_arch: IR instance
    @expr: Expression to test"""

    elements = expr.get_r(mem_read=True)
    for element in elements:
        if element.is_mem():
            continue
        if element.is_id() and element in ir_arch.arch.regs.all_regs_ids_init:
            continue
        if element.is_int():
            continue
        return False
    # Every element passed one of the tests above: Expr is a constant
    return True


class SymbExecStateFix(SymbolicExecutionEngine):
    """
    Emul blocks and replace expressions with their corresponding constant if
    any.
    """
    # Function used to test if an Expression is considered as a constant
    is_expr_cst = lambda _, ir_arch, expr: is_expr_cst(ir_arch, expr)

    def __init__(self, ir_arch, ircfg, state, cst_propag_link):
        """
        @ir_arch: forwarded to the SymbolicExecutionEngine constructor
        @ircfg: object whose `blocks` mapping receives the rewritten blocks
        @state: state loaded through set_state()
        @cst_propag_link: dictionary filled by eval_updt_irblock
        """
        self.ircfg = ircfg
        super(SymbExecStateFix, self).__init__(ir_arch, {})
        self.set_state(state)
        self.cst_propag_link = cst_propag_link

    def propag_expr_cst(self, expr):
        """Propagate constant expressions in @expr
        @expr: Expression to update"""
        elements = expr.get_r(mem_read=True)
        to_propag = {}
        for element in elements:
            # Only ExprId can be safely propagated
            if not element.is_id():
                continue
            value = self.eval_expr(element)
            if self.is_expr_cst(self.ir_arch, value):
                to_propag[element] = value
        return expr_simp(expr.replace_expr(to_propag))

    def eval_updt_irblock(self, irb, step=False):
        """
        Symbolic execution of the @irb on the current state.
        The rewritten block is stored in self.ircfg.blocks; nothing is
        returned.
        @irb: IRBlock instance
        @step: display intermediate steps (not used by this override)
        """
        assignblks = []
        for index, assignblk in enumerate(irb):
            new_assignblk = {}
            links = {}
            for dst, src in assignblk.iteritems():
                src = self.propag_expr_cst(src)
                if dst.is_mem():
                    ptr = dst.arg
                    ptr = self.propag_expr_cst(ptr)
                    dst = ExprMem(ptr, dst.size)
                new_assignblk[dst] = src

            if assignblk.instr is not None:
                for arg in assignblk.instr.args:
                    new_arg = self.propag_expr_cst(arg)
                    links[new_arg] = arg
                self.cst_propag_link[(irb.loc_key, index)] = links

            # The engine state advances with the ORIGINAL assignments
            self.eval_updt_assignblk(assignblk)
            assignblks.append(AssignBlock(new_assignblk, assignblk.instr))
        self.ircfg.blocks[irb.loc_key] = IRBlock(irb.loc_key, assignblks)


def compute_cst_propagation_states(ir_arch, ircfg, init_addr, init_infos):
    """
    Propagate "constant expressions" in a function.
    The attribute "constant expression" is true if the expression is based on
    constants or "init" regs values.

    @ir_arch: IntermediateRepresentation instance
    @ircfg: object providing get_loc_key() and the `blocks` mapping
    @init_addr: analysis start address
    @init_infos: dictionary linking expressions to their values at @init_addr

    Returns the dictionary linking each reached loc_key to its entering state.
    """

    done = set()
    state = SymbExecState.StateEngine(init_infos)
    lbl = ircfg.get_loc_key(init_addr)
    todo = set([lbl])
    states = {lbl: state}

    while todo:
        lbl = todo.pop()
        state = states[lbl]
        # A (block, state) pair already processed brings nothing new
        if (lbl, state) in done:
            continue
        done.add((lbl, state))
        if lbl not in ircfg.blocks:
            continue

        symbexec_engine = SymbExecState(ir_arch, ircfg, state)
        addr = symbexec_engine.run_block_at(ircfg, lbl)
        symbexec_engine.del_mem_above_stack(ir_arch.sp)

        for dst in possible_values(addr):
            value = dst.value
            if value.is_mem():
                LOG_CST_PROPAG.warning('Bad destination: %s', value)
                continue
            elif value.is_int():
                value = ircfg.get_loc_key(value)
            add_state(
                ircfg, todo, states, value,
                symbexec_engine.get_state()
            )

    return states


def propagate_cst_expr(ir_arch, ircfg, addr, init_infos):
    """
    Propagate "constant expressions" in a @ir_arch.
    The attribute "constant expression" is true if the expression is based on
    constants or "init" regs values.

    @ir_arch: IntermediateRepresentation instance
    @ircfg: object providing get_loc_key() and the `blocks` mapping
    @addr: analysis start address
    @init_infos: dictionary linking expressions to their values at @addr

    Returns a mapping between replaced Expression and their new values.
    """
    states = compute_cst_propagation_states(ir_arch, ircfg, addr, init_infos)
    cst_propag_link = {}
    for lbl, state in states.iteritems():
        if lbl not in ircfg.blocks:
            continue
        symbexec = SymbExecStateFix(ir_arch, ircfg, state, cst_propag_link)
        symbexec.eval_updt_irblock(ircfg.blocks[lbl])
    return cst_propag_link
Module variables
var CONSOLE_HANDLER
var LOG_CST_PROPAG
Functions
def add_state(
ircfg, todo, states, addr, state)
Add or merge the computed @state for the block at @addr. Update @todo @todo: modified block set @states: dictionary linking a label to its entering state. @addr: address of the considered block @state: computed state
def add_state(ircfg, todo, states, addr, state):
    """
    Register @state as entering state of the block at @addr and schedule it.
    @ircfg: object whose get_loc_key() converts @addr into the key used below
    @todo: set of keys still to process; the block key is always added
    @states: dictionary linking a key to its entering state
    @addr: address of the considered block
    @state: computed state; stored as is for an unknown key, otherwise
            combined with the known one through its merge() method
    """
    loc_key = ircfg.get_loc_key(addr)
    todo.add(loc_key)
    states[loc_key] = (
        states[loc_key].merge(state) if loc_key in states else state
    )
def compute_cst_propagation_states(
ir_arch, ircfg, init_addr, init_infos)
Propagate "constant expressions" in a function. The attribute "constant expression" is true if the expression is based on constants or "init" regs values.
@ir_arch: IntermediateRepresentation instance @init_addr: analysis start address @init_infos: dictionary linking expressions to their values at @init_addr
def compute_cst_propagation_states(ir_arch, ircfg, init_addr, init_infos):
    """
    Propagate "constant expressions" in a function.
    The attribute "constant expression" is true if the expression is based on
    constants or "init" regs values.

    @ir_arch: IntermediateRepresentation instance
    @ircfg: object providing get_loc_key() and the `blocks` mapping
    @init_addr: analysis start address
    @init_infos: dictionary linking expressions to their values at @init_addr

    Returns the dictionary linking each reached loc_key to its entering state.
    """

    done = set()
    state = SymbExecState.StateEngine(init_infos)
    lbl = ircfg.get_loc_key(init_addr)
    todo = set([lbl])
    states = {lbl: state}

    # Worklist loop: the `while` condition already guarantees a non-empty set,
    # so no extra emptiness test is needed before pop()
    while todo:
        lbl = todo.pop()
        state = states[lbl]
        # A (block, state) pair already processed brings nothing new
        if (lbl, state) in done:
            continue
        done.add((lbl, state))
        if lbl not in ircfg.blocks:
            continue

        symbexec_engine = SymbExecState(ir_arch, ircfg, state)
        addr = symbexec_engine.run_block_at(ircfg, lbl)
        symbexec_engine.del_mem_above_stack(ir_arch.sp)

        for dst in possible_values(addr):
            value = dst.value
            if value.is_mem():
                LOG_CST_PROPAG.warning('Bad destination: %s', value)
                continue
            elif value.is_int():
                value = ircfg.get_loc_key(value)
            add_state(
                ircfg, todo, states, value,
                symbexec_engine.get_state()
            )

    return states
def is_expr_cst(
ir_arch, expr)
Return true if @expr is only composed of ExprInt and init_regs @ir_arch: IR instance @expr: Expression to test
def is_expr_cst(ir_arch, expr):
    """Tell whether every element read by @expr is acceptable as a constant.
    An element is accepted when it is a memory access, an identifier listed in
    ir_arch.arch.regs.all_regs_ids_init, or an integer.
    @ir_arch: IR instance
    @expr: Expression to test"""
    return all(
        leaf.is_mem()
        or (leaf.is_id() and leaf in ir_arch.arch.regs.all_regs_ids_init)
        or leaf.is_int()
        for leaf in expr.get_r(mem_read=True)
    )
def propagate_cst_expr(
ir_arch, ircfg, addr, init_infos)
Propagate "constant expressions" in a @ir_arch. The attribute "constant expression" is true if the expression is based on constants or "init" regs values.
@ir_arch: IntermediateRepresentation instance @addr: analysis start address @init_infos: dictionary linking expressions to their values at @addr
Returns a mapping between replaced Expression and their new values.
def propagate_cst_expr(ir_arch, ircfg, addr, init_infos):
    """
    Propagate "constant expressions" in the blocks of @ircfg.
    Entering states are first computed for every reachable block, then each
    block present in ircfg.blocks is replayed with a SymbExecStateFix engine.

    @ir_arch: IntermediateRepresentation instance
    @ircfg: object providing the `blocks` mapping
    @addr: analysis start address
    @init_infos: dictionary linking expressions to their values at @addr

    Returns the dictionary shared with every SymbExecStateFix engine.
    """
    entry_states = compute_cst_propagation_states(
        ir_arch, ircfg, addr, init_infos
    )
    cst_propag_link = {}
    for loc_key, entry_state in entry_states.iteritems():
        if loc_key in ircfg.blocks:
            fixer = SymbExecStateFix(
                ir_arch, ircfg, entry_state, cst_propag_link
            )
            fixer.eval_updt_irblock(ircfg.blocks[loc_key])
    return cst_propag_link
Classes
class SymbExecState
State manager for SymbolicExecution
class SymbExecState(SymbolicExecutionEngine):
    """
    Symbolic execution engine started from a given state
    """

    def __init__(self, ir_arch, ircfg, state):
        """
        @ir_arch: forwarded to the SymbolicExecutionEngine constructor
        @ircfg: accepted but not used by this constructor
        @state: state loaded through set_state()
        """
        no_symbols = {}
        super(SymbExecState, self).__init__(ir_arch, no_symbols)
        self.set_state(state)
Ancestors (in MRO)
- SymbExecState
- miasm2.ir.symbexec.SymbolicExecutionEngine
- __builtin__.object
Class variables
var StateEngine
Instance variables
var state
Return the current state of the SymbolicEngine
Methods
def __init__(
self, ir_arch, ircfg, state)
def __init__(self, ir_arch, ircfg, state):
    """
    Build the engine without initial symbols, then load @state.
    @ir_arch: forwarded to the SymbolicExecutionEngine constructor
    @ircfg: accepted but not used by this constructor
    @state: state loaded through set_state()
    """
    no_symbols = {}
    super(SymbExecState, self).__init__(ir_arch, no_symbols)
    self.set_state(state)
def apply_change(
self, dst, src)
Apply @dst = @src on the current state WITHOUT evaluating both side @dst: Expr, destination @src: Expr, source
def apply_change(self, dst, src):
    """
    Store @src into @dst on the current state; neither side is evaluated.
    Memory destinations go through mem_write, others through symbols.write.
    @dst: Expr, destination
    @src: Expr, source
    """
    writer = self.mem_write if dst.is_mem() else self.symbols.write
    writer(dst, src)
def apply_expr(
self, expr)
Deprecated version of eval_updt_expr
def apply_expr(self, expr):
    """Deprecated alias of eval_updt_expr: warn, then delegate"""
    warnings.warn('DEPRECATION WARNING: use "eval_updt_expr" instead of apply_expr')
    result = self.eval_updt_expr(expr)
    return result
def apply_expr_on_state(
self, expr, cache)
Deprecated version of eval_expr
def apply_expr_on_state(self, expr, cache):
    """Deprecated alias of eval_expr: warn, then delegate.
    A None @cache is replaced by a fresh dictionary."""
    warnings.warn('DEPRECATION WARNING: use "eval_expr" instead of apply_expr_on_state')
    effective_cache = {} if cache is None else cache
    return self.eval_expr(expr, eval_cache=effective_cache)
def as_assignblock(
self)
Return the current state as an AssignBlock
def as_assignblock(self):
    """Deprecated: wrap every modified id and memory entry of the current
    state into an AssignBlock"""
    warnings.warn('DEPRECATION WARNING: use "modified(ids=True, mems=True)" instead of as_assignblock')
    pairs = [(dst, src) for dst, src in self.modified(ids=True, mems=True)]
    return AssignBlock(dict(pairs))
def del_mem_above_stack(
self, stack_ptr)
Remove all stored memory values with following properties: pointer based on initial stack value pointer below current stack pointer
def del_mem_above_stack(self, stack_ptr):
    """
    Drop stored memory values whose pointer shares the base of the evaluated
    @stack_ptr and lies below it.
    @stack_ptr: stack pointer expression, evaluated on the current state
    """
    stack_ptr = self.eval_expr(stack_ptr)
    base, stk_offset = get_expr_base_offset(stack_ptr)
    memarray = self.symbols.symbols_mem.base_to_memarray.get(base, None)
    if not memarray:
        return
    sign_bit = stack_ptr.size - 1
    # (offset - stk_offset) has its top bit set when offset is below the
    # stack pointer; collect first, the array cannot shrink while iterated
    stale = set(
        offset for offset in memarray
        if ((offset - stk_offset) & int(stack_ptr.mask)) >> sign_bit != 0
    )
    for offset in stale:
        del memarray[offset]
def dump(
self, ids=True, mems=True)
Display modified variables @ids: display modified ids @mems: display modified memory
def dump(self, ids=True, mems=True): """ Display modififed variables @ids: display modified ids @mems: display modified memory """ for variable, value in self.modified(None, ids, mems): print "%-18s" % variable, "=", "%s" % value
def dump_id(
self)
Deprecated version of dump(mems=False)
def dump_id(self):
    """Deprecated alias of dump(mems=False): warn, then delegate"""
    warnings.warn('DEPRECATION WARNING: use "dump(self, mems=False)" instead of dump_id')
    show_mems = False
    self.dump(mems=show_mems)
def dump_mem(
self)
Deprecated version of dump(ids=False)
def dump_mem(self):
    """Deprecated alias of dump(ids=False): warn, then delegate"""
    warnings.warn('DEPRECATION WARNING: use "dump(self, ids=False)" instead of dump_mem')
    show_ids = False
    self.dump(ids=show_ids)
def emul_ir_bloc(
self, _, addr, step=False)
Deprecated version of run_block_at
def emul_ir_bloc(self, _, addr, step=False):
    """Deprecated version of run_block_at; the first argument is ignored"""
    warnings.warn('DEPRECATION WARNING: use "run_block_at(self, addr, step=False)" instead of emul_ir_bloc')
    # NOTE(review): run_block_at is documented in this page as
    # (ircfg, addr, step=False); here @addr is passed in the ircfg slot and
    # @step in the addr slot -- confirm against the engine before relying on it
    outcome = self.run_block_at(addr, step)
    return outcome
def emul_ir_block(
self, addr, step=False)
Deprecated version of run_block_at
def emul_ir_block(self, addr, step=False):
    """Deprecated version of run_block_at"""
    warnings.warn('DEPRECATION WARNING: use "run_block_at(self, addr, step=False)" instead of emul_ir_block')
    # NOTE(review): run_block_at is documented in this page as
    # (ircfg, addr, step=False); here @addr is passed in the ircfg slot and
    # @step in the addr slot -- confirm against the engine before relying on it
    outcome = self.run_block_at(addr, step)
    return outcome
def emul_ir_blocks(
self, addr, lbl_stop=None, step=False)
Deprecated version of run_at
def emul_ir_blocks(self, addr, lbl_stop=None, step=False):
    """Deprecated version of run_at"""
    warnings.warn('DEPRECATION WARNING: use "run_at(self, addr, lbl_stop=None, step=False):" instead of emul_ir_blocks')
    # NOTE(review): run_at is documented in this page as
    # (ircfg, addr, lbl_stop=None, step=False); the arguments below are
    # shifted by one position relative to it -- confirm before relying on it
    outcome = self.run_at(addr, lbl_stop, step)
    return outcome
def emul_ir_blocs(
self, _, addr, lbl_stop=None, step=False)
Deprecated version of run_at
def emul_ir_blocs(self, _, addr, lbl_stop=None, step=False):
    """Deprecated version of run_at; the first argument is ignored"""
    warnings.warn('DEPRECATION WARNING: use "run_at(self, addr, lbl_stop=None, step=False):" instead of emul_ir_blocs')
    # NOTE(review): run_at is documented in this page as
    # (ircfg, addr, lbl_stop=None, step=False); the arguments below are
    # shifted by one position relative to it -- confirm before relying on it
    outcome = self.run_at(addr, lbl_stop, step)
    return outcome
def emulbloc(
self, irb, step=False)
Deprecated version of eval_updt_irblock(self, irb, step=False)
def emulbloc(self, irb, step=False):
    """Deprecated alias of eval_updt_irblock(self, irb, step=False)"""
    warnings.warn('DEPRECATION WARNING: use "eval_updt_irblock(self, irb, step=False)" instead of emulbloc')
    outcome = self.eval_updt_irblock(irb, step)
    return outcome
def eval_assignblk(
self, assignblk)
Evaluate AssignBlock using the current state
Returns a dictionary containing modified keys associated to their values
@assignblk: AssignBlock instance
def eval_assignblk(self, assignblk):
    """
    Evaluate every assignment of @assignblk against the current state,
    without modifying it.

    Returns a dictionary linking each destination (memory destinations are
    rebuilt around their evaluated pointer) to its evaluated source.
    Raises ValueError for a destination which is neither a memory nor an id.

    @assignblk: AssignBlock instance
    """
    results = {}
    # One cache shared by all the evaluations of this block
    shared_cache = {}
    for dst, src in assignblk.iteritems():
        value = self.eval_expr(src, shared_cache)
        if dst.is_mem():
            target = ExprMem(self.eval_expr(dst.arg, shared_cache), dst.size)
        elif dst.is_id():
            target = dst
        else:
            raise ValueError("Unknown destination type", str(dst))
        results[target] = value
    return results
def eval_expr(
self, expr, eval_cache=None)
Evaluate @expr @expr: Expression instance to evaluate @eval_cache: None or dictionary linking variables to their values
def eval_expr(self, expr, eval_cache=None):
    """
    Evaluate @expr on the current state
    @expr: Expression instance to evaluate
    @eval_cache: None or dictionary linking variables to their values
    """
    cache = {} if eval_cache is None else eval_cache
    value = self.eval_expr_visitor(expr, cache=cache)
    assert value is not None
    return value
def eval_expr_visitor(
self, expr, cache=None)
[DEV]: Override to change the behavior of an Expr evaluation. This function recursively applies 'eval_expr*' to @expr. This function uses @cache to speedup re-evaluation of expression.
def eval_expr_visitor(self, expr, cache=None):
    """
    [DEV]: Override to change the behavior of an Expr evaluation.
    @expr is simplified, dispatched on its class through expr_to_visitor,
    and the simplified result is returned.
    @cache is looked up before and after the first simplification, and is
    updated for both forms of the expression.
    Raises TypeError when no visitor is registered for the class.
    """
    if cache is None:
        cache = {}

    hit = cache.get(expr, None)
    if hit is not None:
        return hit

    simplified = self.expr_simp(expr)
    hit = cache.get(simplified, None)
    if hit is not None:
        return hit

    visitor = self.expr_to_visitor.get(simplified.__class__, None)
    if visitor is None:
        raise TypeError("Unknown expr type")

    result = self.expr_simp(visitor(simplified, cache=cache))
    assert result is not None

    cache[expr] = result
    cache[simplified] = result
    return result
def eval_exprcompose(
self, expr, **kwargs)
[DEV]: Evaluate an ExprCompose using the current state
def eval_exprcompose(self, expr, **kwargs):
    """[DEV]: Evaluate each argument of an ExprCompose on the current state
    and rebuild the ExprCompose from the results"""
    evaluated = [self.eval_expr_visitor(arg, **kwargs) for arg in expr.args]
    return ExprCompose(*evaluated)
def eval_exprcond(
self, expr, **kwargs)
[DEV]: Evaluate an ExprCond using the current state
def eval_exprcond(self, expr, **kwargs):
    """[DEV]: Evaluate the condition and both sources of an ExprCond on the
    current state and rebuild the ExprCond from the results"""
    cond, src1, src2 = [
        self.eval_expr_visitor(part, **kwargs)
        for part in (expr.cond, expr.src1, expr.src2)
    ]
    return ExprCond(cond, src1, src2)
def eval_exprid(
self, expr, **kwargs)
[DEV]: Evaluate an ExprId using the current state
def eval_exprid(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprId by reading it from the current symbols;
    extra keyword arguments are ignored"""
    return self.symbols.read(expr)
def eval_exprint(
self, expr, **kwargs)
[DEV]: Evaluate an ExprInt using the current state
def eval_exprint(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprInt: the expression is returned unchanged and
    extra keyword arguments are ignored"""
    unchanged = expr
    return unchanged
def eval_exprloc(
self, expr, **kwargs)
[DEV]: Evaluate an ExprLoc using the current state
def eval_exprloc(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprLoc using the current state.
    When the location database knows an offset for the loc_key, an ExprInt of
    the same size is returned; otherwise @expr is returned unchanged."""
    offset = self.ir_arch.loc_db.get_location_offset(expr.loc_key)
    if offset is None:
        return expr
    return ExprInt(offset, expr.size)
def eval_exprmem(
self, expr, **kwargs)
[DEV]: Evaluate an ExprMem using the current state This function first evaluate the memory pointer value. Override 'mem_read' to modify the effective memory accesses
def eval_exprmem(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprMem using the current state.
    The pointer is evaluated first, then the rebuilt ExprMem is read through
    mem_read. Override 'mem_read' to modify the effective memory accesses.
    """
    pointer = self.eval_expr_visitor(expr.arg, **kwargs)
    return self.mem_read(ExprMem(pointer, expr.size))
def eval_exprop(
self, expr, **kwargs)
[DEV]: Evaluate an ExprOp using the current state
def eval_exprop(self, expr, **kwargs):
    """[DEV]: Evaluate each argument of an ExprOp on the current state and
    rebuild the ExprOp with the same operator"""
    evaluated = [self.eval_expr_visitor(arg, **kwargs) for arg in expr.args]
    return ExprOp(expr.op, *evaluated)
def eval_exprslice(
self, expr, **kwargs)
[DEV]: Evaluate an ExprSlice using the current state
def eval_exprslice(self, expr, **kwargs):
    """[DEV]: Evaluate the argument of an ExprSlice on the current state and
    rebuild the ExprSlice with the same bounds"""
    inner = self.eval_expr_visitor(expr.arg, **kwargs)
    return ExprSlice(inner, expr.start, expr.stop)
def eval_ir(
self, assignblk)
Deprecated version of eval_updt_assignblk(self, assignblk)
def eval_ir(self, assignblk):
    """Deprecated version of eval_updt_assignblk(self, assignblk)
    @assignblk: AssignBlock instance, applied on the current state
    Returns whatever eval_updt_assignblk returns."""
    # The message names the method actually called below, so that callers
    # migrating away from eval_ir keep the state-updating behaviour
    warnings.warn('DEPRECATION WARNING: use "eval_updt_assignblk(self, assignblk)" instead of eval_ir')
    return self.eval_updt_assignblk(assignblk)
def eval_ir_expr(
self, assignblk)
Deprecated version of eval_assignblk(self, assignblk)
def eval_ir_expr(self, assignblk):
    """Deprecated version of eval_assignblk(self, assignblk); returns the
    iteritems() of its result"""
    warnings.warn('DEPRECATION WARNING: use "eval_assignblk(self, assignblk)" instead of eval_ir_expr')
    evaluated = self.eval_assignblk(assignblk)
    return evaluated.iteritems()
def eval_updt_assignblk(
self, assignblk)
Apply an AssignBlock on the current state @assignblk: AssignBlock instance
def eval_updt_assignblk(self, assignblk):
    """
    Evaluate @assignblk, then apply each resulting assignment on the current
    state. Returns the list of memory destinations which were written.
    @assignblk: AssignBlock instance
    """
    written_mems = []
    for dst, src in self.eval_assignblk(assignblk).iteritems():
        self.apply_change(dst, src)
        if dst.is_mem():
            written_mems.append(dst)
    return written_mems
def eval_updt_expr(
self, expr)
Evaluate @expr and apply side effect if needed (ie. if expr is an assignment). Return the evaluated value
def eval_updt_expr(self, expr):
    """
    Evaluate @expr and return the evaluated value. When @expr is an
    assignment, its source is the value returned and the assignment is also
    applied on the current state.
    """
    if not expr.is_aff():
        return self.eval_expr(expr)
    # The source is evaluated BEFORE the state is updated
    value = self.eval_expr(expr.src)
    self.eval_updt_assignblk(AssignBlock([expr]))
    return value
def eval_updt_irblock(
self, irb, step=False)
Symbolic execution of the @irb on the current state @irb: irbloc instance @step: display intermediate steps
def eval_updt_irblock(self, irb, step=False): """ Symbolic execution of the @irb on the current state @irb: irbloc instance @step: display intermediate steps """ for assignblk in irb: if step: print 'Instr', assignblk.instr print 'Assignblk:' print assignblk print '_' * 80 self.eval_updt_assignblk(assignblk) if step: self.dump(mems=False) self.dump(ids=False) print '_' * 80 dst = self.eval_expr(self.ir_arch.IRDst) return dst
def get_state(
self)
Return the current state of the SymbolicEngine
def get_state(self):
    """Return a StateEngine built from a dictionary copy of the current
    symbols"""
    snapshot = dict(self.symbols)
    return self.StateEngine(snapshot)
def mem_read(
self, expr)
[DEV]: Override to modify the effective memory reads
Read symbolic value at ExprMem @expr @expr: ExprMem
def mem_read(self, expr):
    """
    [DEV]: Override to modify the effective memory reads

    Read symbolic value at ExprMem @expr. The access is split into parts by
    _resolve_mem_parts; unknown memory parts go through func_read when one is
    set, and the parts are recomposed and simplified.
    @expr: ExprMem
    """
    pieces = []
    for known, part in self._resolve_mem_parts(expr):
        if known or not part.is_mem() or self.func_read is None:
            pieces.append(part)
        else:
            pieces.append(self.func_read(part))
    value = self.expr_simp(ExprCompose(*pieces))
    assert value.size == expr.size
    return value
def mem_write(
self, dst, src)
[DEV]: Override to modify the effective memory writes
Write symbolic value @src at ExprMem @dst @dst: destination ExprMem @src: source Expression
def mem_write(self, dst, src):
    """
    [DEV]: Override to modify the effective memory writes

    Write symbolic value @src at ExprMem @dst. When func_write is set it
    receives (engine, dst, src); otherwise the write goes to the symbols.
    @dst: destination ExprMem
    @src: source Expression
    """
    if self.func_write is None:
        self.symbols.write(dst, src)
        return
    self.func_write(self, dst, src)
def modified(
self, init_state=None, ids=True, mems=True)
Return the modified variables. @init_state: a base dictionary linking variables to their initial values to diff. Can be None. @ids: track ids only @mems: track mems only
def modified(self, init_state=None, ids=True, mems=True):
    """
    Yield (variable, value) pairs of the current state, skipping the pairs
    which are identical in @init_state.
    @init_state: a base dictionary linking variables to their initial values
    to diff. Can be None.
    @ids: include the identifiers
    @mems: include the memory entries
    """
    baseline = {} if init_state is None else init_state
    if ids:
        for variable, value in self.symbols.symbols_id.iteritems():
            if not (variable in baseline and baseline[variable] == value):
                yield variable, value
    if mems:
        for mem, value in self.symbols.memory():
            if not (mem in baseline and baseline[mem] == value):
                yield mem, value
def modified_mems(
self, init_state=None)
Deprecated version of modified(ids=False)
def modified_mems(self, init_state=None):
    """Deprecated version of modified(ids=False); a generator, so the
    warning is emitted when iteration starts"""
    warnings.warn('DEPRECATION WARNING: use "modified(self, ids=False)" instead of modified_mems')
    entries = self.modified(init_state=init_state, ids=False)
    for entry in entries:
        yield entry
def modified_regs(
self, init_state=None)
Deprecated version of modified(mems=False)
def modified_regs(self, init_state=None):
    """Deprecated version of modified(mems=False); a generator, so the
    warning is emitted when iteration starts"""
    warnings.warn('DEPRECATION WARNING: use "modified(self, mems=False)" instead of modified_regs')
    entries = self.modified(init_state=init_state, mems=False)
    for entry in entries:
        yield entry
def run_at(
self, ircfg, addr, lbl_stop=None, step=False)
Symbolic execution starting at @addr @addr: address to execute (int or ExprInt or label) @lbl_stop: LocKey to stop execution on @step: display intermediate steps
def run_at(self, ircfg, addr, lbl_stop=None, step=False):
    """
    Symbolic execution starting at @addr, block after block, until no block
    is found or the block located at @lbl_stop is reached. Returns the last
    computed address.
    @ircfg: object whose get_block() returns the block at an address, or None
    @addr: address to execute (int or ExprInt or label)
    @lbl_stop: LocKey to stop execution on
    @step: display intermediate steps
    """
    while True:
        irblock = ircfg.get_block(addr)
        if irblock is None or irblock.loc_key == lbl_stop:
            return addr
        addr = self.eval_updt_irblock(irblock, step=step)
def run_block_at(
self, ircfg, addr, step=False)
Symbolic execution of the block at @addr @addr: address to execute (int or ExprInt or label) @step: display intermediate steps
def run_block_at(self, ircfg, addr, step=False):
    """
    Symbolic execution of the block at @addr. Returns the address computed
    by the block, or @addr itself when no block is found.
    @ircfg: object whose get_block() returns the block at an address, or None
    @addr: address to execute (int or ExprInt or label)
    @step: display intermediate steps
    """
    irblock = ircfg.get_block(addr)
    if irblock is None:
        return addr
    return self.eval_updt_irblock(irblock, step=step)
def set_state(
self, state)
Restore the @state of the engine @state: StateEngine instance
def set_state(self, state):
    """Restore the @state of the engine: the symbols are replaced by a fresh
    SymbolMngr filled with the content of @state
    @state: StateEngine instance
    """
    self.symbols = SymbolMngr(
        addrsize=self.ir_arch.addrsize,
        expr_simp=self.expr_simp
    )
    content = dict(state)
    for dst, src in content.iteritems():
        self.symbols[dst] = src
class SymbExecStateFix
Emul blocks and replace expressions with their corresponding constant if any.
class SymbExecStateFix(SymbolicExecutionEngine):
    """
    Replay blocks and substitute expressions with their constant value when
    one is known.
    """
    # Predicate deciding whether an Expression counts as a constant
    is_expr_cst = lambda _, ir_arch, expr: is_expr_cst(ir_arch, expr)

    def __init__(self, ir_arch, ircfg, state, cst_propag_link):
        """
        @ir_arch: forwarded to the SymbolicExecutionEngine constructor
        @ircfg: object whose `blocks` mapping receives the rewritten blocks
        @state: state loaded through set_state()
        @cst_propag_link: dictionary filled by eval_updt_irblock
        """
        self.ircfg = ircfg
        super(SymbExecStateFix, self).__init__(ir_arch, {})
        self.set_state(state)
        self.cst_propag_link = cst_propag_link

    def propag_expr_cst(self, expr):
        """Return @expr where each identifier whose current value is a
        constant has been replaced by that value, then simplified
        @expr: Expression to update"""
        replacements = {}
        for leaf in expr.get_r(mem_read=True):
            # Only ExprId can be safely propagated
            if leaf.is_id():
                value = self.eval_expr(leaf)
                if self.is_expr_cst(self.ir_arch, value):
                    replacements[leaf] = value
        return expr_simp(expr.replace_expr(replacements))

    def eval_updt_irblock(self, irb, step=False):
        """
        Replay @irb on the current state and store its constant-propagated
        copy in self.ircfg.blocks. Nothing is returned.
        For each AssignBlock carrying an instruction, cst_propag_link gets a
        (loc_key, index) entry linking the propagated instruction arguments
        to the original ones.
        @irb: IRBlock instance
        @step: display intermediate steps (not used by this override)
        """
        rewritten = []
        for index, assignblk in enumerate(irb):
            new_assignblk = {}
            links = {}
            for dst, src in assignblk.iteritems():
                new_src = self.propag_expr_cst(src)
                if dst.is_mem():
                    dst = ExprMem(self.propag_expr_cst(dst.arg), dst.size)
                new_assignblk[dst] = new_src

            instr = assignblk.instr
            if instr is not None:
                for arg in instr.args:
                    links[self.propag_expr_cst(arg)] = arg
                self.cst_propag_link[(irb.loc_key, index)] = links

            # The engine state advances with the ORIGINAL assignments
            self.eval_updt_assignblk(assignblk)
            rewritten.append(AssignBlock(new_assignblk, assignblk.instr))
        self.ircfg.blocks[irb.loc_key] = IRBlock(irb.loc_key, rewritten)
Ancestors (in MRO)
- SymbExecStateFix
- miasm2.ir.symbexec.SymbolicExecutionEngine
- __builtin__.object
Class variables
var StateEngine
Instance variables
var cst_propag_link
var ircfg
var state
Return the current state of the SymbolicEngine
Methods
def __init__(
self, ir_arch, ircfg, state, cst_propag_link)
def __init__(self, ir_arch, ircfg, state, cst_propag_link):
    """
    Keep @ircfg, build the engine without initial symbols, load @state and
    keep @cst_propag_link.
    @ir_arch: forwarded to the SymbolicExecutionEngine constructor
    @ircfg: stored as self.ircfg
    @state: state loaded through set_state()
    @cst_propag_link: stored as self.cst_propag_link
    """
    self.ircfg = ircfg
    no_symbols = {}
    super(SymbExecStateFix, self).__init__(ir_arch, no_symbols)
    self.set_state(state)
    self.cst_propag_link = cst_propag_link
def apply_change(
self, dst, src)
Apply @dst = @src on the current state WITHOUT evaluating both side @dst: Expr, destination @src: Expr, source
def apply_change(self, dst, src):
    """
    Store @src into @dst on the current state; neither side is evaluated.
    Memory destinations go through mem_write, others through symbols.write.
    @dst: Expr, destination
    @src: Expr, source
    """
    writer = self.mem_write if dst.is_mem() else self.symbols.write
    writer(dst, src)
def apply_expr(
self, expr)
Deprecated version of eval_updt_expr
def apply_expr(self, expr):
    """Deprecated alias of eval_updt_expr: warn, then delegate"""
    warnings.warn('DEPRECATION WARNING: use "eval_updt_expr" instead of apply_expr')
    result = self.eval_updt_expr(expr)
    return result
def apply_expr_on_state(
self, expr, cache)
Deprecated version of eval_expr
def apply_expr_on_state(self, expr, cache):
    """Deprecated alias of eval_expr: warn, then delegate.
    A None @cache is replaced by a fresh dictionary."""
    warnings.warn('DEPRECATION WARNING: use "eval_expr" instead of apply_expr_on_state')
    effective_cache = {} if cache is None else cache
    return self.eval_expr(expr, eval_cache=effective_cache)
def as_assignblock(
self)
Return the current state as an AssignBlock
def as_assignblock(self):
    """Deprecated: wrap every modified id and memory entry of the current
    state into an AssignBlock"""
    warnings.warn('DEPRECATION WARNING: use "modified(ids=True, mems=True)" instead of as_assignblock')
    pairs = [(dst, src) for dst, src in self.modified(ids=True, mems=True)]
    return AssignBlock(dict(pairs))
def del_mem_above_stack(
self, stack_ptr)
Remove all stored memory values with following properties: pointer based on initial stack value pointer below current stack pointer
def del_mem_above_stack(self, stack_ptr):
    """
    Drop stored memory values whose pointer shares the base of the evaluated
    @stack_ptr and lies below it.
    @stack_ptr: stack pointer expression, evaluated on the current state
    """
    stack_ptr = self.eval_expr(stack_ptr)
    base, stk_offset = get_expr_base_offset(stack_ptr)
    memarray = self.symbols.symbols_mem.base_to_memarray.get(base, None)
    if not memarray:
        return
    sign_bit = stack_ptr.size - 1
    # (offset - stk_offset) has its top bit set when offset is below the
    # stack pointer; collect first, the array cannot shrink while iterated
    stale = set(
        offset for offset in memarray
        if ((offset - stk_offset) & int(stack_ptr.mask)) >> sign_bit != 0
    )
    for offset in stale:
        del memarray[offset]
def dump(
self, ids=True, mems=True)
Display modified variables @ids: display modified ids @mems: display modified memory
def dump(self, ids=True, mems=True): """ Display modififed variables @ids: display modified ids @mems: display modified memory """ for variable, value in self.modified(None, ids, mems): print "%-18s" % variable, "=", "%s" % value
def dump_id(
self)
Deprecated version of dump(mems=False)
def dump_id(self):
    """Deprecated alias of dump(mems=False): warn, then delegate"""
    warnings.warn('DEPRECATION WARNING: use "dump(self, mems=False)" instead of dump_id')
    show_mems = False
    self.dump(mems=show_mems)
def dump_mem(
self)
Deprecated version of dump(ids=False)
def dump_mem(self):
    """Deprecated alias of dump(ids=False): warn, then delegate"""
    warnings.warn('DEPRECATION WARNING: use "dump(self, ids=False)" instead of dump_mem')
    show_ids = False
    self.dump(ids=show_ids)
def emul_ir_bloc(
self, _, addr, step=False)
Deprecated version of run_block_at
def emul_ir_bloc(self, _, addr, step=False):
    """Deprecated version of run_block_at; the first argument is ignored"""
    warnings.warn('DEPRECATION WARNING: use "run_block_at(self, addr, step=False)" instead of emul_ir_bloc')
    # NOTE(review): run_block_at is documented in this page as
    # (ircfg, addr, step=False); here @addr is passed in the ircfg slot and
    # @step in the addr slot -- confirm against the engine before relying on it
    outcome = self.run_block_at(addr, step)
    return outcome
def emul_ir_block(
self, addr, step=False)
Deprecated version of run_block_at
def emul_ir_block(self, addr, step=False):
    """Deprecated version of run_block_at"""
    warnings.warn('DEPRECATION WARNING: use "run_block_at(self, addr, step=False)" instead of emul_ir_block')
    # NOTE(review): run_block_at is documented in this page as
    # (ircfg, addr, step=False); here @addr is passed in the ircfg slot and
    # @step in the addr slot -- confirm against the engine before relying on it
    outcome = self.run_block_at(addr, step)
    return outcome
def emul_ir_blocks(
self, addr, lbl_stop=None, step=False)
Deprecated version of run_at
def emul_ir_blocks(self, addr, lbl_stop=None, step=False):
    """Deprecated version of run_at"""
    warnings.warn('DEPRECATION WARNING: use "run_at(self, addr, lbl_stop=None, step=False):" instead of emul_ir_blocks')
    # NOTE(review): run_at is documented in this page as
    # (ircfg, addr, lbl_stop=None, step=False); the arguments below are
    # shifted by one position relative to it -- confirm before relying on it
    outcome = self.run_at(addr, lbl_stop, step)
    return outcome
def emul_ir_blocs(
self, _, addr, lbl_stop=None, step=False)
Deprecated version of run_at
def emul_ir_blocs(self, _, addr, lbl_stop=None, step=False):
    """Deprecated version of run_at; the first argument is ignored"""
    warnings.warn('DEPRECATION WARNING: use "run_at(self, addr, lbl_stop=None, step=False):" instead of emul_ir_blocs')
    # NOTE(review): run_at is documented in this page as
    # (ircfg, addr, lbl_stop=None, step=False); the arguments below are
    # shifted by one position relative to it -- confirm before relying on it
    outcome = self.run_at(addr, lbl_stop, step)
    return outcome
def emulbloc(
self, irb, step=False)
Deprecated version of eval_updt_irblock(self, irb, step=False)
def emulbloc(self, irb, step=False):
    """Deprecated alias of eval_updt_irblock(self, irb, step=False)"""
    warnings.warn('DEPRECATION WARNING: use "eval_updt_irblock(self, irb, step=False)" instead of emulbloc')
    outcome = self.eval_updt_irblock(irb, step)
    return outcome
def eval_assignblk(
self, assignblk)
Evaluate AssignBlock using the current state
Returns a dictionary containing modified keys associated to their values
@assignblk: AssignBlock instance
def eval_assignblk(self, assignblk):
    """
    Evaluate an AssignBlock against the current state, without applying it.

    @assignblk: AssignBlock instance
    Returns a dictionary linking each (evaluated) destination to its
    evaluated source. Raises ValueError when a destination is neither a
    memory access nor an identifier.
    """
    # A single cache is shared by all evaluations of this AssignBlock
    shared_cache = {}
    results = {}
    for target, source in assignblk.iteritems():
        value = self.eval_expr(source, shared_cache)
        if target.is_mem():
            # Memory destination: the pointer is evaluated as well
            address = self.eval_expr(target.arg, shared_cache)
            key = ExprMem(address, target.size)
        elif target.is_id():
            key = target
        else:
            raise ValueError("Unknown destination type", str(target))
        results[key] = value
    return results
def eval_expr(
self, expr, eval_cache=None)
Evaluate @expr @expr: Expression instance to evaluate @eval_cache: None or dictionary linking variables to their values
def eval_expr(self, expr, eval_cache=None):
    """
    Evaluate @expr using the current state.

    @expr: Expression instance to evaluate
    @eval_cache: None or dictionary linking variables to their values; a
    fresh dictionary is used when None is given
    Returns the evaluated expression (asserted not to be None).
    """
    cache = {} if eval_cache is None else eval_cache
    result = self.eval_expr_visitor(expr, cache=cache)
    assert result is not None
    return result
def eval_expr_visitor(
self, expr, cache=None)
[DEV]: Override to change the behavior of an Expr evaluation. This function recursively applies 'eval_expr*' to @expr. This function uses @cache to speedup re-evaluation of expression.
def eval_expr_visitor(self, expr, cache=None):
    """
    [DEV]: Override to change the behavior of an Expr evaluation.

    Dispatches the simplified form of @expr to the handler registered for
    its class in self.expr_to_visitor, and simplifies the handler result.
    @cache memoizes results, keyed both by @expr and by its simplified form.
    Raises TypeError when no handler is registered for the class.
    """
    if cache is None:
        cache = {}

    # Fast path: @expr itself has already been evaluated
    known = cache.get(expr, None)
    if known is not None:
        return known

    # Second chance: the simplified form has already been evaluated
    simplified = self.expr_simp(expr)
    known = cache.get(simplified, None)
    if known is not None:
        return known

    handler = self.expr_to_visitor.get(simplified.__class__, None)
    if handler is None:
        raise TypeError("Unknown expr type")

    result = self.expr_simp(handler(simplified, cache=cache))
    assert result is not None
    cache[expr] = result
    cache[simplified] = result
    return result
def eval_exprcompose(
self, expr, **kwargs)
[DEV]: Evaluate an ExprCompose using the current state
def eval_exprcompose(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprCompose using the current state

    Each argument of @expr is evaluated in order and the results are
    assembled into a new ExprCompose.
    """
    evaluated = [self.eval_expr_visitor(part, **kwargs) for part in expr.args]
    return ExprCompose(*evaluated)
def eval_exprcond(
self, expr, **kwargs)
[DEV]: Evaluate an ExprCond using the current state
def eval_exprcond(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprCond using the current state

    The condition and both sources are evaluated (in that order) and a new
    ExprCond is built from the results.
    """
    parts = (expr.cond, expr.src1, expr.src2)
    cond, src1, src2 = [self.eval_expr_visitor(part, **kwargs) for part in parts]
    return ExprCond(cond, src1, src2)
def eval_exprid(
self, expr, **kwargs)
[DEV]: Evaluate an ExprId using the current state
def eval_exprid(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprId using the current state

    The value is whatever self.symbols.read returns for @expr; the keyword
    arguments are accepted for interface uniformity and not used.
    """
    return self.symbols.read(expr)
def eval_exprint(
self, expr, **kwargs)
[DEV]: Evaluate an ExprInt using the current state
def eval_exprint(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprInt using the current state

    @expr is returned unchanged; the keyword arguments are accepted for
    interface uniformity and not used.
    """
    return expr
def eval_exprloc(
self, expr, **kwargs)
[DEV]: Evaluate an ExprLoc using the current state
def eval_exprloc(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprLoc using the current state

    When the location database knows an offset for @expr.loc_key, an ExprInt
    of that offset (with the size of @expr) is returned; otherwise @expr is
    returned unchanged.
    """
    offset = self.ir_arch.loc_db.get_location_offset(expr.loc_key)
    if offset is None:
        return expr
    return ExprInt(offset, expr.size)
def eval_exprmem(
self, expr, **kwargs)
[DEV]: Evaluate an ExprMem using the current state. This function first evaluates the memory pointer value. Override 'mem_read' to modify the effective memory accesses
def eval_exprmem(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprMem using the current state

    The pointer of @expr is evaluated first, then the resulting memory
    access (same size as @expr) is read through mem_read.
    Override 'mem_read' to modify the effective memory accesses
    """
    pointer = self.eval_expr_visitor(expr.arg, **kwargs)
    return self.mem_read(ExprMem(pointer, expr.size))
def eval_exprop(
self, expr, **kwargs)
[DEV]: Evaluate an ExprOp using the current state
def eval_exprop(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprOp using the current state

    Each argument of @expr is evaluated in order and a new ExprOp with the
    same operator is built from the results.
    """
    evaluated = [self.eval_expr_visitor(operand, **kwargs) for operand in expr.args]
    return ExprOp(expr.op, *evaluated)
def eval_exprslice(
self, expr, **kwargs)
[DEV]: Evaluate an ExprSlice using the current state
def eval_exprslice(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprSlice using the current state

    The sliced argument is evaluated and re-sliced with the bounds of @expr.
    """
    inner = self.eval_expr_visitor(expr.arg, **kwargs)
    return ExprSlice(inner, expr.start, expr.stop)
def eval_ir(
self, assignblk)
Deprecated version of eval_updt_assignblk(self, assignblk)
def eval_ir(self, assignblk):
    """Deprecated version of eval_updt_assignblk(self, assignblk)

    @assignblk: AssignBlock instance forwarded to eval_updt_assignblk
    Returns the result of eval_updt_assignblk.
    """
    # The advertised replacement must be the function actually called:
    # eval_updt_assignblk applies the AssignBlock on the state, whereas
    # eval_assignblk only evaluates it.
    warnings.warn('DEPRECATION WARNING: use "eval_updt_assignblk(self, assignblk)" instead of eval_ir')
    return self.eval_updt_assignblk(assignblk)
def eval_ir_expr(
self, assignblk)
Deprecated version of eval_assignblk(self, assignblk)
def eval_ir_expr(self, assignblk):
    """Deprecated version of eval_assignblk(self, assignblk)

    @assignblk: AssignBlock instance forwarded to eval_assignblk
    Returns an iterator over the (destination, value) pairs of the result.
    """
    message = 'DEPRECATION WARNING: use "eval_assignblk(self, assignblk)" instead of eval_ir_expr'
    warnings.warn(message)
    evaluated = self.eval_assignblk(assignblk)
    return evaluated.iteritems()
def eval_updt_assignblk(
self, assignblk)
Apply an AssignBlock on the current state @assignblk: AssignBlock instance
def eval_updt_assignblk(self, assignblk):
    """
    Evaluate an AssignBlock and apply the result on the current state.

    @assignblk: AssignBlock instance
    Returns the list of memory destinations which have been written.
    """
    changes = self.eval_assignblk(assignblk)
    touched_mems = []
    for target, value in changes.iteritems():
        self.apply_change(target, value)
        if not target.is_mem():
            continue
        touched_mems.append(target)
    return touched_mems
def eval_updt_expr(
self, expr)
Evaluate @expr and apply side effect if needed (ie. if expr is an assignment). Return the evaluated value
def eval_updt_expr(self, expr):
    """
    Evaluate @expr and apply its side effect if it is an assignment.

    For an assignment, the source is evaluated first, then the assignment is
    applied on the state through a one-element AssignBlock.
    Returns the evaluated value.
    """
    if not expr.is_aff():
        # Plain expression: nothing to update
        return self.eval_expr(expr)

    value = self.eval_expr(expr.src)
    self.eval_updt_assignblk(AssignBlock([expr]))
    return value
def eval_updt_irblock(
self, irb, step=False)
Symbolic execution of the @irb on the current state @irb: IRBlock instance @step: display intermediate steps
def eval_updt_irblock(self, irb, step=False):
    """
    Symbolic execution of the @irb on the current state, rebuilding the
    block with constants propagated.

    For every AssignBlock of @irb, the sources (and the pointers of memory
    destinations) go through propag_expr_cst, the original AssignBlock is
    applied on the current state, and the rewritten AssignBlock is
    collected. The rebuilt IRBlock then replaces the entry of @irb.loc_key
    in self.ircfg.blocks.

    @irb: IRBlock instance
    @step: display intermediate steps (not referenced in this body)

    This body has no return statement.
    NOTE(review): run_at / run_block_at use the value returned by
    eval_updt_irblock as the next address -- confirm they are not expected
    to be used with this implementation.
    """
    assignblks = []
    for index, assignblk in enumerate(irb):
        new_assignblk = {}
        # Links each propagated instruction argument to its original form
        links = {}
        for dst, src in assignblk.iteritems():
            src = self.propag_expr_cst(src)
            if dst.is_mem():
                # Propagate constants in the pointer, keep the access size
                ptr = dst.arg
                ptr = self.propag_expr_cst(ptr)
                dst = ExprMem(ptr, dst.size)
            new_assignblk[dst] = src

        if assignblk.instr is not None:
            for arg in assignblk.instr.args:
                new_arg = self.propag_expr_cst(arg)
                links[new_arg] = arg
            # NOTE(review): nesting reconstructed from a flattened listing;
            # confirm the links are only recorded when an instruction exists
            self.cst_propag_link[(irb.loc_key, index)] = links

        # The state is updated with the original (non rewritten) AssignBlock
        self.eval_updt_assignblk(assignblk)
        assignblks.append(AssignBlock(new_assignblk, assignblk.instr))
    self.ircfg.blocks[irb.loc_key] = IRBlock(irb.loc_key, assignblks)
def get_state(
self)
Return the current state of the SymbolicEngine
def get_state(self):
    """Return the current state of the SymbolicEngine

    The state is a self.StateEngine built from a dict copy of self.symbols.
    """
    snapshot = dict(self.symbols)
    return self.StateEngine(snapshot)
def is_expr_cst(
_, ir_arch, expr)
def is_expr_cst(_, ir_arch, expr):
    """Test if @expr is considered as a constant.

    Thin hook delegating to the module level is_expr_cst function; the
    first argument (the instance) is ignored.
    @ir_arch: IR instance
    @expr: Expression to test
    """
    # Inside this body the name resolves to the module level function
    return is_expr_cst(ir_arch, expr)
def mem_read(
self, expr)
[DEV]: Override to modify the effective memory reads
Read symbolic value at ExprMem @expr @expr: ExprMem
def mem_read(self, expr):
    """
    [DEV]: Override to modify the effective memory reads

    Read symbolic value at ExprMem @expr
    @expr: ExprMem
    The access is split by _resolve_mem_parts; unknown memory parts are read
    through self.func_read when it is set. The parts are then recomposed and
    simplified; the result is asserted to have the size of @expr.
    """
    chunks = []
    for known, part in self._resolve_mem_parts(expr):
        use_callback = not known and part.is_mem() and self.func_read is not None
        chunks.append(self.func_read(part) if use_callback else part)
    value = self.expr_simp(ExprCompose(*chunks))
    assert value.size == expr.size
    return value
def mem_write(
self, dst, src)
[DEV]: Override to modify the effective memory writes
Write symbolic value @src at ExprMem @dst @dst: destination ExprMem @src: source Expression
def mem_write(self, dst, src):
    """
    [DEV]: Override to modify the effective memory writes

    Write symbolic value @src at ExprMem @dst
    @dst: destination ExprMem
    @src: source Expression
    When self.func_write is set, it is called with (self, dst, src);
    otherwise the write goes to self.symbols.
    """
    if self.func_write is None:
        self.symbols.write(dst, src)
        return
    self.func_write(self, dst, src)
def modified(
self, init_state=None, ids=True, mems=True)
Return the modified variables. @init_state: a base dictionary linking variables to their initial values to diff. Can be None. @ids: if True, include modified ids @mems: if True, include modified memory
def modified(self, init_state=None, ids=True, mems=True):
    """
    Generate the modified variables as (variable, value) pairs.

    @init_state: a base dictionary linking variables to their initial
    values to diff. Can be None (nothing is filtered out then).
    @ids: if True, yield the ids of self.symbols.symbols_id
    @mems: if True, yield the entries of self.symbols.memory()
    Entries whose value equals the one recorded in @init_state are skipped.
    """
    baseline = {} if init_state is None else init_state
    if ids:
        for variable, value in self.symbols.symbols_id.iteritems():
            if not (variable in baseline and baseline[variable] == value):
                yield variable, value
    if mems:
        for mem, value in self.symbols.memory():
            if not (mem in baseline and baseline[mem] == value):
                yield mem, value
def modified_mems(
self, init_state=None)
Deprecated version of modified(ids=False)
def modified_mems(self, init_state=None):
    """Deprecated version of modified(ids=False)

    Generator: the deprecation warning is emitted when iteration starts.
    @init_state: forwarded to modified
    """
    message = 'DEPRECATION WARNING: use "modified(self, ids=False)" instead of modified_mems'
    warnings.warn(message)
    entries = self.modified(init_state=init_state, ids=False)
    for entry in entries:
        yield entry
def modified_regs(
self, init_state=None)
Deprecated version of modified(mems=False)
def modified_regs(self, init_state=None):
    """Deprecated version of modified(mems=False)

    Generator: the deprecation warning is emitted when iteration starts.
    @init_state: forwarded to modified
    """
    message = 'DEPRECATION WARNING: use "modified(self, mems=False)" instead of modified_regs'
    warnings.warn(message)
    entries = self.modified(init_state=init_state, mems=False)
    for entry in entries:
        yield entry
def propag_expr_cst(
self, expr)
Propagate constant expressions in @expr @expr: Expression to update
def propag_expr_cst(self, expr):
    """Propagate constant expressions in @expr

    Every ExprId read by @expr whose current value satisfies is_expr_cst is
    replaced by that value; the result is simplified.
    @expr: Expression to update"""
    replacements = {}
    for element in expr.get_r(mem_read=True):
        # Only ExprId can be safely propagated
        if element.is_id():
            value = self.eval_expr(element)
            if self.is_expr_cst(self.ir_arch, value):
                replacements[element] = value
    return expr_simp(expr.replace_expr(replacements))
def run_at(
self, ircfg, addr, lbl_stop=None, step=False)
Symbolic execution starting at @addr @ircfg: graph from which the blocks are fetched (through get_block) @addr: address to execute (int or ExprInt or label) @lbl_stop: LocKey to stop execution on @step: display intermediate steps
def run_at(self, ircfg, addr, lbl_stop=None, step=False):
    """
    Symbolic execution starting at @addr

    @ircfg: graph from which the blocks are fetched (through get_block)
    @addr: address to execute (int or ExprInt or label)
    @lbl_stop: LocKey to stop execution on
    @step: display intermediate steps
    Blocks are executed one after the other, each execution giving the next
    address. Returns the first address for which no block is found, or the
    address of the block whose loc_key is @lbl_stop.
    """
    while True:
        irblock = ircfg.get_block(addr)
        if irblock is None or irblock.loc_key == lbl_stop:
            return addr
        addr = self.eval_updt_irblock(irblock, step=step)
def run_block_at(
self, ircfg, addr, step=False)
Symbolic execution of the block at @addr @ircfg: graph from which the block is fetched (through get_block) @addr: address to execute (int or ExprInt or label) @step: display intermediate steps
def run_block_at(self, ircfg, addr, step=False):
    """
    Symbolic execution of the block at @addr

    @ircfg: graph from which the block is fetched (through get_block)
    @addr: address to execute (int or ExprInt or label)
    @step: display intermediate steps
    Returns the result of the block execution, or @addr unchanged when no
    block is found at @addr.
    """
    irblock = ircfg.get_block(addr)
    if irblock is None:
        return addr
    return self.eval_updt_irblock(irblock, step=step)
def set_state(
self, state)
Restore the @state of the engine @state: StateEngine instance
def set_state(self, state):
    """Restore the @state of the engine

    self.symbols is replaced by a fresh SymbolMngr, which is then filled
    with the content of @state.
    @state: StateEngine instance
    """
    fresh = SymbolMngr(addrsize=self.ir_arch.addrsize, expr_simp=self.expr_simp)
    self.symbols = fresh
    for dst, src in dict(state).iteritems():
        fresh[dst] = src