Top

miasm2.ir.symbexec_types module

from miasm2.ir.symbexec import SymbolicExecutionEngine, StateEngine
from miasm2.expression.simplifications import expr_simp
from miasm2.expression.expression import ExprId, ExprMem


class SymbolicStateCTypes(StateEngine):
    """Store C types of symbols

    The state is kept as a frozenset of (expression, frozenset of types)
    pairs, which makes instances hashable and comparable.
    """

    def __init__(self, symbols):
        """
        @symbols: mapping (exposing iteritems) from expressions to an
        iterable of their C types
        """
        tmp = {}
        for expr, types in symbols.iteritems():
            tmp[expr] = frozenset(types)
        self._symbols = frozenset(tmp.iteritems())

    def __hash__(self):
        # The class takes part in the hash, mirroring the class check
        # performed in __eq__
        return hash((self.__class__, self._symbols))

    def __str__(self):
        """Return one 'expression = types' line per symbol, sorted"""
        out = []
        for dst, src in sorted(self._symbols):
            out.append("%s = %s" % (dst, src))
        return "\n".join(out)

    def __eq__(self, other):
        """Two states are equal when they share the same class and the same
        expression -> types mapping"""
        if self is other:
            return True
        if self.__class__ != other.__class__:
            return False
        return self.symbols == other.symbols

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__: without this method, two
        # equal states would still be reported as different by !=
        return not self.__eq__(other)

    def __iter__(self):
        """Yield (expression, frozenset of types) pairs"""
        for dst, src in self._symbols:
            yield dst, src

    def merge(self, other):
        """Merge two symbolic states
        The resulting types are the union of types of both states.
        Expressions whose merged type set is empty are dropped.
        @other: second symbolic state
        """
        symb_a = self.symbols
        symb_b = other.symbols
        symbols = {}
        for expr in set(symb_a).union(set(symb_b)):
            ctypes = symb_a.get(expr, set()).union(symb_b.get(expr, set()))
            if ctypes:
                symbols[expr] = ctypes
        return self.__class__(symbols)

    @property
    def symbols(self):
        """Return a new dictionary of the known symbols' types"""
        return dict(self._symbols)


class SymbExecCType(SymbolicExecutionEngine):
    """Engine of C types propagation
    WARNING: avoid memory aliases here!
    """

    StateEngine = SymbolicStateCTypes
    OBJC_INTERNAL = "___OBJC___"

    def __init__(self, ir_arch,
                 symbols,
                 chandler,
                 func_read=None,
                 func_write=None,
                 sb_expr_simp=expr_simp):
        self.chandler = chandler

        super(SymbExecCType, self).__init__(ir_arch,
                                            {},
                                            func_read,
                                            func_write,
                                            sb_expr_simp)
        self.symbols = dict(symbols)

    def get_state(self):
        """Return the current state of the SymbolicEngine"""
        return self.StateEngine(self.symbols)

    def eval_assignblk(self, assignblk):
        """
        Evaluate AssignBlock on the current state
        @assignblk: AssignBlock instance
        """
        pool_out = {}
        for dst, src in assignblk.iteritems():
            objcs = self.chandler.expr_to_types(src, self.symbols)
            if isinstance(dst, ExprMem):
                continue
            elif isinstance(dst, ExprId):
                pool_out[dst] = frozenset(objcs)
            else:
                raise ValueError("Unsupported affectation", str(dst))
        return pool_out

    def eval_expr(self, expr, eval_cache=None):
        return frozenset(self.chandler.expr_to_types(expr, self.symbols))

    def apply_change(self, dst, src):
        if src is None:
            if dst in self.symbols:
                del self.symbols[dst]
        else:
            self.symbols[dst] = src

    def del_mem_above_stack(self, stack_ptr):
        """No stack deletion"""
        return

    def dump_id(self):
        """
        Dump modififed registers symbols only
        """
        for expr, expr_types in sorted(self.symbols.iteritems()):
            if not expr.is_mem():
                print expr
                for expr_type in expr_types:
                    print '\t', expr_type

    def dump_mem(self):
        """
        Dump modififed memory symbols
        """
        for expr, value in sorted(self.symbols.iteritems()):
            if expr.is_mem():
                print expr, value

Classes

class SymbExecCType

Engine of C types propagation WARNING: avoid memory aliases here!

class SymbExecCType(SymbolicExecutionEngine):
    """Engine of C types propagation
    WARNING: avoid memory aliases here!
    """

    StateEngine = SymbolicStateCTypes
    OBJC_INTERNAL = "___OBJC___"

    def __init__(self, ir_arch,
                 symbols,
                 chandler,
                 func_read=None,
                 func_write=None,
                 sb_expr_simp=expr_simp):
        self.chandler = chandler

        super(SymbExecCType, self).__init__(ir_arch,
                                            {},
                                            func_read,
                                            func_write,
                                            sb_expr_simp)
        self.symbols = dict(symbols)

    def get_state(self):
        """Return the current state of the SymbolicEngine"""
        return self.StateEngine(self.symbols)

    def eval_assignblk(self, assignblk):
        """
        Evaluate AssignBlock on the current state
        @assignblk: AssignBlock instance
        """
        pool_out = {}
        for dst, src in assignblk.iteritems():
            objcs = self.chandler.expr_to_types(src, self.symbols)
            if isinstance(dst, ExprMem):
                continue
            elif isinstance(dst, ExprId):
                pool_out[dst] = frozenset(objcs)
            else:
                raise ValueError("Unsupported affectation", str(dst))
        return pool_out

    def eval_expr(self, expr, eval_cache=None):
        return frozenset(self.chandler.expr_to_types(expr, self.symbols))

    def apply_change(self, dst, src):
        if src is None:
            if dst in self.symbols:
                del self.symbols[dst]
        else:
            self.symbols[dst] = src

    def del_mem_above_stack(self, stack_ptr):
        """No stack deletion"""
        return

    def dump_id(self):
        """
        Dump modififed registers symbols only
        """
        for expr, expr_types in sorted(self.symbols.iteritems()):
            if not expr.is_mem():
                print expr
                for expr_type in expr_types:
                    print '\t', expr_type

    def dump_mem(self):
        """
        Dump modififed memory symbols
        """
        for expr, value in sorted(self.symbols.iteritems()):
            if expr.is_mem():
                print expr, value

Ancestors (in MRO)

  • SymbExecCType
  • miasm2.ir.symbexec.SymbolicExecutionEngine
  • __builtin__.object

Class variables

var OBJC_INTERNAL

var StateEngine

Instance variables

var chandler

var state

Return the current state of the SymbolicEngine

var symbols

Methods

def __init__(

self, ir_arch, symbols, chandler, func_read=None, func_write=None, sb_expr_simp=expr_simp)

def __init__(self, ir_arch,
             symbols,
             chandler,
             func_read=None,
             func_write=None,
             sb_expr_simp=expr_simp):
    """
    @ir_arch, @func_read, @func_write, @sb_expr_simp: forwarded to the
    parent engine constructor
    @symbols: initial expression -> types mapping (copied with dict())
    @chandler: stored as self.chandler
    """
    self.chandler = chandler
    # The parent engine is started with an empty state ...
    super(SymbExecCType, self).__init__(
        ir_arch, {}, func_read, func_write, sb_expr_simp
    )
    # ... then the symbols are replaced by a plain dict copy
    self.symbols = dict(symbols)

def apply_change(

self, dst, src)

def apply_change(self, dst, src):
    """Bind @dst to @src in the symbols, or forget @dst when @src is None
    @dst: key in self.symbols
    @src: new value, or None to remove @dst (missing keys are ignored)
    """
    if src is not None:
        self.symbols[dst] = src
        return
    if dst in self.symbols:
        del self.symbols[dst]

def apply_expr(

self, expr)

Deprecated version of eval_updt_expr

def apply_expr(self, expr):
    """Deprecated alias of eval_updt_expr: warn, then delegate
    @expr: expression forwarded unchanged to eval_updt_expr
    """
    message = 'DEPRECATION WARNING: use "eval_updt_expr" instead of apply_expr'
    warnings.warn(message)
    return self.eval_updt_expr(expr)

def apply_expr_on_state(

self, expr, cache)

Deprecated version of eval_expr

def apply_expr_on_state(self, expr, cache):
    """Deprecated alias of eval_expr: warn, then delegate
    @expr: expression to evaluate
    @cache: evaluation cache passed as eval_cache; None means a new dict
    """
    warnings.warn('DEPRECATION WARNING: use "eval_expr" instead of apply_expr_on_state')
    eval_cache = {} if cache is None else cache
    return self.eval_expr(expr, eval_cache=eval_cache)

def as_assignblock(

self)

Return the current state as an AssignBlock

def as_assignblock(self):
    """Deprecated: return the modified ids and memory as an AssignBlock"""
    warnings.warn('DEPRECATION WARNING: use "modified(ids=True, mems=True)" instead of as_assignblock')
    # Collect every (destination, source) pair reported as modified
    assignments = [
        (dst, src) for dst, src in self.modified(ids=True, mems=True)
    ]
    return AssignBlock(dict(assignments))

def del_mem_above_stack(

self, stack_ptr)

No stack deletion

def del_mem_above_stack(self, stack_ptr):
    """No stack deletion: @stack_ptr is ignored and nothing is removed"""
    return None

def dump(

self, ids=True, mems=True)

Display modified variables @ids: display modified ids @mems: display modified memory

def dump(self, ids=True, mems=True):
    """
    Display modififed variables
    @ids: display modified ids
    @mems: display modified memory
    """
    for variable, value in self.modified(None, ids, mems):
        print "%-18s" % variable, "=", "%s" % value

def dump_id(

self)

Dump modified register symbols only

def dump_id(self):
    """
    Dump modififed registers symbols only
    """
    for expr, expr_types in sorted(self.symbols.iteritems()):
        if not expr.is_mem():
            print expr
            for expr_type in expr_types:
                print '\t', expr_type

def dump_mem(

self)

Dump modified memory symbols

def dump_mem(self):
    """
    Dump modififed memory symbols
    """
    for expr, value in sorted(self.symbols.iteritems()):
        if expr.is_mem():
            print expr, value

def emul_ir_bloc(

self, _, addr, step=False)

Deprecated version of run_block_at

def emul_ir_bloc(self, _, addr, step=False):
    """Deprecated version of run_block_at
    @_: ignored
    @addr, @step: forwarded positionally to run_block_at
    """
    message = 'DEPRECATION WARNING: use "run_block_at(self, addr, step=False)" instead of emul_ir_bloc'
    warnings.warn(message)
    # NOTE(review): run_block_at is listed in this file with a leading
    # 'ircfg' parameter, which this call does not supply - confirm
    return self.run_block_at(addr, step)

def emul_ir_block(

self, addr, step=False)

Deprecated version of run_block_at

def emul_ir_block(self, addr, step=False):
    """Deprecated version of run_block_at
    @addr, @step: forwarded positionally to run_block_at
    """
    message = 'DEPRECATION WARNING: use "run_block_at(self, addr, step=False)" instead of emul_ir_block'
    warnings.warn(message)
    # NOTE(review): run_block_at is listed in this file with a leading
    # 'ircfg' parameter, which this call does not supply - confirm
    return self.run_block_at(addr, step)

def emul_ir_blocks(

self, addr, lbl_stop=None, step=False)

Deprecated version of run_at

def emul_ir_blocks(self, addr, lbl_stop=None, step=False):
    """Deprecated version of run_at
    @addr, @lbl_stop, @step: forwarded positionally to run_at
    """
    message = 'DEPRECATION WARNING: use "run_at(self, addr, lbl_stop=None, step=False):" instead of emul_ir_blocks'
    warnings.warn(message)
    # NOTE(review): run_at is listed in this file with a leading 'ircfg'
    # parameter, which this call does not supply - confirm
    return self.run_at(addr, lbl_stop, step)

def emul_ir_blocs(

self, _, addr, lbl_stop=None, step=False)

Deprecated version of run_at

def emul_ir_blocs(self, _, addr, lbl_stop=None, step=False):
    """Deprecated version of run_at
    @_: ignored
    @addr, @lbl_stop, @step: forwarded positionally to run_at
    """
    message = 'DEPRECATION WARNING: use "run_at(self, addr, lbl_stop=None, step=False):" instead of emul_ir_blocs'
    warnings.warn(message)
    # NOTE(review): run_at is listed in this file with a leading 'ircfg'
    # parameter, which this call does not supply - confirm
    return self.run_at(addr, lbl_stop, step)

def emulbloc(

self, irb, step=False)

Deprecated version of eval_updt_irblock(self, irb, step=False)

def emulbloc(self, irb, step=False):
    """Deprecated version of eval_updt_irblock(self, irb, step=False)
    @irb, @step: forwarded positionally to eval_updt_irblock
    """
    message = 'DEPRECATION WARNING: use "eval_updt_irblock(self, irb, step=False)" instead of emulbloc'
    warnings.warn(message)
    return self.eval_updt_irblock(irb, step)

def eval_assignblk(

self, assignblk)

Evaluate AssignBlock on the current state @assignblk: AssignBlock instance

def eval_assignblk(self, assignblk):
    """
    Compute the types assigned by an AssignBlock on the current state
    @assignblk: AssignBlock instance
    Return a dictionary linking each ExprId destination to the frozenset
    of types of its source. ExprMem destinations are skipped; any other
    destination raises ValueError.
    """
    types_by_dst = {}
    for dst, src in assignblk.iteritems():
        src_types = self.chandler.expr_to_types(src, self.symbols)
        if isinstance(dst, ExprMem):
            # Memory destinations are not tracked
            continue
        if not isinstance(dst, ExprId):
            raise ValueError("Unsupported affectation", str(dst))
        types_by_dst[dst] = frozenset(src_types)
    return types_by_dst

def eval_expr(

self, expr, eval_cache=None)

def eval_expr(self, expr, eval_cache=None):
    """Return the frozenset of types the C handler computes for @expr
    @expr: expression to type against the current symbols
    @eval_cache: unused
    """
    expr_types = self.chandler.expr_to_types(expr, self.symbols)
    return frozenset(expr_types)

def eval_expr_visitor(

self, expr, cache=None)

[DEV]: Override to change the behavior of an Expr evaluation. This function recursively applies 'eval_expr*' to @expr. This function uses @cache to speedup re-evaluation of expression.

def eval_expr_visitor(self, expr, cache=None):
    """
    [DEV]: Override to change the behavior of an Expr evaluation.
    Simplify @expr, dispatch it to the visitor registered for its class in
    self.expr_to_visitor, then simplify the result.
    @cache: dictionary of already evaluated expressions; both @expr and its
    simplified form are stored in it
    Raise TypeError when no visitor matches the simplified expression.
    """
    cache = {} if cache is None else cache

    cached = cache.get(expr)
    if cached is not None:
        return cached

    simplified = self.expr_simp(expr)
    cached = cache.get(simplified)
    if cached is not None:
        return cached

    visitor = self.expr_to_visitor.get(simplified.__class__)
    if visitor is None:
        raise TypeError("Unknown expr type")

    result = self.expr_simp(visitor(simplified, cache=cache))
    assert result is not None
    cache[expr] = result
    cache[simplified] = result
    return result

def eval_exprcompose(

self, expr, **kwargs)

[DEV]: Evaluate an ExprCompose using the current state

def eval_exprcompose(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprCompose using the current state
    Each argument of @expr is evaluated in order with eval_expr_visitor
    (receiving @kwargs), then a new ExprCompose is built from the results.
    """
    evaluated = [self.eval_expr_visitor(arg, **kwargs) for arg in expr.args]
    return ExprCompose(*evaluated)

def eval_exprcond(

self, expr, **kwargs)

[DEV]: Evaluate an ExprCond using the current state

def eval_exprcond(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprCond using the current state
    The condition and both sources are evaluated in that order with
    eval_expr_visitor (receiving @kwargs), then a new ExprCond is built.
    """
    evaluated_cond = self.eval_expr_visitor(expr.cond, **kwargs)
    evaluated_src1 = self.eval_expr_visitor(expr.src1, **kwargs)
    evaluated_src2 = self.eval_expr_visitor(expr.src2, **kwargs)
    return ExprCond(evaluated_cond, evaluated_src1, evaluated_src2)

def eval_exprid(

self, expr, **kwargs)

[DEV]: Evaluate an ExprId using the current state

def eval_exprid(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprId using the current state
    Return self.symbols.read(@expr); @kwargs are ignored.
    """
    return self.symbols.read(expr)

def eval_exprint(

self, expr, **kwargs)

[DEV]: Evaluate an ExprInt using the current state

def eval_exprint(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprInt using the current state
    @expr is returned untouched; @kwargs are ignored.
    """
    unchanged = expr
    return unchanged

def eval_exprloc(

self, expr, **kwargs)

[DEV]: Evaluate an ExprLoc using the current state

def eval_exprloc(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprLoc using the current state
    When the location database knows an offset for @expr.loc_key, return it
    as an ExprInt of size @expr.size; otherwise return @expr itself.
    """
    offset = self.ir_arch.loc_db.get_location_offset(expr.loc_key)
    if offset is None:
        return expr
    return ExprInt(offset, expr.size)

def eval_exprmem(

self, expr, **kwargs)

[DEV]: Evaluate an ExprMem using the current state This function first evaluate the memory pointer value. Override 'mem_read' to modify the effective memory accesses

def eval_exprmem(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprMem using the current state
    The pointer of @expr is evaluated first, then the resulting memory
    access is resolved through mem_read.
    Override 'mem_read' to modify the effective memory accesses
    """
    evaluated_ptr = self.eval_expr_visitor(expr.arg, **kwargs)
    access = ExprMem(evaluated_ptr, expr.size)
    return self.mem_read(access)

def eval_exprop(

self, expr, **kwargs)

[DEV]: Evaluate an ExprOp using the current state

def eval_exprop(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprOp using the current state
    Each argument of @expr is evaluated in order with eval_expr_visitor
    (receiving @kwargs), then a new ExprOp with the same operator is built.
    """
    evaluated = [self.eval_expr_visitor(oarg, **kwargs) for oarg in expr.args]
    return ExprOp(expr.op, *evaluated)

def eval_exprslice(

self, expr, **kwargs)

[DEV]: Evaluate an ExprSlice using the current state

def eval_exprslice(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprSlice using the current state
    The sliced argument is evaluated with eval_expr_visitor (receiving
    @kwargs); the start and stop bounds of @expr are kept.
    """
    evaluated_arg = self.eval_expr_visitor(expr.arg, **kwargs)
    return ExprSlice(evaluated_arg, expr.start, expr.stop)

def eval_ir(

self, assignblk)

Deprecated version of eval_updt_assignblk(self, assignblk)

def eval_ir(self, assignblk):
    """Deprecated version of eval_updt_assignblk(self, assignblk)
    @assignblk: AssignBlock instance forwarded to eval_updt_assignblk
    Return the value returned by eval_updt_assignblk.
    """
    # The warning names eval_updt_assignblk, the method actually called
    # below: eval_assignblk alone would not apply the changes
    warnings.warn('DEPRECATION WARNING: use "eval_updt_assignblk(self, assignblk)" instead of eval_ir')
    return self.eval_updt_assignblk(assignblk)

def eval_ir_expr(

self, assignblk)

Deprecated version of eval_assignblk(self, assignblk)

def eval_ir_expr(self, assignblk):
    """Deprecated version of eval_assignblk(self, assignblk)
    @assignblk: AssignBlock instance forwarded to eval_assignblk
    Return the iteritems() of the dictionary computed by eval_assignblk.
    """
    warnings.warn('DEPRECATION WARNING: use "eval_assignblk(self, assignblk)" instead of eval_ir_expr')
    pool_out = self.eval_assignblk(assignblk)
    return pool_out.iteritems()

def eval_updt_assignblk(

self, assignblk)

Apply an AssignBlock on the current state @assignblk: AssignBlock instance

def eval_updt_assignblk(self, assignblk):
    """
    Evaluate an AssignBlock, then apply every resulting change on the
    current state
    @assignblk: AssignBlock instance
    Return the list of memory destinations that were written.
    """
    written_mems = []
    changes = self.eval_assignblk(assignblk)
    for dst, src in changes.iteritems():
        self.apply_change(dst, src)
        if not dst.is_mem():
            continue
        written_mems.append(dst)
    return written_mems

def eval_updt_expr(

self, expr)

Evaluate @expr and apply side effect if needed (ie. if expr is an assignment). Return the evaluated value

def eval_updt_expr(self, expr):
    """
    Evaluate @expr and apply its side effect if @expr is an assignment.
    Return the evaluated value (the evaluated source for an assignment).
    """
    if not expr.is_aff():
        return self.eval_expr(expr)

    # The source is evaluated before the assignment is applied
    value = self.eval_expr(expr.src)
    self.eval_updt_assignblk(AssignBlock([expr]))
    return value

def eval_updt_irblock(

self, irb, step=False)

Symbolic execution of the @irb on the current state @irb: irbloc instance @step: display intermediate steps

def eval_updt_irblock(self, irb, step=False):
    """
    Symbolic execution of the @irb on the current state
    @irb: irbloc instance
    @step: display intermediate steps
    """
    for assignblk in irb:
        if step:
            print 'Instr', assignblk.instr
            print 'Assignblk:'
            print assignblk
            print '_' * 80
        self.eval_updt_assignblk(assignblk)
        if step:
            self.dump(mems=False)
            self.dump(ids=False)
            print '_' * 80
    dst = self.eval_expr(self.ir_arch.IRDst)
    return dst

def get_state(

self)

Return the current state of the SymbolicEngine

def get_state(self):
    """Return the current symbols wrapped in a new self.StateEngine"""
    state_cls = self.StateEngine
    return state_cls(self.symbols)

def mem_read(

self, expr)

[DEV]: Override to modify the effective memory reads

Read symbolic value at ExprMem @expr @expr: ExprMem

def mem_read(self, expr):
    """
    [DEV]: Override to modify the effective memory reads
    Read symbolic value at ExprMem @expr
    @expr: ExprMem
    Unknown memory parts are passed to self.func_read when it is set; the
    parts are then composed and simplified.
    """
    pieces = []
    for known, part in self._resolve_mem_parts(expr):
        use_reader = not known and part.is_mem() and self.func_read is not None
        pieces.append(self.func_read(part) if use_reader else part)
    value = self.expr_simp(ExprCompose(*pieces))
    # The recomposed value must cover exactly the requested access
    assert value.size == expr.size
    return value

def mem_write(

self, dst, src)

[DEV]: Override to modify the effective memory writes

Write symbolic value @src at ExprMem @dst @dst: destination ExprMem @src: source Expression

def mem_write(self, dst, src):
    """
    [DEV]: Override to modify the effective memory writes
    Write symbolic value @src at ExprMem @dst
    @dst: destination ExprMem
    @src: source Expression
    When self.func_write is set it is called as func_write(self, dst, src);
    otherwise the write goes to self.symbols.
    """
    if self.func_write is None:
        self.symbols.write(dst, src)
        return
    self.func_write(self, dst, src)

def modified(

self, init_state=None, ids=True, mems=True)

Return the modified variables. @init_state: a base dictionary linking variables to their initial values to diff. Can be None. @ids: track ids only @mems: track mems only

def modified(self, init_state=None, ids=True, mems=True):
    """
    Yield the (variable, value) pairs which differ from @init_state.
    @init_state: a base dictionary linking variables to their initial values
    to diff. Can be None (every variable is then reported).
    @ids: report ids
    @mems: report memory
    """
    baseline = {} if init_state is None else init_state
    if ids:
        for variable, value in self.symbols.symbols_id.iteritems():
            unchanged = variable in baseline and baseline[variable] == value
            if not unchanged:
                yield variable, value
    if mems:
        for mem, value in self.symbols.memory():
            unchanged = mem in baseline and baseline[mem] == value
            if not unchanged:
                yield mem, value

def modified_mems(

self, init_state=None)

Deprecated version of modified(ids=False)

def modified_mems(self, init_state=None):
    """Deprecated version of modified(ids=False)
    @init_state: forwarded to modified
    Generator: the warning is emitted when iteration starts.
    """
    warnings.warn('DEPRECATION WARNING: use "modified(self, ids=False)" instead of modified_mems')
    changed = self.modified(init_state=init_state, ids=False)
    for entry in changed:
        yield entry

def modified_regs(

self, init_state=None)

Deprecated version of modified(mems=False)

def modified_regs(self, init_state=None):
    """Deprecated version of modified(mems=False)
    @init_state: forwarded to modified
    Generator: the warning is emitted when iteration starts.
    """
    warnings.warn('DEPRECATION WARNING: use "modified(self, mems=False)" instead of modified_regs')
    changed = self.modified(init_state=init_state, mems=False)
    for entry in changed:
        yield entry

def run_at(

self, ircfg, addr, lbl_stop=None, step=False)

Symbolic execution starting at @addr @addr: address to execute (int or ExprInt or label) @lbl_stop: LocKey to stop execution on @step: display intermediate steps

def run_at(self, ircfg, addr, lbl_stop=None, step=False):
    """
    Symbolic execution starting at @addr
    @ircfg: object whose get_block(addr) returns the block to run, or None
    @addr: address to execute (int or ExprInt or label)
    @lbl_stop: LocKey to stop execution on
    @step: display intermediate steps
    Return the last destination reached: the one with no block, or the one
    whose block has @lbl_stop as loc_key.
    """
    irblock = ircfg.get_block(addr)
    while irblock is not None and not (irblock.loc_key == lbl_stop):
        addr = self.eval_updt_irblock(irblock, step=step)
        irblock = ircfg.get_block(addr)
    return addr

def run_block_at(

self, ircfg, addr, step=False)

Symbolic execution of the block at @addr @addr: address to execute (int or ExprInt or label) @step: display intermediate steps

def run_block_at(self, ircfg, addr, step=False):
    """
    Symbolic execution of the block at @addr
    @ircfg: object whose get_block(addr) returns the block to run, or None
    @addr: address to execute (int or ExprInt or label)
    @step: display intermediate steps
    Return the destination computed by the block, or @addr itself when
    there is no block at @addr.
    """
    irblock = ircfg.get_block(addr)
    if irblock is None:
        return addr
    return self.eval_updt_irblock(irblock, step=step)

def set_state(

self, state)

Restore the @state of the engine @state: StateEngine instance

def set_state(self, state):
    """Restore the @state of the engine
    @state: StateEngine instance
    The symbols are replaced by a new SymbolMngr, filled with the content
    of @state.
    """
    fresh_symbols = SymbolMngr(
        addrsize=self.ir_arch.addrsize, expr_simp=self.expr_simp
    )
    self.symbols = fresh_symbols
    saved = dict(state)
    for dst, src in saved.iteritems():
        self.symbols[dst] = src

class SymbolicStateCTypes

Store C types of symbols

class SymbolicStateCTypes(StateEngine):
    """Store C types of symbols

    The state is kept as a frozenset of (expression, frozenset of types)
    pairs, which makes instances hashable and comparable.
    """

    def __init__(self, symbols):
        """
        @symbols: mapping (exposing iteritems) from expressions to an
        iterable of their C types
        """
        tmp = {}
        for expr, types in symbols.iteritems():
            tmp[expr] = frozenset(types)
        self._symbols = frozenset(tmp.iteritems())

    def __hash__(self):
        # The class takes part in the hash, mirroring the class check
        # performed in __eq__
        return hash((self.__class__, self._symbols))

    def __str__(self):
        """Return one 'expression = types' line per symbol, sorted"""
        out = []
        for dst, src in sorted(self._symbols):
            out.append("%s = %s" % (dst, src))
        return "\n".join(out)

    def __eq__(self, other):
        """Two states are equal when they share the same class and the same
        expression -> types mapping"""
        if self is other:
            return True
        if self.__class__ != other.__class__:
            return False
        return self.symbols == other.symbols

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__: without this method, two
        # equal states would still be reported as different by !=
        return not self.__eq__(other)

    def __iter__(self):
        """Yield (expression, frozenset of types) pairs"""
        for dst, src in self._symbols:
            yield dst, src

    def merge(self, other):
        """Merge two symbolic states
        The resulting types are the union of types of both states.
        Expressions whose merged type set is empty are dropped.
        @other: second symbolic state
        """
        symb_a = self.symbols
        symb_b = other.symbols
        symbols = {}
        for expr in set(symb_a).union(set(symb_b)):
            ctypes = symb_a.get(expr, set()).union(symb_b.get(expr, set()))
            if ctypes:
                symbols[expr] = ctypes
        return self.__class__(symbols)

    @property
    def symbols(self):
        """Return a new dictionary of the known symbols' types"""
        return dict(self._symbols)

Ancestors (in MRO)

Instance variables

var symbols

Return the dictionary of known symbols' types

Methods

def __init__(

self, symbols)

def __init__(self, symbols):
    """Freeze @symbols into a hashable set of (expression, types) pairs
    @symbols: mapping (exposing iteritems) from expressions to an iterable
    of their C types
    """
    frozen_types = dict(
        (expr, frozenset(types)) for expr, types in symbols.iteritems()
    )
    self._symbols = frozenset(frozen_types.iteritems())

def merge(

self, other)

Merge two symbolic states The resulting types are the union of types of both states. @other: second symbolic state

def merge(self, other):
    """Merge two symbolic states
    For each expression known by either state, the resulting types are the
    union of the types of both states; expressions left with no type are
    dropped.
    @other: second symbolic state
    Return a new instance of the class of self.
    """
    mine = self.symbols
    theirs = other.symbols
    merged = {}
    for expr in set(mine) | set(theirs):
        own_types = mine.get(expr, set())
        ctypes = own_types.union(theirs.get(expr, set()))
        if not ctypes:
            continue
        merged[expr] = ctypes
    return self.__class__(merged)