Top

miasm2.ir.symbexec module

import warnings
import logging
from collections import MutableMapping

from miasm2.expression.expression import ExprOp, ExprId, ExprLoc, ExprInt, \
    ExprMem, ExprCompose, ExprSlice, ExprCond
from miasm2.expression.simplifications import expr_simp_explicit
from miasm2.ir.ir import AssignBlock

# Module logger, named "symbexec". It gets its own stream handler which
# renders records as "<LEVEL>: <message>", and its threshold is set to INFO.
log = logging.getLogger("symbexec")
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter("%(levelname)-5s: %(message)s"))
log.addHandler(console_handler)
log.setLevel(logging.INFO)


def get_block(ir_arch, ircfg, mdis, addr):
    """Return the IRBlock located at @addr, lifting it on demand

    If the location is not yet part of @ircfg, the block is disassembled
    with @mdis and added to @ircfg through @ir_arch before the lookup.

    @ir_arch: object used to add the disassembled block to @ircfg
    @ircfg: IR control flow graph to look the block up in
    @mdis: disassembler used when the block is missing
    @addr: address of the wanted block

    Raise a LookupError if @ircfg still has no block for this location.
    """
    loc_key = ircfg.get_or_create_loc_key(addr)
    already_lifted = loc_key in ircfg.blocks
    if not already_lifted:
        # Unknown location: disassemble it and push the result in the ircfg
        asm_block = mdis.dis_block(mdis.loc_db.get_location_offset(loc_key))
        ir_arch.add_asmblock_to_ircfg(asm_block, ircfg)

    irblock = ircfg.get_block(loc_key)
    if irblock is not None:
        return irblock
    raise LookupError(
        'No block found at that address: %s' % ir_arch.loc_db.pretty_str(loc_key)
    )


class StateEngine(object):
    """Abstract container for an engine state"""

    def merge(self, other):
        """Return a new state standing for the merge of self and @other

        This base implementation always raises NotImplementedError: concrete
        states have to provide their own merge.
        @other: a StateEngine instance
        """
        raise NotImplementedError("Abstract method")


class SymbolicState(StateEngine):
    """Hashable snapshot of a SymbolicExecutionEngine state

    The (destination, value) couples are kept in a frozenset.
    """

    def __init__(self, dct):
        # @dct: mapping destination -> value
        self._symbols = frozenset(dct.items())

    def __hash__(self):
        return hash((self.__class__, self._symbols))

    def __eq__(self, other):
        if self is other:
            return True
        return (self.__class__ == other.__class__ and
                self.symbols == other.symbols)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __iter__(self):
        # Yield the stored (destination, value) couples
        for couple in self._symbols:
            yield couple

    def iteritems(self):
        """Iterate on the stored (destination, value) couples"""
        return iter(self)

    def merge(self, other):
        """Return the state common to self and @other

        A destination is kept only if it is present in both states with an
        equal value.
        @other: second symbolic state
        """
        mine, theirs = self.symbols, other.symbols
        kept = {}
        for dst, src in mine.items():
            if dst in theirs and src == theirs[dst]:
                kept[dst] = src
        return self.__class__(kept)

    @property
    def symbols(self):
        """Return a fresh dictionary of the known symbols"""
        return dict(self._symbols)


# Name of the placeholder ExprId used as "symbolic base" for addresses which
# are purely concrete integers (see get_expr_base_offset)
INTERNAL_INTBASE_NAME = "__INTERNAL_INTBASE__"


def get_expr_base_offset(expr):
    """Split @expr into a (symbolic base, integer offset) couple

    - integer: the base is ExprId(INTERNAL_INTBASE_NAME) and the offset is
      the integer value
    - addition whose last argument is an integer: the base is made of the
      other arguments and the offset is that integer
    - anything else: the base is @expr itself and the offset is 0

    @expr: Expression instance
    """
    if expr.is_int():
        return ExprId(INTERNAL_INTBASE_NAME, expr.size), int(expr)

    if expr.is_op('+') and expr.args[-1].is_int():
        symbolic_args = expr.args[:-1]
        concrete = int(expr.args[-1])
        if len(symbolic_args) == 1:
            # A single remaining operand is the base on its own
            base = symbolic_args[0]
        else:
            base = ExprOp('+', *symbolic_args)
        return base, concrete

    # No concrete part
    return expr, 0


class MemArray(MutableMapping):
    """Link between base and its (offset, Expr)

    Given an expression (say *base*), this structure will store every memory
    content relatively to an integer offset from *base*.

    The value associated to a given offset is a description of the slice of a
    stored expression: a couple (byte index in the expression, expression).
    Contents are stored byte per byte. For example, the assignment:
    - @32[EAX+0x10] = EBX

    will store for the base EAX:
    - 0x10: (0, EBX)
    - 0x11: (1, EBX)
    - 0x12: (2, EBX)
    - 0x13: (3, EBX)

    If the *base* is EAX+EBX, this structure can store the following contents:
    - @32[EAX+EBX]
    - @8[EAX+EBX+0x100]
    But not:
    - @32[EAX+0x10] (which is stored in another MemArray based on EAX)
    - @32[EAX+EBX+ECX] (which is stored in another MemArray based on
      EAX+EBX+ECX)

    """

    def __init__(self, base, expr_simp=expr_simp_explicit):
        """
        @base: Expr used as symbolic base address
        @expr_simp: callable used to simplify expressions
        """
        self._base = base
        self.expr_simp = expr_simp
        # Offsets are taken modulo (mask + 1): accesses wrap around
        self._mask = int(base.mask)
        # offset in bytes -> (byte index in the value, value)
        self._offset_to_expr = {}

    @property
    def base(self):
        """Expression representing the symbolic base address"""
        return self._base

    @property
    def mask(self):
        """Mask offset"""
        return self._mask

    def __getitem__(self, offset):
        assert 0 <= offset <= self._mask
        return self._offset_to_expr.__getitem__(offset)

    def __setitem__(self, offset, value):
        # Updates have to go through write(), which splits values in bytes
        raise RuntimeError("Use write api to update keys")

    def __delitem__(self, offset):
        assert 0 <= offset <= self._mask
        return self._offset_to_expr.__delitem__(offset)

    def __iter__(self):
        for offset in self._offset_to_expr:
            yield offset

    def __len__(self):
        return len(self._offset_to_expr)

    def __repr__(self):
        out = []
        out.append("Base: %s" % self.base)
        for offset, (index, value) in sorted(self._offset_to_expr.iteritems()):
            out.append("%16X %d %s" % (offset, index, value))
        return '\n'.join(out)

    def copy(self):
        """Copy object instance (stored expressions are shared, not copied)"""
        obj = MemArray(self.base, self.expr_simp)
        obj._offset_to_expr = self._offset_to_expr.copy()
        return obj

    @staticmethod
    def offset_to_ptr(base, offset):
        """
        Return an expression representing the @base + @offset
        @base: symbolic base address
        @offset: relative offset integer to the @base address
        """
        if base.is_id(INTERNAL_INTBASE_NAME):
            # Placeholder base of concrete addresses: the pointer is the
            # integer itself
            ptr = ExprInt(offset, base.size)
        elif offset == 0:
            ptr = base
        else:
            ptr = base + ExprInt(offset, base.size)
        return ptr.canonize()

    def read(self, offset, size):
        """
        Return memory at @offset with @size as an Expr list
        @offset: integer (in bytes)
        @size: integer (in bits), byte aligned

        Consider the following state:
        - 0x10: (0, EBX)
        - 0x11: (1, EBX)
        - 0x12: (2, EBX)
        - 0x13: (3, EBX)

        A read at 0x10 of 32 bits should return: EBX
        """

        assert size % 8 == 0
        # Parts is (Expr's byte offset, size in bytes, Expr)
        parts = []
        for index in xrange(size // 8):
            # Wrap read:
            # @32[EAX+0xFFFFFFFF] is ok and will read at 0xFFFFFFFF, 0, 1, 2
            request_offset = (offset + index) & self._mask
            if request_offset in self._offset_to_expr:
                # Known memory portion
                off, data = self._offset_to_expr[request_offset]
                parts.append((off, 1, data))
                continue

            # Unknown memory portion
            ptr = self.offset_to_ptr(self.base, request_offset)
            data = ExprMem(ptr, 8)
            parts.append((0, 1, data))

        # Group similar datas
        # XXX TODO: only little endian here
        # After a merge, 'index' is not incremented: the merged part is
        # compared again with its new successor
        index = 0
        while index + 1 < len(parts):
            off_a, size_a, data_a = parts[index]
            off_b, size_b, data_b = parts[index+1]
            if data_a == data_b and off_a + size_a == off_b:
                # Read consecutive bytes of a variable
                # [(0, 1, x), (1, 1, x)] => (0, 2, x)
                parts[index:index+2] = [(off_a, size_a + size_b, data_a)]
                continue
            if data_a.is_int() and data_b.is_int():
                # Read integer parts
                # [(0, 1, 0x11223344), (1, 1, 0x55667788)] => (0, 2, 0x7744)
                int1 = self.expr_simp(data_a[off_a*8:(off_a+size_a)*8])
                int2 = self.expr_simp(data_b[off_b*8:(off_b+size_b)*8])
                assert int1.is_int() and int2.is_int()
                int1, int2 = int(int1), int(int2)
                result = ExprInt((int2 << (size_a * 8)) | int1, (size_a + size_b) * 8)
                parts[index:index+2] = [(0, size_a + size_b, result)]
                continue
            if data_a.is_mem() and data_b.is_mem():
                # Read consecutive bytes of a memory variable
                ptr_base_a, ptr_offset_a = get_expr_base_offset(data_a.arg)
                ptr_base_b, ptr_offset_b = get_expr_base_offset(data_b.arg)
                if ptr_base_a != ptr_base_b:
                    index += 1
                    continue
                if (ptr_offset_a + off_a + size_a) & self._mask == (ptr_offset_b + off_b) & self._mask:
                    assert size_a <= data_a.size // 8 - off_a
                    assert size_b <= data_b.size // 8 - off_b
                    # Successive comparable symbolic pointers
                    # [(0, 1, @8[ptr]), (0, 1, @8[ptr+1])] => (0, 2, @16[ptr])
                    ptr = self.offset_to_ptr(ptr_base_a, (ptr_offset_a + off_a) & self._mask)

                    data = ExprMem(ptr, (size_a + size_b) * 8)
                    parts[index:index+2] = [(0, size_a + size_b, data)]

                    continue

            index += 1

        # Slice datas: only keep the requested bytes of each expression
        read_mem = []
        for off, bytesize, data in parts:
            if data.size // 8 != bytesize:
                data = data[off * 8: (off + bytesize) * 8]
            read_mem.append(data)

        return read_mem

    def write(self, offset, expr):
        """
        Write @expr at @offset
        @offset: integer (in bytes)
        @expr: Expr instance value
        """
        assert expr.size % 8 == 0
        assert offset <= self._mask
        for index in xrange(expr.size // 8):
            # Wrap write:
            # @32[EAX+0xFFFFFFFF] is ok and will write at 0xFFFFFFFF, 0, 1, 2
            request_offset = (offset + index) & self._mask
            # XXX TODO: only little endian here
            self._offset_to_expr[request_offset] = (index, expr)

            tmp = self.expr_simp(expr[index * 8: (index + 1) * 8])
            # Special case: Simplify slice of pointer (simplification is ok
            # here, as we won't store the simplified expression)
            if tmp.is_slice() and tmp.arg.is_mem() and tmp.start % 8 == 0:
                new_ptr = self.expr_simp(tmp.arg.arg + ExprInt(tmp.start // 8, tmp.arg.arg.size))
                tmp = ExprMem(new_ptr, tmp.stop - tmp.start)
            # Test if write to original value: the byte written is the memory
            # byte at this very location, so the entry is dropped instead of
            # being stored
            if tmp.is_mem():
                src_ptr, src_off = get_expr_base_offset(tmp.arg)
                if src_ptr == self.base and src_off == request_offset:
                    del self._offset_to_expr[request_offset]


    def _get_variable_parts(self, index, known_offsets, forward=True):
        """
        Find consecutive memory parts representing the same variable. The part
        starts at offset known_offsets[@index] and search is in offset direction
        determined by @forward
        Return the number of consecutive parts of the same variable.

        @index: index of the memory offset in known_offsets
        @known_offsets: sorted offsets
        @forward: Search in offset growing direction if True, else in reverse
        order
        """

        offset = known_offsets[index]
        value_byte_index, value = self._offset_to_expr[offset]
        assert value.size % 8 == 0

        # Byte indexes of the value which remain to be found
        if forward:
            start, end, step = value_byte_index + 1, value.size // 8, 1
        else:
            start, end, step = value_byte_index - 1, -1, -1

        partnum = 1
        for value_offset in xrange(start, end, step):
            offset += step
            # Check if next part is in known_offsets
            next_index = index + step * partnum
            if not 0 <= next_index < len(known_offsets):
                break

            offset_next = known_offsets[next_index]
            if offset_next != offset:
                break

            # Check if next part is a part of the searched value
            byte_index, value_next = self._offset_to_expr[offset_next]
            if byte_index != value_offset:
                break
            if value != value_next:
                break
            partnum += 1

        return partnum


    def _build_value_at_offset(self, value, offset, start, length):
        """
        Return a couple. The first element is the memory Expression representing
        the value at @offset, the second is its value.  The value is truncated
        at byte @start with @length

        @value: Expression to truncate
        @offset: offset in bytes of the variable (integer)
        @start: value's byte offset (integer)
        @length: length in bytes (integer)
        """

        ptr = self.offset_to_ptr(self.base, offset)
        size = length * 8
        if start == 0 and size == value.size:
            # The whole value is stored here: no slice needed
            result = value
        else:
            result = self.expr_simp(value[start * 8: start * 8 + size])

        return ExprMem(ptr, size), result


    def memory(self):
        """
        Iterate on stored memory/values

        The goal here is to group entities.

        Consider the following state:
        EAX + 0x10 = (0, EDX)
        EAX + 0x11 = (1, EDX)
        EAX + 0x12 = (2, EDX)
        EAX + 0x13 = (3, EDX)

        The function should return:
        @32[EAX + 0x10] = EDX
        """

        if not self._offset_to_expr:
            # Nothing stored. A plain return ends the generator; raising
            # StopIteration here would become a RuntimeError under PEP 479
            return
        known_offsets = sorted(self._offset_to_expr)
        index = 0
        # Test if the first element is the continuation of the last byte. If
        # yes, merge and output it first.
        min_int = 0
        max_int = (1 << self.base.size) - 1
        limit_index = len(known_offsets)

        first_element = None
        # Special case where a variable spreads on max_int/min_int
        if known_offsets[0] == min_int and known_offsets[-1] == max_int:
            min_offset, max_offset = known_offsets[0], known_offsets[-1]
            min_byte_index, min_value = self._offset_to_expr[min_offset]
            max_byte_index, max_value = self._offset_to_expr[max_offset]
            if min_value == max_value and max_byte_index + 1 == min_byte_index:
                # Look for current variable start
                partnum_before = self._get_variable_parts(len(known_offsets) - 1, known_offsets, False)
                # Look for current variable end
                partnum_after = self._get_variable_parts(0, known_offsets)

                partnum = partnum_before + partnum_after
                offset = known_offsets[-partnum_before]
                index_value, value = self._offset_to_expr[offset]

                mem, result = self._build_value_at_offset(value, offset, index_value, partnum)
                first_element = mem, result
                # Skip the parts already consumed at both ends
                index = partnum_after
                limit_index = len(known_offsets) - partnum_before

        # Special cases are done, walk and merge variables
        while index < limit_index:
            offset = known_offsets[index]
            index_value, value = self._offset_to_expr[offset]
            partnum = self._get_variable_parts(index, known_offsets)
            mem, result = self._build_value_at_offset(value, offset, index_value, partnum)
            yield mem, result
            index += partnum

        # The wrapping variable, if any, is output last
        if first_element is not None:
            yield first_element

    def dump(self):
        """Display MemArray content"""
        for mem, value in self.memory():
            print("%s = %s" % (mem, value))


class MemSparse(object):
    """Link a symbolic memory pointer to its MemArray.

    For each symbolic memory object, this object will extract the memory pointer
    *ptr*. It then splits *ptr* into a symbolic and an integer part. For
    example, the memory @[ESP+4] will give ESP+4 for *ptr*. *ptr* is then split
    into its base ESP and its offset 4. Each symbolic base address uses a
    different MemArray.

    Example:
    - @32[EAX+EBX]
    - @8[EAX+EBX+0x100]
    Will be stored in the same MemArray with a EAX+EBX base

    """

    def __init__(self, addrsize, expr_simp=expr_simp_explicit):
        """
        @addrsize: size (in bits) of the addresses manipulated by the MemSparse
        @expr_simp: an ExpressionSimplifier instance
        """
        self.addrsize = addrsize
        self.expr_simp = expr_simp
        # symbolic base Expr -> MemArray
        self.base_to_memarray = {}

    def __contains__(self, expr):
        """
        Return True if the whole @expr is present
        For partial check, use 'contains_partial'
        """
        if not expr.is_mem():
            return False
        ptr = expr.arg
        base, offset = get_expr_base_offset(ptr)
        memarray = self.base_to_memarray.get(base, None)
        if memarray is None:
            return False
        for i in xrange(expr.size // 8):
            # Wrap the offset, as done by the read/write/delete accesses: an
            # unmasked offset above the mask is rejected by the MemArray
            if (offset + i) & memarray.mask not in memarray:
                return False
        return True

    def contains_partial(self, expr):
        """
        Return True if a part of @expr is present in memory
        """
        if not expr.is_mem():
            return False
        ptr = expr.arg
        base, offset = get_expr_base_offset(ptr)
        memarray = self.base_to_memarray.get(base, None)
        if memarray is None:
            return False
        for i in xrange(expr.size // 8):
            # Wrap the offset, as done by the read/write/delete accesses
            if (offset + i) & memarray.mask in memarray:
                return True
        return False

    def clear(self):
        """Reset the current object content"""
        self.base_to_memarray.clear()

    def copy(self):
        """Copy the current object instance (each MemArray is copied)"""
        base_to_memarray = {}
        for base, memarray in self.base_to_memarray.iteritems():
            base_to_memarray[base] = memarray.copy()
        obj = MemSparse(self.addrsize, self.expr_simp)
        obj.base_to_memarray = base_to_memarray
        return obj

    def __delitem__(self, expr):
        """
        Delete a value @expr *fully* present in memory
        For partial delete, use delete_partial

        Raise a KeyError if a part of @expr is missing; nothing is deleted in
        that case.
        """
        ptr = expr.arg
        base, offset = get_expr_base_offset(ptr)
        memarray = self.base_to_memarray.get(base, None)
        if memarray is None:
            raise KeyError
        # Check if whole entity is in the MemArray before deleting it
        for i in xrange(expr.size // 8):
            if (offset + i) & memarray.mask not in memarray:
                raise KeyError
        for i in xrange(expr.size // 8):
            del memarray[(offset + i) & memarray.mask]

    def delete_partial(self, expr):
        """
        Delete @expr from memory. Skip parts of @expr which are not present in
        memory.

        Raise a KeyError if nothing is known about the base of @expr.
        """
        ptr = expr.arg
        base, offset = get_expr_base_offset(ptr)
        memarray = self.base_to_memarray.get(base, None)
        if memarray is None:
            raise KeyError
        # Delete the bytes which are present, silently skip the others
        for i in xrange(expr.size // 8):
            real_offset = (offset + i) & memarray.mask
            if real_offset in memarray:
                del memarray[real_offset]

    def read(self, ptr, size):
        """
        Return the value associated with the Expr at address @ptr
        @ptr: Expr representing the memory address
        @size: memory size (in bits), byte aligned
        """
        assert size % 8 == 0
        base, offset = get_expr_base_offset(ptr)
        memarray = self.base_to_memarray.get(base, None)
        if memarray is not None:
            mems = memarray.read(offset, size)
            ret = ExprCompose(*mems)
        else:
            # Nothing known for this base: the memory is returned as is
            ret = ExprMem(ptr, size)
        return ret

    def write(self, ptr, expr):
        """
        Update the corresponding Expr @expr at address @ptr
        @ptr: Expr representing the memory address
        @expr: Expr instance
        """
        assert ptr.size == self.addrsize
        base, offset = get_expr_base_offset(ptr)
        memarray = self.base_to_memarray.get(base, None)
        if memarray is None:
            # First write relative to this base
            memarray = MemArray(base, self.expr_simp)
            self.base_to_memarray[base] = memarray
        memarray.write(offset, expr)

    def iteritems(self):
        """Iterate on stored memory variables and their values."""
        for _, memarray in sorted(self.base_to_memarray.iteritems()):
            for mem, value in memarray.memory():
                yield mem, value

    def items(self):
        """Return stored memory variables and their values."""
        return list(self.iteritems())

    def dump(self):
        """Display MemSparse content"""
        for mem, value in self.iteritems():
            print("%s = %s" % (mem, value))

    def __repr__(self):
        out = []
        for _, memarray in sorted(self.base_to_memarray.iteritems()):
            out.append(repr(memarray))
        return '\n'.join(out)


class SymbolMngr(object):
    """Symbolic store manager (IDs and MEMs)

    Identifiers are kept in a dictionary, memories in a MemSparse.
    """

    def __init__(self, init=None, addrsize=None, expr_simp=expr_simp_explicit):
        """
        @init: (optional) initial content, an object offering iteritems()
        @addrsize: size (in bits) of the addresses, mandatory
        @expr_simp: expression simplifier handed to the memory store
        """
        assert addrsize is not None
        self.addrsize = addrsize
        self.expr_simp = expr_simp
        self.symbols_id = {}
        self.symbols_mem = MemSparse(addrsize, expr_simp)
        self.mask = (1 << addrsize) - 1
        if init is not None:
            for expr, value in init.iteritems():
                self.write(expr, value)

    def __contains__(self, expr):
        if expr.is_id():
            return expr in self.symbols_id
        if expr.is_mem():
            return expr in self.symbols_mem
        return False

    def __getitem__(self, expr):
        return self.read(expr)

    def __setitem__(self, expr, value):
        self.write(expr, value)

    def __delitem__(self, expr):
        if expr.is_id():
            del self.symbols_id[expr]
            return
        if not expr.is_mem():
            raise TypeError("Bad source expr")
        del self.symbols_mem[expr]

    def copy(self):
        """Return a new SymbolMngr filled with the current content"""
        return SymbolMngr(self, addrsize=self.addrsize, expr_simp=self.expr_simp)

    def clear(self):
        """Forget every variables values"""
        self.symbols_id.clear()
        self.symbols_mem.clear()

    def read(self, src):
        """
        Return the value corresponding to Expr @src. An identifier without
        known value is returned as is.
        @src: ExprId or ExprMem instance
        """
        if src.is_id():
            return self.symbols_id.get(src, src)
        if not src.is_mem():
            raise TypeError("Bad source expr")
        # Only byte aligned accesses are supported for now
        assert src.size % 8 == 0
        return self.symbols_mem.read(src.arg, src.size)

    def write(self, dst, src):
        """
        Update @dst with @src expression
        @dst: ExprId or ExprMem instance
        @src: Expression instance
        """
        assert dst.size == src.size
        if dst.is_id():
            if dst == src:
                # Identifier set to itself: drop its previous value, if any
                self.symbols_id.pop(dst, None)
            else:
                self.symbols_id[dst] = src
            return
        if not dst.is_mem():
            raise TypeError("Bad destination expr")
        # Only byte aligned accesses are supported for now
        assert dst.size % 8 == 0
        self.symbols_mem.write(dst.arg, src)

    def dump(self, ids=True, mems=True):
        """Display the identifiers (if @ids) then the memories (if @mems)"""
        if ids:
            for variable, value in self.ids():
                print('%s = %s' % (variable, value))
        if mems:
            for mem, value in self.memory():
                print('%s = %s' % (mem, value))

    def __repr__(self):
        return "\n".join(
            '%s = %s' % (variable, value)
            for variable, value in self.iteritems()
        )

    def iteritems(self):
        """ExprId/ExprMem iteritems of the current state"""
        # Identifiers first, then memories
        for source in (self.ids, self.memory):
            for variable, value in source():
                yield variable, value

    def items(self):
        """Return variables/values of the current state"""
        return list(self.iteritems())

    def __iter__(self):
        for couple in self.iteritems():
            yield couple[0]

    def ids(self):
        """Iterate on variables and their values."""
        for couple in self.symbols_id.iteritems():
            yield couple

    def memory(self):
        """Iterate on memory variables and their values."""
        for couple in self.symbols_mem.iteritems():
            yield couple

    def keys(self):
        """Variables of the current state"""
        return [expr for expr in self]

    def get(self, expr, default=None):
        """Deprecated version of read

        @default is returned instead of the read value when it is not None
        and the read value equals @expr.
        """
        warnings.warn('DEPRECATION WARNING: use "read(self, expr)" instead of get')
        value = self.read(expr)
        unresolved = default is not None and value == expr
        return default if unresolved else value


def merge_ptr_read(known, ptrs):
    """
    Merge common memory parts in a multiple byte memory.

    Consecutive entries of @known holding equal values are grouped. Return a
    list of (known value, first pointer of the group, size in bits), each
    entry accounting for 8 bits.

    The arguments are left untouched.

    @ptrs: memory bytes list
    @known: ptrs' associated boolean for present/unpresent memory part in the
    store (must not be empty)
    """
    assert known
    out = []
    last, value, size = known[0], ptrs[0], 8
    for index, part in enumerate(known[1:], 1):
        if part == last:
            size += 8
        else:
            out.append((last, value, size))
            last, value, size = part, ptrs[index], 8
    # Flush the pending group explicitly rather than appending a sentinel to
    # the caller's lists
    out.append((last, value, size))
    return out


class SymbolicExecutionEngine(object):
    """
    Symbolic execution engine
    Allow IR code emulation in symbolic domain


    Examples:
        from miasm2.ir.symbexec import SymbolicExecutionEngine
        from miasm2.ir.ir import AssignBlock

        ir_arch = ir_x86_32()

        init_state = {
            ir_arch.arch.regs.EAX: ir_arch.arch.regs.EBX,
            ExprMem(id_x+ExprInt(0x10, 32), 32): id_a,
        }

        sb_exec = SymbolicExecutionEngine(ir_arch, init_state)

        >>> sb_exec.dump()
        EAX                = EBX
        @32[x + 0x10]      = a
        >>> sb_exec.dump(mems=False)
        EAX                = EBX

        >>> print sb_exec.eval_expr(ir_arch.arch.regs.EAX + ir_arch.arch.regs.ECX)
        EBX + ECX

    Inspecting state:
        - dump
        - modified
    State manipulation:
        - '.state' (rw)

    Evaluation (read only):
        - eval_expr
        - eval_assignblk
    Evaluation with state update:
        - eval_updt_expr
        - eval_updt_assignblk
        - eval_updt_irblock

    Start a symbolic execution based on provisioned '.ir_arch' blocks:
        - run_block_at
        - run_at
    """

    StateEngine = SymbolicState

    def __init__(self, ir_arch, state=None,
                 func_read=None,
                 func_write=None,
                 sb_expr_simp=expr_simp_explicit):
        """
        Build the engine
        @ir_arch: IR architecture; its addrsize configures the symbol manager
        @state: (optional) dictionary linking destinations to their initial
        values, written one by one in the symbol manager
        @func_read: (deprecated) callback stored in self.func_read
        @func_write: (deprecated) callback stored in self.func_write
        @sb_expr_simp: expression simplifier used by the engine
        """

        # Dispatch table: expression class -> evaluation method
        self.expr_to_visitor = {
            ExprInt: self.eval_exprint,
            ExprId: self.eval_exprid,
            ExprLoc: self.eval_exprloc,
            ExprMem: self.eval_exprmem,
            ExprSlice: self.eval_exprslice,
            ExprCond: self.eval_exprcond,
            ExprOp: self.eval_exprop,
            ExprCompose: self.eval_exprcompose,
        }

        initial_state = {} if state is None else state

        symbols = SymbolMngr(addrsize=ir_arch.addrsize, expr_simp=sb_expr_simp)
        self.symbols = symbols
        for dst, src in initial_state.iteritems():
            symbols.write(dst, src)

        if func_read:
            warnings.warn('DEPRECATION WARNING: override function "mem_read(self, expr)" instead of func_read')
        if func_write:
            warnings.warn('DEPRECATION WARNING: override function "mem_write(self, dsr, src)" instead of func_write')

        self.func_read, self.func_write = func_read, func_write
        self.ir_arch = ir_arch
        self.expr_simp = sb_expr_simp

    def get_state(self):
        """Snapshot the current symbols into a new StateEngine instance"""
        return self.StateEngine(dict(self.symbols))

    def set_state(self, state):
        """Restore the @state of the engine
        @state: StateEngine instance

        The symbol manager is rebuilt from scratch, then refilled with each
        (destination, source) couple of @state.
        """
        fresh = SymbolMngr(addrsize=self.ir_arch.addrsize, expr_simp=self.expr_simp)
        self.symbols = fresh
        for dst, src in dict(state).iteritems():
            fresh[dst] = src

    state = property(get_state, set_state)

    def eval_expr_visitor(self, expr, cache=None):
        """
        [DEV]: Override to change the behavior of an Expr evaluation.
        Simplify @expr, dispatch it to the matching 'eval_expr*' method (which
        recurses through this function) and simplify the outcome.
        @cache maps already evaluated expressions to their results; both @expr
        and its simplified form are recorded in it.
        Raise TypeError if the expression class has no registered visitor.
        """
        cache = {} if cache is None else cache

        hit = cache.get(expr)
        if hit is not None:
            return hit

        simplified = self.expr_simp(expr)
        hit = cache.get(simplified)
        if hit is not None:
            return hit

        visitor = self.expr_to_visitor.get(simplified.__class__)
        if visitor is None:
            raise TypeError("Unknown expr type")

        result = self.expr_simp(visitor(simplified, cache=cache))
        assert result is not None

        cache[expr] = result
        cache[simplified] = result
        return result

    def eval_exprint(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprInt: @expr is handed back unchanged"""
        return expr

    def eval_exprid(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprId by reading it from the symbols"""
        return self.symbols.read(expr)

    def eval_exprloc(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprLoc using the current state
        Return an ExprInt of the location offset when the loc_db knows one,
        else @expr itself
        """
        offset = self.ir_arch.loc_db.get_location_offset(expr.loc_key)
        if offset is None:
            return expr
        return ExprInt(offset, expr.size)

    def eval_exprmem(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprMem using the current state
        The pointer is evaluated first, then the memory of the same size at
        the evaluated pointer is read through 'mem_read'.
        Override 'mem_read' to modify the effective memory accesses
        """
        pointer = self.eval_expr_visitor(expr.arg, **kwargs)
        return self.mem_read(ExprMem(pointer, expr.size))

    def eval_exprcond(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprCond: condition and both sources are
        evaluated, then a new ExprCond is built from them"""
        evaluated = [
            self.eval_expr_visitor(sub_expr, **kwargs)
            for sub_expr in (expr.cond, expr.src1, expr.src2)
        ]
        return ExprCond(*evaluated)

    def eval_exprslice(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprSlice: the sliced argument is evaluated and
        sliced again with the same start/stop"""
        return ExprSlice(
            self.eval_expr_visitor(expr.arg, **kwargs),
            expr.start,
            expr.stop
        )

    def eval_exprop(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprOp: each argument is evaluated in order and
        the same operator is applied to the results"""
        args = [self.eval_expr_visitor(oarg, **kwargs) for oarg in expr.args]
        return ExprOp(expr.op, *args)

    def eval_exprcompose(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprCompose: each part is evaluated in order and
        the results are composed again"""
        parts = [self.eval_expr_visitor(arg, **kwargs) for arg in expr.args]
        return ExprCompose(*parts)

    def eval_expr(self, expr, eval_cache=None):
        """
        Evaluate @expr using the current state
        @expr: Expression instance to evaluate
        @eval_cache: None or dictionary handed to the visitor as evaluation
        cache (a fresh one is created when None)
        """
        cache = {} if eval_cache is None else eval_cache
        result = self.eval_expr_visitor(expr, cache=cache)
        assert result is not None
        return result

    def modified(self, init_state=None, ids=True, mems=True):
        """
        Generate the (variable, value) couples which differ from @init_state.
        @init_state: a base dictionary linking variables to their initial values
        to diff. Can be None (every couple is then reported).
        @ids: if True, report the entries of symbols_id
        @mems: if True, report the memory entries
        """
        reference = {} if init_state is None else init_state
        if ids:
            for variable, value in self.symbols.symbols_id.iteritems():
                unchanged = variable in reference and reference[variable] == value
                if not unchanged:
                    yield variable, value
        if mems:
            for mem, value in self.symbols.memory():
                unchanged = mem in reference and reference[mem] == value
                if not unchanged:
                    yield mem, value

    def dump(self, ids=True, mems=True):
        """
        Display modififed variables
        @ids: display modified ids
        @mems: display modified memory
        """

        for variable, value in self.modified(None, ids, mems):
            print "%-18s" % variable, "=", "%s" % value

    def eval_assignblk(self, assignblk):
        """
        Evaluate AssignBlock using the current state (the state itself is not
        updated here)

        Returns a dictionary linking each evaluated destination to its
        evaluated source. Memory destinations are rebuilt on their evaluated
        pointer. Raise ValueError on any other kind of destination.

        @assignblk: AssignBlock instance
        """
        pool_out = {}
        # A single cache is shared by every evaluation of the block
        eval_cache = {}
        for dst, src in assignblk.iteritems():
            value = self.eval_expr(src, eval_cache)
            if dst.is_mem():
                pointer = self.eval_expr(dst.arg, eval_cache)
                key = ExprMem(pointer, dst.size)
            elif dst.is_id():
                key = dst
            else:
                raise ValueError("Unknown destination type", str(dst))
            pool_out[key] = value

        return pool_out

    def apply_change(self, dst, src):
        """
        Apply @dst = @src on the current state WITHOUT evaluating both side.
        Memory destinations go through 'mem_write', the others are written
        directly in the symbols.
        @dst: Expr, destination
        @src: Expr, source
        """
        writer = self.mem_write if dst.is_mem() else self.symbols.write
        writer(dst, src)

    def eval_updt_assignblk(self, assignblk):
        """
        Apply an AssignBlock on the current state
        @assignblk: AssignBlock instance

        Return the list of the memory destinations which have been written
        """
        written_mems = []
        for dst, src in self.eval_assignblk(assignblk).iteritems():
            self.apply_change(dst, src)
            if dst.is_mem():
                written_mems.append(dst)
        return written_mems

    def eval_updt_irblock(self, irb, step=False):
        """
        Symbolic execution of the @irb on the current state
        @irb: irbloc instance
        @step: display intermediate steps
        """
        for assignblk in irb:
            if step:
                print 'Instr', assignblk.instr
                print 'Assignblk:'
                print assignblk
                print '_' * 80
            self.eval_updt_assignblk(assignblk)
            if step:
                self.dump(mems=False)
                self.dump(ids=False)
                print '_' * 80
        dst = self.eval_expr(self.ir_arch.IRDst)

        return dst

    def run_block_at(self, ircfg, addr, step=False):
        """
        Symbolic execution of the block at @addr
        @ircfg: graph queried (get_block) for the block to execute
        @addr: address to execute (int or ExprInt or label)
        @step: display intermediate steps

        Return the destination computed by the block, or @addr itself when
        @ircfg has no block for it
        """
        irblock = ircfg.get_block(addr)
        if irblock is None:
            return addr
        return self.eval_updt_irblock(irblock, step=step)

    def run_at(self, ircfg, addr, lbl_stop=None, step=False):
        """
        Symbolic execution starting at @addr
        @ircfg: graph queried (get_block) for the blocks to execute
        @addr: address to execute (int or ExprInt or label)
        @lbl_stop: LocKey to stop execution on
        @step: display intermediate steps

        Blocks are chained until no block is found for the current address or
        the block of @lbl_stop is reached; that address is returned
        """
        irblock = ircfg.get_block(addr)
        while irblock is not None and not (irblock.loc_key == lbl_stop):
            addr = self.eval_updt_irblock(irblock, step=step)
            irblock = ircfg.get_block(addr)
        return addr

    def del_mem_above_stack(self, stack_ptr):
        """
        Remove all stored memory values with following properties:
        * pointer based on initial stack value
        * pointer below current stack pointer

        @stack_ptr is evaluated first. An offset of the matching memory array
        is dropped when (offset - stack offset), masked to the pointer size,
        has its most significant bit set.
        """
        stack_ptr = self.eval_expr(stack_ptr)
        base, stk_offset = get_expr_base_offset(stack_ptr)
        memarray = self.symbols.symbols_mem.base_to_memarray.get(base, None)
        if not memarray:
            return

        # Collect first, delete afterwards: the array cannot be altered while
        # it is iterated
        to_del = set(
            offset for offset in memarray
            if ((offset - stk_offset) & int(stack_ptr.mask)) >> (stack_ptr.size - 1) != 0
        )
        for offset in to_del:
            del memarray[offset]

    def eval_updt_expr(self, expr):
        """
        Evaluate @expr and apply side effect if needed (ie. if expr is an
        assignment). Return the evaluated value (the evaluated source for an
        assignment)
        """

        if not expr.is_aff():
            return self.eval_expr(expr)

        # Assignment: evaluate the source before the state is updated
        ret = self.eval_expr(expr.src)
        self.eval_updt_assignblk(AssignBlock([expr]))
        return ret

    def _resolve_mem_parts(self, expr):
        """For a given ExprMem @expr, get known/unknown parts from the store.
        @expr: ExprMem instance

        Return a list of (known, value) where known is a bool representing if
        the value has been resolved from the store or not. Unresolved parts
        are returned as ExprMem.
        """

        assert expr.size % 8 == 0
        base_ptr = expr.arg
        known, ptrs = [], []
        # Test the presence of each byte of @expr in the symbols
        for index in xrange(expr.size / 8):
            byte_ptr = self.expr_simp(base_ptr + ExprInt(index, base_ptr.size))
            ptrs.append(byte_ptr)
            known.append(ExprMem(byte_ptr, 8) in self.symbols)

        # Group consecutive bytes sharing the same status, read known groups
        out = []
        for is_known, ptr_value, size in merge_ptr_read(known, ptrs):
            part = ExprMem(ptr_value, size)
            out.append((is_known, self.symbols.read(part) if is_known else part))
        return out

    def mem_read(self, expr):
        """
        [DEV]: Override to modify the effective memory reads

        Read symbolic value at ExprMem @expr
        @expr: ExprMem

        Parts missing from the store are given to 'func_read' when it is set;
        the parts are then composed and simplified.
        """

        out = []
        for known, part in self._resolve_mem_parts(expr):
            use_callback = not known and part.is_mem() and self.func_read is not None
            out.append(self.func_read(part) if use_callback else part)

        ret = self.expr_simp(ExprCompose(*out))
        assert ret.size == expr.size
        return ret

    def mem_write(self, dst, src):
        """
        [DEV]: Override to modify the effective memory writes

        Write symbolic value @src at ExprMem @dst
        @dst: destination ExprMem
        @src: source Expression

        When 'func_write' is set, it is called with (engine, @dst, @src) and
        the symbols are not written here.
        """
        if self.func_write is None:
            self.symbols.write(dst, src)
        else:
            self.func_write(self, dst, src)


    # Deprecated methods

    def apply_expr_on_state(self, expr, cache):
        """Deprecated version of eval_expr
        @expr: Expression instance to evaluate
        @cache: None or dictionary used as evaluation cache
        """
        warnings.warn('DEPRECATION WARNING: use "eval_expr" instead of apply_expr_on_state')

        eval_cache = {} if cache is None else cache
        return self.eval_expr(expr, eval_cache=eval_cache)

    def modified_mems(self, init_state=None):
        """Deprecated version of modified(ids=False)
        @init_state: forwarded to modified
        """
        warnings.warn('DEPRECATION WARNING: use "modified(self, ids=False)" instead of modified_mems')
        memories = self.modified(init_state=init_state, ids=False)
        for couple in memories:
            yield couple

    def modified_regs(self, init_state=None):
        """Deprecated version of modified(mems=False)
        @init_state: forwarded to modified
        """
        warnings.warn('DEPRECATION WARNING: use "modified(self, mems=False)" instead of modified_regs')
        registers = self.modified(init_state=init_state, mems=False)
        for couple in registers:
            yield couple

    def dump_id(self):
        """Deprecated version of dump(mems=False): memory is not displayed"""
        warnings.warn('DEPRECATION WARNING: use "dump(self, mems=False)" instead of dump_id')
        self.dump(mems=False)

    def dump_mem(self):
        """Deprecated version of dump(ids=False): ids are not displayed"""
        warnings.warn('DEPRECATION WARNING: use "dump(self, ids=False)" instead of dump_mem')
        self.dump(ids=False)

    def eval_ir_expr(self, assignblk):
        """Deprecated version of eval_assignblk(self, assignblk)
        Return an iterator on the evaluated (destination, source) couples
        """
        warnings.warn('DEPRECATION WARNING: use "eval_assignblk(self, assignblk)" instead of eval_ir_expr')
        evaluated = self.eval_assignblk(assignblk)
        return evaluated.iteritems()

    def eval_ir(self, assignblk):
        """Deprecated version of eval_updt_assignblk(self, assignblk)
        @assignblk: AssignBlock instance, applied on the current state

        Return the value returned by eval_updt_assignblk
        """
        # The replacement is eval_updt_assignblk (the state is updated), so
        # the warning names it rather than the read-only eval_assignblk
        warnings.warn('DEPRECATION WARNING: use "eval_updt_assignblk(self, assignblk)" instead of eval_ir')
        return self.eval_updt_assignblk(assignblk)

    def emulbloc(self, irb, step=False):
        """Deprecated version of eval_updt_irblock(self, irb, step=False)
        @irb: block to execute
        @step: display intermediate steps
        """
        warnings.warn('DEPRECATION WARNING: use "eval_updt_irblock(self, irb, step=False)" instead of emulbloc')
        destination = self.eval_updt_irblock(irb, step)
        return destination

    def emul_ir_bloc(self, _, addr, step=False):
        """Deprecated version of run_block_at
        The first positional argument (_) is ignored.
        """
        warnings.warn('DEPRECATION WARNING: use "run_block_at(self, addr, step=False)" instead of emul_ir_bloc')
        # NOTE(review): run_block_at is declared in this class as
        # run_block_at(self, ircfg, addr, step=False), so this call binds
        # @addr to ircfg and @step to addr -- confirm the intended forwarding
        return self.run_block_at(addr, step)

    def emul_ir_block(self, addr, step=False):
        """Deprecated version of run_block_at"""
        warnings.warn('DEPRECATION WARNING: use "run_block_at(self, addr, step=False)" instead of emul_ir_block')
        # NOTE(review): run_block_at is declared in this class as
        # run_block_at(self, ircfg, addr, step=False), so this call binds
        # @addr to ircfg and @step to addr -- confirm the intended forwarding
        return self.run_block_at(addr, step)

    def emul_ir_blocks(self, addr, lbl_stop=None, step=False):
        """Deprecated version of run_at"""
        warnings.warn('DEPRECATION WARNING: use "run_at(self, addr, lbl_stop=None, step=False):" instead of emul_ir_blocks')
        # NOTE(review): run_at is declared in this class as
        # run_at(self, ircfg, addr, lbl_stop=None, step=False), so this call
        # binds @addr to ircfg, @lbl_stop to addr and @step to lbl_stop --
        # confirm the intended forwarding
        return self.run_at(addr, lbl_stop, step)

    def emul_ir_blocs(self, _, addr, lbl_stop=None, step=False):
        """Deprecated version of run_at
        The first positional argument (_) is ignored.
        """
        warnings.warn('DEPRECATION WARNING: use "run_at(self, addr, lbl_stop=None, step=False):" instead of emul_ir_blocs')
        # NOTE(review): run_at is declared in this class as
        # run_at(self, ircfg, addr, lbl_stop=None, step=False), so this call
        # binds @addr to ircfg, @lbl_stop to addr and @step to lbl_stop --
        # confirm the intended forwarding
        return self.run_at(addr, lbl_stop, step)

    def apply_expr(self, expr):
        """Deprecated version of eval_updt_expr
        @expr: expression evaluated (and applied if it is an assignment)
        """
        warnings.warn('DEPRECATION WARNING: use "eval_updt_expr" instead of apply_expr')
        evaluated = self.eval_updt_expr(expr)
        return evaluated

    def as_assignblock(self):
        """Return the current state as an AssignBlock, built from every couple
        reported by modified(ids=True, mems=True)"""
        warnings.warn('DEPRECATION WARNING: use "modified(ids=True, mems=True)" instead of as_assignblock')
        couples = dict(self.modified(ids=True, mems=True))
        return AssignBlock(couples)


class symbexec(SymbolicExecutionEngine):
    """
    DEPRECATED object
    Use SymbolicExecutionEngine instead of symbexec
    """

    def __init__(self, ir_arch, known_symbols,
                 func_read=None,
                 func_write=None,
                 sb_expr_simp=expr_simp_explicit):
        """Warn about the deprecation, then forward every argument to the
        SymbolicExecutionEngine constructor (@known_symbols is its initial
        state)"""
        warnings.warn("Deprecated API: use SymbolicExecutionEngine")
        parent = super(symbexec, self)
        parent.__init__(
            ir_arch, known_symbols,
            func_read,
            func_write,
            sb_expr_simp=sb_expr_simp
        )

Module variables

var INTERNAL_INTBASE_NAME

var console_handler

var log

Functions

def get_block(

ir_arch, ircfg, mdis, addr)

Get IRBlock at address @addr

def get_block(ir_arch, ircfg, mdis, addr):
    """Get IRBlock at address @addr

    When @ircfg has no block for the location of @addr, the block is first
    disassembled with @mdis and added to @ircfg through @ir_arch.
    Raise LookupError if no block is available afterwards.
    """
    loc_key = ircfg.get_or_create_loc_key(addr)
    if loc_key not in ircfg.blocks:
        offset = mdis.loc_db.get_location_offset(loc_key)
        ir_arch.add_asmblock_to_ircfg(mdis.dis_block(offset), ircfg)

    irblock = ircfg.get_block(loc_key)
    if irblock is not None:
        return irblock
    raise LookupError('No block found at that address: %s' % ir_arch.loc_db.pretty_str(loc_key))

def get_expr_base_offset(

expr)

Return a couple representing the symbolic/concrete part of an addition expression.

If there is no symbolic part, ExprId(INTERNAL_INTBASE_NAME) is used. If there is no concrete part, 0 is used. @expr: Expression instance

def get_expr_base_offset(expr):
    """Split @expr into a (symbolic base, concrete offset) couple.

    An integer has ExprId(INTERNAL_INTBASE_NAME) as base and its value as
    offset. An addition ending with an integer has this integer as offset and
    its other arguments as base. Anything else is its own base, with a 0
    offset.
    @expr: Expression instance

    """
    if expr.is_int():
        return ExprId(INTERNAL_INTBASE_NAME, expr.size), int(expr)

    if not (expr.is_op('+') and expr.args[-1].is_int()):
        return expr, 0

    terms = expr.args[:-1]
    offset = int(expr.args[-1])
    base = terms[0] if len(terms) == 1 else ExprOp('+', *terms)
    return base, offset

def merge_ptr_read(

known, ptrs)

Merge common memory parts in a multiple byte memory. @ptrs: memory bytes list @known: ptrs' associated boolean for present/unpresent memory part in the store

def merge_ptr_read(known, ptrs):
    """
    Merge common memory parts in a multiple byte memory.
    @ptrs: memory bytes list
    @known: ptrs' associated boolean for present/unpresent memory part in the
    store

    Return a list of (is_known, first_ptr, size) tuples, one per run of
    consecutive bytes sharing the same @known flag. size is in bits (8 per
    merged byte). @known and @ptrs are not modified.
    """
    assert known
    out = []
    last, value, size = known[0], ptrs[0], 8
    for index in range(1, len(known)):
        part = known[index]
        if part == last:
            size += 8
        else:
            out.append((last, value, size))
            last, value, size = part, ptrs[index], 8
    # Flush the trailing run here rather than appending a None sentinel to
    # the caller's lists
    out.append((last, value, size))
    return out

Classes

class MemArray

Link between base and its (offset, Expr)

Given an expression (say base), this structure will store every memory content relatively to an integer offset from base.

The value associated to a given offset is a description of the slice of a stored expression. The slice size depends on the configuration of the MemArray. For example, for a slice size of 8 bits, the affectation: - @32[EAX+0x10] = EBX

will store for the base EAX: - 0x10: (EBX, 0) - 0x11: (EBX, 1) - 0x12: (EBX, 2) - 0x13: (EBX, 3)

If the base is EAX+EBX, this structure can store the following contents: - @32[EAX+EBX] - @8[EAX+EBX+0x100] But not: - @32[EAX+0x10] (which is stored in another MemArray based on EAX) - @32[EAX+EBX+ECX] (which is stored in another MemArray based on EAX+EBX+ECX)

class MemArray(MutableMapping):
    """Link between base and its (offset, Expr)

    Given an expression (say *base*), this structure will store every memory
    content relatively to an integer offset from *base*.

    The value associated to a given offset is a description of the slice of a
    stored expression. The slice size depends on the configuration of the
    MemArray. For example, for a slice size of 8 bits, the affectation:
    - @32[EAX+0x10] = EBX

    will store for the base EAX:
    - 0x10: (EBX, 0)
    - 0x11: (EBX, 1)
    - 0x12: (EBX, 2)
    - 0x13: (EBX, 3)

    If the *base* is EAX+EBX, this structure can store the following contents:
    - @32[EAX+EBX]
    - @8[EAX+EBX+0x100]
    But not:
    - @32[EAX+0x10] (which is stored in another MemArray based on EAX)
    - @32[EAX+EBX+ECX] (which is stored in another MemArray based on
      EAX+EBX+ECX)

    """

    def __init__(self, base, expr_simp=expr_simp_explicit):
        """
        @base: expression used as base address of the array; its mask bounds
        the offsets
        @expr_simp: expression simplifier
        """
        # offset (integer) -> content stored by the write api
        self._offset_to_expr = {}
        self._base = base
        self._mask = int(base.mask)
        self.expr_simp = expr_simp

    @property
    def base(self):
        """Symbolic base address (Expression) the offsets are relative to"""
        return self._base

    @property
    def mask(self):
        """Integer mask applied to the offsets (taken from the base mask)"""
        return self._mask

    def __getitem__(self, offset):
        """Return the content stored at @offset (KeyError if nothing is
        stored there)"""
        assert 0 <= offset <= self._mask
        return self._offset_to_expr[offset]

    def __setitem__(self, offset, value):
        """Item assignment is refused: updates must go through the write api"""
        raise RuntimeError("Use write api to update keys")

    def __delitem__(self, offset):
        """Forget the content stored at @offset (KeyError if nothing is
        stored there)"""
        assert 0 <= offset <= self._mask
        del self._offset_to_expr[offset]

    def __iter__(self):
        """Iterate on the stored offsets"""
        for offset in self._offset_to_expr:
            yield offset

    def __len__(self):
        """Number of stored offsets"""
        stored = self._offset_to_expr
        return len(stored)

    def __repr__(self):
        """Render the base, then one line per stored offset (sorted)"""
        lines = ["Base: %s" % self.base]
        lines.extend(
            "%16X %d %s" % (offset, index, value)
            for offset, (index, value) in sorted(self._offset_to_expr.iteritems())
        )
        return '\n'.join(lines)

    def copy(self):
        """Return a new MemArray sharing base and simplifier, holding a
        shallow copy of the stored offsets"""
        clone = MemArray(self.base, self.expr_simp)
        clone._offset_to_expr = dict(self._offset_to_expr)
        return clone

    @staticmethod
    def offset_to_ptr(base, offset):
        """
        Build the canonized expression of @base + @offset
        @base: symbolic base address
        @offset: relative offset integer to the @base address

        The internal integer base gives a plain ExprInt; a null offset gives
        @base itself.
        """
        if base.is_id(INTERNAL_INTBASE_NAME):
            return ExprInt(offset, base.size).canonize()
        if offset == 0:
            return base.canonize()
        return (base + ExprInt(offset, base.size)).canonize()

    def read(self, offset, size):
        """
        Return memory at @offset with @size as an Expr list
        @offset: integer (in bytes)
        @size: integer (in bits), byte aligned

        Consider the following state:
        - 0x10: (EBX, 0)
        - 0x11: (EBX, 1)
        - 0x12: (EBX, 2)
        - 0x13: (EBX, 3)

        A read at 0x10 of 32 bits should return: EBX

        Bytes missing from the store are returned as ExprMem built on the
        array base. Adjacent bytes are merged when they are consecutive bytes
        of the same stored expression, two integers, or contiguous memory
        reads on the same base.
        """

        assert size % 8 == 0
        # Parts is (Expr's offset, size, Expr), offset and size in bytes
        parts = []
        for index in xrange(size / 8):
            # Wrap read:
            # @32[EAX+0xFFFFFFFF] is ok and will read at 0xFFFFFFFF, 0, 1, 2
            request_offset = (offset + index) & self._mask
            if request_offset in self._offset_to_expr:
                # Known memory portion
                off, data = self._offset_to_expr[request_offset]
                parts.append((off, 1, data))
                continue

            # Unknown memory portion
            ptr = self.offset_to_ptr(self.base, request_offset)
            data = ExprMem(ptr, 8)
            parts.append((0, 1, data))

        # Group similar datas
        # XXX TODO: only little endian here
        # parts is merged in place: after a merge, index is not incremented
        # so the merged part is compared with its new neighbour
        index = 0
        while index + 1 < len(parts):
            off_a, size_a, data_a = parts[index]
            off_b, size_b, data_b = parts[index+1]
            if data_a == data_b and off_a + size_a == off_b:
                # Read consecutive bytes of a variable
                # [(0, 1, x), (1, 1, x)] => (0, 2, x)
                parts[index:index+2] = [(off_a, size_a + size_b, data_a)]
                continue
            if data_a.is_int() and data_b.is_int():
                # Read integer parts
                # [(0, 1, 0x11223344), (1, 1, 0x55667788)] => (0, 2, 0x7744)
                int1 = self.expr_simp(data_a[off_a*8:(off_a+size_a)*8])
                int2 = self.expr_simp(data_b[off_b*8:(off_b+size_b)*8])
                assert int1.is_int() and int2.is_int()
                int1, int2 = int(int1), int(int2)
                result = ExprInt((int2 << (size_a * 8)) | int1, (size_a + size_b) * 8)
                parts[index:index+2] = [(0, size_a + size_b, result)]
                continue
            if data_a.is_mem() and data_b.is_mem():
                # Read consecutive bytes of a memory variable
                ptr_base_a, ptr_offset_a = get_expr_base_offset(data_a.arg)
                ptr_base_b, ptr_offset_b = get_expr_base_offset(data_b.arg)
                if ptr_base_a != ptr_base_b:
                    index += 1
                    continue
                if (ptr_offset_a + off_a + size_a) & self._mask == (ptr_offset_b + off_b) & self._mask:
                    assert size_a <= data_a.size / 8 - off_a
                    assert size_b <= data_b.size / 8 - off_b
                    # Successive comparable symbolic pointers
                    # [(0, 1, @8[ptr]), (0, 1, @8[ptr+1])] => (0, 2, @16[ptr])
                    ptr = self.offset_to_ptr(ptr_base_a, (ptr_offset_a + off_a) & self._mask)

                    data = ExprMem(ptr, (size_a + size_b) * 8)
                    parts[index:index+2] = [(0, size_a + size_b, data)]

                    continue

            # No merge possible with the next part: move on
            index += 1

        # Slice datas: a part covering only some bytes of its expression is
        # returned as a slice of this expression
        read_mem = []
        for off, bytesize, data in parts:
            if data.size / 8 != bytesize:
                data = data[off * 8: (off + bytesize) * 8]
            read_mem.append(data)

        return read_mem

    def write(self, offset, expr):
        """
        Write @expr at @offset
        @offset: integer (in bytes)
        @expr: Expr instance value

        Each byte of @expr is recorded at its own offset as a
        (byte index, @expr) couple. A byte whose value is the memory already
        located at this very offset of the base is removed from the store
        instead.
        """
        assert expr.size % 8 == 0
        assert offset <= self._mask
        for index in xrange(expr.size / 8):
            # Wrap write:
            # @32[EAX+0xFFFFFFFF] is ok and will write at 0xFFFFFFFF, 0, 1, 2
            request_offset = (offset + index) & self._mask
            # XXX TODO: only little endian here
            self._offset_to_expr[request_offset] = (index, expr)

            # Byte of @expr landing at request_offset
            tmp = self.expr_simp(expr[index * 8: (index + 1) * 8])
            # Special case: Simplify slice of pointer (simplification is ok
            # here, as we won't store the simplified expression)
            if tmp.is_slice() and tmp.arg.is_mem() and tmp.start % 8 == 0:
                new_ptr = self.expr_simp(tmp.arg.arg + ExprInt(tmp.start / 8, tmp.arg.arg.size))
                tmp = ExprMem(new_ptr, tmp.stop - tmp.start)
            # Test if write to original value
            if tmp.is_mem():
                src_ptr, src_off = get_expr_base_offset(tmp.arg)
                if src_ptr == self.base and src_off == request_offset:
                    # The byte is written with its own memory content: drop
                    # the entry stored just above
                    del self._offset_to_expr[request_offset]


    def _get_variable_parts(self, index, known_offsets, forward=True):
        """
        Find consecutive memory parts representing the same variable. The part
        starts at offset known_offsets[@index] and search is in offset direction
        determined by @forward
        Return the number of consecutive parts of the same variable (at least
        1: the starting part itself).

        @index: index of the memory offset in known_offsets
        @known_offsets: sorted offsets
        @forward: Search in offset growing direction if True, else in reverse
        order
        """

        offset = known_offsets[index]
        value_byte_index, value = self._offset_to_expr[offset]
        assert value.size % 8 == 0

        # Byte indexes of the value still to be matched, in search order
        if forward:
            start, end, step = value_byte_index + 1, value.size / 8, 1
        else:
            start, end, step = value_byte_index - 1, -1, -1

        partnum = 1
        for value_offset in xrange(start, end, step):
            # Memory offset expected for the next byte of the value
            offset += step
            # Check if next part is in known_offsets
            next_index = index + step * partnum
            if not 0 <= next_index < len(known_offsets):
                break

            offset_next = known_offsets[next_index]
            if offset_next != offset:
                break

            # Check if next part is a part of the searched value
            byte_index, value_next = self._offset_to_expr[offset_next]
            if byte_index != value_offset:
                break
            if value != value_next:
                break
            partnum += 1

        return partnum


    def _build_value_at_offset(self, value, offset, start, length):
        """
        Return a (memory, content) couple: the ExprMem of @length bytes
        located at @offset of the base, and the part of @value it holds
        (@length bytes starting at byte @start; @value itself when this part
        covers it entirely)

        @value: Expression to truncate
        @offset: offset in bytes of the variable (integer)
        @start: value's byte offset (integer)
        @length: length in bytes (integer)
        """

        size = length * 8
        ptr = self.offset_to_ptr(self.base, offset)
        if start == 0 and size == value.size:
            return ExprMem(ptr, size), value

        truncated = self.expr_simp(value[start * 8: start * 8 + size])
        return ExprMem(ptr, size), truncated


    def memory(self):
        """
        Iterate on stored memory/values

        The goal here is to group entities.

        Consider the following state:
        EAX + 0x10 = (0, EDX)
        EAX + 0x11 = (1, EDX)
        EAX + 0x12 = (2, EDX)
        EAX + 0x13 = (3, EDX)

        The function should return:
        @32[EAX + 0x10] = EDX

        Couples are yielded by growing offset. A variable wrapping from the
        highest offset to offset 0 is merged and yielded last.
        """

        if not self._offset_to_expr:
            # A plain return ends the generator. Raising StopIteration in a
            # generator body is turned into a RuntimeError by PEP 479
            return
        known_offsets = sorted(self._offset_to_expr)
        index = 0
        # Test if the first element is the continuation of the last byte. If
        # yes, merge it and output it once the other elements are done.
        min_int = 0
        max_int = (1 << self.base.size) - 1
        limit_index = len(known_offsets)

        first_element = None
        # Special case where a variable spreads on max_int/min_int
        if known_offsets[0] == min_int and known_offsets[-1] == max_int:
            min_offset, max_offset = known_offsets[0], known_offsets[-1]
            min_byte_index, min_value = self._offset_to_expr[min_offset]
            max_byte_index, max_value = self._offset_to_expr[max_offset]
            if min_value == max_value and max_byte_index + 1 == min_byte_index:
                # Look for current variable start
                partnum_before = self._get_variable_parts(len(known_offsets) - 1, known_offsets, False)
                # Look for current variable end
                partnum_after = self._get_variable_parts(0, known_offsets)

                partnum = partnum_before + partnum_after
                offset = known_offsets[-partnum_before]
                index_value, value = self._offset_to_expr[offset]

                mem, result = self._build_value_at_offset(value, offset, index_value, partnum)
                first_element = mem, result
                # Parts of the wrapping variable are excluded from the walk
                index = partnum_after
                limit_index = len(known_offsets) - partnum_before

        # Special cases are done, walk and merge variables
        while index < limit_index:
            offset = known_offsets[index]
            index_value, value = self._offset_to_expr[offset]
            partnum = self._get_variable_parts(index, known_offsets)
            mem, result = self._build_value_at_offset(value, offset, index_value, partnum)
            yield mem, result
            index += partnum

        if first_element is not None:
            yield first_element

    def dump(self):
        """Display MemArray content"""
        for mem, value in self.memory():
            print "%s = %s" % (mem, value)

Ancestors (in MRO)

  • MemArray
  • _abcoll.MutableMapping
  • _abcoll.Mapping
  • _abcoll.Sized
  • _abcoll.Iterable
  • _abcoll.Container
  • __builtin__.object

Static methods

def offset_to_ptr(

base, offset)

Return an expression representing @base + @offset. @base: symbolic base address. @offset: integer offset relative to the @base address.

@staticmethod
def offset_to_ptr(base, offset):
    """
    Build the canonized pointer expression for @base + @offset
    @base: symbolic base address
    @offset: relative offset integer to the @base address
    """
    if base.is_id(INTERNAL_INTBASE_NAME):
        # Internal integer base: the pointer is the offset alone
        return ExprInt(offset, base.size).canonize()
    if offset == 0:
        # No displacement: the pointer is the base itself
        return base.canonize()
    return (base + ExprInt(offset, base.size)).canonize()

Instance variables

var base

Expression representing the symbolic base address

var expr_simp

var mask

Mask offset

Methods

def __init__(

self, base, expr_simp=<miasm2.expression.simplifications.ExpressionSimplifier object at 0x7f19b240b250>)

def __init__(self, base, expr_simp=expr_simp_explicit):
    """
    @base: expression used as the symbolic base address; its `mask`
           attribute is read to wrap offsets
    @expr_simp: simplifier stored on the instance
    """
    # Byte offset -> stored entry, initially empty
    self._offset_to_expr = {}
    self._mask = int(base.mask)
    self._base = base
    self.expr_simp = expr_simp

def clear(

self)

D.clear() -> None. Remove all items from D.

def clear(self):
    """D.clear() -> None.  Remove all items from D."""
    while True:
        try:
            self.popitem()
        except KeyError:
            # popitem() reports an empty mapping with KeyError
            return

def copy(

self)

Copy object instance

def copy(self):
    """Return a new MemArray with the same base, simplifier and entries"""
    duplicate = MemArray(self.base, self.expr_simp)
    # Shallow copy: the offset table is duplicated, stored entries are shared
    duplicate._offset_to_expr = self._offset_to_expr.copy()
    return duplicate

def dump(

self)

Display MemArray content

def dump(self):
    """Print every grouped memory entry as a "<mem> = <value>" line"""
    for location, content in self.memory():
        # Single parenthesized argument: prints the same text as the
        # statement form
        print("%s = %s" % (location, content))

def get(

self, key, default=None)

D.get(k[,d]) -> D[k] if k in D, else d. d defaults to None.

def get(self, key, default=None):
    """D.get(k[,d]) -> D[k] if k in D, else d.  d defaults to None."""
    try:
        found = self[key]
    except KeyError:
        found = default
    return found

def items(

self)

D.items() -> list of D's (key, value) pairs, as 2-tuples

def items(self):
    """D.items() -> list of D's (key, value) pairs, as 2-tuples"""
    pairs = []
    for key in self:
        pairs.append((key, self[key]))
    return pairs

def iteritems(

self)

D.iteritems() -> an iterator over the (key, value) items of D

def iteritems(self):
    """D.iteritems() -> an iterator over the (key, value) items of D"""
    for name in self:
        # Each value is looked up lazily, when its pair is requested
        pair = (name, self[name])
        yield pair

def iterkeys(

self)

D.iterkeys() -> an iterator over the keys of D

def iterkeys(self):
    """D.iterkeys() -> an iterator over the keys of D"""
    key_iterator = iter(self)
    return key_iterator

def itervalues(

self)

D.itervalues() -> an iterator over the values of D

def itervalues(self):
    """D.itervalues() -> an iterator over the values of D"""
    for name in self:
        content = self[name]
        yield content

def keys(

self)

D.keys() -> list of D's keys

def keys(self):
    """D.keys() -> list of D's keys"""
    return [name for name in self]

def memory(

self)

Iterate on stored memory/values

The goal here is to group entities.

Consider the following state: EAX + 0x10 = (0, EDX) EAX + 0x11 = (1, EDX) EAX + 0x12 = (2, EDX) EAX + 0x13 = (3, EDX)

The function should return: @32[EAX + 0x10] = EDX

def memory(self):
    """
    Iterate on stored memory/values
    The goal here is to group entities.
    Consider the following state:
    EAX + 0x10 = (0, EDX)
    EAX + 0x11 = (1, EDX)
    EAX + 0x12 = (2, EDX)
    EAX + 0x13 = (3, EDX)
    The function should return:
    @32[EAX + 0x10] = EDX

    Yields (mem, value) pairs in increasing offset order. A variable which
    wraps from the highest offset to offset 0 is yielded last.
    """
    if not self._offset_to_expr:
        # Nothing stored: end the generator with a plain return. Raising
        # StopIteration inside a generator turns into a RuntimeError on
        # interpreters implementing PEP 479.
        return
    # sorted() accepts any iterable, so this does not depend on keys()
    # returning a list
    known_offsets = sorted(self._offset_to_expr)
    index = 0
    # Test if the first element is the continuation of the last byte. If
    # yes, merge both ends into a single element, which is output last.
    min_int = 0
    max_int = (1 << self.base.size) - 1
    limit_index = len(known_offsets)
    first_element = None
    # Special case where a variable spreads on max_int/min_int
    if known_offsets[0] == min_int and known_offsets[-1] == max_int:
        min_offset, max_offset = known_offsets[0], known_offsets[-1]
        min_byte_index, min_value = self._offset_to_expr[min_offset]
        max_byte_index, max_value = self._offset_to_expr[max_offset]
        if min_value == max_value and max_byte_index + 1 == min_byte_index:
            # Look for current variable start
            partnum_before = self._get_variable_parts(len(known_offsets) - 1, known_offsets, False)
            # Look for current variable end
            partnum_after = self._get_variable_parts(0, known_offsets)
            partnum = partnum_before + partnum_after
            offset = known_offsets[-partnum_before]
            index_value, value = self._offset_to_expr[offset]
            mem, result = self._build_value_at_offset(value, offset, index_value, partnum)
            first_element = mem, result
            # Exclude both ends of the wrapped variable from the main walk
            index = partnum_after
            limit_index = len(known_offsets) - partnum_before
    # Special cases are done, walk and merge variables
    while index < limit_index:
        offset = known_offsets[index]
        index_value, value = self._offset_to_expr[offset]
        partnum = self._get_variable_parts(index, known_offsets)
        mem, result = self._build_value_at_offset(value, offset, index_value, partnum)
        yield mem, result
        index += partnum
    if first_element is not None:
        yield first_element

def pop(

self, key, default=<object object at 0x7f19b494d030>)

D.pop(k[,d]) -> v, remove specified key and return the corresponding value. If key is not found, d is returned if given, otherwise KeyError is raised.

def pop(self, key, default=__marker):
    """D.pop(k[,d]) -> v, remove specified key and return the corresponding value.
    If key is not found, d is returned if given, otherwise KeyError is raised.
    """
    try:
        found = self[key]
    except KeyError:
        # The marker default means the caller supplied no fallback
        if default is not self.__marker:
            return default
        raise
    del self[key]
    return found

def popitem(

self)

D.popitem() -> (k, v), remove and return some (key, value) pair as a 2-tuple; but raise KeyError if D is empty.

def popitem(self):
    """D.popitem() -> (k, v), remove and return some (key, value) pair
    as a 2-tuple; but raise KeyError if D is empty.
    """
    candidates = iter(self)
    try:
        first = next(candidates)
    except StopIteration:
        # No key at all: the mapping is empty
        raise KeyError
    removed = (first, self[first])
    del self[first]
    return removed

def read(

self, offset, size)

Return memory at @offset with @size as an Expr list @offset: integer (in bytes) @size: integer (in bits), byte aligned

Consider the following state, where each offset maps to (byte index, expression): - 0x10: (0, EBX) - 0x11: (1, EBX) - 0x12: (2, EBX) - 0x13: (3, EBX)

A read at 0x10 of 32 bits should return: EBX

def read(self, offset, size):
    """
    Return memory at @offset with @size as an Expr list
    @offset: integer (in bytes)
    @size: integer (in bits), byte aligned

    Each stored entry is a (byte index, expression) pair.
    Consider the following state:
    - 0x10: (0, EBX)
    - 0x11: (1, EBX)
    - 0x12: (2, EBX)
    - 0x13: (3, EBX)
    A read at 0x10 of 32 bits should return: EBX

    The returned list is ordered by increasing offset. Adjacent bytes are
    merged when they belong to the same expression, when both are
    integers, or when they are consecutive memory reads on the same base.
    """
    assert size % 8 == 0
    # Parts is (byte offset inside Expr, size in bytes, Expr)
    parts = []
    for index in xrange(size / 8):
        # Wrap read:
        # @32[EAX+0xFFFFFFFF] is ok and will read at 0xFFFFFFFF, 0, 1, 2
        request_offset = (offset + index) & self._mask
        if request_offset in self._offset_to_expr:
            # Known memory portion
            off, data = self._offset_to_expr[request_offset]
            parts.append((off, 1, data))
            continue
        # Unknown memory portion: fall back to a one byte memory read
        ptr = self.offset_to_ptr(self.base, request_offset)
        data = ExprMem(ptr, 8)
        parts.append((0, 1, data))
    # Group similar datas
    # XXX TODO: only little endian here
    # A successful merge replaces two parts with one and retries at the same
    # index; index only advances when nothing can be merged.
    index = 0
    while index + 1 < len(parts):
        off_a, size_a, data_a = parts[index]
        off_b, size_b, data_b = parts[index+1]
        if data_a == data_b and off_a + size_a == off_b:
            # Read consecutive bytes of a variable (sizes in bytes)
            # [(0, 1, x), (1, 1, x)] => (0, 2, x)
            parts[index:index+2] = [(off_a, size_a + size_b, data_a)]
            continue
        if data_a.is_int() and data_b.is_int():
            # Read integer parts (sizes in bytes)
            # [(0, 1, 0x11223344), (1, 1, 0x55667788)] => (0, 2, 0x7744)
            int1 = self.expr_simp(data_a[off_a*8:(off_a+size_a)*8])
            int2 = self.expr_simp(data_b[off_b*8:(off_b+size_b)*8])
            assert int1.is_int() and int2.is_int()
            int1, int2 = int(int1), int(int2)
            # Little endian: the first part fills the low bits
            result = ExprInt((int2 << (size_a * 8)) | int1, (size_a + size_b) * 8)
            parts[index:index+2] = [(0, size_a + size_b, result)]
            continue
        if data_a.is_mem() and data_b.is_mem():
            # Read consecutive bytes of a memory variable
            ptr_base_a, ptr_offset_a = get_expr_base_offset(data_a.arg)
            ptr_base_b, ptr_offset_b = get_expr_base_offset(data_b.arg)
            if ptr_base_a != ptr_base_b:
                index += 1
                continue
            if (ptr_offset_a + off_a + size_a) & self._mask == (ptr_offset_b + off_b) & self._mask:
                assert size_a <= data_a.size / 8 - off_a
                assert size_b <= data_b.size / 8 - off_b
                # Successive comparable symbolic pointers (sizes in bytes)
                # [(0, 1, @8[ptr]), (0, 1, @8[ptr+1])] => (0, 2, @16[ptr])
                ptr = self.offset_to_ptr(ptr_base_a, (ptr_offset_a + off_a) & self._mask)
                data = ExprMem(ptr, (size_a + size_b) * 8)
                parts[index:index+2] = [(0, size_a + size_b, data)]
                continue
        index += 1
    # Slice datas: only slice when the part covers less than its expression
    read_mem = []
    for off, bytesize, data in parts:
        if data.size / 8 != bytesize:
            data = data[off * 8: (off + bytesize) * 8]
        read_mem.append(data)
    return read_mem

def setdefault(

self, key, default=None)

D.setdefault(k[,d]) -> D.get(k,d), also set D[k]=d if k not in D

def setdefault(self, key, default=None):
    """D.setdefault(k[,d]) -> D.get(k,d), also set D[k]=d if k not in D"""
    try:
        current = self[key]
    except KeyError:
        # Missing key: store the default and hand it back
        self[key] = default
        return default
    return current

def update(

*args, **kwds)

D.update([E, ]**F) -> None. Update D from mapping/iterable E and F. If E present and has a .keys() method, does: for k in E: D[k] = E[k] If E present and lacks .keys() method, does: for (k, v) in E: D[k] = v In either case, this is followed by: for k, v in F.items(): D[k] = v

def update(*args, **kwds):
    """D.update([E, ]**F) -> None.  Update D from mapping/iterable E and F.
    If E present and has a .keys() method, does:     for k in E: D[k] = E[k]
    If E present and lacks .keys() method, does:     for (k, v) in E: D[k] = v
    In either case, this is followed by: for k, v in F.items(): D[k] = v
    """
    argc = len(args)
    if argc > 2:
        raise TypeError("update() takes at most 2 positional "
                        "arguments ({} given)".format(argc))
    if not args:
        raise TypeError("update() takes at least 1 argument (0 given)")
    # The instance comes in through *args, so "self" stays usable as a
    # keyword key
    self = args[0]
    if argc >= 2:
        other = args[1]
    else:
        other = ()
    if isinstance(other, Mapping):
        for name in other:
            self[name] = other[name]
    elif hasattr(other, "keys"):
        for name in other.keys():
            self[name] = other[name]
    else:
        for name, content in other:
            self[name] = content
    # Keyword arguments are applied last
    for name, content in kwds.items():
        self[name] = content

def values(

self)

D.values() -> list of D's values

def values(self):
    """D.values() -> list of D's values"""
    collected = []
    for name in self:
        collected.append(self[name])
    return collected

def write(

self, offset, expr)

Write @expr at @offset @offset: integer (in bytes) @expr: Expr instance value

def write(self, offset, expr):
    """
    Write @expr at @offset
    @offset: integer (in bytes)
    @expr: Expr instance value

    Each byte of @expr is recorded at its own (wrapped) offset as the pair
    (byte index in @expr, @expr). A byte whose simplified value is the
    memory it is being written to is removed instead of stored.
    """
    assert expr.size % 8 == 0
    assert offset <= self._mask
    for index in xrange(expr.size / 8):
        # Wrap write:
        # @32[EAX+0xFFFFFFFF] is ok and will write at 0xFFFFFFFF, 0, 1, 2
        request_offset = (offset + index) & self._mask
        # XXX TODO: only little endian here
        self._offset_to_expr[request_offset] = (index, expr)
        # Simplified view of the byte being written, only used for the
        # identity test below
        tmp = self.expr_simp(expr[index * 8: (index + 1) * 8])
        # Special case: Simplify slice of pointer (simplification is ok
        # here, as we won't store the simplified expression)
        if tmp.is_slice() and tmp.arg.is_mem() and tmp.start % 8 == 0:
            new_ptr = self.expr_simp(tmp.arg.arg + ExprInt(tmp.start / 8, tmp.arg.arg.size))
            tmp = ExprMem(new_ptr, tmp.stop - tmp.start)
        # Test if write to original value
        if tmp.is_mem():
            src_ptr, src_off = get_expr_base_offset(tmp.arg)
            if src_ptr == self.base and src_off == request_offset:
                # Writing a location with its own content: drop the entry
                # stored just above
                del self._offset_to_expr[request_offset]

class MemSparse

Link a symbolic memory pointer to its MemArray.

For each symbolic memory object, this object will extract the memory pointer ptr. It then splits ptr into a symbolic and an integer part. For example, the memory @[ESP+4] will give ESP+4 for ptr. ptr is then split into its base ESP and its offset 4. Each symbolic base address uses a different MemArray.

Example: - @32[EAX+EBX] - @8[EAX+EBX+0x100] Will be stored in the same MemArray with a EAX+EBX base

class MemSparse(object):
    """Link a symbolic memory pointer to its MemArray.

    For each symbolic memory object, this object will extract the memory pointer
    *ptr*. It then splits *ptr* into a symbolic and an integer part. For
    example, the memory @[ESP+4] will give ESP+4 for *ptr*. *ptr* is then split
    into its base ESP and its offset 4. Each symbolic base address uses a
    different MemArray.

    Example:
    - @32[EAX+EBX]
    - @8[EAX+EBX+0x100]
    Will be stored in the same MemArray with a EAX+EBX base

    """

    def __init__(self, addrsize, expr_simp=expr_simp_explicit):
        """
        @addrsize: size (in bits) of the addresses manipulated by the MemSparse
        @expr_simp: an ExpressionSimplifier instance
        """
        self.addrsize = addrsize
        self.expr_simp = expr_simp
        # Symbolic base -> MemArray holding the bytes known for that base
        self.base_to_memarray = {}

    def __contains__(self, expr):
        """
        Return True if the whole @expr is present
        For partial check, use 'contains_partial'
        """
        if not expr.is_mem():
            return False
        ptr = expr.arg
        base, offset = get_expr_base_offset(ptr)
        memarray = self.base_to_memarray.get(base, None)
        if memarray is None:
            return False
        for i in xrange(expr.size / 8):
            # Wrap the offset the same way __delitem__ does, so an access
            # crossing the end of the address space is looked up correctly
            if (offset + i) & memarray.mask not in memarray:
                return False
        return True

    def contains_partial(self, expr):
        """
        Return True if a part of @expr is present in memory
        """
        if not expr.is_mem():
            return False
        ptr = expr.arg
        base, offset = get_expr_base_offset(ptr)
        memarray = self.base_to_memarray.get(base, None)
        if memarray is None:
            return False
        for i in xrange(expr.size / 8):
            # Wrap the offset the same way delete_partial does
            if (offset + i) & memarray.mask in memarray:
                return True
        return False

    def clear(self):
        """Reset the current object content"""
        self.base_to_memarray.clear()

    def copy(self):
        """Copy the current object instance"""
        base_to_memarray = {}
        for base, memarray in self.base_to_memarray.iteritems():
            # Each MemArray is copied so both instances evolve independently
            base_to_memarray[base] = memarray.copy()
        obj = MemSparse(self.addrsize, self.expr_simp)
        obj.base_to_memarray = base_to_memarray
        return obj

    def __delitem__(self, expr):
        """
        Delete a value @expr *fully* present in memory
        For partial delete, use delete_partial
        Raise KeyError if a byte of @expr is missing; nothing is deleted then
        """
        ptr = expr.arg
        base, offset = get_expr_base_offset(ptr)
        memarray = self.base_to_memarray.get(base, None)
        if memarray is None:
            raise KeyError
        # Check if whole entity is in the MemArray before deleting it
        for i in xrange(expr.size / 8):
            if (offset + i) & memarray.mask not in memarray:
                raise KeyError
        for i in xrange(expr.size / 8):
            del memarray[(offset + i) & memarray.mask]

    def delete_partial(self, expr):
        """
        Delete @expr from memory. Skip parts of @expr which are not present in
        memory.
        Raise KeyError if no MemArray exists for the base of @expr
        """
        ptr = expr.arg
        base, offset = get_expr_base_offset(ptr)
        memarray = self.base_to_memarray.get(base, None)
        if memarray is None:
            raise KeyError
        # Delete only the bytes which are present; missing ones are skipped
        for i in xrange(expr.size / 8):
            real_offset = (offset + i) & memarray.mask
            if real_offset in memarray:
                del memarray[real_offset]

    def read(self, ptr, size):
        """
        Return the value associated with the Expr at address @ptr
        @ptr: Expr representing the memory address
        @size: memory size (in bits), byte aligned
        """
        assert size % 8 == 0
        base, offset = get_expr_base_offset(ptr)
        memarray = self.base_to_memarray.get(base, None)
        if memarray is not None:
            mems = memarray.read(offset, size)
            ret = ExprCompose(*mems)
        else:
            # Nothing known for this base: the result is the memory itself
            ret = ExprMem(ptr, size)
        return ret

    def write(self, ptr, expr):
        """
        Update the corresponding Expr @expr at address @ptr
        @ptr: Expr representing the memory address
        @expr: Expr instance
        """
        assert ptr.size == self.addrsize
        base, offset = get_expr_base_offset(ptr)
        memarray = self.base_to_memarray.get(base, None)
        if memarray is None:
            # First write on this base: create its MemArray
            memarray = MemArray(base, self.expr_simp)
            self.base_to_memarray[base] = memarray
        memarray.write(offset, expr)

    def iteritems(self):
        """Iterate on stored memory variables and their values."""
        for _, memarray in sorted(self.base_to_memarray.iteritems()):
            for mem, value in memarray.memory():
                yield mem, value

    def items(self):
        """Return stored memory variables and their values."""
        return list(self.iteritems())

    def dump(self):
        """Display MemSparse content"""
        for mem, value in self.iteritems():
            print("%s = %s" % (mem, value))

    def __repr__(self):
        out = []
        for _, memarray in sorted(self.base_to_memarray.iteritems()):
            out.append(repr(memarray))
        return '\n'.join(out)

Ancestors (in MRO)

Instance variables

var addrsize

var base_to_memarray

var expr_simp

Methods

def __init__(

self, addrsize, expr_simp=<miasm2.expression.simplifications.ExpressionSimplifier object at 0x7f19b240b250>)

@addrsize: size (in bits) of the addresses manipulated by the MemSparse @expr_simp: an ExpressionSimplifier instance

def __init__(self, addrsize, expr_simp=expr_simp_explicit):
    """
    @addrsize: size (in bits) of the addresses manipulated by the MemSparse
    @expr_simp: an ExpressionSimplifier instance
    """
    # Symbolic base -> MemArray, filled by write()
    self.base_to_memarray = {}
    self.expr_simp = expr_simp
    self.addrsize = addrsize

def clear(

self)

Reset the current object content

def clear(self):
    """Forget every stored MemArray (the mapping object itself is kept)"""
    arrays = self.base_to_memarray
    arrays.clear()

def contains_partial(

self, expr)

Return True if a part of @expr is present in memory

def contains_partial(self, expr):
    """
    Return True if a part of @expr is present in memory
    @expr: expression to test; anything which is not a memory access
           gives False
    """
    if not expr.is_mem():
        return False
    ptr = expr.arg
    base, offset = get_expr_base_offset(ptr)
    memarray = self.base_to_memarray.get(base, None)
    if memarray is None:
        return False
    for i in xrange(expr.size / 8):
        # Wrap the offset like delete_partial does, so an access crossing
        # the end of the address space still matches the stored bytes
        if (offset + i) & memarray.mask in memarray:
            return True
    return False

def copy(

self)

Copy the current object instance

def copy(self):
    """Return a MemSparse holding a copy of every MemArray"""
    duplicate = MemSparse(self.addrsize, self.expr_simp)
    # Copy each MemArray so the two instances do not share them
    duplicate.base_to_memarray = dict(
        (base, memarray.copy())
        for base, memarray in self.base_to_memarray.iteritems()
    )
    return duplicate

def delete_partial(

self, expr)

Delete @expr from memory. Skip parts of @expr which are not present in memory.

def delete_partial(self, expr):
    """
    Delete @expr from memory. Skip parts of @expr which are not present in
    memory.
    Raise KeyError if no MemArray exists for the base of @expr
    """
    base, offset = get_expr_base_offset(expr.arg)
    memarray = self.base_to_memarray.get(base, None)
    if memarray is None:
        raise KeyError
    # Delete the bytes which are present, skip the others
    for byte_index in xrange(expr.size / 8):
        wrapped = (offset + byte_index) & memarray.mask
        if wrapped not in memarray:
            continue
        del memarray[wrapped]

def dump(

self)

Display MemSparse content

def dump(self):
    """Print every stored memory variable as a "<mem> = <value>" line"""
    for location, content in self.iteritems():
        # Single parenthesized argument: prints the same text as the
        # statement form
        print("%s = %s" % (location, content))

def items(

self)

Return stored memory variables and their values.

def items(self):
    """Return the list of stored (memory variable, value) pairs."""
    return [pair for pair in self.iteritems()]

def iteritems(

self)

Iterate on stored memory variables and their values.

def iteritems(self):
    """Yield (memory variable, value) pairs, MemArray by MemArray, with
    the (base, MemArray) entries taken in sorted order."""
    ordered = sorted(self.base_to_memarray.iteritems())
    for _, array in ordered:
        for location, content in array.memory():
            yield location, content

def read(

self, ptr, size)

Return the value associated with the Expr at address @ptr @ptr: Expr representing the memory address @size: memory size (in bits), byte aligned

def read(self, ptr, size):
    """
    Return the value associated with the Expr at address @ptr
    @ptr: Expr representing the memory address
    @size: memory size (in bits), byte aligned
    """
    assert size % 8 == 0
    base, offset = get_expr_base_offset(ptr)
    memarray = self.base_to_memarray.get(base, None)
    if memarray is None:
        # Nothing known for this base: the result is the memory itself
        return ExprMem(ptr, size)
    # Compose the parts returned by the MemArray
    return ExprCompose(*memarray.read(offset, size))

def write(

self, ptr, expr)

Update the corresponding Expr @expr at address @ptr @ptr: Expr representing the memory address @expr: Expr instance

def write(self, ptr, expr):
    """
    Update the corresponding Expr @expr at address @ptr
    @ptr: Expr representing the memory address
    @expr: Expr instance
    """
    assert ptr.size == self.addrsize
    base, offset = get_expr_base_offset(ptr)
    arrays = self.base_to_memarray
    target = arrays.get(base, None)
    if target is None:
        # First write on this base: create and register its MemArray
        target = MemArray(base, self.expr_simp)
        arrays[base] = target
    target.write(offset, expr)

class StateEngine

Stores an Engine state

class StateEngine(object):
    """Base class for objects storing an Engine state"""

    def merge(self, other):
        """Return a new state merging self and @other (abstract here)
        @other: a StateEngine instance"""
        # Always raises in this class
        raise NotImplementedError("Abstract method")

Ancestors (in MRO)

Methods

def merge(

self, other)

Generate a new state, representing the merge of self and @other @other: a StateEngine instance

def merge(self, other):
    """Return a new state merging self and @other (abstract here)
    @other: a StateEngine instance"""
    # Always raises in this class
    raise NotImplementedError("Abstract method")

class SymbolMngr

Symbolic store manager (IDs and MEMs)

class SymbolMngr(object):
    """Symbolic store manager (IDs and MEMs)"""

    def __init__(self, init=None, addrsize=None, expr_simp=expr_simp_explicit):
        """
        @init: optional object whose iteritems() gives the initial
               (destination, value) pairs
        @addrsize: size (in bits) of the addresses, mandatory
        @expr_simp: simplifier handed to the memory store
        """
        assert addrsize is not None
        self.addrsize = addrsize
        self.expr_simp = expr_simp
        self.symbols_id = {}
        self.symbols_mem = MemSparse(addrsize, expr_simp)
        self.mask = (1 << addrsize) - 1
        if init is not None:
            # Go through write() so each pair lands in the right store
            for dst, src in init.iteritems():
                self.write(dst, src)

    def __contains__(self, expr):
        """Tell whether the identifier or memory @expr has a stored value"""
        if expr.is_id():
            return expr in self.symbols_id
        if expr.is_mem():
            return expr in self.symbols_mem
        return False

    def __getitem__(self, expr):
        return self.read(expr)

    def __setitem__(self, expr, value):
        self.write(expr, value)

    def __delitem__(self, expr):
        """Forget @expr; TypeError if it is neither an identifier nor a
        memory"""
        if expr.is_id():
            del self.symbols_id[expr]
            return
        if expr.is_mem():
            del self.symbols_mem[expr]
            return
        raise TypeError("Bad source expr")

    def copy(self):
        """Copy object instance"""
        # The new instance is filled from the iteritems() of this one
        return SymbolMngr(self, addrsize=self.addrsize, expr_simp=self.expr_simp)

    def clear(self):
        """Forget every variables values"""
        self.symbols_mem.clear()
        self.symbols_id.clear()

    def read(self, src):
        """
        Return the value corresponding to Expr @src
        @src: ExprId or ExprMem instance
        """
        if src.is_id():
            # An unknown identifier evaluates to itself
            return self.symbols_id.get(src, src)
        if src.is_mem():
            # Only byte aligned accesses are supported for now
            assert src.size % 8 == 0
            return self.symbols_mem.read(src.arg, src.size)
        raise TypeError("Bad source expr")

    def write(self, dst, src):
        """
        Update @dst with @src expression
        @dst: ExprId or ExprMem instance
        @src: Expression instance
        """
        assert dst.size == src.size
        if dst.is_id():
            if dst == src:
                # Assigning an identifier to itself removes its entry
                self.symbols_id.pop(dst, None)
            else:
                self.symbols_id[dst] = src
            return
        if dst.is_mem():
            # Only byte aligned accesses are supported for now
            assert dst.size % 8 == 0
            self.symbols_mem.write(dst.arg, src)
            return
        raise TypeError("Bad destination expr")

    def dump(self, ids=True, mems=True):
        """Display memory content"""
        if ids:
            for name, content in self.ids():
                print('%s = %s' % (name, content))
        if mems:
            for location, content in self.memory():
                print('%s = %s' % (location, content))

    def __repr__(self):
        lines = ['%s = %s' % (name, content)
                 for name, content in self.iteritems()]
        return "\n".join(lines)

    def iteritems(self):
        """ExprId/ExprMem iteritems of the current state"""
        # Identifiers first, then memories
        for producer in (self.ids, self.memory):
            for name, content in producer():
                yield name, content

    def items(self):
        """Return variables/values of the current state"""
        return [pair for pair in self.iteritems()]

    def __iter__(self):
        for pair in self.iteritems():
            yield pair[0]

    def ids(self):
        """Iterate on variables and their values."""
        for name, content in self.symbols_id.iteritems():
            yield name, content

    def memory(self):
        """Iterate on memory variables and their values."""
        for location, content in self.symbols_mem.iteritems():
            yield location, content

    def keys(self):
        """Variables of the current state"""
        return [name for name in self]

    def get(self, expr, default=None):
        """Deprecated version of read"""
        warnings.warn('DEPRECATION WARNING: use "read(self, expr)" instead of get')
        found = self.read(expr)
        if default is None:
            return found
        # @default only replaces a value which read() left unchanged
        return default if found == expr else found

Ancestors (in MRO)

Instance variables

var addrsize

var expr_simp

var mask

var symbols_id

var symbols_mem

Methods

def __init__(

self, init=None, addrsize=None, expr_simp=<miasm2.expression.simplifications.ExpressionSimplifier object at 0x7f19b240b250>)

def __init__(self, init=None, addrsize=None, expr_simp=expr_simp_explicit):
    """
    @init: optional object whose iteritems() gives the initial
           (destination, value) pairs
    @addrsize: size (in bits) of the addresses, mandatory
    @expr_simp: simplifier handed to the memory store
    """
    assert addrsize is not None
    self.addrsize = addrsize
    self.expr_simp = expr_simp
    self.symbols_id = {}
    self.symbols_mem = MemSparse(addrsize, expr_simp)
    self.mask = (1 << addrsize) - 1
    if init is not None:
        # Go through write() so each pair lands in the right store
        for dst, src in init.iteritems():
            self.write(dst, src)

def clear(

self)

Forget every variables values

def clear(self):
    """Forget the value of every identifier and memory"""
    for store in (self.symbols_id, self.symbols_mem):
        store.clear()

def copy(

self)

Copy object instance

def copy(self):
    """Return a new SymbolMngr initialised from this instance"""
    # The new instance is filled from the iteritems() of this one
    return SymbolMngr(self, addrsize=self.addrsize, expr_simp=self.expr_simp)

def dump(

self, ids=True, mems=True)

Display memory content

def dump(self, ids=True, mems=True):
    """Print the state as "<variable> = <value>" lines
    @ids: print identifiers when true
    @mems: print memories when true"""
    if ids:
        for name, content in self.ids():
            print('%s = %s' % (name, content))
    if mems:
        for location, content in self.memory():
            print('%s = %s' % (location, content))

def get(

self, expr, default=None)

Deprecated version of read

def get(self, expr, default=None):
    """Deprecated version of read"""
    warnings.warn('DEPRECATION WARNING: use "read(self, expr)" instead of get')
    found = self.read(expr)
    if default is None:
        return found
    # @default only replaces a value which read() left unchanged
    return default if found == expr else found

def ids(

self)

Iterate on variables and their values.

def ids(self):
    """Yield each stored identifier with its value."""
    for name, content in self.symbols_id.iteritems():
        yield name, content

def items(

self)

Return variables/values of the current state

def items(self):
    """Return the list of (variable, value) pairs of the current state"""
    return [pair for pair in self.iteritems()]

def iteritems(

self)

ExprId/ExprMem iteritems of the current state

def iteritems(self):
    """Yield the identifiers then the memories, each with its value"""
    for producer in (self.ids, self.memory):
        # Each producer is only called once the previous one is exhausted
        for name, content in producer():
            yield name, content

def keys(

self)

Variables of the current state

def keys(self):
    """Return the list of variables of the current state"""
    return [name for name in self]

def memory(

self)

Iterate on memory variables and their values.

def memory(self):
    """Yield each stored memory variable with its value."""
    for location, content in self.symbols_mem.iteritems():
        yield location, content

def read(

self, src)

Return the value corresponding to Expr @src @src: ExprId or ExprMem instance

def read(self, src):
    """
    Return the value corresponding to Expr @src
    @src: ExprId or ExprMem instance
    Raise TypeError for any other kind of expression
    """
    if src.is_id():
        # An unknown identifier evaluates to itself
        return self.symbols_id.get(src, src)
    if src.is_mem():
        # Only byte aligned accesses are supported for now
        assert src.size % 8 == 0
        return self.symbols_mem.read(src.arg, src.size)
    raise TypeError("Bad source expr")

def write(

self, dst, src)

Update @dst with @src expression @dst: ExprId or ExprMem instance @src: Expression instance

def write(self, dst, src):
    """
    Update @dst with @src expression
    @dst: ExprId or ExprMem instance
    @src: Expression instance
    Raise TypeError if @dst is neither an identifier nor a memory
    """
    assert dst.size == src.size
    if dst.is_id():
        if dst == src:
            # Assigning an identifier to itself removes its entry
            self.symbols_id.pop(dst, None)
        else:
            self.symbols_id[dst] = src
        return
    if dst.is_mem():
        # Only byte aligned accesses are supported for now
        assert dst.size % 8 == 0
        self.symbols_mem.write(dst.arg, src)
        return
    raise TypeError("Bad destination expr")

class SymbolicExecutionEngine

Symbolic execution engine Allow IR code emulation in symbolic domain

Examples:

from miasm2.ir.symbexec import SymbolicExecutionEngine
from miasm2.ir.ir import AssignBlock

ir_arch = ir_x86_32()

init_state = {
    ir_arch.arch.regs.EAX: ir_arch.arch.regs.EBX,
    ExprMem(id_x+ExprInt(0x10, 32), 32): id_a,
}

sb_exec = SymbolicExecutionEngine(ir_arch, init_state)

>>> sb_exec.dump()
EAX                = EBX
@32[x + 0x10]      = a
>>> sb_exec.dump(mems=False)
EAX                = EBX

>>> print sb_exec.eval_expr(ir_arch.arch.regs.EAX + ir_arch.arch.regs.ECX)
EBX + ECX

Inspecting state: - dump - modified State manipulation: - '.state' (rw)

Evaluation (read only): - eval_expr - eval_assignblk Evaluation with state update: - eval_updt_expr - eval_updt_assignblk - eval_updt_irblock

Start a symbolic execution based on provisioned '.ir_arch' blocks: - run_block_at - run_at

class SymbolicExecutionEngine(object):
    """
    Symbolic execution engine
    Allow IR code emulation in symbolic domain


    Examples:
        from miasm2.ir.symbexec import SymbolicExecutionEngine
        from miasm2.ir.ir import AssignBlock

        ir_arch = ir_x86_32()

        init_state = {
            ir_arch.arch.regs.EAX: ir_arch.arch.regs.EBX,
            ExprMem(id_x+ExprInt(0x10, 32), 32): id_a,
        }

        sb_exec = SymbolicExecutionEngine(ir_arch, init_state)

        >>> sb_exec.dump()
        EAX                = EBX
        @32[x + 0x10]      = a
        >>> sb_exec.dump(mems=False)
        EAX                = EBX

        >>> print sb_exec.eval_expr(ir_arch.arch.regs.EAX + ir_arch.arch.regs.ECX)
        EBX + ECX

    Inspecting state:
        - dump
        - modified
    State manipulation:
        - '.state' (rw)

    Evaluation (read only):
        - eval_expr
        - eval_assignblk
    Evaluation with state update:
        - eval_updt_expr
        - eval_updt_assignblk
        - eval_updt_irblock

    Start a symbolic execution based on provisioned '.ir_arch' blocks:
        - run_block_at
        - run_at
    """

    # Class used by get_state() to snapshot the engine symbols
    StateEngine = SymbolicState

    def __init__(self, ir_arch, state=None,
                 func_read=None,
                 func_write=None,
                 sb_expr_simp=expr_simp_explicit):
        """
        @ir_arch: IR architecture; its addrsize, loc_db and IRDst are used
        @state: (optional) dictionary linking destinations to their initial
        values
        @func_read: (deprecated) callback used by mem_read on unknown memory
        @func_write: (deprecated) callback used by mem_write
        @sb_expr_simp: expression simplifier applied during evaluation
        """

        # Dispatch table used by eval_expr_visitor, keyed by expression class
        self.expr_to_visitor = {
            ExprInt: self.eval_exprint,
            ExprId: self.eval_exprid,
            ExprLoc: self.eval_exprloc,
            ExprMem: self.eval_exprmem,
            ExprSlice: self.eval_exprslice,
            ExprCond: self.eval_exprcond,
            ExprOp: self.eval_exprop,
            ExprCompose: self.eval_exprcompose,
        }

        if state is None:
            state = {}

        self.symbols = SymbolMngr(addrsize=ir_arch.addrsize, expr_simp=sb_expr_simp)

        for dst, src in state.iteritems():
            self.symbols.write(dst, src)

        if func_read:
            warnings.warn('DEPRECATION WARNING: override function "mem_read(self, expr)" instead of func_read')
        if func_write:
            warnings.warn('DEPRECATION WARNING: override function "mem_write(self, dst, src)" instead of func_write')

        self.func_read = func_read
        self.func_write = func_write
        self.ir_arch = ir_arch
        self.expr_simp = sb_expr_simp

    def get_state(self):
        """Return the current state of the SymbolicEngine"""
        state = self.StateEngine(dict(self.symbols))
        return state

    def set_state(self, state):
        """Restore the @state of the engine
        @state: StateEngine instance
        """
        # Start from a fresh symbol manager so nothing from the previous
        # state survives
        self.symbols = SymbolMngr(addrsize=self.ir_arch.addrsize, expr_simp=self.expr_simp)
        for dst, src in dict(state).iteritems():
            self.symbols[dst] = src

    state = property(get_state, set_state)

    def eval_expr_visitor(self, expr, cache=None):
        """
        [DEV]: Override to change the behavior of an Expr evaluation.
        This function recursively applies 'eval_expr*' to @expr.
        This function uses @cache to speedup re-evaluation of expression.

        Raises TypeError when the simplified expression class has no entry
        in 'expr_to_visitor'.
        """
        if cache is None:
            cache = {}

        ret = cache.get(expr, None)
        if ret is not None:
            return ret

        # The simplified form may already have been evaluated
        new_expr = self.expr_simp(expr)
        ret = cache.get(new_expr, None)
        if ret is not None:
            return ret

        func = self.expr_to_visitor.get(new_expr.__class__, None)
        if func is None:
            raise TypeError("Unknown expr type")

        ret = func(new_expr, cache=cache)
        ret = self.expr_simp(ret)
        assert ret is not None

        # Remember the result for both the original and the simplified form
        cache[expr] = ret
        cache[new_expr] = ret
        return ret

    def eval_exprint(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprInt using the current state"""
        return expr

    def eval_exprid(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprId using the current state"""
        ret = self.symbols.read(expr)
        return ret

    def eval_exprloc(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprLoc using the current state
        The location is replaced by an ExprInt when the loc_db knows its
        offset, and returned unchanged otherwise.
        """
        offset = self.ir_arch.loc_db.get_location_offset(expr.loc_key)
        if offset is not None:
            ret = ExprInt(offset, expr.size)
        else:
            ret = expr
        return ret

    def eval_exprmem(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprMem using the current state
        This function first evaluates the memory pointer value.
        Override 'mem_read' to modify the effective memory accesses
        """
        ptr = self.eval_expr_visitor(expr.arg, **kwargs)
        mem = ExprMem(ptr, expr.size)
        ret = self.mem_read(mem)
        return ret

    def eval_exprcond(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprCond using the current state"""
        cond = self.eval_expr_visitor(expr.cond, **kwargs)
        src1 = self.eval_expr_visitor(expr.src1, **kwargs)
        src2 = self.eval_expr_visitor(expr.src2, **kwargs)
        ret = ExprCond(cond, src1, src2)
        return ret

    def eval_exprslice(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprSlice using the current state"""
        arg = self.eval_expr_visitor(expr.arg, **kwargs)
        ret = ExprSlice(arg, expr.start, expr.stop)
        return ret

    def eval_exprop(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprOp using the current state"""
        args = []
        for oarg in expr.args:
            arg = self.eval_expr_visitor(oarg, **kwargs)
            args.append(arg)
        ret = ExprOp(expr.op, *args)
        return ret

    def eval_exprcompose(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprCompose using the current state"""
        args = []
        for arg in expr.args:
            args.append(self.eval_expr_visitor(arg, **kwargs))
        ret = ExprCompose(*args)
        return ret

    def eval_expr(self, expr, eval_cache=None):
        """
        Evaluate @expr
        @expr: Expression instance to evaluate
        @eval_cache: None or dictionary linking variables to their values
        """
        if eval_cache is None:
            eval_cache = {}
        ret = self.eval_expr_visitor(expr, cache=eval_cache)
        assert ret is not None
        return ret

    def modified(self, init_state=None, ids=True, mems=True):
        """
        Return the modified variables.
        @init_state: a base dictionary linking variables to their initial values
        to diff. Can be None.
        @ids: include modified ids
        @mems: include modified memories
        """
        if init_state is None:
            init_state = {}
        if ids:
            for variable, value in self.symbols.symbols_id.iteritems():
                # Skip entries whose value is the one recorded in @init_state
                if variable in init_state and init_state[variable] == value:
                    continue
                yield variable, value
        if mems:
            for mem, value in self.symbols.memory():
                if mem in init_state and init_state[mem] == value:
                    continue
                yield mem, value

    def dump(self, ids=True, mems=True):
        """
        Display modified variables
        @ids: display modified ids
        @mems: display modified memory
        """

        for variable, value in self.modified(None, ids, mems):
            # Single-argument form: same output under Python 2 and Python 3
            print("%-18s = %s" % (variable, value))

    def eval_assignblk(self, assignblk):
        """
        Evaluate AssignBlock using the current state

        Returns a dictionary containing modified keys associated to their values

        @assignblk: AssignBlock instance
        """
        pool_out = {}
        # One cache shared by every evaluation of this block
        eval_cache = {}
        for dst, src in assignblk.iteritems():
            src = self.eval_expr(src, eval_cache)
            if dst.is_mem():
                # Memory destinations are keyed by their evaluated pointer
                ptr = self.eval_expr(dst.arg, eval_cache)
                tmp = ExprMem(ptr, dst.size)
                pool_out[tmp] = src
            elif dst.is_id():
                pool_out[dst] = src
            else:
                raise ValueError("Unknown destination type", str(dst))

        return pool_out

    def apply_change(self, dst, src):
        """
        Apply @dst = @src on the current state WITHOUT evaluating both side
        @dst: Expr, destination
        @src: Expr, source
        """
        if dst.is_mem():
            self.mem_write(dst, src)
        else:
            self.symbols.write(dst, src)

    def eval_updt_assignblk(self, assignblk):
        """
        Apply an AssignBlock on the current state
        @assignblk: AssignBlock instance

        Returns the list of memory destinations which have been written
        """
        mem_dst = []
        # Every source is evaluated before any destination is updated
        dst_src = self.eval_assignblk(assignblk)
        for dst, src in dst_src.iteritems():
            self.apply_change(dst, src)
            if dst.is_mem():
                mem_dst.append(dst)
        return mem_dst

    def eval_updt_irblock(self, irb, step=False):
        """
        Symbolic execution of the @irb on the current state
        @irb: irbloc instance
        @step: display intermediate steps

        Returns the evaluation of the architecture IRDst after the block
        """
        for assignblk in irb:
            if step:
                # Single-argument prints: same output under Python 2 and 3
                print('Instr %s' % (assignblk.instr,))
                print('Assignblk:')
                print(assignblk)
                print('_' * 80)
            self.eval_updt_assignblk(assignblk)
            if step:
                self.dump(mems=False)
                self.dump(ids=False)
                print('_' * 80)
        dst = self.eval_expr(self.ir_arch.IRDst)

        return dst

    def run_block_at(self, ircfg, addr, step=False):
        """
        Symbolic execution of the block at @addr
        @ircfg: object providing get_block(addr)
        @addr: address to execute (int or ExprInt or label)
        @step: display intermediate steps

        Returns the next destination, or @addr itself if no block is found
        """
        irblock = ircfg.get_block(addr)
        if irblock is not None:
            addr = self.eval_updt_irblock(irblock, step=step)
        return addr

    def run_at(self, ircfg, addr, lbl_stop=None, step=False):
        """
        Symbolic execution starting at @addr
        @ircfg: object providing get_block(addr)
        @addr: address to execute (int or ExprInt or label)
        @lbl_stop: LocKey to stop execution on
        @step: display intermediate steps

        Returns the first destination for which execution stopped
        """
        while True:
            irblock = ircfg.get_block(addr)
            if irblock is None:
                break
            if irblock.loc_key == lbl_stop:
                break
            addr = self.eval_updt_irblock(irblock, step=step)
        return addr

    def del_mem_above_stack(self, stack_ptr):
        """
        Remove all stored memory values with following properties:
        * pointer based on initial stack value
        * pointer below current stack pointer
        """
        stack_ptr = self.eval_expr(stack_ptr)
        base, stk_offset = get_expr_base_offset(stack_ptr)
        memarray = self.symbols.symbols_mem.base_to_memarray.get(base, None)
        if memarray:
            # Collect first: entries cannot be removed while iterating
            to_del = set()
            for offset in memarray:
                # Keep the top bit of (offset - stk_offset) on stack_ptr.size
                # bits: it is set when the difference is negative
                if ((offset - stk_offset) & int(stack_ptr.mask)) >> (stack_ptr.size - 1) != 0:
                    to_del.add(offset)

            for offset in to_del:
                del memarray[offset]

    def eval_updt_expr(self, expr):
        """
        Evaluate @expr and apply side effect if needed (ie. if expr is an
        assignment). Return the evaluated value
        """

        # Update value if needed
        if expr.is_aff():
            # The returned value is computed before the state is updated
            ret = self.eval_expr(expr.src)
            self.eval_updt_assignblk(AssignBlock([expr]))
        else:
            ret = self.eval_expr(expr)

        return ret

    def _resolve_mem_parts(self, expr):
        """For a given ExprMem @expr, get known/unknown parts from the store.
        @expr: ExprMem instance

        Return a list of (known, value) where known is a bool representing if
        the value has been resolved from the store or not.
        """

        # Extract known parts in symbols
        assert expr.size % 8 == 0
        ptr = expr.arg
        known = []
        ptrs = []
        # Probe the store one byte at a time
        for index in xrange(expr.size // 8):
            offset = self.expr_simp(ptr + ExprInt(index, ptr.size))
            ptrs.append(offset)
            mem = ExprMem(offset, 8)
            known.append(mem in self.symbols)

        reads = merge_ptr_read(known, ptrs)
        out = []
        for is_known, ptr_value, size in reads:
            mem = ExprMem(ptr_value, size)
            if is_known:
                mem = self.symbols.read(mem)
            out.append((is_known, mem))
        return out

    def mem_read(self, expr):
        """
        [DEV]: Override to modify the effective memory reads

        Read symbolic value at ExprMem @expr
        @expr: ExprMem
        """

        parts = self._resolve_mem_parts(expr)

        out = []
        for known, part in parts:
            # The deprecated func_read hook only sees unresolved memories
            if not known and part.is_mem() and self.func_read is not None:
                ret = self.func_read(part)
            else:
                ret = part

            out.append(ret)
        ret = self.expr_simp(ExprCompose(*out))

        assert ret.size == expr.size
        return ret

    def mem_write(self, dst, src):
        """
        [DEV]: Override to modify the effective memory writes

        Write symbolic value @src at ExprMem @dst
        @dst: destination ExprMem
        @src: source Expression
        """
        if self.func_write is not None:
            # The deprecated hook receives the engine as its first argument
            self.func_write(self, dst, src)
        else:
            self.symbols.write(dst, src)


    # Deprecated methods

    def apply_expr_on_state(self, expr, cache):
        """Deprecated version of eval_expr"""
        warnings.warn('DEPRECATION WARNING: use "eval_expr" instead of apply_expr_on_state')

        if cache is None:
            cache = {}
        ret = self.eval_expr(expr, eval_cache=cache)
        return ret

    def modified_mems(self, init_state=None):
        """Deprecated version of modified(ids=False)"""
        warnings.warn('DEPRECATION WARNING: use "modified(self, ids=False)" instead of modified_mems')
        for mem in self.modified(init_state=init_state, ids=False):
            yield mem

    def modified_regs(self, init_state=None):
        """Deprecated version of modified(mems=False)"""
        warnings.warn('DEPRECATION WARNING: use "modified(self, mems=False)" instead of modified_regs')
        for reg in self.modified(init_state=init_state, mems=False):
            yield reg

    def dump_id(self):
        """Deprecated version of dump(mems=False)"""
        warnings.warn('DEPRECATION WARNING: use "dump(self, mems=False)" instead of dump_id')
        self.dump(mems=False)

    def dump_mem(self):
        """Deprecated version of dump(ids=False)"""
        warnings.warn('DEPRECATION WARNING: use "dump(self, ids=False)" instead of dump_mem')
        self.dump(ids=False)

    def eval_ir_expr(self, assignblk):
        """Deprecated version of eval_assignblk(self, assignblk)"""
        warnings.warn('DEPRECATION WARNING: use "eval_assignblk(self, assignblk)" instead of eval_ir_expr')
        return self.eval_assignblk(assignblk).iteritems()

    def eval_ir(self, assignblk):
        """Deprecated version of eval_updt_assignblk(self, assignblk)"""
        warnings.warn('DEPRECATION WARNING: use "eval_updt_assignblk(self, assignblk)" instead of eval_ir')
        return self.eval_updt_assignblk(assignblk)

    def emulbloc(self, irb, step=False):
        """Deprecated version of eval_updt_irblock(self, irb, step=False)"""
        warnings.warn('DEPRECATION WARNING: use "eval_updt_irblock(self, irb, step=False)" instead of emulbloc')
        return self.eval_updt_irblock(irb, step)

    # NOTE(review): run_block_at and run_at take 'ircfg' as their first
    # parameter, but the four wrappers below forward (addr, step) and
    # (addr, lbl_stop, step) positionally, so @addr lands in 'ircfg'.
    # No ircfg is available here; confirm the intended fix with maintainers.

    def emul_ir_bloc(self, _, addr, step=False):
        """Deprecated version of run_block_at"""
        warnings.warn('DEPRECATION WARNING: use "run_block_at(self, addr, step=False)" instead of emul_ir_bloc')
        return self.run_block_at(addr, step)

    def emul_ir_block(self, addr, step=False):
        """Deprecated version of run_block_at"""
        warnings.warn('DEPRECATION WARNING: use "run_block_at(self, addr, step=False)" instead of emul_ir_block')
        return self.run_block_at(addr, step)

    def emul_ir_blocks(self, addr, lbl_stop=None, step=False):
        """Deprecated version of run_at"""
        warnings.warn('DEPRECATION WARNING: use "run_at(self, addr, lbl_stop=None, step=False):" instead of emul_ir_blocks')
        return self.run_at(addr, lbl_stop, step)

    def emul_ir_blocs(self, _, addr, lbl_stop=None, step=False):
        """Deprecated version of run_at"""
        warnings.warn('DEPRECATION WARNING: use "run_at(self, addr, lbl_stop=None, step=False):" instead of emul_ir_blocs')
        return self.run_at(addr, lbl_stop, step)

    def apply_expr(self, expr):
        """Deprecated version of eval_updt_expr"""
        warnings.warn('DEPRECATION WARNING: use "eval_updt_expr" instead of apply_expr')
        return self.eval_updt_expr(expr)

    def as_assignblock(self):
        """Return the current state as an AssignBlock"""
        warnings.warn('DEPRECATION WARNING: use "modified(ids=True, mems=True)" instead of as_assignblock')
        out = []
        for dst, src in self.modified(ids=True, mems=True):
            out.append((dst, src))
        return AssignBlock(dict(out))

Ancestors (in MRO)

Class variables

var StateEngine

var state

Instance variables

var expr_simp

var expr_to_visitor

var func_read

var func_write

var ir_arch

var state

Return the current state of the SymbolicEngine

var symbols

Methods

def __init__(

self, ir_arch, state=None, func_read=None, func_write=None, sb_expr_simp=<miasm2.expression.simplifications.ExpressionSimplifier object at 0x7f19b240b250>)

def __init__(self, ir_arch, state=None,
             func_read=None,
             func_write=None,
             sb_expr_simp=expr_simp_explicit):
    """
    @ir_arch: IR architecture; its addrsize is used to build the symbols
    @state: (optional) dictionary linking destinations to their initial
    values
    @func_read: (deprecated) callback stored for memory reads
    @func_write: (deprecated) callback stored for memory writes
    @sb_expr_simp: expression simplifier applied during evaluation
    """
    # Dispatch table keyed by expression class
    self.expr_to_visitor = {
        ExprInt: self.eval_exprint,
        ExprId: self.eval_exprid,
        ExprLoc: self.eval_exprloc,
        ExprMem: self.eval_exprmem,
        ExprSlice: self.eval_exprslice,
        ExprCond: self.eval_exprcond,
        ExprOp: self.eval_exprop,
        ExprCompose: self.eval_exprcompose,
    }
    if state is None:
        state = {}
    self.symbols = SymbolMngr(addrsize=ir_arch.addrsize, expr_simp=sb_expr_simp)
    for dst, src in state.iteritems():
        self.symbols.write(dst, src)
    if func_read:
        warnings.warn('DEPRECATION WARNING: override function "mem_read(self, expr)" instead of func_read')
    if func_write:
        # The message previously spelled the parameter "dsr"
        warnings.warn('DEPRECATION WARNING: override function "mem_write(self, dst, src)" instead of func_write')
    self.func_read = func_read
    self.func_write = func_write
    self.ir_arch = ir_arch
    self.expr_simp = sb_expr_simp

def apply_change(

self, dst, src)

Apply @dst = @src on the current state WITHOUT evaluating both side @dst: Expr, destination @src: Expr, source

def apply_change(self, dst, src):
    """
    Record @dst = @src in the current state; neither side is evaluated
    @dst: Expr, destination
    @src: Expr, source
    """
    # Memory destinations go through the overridable mem_write hook
    writer = self.mem_write if dst.is_mem() else self.symbols.write
    writer(dst, src)

def apply_expr(

self, expr)

Deprecated version of eval_updt_expr

def apply_expr(self, expr):
    """Deprecated alias: warn, then forward @expr to eval_updt_expr"""
    message = 'DEPRECATION WARNING: use "eval_updt_expr" instead of apply_expr'
    warnings.warn(message)
    result = self.eval_updt_expr(expr)
    return result

def apply_expr_on_state(

self, expr, cache)

Deprecated version of eval_expr

def apply_expr_on_state(self, expr, cache):
    """Deprecated alias of eval_expr; a None @cache is replaced by a new dict"""
    message = 'DEPRECATION WARNING: use "eval_expr" instead of apply_expr_on_state'
    warnings.warn(message)
    eval_cache = {} if cache is None else cache
    return self.eval_expr(expr, eval_cache=eval_cache)

def as_assignblock(

self)

Return the current state as an AssignBlock

def as_assignblock(self):
    """Deprecated: build an AssignBlock from every modified id and memory"""
    message = 'DEPRECATION WARNING: use "modified(ids=True, mems=True)" instead of as_assignblock'
    warnings.warn(message)
    assignments = dict(
        (dst, src) for dst, src in self.modified(ids=True, mems=True)
    )
    return AssignBlock(assignments)

def del_mem_above_stack(

self, stack_ptr)

Remove all stored memory values with following properties: pointer based on initial stack value pointer below current stack pointer

def del_mem_above_stack(self, stack_ptr):
    """
    Drop the stored memory values matching both conditions:
    * the pointer shares its base with the evaluated @stack_ptr
    * the pointer offset is below the current stack pointer offset
    """
    stack_ptr = self.eval_expr(stack_ptr)
    base, stk_offset = get_expr_base_offset(stack_ptr)
    memarray = self.symbols.symbols_mem.base_to_memarray.get(base, None)
    if not memarray:
        return
    mask = int(stack_ptr.mask)
    sign_shift = stack_ptr.size - 1
    # Gather first: entries cannot be removed while iterating over memarray.
    # The top bit of the masked difference is set when it is negative.
    stale = set(
        offset for offset in memarray
        if ((offset - stk_offset) & mask) >> sign_shift != 0
    )
    for offset in stale:
        del memarray[offset]

def dump(

self, ids=True, mems=True)

Display modified variables @ids: display modified ids @mems: display modified memory

def dump(self, ids=True, mems=True):
    """
    Display modified variables
    @ids: display modified ids
    @mems: display modified memory
    """
    for variable, value in self.modified(None, ids, mems):
        # A single formatted argument prints the same text under both the
        # Python 2 print statement and the Python 3 print function
        print("%-18s = %s" % (variable, value))

def dump_id(

self)

Deprecated version of dump(mems=False)

def dump_id(self):
    """Deprecated alias: warn, then display the ids through dump(mems=False)"""
    message = 'DEPRECATION WARNING: use "dump(self, mems=False)" instead of dump_id'
    warnings.warn(message)
    self.dump(mems=False)

def dump_mem(

self)

Deprecated version of dump(ids=False)

def dump_mem(self):
    """Deprecated alias: warn, then display memories through dump(ids=False)"""
    message = 'DEPRECATION WARNING: use "dump(self, ids=False)" instead of dump_mem'
    warnings.warn(message)
    self.dump(ids=False)

def emul_ir_bloc(

self, _, addr, step=False)

Deprecated version of run_block_at

def emul_ir_bloc(self, _, addr, step=False):
    """Deprecated wrapper around run_block_at; the second argument is ignored"""
    message = 'DEPRECATION WARNING: use "run_block_at(self, addr, step=False)" instead of emul_ir_bloc'
    warnings.warn(message)
    # NOTE(review): run_block_at is declared as (ircfg, addr, step=False), so
    # these two positional arguments do not line up with it -- confirm.
    outcome = self.run_block_at(addr, step)
    return outcome

def emul_ir_block(

self, addr, step=False)

Deprecated version of run_block_at

def emul_ir_block(self, addr, step=False):
    """Deprecated wrapper: warn, then forward (addr, step) to run_block_at"""
    message = 'DEPRECATION WARNING: use "run_block_at(self, addr, step=False)" instead of emul_ir_block'
    warnings.warn(message)
    # NOTE(review): run_block_at is declared as (ircfg, addr, step=False), so
    # these two positional arguments do not line up with it -- confirm.
    outcome = self.run_block_at(addr, step)
    return outcome

def emul_ir_blocks(

self, addr, lbl_stop=None, step=False)

Deprecated version of run_at

def emul_ir_blocks(self, addr, lbl_stop=None, step=False):
    """Deprecated wrapper: warn, then forward (addr, lbl_stop, step) to run_at"""
    message = 'DEPRECATION WARNING: use "run_at(self, addr, lbl_stop=None, step=False):" instead of emul_ir_blocks'
    warnings.warn(message)
    # NOTE(review): run_at is declared as (ircfg, addr, lbl_stop=None,
    # step=False), so these positional arguments do not line up -- confirm.
    outcome = self.run_at(addr, lbl_stop, step)
    return outcome

def emul_ir_blocs(

self, _, addr, lbl_stop=None, step=False)

Deprecated version of run_at

def emul_ir_blocs(self, _, addr, lbl_stop=None, step=False):
    """Deprecated wrapper around run_at; the second argument is ignored"""
    message = 'DEPRECATION WARNING: use "run_at(self, addr, lbl_stop=None, step=False):" instead of emul_ir_blocs'
    warnings.warn(message)
    # NOTE(review): run_at is declared as (ircfg, addr, lbl_stop=None,
    # step=False), so these positional arguments do not line up -- confirm.
    outcome = self.run_at(addr, lbl_stop, step)
    return outcome

def emulbloc(

self, irb, step=False)

Deprecated version of eval_updt_irblock(self, irb, step=False)

def emulbloc(self, irb, step=False):
    """Deprecated alias: warn, then forward (irb, step) to eval_updt_irblock"""
    message = 'DEPRECATION WARNING: use "eval_updt_irblock(self, irb, step=False)" instead of emulbloc'
    warnings.warn(message)
    destination = self.eval_updt_irblock(irb, step)
    return destination

def eval_assignblk(

self, assignblk)

Evaluate AssignBlock using the current state

Returns a dictionary containing modified keys associated to their values

@assignblk: AssignBlock instance

def eval_assignblk(self, assignblk):
    """
    Evaluate every assignment of @assignblk against the current state,
    without modifying that state
    @assignblk: AssignBlock instance

    Returns a dictionary linking each destination to its evaluated source.
    Memory destinations are rebuilt around their evaluated pointer.
    Raises ValueError for a destination which is neither a memory nor an id.
    """
    # One cache shared by all the evaluations of this block
    cache = {}
    results = {}
    for dst, src in assignblk.iteritems():
        value = self.eval_expr(src, cache)
        if dst.is_mem():
            address = self.eval_expr(dst.arg, cache)
            results[ExprMem(address, dst.size)] = value
            continue
        if not dst.is_id():
            raise ValueError("Unknown destination type", str(dst))
        results[dst] = value
    return results

def eval_expr(

self, expr, eval_cache=None)

Evaluate @expr @expr: Expression instance to evaluate @eval_cache: None or dictionary linking variables to their values

def eval_expr(self, expr, eval_cache=None):
    """
    Evaluate @expr through eval_expr_visitor and return the result
    @expr: Expression instance to evaluate
    @eval_cache: None or dictionary linking variables to their values
    """
    cache = {} if eval_cache is None else eval_cache
    result = self.eval_expr_visitor(expr, cache=cache)
    assert result is not None
    return result

def eval_expr_visitor(

self, expr, cache=None)

[DEV]: Override to change the behavior of an Expr evaluation. This function recursively applies 'eval_expr*' to @expr. This function uses @cache to speedup re-evaluation of expression.

def eval_expr_visitor(self, expr, cache=None):
    """
    [DEV]: Override to change the behavior of an Expr evaluation.
    Simplify @expr, dispatch it on its class through 'expr_to_visitor',
    then simplify the result.
    @cache stores results for both @expr and its simplified form; a None
    @cache is replaced by a new dictionary.
    Raises TypeError when the simplified class has no registered visitor.
    """
    memo = {} if cache is None else cache

    hit = memo.get(expr)
    if hit is not None:
        return hit

    # The simplified form may already have been evaluated
    simplified = self.expr_simp(expr)
    hit = memo.get(simplified)
    if hit is not None:
        return hit

    handler = self.expr_to_visitor.get(simplified.__class__)
    if handler is None:
        raise TypeError("Unknown expr type")

    result = self.expr_simp(handler(simplified, cache=memo))
    assert result is not None

    memo[expr] = memo[simplified] = result
    return result

def eval_exprcompose(

self, expr, **kwargs)

[DEV]: Evaluate an ExprCompose using the current state

def eval_exprcompose(self, expr, **kwargs):
    """[DEV]: Evaluate each argument of an ExprCompose, then rebuild it"""
    parts = [self.eval_expr_visitor(part, **kwargs) for part in expr.args]
    return ExprCompose(*parts)

def eval_exprcond(

self, expr, **kwargs)

[DEV]: Evaluate an ExprCond using the current state

def eval_exprcond(self, expr, **kwargs):
    """[DEV]: Evaluate the condition and both sources of an ExprCond, then
    rebuild it"""
    visit = self.eval_expr_visitor
    condition = visit(expr.cond, **kwargs)
    first = visit(expr.src1, **kwargs)
    second = visit(expr.src2, **kwargs)
    return ExprCond(condition, first, second)

def eval_exprid(

self, expr, **kwargs)

[DEV]: Evaluate an ExprId using the current state

def eval_exprid(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprId by reading it from the engine symbols"""
    return self.symbols.read(expr)

def eval_exprint(

self, expr, **kwargs)

[DEV]: Evaluate an ExprInt using the current state

def eval_exprint(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprInt: @expr is returned as is, the state and
    the extra keyword arguments are not used"""
    return expr

def eval_exprloc(

self, expr, **kwargs)

[DEV]: Evaluate an ExprLoc using the current state

def eval_exprloc(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprLoc using the current state
    Returns an ExprInt holding the offset known by the loc_db for this
    location, or @expr unchanged when no offset is known.
    """
    offset = self.ir_arch.loc_db.get_location_offset(expr.loc_key)
    if offset is None:
        return expr
    return ExprInt(offset, expr.size)

def eval_exprmem(

self, expr, **kwargs)

[DEV]: Evaluate an ExprMem using the current state This function first evaluate the memory pointer value. Override 'mem_read' to modify the effective memory accesses

def eval_exprmem(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprMem using the current state
    The pointer is evaluated first, then a memory of the same size is read
    at that pointer through 'mem_read'.
    Override 'mem_read' to modify the effective memory accesses
    """
    address = self.eval_expr_visitor(expr.arg, **kwargs)
    return self.mem_read(ExprMem(address, expr.size))

def eval_exprop(

self, expr, **kwargs)

[DEV]: Evaluate an ExprOp using the current state

def eval_exprop(self, expr, **kwargs):
    """[DEV]: Evaluate each argument of an ExprOp, then rebuild the operation
    with the same operator"""
    operands = [self.eval_expr_visitor(operand, **kwargs) for operand in expr.args]
    return ExprOp(expr.op, *operands)

def eval_exprslice(

self, expr, **kwargs)

[DEV]: Evaluate an ExprSlice using the current state

def eval_exprslice(self, expr, **kwargs):
    """[DEV]: Evaluate the argument of an ExprSlice, then slice the result
    with the same start and stop bounds"""
    inner = self.eval_expr_visitor(expr.arg, **kwargs)
    return ExprSlice(inner, expr.start, expr.stop)

def eval_ir(

self, assignblk)

Deprecated version of eval_updt_assignblk(self, assignblk)

def eval_ir(self, assignblk):
    """Deprecated version of eval_updt_assignblk(self, assignblk)

    @assignblk: AssignBlock instance, forwarded unchanged
    Returns whatever eval_updt_assignblk returns.
    """
    # The message must name the method really called below; it used to point
    # at eval_assignblk
    warnings.warn('DEPRECATION WARNING: use "eval_updt_assignblk(self, assignblk)" instead of eval_ir')
    return self.eval_updt_assignblk(assignblk)

def eval_ir_expr(

self, assignblk)

Deprecated version of eval_assignblk(self, assignblk)

def eval_ir_expr(self, assignblk):
    """Deprecated version of eval_assignblk(self, assignblk)
    Returns the iteritems() of the dictionary built by eval_assignblk.
    """
    message = 'DEPRECATION WARNING: use "eval_assignblk(self, assignblk)" instead of eval_ir_expr'
    warnings.warn(message)
    evaluated = self.eval_assignblk(assignblk)
    return evaluated.iteritems()

def eval_updt_assignblk(

self, assignblk)

Apply an AssignBlock on the current state @assignblk: AssignBlock instance

def eval_updt_assignblk(self, assignblk):
    """
    Evaluate @assignblk, then apply each resulting assignment on the state
    @assignblk: AssignBlock instance

    Returns the list of memory destinations which have been written
    """
    written_mems = []
    # The whole block is evaluated before the first change is applied
    for dst, src in self.eval_assignblk(assignblk).iteritems():
        self.apply_change(dst, src)
        if dst.is_mem():
            written_mems.append(dst)
    return written_mems

def eval_updt_expr(

self, expr)

Evaluate @expr and apply side effect if needed (ie. if expr is an assignment). Return the evaluated value

def eval_updt_expr(self, expr):
    """
    Evaluate @expr and return its value. When @expr is an assignment, the
    value of its source is returned and the assignment is applied on the
    state.
    """
    if not expr.is_aff():
        return self.eval_expr(expr)
    # The returned value is computed before the state is updated
    value = self.eval_expr(expr.src)
    self.eval_updt_assignblk(AssignBlock([expr]))
    return value

def eval_updt_irblock(

self, irb, step=False)

Symbolic execution of the @irb on the current state @irb: irbloc instance @step: display intermediate steps

def eval_updt_irblock(self, irb, step=False):
    """
    Symbolic execution of the @irb on the current state
    @irb: irbloc instance (iterated to obtain its AssignBlocks)
    @step: display intermediate steps

    Returns the evaluation of the architecture IRDst once every AssignBlock
    has been applied.
    """
    for assignblk in irb:
        if step:
            # Each print takes one argument, so the text is the same under
            # the Python 2 print statement and the Python 3 print function
            print('Instr %s' % (assignblk.instr,))
            print('Assignblk:')
            print(assignblk)
            print('_' * 80)
        self.eval_updt_assignblk(assignblk)
        if step:
            # Ids first, then memories
            self.dump(mems=False)
            self.dump(ids=False)
            print('_' * 80)
    dst = self.eval_expr(self.ir_arch.IRDst)
    return dst

def get_state(

self)

Return the current state of the SymbolicEngine

def get_state(self):
    """Snapshot the engine symbols into a new StateEngine instance"""
    return self.StateEngine(dict(self.symbols))

def mem_read(

self, expr)

[DEV]: Override to modify the effective memory reads

Read symbolic value at ExprMem @expr @expr: ExprMem

def mem_read(self, expr):
    """
    [DEV]: Override to modify the effective memory reads
    Read symbolic value at ExprMem @expr: the parts returned by
    _resolve_mem_parts are collected (unknown memory parts go through
    func_read when one is set), then composed and simplified
    @expr: ExprMem
    """
    pieces = []
    for known, part in self._resolve_mem_parts(expr):
        if known or not part.is_mem() or self.func_read is None:
            # Already resolved, not a memory access, or no read callback
            pieces.append(part)
        else:
            pieces.append(self.func_read(part))
    value = self.expr_simp(ExprCompose(*pieces))
    assert value.size == expr.size
    return value

def mem_write(

self, dst, src)

[DEV]: Override to modify the effective memory writes

Write symbolic value @src at ExprMem @dst @dst: destination ExprMem @src: source Expression

def mem_write(self, dst, src):
    """
    [DEV]: Override to modify the effective memory writes
    Write symbolic value @src at ExprMem @dst, delegating to func_write
    (called with the engine itself as first argument) when one is set and
    to the symbol manager otherwise
    @dst: destination ExprMem
    @src: source Expression
    """
    if self.func_write is None:
        self.symbols.write(dst, src)
        return
    self.func_write(self, dst, src)

def modified(

self, init_state=None, ids=True, mems=True)

Return the modified variables. @init_state: a base dictionary linking variables to their initial values to diff. Can be None. @ids: track ids only @mems: track mems only

def modified(self, init_state=None, ids=True, mems=True):
    """
    Generate the (variable, value) pairs of the current state, skipping
    those whose value equals the one recorded in @init_state.
    @init_state: a base dictionary linking variables to their initial values
    to diff. Can be None.
    @ids: include the entries of symbols.symbols_id
    @mems: include the entries of symbols.memory()
    """
    baseline = {} if init_state is None else init_state
    if ids:
        for variable, value in self.symbols.symbols_id.iteritems():
            unchanged = variable in baseline and baseline[variable] == value
            if not unchanged:
                yield variable, value
    if mems:
        for mem, value in self.symbols.memory():
            unchanged = mem in baseline and baseline[mem] == value
            if not unchanged:
                yield mem, value

def modified_mems(

self, init_state=None)

Deprecated version of modified(ids=False)

def modified_mems(self, init_state=None):
    """Deprecated generator: same items as modified(ids=False)"""
    message = 'DEPRECATION WARNING: use "modified(self, ids=False)" instead of modified_mems'
    warnings.warn(message)
    mem_items = self.modified(init_state=init_state, ids=False)
    for item in mem_items:
        yield item

def modified_regs(

self, init_state=None)

Deprecated version of modified(mems=False)

def modified_regs(self, init_state=None):
    """Deprecated generator: same items as modified(mems=False)"""
    message = 'DEPRECATION WARNING: use "modified(self, mems=False)" instead of modified_regs'
    warnings.warn(message)
    reg_items = self.modified(init_state=init_state, mems=False)
    for item in reg_items:
        yield item

def run_at(

self, ircfg, addr, lbl_stop=None, step=False)

Symbolic execution starting at @addr @addr: address to execute (int or ExprInt or label) @lbl_stop: LocKey to stop execution on @step: display intermediate steps

def run_at(self, ircfg, addr, lbl_stop=None, step=False):
    """
    Symbolic execution starting at @addr
    Blocks are fetched from @ircfg and executed one after the other until
    no block is found for the current destination or the block's loc_key
    equals @lbl_stop; the last destination is returned.
    @addr: address to execute (int or ExprInt or label)
    @lbl_stop: LocKey to stop execution on
    @step: display intermediate steps
    """
    irblock = ircfg.get_block(addr)
    while irblock is not None and not (irblock.loc_key == lbl_stop):
        addr = self.eval_updt_irblock(irblock, step=step)
        irblock = ircfg.get_block(addr)
    return addr

def run_block_at(

self, ircfg, addr, step=False)

Symbolic execution of the block at @addr @addr: address to execute (int or ExprInt or label) @step: display intermediate steps

def run_block_at(self, ircfg, addr, step=False):
    """
    Symbolic execution of the block at @addr
    Returns the destination evaluated after the block, or @addr unchanged
    when @ircfg has no block for it.
    @addr: address to execute (int or ExprInt or label)
    @step: display intermediate steps
    """
    irblock = ircfg.get_block(addr)
    if irblock is None:
        return addr
    return self.eval_updt_irblock(irblock, step=step)

def set_state(

self, state)

Restore the engine to @state. @state: StateEngine instance

def set_state(self, state):
    """Restore the engine to @state
    The symbol manager is replaced by a fresh SymbolMngr which is then
    filled with every (destination, source) pair of @state.
    @state: StateEngine instance
    """
    fresh_symbols = SymbolMngr(
        addrsize=self.ir_arch.addrsize,
        expr_simp=self.expr_simp
    )
    self.symbols = fresh_symbols
    saved = dict(state)
    for dst, src in saved.iteritems():
        self.symbols[dst] = src

class SymbolicState

Stores a SymbolicExecutionEngine state

class SymbolicState(StateEngine):
    """Hashable snapshot of a SymbolicExecutionEngine state

    The (destination, source) pairs of the dictionary given at construction
    are kept in a frozenset.
    """

    def __init__(self, dct):
        pairs = dct.items()
        self._symbols = frozenset(pairs)

    def __hash__(self):
        key = (self.__class__, self._symbols)
        return hash(key)

    def __eq__(self, other):
        if self is other:
            return True
        classes_differ = self.__class__ != other.__class__
        return (not classes_differ) and self.symbols == other.symbols

    def __ne__(self, other):
        return not self.__eq__(other)

    def __iter__(self):
        return ((dst, src) for dst, src in self._symbols)

    def iteritems(self):
        """Iterate on stored memory/values"""
        return self.__iter__()

    def merge(self, other):
        """Merge two symbolic states
        Only the destinations present in both states with equal values are
        kept; the result is a new instance of this state's class
        @other: second symbolic state
        """
        mine = self.symbols
        theirs = other.symbols
        shared = set(mine).intersection(theirs)
        kept = {}
        for dst in shared:
            value = mine[dst]
            if value == theirs[dst]:
                kept[dst] = value
        return self.__class__(kept)

    @property
    def symbols(self):
        """Return the dictionary of known symbols"""
        return dict(self._symbols)

Ancestors (in MRO)

Instance variables

var symbols

Return the dictionary of known symbols

Methods

def __init__(

self, dct)

def __init__(self, dct):
    """Store the items of @dct as a frozenset of pairs
    @dct: dictionary of symbols
    """
    pairs = dct.items()
    self._symbols = frozenset(pairs)

def iteritems(

self)

Iterate on stored memory/values

def iteritems(self):
    """Return the iterator produced by __iter__ (stored memory/values)"""
    pairs_iterator = self.__iter__()
    return pairs_iterator

def merge(

self, other)

Inheritance: StateEngine.merge

Merge two symbolic states Only equal expressions are kept in both states @other: second symbolic state

def merge(self, other):
    """Merge two symbolic states
    Only the destinations present in both states with equal values are
    kept; the result is a new instance of this state's class
    @other: second symbolic state
    """
    mine = self.symbols
    theirs = other.symbols
    shared = set(mine).intersection(theirs)
    kept = {}
    for dst in shared:
        value = mine[dst]
        if value == theirs[dst]:
            kept[dst] = value
    return self.__class__(kept)

class symbexec

DEPRECATED object Use SymbolicExecutionEngine instead of symbexec

class symbexec(SymbolicExecutionEngine):
    """
    DEPRECATED object
    Use SymbolicExecutionEngine instead of symbexec
    Construction emits a deprecation warning and then defers to the
    SymbolicExecutionEngine constructor.
    """

    def __init__(self, ir_arch, known_symbols,
                 func_read=None,
                 func_write=None,
                 sb_expr_simp=expr_simp_explicit):
        warnings.warn("Deprecated API: use SymbolicExecutionEngine")
        parent = super(symbexec, self)
        parent.__init__(
            ir_arch, known_symbols, func_read, func_write,
            sb_expr_simp=sb_expr_simp
        )

Ancestors (in MRO)

Class variables

var StateEngine

Inheritance: SymbolicExecutionEngine.StateEngine

Instance variables

var expr_simp

Inheritance: SymbolicExecutionEngine.expr_simp

var expr_to_visitor

Inheritance: SymbolicExecutionEngine.expr_to_visitor

var func_read

Inheritance: SymbolicExecutionEngine.func_read

var func_write

Inheritance: SymbolicExecutionEngine.func_write

var ir_arch

Inheritance: SymbolicExecutionEngine.ir_arch

var state

Inheritance: SymbolicExecutionEngine.state

Return the current state of the SymbolicEngine

var symbols

Inheritance: SymbolicExecutionEngine.symbols

Methods

def __init__(

self, ir_arch, known_symbols, func_read=None, func_write=None, sb_expr_simp=<miasm2.expression.simplifications.ExpressionSimplifier object at 0x7f19b240b250>)

Inheritance: SymbolicExecutionEngine.__init__

def __init__(self, ir_arch, known_symbols,
             func_read=None,
             func_write=None,
             sb_expr_simp=expr_simp_explicit):
    """Warn about the deprecated API, then defer to the parent constructor"""
    warnings.warn("Deprecated API: use SymbolicExecutionEngine")
    parent = super(symbexec, self)
    parent.__init__(
        ir_arch, known_symbols, func_read, func_write,
        sb_expr_simp=sb_expr_simp
    )

def apply_change(

self, dst, src)

Inheritance: SymbolicExecutionEngine.apply_change

Apply @dst = @src on the current state WITHOUT evaluating both side @dst: Expr, destination @src: Expr, source

def apply_change(self, dst, src):
    """
    Apply @dst = @src on the current state WITHOUT evaluating both side
    Memory destinations are routed through mem_write, every other
    destination is written directly in the symbol manager
    @dst: Expr, destination
    @src: Expr, source
    """
    writer = self.mem_write if dst.is_mem() else self.symbols.write
    writer(dst, src)

def apply_expr(

self, expr)

Inheritance: SymbolicExecutionEngine.apply_expr

Deprecated version of eval_updt_expr

def apply_expr(self, expr):
    """Deprecated alias: warn, then return eval_updt_expr(@expr)"""
    message = 'DEPRECATION WARNING: use "eval_updt_expr" instead of apply_expr'
    warnings.warn(message)
    result = self.eval_updt_expr(expr)
    return result

def apply_expr_on_state(

self, expr, cache)

Inheritance: SymbolicExecutionEngine.apply_expr_on_state

Deprecated version of eval_expr

def apply_expr_on_state(self, expr, cache):
    """Deprecated alias of eval_expr
    @expr: expression to evaluate
    @cache: evaluation cache forwarded as eval_cache; None selects a new
    empty dictionary
    """
    warnings.warn('DEPRECATION WARNING: use "eval_expr" instead of apply_expr_on_state')
    eval_cache = {} if cache is None else cache
    return self.eval_expr(expr, eval_cache=eval_cache)

def as_assignblock(

self)

Inheritance: SymbolicExecutionEngine.as_assignblock

Return the current state as an AssignBlock

def as_assignblock(self):
    """Deprecated: return the pairs of modified(ids=True, mems=True) as an
    AssignBlock"""
    warnings.warn('DEPRECATION WARNING: use "modified(ids=True, mems=True)" instead of as_assignblock')
    assignments = dict(
        (dst, src) for dst, src in self.modified(ids=True, mems=True)
    )
    return AssignBlock(assignments)

def del_mem_above_stack(

self, stack_ptr)

Inheritance: SymbolicExecutionEngine.del_mem_above_stack

Remove all stored memory values with the following properties: the pointer is based on the initial stack value, and the pointer is below the current stack pointer.

def del_mem_above_stack(self, stack_ptr):
    """
    Remove all stored memory values with following properties:
    * pointer based on initial stack value
    * pointer below current stack pointer
    @stack_ptr: stack pointer expression, evaluated on the current state
    """
    stack_ptr = self.eval_expr(stack_ptr)
    base, stk_offset = get_expr_base_offset(stack_ptr)
    memarray = self.symbols.symbols_mem.base_to_memarray.get(base, None)
    if not memarray:
        return
    mask = int(stack_ptr.mask)
    top_bit = stack_ptr.size - 1
    # Keep the offsets whose masked difference with the stack offset has its
    # top bit set (presumably the negative ones -- assumes mask covers
    # exactly stack_ptr.size bits; verify)
    stale = set(
        offset for offset in memarray
        if ((offset - stk_offset) & mask) >> top_bit != 0
    )
    # Deletion happens after the scan so the container is not modified
    # while it is being iterated
    for offset in stale:
        del memarray[offset]

def dump(

self, ids=True, mems=True)

Inheritance: SymbolicExecutionEngine.dump

Display modified variables @ids: display modified ids @mems: display modified memory

def dump(self, ids=True, mems=True):
    """
    Display modified variables, one "variable = value" line per entry
    yielded by modified(None, ids, mems)
    @ids: display modified ids
    @mems: display modified memory
    """
    for variable, value in self.modified(None, ids, mems):
        # Single-argument print(...) produces the same line as the former
        # Python 2 print statement and also parses on Python 3
        print("%-18s = %s" % (variable, value))

def dump_id(

self)

Inheritance: SymbolicExecutionEngine.dump_id

Deprecated version of dump(mems=False)

def dump_id(self):
    """Deprecated alias: warn, then call dump(mems=False)"""
    message = 'DEPRECATION WARNING: use "dump(self, mems=False)" instead of dump_id'
    warnings.warn(message)
    self.dump(mems=False)

def dump_mem(

self)

Inheritance: SymbolicExecutionEngine.dump_mem

Deprecated version of dump(ids=False)

def dump_mem(self):
    """Deprecated alias: warn, then call dump(ids=False)"""
    message = 'DEPRECATION WARNING: use "dump(self, ids=False)" instead of dump_mem'
    warnings.warn(message)
    self.dump(ids=False)

def emul_ir_bloc(

self, _, addr, step=False)

Inheritance: SymbolicExecutionEngine.emul_ir_bloc

Deprecated version of run_block_at

def emul_ir_bloc(self, _, addr, step=False):
    """Deprecated version of run_block_at
    @_: object providing get_block(addr), forwarded to run_block_at as its
    ircfg argument
    @addr: address to execute
    @step: display intermediate steps
    """
    warnings.warn('DEPRECATION WARNING: use "run_block_at(self, ircfg, addr, step=False)" instead of emul_ir_bloc')
    # run_block_at expects (ircfg, addr, step): forwarding only (addr, step)
    # shifted every argument by one position
    return self.run_block_at(_, addr, step)

def emul_ir_block(

self, addr, step=False)

Inheritance: SymbolicExecutionEngine.emul_ir_block

Deprecated version of run_block_at

def emul_ir_block(self, addr, step=False):
    """Deprecated version of run_block_at"""
    # NOTE(review): run_block_at is declared as (self, ircfg, addr, step=False),
    # so the call below binds @addr to ircfg and @step to addr. This wrapper
    # receives no ircfg to forward -- confirm whether it can work as written.
    warnings.warn('DEPRECATION WARNING: use "run_block_at(self, addr, step=False)" instead of emul_ir_block')
    return self.run_block_at(addr, step)

def emul_ir_blocks(

self, addr, lbl_stop=None, step=False)

Inheritance: SymbolicExecutionEngine.emul_ir_blocks

Deprecated version of run_at

def emul_ir_blocks(self, addr, lbl_stop=None, step=False):
    """Deprecated version of run_at"""
    # NOTE(review): run_at is declared as
    # (self, ircfg, addr, lbl_stop=None, step=False), so the call below binds
    # @addr to ircfg, @lbl_stop to addr and @step to lbl_stop. This wrapper
    # receives no ircfg to forward -- confirm whether it can work as written.
    warnings.warn('DEPRECATION WARNING: use "run_at(self, addr, lbl_stop=None, step=False):" instead of emul_ir_blocks')
    return self.run_at(addr, lbl_stop, step)

def emul_ir_blocs(

self, _, addr, lbl_stop=None, step=False)

Inheritance: SymbolicExecutionEngine.emul_ir_blocs

Deprecated version of run_at

def emul_ir_blocs(self, _, addr, lbl_stop=None, step=False):
    """Deprecated version of run_at
    @_: object providing get_block(addr), forwarded to run_at as its ircfg
    argument
    @addr: address to execute
    @lbl_stop: LocKey to stop execution on
    @step: display intermediate steps
    """
    warnings.warn('DEPRECATION WARNING: use "run_at(self, ircfg, addr, lbl_stop=None, step=False)" instead of emul_ir_blocs')
    # run_at expects (ircfg, addr, lbl_stop, step): forwarding only
    # (addr, lbl_stop, step) shifted every argument by one position
    return self.run_at(_, addr, lbl_stop, step)

def emulbloc(

self, irb, step=False)

Inheritance: SymbolicExecutionEngine.emulbloc

Deprecated version of eval_updt_irblock(self, irb, step=False)

def emulbloc(self, irb, step=False):
    """Deprecated alias: warn, then return eval_updt_irblock(@irb, @step)"""
    message = 'DEPRECATION WARNING: use "eval_updt_irblock(self, irb, step=False)" instead of emulbloc'
    warnings.warn(message)
    destination = self.eval_updt_irblock(irb, step)
    return destination

def eval_assignblk(

self, assignblk)

Inheritance: SymbolicExecutionEngine.eval_assignblk

Evaluate AssignBlock using the current state

Returns a dictionary containing modified keys associated to their values

@assignblk: AssignBlock instance

def eval_assignblk(self, assignblk):
    """
    Evaluate AssignBlock using the current state, without modifying it
    Returns a dictionary containing modified keys associated to their
    values; memory keys are rebuilt from their evaluated pointer
    Raises ValueError on a destination that is neither a memory nor an id
    @assignblk: AssignBlock instance
    """
    results = {}
    # One cache shared by every evaluation of this block
    shared_cache = {}
    for dst, src in assignblk.iteritems():
        value = self.eval_expr(src, shared_cache)
        if dst.is_mem():
            pointer = self.eval_expr(dst.arg, shared_cache)
            key = ExprMem(pointer, dst.size)
        elif dst.is_id():
            key = dst
        else:
            raise ValueError("Unknown destination type", str(dst))
        results[key] = value
    return results

def eval_expr(

self, expr, eval_cache=None)

Inheritance: SymbolicExecutionEngine.eval_expr

Evaluate @expr. @expr: Expression instance to evaluate. @eval_cache: None or dictionary linking variables to their values

def eval_expr(self, expr, eval_cache=None):
    """
    Evaluate @expr through eval_expr_visitor and return the result
    @expr: Expression instance to evaluate
    @eval_cache: None or dictionary linking variables to their values
    """
    cache = {} if eval_cache is None else eval_cache
    result = self.eval_expr_visitor(expr, cache=cache)
    assert result is not None
    return result

def eval_expr_visitor(

self, expr, cache=None)

Inheritance: SymbolicExecutionEngine.eval_expr_visitor

[DEV]: Override to change the behavior of an Expr evaluation. This function recursively applies 'eval_expr*' to @expr. This function uses @cache to speedup re-evaluation of expression.

def eval_expr_visitor(self, expr, cache=None):
    """
    [DEV]: Override to change the behavior of an Expr evaluation.
    @expr is simplified, dispatched on its class through expr_to_visitor,
    and the simplified result is returned.
    @cache maps already evaluated expressions to their value; both @expr
    and its simplified form are recorded in it.
    Raises TypeError when no visitor is registered for the expression class.
    """
    if cache is None:
        cache = {}

    # First lookup: the expression exactly as given
    cached = cache.get(expr, None)
    if cached is not None:
        return cached

    # Second lookup: its simplified form
    simplified = self.expr_simp(expr)
    cached = cache.get(simplified, None)
    if cached is not None:
        return cached

    visitor = self.expr_to_visitor.get(simplified.__class__, None)
    if visitor is None:
        raise TypeError("Unknown expr type")

    value = self.expr_simp(visitor(simplified, cache=cache))
    assert value is not None
    cache[expr] = value
    cache[simplified] = value
    return value

def eval_exprcompose(

self, expr, **kwargs)

Inheritance: SymbolicExecutionEngine.eval_exprcompose

[DEV]: Evaluate an ExprCompose using the current state

def eval_exprcompose(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprCompose using the current state
    Each argument is evaluated in order and a new ExprCompose is built from
    the results
    """
    evaluated = [self.eval_expr_visitor(arg, **kwargs) for arg in expr.args]
    return ExprCompose(*evaluated)

def eval_exprcond(

self, expr, **kwargs)

Inheritance: SymbolicExecutionEngine.eval_exprcond

[DEV]: Evaluate an ExprCond using the current state

def eval_exprcond(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprCond using the current state
    The condition and both sources are evaluated, in that order, and a new
    ExprCond is built from the results
    """
    parts = [
        self.eval_expr_visitor(sub, **kwargs)
        for sub in (expr.cond, expr.src1, expr.src2)
    ]
    return ExprCond(*parts)

def eval_exprid(

self, expr, **kwargs)

Inheritance: SymbolicExecutionEngine.eval_exprid

[DEV]: Evaluate an ExprId using the current state

def eval_exprid(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprId by reading it from the symbol manager
    (@kwargs is accepted but not used)"""
    return self.symbols.read(expr)

def eval_exprint(

self, expr, **kwargs)

Inheritance: SymbolicExecutionEngine.eval_exprint

[DEV]: Evaluate an ExprInt using the current state

def eval_exprint(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprInt using the current state
    @expr is returned unchanged; @kwargs is accepted but not used
    """
    return expr

def eval_exprloc(

self, expr, **kwargs)

Inheritance: SymbolicExecutionEngine.eval_exprloc

[DEV]: Evaluate an ExprLoc using the current state

def eval_exprloc(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprLoc using the current state
    Returns an ExprInt of the same size when the location database has an
    offset for the loc_key, and @expr itself otherwise
    """
    offset = self.ir_arch.loc_db.get_location_offset(expr.loc_key)
    if offset is None:
        return expr
    return ExprInt(offset, expr.size)

def eval_exprmem(

self, expr, **kwargs)

Inheritance: SymbolicExecutionEngine.eval_exprmem

[DEV]: Evaluate an ExprMem using the current state This function first evaluate the memory pointer value. Override 'mem_read' to modify the effective memory accesses

def eval_exprmem(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprMem using the current state
    The pointer is evaluated first, then the memory at the resulting
    address is read with the original access size.
    Override 'mem_read' to modify the effective memory accesses
    """
    pointer = self.eval_expr_visitor(expr.arg, **kwargs)
    resolved = ExprMem(pointer, expr.size)
    return self.mem_read(resolved)

def eval_exprop(

self, expr, **kwargs)

Inheritance: SymbolicExecutionEngine.eval_exprop

[DEV]: Evaluate an ExprOp using the current state

def eval_exprop(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprOp using the current state
    Each operand is evaluated in order and a new ExprOp with the same
    operator is built from the results
    """
    operands = [
        self.eval_expr_visitor(operand, **kwargs) for operand in expr.args
    ]
    return ExprOp(expr.op, *operands)

def eval_exprslice(

self, expr, **kwargs)

Inheritance: SymbolicExecutionEngine.eval_exprslice

[DEV]: Evaluate an ExprSlice using the current state

def eval_exprslice(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprSlice using the current state
    The sliced expression is evaluated and re-sliced with the same bounds
    """
    inner = self.eval_expr_visitor(expr.arg, **kwargs)
    return ExprSlice(inner, expr.start, expr.stop)

def eval_ir(

self, assignblk)

Inheritance: SymbolicExecutionEngine.eval_ir

Deprecated version of eval_updt_assignblk(self, assignblk)

def eval_ir(self, assignblk):
    """Deprecated version of eval_updt_assignblk(self, assignblk)
    @assignblk: AssignBlock instance
    Returns whatever eval_updt_assignblk returns for @assignblk
    """
    # The message names eval_updt_assignblk, the method this wrapper actually
    # delegates to (it used to point at eval_assignblk)
    warnings.warn('DEPRECATION WARNING: use "eval_updt_assignblk(self, assignblk)" instead of eval_ir')
    return self.eval_updt_assignblk(assignblk)

def eval_ir_expr(

self, assignblk)

Inheritance: SymbolicExecutionEngine.eval_ir_expr

Deprecated: use eval_assignblk(self, assignblk) instead of eval_ir_expr

def eval_ir_expr(self, assignblk):
    """Deprecated: return the iteritems() of eval_assignblk(@assignblk)"""
    message = 'DEPRECATION WARNING: use "eval_assignblk(self, assignblk)" instead of eval_ir_expr'
    warnings.warn(message)
    evaluated = self.eval_assignblk(assignblk)
    return evaluated.iteritems()

def eval_updt_assignblk(

self, assignblk)

Inheritance: SymbolicExecutionEngine.eval_updt_assignblk

Apply an AssignBlock on the current state @assignblk: AssignBlock instance

def eval_updt_assignblk(self, assignblk):
    """
    Evaluate @assignblk against the current state, commit every resulting
    assignment through apply_change, and return the list of memory
    destinations that were written
    @assignblk: AssignBlock instance
    """
    written_mems = []
    for target, value in self.eval_assignblk(assignblk).iteritems():
        self.apply_change(target, value)
        if not target.is_mem():
            continue
        written_mems.append(target)
    return written_mems

def eval_updt_expr(

self, expr)

Inheritance: SymbolicExecutionEngine.eval_updt_expr

Evaluate @expr and apply side effect if needed (ie. if expr is an assignment). Return the evaluated value

def eval_updt_expr(self, expr):
    """
    Evaluate @expr and return the evaluated value. When @expr is an
    assignment, its source is evaluated first and the assignment is then
    applied to the current state.
    """
    if not expr.is_aff():
        # Plain expression: nothing to commit
        return self.eval_expr(expr)
    # The returned value is computed before the state is updated
    value = self.eval_expr(expr.src)
    self.eval_updt_assignblk(AssignBlock([expr]))
    return value

def eval_updt_irblock(

self, irb, step=False)

Inheritance: SymbolicExecutionEngine.eval_updt_irblock

Symbolic execution of the @irb on the current state @irb: irbloc instance @step: display intermediate steps

def eval_updt_irblock(self, irb, step=False):
    """
    Symbolic execution of the @irb on the current state
    Each AssignBlock of @irb is applied in order through
    eval_updt_assignblk; the evaluation of self.ir_arch.IRDst is returned
    once the whole block has been processed.
    @irb: irbloc instance
    @step: display intermediate steps
    """
    separator = '_' * 80
    for assignblk in irb:
        if step:
            # Single-argument print(...) emits the same text as the former
            # print statements on Python 2 and also parses on Python 3
            print('Instr %s' % (assignblk.instr,))
            print('Assignblk:')
            print(assignblk)
            print(separator)
        self.eval_updt_assignblk(assignblk)
        if step:
            self.dump(mems=False)
            self.dump(ids=False)
            print(separator)
    dst = self.eval_expr(self.ir_arch.IRDst)
    return dst

def get_state(

self)

Inheritance: SymbolicExecutionEngine.get_state

Return the current state of the SymbolicEngine

def get_state(self):
    """Build a self.StateEngine from a dict copy of the current symbols"""
    snapshot = dict(self.symbols)
    return self.StateEngine(snapshot)

def mem_read(

self, expr)

Inheritance: SymbolicExecutionEngine.mem_read

[DEV]: Override to modify the effective memory reads

Read symbolic value at ExprMem @expr @expr: ExprMem

def mem_read(self, expr):
    """
    [DEV]: Override to modify the effective memory reads
    Read symbolic value at ExprMem @expr: the parts returned by
    _resolve_mem_parts are collected (unknown memory parts go through
    func_read when one is set), then composed and simplified
    @expr: ExprMem
    """
    pieces = []
    for known, part in self._resolve_mem_parts(expr):
        if known or not part.is_mem() or self.func_read is None:
            # Already resolved, not a memory access, or no read callback
            pieces.append(part)
        else:
            pieces.append(self.func_read(part))
    value = self.expr_simp(ExprCompose(*pieces))
    assert value.size == expr.size
    return value

def mem_write(

self, dst, src)

Inheritance: SymbolicExecutionEngine.mem_write

[DEV]: Override to modify the effective memory writes

Write symbolic value @src at ExprMem @dst @dst: destination ExprMem @src: source Expression

def mem_write(self, dst, src):
    """
    [DEV]: Override to modify the effective memory writes
    Write symbolic value @src at ExprMem @dst, delegating to func_write
    (called with the engine itself as first argument) when one is set and
    to the symbol manager otherwise
    @dst: destination ExprMem
    @src: source Expression
    """
    if self.func_write is None:
        self.symbols.write(dst, src)
        return
    self.func_write(self, dst, src)

def modified(

self, init_state=None, ids=True, mems=True)

Inheritance: SymbolicExecutionEngine.modified

Return the modified variables. @init_state: a base dictionary linking variables to their initial values to diff. Can be None. @ids: track ids only @mems: track mems only

def modified(self, init_state=None, ids=True, mems=True):
    """
    Generate the (variable, value) pairs of the current state, skipping
    those whose value equals the one recorded in @init_state.
    @init_state: a base dictionary linking variables to their initial values
    to diff. Can be None.
    @ids: include the entries of symbols.symbols_id
    @mems: include the entries of symbols.memory()
    """
    baseline = {} if init_state is None else init_state
    if ids:
        for variable, value in self.symbols.symbols_id.iteritems():
            unchanged = variable in baseline and baseline[variable] == value
            if not unchanged:
                yield variable, value
    if mems:
        for mem, value in self.symbols.memory():
            unchanged = mem in baseline and baseline[mem] == value
            if not unchanged:
                yield mem, value

def modified_mems(

self, init_state=None)

Inheritance: SymbolicExecutionEngine.modified_mems

Deprecated version of modified(ids=False)

def modified_mems(self, init_state=None):
    """Deprecated generator: same items as modified(ids=False)"""
    message = 'DEPRECATION WARNING: use "modified(self, ids=False)" instead of modified_mems'
    warnings.warn(message)
    mem_items = self.modified(init_state=init_state, ids=False)
    for item in mem_items:
        yield item

def modified_regs(

self, init_state=None)

Inheritance: SymbolicExecutionEngine.modified_regs

Deprecated version of modified(mems=False)

def modified_regs(self, init_state=None):
    """Deprecated generator: same items as modified(mems=False)"""
    message = 'DEPRECATION WARNING: use "modified(self, mems=False)" instead of modified_regs'
    warnings.warn(message)
    reg_items = self.modified(init_state=init_state, mems=False)
    for item in reg_items:
        yield item

def run_at(

self, ircfg, addr, lbl_stop=None, step=False)

Inheritance: SymbolicExecutionEngine.run_at

Symbolic execution starting at @addr @addr: address to execute (int or ExprInt or label) @lbl_stop: LocKey to stop execution on @step: display intermediate steps

def run_at(self, ircfg, addr, lbl_stop=None, step=False):
    """
    Symbolic execution starting at @addr
    Blocks are fetched from @ircfg and executed one after the other until
    no block is found for the current destination or the block's loc_key
    equals @lbl_stop; the last destination is returned.
    @addr: address to execute (int or ExprInt or label)
    @lbl_stop: LocKey to stop execution on
    @step: display intermediate steps
    """
    irblock = ircfg.get_block(addr)
    while irblock is not None and not (irblock.loc_key == lbl_stop):
        addr = self.eval_updt_irblock(irblock, step=step)
        irblock = ircfg.get_block(addr)
    return addr

def run_block_at(

self, ircfg, addr, step=False)

Inheritance: SymbolicExecutionEngine.run_block_at

Symbolic execution of the block at @addr @addr: address to execute (int or ExprInt or label) @step: display intermediate steps

def run_block_at(self, ircfg, addr, step=False):
    """
    Symbolic execution of the block at @addr
    Returns the destination evaluated after the block, or @addr unchanged
    when @ircfg has no block for it.
    @addr: address to execute (int or ExprInt or label)
    @step: display intermediate steps
    """
    irblock = ircfg.get_block(addr)
    if irblock is None:
        return addr
    return self.eval_updt_irblock(irblock, step=step)

def set_state(

self, state)

Inheritance: SymbolicExecutionEngine.set_state

Restore the engine to @state. @state: StateEngine instance

def set_state(self, state):
    """Restore the engine to @state
    The symbol manager is replaced by a fresh SymbolMngr which is then
    filled with every (destination, source) pair of @state.
    @state: StateEngine instance
    """
    fresh_symbols = SymbolMngr(
        addrsize=self.ir_arch.addrsize,
        expr_simp=self.expr_simp
    )
    self.symbols = fresh_symbols
    saved = dict(state)
    for dst, src in saved.iteritems():
        self.symbols[dst] = src