Top

miasm2.analysis.cst_propag module

import logging

from miasm2.ir.symbexec import SymbolicExecutionEngine
from miasm2.expression.expression import ExprMem
from miasm2.expression.expression_helper import possible_values
from miasm2.expression.simplifications import expr_simp
from miasm2.ir.ir import IRBlock, AssignBlock

LOG_CST_PROPAG = logging.getLogger("cst_propag")
CONSOLE_HANDLER = logging.StreamHandler()
CONSOLE_HANDLER.setFormatter(logging.Formatter("%(levelname)-5s: %(message)s"))
LOG_CST_PROPAG.addHandler(CONSOLE_HANDLER)
LOG_CST_PROPAG.setLevel(logging.WARNING)


class SymbExecState(SymbolicExecutionEngine):
    """
    State manager for SymbolicExecution
    """
    def __init__(self, ir_arch, ircfg, state):
        super(SymbExecState, self).__init__(ir_arch, {})
        self.set_state(state)


def add_state(ircfg, todo, states, addr, state):
    """
    Add or merge the computed @state for the block at @addr. Update @todo
    @todo: modified block set
    @states: dictionnary linking a label to its entering state.
    @addr: address of the concidered block
    @state: computed state
    """
    addr = ircfg.get_loc_key(addr)
    todo.add(addr)
    if addr not in states:
        states[addr] = state
    else:
        states[addr] = states[addr].merge(state)


def is_expr_cst(ir_arch, expr):
    """Return true if @expr is only composed of ExprInt and init_regs
    @ir_arch: IR instance
    @expr: Expression to test"""

    elements = expr.get_r(mem_read=True)
    for element in elements:
        if element.is_mem():
            continue
        if element.is_id() and element in ir_arch.arch.regs.all_regs_ids_init:
            continue
        if element.is_int():
            continue
        return False
    else:
        # Expr is a constant
        return True


class SymbExecStateFix(SymbolicExecutionEngine):
    """
    Emul blocks and replace expressions with their corresponding constant if
    any.

    """
    # Function used to test if an Expression is considered as a constant
    is_expr_cst = lambda _, ir_arch, expr: is_expr_cst(ir_arch, expr)

    def __init__(self, ir_arch, ircfg, state, cst_propag_link):
        self.ircfg = ircfg
        super(SymbExecStateFix, self).__init__(ir_arch, {})
        self.set_state(state)
        self.cst_propag_link = cst_propag_link

    def propag_expr_cst(self, expr):
        """Propagate constant expressions in @expr
        @expr: Expression to update"""
        elements = expr.get_r(mem_read=True)
        to_propag = {}
        for element in elements:
            # Only ExprId can be safely propagated
            if not element.is_id():
                continue
            value = self.eval_expr(element)
            if self.is_expr_cst(self.ir_arch, value):
                to_propag[element] = value
        return expr_simp(expr.replace_expr(to_propag))

    def eval_updt_irblock(self, irb, step=False):
        """
        Symbolic execution of the @irb on the current state
        @irb: IRBlock instance
        @step: display intermediate steps
        """
        assignblks = []
        for index, assignblk in enumerate(irb):
            new_assignblk = {}
            links = {}
            for dst, src in assignblk.iteritems():
                src = self.propag_expr_cst(src)
                if dst.is_mem():
                    ptr = dst.arg
                    ptr = self.propag_expr_cst(ptr)
                    dst = ExprMem(ptr, dst.size)
                new_assignblk[dst] = src

            if assignblk.instr is not None:
                for arg in assignblk.instr.args:
                    new_arg = self.propag_expr_cst(arg)
                    links[new_arg] = arg
                self.cst_propag_link[(irb.loc_key, index)] = links

            self.eval_updt_assignblk(assignblk)
            assignblks.append(AssignBlock(new_assignblk, assignblk.instr))
        self.ircfg.blocks[irb.loc_key] = IRBlock(irb.loc_key, assignblks)


def compute_cst_propagation_states(ir_arch, ircfg, init_addr, init_infos):
    """
    Propagate "constant expressions" in a function.
    The attribute "constant expression" is true if the expression is based on
    constants or "init" regs values.

    @ir_arch: IntermediateRepresentation instance
    @init_addr: analysis start address
    @init_infos: dictionnary linking expressions to their values at @init_addr
    """

    done = set()
    state = SymbExecState.StateEngine(init_infos)
    lbl = ircfg.get_loc_key(init_addr)
    todo = set([lbl])
    states = {lbl: state}

    while todo:
        if not todo:
            break
        lbl = todo.pop()
        state = states[lbl]
        if (lbl, state) in done:
            continue
        done.add((lbl, state))
        if lbl not in ircfg.blocks:
            continue

        symbexec_engine = SymbExecState(ir_arch, ircfg, state)
        addr = symbexec_engine.run_block_at(ircfg, lbl)
        symbexec_engine.del_mem_above_stack(ir_arch.sp)

        for dst in possible_values(addr):
            value = dst.value
            if value.is_mem():
                LOG_CST_PROPAG.warning('Bad destination: %s', value)
                continue
            elif value.is_int():
                value = ircfg.get_loc_key(value)
            add_state(
                ircfg, todo, states, value,
                symbexec_engine.get_state()
            )

    return states


def propagate_cst_expr(ir_arch, ircfg, addr, init_infos):
    """
    Propagate "constant expressions" in a @ir_arch.
    The attribute "constant expression" is true if the expression is based on
    constants or "init" regs values.

    @ir_arch: IntermediateRepresentation instance
    @addr: analysis start address
    @init_infos: dictionnary linking expressions to their values at @init_addr

    Returns a mapping between replaced Expression and their new values.
    """
    states = compute_cst_propagation_states(ir_arch, ircfg, addr, init_infos)
    cst_propag_link = {}
    for lbl, state in states.iteritems():
        if lbl not in ircfg.blocks:
            continue
        symbexec = SymbExecStateFix(ir_arch, ircfg, state, cst_propag_link)
        symbexec.eval_updt_irblock(ircfg.blocks[lbl])
    return cst_propag_link

Module variables

var CONSOLE_HANDLER

var LOG_CST_PROPAG

Functions

def add_state(

ircfg, todo, states, addr, state)

Add or merge the computed @state for the block at @addr. Update @todo @todo: modified block set @states: dictionnary linking a label to its entering state. @addr: address of the concidered block @state: computed state

def add_state(ircfg, todo, states, addr, state):
    """
    Add or merge the computed @state for the block at @addr. Update @todo
    @todo: modified block set
    @states: dictionnary linking a label to its entering state.
    @addr: address of the concidered block
    @state: computed state
    """
    addr = ircfg.get_loc_key(addr)
    todo.add(addr)
    if addr not in states:
        states[addr] = state
    else:
        states[addr] = states[addr].merge(state)

def compute_cst_propagation_states(

ir_arch, ircfg, init_addr, init_infos)

Propagate "constant expressions" in a function. The attribute "constant expression" is true if the expression is based on constants or "init" regs values.

@ir_arch: IntermediateRepresentation instance @init_addr: analysis start address @init_infos: dictionnary linking expressions to their values at @init_addr

def compute_cst_propagation_states(ir_arch, ircfg, init_addr, init_infos):
    """
    Propagate "constant expressions" in a function.
    The attribute "constant expression" is true if the expression is based on
    constants or "init" regs values.

    @ir_arch: IntermediateRepresentation instance
    @init_addr: analysis start address
    @init_infos: dictionnary linking expressions to their values at @init_addr
    """

    done = set()
    state = SymbExecState.StateEngine(init_infos)
    lbl = ircfg.get_loc_key(init_addr)
    todo = set([lbl])
    states = {lbl: state}

    while todo:
        if not todo:
            break
        lbl = todo.pop()
        state = states[lbl]
        if (lbl, state) in done:
            continue
        done.add((lbl, state))
        if lbl not in ircfg.blocks:
            continue

        symbexec_engine = SymbExecState(ir_arch, ircfg, state)
        addr = symbexec_engine.run_block_at(ircfg, lbl)
        symbexec_engine.del_mem_above_stack(ir_arch.sp)

        for dst in possible_values(addr):
            value = dst.value
            if value.is_mem():
                LOG_CST_PROPAG.warning('Bad destination: %s', value)
                continue
            elif value.is_int():
                value = ircfg.get_loc_key(value)
            add_state(
                ircfg, todo, states, value,
                symbexec_engine.get_state()
            )

    return states

def is_expr_cst(

ir_arch, expr)

Return true if @expr is only composed of ExprInt and init_regs @ir_arch: IR instance @expr: Expression to test

def is_expr_cst(ir_arch, expr):
    """Return true if @expr is only composed of ExprInt and init_regs
    @ir_arch: IR instance
    @expr: Expression to test"""

    elements = expr.get_r(mem_read=True)
    for element in elements:
        if element.is_mem():
            continue
        if element.is_id() and element in ir_arch.arch.regs.all_regs_ids_init:
            continue
        if element.is_int():
            continue
        return False
    else:
        # Expr is a constant
        return True

def propagate_cst_expr(

ir_arch, ircfg, addr, init_infos)

Propagate "constant expressions" in a @ir_arch. The attribute "constant expression" is true if the expression is based on constants or "init" regs values.

@ir_arch: IntermediateRepresentation instance @addr: analysis start address @init_infos: dictionnary linking expressions to their values at @init_addr

Returns a mapping between replaced Expression and their new values.

def propagate_cst_expr(ir_arch, ircfg, addr, init_infos):
    """
    Propagate "constant expressions" in a @ir_arch.
    The attribute "constant expression" is true if the expression is based on
    constants or "init" regs values.

    @ir_arch: IntermediateRepresentation instance
    @addr: analysis start address
    @init_infos: dictionnary linking expressions to their values at @init_addr

    Returns a mapping between replaced Expression and their new values.
    """
    states = compute_cst_propagation_states(ir_arch, ircfg, addr, init_infos)
    cst_propag_link = {}
    for lbl, state in states.iteritems():
        if lbl not in ircfg.blocks:
            continue
        symbexec = SymbExecStateFix(ir_arch, ircfg, state, cst_propag_link)
        symbexec.eval_updt_irblock(ircfg.blocks[lbl])
    return cst_propag_link

Classes

class SymbExecState

State manager for SymbolicExecution

class SymbExecState(SymbolicExecutionEngine):
    """
    State manager for SymbolicExecution
    """
    def __init__(self, ir_arch, ircfg, state):
        super(SymbExecState, self).__init__(ir_arch, {})
        self.set_state(state)

Ancestors (in MRO)

  • SymbExecState
  • miasm2.ir.symbexec.SymbolicExecutionEngine
  • __builtin__.object

Class variables

var StateEngine

Instance variables

var state

Return the current state of the SymbolicEngine

Methods

def __init__(

self, ir_arch, ircfg, state)

def __init__(self, ir_arch, ircfg, state):
    super(SymbExecState, self).__init__(ir_arch, {})
    self.set_state(state)

def apply_change(

self, dst, src)

Apply @dst = @src on the current state WITHOUT evaluating both side @dst: Expr, destination @src: Expr, source

def apply_change(self, dst, src):
    """
    Apply @dst = @src on the current state WITHOUT evaluating both side
    @dst: Expr, destination
    @src: Expr, source
    """
    if dst.is_mem():
        self.mem_write(dst, src)
    else:
        self.symbols.write(dst, src)

def apply_expr(

self, expr)

Deprecated version of eval_updt_expr

def apply_expr(self, expr):
    """Deprecated version of eval_updt_expr"""
    warnings.warn('DEPRECATION WARNING: use "eval_updt_expr" instead of apply_expr')
    return self.eval_updt_expr(expr)

def apply_expr_on_state(

self, expr, cache)

Deprecated version of eval_expr

def apply_expr_on_state(self, expr, cache):
    """Deprecated version of eval_expr"""
    warnings.warn('DEPRECATION WARNING: use "eval_expr" instead of apply_expr_on_state')
    if cache is None:
        cache = {}
    ret = self.eval_expr(expr, eval_cache=cache)
    return ret

def as_assignblock(

self)

Return the current state as an AssignBlock

def as_assignblock(self):
    """Return the current state as an AssignBlock"""
    warnings.warn('DEPRECATION WARNING: use "modified(ids=True, mems=True)" instead of as_assignblock')
    out = []
    for dst, src in self.modified(ids=True, mems=True):
        out.append((dst, src))
    return AssignBlock(dict(out))

def del_mem_above_stack(

self, stack_ptr)

Remove all stored memory values with following properties: pointer based on initial stack value pointer below current stack pointer

def del_mem_above_stack(self, stack_ptr):
    """
    Remove all stored memory values with following properties:
    * pointer based on initial stack value
    * pointer below current stack pointer
    """
    stack_ptr = self.eval_expr(stack_ptr)
    base, stk_offset = get_expr_base_offset(stack_ptr)
    memarray = self.symbols.symbols_mem.base_to_memarray.get(base, None)
    if memarray:
        to_del = set()
        for offset in memarray:
            if ((offset - stk_offset) & int(stack_ptr.mask)) >> (stack_ptr.size - 1) != 0:
                to_del.add(offset)
        for offset in to_del:
            del memarray[offset]

def dump(

self, ids=True, mems=True)

Display modififed variables @ids: display modified ids @mems: display modified memory

def dump(self, ids=True, mems=True):
    """
    Display modififed variables
    @ids: display modified ids
    @mems: display modified memory
    """
    for variable, value in self.modified(None, ids, mems):
        print "%-18s" % variable, "=", "%s" % value

def dump_id(

self)

Deprecated version of dump(mems=False)

def dump_id(self):
    """Deprecated version of dump(mems=False)"""
    warnings.warn('DEPRECATION WARNING: use "dump(self, mems=False)" instead of dump_id')
    self.dump(mems=False)

def dump_mem(

self)

Deprecated version of dump(ids=False)

def dump_mem(self):
    """Deprecated version of dump(ids=False)"""
    warnings.warn('DEPRECATION WARNING: use "dump(self, ids=False)" instead of dump_mem')
    self.dump(ids=False)

def emul_ir_bloc(

self, _, addr, step=False)

Deprecated version of run_block_at

def emul_ir_bloc(self, _, addr, step=False):
    """Deprecated version of run_block_at"""
    warnings.warn('DEPRECATION WARNING: use "run_block_at(self, addr, step=False)" instead of emul_ir_bloc')
    return self.run_block_at(addr, step)

def emul_ir_block(

self, addr, step=False)

Deprecated version of run_block_at

def emul_ir_block(self, addr, step=False):
    """Deprecated version of run_block_at"""
    warnings.warn('DEPRECATION WARNING: use "run_block_at(self, addr, step=False)" instead of emul_ir_block')
    return self.run_block_at(addr, step)

def emul_ir_blocks(

self, addr, lbl_stop=None, step=False)

Deprecated version of run_at

def emul_ir_blocks(self, addr, lbl_stop=None, step=False):
    """Deprecated version of run_at"""
    warnings.warn('DEPRECATION WARNING: use "run_at(self, addr, lbl_stop=None, step=False):" instead of emul_ir_blocks')
    return self.run_at(addr, lbl_stop, step)

def emul_ir_blocs(

self, _, addr, lbl_stop=None, step=False)

Deprecated version of run_at

def emul_ir_blocs(self, _, addr, lbl_stop=None, step=False):
    """Deprecated version of run_at"""
    warnings.warn('DEPRECATION WARNING: use "run_at(self, addr, lbl_stop=None, step=False):" instead of emul_ir_blocs')
    return self.run_at(addr, lbl_stop, step)

def emulbloc(

self, irb, step=False)

Deprecated version of eval_updt_irblock(self, irb, step=False)

def emulbloc(self, irb, step=False):
    """Deprecated version of eval_updt_irblock(self, irb, step=False)"""
    warnings.warn('DEPRECATION WARNING: use "eval_updt_irblock(self, irb, step=False)" instead of emulbloc')
    return self.eval_updt_irblock(irb, step)

def eval_assignblk(

self, assignblk)

Evaluate AssignBlock using the current state

Returns a dictionary containing modified keys associated to their values

@assignblk: AssignBlock instance

def eval_assignblk(self, assignblk):
    """
    Evaluate AssignBlock using the current state
    Returns a dictionary containing modified keys associated to their values
    @assignblk: AssignBlock instance
    """
    pool_out = {}
    eval_cache = {}
    for dst, src in assignblk.iteritems():
        src = self.eval_expr(src, eval_cache)
        if dst.is_mem():
            ptr = self.eval_expr(dst.arg, eval_cache)
            # Test if mem lookup is known
            tmp = ExprMem(ptr, dst.size)
            pool_out[tmp] = src
        elif dst.is_id():
            pool_out[dst] = src
        else:
            raise ValueError("Unknown destination type", str(dst))
    return pool_out

def eval_expr(

self, expr, eval_cache=None)

Evaluate @expr @expr: Expresion instance to evaluate @cache: None or dictionary linking variables to their values

def eval_expr(self, expr, eval_cache=None):
    """
    Evaluate @expr
    @expr: Expresion instance to evaluate
    @cache: None or dictionary linking variables to their values
    """
    if eval_cache is None:
        eval_cache = {}
    ret = self.eval_expr_visitor(expr, cache=eval_cache)
    assert ret is not None
    return ret

def eval_expr_visitor(

self, expr, cache=None)

[DEV]: Override to change the behavior of an Expr evaluation. This function recursively applies 'eval_expr*' to @expr. This function uses @cache to speedup re-evaluation of expression.

def eval_expr_visitor(self, expr, cache=None):
    """
    [DEV]: Override to change the behavior of an Expr evaluation.
    This function recursively applies 'eval_expr*' to @expr.
    This function uses @cache to speedup re-evaluation of expression.
    """
    if cache is None:
        cache = {}
    ret = cache.get(expr, None)
    if ret is not None:
        return ret
    new_expr = self.expr_simp(expr)
    ret = cache.get(new_expr, None)
    if ret is not None:
        return ret
    func = self.expr_to_visitor.get(new_expr.__class__, None)
    if func is None:
        raise TypeError("Unknown expr type")
    ret = func(new_expr, cache=cache)
    ret = self.expr_simp(ret)
    assert ret is not None
    cache[expr] = ret
    cache[new_expr] = ret
    return ret

def eval_exprcompose(

self, expr, **kwargs)

[DEV]: Evaluate an ExprCompose using the current state

def eval_exprcompose(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprCompose using the current state"""
    args = []
    for arg in expr.args:
        args.append(self.eval_expr_visitor(arg, **kwargs))
    ret = ExprCompose(*args)
    return ret

def eval_exprcond(

self, expr, **kwargs)

[DEV]: Evaluate an ExprCond using the current state

def eval_exprcond(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprCond using the current state"""
    cond = self.eval_expr_visitor(expr.cond, **kwargs)
    src1 = self.eval_expr_visitor(expr.src1, **kwargs)
    src2 = self.eval_expr_visitor(expr.src2, **kwargs)
    ret = ExprCond(cond, src1, src2)
    return ret

def eval_exprid(

self, expr, **kwargs)

[DEV]: Evaluate an ExprId using the current state

def eval_exprid(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprId using the current state"""
    ret = self.symbols.read(expr)
    return ret

def eval_exprint(

self, expr, **kwargs)

[DEV]: Evaluate an ExprInt using the current state

def eval_exprint(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprInt using the current state"""
    return expr

def eval_exprloc(

self, expr, **kwargs)

[DEV]: Evaluate an ExprLoc using the current state

def eval_exprloc(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprLoc using the current state"""
    offset = self.ir_arch.loc_db.get_location_offset(expr.loc_key)
    if offset is not None:
        ret = ExprInt(offset, expr.size)
    else:
        ret = expr
    return ret

def eval_exprmem(

self, expr, **kwargs)

[DEV]: Evaluate an ExprMem using the current state This function first evaluate the memory pointer value. Override 'mem_read' to modify the effective memory accesses

def eval_exprmem(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprMem using the current state
    This function first evaluate the memory pointer value.
    Override 'mem_read' to modify the effective memory accesses
    """
    ptr = self.eval_expr_visitor(expr.arg, **kwargs)
    mem = ExprMem(ptr, expr.size)
    ret = self.mem_read(mem)
    return ret

def eval_exprop(

self, expr, **kwargs)

[DEV]: Evaluate an ExprOp using the current state

def eval_exprop(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprOp using the current state"""
    args = []
    for oarg in expr.args:
        arg = self.eval_expr_visitor(oarg, **kwargs)
        args.append(arg)
    ret = ExprOp(expr.op, *args)
    return ret

def eval_exprslice(

self, expr, **kwargs)

[DEV]: Evaluate an ExprSlice using the current state

def eval_exprslice(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprSlice using the current state"""
    arg = self.eval_expr_visitor(expr.arg, **kwargs)
    ret = ExprSlice(arg, expr.start, expr.stop)
    return ret

def eval_ir(

self, assignblk)

Deprecated version of eval_updt_assignblk(self, assignblk)

def eval_ir(self, assignblk):
    """Deprecated version of eval_updt_assignblk(self, assignblk)"""
    warnings.warn('DEPRECATION WARNING: use "eval_assignblk(self, assignblk)" instead of eval_ir')
    return self.eval_updt_assignblk(assignblk)

def eval_ir_expr(

self, assignblk)

Deprecated version of eval_ir_expr(self, assignblk)

def eval_ir_expr(self, assignblk):
    """Deprecated version of eval_ir_expr(self, assignblk)"""
    warnings.warn('DEPRECATION WARNING: use "eval_assignblk(self, assignblk)" instead of eval_ir_expr')
    return self.eval_assignblk(assignblk).iteritems()

def eval_updt_assignblk(

self, assignblk)

Apply an AssignBlock on the current state @assignblk: AssignBlock instance

def eval_updt_assignblk(self, assignblk):
    """
    Apply an AssignBlock on the current state
    @assignblk: AssignBlock instance
    """
    mem_dst = []
    dst_src = self.eval_assignblk(assignblk)
    for dst, src in dst_src.iteritems():
        self.apply_change(dst, src)
        if dst.is_mem():
            mem_dst.append(dst)
    return mem_dst

def eval_updt_expr(

self, expr)

Evaluate @expr and apply side effect if needed (ie. if expr is an assignment). Return the evaluated value

def eval_updt_expr(self, expr):
    """
    Evaluate @expr and apply side effect if needed (ie. if expr is an
    assignment). Return the evaluated value
    """
    # Update value if needed
    if expr.is_aff():
        ret = self.eval_expr(expr.src)
        self.eval_updt_assignblk(AssignBlock([expr]))
    else:
        ret = self.eval_expr(expr)
    return ret

def eval_updt_irblock(

self, irb, step=False)

Symbolic execution of the @irb on the current state @irb: irbloc instance @step: display intermediate steps

def eval_updt_irblock(self, irb, step=False):
    """
    Symbolic execution of the @irb on the current state
    @irb: irbloc instance
    @step: display intermediate steps
    """
    for assignblk in irb:
        if step:
            print 'Instr', assignblk.instr
            print 'Assignblk:'
            print assignblk
            print '_' * 80
        self.eval_updt_assignblk(assignblk)
        if step:
            self.dump(mems=False)
            self.dump(ids=False)
            print '_' * 80
    dst = self.eval_expr(self.ir_arch.IRDst)
    return dst

def get_state(

self)

Return the current state of the SymbolicEngine

def get_state(self):
    """Return the current state of the SymbolicEngine"""
    state = self.StateEngine(dict(self.symbols))
    return state

def mem_read(

self, expr)

[DEV]: Override to modify the effective memory reads

Read symbolic value at ExprMem @expr @expr: ExprMem

def mem_read(self, expr):
    """
    [DEV]: Override to modify the effective memory reads
    Read symbolic value at ExprMem @expr
    @expr: ExprMem
    """
    parts = self._resolve_mem_parts(expr)
    out = []
    for known, part in parts:
        if not known and part.is_mem() and self.func_read is not None:
            ret = self.func_read(part)
        else:
            ret = part
        out.append(ret)
    ret = self.expr_simp(ExprCompose(*out))
    assert ret.size == expr.size
    return ret

def mem_write(

self, dst, src)

[DEV]: Override to modify the effective memory writes

Write symbolic value @src at ExprMem @dst @dst: destination ExprMem @src: source Expression

def mem_write(self, dst, src):
    """
    [DEV]: Override to modify the effective memory writes
    Write symbolic value @src at ExprMem @dst
    @dst: destination ExprMem
    @src: source Expression
    """
    if self.func_write is not None:
        self.func_write(self, dst, src)
    else:
        self.symbols.write(dst, src)

def modified(

self, init_state=None, ids=True, mems=True)

Return the modified variables. @init_state: a base dictionary linking variables to their initial values to diff. Can be None. @ids: track ids only @mems: track mems only

def modified(self, init_state=None, ids=True, mems=True):
    """
    Return the modified variables.
    @init_state: a base dictionary linking variables to their initial values
    to diff. Can be None.
    @ids: track ids only
    @mems: track mems only
    """
    if init_state is None:
        init_state = {}
    if ids:
        for variable, value in self.symbols.symbols_id.iteritems():
            if variable in init_state and init_state[variable] == value:
                continue
            yield variable, value
    if mems:
        for mem, value in self.symbols.memory():
            if mem in init_state and init_state[mem] == value:
                continue
            yield mem, value

def modified_mems(

self, init_state=None)

Deprecated version of modified(ids=False)

def modified_mems(self, init_state=None):
    """Deprecated version of modified(ids=False)"""
    warnings.warn('DEPRECATION WARNING: use "modified(self, ids=False)" instead of modified_mems')
    for mem in self.modified(init_state=init_state, ids=False):
        yield mem

def modified_regs(

self, init_state=None)

Deprecated version of modified(mems=False)

def modified_regs(self, init_state=None):
    """Deprecated version of modified(mems=False)"""
    warnings.warn('DEPRECATION WARNING: use "modified(self, mems=False)" instead of modified_regs')
    for reg in self.modified(init_state=init_state, mems=False):
        yield reg

def run_at(

self, ircfg, addr, lbl_stop=None, step=False)

Symbolic execution starting at @addr @addr: address to execute (int or ExprInt or label) @lbl_stop: LocKey to stop execution on @step: display intermediate steps

def run_at(self, ircfg, addr, lbl_stop=None, step=False):
    """
    Symbolic execution starting at @addr
    @addr: address to execute (int or ExprInt or label)
    @lbl_stop: LocKey to stop execution on
    @step: display intermediate steps
    """
    while True:
        irblock = ircfg.get_block(addr)
        if irblock is None:
            break
        if irblock.loc_key == lbl_stop:
            break
        addr = self.eval_updt_irblock(irblock, step=step)
    return addr

def run_block_at(

self, ircfg, addr, step=False)

Symbolic execution of the block at @addr @addr: address to execute (int or ExprInt or label) @step: display intermediate steps

def run_block_at(self, ircfg, addr, step=False):
    """
    Symbolic execution of the block at @addr
    @addr: address to execute (int or ExprInt or label)
    @step: display intermediate steps
    """
    irblock = ircfg.get_block(addr)
    if irblock is not None:
        addr = self.eval_updt_irblock(irblock, step=step)
    return addr

def set_state(

self, state)

Restaure the @state of the engine @state: StateEngine instance

def set_state(self, state):
    """Restaure the @state of the engine
    @state: StateEngine instance
    """
    self.symbols = SymbolMngr(addrsize=self.ir_arch.addrsize, expr_simp=self.expr_simp)
    for dst, src in dict(state).iteritems():
        self.symbols[dst] = src

class SymbExecStateFix

Emul blocks and replace expressions with their corresponding constant if any.

class SymbExecStateFix(SymbolicExecutionEngine):
    """
    Emul blocks and replace expressions with their corresponding constant if
    any.

    """
    # Function used to test if an Expression is considered as a constant
    is_expr_cst = lambda _, ir_arch, expr: is_expr_cst(ir_arch, expr)

    def __init__(self, ir_arch, ircfg, state, cst_propag_link):
        self.ircfg = ircfg
        super(SymbExecStateFix, self).__init__(ir_arch, {})
        self.set_state(state)
        self.cst_propag_link = cst_propag_link

    def propag_expr_cst(self, expr):
        """Propagate constant expressions in @expr
        @expr: Expression to update"""
        elements = expr.get_r(mem_read=True)
        to_propag = {}
        for element in elements:
            # Only ExprId can be safely propagated
            if not element.is_id():
                continue
            value = self.eval_expr(element)
            if self.is_expr_cst(self.ir_arch, value):
                to_propag[element] = value
        return expr_simp(expr.replace_expr(to_propag))

    def eval_updt_irblock(self, irb, step=False):
        """
        Symbolic execution of the @irb on the current state
        @irb: IRBlock instance
        @step: display intermediate steps
        """
        assignblks = []
        for index, assignblk in enumerate(irb):
            new_assignblk = {}
            links = {}
            for dst, src in assignblk.iteritems():
                src = self.propag_expr_cst(src)
                if dst.is_mem():
                    ptr = dst.arg
                    ptr = self.propag_expr_cst(ptr)
                    dst = ExprMem(ptr, dst.size)
                new_assignblk[dst] = src

            if assignblk.instr is not None:
                for arg in assignblk.instr.args:
                    new_arg = self.propag_expr_cst(arg)
                    links[new_arg] = arg
                self.cst_propag_link[(irb.loc_key, index)] = links

            self.eval_updt_assignblk(assignblk)
            assignblks.append(AssignBlock(new_assignblk, assignblk.instr))
        self.ircfg.blocks[irb.loc_key] = IRBlock(irb.loc_key, assignblks)

Ancestors (in MRO)

  • SymbExecStateFix
  • miasm2.ir.symbexec.SymbolicExecutionEngine
  • __builtin__.object

Class variables

var StateEngine

Instance variables

var ircfg

var state

Return the current state of the SymbolicEngine

Methods

def __init__(

self, ir_arch, ircfg, state, cst_propag_link)

def __init__(self, ir_arch, ircfg, state, cst_propag_link):
    self.ircfg = ircfg
    super(SymbExecStateFix, self).__init__(ir_arch, {})
    self.set_state(state)
    self.cst_propag_link = cst_propag_link

def apply_change(

self, dst, src)

Apply @dst = @src on the current state WITHOUT evaluating both side @dst: Expr, destination @src: Expr, source

def apply_change(self, dst, src):
    """
    Apply @dst = @src on the current state WITHOUT evaluating both side
    @dst: Expr, destination
    @src: Expr, source
    """
    if dst.is_mem():
        self.mem_write(dst, src)
    else:
        self.symbols.write(dst, src)

def apply_expr(

self, expr)

Deprecated version of eval_updt_expr

def apply_expr(self, expr):
    """Deprecated version of eval_updt_expr"""
    warnings.warn('DEPRECATION WARNING: use "eval_updt_expr" instead of apply_expr')
    return self.eval_updt_expr(expr)

def apply_expr_on_state(

self, expr, cache)

Deprecated version of eval_expr

def apply_expr_on_state(self, expr, cache):
    """Deprecated version of eval_expr"""
    warnings.warn('DEPRECATION WARNING: use "eval_expr" instead of apply_expr_on_state')
    if cache is None:
        cache = {}
    ret = self.eval_expr(expr, eval_cache=cache)
    return ret

def as_assignblock(

self)

Return the current state as an AssignBlock

def as_assignblock(self):
    """Return the current state as an AssignBlock"""
    warnings.warn('DEPRECATION WARNING: use "modified(ids=True, mems=True)" instead of as_assignblock')
    out = []
    for dst, src in self.modified(ids=True, mems=True):
        out.append((dst, src))
    return AssignBlock(dict(out))

def del_mem_above_stack(

self, stack_ptr)

Remove all stored memory values with following properties: pointer based on initial stack value pointer below current stack pointer

def del_mem_above_stack(self, stack_ptr):
    """
    Remove all stored memory values with following properties:
    * pointer based on initial stack value
    * pointer below current stack pointer
    """
    stack_ptr = self.eval_expr(stack_ptr)
    base, stk_offset = get_expr_base_offset(stack_ptr)
    memarray = self.symbols.symbols_mem.base_to_memarray.get(base, None)
    if memarray:
        to_del = set()
        for offset in memarray:
            if ((offset - stk_offset) & int(stack_ptr.mask)) >> (stack_ptr.size - 1) != 0:
                to_del.add(offset)
        for offset in to_del:
            del memarray[offset]

def dump(

self, ids=True, mems=True)

Display modififed variables @ids: display modified ids @mems: display modified memory

def dump(self, ids=True, mems=True):
    """
    Display modififed variables
    @ids: display modified ids
    @mems: display modified memory
    """
    for variable, value in self.modified(None, ids, mems):
        print "%-18s" % variable, "=", "%s" % value

def dump_id(

self)

Deprecated version of dump(mems=False)

def dump_id(self):
    """Deprecated version of dump(mems=False)"""
    warnings.warn('DEPRECATION WARNING: use "dump(self, mems=False)" instead of dump_id')
    self.dump(mems=False)

def dump_mem(

self)

Deprecated version of dump(ids=False)

def dump_mem(self):
    """Deprecated version of dump(ids=False)"""
    warnings.warn('DEPRECATION WARNING: use "dump(self, ids=False)" instead of dump_mem')
    self.dump(ids=False)

def emul_ir_bloc(

self, _, addr, step=False)

Deprecated version of run_block_at

def emul_ir_bloc(self, _, addr, step=False):
    """Deprecated version of run_block_at"""
    warnings.warn('DEPRECATION WARNING: use "run_block_at(self, addr, step=False)" instead of emul_ir_bloc')
    return self.run_block_at(addr, step)

def emul_ir_block(

self, addr, step=False)

Deprecated version of run_block_at

def emul_ir_block(self, addr, step=False):
    """Deprecated version of run_block_at"""
    warnings.warn('DEPRECATION WARNING: use "run_block_at(self, addr, step=False)" instead of emul_ir_block')
    return self.run_block_at(addr, step)

def emul_ir_blocks(

self, addr, lbl_stop=None, step=False)

Deprecated version of run_at

def emul_ir_blocks(self, addr, lbl_stop=None, step=False):
    """Deprecated version of run_at"""
    warnings.warn('DEPRECATION WARNING: use "run_at(self, addr, lbl_stop=None, step=False):" instead of emul_ir_blocks')
    return self.run_at(addr, lbl_stop, step)

def emul_ir_blocs(

self, _, addr, lbl_stop=None, step=False)

Deprecated version of run_at

def emul_ir_blocs(self, _, addr, lbl_stop=None, step=False):
    """Deprecated version of run_at"""
    warnings.warn('DEPRECATION WARNING: use "run_at(self, addr, lbl_stop=None, step=False):" instead of emul_ir_blocs')
    return self.run_at(addr, lbl_stop, step)

def emulbloc(

self, irb, step=False)

Deprecated version of eval_updt_irblock(self, irb, step=False)

def emulbloc(self, irb, step=False):
    """Deprecated version of eval_updt_irblock(self, irb, step=False)"""
    warnings.warn('DEPRECATION WARNING: use "eval_updt_irblock(self, irb, step=False)" instead of emulbloc')
    return self.eval_updt_irblock(irb, step)

def eval_assignblk(

self, assignblk)

Evaluate AssignBlock using the current state

Returns a dictionary containing modified keys associated to their values

@assignblk: AssignBlock instance

def eval_assignblk(self, assignblk):
    """
    Evaluate AssignBlock using the current state
    Returns a dictionary containing modified keys associated to their values
    @assignblk: AssignBlock instance
    """
    pool_out = {}
    eval_cache = {}
    for dst, src in assignblk.iteritems():
        src = self.eval_expr(src, eval_cache)
        if dst.is_mem():
            ptr = self.eval_expr(dst.arg, eval_cache)
            # Test if mem lookup is known
            tmp = ExprMem(ptr, dst.size)
            pool_out[tmp] = src
        elif dst.is_id():
            pool_out[dst] = src
        else:
            raise ValueError("Unknown destination type", str(dst))
    return pool_out

def eval_expr(

self, expr, eval_cache=None)

Evaluate @expr @expr: Expresion instance to evaluate @cache: None or dictionary linking variables to their values

def eval_expr(self, expr, eval_cache=None):
    """
    Evaluate @expr
    @expr: Expresion instance to evaluate
    @cache: None or dictionary linking variables to their values
    """
    if eval_cache is None:
        eval_cache = {}
    ret = self.eval_expr_visitor(expr, cache=eval_cache)
    assert ret is not None
    return ret

def eval_expr_visitor(

self, expr, cache=None)

[DEV]: Override to change the behavior of an Expr evaluation. This function recursively applies 'eval_expr*' to @expr. This function uses @cache to speedup re-evaluation of expression.

def eval_expr_visitor(self, expr, cache=None):
    """
    [DEV]: Override to change the behavior of an Expr evaluation.
    This function recursively applies 'eval_expr*' to @expr.
    This function uses @cache to speedup re-evaluation of expression.
    """
    if cache is None:
        cache = {}
    ret = cache.get(expr, None)
    if ret is not None:
        return ret
    new_expr = self.expr_simp(expr)
    ret = cache.get(new_expr, None)
    if ret is not None:
        return ret
    func = self.expr_to_visitor.get(new_expr.__class__, None)
    if func is None:
        raise TypeError("Unknown expr type")
    ret = func(new_expr, cache=cache)
    ret = self.expr_simp(ret)
    assert ret is not None
    cache[expr] = ret
    cache[new_expr] = ret
    return ret

def eval_exprcompose(

self, expr, **kwargs)

[DEV]: Evaluate an ExprCompose using the current state

def eval_exprcompose(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprCompose using the current state"""
    args = []
    for arg in expr.args:
        args.append(self.eval_expr_visitor(arg, **kwargs))
    ret = ExprCompose(*args)
    return ret

def eval_exprcond(

self, expr, **kwargs)

[DEV]: Evaluate an ExprCond using the current state

def eval_exprcond(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprCond using the current state"""
    cond = self.eval_expr_visitor(expr.cond, **kwargs)
    src1 = self.eval_expr_visitor(expr.src1, **kwargs)
    src2 = self.eval_expr_visitor(expr.src2, **kwargs)
    ret = ExprCond(cond, src1, src2)
    return ret

def eval_exprid(

self, expr, **kwargs)

[DEV]: Evaluate an ExprId using the current state

def eval_exprid(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprId using the current state"""
    ret = self.symbols.read(expr)
    return ret

def eval_exprint(

self, expr, **kwargs)

[DEV]: Evaluate an ExprInt using the current state

def eval_exprint(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprInt using the current state"""
    return expr

def eval_exprloc(

self, expr, **kwargs)

[DEV]: Evaluate an ExprLoc using the current state

def eval_exprloc(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprLoc using the current state"""
    offset = self.ir_arch.loc_db.get_location_offset(expr.loc_key)
    if offset is not None:
        ret = ExprInt(offset, expr.size)
    else:
        ret = expr
    return ret

def eval_exprmem(

self, expr, **kwargs)

[DEV]: Evaluate an ExprMem using the current state This function first evaluate the memory pointer value. Override 'mem_read' to modify the effective memory accesses

def eval_exprmem(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprMem using the current state
    This function first evaluate the memory pointer value.
    Override 'mem_read' to modify the effective memory accesses
    """
    ptr = self.eval_expr_visitor(expr.arg, **kwargs)
    mem = ExprMem(ptr, expr.size)
    ret = self.mem_read(mem)
    return ret

def eval_exprop(

self, expr, **kwargs)

[DEV]: Evaluate an ExprOp using the current state

def eval_exprop(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprOp using the current state"""
    args = []
    for oarg in expr.args:
        arg = self.eval_expr_visitor(oarg, **kwargs)
        args.append(arg)
    ret = ExprOp(expr.op, *args)
    return ret

def eval_exprslice(

self, expr, **kwargs)

[DEV]: Evaluate an ExprSlice using the current state

def eval_exprslice(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprSlice using the current state"""
    arg = self.eval_expr_visitor(expr.arg, **kwargs)
    ret = ExprSlice(arg, expr.start, expr.stop)
    return ret

def eval_ir(

self, assignblk)

Deprecated version of eval_updt_assignblk(self, assignblk)

def eval_ir(self, assignblk):
    """Deprecated version of eval_updt_assignblk(self, assignblk)"""
    warnings.warn('DEPRECATION WARNING: use "eval_assignblk(self, assignblk)" instead of eval_ir')
    return self.eval_updt_assignblk(assignblk)

def eval_ir_expr(

self, assignblk)

Deprecated version of eval_ir_expr(self, assignblk)

def eval_ir_expr(self, assignblk):
    """Deprecated version of eval_ir_expr(self, assignblk)"""
    warnings.warn('DEPRECATION WARNING: use "eval_assignblk(self, assignblk)" instead of eval_ir_expr')
    return self.eval_assignblk(assignblk).iteritems()

def eval_updt_assignblk(

self, assignblk)

Apply an AssignBlock on the current state @assignblk: AssignBlock instance

def eval_updt_assignblk(self, assignblk):
    """
    Apply an AssignBlock on the current state
    @assignblk: AssignBlock instance
    """
    mem_dst = []
    dst_src = self.eval_assignblk(assignblk)
    for dst, src in dst_src.iteritems():
        self.apply_change(dst, src)
        if dst.is_mem():
            mem_dst.append(dst)
    return mem_dst

def eval_updt_expr(

self, expr)

Evaluate @expr and apply side effect if needed (ie. if expr is an assignment). Return the evaluated value

def eval_updt_expr(self, expr):
    """
    Evaluate @expr and apply side effect if needed (ie. if expr is an
    assignment). Return the evaluated value
    """
    # Update value if needed
    if expr.is_aff():
        ret = self.eval_expr(expr.src)
        self.eval_updt_assignblk(AssignBlock([expr]))
    else:
        ret = self.eval_expr(expr)
    return ret

def eval_updt_irblock(

self, irb, step=False)

Symbolic execution of the @irb on the current state @irb: IRBlock instance @step: display intermediate steps

def eval_updt_irblock(self, irb, step=False):
    """
    Symbolic execution of the @irb on the current state
    @irb: IRBlock instance
    @step: display intermediate steps
    """
    assignblks = []
    for index, assignblk in enumerate(irb):
        new_assignblk = {}
        links = {}
        for dst, src in assignblk.iteritems():
            src = self.propag_expr_cst(src)
            if dst.is_mem():
                ptr = dst.arg
                ptr = self.propag_expr_cst(ptr)
                dst = ExprMem(ptr, dst.size)
            new_assignblk[dst] = src
        if assignblk.instr is not None:
            for arg in assignblk.instr.args:
                new_arg = self.propag_expr_cst(arg)
                links[new_arg] = arg
            self.cst_propag_link[(irb.loc_key, index)] = links
        self.eval_updt_assignblk(assignblk)
        assignblks.append(AssignBlock(new_assignblk, assignblk.instr))
    self.ircfg.blocks[irb.loc_key] = IRBlock(irb.loc_key, assignblks)

def get_state(

self)

Return the current state of the SymbolicEngine

def get_state(self):
    """Return the current state of the SymbolicEngine"""
    state = self.StateEngine(dict(self.symbols))
    return state

def is_expr_cst(

_, ir_arch, expr)

is_expr_cst = lambda _, ir_arch, expr: is_expr_cst(ir_arch, expr)

def mem_read(

self, expr)

[DEV]: Override to modify the effective memory reads

Read symbolic value at ExprMem @expr @expr: ExprMem

def mem_read(self, expr):
    """
    [DEV]: Override to modify the effective memory reads
    Read symbolic value at ExprMem @expr
    @expr: ExprMem
    """
    parts = self._resolve_mem_parts(expr)
    out = []
    for known, part in parts:
        if not known and part.is_mem() and self.func_read is not None:
            ret = self.func_read(part)
        else:
            ret = part
        out.append(ret)
    ret = self.expr_simp(ExprCompose(*out))
    assert ret.size == expr.size
    return ret

def mem_write(

self, dst, src)

[DEV]: Override to modify the effective memory writes

Write symbolic value @src at ExprMem @dst @dst: destination ExprMem @src: source Expression

def mem_write(self, dst, src):
    """
    [DEV]: Override to modify the effective memory writes
    Write symbolic value @src at ExprMem @dst
    @dst: destination ExprMem
    @src: source Expression
    """
    if self.func_write is not None:
        self.func_write(self, dst, src)
    else:
        self.symbols.write(dst, src)

def modified(

self, init_state=None, ids=True, mems=True)

Return the modified variables. @init_state: a base dictionary linking variables to their initial values to diff. Can be None. @ids: track ids only @mems: track mems only

def modified(self, init_state=None, ids=True, mems=True):
    """
    Return the modified variables.
    @init_state: a base dictionary linking variables to their initial values
    to diff. Can be None.
    @ids: track ids only
    @mems: track mems only
    """
    if init_state is None:
        init_state = {}
    if ids:
        for variable, value in self.symbols.symbols_id.iteritems():
            if variable in init_state and init_state[variable] == value:
                continue
            yield variable, value
    if mems:
        for mem, value in self.symbols.memory():
            if mem in init_state and init_state[mem] == value:
                continue
            yield mem, value

def modified_mems(

self, init_state=None)

Deprecated version of modified(ids=False)

def modified_mems(self, init_state=None):
    """Deprecated version of modified(ids=False)"""
    warnings.warn('DEPRECATION WARNING: use "modified(self, ids=False)" instead of modified_mems')
    for mem in self.modified(init_state=init_state, ids=False):
        yield mem

def modified_regs(

self, init_state=None)

Deprecated version of modified(mems=False)

def modified_regs(self, init_state=None):
    """Deprecated version of modified(mems=False)"""
    warnings.warn('DEPRECATION WARNING: use "modified(self, mems=False)" instead of modified_regs')
    for reg in self.modified(init_state=init_state, mems=False):
        yield reg

def propag_expr_cst(

self, expr)

Propagate constant expressions in @expr @expr: Expression to update

def propag_expr_cst(self, expr):
    """Propagate constant expressions in @expr
    @expr: Expression to update"""
    elements = expr.get_r(mem_read=True)
    to_propag = {}
    for element in elements:
        # Only ExprId can be safely propagated
        if not element.is_id():
            continue
        value = self.eval_expr(element)
        if self.is_expr_cst(self.ir_arch, value):
            to_propag[element] = value
    return expr_simp(expr.replace_expr(to_propag))

def run_at(

self, ircfg, addr, lbl_stop=None, step=False)

Symbolic execution starting at @addr @addr: address to execute (int or ExprInt or label) @lbl_stop: LocKey to stop execution on @step: display intermediate steps

def run_at(self, ircfg, addr, lbl_stop=None, step=False):
    """
    Symbolic execution starting at @addr
    @addr: address to execute (int or ExprInt or label)
    @lbl_stop: LocKey to stop execution on
    @step: display intermediate steps
    """
    while True:
        irblock = ircfg.get_block(addr)
        if irblock is None:
            break
        if irblock.loc_key == lbl_stop:
            break
        addr = self.eval_updt_irblock(irblock, step=step)
    return addr

def run_block_at(

self, ircfg, addr, step=False)

Symbolic execution of the block at @addr @addr: address to execute (int or ExprInt or label) @step: display intermediate steps

def run_block_at(self, ircfg, addr, step=False):
    """
    Symbolic execution of the block at @addr
    @addr: address to execute (int or ExprInt or label)
    @step: display intermediate steps
    """
    irblock = ircfg.get_block(addr)
    if irblock is not None:
        addr = self.eval_updt_irblock(irblock, step=step)
    return addr

def set_state(

self, state)

Restaure the @state of the engine @state: StateEngine instance

def set_state(self, state):
    """Restaure the @state of the engine
    @state: StateEngine instance
    """
    self.symbols = SymbolMngr(addrsize=self.ir_arch.addrsize, expr_simp=self.expr_simp)
    for dst, src in dict(state).iteritems():
        self.symbols[dst] = src