miasm2.ir.symbexec module
import warnings
import logging
from collections import MutableMapping

from miasm2.expression.expression import ExprOp, ExprId, ExprLoc, ExprInt, \
    ExprMem, ExprCompose, ExprSlice, ExprCond
from miasm2.expression.simplifications import expr_simp_explicit
from miasm2.ir.ir import AssignBlock

# Module logger: a dedicated stream handler is attached at import time
log = logging.getLogger("symbexec")
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter("%(levelname)-5s: %(message)s"))
log.addHandler(console_handler)
log.setLevel(logging.INFO)


def get_block(ir_arch, ircfg, mdis, addr):
    """Get IRBlock at address @addr

    If the location is not yet in @ircfg.blocks, the block is disassembled
    with @mdis and added to @ircfg through @ir_arch first.
    Raise LookupError if no IRBlock is available afterwards.
    """
    loc_key = ircfg.get_or_create_loc_key(addr)
    if not loc_key in ircfg.blocks:
        offset = mdis.loc_db.get_location_offset(loc_key)
        block = mdis.dis_block(offset)
        ir_arch.add_asmblock_to_ircfg(block, ircfg)
    irblock = ircfg.get_block(loc_key)
    if irblock is None:
        raise LookupError('No block found at that address: %s' % ir_arch.loc_db.pretty_str(loc_key))
    return irblock


class StateEngine(object):
    """Stores an Engine state"""

    def merge(self, other):
        """Generate a new state, representing the merge of self and @other
        @other: a StateEngine instance"""

        raise NotImplementedError("Abstract method")


class SymbolicState(StateEngine):
    """Stores a SymbolicExecutionEngine state"""

    def __init__(self, dct):
        # Frozen so that the state is hashable (see __hash__)
        self._symbols = frozenset(dct.items())

    def __hash__(self):
        return hash((self.__class__, self._symbols))

    def __eq__(self, other):
        if self is other:
            return True
        if self.__class__ != other.__class__:
            return False
        return self.symbols == other.symbols

    def __ne__(self, other):
        return not self.__eq__(other)

    def __iter__(self):
        for dst, src in self._symbols:
            yield dst, src

    def iteritems(self):
        """Iterate on stored memory/values"""
        return self.__iter__()

    def merge(self, other):
        """Merge two symbolic states
        Only destinations bound to equal expressions in both states are kept
        @other: second symbolic state
        """

        symb_a = self.symbols
        symb_b = other.symbols
        intersection = set(symb_a.keys()).intersection(symb_b.keys())
        out = {}
        for dst in intersection:
            if symb_a[dst] == symb_b[dst]:
                out[dst] = symb_a[dst]
        return self.__class__(out)

    @property
    def symbols(self):
        """Return the dictionary of known symbols (a new dict on each access)"""
        return dict(self._symbols)


# Name of the placeholder ExprId used as base for purely integer pointers
INTERNAL_INTBASE_NAME = "__INTERNAL_INTBASE__"


def get_expr_base_offset(expr):
    """Return a couple representing the symbolic/concrete part of an addition
    expression.

    If there is no symbolic part, ExprId(INTERNAL_INTBASE_NAME) is used
    If there is no concrete part, 0 is used
    @expr: Expression instance

    """
    if expr.is_int():
        internal_intbase = ExprId(INTERNAL_INTBASE_NAME, expr.size)
        return internal_intbase, int(expr)

    if not expr.is_op('+'):
        return expr, 0
    # Only the last argument of the addition is considered as the offset
    if expr.args[-1].is_int():
        args, offset = expr.args[:-1], int(expr.args[-1])
        if len(args) == 1:
            return args[0], offset
        return ExprOp('+', *args), offset
    return expr, 0


class MemArray(MutableMapping):
    """Link between base and its (offset, Expr)

    Given an expression (say *base*), this structure will store every memory
    content relatively to an integer offset from *base*.

    The value associated to a given offset is a description of the slice of a
    stored expression. The slice size depends on the configuration of the
    MemArray. For example, for a slice size of 8 bits, the assignment:
    - @32[EAX+0x10] = EBX

    will store for the base EAX:
    - 0x10: (EBX, 0)
    - 0x11: (EBX, 1)
    - 0x12: (EBX, 2)
    - 0x13: (EBX, 3)

    If the *base* is EAX+EBX, this structure can store the following contents:
    - @32[EAX+EBX]
    - @8[EAX+EBX+0x100]
    But not:
    - @32[EAX+0x10] (which is stored in another MemArray based on EAX)
    - @32[EAX+EBX+ECX] (which is stored in another MemArray based on
      EAX+EBX+ECX)

    NOTE(review): the entries are actually stored as (byte index, Expr)
    tuples (see 'write'), i.e. in the reverse order of the example above.
    """

    def __init__(self, base, expr_simp=expr_simp_explicit):
        self._base = base
        self.expr_simp = expr_simp
        self._mask = int(base.mask)
        # offset (int) -> (byte index in the stored expression, expression)
        self._offset_to_expr = {}

    @property
    def base(self):
        """Expression representing the symbolic base address"""
        return self._base

    @property
    def mask(self):
        """Mask offset"""
        return self._mask

    def __getitem__(self, offset):
        assert 0 <= offset <= self._mask
        return self._offset_to_expr.__getitem__(offset)

    def __setitem__(self, offset, value):
        # Direct item assignment is forbidden: 'write' must be used instead
        raise RuntimeError("Use write api to update keys")

    def __delitem__(self, offset):
        assert 0 <= offset <= self._mask
        return self._offset_to_expr.__delitem__(offset)

    def __iter__(self):
        for offset, _ in self._offset_to_expr.iteritems():
            yield offset

    def __len__(self):
        return len(self._offset_to_expr)

    def __repr__(self):
        out = []
        out.append("Base: %s" % self.base)
        for offset, (index, value) in sorted(self._offset_to_expr.iteritems()):
            out.append("%16X %d %s" % (offset, index, value))
        return '\n'.join(out)

    def copy(self):
        """Copy object instance (the offset mapping is shallow-copied)"""
        obj = MemArray(self.base, self.expr_simp)
        obj._offset_to_expr = self._offset_to_expr.copy()
        return obj

    @staticmethod
    def offset_to_ptr(base, offset):
        """
        Return an expression representing the @base + @offset
        @base: symbolic base address
        @offset: relative offset integer to the @base address
        """
        if base.is_id(INTERNAL_INTBASE_NAME):
            # Purely integer pointer: the placeholder base is dropped
            ptr = ExprInt(offset, base.size)
        elif offset == 0:
            ptr = base
        else:
            ptr = base + ExprInt(offset, base.size)
        return ptr.canonize()

    def read(self, offset, size):
        """
        Return memory at @offset with @size as an Expr list
        @offset: integer (in bytes)
        @size: integer (in bits), byte aligned

        Consider the following state:
        - 0x10: (EBX, 0)
        - 0x11: (EBX, 1)
        - 0x12: (EBX, 2)
        - 0x13: (EBX, 3)

        A read at 0x10 of 32 bits should return: EBX
        """

        assert size % 8 == 0
        # Parts is (Expr's offset, size, Expr)
        parts = []
        for index in xrange(size / 8):
            # Wrap read:
            # @32[EAX+0xFFFFFFFF] is ok and will read at 0xFFFFFFFF, 0, 1, 2
            request_offset = (offset + index) & self._mask
            if request_offset in self._offset_to_expr:
                # Known memory portion
                off, data = self._offset_to_expr[request_offset]
                parts.append((off, 1, data))
                continue

            # Unknown memory portion
            ptr = self.offset_to_ptr(self.base, request_offset)
            data = ExprMem(ptr, 8)
            parts.append((0, 1, data))

        # Group similar datas
        # XXX TODO: only little endian here
        # 'index' only advances when the two neighbours cannot be merged;
        # after a merge the new part is compared again with its successor
        index = 0
        while index + 1 < len(parts):
            off_a, size_a, data_a = parts[index]
            off_b, size_b, data_b = parts[index+1]
            if data_a == data_b and off_a + size_a == off_b:
                # Read consecutive bytes of a variable
                # [(0, 8, x), (1, 8, x)] => (0, 16, x)
                parts[index:index+2] = [(off_a, size_a + size_b, data_a)]
                continue
            if data_a.is_int() and data_b.is_int():
                # Read integer parts
                # [(0, 8, 0x11223344), (1, 8, 0x55667788)] => (0, 16, 0x7744)
                int1 = self.expr_simp(data_a[off_a*8:(off_a+size_a)*8])
                int2 = self.expr_simp(data_b[off_b*8:(off_b+size_b)*8])
                assert int1.is_int() and int2.is_int()
                int1, int2 = int(int1), int(int2)
                result = ExprInt((int2 << (size_a * 8)) | int1, (size_a + size_b) * 8)
                parts[index:index+2] = [(0, size_a + size_b, result)]
                continue
            if data_a.is_mem() and data_b.is_mem():
                # Read consecutive bytes of a memory variable
                ptr_base_a, ptr_offset_a = get_expr_base_offset(data_a.arg)
                ptr_base_b, ptr_offset_b = get_expr_base_offset(data_b.arg)
                if ptr_base_a != ptr_base_b:
                    index += 1
                    continue
                if (ptr_offset_a + off_a + size_a) & self._mask == (ptr_offset_b + off_b) & self._mask:
                    assert size_a <= data_a.size / 8 - off_a
                    assert size_b <= data_b.size / 8 - off_b
                    # Successive comparable symbolic pointers
                    # [(0, 8, @8[ptr]), (0, 8, @8[ptr+1])] => (0, 16, @16[ptr])
                    ptr = self.offset_to_ptr(ptr_base_a, (ptr_offset_a + off_a) & self._mask)

                    data = ExprMem(ptr, (size_a + size_b) * 8)
                    parts[index:index+2] = [(0, size_a + size_b, data)]

                    continue

            index += 1

        # Slice datas
        read_mem = []
        for off, bytesize, data in parts:
            if data.size / 8 != bytesize:
                data = data[off * 8: (off + bytesize) * 8]
            read_mem.append(data)

        return read_mem

    def write(self, offset, expr):
        """
        Write @expr at @offset
        @offset: integer (in bytes)
        @expr: Expr instance value
        """
        assert expr.size % 8 == 0
        assert offset <= self._mask
        for index in xrange(expr.size / 8):
            # Wrap write:
            # @32[EAX+0xFFFFFFFF] is ok and will write at 0xFFFFFFFF, 0, 1, 2
            request_offset = (offset + index) & self._mask
            # XXX TODO: only little endian here
            self._offset_to_expr[request_offset] = (index, expr)

            tmp = self.expr_simp(expr[index * 8: (index + 1) * 8])
            # Special case: Simplify slice of pointer (simplification is ok
            # here, as we won't store the simplified expression)
            if tmp.is_slice() and tmp.arg.is_mem() and tmp.start % 8 == 0:
                new_ptr = self.expr_simp(tmp.arg.arg + ExprInt(tmp.start / 8, tmp.arg.arg.size))
                tmp = ExprMem(new_ptr, tmp.stop - tmp.start)
            # Test if write to original value
            if tmp.is_mem():
                src_ptr, src_off = get_expr_base_offset(tmp.arg)
                if src_ptr == self.base and src_off == request_offset:
                    # The byte is written with its own memory content: the
                    # entry just stored is removed again
                    del self._offset_to_expr[request_offset]

    def _get_variable_parts(self, index, known_offsets, forward=True):
        """
        Find consecutive memory parts representing the same variable. The part
        starts at offset known_offsets[@index] and search is in offset direction
        determined by @forward
        Return the number of consecutive parts of the same variable.

        @index: index of the memory offset in known_offsets
        @known_offsets: sorted offsets
        @forward: Search in offset growing direction if True, else in reverse
        order
        """

        offset = known_offsets[index]
        value_byte_index, value = self._offset_to_expr[offset]
        assert value.size % 8 == 0

        if forward:
            start, end, step = value_byte_index + 1, value.size / 8, 1
        else:
            start, end, step = value_byte_index - 1, -1, -1

        # The starting part itself counts for one
        partnum = 1
        for value_offset in xrange(start, end, step):
            offset += step
            # Check if next part is in known_offsets
            next_index = index + step * partnum
            if not 0 <= next_index < len(known_offsets):
                break

            offset_next = known_offsets[next_index]
            if offset_next != offset:
                break

            # Check if next part is a part of the searched value
            byte_index, value_next = self._offset_to_expr[offset_next]
            if byte_index != value_offset:
                break
            if value != value_next:
                break
            partnum += 1

        return partnum

    def _build_value_at_offset(self, value, offset, start, length):
        """
        Return a couple. The first element is the memory Expression representing
        the value at @offset, the second is its value.  The value is truncated
        at byte @start with @length

        @value: Expression to truncate
        @offset: offset in bytes of the variable (integer)
        @start: value's byte offset (integer)
        @length: length in bytes (integer)
        """

        ptr = self.offset_to_ptr(self.base, offset)
        size = length * 8
        if start == 0 and size == value.size:
            result = value
        else:
            result = self.expr_simp(value[start * 8: start * 8 + size])

        return ExprMem(ptr, size), result

    def memory(self):
        """
        Iterate on stored memory/values

        The goal here is to group entities.

        Consider the following state:
        EAX + 0x10 = (0, EDX)
        EAX + 0x11 = (1, EDX)
        EAX + 0x12 = (2, EDX)
        EAX + 0x13 = (3, EDX)

        The function should return:
        @32[EAX + 0x10] = EDX
        """

        if not self._offset_to_expr:
            # NOTE(review): under PEP 479 (default from Python 3.7) raising
            # StopIteration inside a generator becomes a RuntimeError; a bare
            # 'return' would be needed if this Python 2 code is ported
            raise StopIteration
        known_offsets = self._offset_to_expr.keys()
        known_offsets.sort()
        index = 0
        # Test if the first element is the continuation of the last byte. If
        # yes, merge and output it first.
        min_int = 0
        max_int = (1 << self.base.size) - 1
        limit_index = len(known_offsets)
        first_element = None
        # Special case where a variable spreads on max_int/min_int
        if known_offsets[0] == min_int and known_offsets[-1] == max_int:
            min_offset, max_offset = known_offsets[0], known_offsets[-1]
            min_byte_index, min_value = self._offset_to_expr[min_offset]
            max_byte_index, max_value = self._offset_to_expr[max_offset]
            if min_value == max_value and max_byte_index + 1 == min_byte_index:
                # Look for current variable start
                partnum_before = self._get_variable_parts(len(known_offsets) - 1, known_offsets, False)
                # Look for current variable end
                partnum_after = self._get_variable_parts(0, known_offsets)

                partnum = partnum_before + partnum_after
                offset = known_offsets[-partnum_before]
                index_value, value = self._offset_to_expr[offset]

                mem, result = self._build_value_at_offset(value, offset, index_value, partnum)
                first_element = mem, result
                # Skip the parts already covered by the wrapping variable
                index = partnum_after
                limit_index = len(known_offsets) - partnum_before

        # Special cases are done, walk and merge variables
        while index < limit_index:
            offset = known_offsets[index]
            index_value, value = self._offset_to_expr[offset]
            partnum = self._get_variable_parts(index, known_offsets)
            mem, result = self._build_value_at_offset(value, offset, index_value, partnum)
            yield mem, result
            index += partnum

        # The wrapping variable, if any, is yielded last
        if first_element is not None:
            yield first_element

    def dump(self):
        """Display MemArray content"""
        for mem, value in self.memory():
            print "%s = %s" % (mem, value)


class MemSparse(object):
    """Link a symbolic memory pointer to its MemArray.

    For each symbolic memory object, this object will extract the memory pointer
    *ptr*. It then splits *ptr* into a symbolic and an integer part. For
    example, the memory @[ESP+4] will give ESP+4 for *ptr*. *ptr* is then split
    into its base ESP and its offset 4. Each symbolic base address uses a
    different MemArray.

    Example:
    - @32[EAX+EBX]
    - @8[EAX+EBX+0x100]
    Will be stored in the same MemArray with a EAX+EBX base

    """

    def __init__(self, addrsize, expr_simp=expr_simp_explicit):
        """
        @addrsize: size (in bits) of the addresses manipulated by the MemSparse
        @expr_simp: an ExpressionSimplifier instance
        """
        self.addrsize = addrsize
        self.expr_simp = expr_simp
        # base expression -> MemArray
        self.base_to_memarray = {}

    def __contains__(self, expr):
        """
        Return True if the whole @expr is present
        For partial check, use 'contains_partial'
        """
        if not expr.is_mem():
            return False
        ptr = expr.arg
        base, offset = get_expr_base_offset(ptr)
        memarray = self.base_to_memarray.get(base, None)
        if memarray is None:
            return False
        # NOTE(review): unlike __delitem__, the offset is not wrapped with
        # memarray.mask here - confirm whether this is intended
        for i in xrange(expr.size / 8):
            if offset + i not in memarray:
                return False
        return True

    def contains_partial(self, expr):
        """
        Return True if a part of @expr is present in memory
        """
        if not expr.is_mem():
            return False
        ptr = expr.arg
        base, offset = get_expr_base_offset(ptr)
        memarray = self.base_to_memarray.get(base, None)
        if memarray is None:
            return False
        for i in xrange(expr.size / 8):
            if offset + i in memarray:
                return True
        return False

    def clear(self):
        """Reset the current object content"""
        self.base_to_memarray.clear()

    def copy(self):
        """Copy the current object instance (each MemArray is copied too)"""
        base_to_memarray = {}
        for base, memarray in self.base_to_memarray.iteritems():
            base_to_memarray[base] = memarray.copy()
        obj = MemSparse(self.addrsize, self.expr_simp)
        obj.base_to_memarray = base_to_memarray
        return obj

    def __delitem__(self, expr):
        """
        Delete a value @expr *fully* present in memory
        For partial delete, use delete_partial
        Raise KeyError if a part of @expr is missing
        """
        ptr = expr.arg
        base, offset = get_expr_base_offset(ptr)
        memarray = self.base_to_memarray.get(base, None)
        if memarray is None:
            raise KeyError
        # Check if whole entity is in the MemArray before deleting it
        for i in xrange(expr.size / 8):
            if (offset + i) & memarray.mask not in memarray:
                raise KeyError
        for i in xrange(expr.size / 8):
            del memarray[(offset + i) & memarray.mask]

    def delete_partial(self, expr):
        """
        Delete @expr from memory. Skip parts of @expr which are not present in
        memory.
        Raise KeyError if no MemArray exists for the base of @expr
        """
        ptr = expr.arg
        base, offset = get_expr_base_offset(ptr)
        memarray = self.base_to_memarray.get(base, None)
        if memarray is None:
            raise KeyError
        # Delete only the bytes which are present
        for i in xrange(expr.size / 8):
            real_offset = (offset + i) & memarray.mask
            if real_offset in memarray:
                del memarray[real_offset]

    def read(self, ptr, size):
        """
        Return the value associated with the Expr at address @ptr
        @ptr: Expr representing the memory address
        @size: memory size (in bits), byte aligned
        """
        assert size % 8 == 0
        base, offset = get_expr_base_offset(ptr)
        memarray = self.base_to_memarray.get(base, None)
        if memarray is not None:
            mems = memarray.read(offset, size)
            ret = ExprCompose(*mems)
        else:
            # Nothing known for this base: return the memory itself
            ret = ExprMem(ptr, size)
        return ret

    def write(self, ptr, expr):
        """
        Update the corresponding Expr @expr at address @ptr
        @ptr: Expr representing the memory address
        @expr: Expr instance
        """
        assert ptr.size == self.addrsize
        base, offset = get_expr_base_offset(ptr)
        memarray = self.base_to_memarray.get(base, None)
        if memarray is None:
            # First write on this base: create its MemArray
            memarray = MemArray(base, self.expr_simp)
            self.base_to_memarray[base] = memarray
        memarray.write(offset, expr)

    def iteritems(self):
        """Iterate on stored memory variables and their values."""
        for _, memarray in sorted(self.base_to_memarray.iteritems()):
            for mem, value in memarray.memory():
                yield mem, value

    def items(self):
        """Return stored memory variables and their values."""
        return list(self.iteritems())

    def dump(self):
        """Display MemSparse content"""
        for mem, value in self.iteritems():
            print "%s = %s" % (mem, value)

    def __repr__(self):
        out = []
        for _, memarray in sorted(self.base_to_memarray.iteritems()):
            out.append(repr(memarray))
        return '\n'.join(out)


class SymbolMngr(object):
    """Symbolic store manager (IDs and MEMs)"""

    def __init__(self, init=None, addrsize=None, expr_simp=expr_simp_explicit):
        """
        @init: optional initial state, any object providing 'iteritems'
        @addrsize: size (in bits) of the addresses; mandatory
        @expr_simp: an ExpressionSimplifier instance
        """
        assert addrsize is not None
        if init is None:
            init = {}
        self.addrsize = addrsize
        self.expr_simp = expr_simp
        self.symbols_id = {}
        self.symbols_mem = MemSparse(addrsize, expr_simp)
        self.mask = (1 << addrsize) - 1
        for expr, value in init.iteritems():
            self.write(expr, value)

    def __contains__(self, expr):
        if expr.is_id():
            return self.symbols_id.__contains__(expr)
        if expr.is_mem():
            return self.symbols_mem.__contains__(expr)
        return False

    def __getitem__(self, expr):
        return self.read(expr)

    def __setitem__(self, expr, value):
        self.write(expr, value)

    def __delitem__(self, expr):
        if expr.is_id():
            del self.symbols_id[expr]
        elif expr.is_mem():
            del self.symbols_mem[expr]
        else:
            raise TypeError("Bad source expr")

    def copy(self):
        """Copy object instance"""
        # The current instance is used as the initial state of the new one
        obj = SymbolMngr(self, addrsize=self.addrsize, expr_simp=self.expr_simp)
        return obj

    def clear(self):
        """Forget every variables values"""
        self.symbols_id.clear()
        self.symbols_mem.clear()

    def read(self, src):
        """
        Return the value corresponding to Expr @src
        @src: ExprId or ExprMem instance
        """
        if src.is_id():
            # An unknown ExprId evaluates to itself
            return self.symbols_id.get(src, src)
        elif src.is_mem():
            # Only byte aligned accesses are supported for now
            assert src.size % 8 == 0
            return self.symbols_mem.read(src.arg, src.size)
        else:
            raise TypeError("Bad source expr")

    def write(self, dst, src):
        """
        Update @dst with @src expression
        @dst: ExprId or ExprMem instance
        @src: Expression instance
        """
        assert dst.size == src.size
        if dst.is_id():
            if dst == src:
                # Writing an ExprId with itself forgets its value
                if dst in self.symbols_id:
                    del self.symbols_id[dst]
            else:
                self.symbols_id[dst] = src
        elif dst.is_mem():
            # Only byte aligned accesses are supported for now
            assert dst.size % 8 == 0
            self.symbols_mem.write(dst.arg, src)
        else:
            raise TypeError("Bad destination expr")

    def dump(self, ids=True, mems=True):
        """Display memory content"""
        if ids:
            for variable, value in self.ids():
                print '%s = %s' % (variable, value)
        if mems:
            for mem, value in self.memory():
                print '%s = %s' % (mem, value)

    def __repr__(self):
        out = []
        for variable, value in self.iteritems():
            out.append('%s = %s' % (variable, value))
        return "\n".join(out)

    def iteritems(self):
        """ExprId/ExprMem iteritems of the current state"""
        for variable, value in self.ids():
            yield variable, value
        for variable, value in self.memory():
            yield variable, value

    def items(self):
        """Return variables/values of the current state"""
        return list(self.iteritems())

    def __iter__(self):
        for expr, _ in self.iteritems():
            yield expr

    def ids(self):
        """Iterate on variables and their values."""
        for expr, value in self.symbols_id.iteritems():
            yield expr, value

    def memory(self):
        """Iterate on memory variables and their values."""
        for mem, value in self.symbols_mem.iteritems():
            yield mem, value

    def keys(self):
        """Variables of the current state"""
        return list(self)

    def get(self, expr, default=None):
        """Deprecated version of read"""
        warnings.warn('DEPRECATION WARNING: use "read(self, expr)" instead of get')
        ret = self.read(expr)
        # @default is only used when it is not None and @expr has no value
        if default is not None and ret == expr:
            return default
        return ret


def merge_ptr_read(known, ptrs):
    """
    Merge common memory parts in a multiple byte memory.
    @ptrs: memory bytes list
    @known: ptrs' associated boolean for present/unpresent memory part in the
    store

    Return a list of (known flag, first pointer, size in bits), one element
    per run of consecutive equal flags.
    NOTE(review): a None sentinel is appended to both @known and @ptrs, so
    the caller's lists are modified.
    """
    assert known
    out = []
    known.append(None)
    ptrs.append(None)
    last, value, size = known[0], ptrs[0], 8
    for index, part in enumerate(known[1:], 1):
        if part == last:
            size += 8
        else:
            out.append((last, value, size))
            last, value, size = part, ptrs[index], 8
    return out


class SymbolicExecutionEngine(object):
    """
    Symbolic execution engine
    Allow IR code emulation in symbolic domain


    Examples:
        from miasm2.ir.symbexec import SymbolicExecutionEngine
        from miasm2.ir.ir import AssignBlock

        ir_arch = ir_x86_32()

        init_state = {
            ir_arch.arch.regs.EAX: ir_arch.arch.regs.EBX,
            ExprMem(id_x+ExprInt(0x10, 32), 32): id_a,
        }

        sb_exec = SymbolicExecutionEngine(ir_arch, init_state)

        >>> sb_exec.dump()
        EAX                = a
        @32[x + 0x10]      = a
        >>> sb_exec.dump(mems=False)
        EAX                = a

        >>> print sb_exec.eval_expr(ir_arch.arch.regs.EAX + ir_arch.arch.regs.ECX)
        EBX + ECX

    Inspecting state:
        - dump
        - modified
    State manipulation:
        - '.state' (rw)

    Evaluation (read only):
        - eval_expr
        - eval_assignblk
    Evaluation with state update:
        - eval_updt_expr
        - eval_updt_assignblk
        - eval_updt_irblock

    Start a symbolic execution based on provisioned '.ir_arch' blocks:
        - run_block_at
        - run_at
    """

    StateEngine = SymbolicState

    def __init__(self, ir_arch, state=None,
                 func_read=None,
                 func_write=None,
                 sb_expr_simp=expr_simp_explicit):

        # Dispatch table: expression class -> evaluation method
        self.expr_to_visitor = {
            ExprInt: self.eval_exprint,
            ExprId: self.eval_exprid,
            ExprLoc: self.eval_exprloc,
            ExprMem: self.eval_exprmem,
            ExprSlice: self.eval_exprslice,
            ExprCond: self.eval_exprcond,
            ExprOp: self.eval_exprop,
            ExprCompose: self.eval_exprcompose,
        }

        if state is None:
            state = {}

        self.symbols = SymbolMngr(addrsize=ir_arch.addrsize, expr_simp=sb_expr_simp)

        for dst, src in state.iteritems():
            self.symbols.write(dst, src)

        if func_read:
            warnings.warn('DEPRECATION WARNING: override function "mem_read(self, expr)" instead of func_read')
        if func_write:
            warnings.warn('DEPRECATION WARNING: override function "mem_write(self, dsr, src)" instead of func_write')

        self.func_read = func_read
        self.func_write = func_write
        self.ir_arch = ir_arch
        self.expr_simp = sb_expr_simp

    def get_state(self):
        """Return the current state of the SymbolicEngine"""
        state = self.StateEngine(dict(self.symbols))
        return state

    def set_state(self, state):
        """Restore the @state of the engine
        @state: StateEngine instance
        """
        # The previous symbols are dropped and rebuilt from @state
        self.symbols = SymbolMngr(addrsize=self.ir_arch.addrsize, expr_simp=self.expr_simp)
        for dst, src in dict(state).iteritems():
            self.symbols[dst] = src

    state = property(get_state, set_state)

    def eval_expr_visitor(self, expr, cache=None):
        """
        [DEV]: Override to change the behavior of an Expr evaluation.
        This function recursively applies 'eval_expr*' to @expr.
        This function uses @cache to speedup re-evaluation of expression.
        """
        if cache is None:
            cache = {}

        ret = cache.get(expr, None)
        if ret is not None:
            return ret

        # The cache is also looked up with the simplified expression
        new_expr = self.expr_simp(expr)
        ret = cache.get(new_expr, None)
        if ret is not None:
            return ret

        func = self.expr_to_visitor.get(new_expr.__class__, None)
        if func is None:
            raise TypeError("Unknown expr type")

        ret = func(new_expr, cache=cache)
        ret = self.expr_simp(ret)
        assert ret is not None

        cache[expr] = ret
        cache[new_expr] = ret
        return ret

    def eval_exprint(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprInt using the current state"""
        return expr

    def eval_exprid(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprId using the current state"""
        ret = self.symbols.read(expr)
        return ret

    def eval_exprloc(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprLoc using the current state"""
        offset = self.ir_arch.loc_db.get_location_offset(expr.loc_key)
        if offset is not None:
            ret = ExprInt(offset, expr.size)
        else:
            # No offset known for this location: keep it symbolic
            ret = expr
        return ret

    def eval_exprmem(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprMem using the current state
        This function first evaluates the memory pointer value.
        Override 'mem_read' to modify the effective memory accesses
        """
        ptr = self.eval_expr_visitor(expr.arg, **kwargs)
        mem = ExprMem(ptr, expr.size)
        ret = self.mem_read(mem)
        return ret

    def eval_exprcond(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprCond using the current state"""
        cond = self.eval_expr_visitor(expr.cond, **kwargs)
        src1 = self.eval_expr_visitor(expr.src1, **kwargs)
        src2 = self.eval_expr_visitor(expr.src2, **kwargs)
        ret = ExprCond(cond, src1, src2)
        return ret

    def eval_exprslice(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprSlice using the current state"""
        arg = self.eval_expr_visitor(expr.arg, **kwargs)
        ret = ExprSlice(arg, expr.start, expr.stop)
        return ret

    def eval_exprop(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprOp using the current state"""
        args = []
        for oarg in expr.args:
            arg = self.eval_expr_visitor(oarg, **kwargs)
            args.append(arg)
        ret = ExprOp(expr.op, *args)
        return ret

    def eval_exprcompose(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprCompose using the current state"""
        args = []
        for arg in expr.args:
            args.append(self.eval_expr_visitor(arg, **kwargs))
        ret = ExprCompose(*args)
        return ret

    def eval_expr(self, expr, eval_cache=None):
        """
        Evaluate @expr
        @expr: Expression instance to evaluate
        @eval_cache: None or dictionary linking variables to their values
        """
        if eval_cache is None:
            eval_cache = {}
        ret = self.eval_expr_visitor(expr, cache=eval_cache)
        assert ret is not None
        return ret

    def modified(self, init_state=None, ids=True, mems=True):
        """
        Return the modified variables.
        @init_state: a base dictionary linking variables to their initial values
        to diff. Can be None.
        @ids: include modified ids
        @mems: include modified mems
        """
        if init_state is None:
            init_state = {}
        if ids:
            for variable, value in self.symbols.symbols_id.iteritems():
                if variable in init_state and init_state[variable] == value:
                    continue
                yield variable, value
        if mems:
            for mem, value in self.symbols.memory():
                if mem in init_state and init_state[mem] == value:
                    continue
                yield mem, value

    def dump(self, ids=True, mems=True):
        """
        Display modified variables
        @ids: display modified ids
        @mems: display modified memory
        """

        for variable, value in self.modified(None, ids, mems):
            print "%-18s" % variable, "=", "%s" % value

    def eval_assignblk(self, assignblk):
        """
        Evaluate AssignBlock using the current state

        Returns a dictionary containing modified keys associated to their values

        @assignblk: AssignBlock instance
        """
        pool_out = {}
        # The same cache is shared by every evaluation of the block
        eval_cache = {}
        for dst, src in assignblk.iteritems():
            src = self.eval_expr(src, eval_cache)
            if dst.is_mem():
                ptr = self.eval_expr(dst.arg, eval_cache)
                # Test if mem lookup is known
                tmp = ExprMem(ptr, dst.size)
                pool_out[tmp] = src
            elif dst.is_id():
                pool_out[dst] = src
            else:
                raise ValueError("Unknown destination type", str(dst))

        return pool_out

    def apply_change(self, dst, src):
        """
        Apply @dst = @src on the current state WITHOUT evaluating both side
        @dst: Expr, destination
        @src: Expr, source
        """
        if dst.is_mem():
            self.mem_write(dst, src)
        else:
            self.symbols.write(dst, src)

    def eval_updt_assignblk(self, assignblk):
        """
        Apply an AssignBlock on the current state
        Return the list of written memory destinations
        @assignblk: AssignBlock instance
        """
        mem_dst = []
        # Every source is evaluated before any change is applied
        dst_src = self.eval_assignblk(assignblk)
        for dst, src in dst_src.iteritems():
            self.apply_change(dst, src)
            if dst.is_mem():
                mem_dst.append(dst)
        return mem_dst

    def eval_updt_irblock(self, irb, step=False):
        """
        Symbolic execution of the @irb on the current state
        Return the evaluation of the IRDst of self.ir_arch
        @irb: irbloc instance
        @step: display intermediate steps
        """
        for assignblk in irb:
            if step:
                print 'Instr', assignblk.instr
                print 'Assignblk:'
                print assignblk
                print '_' * 80
            self.eval_updt_assignblk(assignblk)
            if step:
                self.dump(mems=False)
                self.dump(ids=False)
                print '_' * 80
        dst = self.eval_expr(self.ir_arch.IRDst)

        return dst

    def run_block_at(self, ircfg, addr, step=False):
        """
        Symbolic execution of the block at @addr
        Return @addr unchanged if @ircfg has no block for it
        @addr: address to execute (int or ExprInt or label)
        @step: display intermediate steps
        """
        irblock = ircfg.get_block(addr)
        if irblock is not None:
            addr = self.eval_updt_irblock(irblock, step=step)
        return addr

    def run_at(self, ircfg, addr, lbl_stop=None, step=False):
        """
        Symbolic execution starting at @addr
        Stop when no block is found for the current address or when the block
        at @lbl_stop is reached; return the last address
        @addr: address to execute (int or ExprInt or label)
        @lbl_stop: LocKey to stop execution on
        @step: display intermediate steps
        """
        while True:
            irblock = ircfg.get_block(addr)
            if irblock is None:
                break
            if irblock.loc_key == lbl_stop:
                break
            addr = self.eval_updt_irblock(irblock, step=step)
        return addr

    def del_mem_above_stack(self, stack_ptr):
        """
        Remove all stored memory values with following properties:
        * pointer based on initial stack value
        * pointer below current stack pointer
        """
        stack_ptr = self.eval_expr(stack_ptr)
        base, stk_offset = get_expr_base_offset(stack_ptr)
        memarray = self.symbols.symbols_mem.base_to_memarray.get(base, None)
        if memarray:
            # Offsets are collected first: the MemArray cannot be modified
            # while iterating on it
            to_del = set()
            for offset in memarray:
                # Sign bit of (offset - stk_offset) set: offset is below the
                # stack pointer
                if ((offset - stk_offset) & int(stack_ptr.mask)) >> (stack_ptr.size - 1) != 0:
                    to_del.add(offset)

            for offset in to_del:
                del memarray[offset]

    def eval_updt_expr(self, expr):
        """
        Evaluate @expr and apply side effect if needed (ie. if expr is an
        assignment). Return the evaluated value
        """

        # Update value if needed
        if expr.is_aff():
            ret = self.eval_expr(expr.src)
            self.eval_updt_assignblk(AssignBlock([expr]))
        else:
            ret = self.eval_expr(expr)

        return ret

    def _resolve_mem_parts(self, expr):
        """For a given ExprMem @expr, get known/unknown parts from the store.
        @expr: ExprMem instance

        Return a list of (known, value) where known is a bool representing if
        the value has been resolved from the store or not.
        """

        # Extract known parts in symbols
        assert expr.size % 8 == 0
        ptr = expr.arg
        known = []
        ptrs = []
        for index in xrange(expr.size / 8):
            offset = self.expr_simp(ptr + ExprInt(index, ptr.size))
            ptrs.append(offset)
            mem = ExprMem(offset, 8)
            known.append(mem in self.symbols)

        # Consecutive bytes with the same known/unknown flag are grouped
        reads = merge_ptr_read(known, ptrs)
        out = []
        for is_known, ptr_value, size in reads:
            mem = ExprMem(ptr_value, size)
            if is_known:
                mem = self.symbols.read(mem)
            out.append((is_known, mem))
        return out

    def mem_read(self, expr):
        """
        [DEV]: Override to modify the effective memory reads

        Read symbolic value at ExprMem @expr
        @expr: ExprMem
        """

        parts = self._resolve_mem_parts(expr)

        out = []
        for known, part in parts:
            # The deprecated 'func_read' hook is only used for unknown memory
            if not known and part.is_mem() and self.func_read is not None:
                ret = self.func_read(part)
            else:
                ret = part

            out.append(ret)
        ret = self.expr_simp(ExprCompose(*out))

        assert ret.size == expr.size
        return ret

    def mem_write(self, dst, src):
        """
        [DEV]: Override to modify the effective memory writes

        Write symbolic value @src at ExprMem @dst
        @dst: destination ExprMem
        @src: source Expression
        """
        if self.func_write is not None:
            # Deprecated hook: it receives the engine as first argument
            self.func_write(self, dst, src)
        else:
            self.symbols.write(dst, src)

    # Deprecated methods

    def apply_expr_on_state(self, expr, cache):
        """Deprecated version of eval_expr"""
        warnings.warn('DEPRECATION WARNING: use "eval_expr" instead of apply_expr_on_state')

        if cache is None:
            cache = {}
        ret = self.eval_expr(expr, eval_cache=cache)
        return ret

    def modified_mems(self, init_state=None):
        """Deprecated version of modified(ids=False)"""
        warnings.warn('DEPRECATION WARNING: use "modified(self, ids=False)" instead of modified_mems')
        for mem in self.modified(init_state=init_state, ids=False):
            yield mem

    def modified_regs(self, init_state=None):
        """Deprecated version of modified(mems=False)"""
        warnings.warn('DEPRECATION WARNING: use "modified(self, mems=False)" instead of modified_regs')
        for reg in self.modified(init_state=init_state, mems=False):
            yield reg

    def dump_id(self):
        """Deprecated version of dump(mems=False)"""
        warnings.warn('DEPRECATION WARNING: use "dump(self, mems=False)" instead of dump_id')
        self.dump(mems=False)

    def dump_mem(self):
        """Deprecated version of dump(ids=False)"""
        warnings.warn('DEPRECATION WARNING: use "dump(self, ids=False)" instead of dump_mem')
        self.dump(ids=False)

    def eval_ir_expr(self, assignblk):
        """Deprecated version of eval_assignblk(self, assignblk)"""
        warnings.warn('DEPRECATION WARNING: use "eval_assignblk(self, assignblk)" instead of eval_ir_expr')
        return self.eval_assignblk(assignblk).iteritems()

    def eval_ir(self, assignblk):
        """Deprecated version of eval_updt_assignblk(self, assignblk)"""
        # NOTE(review): the warning text names "eval_assignblk" while the
        # method called is eval_updt_assignblk
        warnings.warn('DEPRECATION WARNING: use "eval_assignblk(self, assignblk)" instead of eval_ir')
        return self.eval_updt_assignblk(assignblk)

    def emulbloc(self, irb, step=False):
        """Deprecated version of eval_updt_irblock(self, irb, step=False)"""
        warnings.warn('DEPRECATION WARNING: use "eval_updt_irblock(self, irb, step=False)" instead of emulbloc')
        return self.eval_updt_irblock(irb, step)

    def emul_ir_bloc(self, _, addr, step=False):
        """Deprecated version of run_block_at"""
        warnings.warn('DEPRECATION WARNING: use "run_block_at(self, addr, step=False)" instead of emul_ir_bloc')
        # NOTE(review): run_block_at is declared as (ircfg, addr, step); here
        # @addr is passed in the ircfg position - confirm and fix
        return self.run_block_at(addr, step)

    def emul_ir_block(self, addr, step=False):
        """Deprecated version of run_block_at"""
        warnings.warn('DEPRECATION WARNING: use "run_block_at(self, addr, step=False)" instead of emul_ir_block')
        # NOTE(review): same argument shift as in emul_ir_bloc (no ircfg given)
        return self.run_block_at(addr, step)

    def emul_ir_blocks(self, addr, lbl_stop=None, step=False):
        """Deprecated version of run_at"""
        warnings.warn('DEPRECATION WARNING: use "run_at(self, addr, lbl_stop=None, step=False):" instead of emul_ir_blocks')
        # NOTE(review): run_at is declared as (ircfg, addr, lbl_stop, step);
        # no ircfg is given here - confirm and fix
        return self.run_at(addr, lbl_stop, step)

    def emul_ir_blocs(self, _, addr, lbl_stop=None, step=False):
        """Deprecated version of run_at"""
        warnings.warn('DEPRECATION WARNING: use "run_at(self, addr, lbl_stop=None, step=False):" instead of emul_ir_blocs')
        # NOTE(review): same argument shift as in emul_ir_blocks
        return self.run_at(addr, lbl_stop, step)

    def apply_expr(self, expr):
        """Deprecated version of eval_updt_expr"""
        warnings.warn('DEPRECATION WARNING: use "eval_updt_expr" instead of apply_expr')
        return self.eval_updt_expr(expr)

    def as_assignblock(self):
        """Return the current state as an AssignBlock"""
        warnings.warn('DEPRECATION WARNING: use "modified(ids=True, mems=True)" instead of as_assignblock')
        out = []
        for dst, src in self.modified(ids=True, mems=True):
            out.append((dst, src))
        return AssignBlock(dict(out))


class symbexec(SymbolicExecutionEngine):
    """
    DEPRECATED object
    Use SymbolicExecutionEngine instead of symbexec
    """

    def __init__(self, ir_arch, known_symbols,
                 func_read=None,
                 func_write=None,
                 sb_expr_simp=expr_simp_explicit):
        warnings.warn("Deprecated API: use SymbolicExecutionEngine")
        super(symbexec, self).__init__(ir_arch, known_symbols,
                                       func_read,
                                       func_write,
                                       sb_expr_simp=sb_expr_simp)
Module variables
var INTERNAL_INTBASE_NAME
var console_handler
var log
Functions
def get_block(
ir_arch, ircfg, mdis, addr)
Get IRBlock at address @addr
def get_block(ir_arch, ircfg, mdis, addr):
    """Return the IRBlock located at @addr, lifting it on demand

    @ir_arch: object used to add the disassembled block to @ircfg
    @ircfg: IR graph holding the known blocks
    @mdis: disassembly engine used when the block is not in @ircfg yet
    @addr: address of the requested block
    Raise LookupError if @ircfg still has no block at @addr afterwards
    """
    loc_key = ircfg.get_or_create_loc_key(addr)
    if loc_key not in ircfg.blocks:
        # Unknown block: disassemble it and add its IR to the graph
        asm_block = mdis.dis_block(mdis.loc_db.get_location_offset(loc_key))
        ir_arch.add_asmblock_to_ircfg(asm_block, ircfg)
    irblock = ircfg.get_block(loc_key)
    if irblock is not None:
        return irblock
    raise LookupError(
        'No block found at that address: %s' % ir_arch.loc_db.pretty_str(loc_key)
    )
def get_expr_base_offset(
expr)
Return a couple representing the symbolic/concrete part of an addition expression.
If there is no symbolic part, ExprId(INTERNAL_INTBASE_NAME) is used. If there is no concrete part, 0 is used. @expr: Expression instance
def get_expr_base_offset(expr):
    """Split @expr into a (symbolic base, integer offset) couple

    - an integer gives (ExprId(INTERNAL_INTBASE_NAME), its value)
    - an addition whose last argument is an integer gives
      (the remaining arguments, that integer)
    - anything else gives (@expr, 0)
    @expr: Expression instance
    """
    if expr.is_int():
        return ExprId(INTERNAL_INTBASE_NAME, expr.size), int(expr)
    if expr.is_op('+') and expr.args[-1].is_int():
        base_args = expr.args[:-1]
        offset = int(expr.args[-1])
        # A single remaining argument is the base itself, no need for a '+'
        if len(base_args) == 1:
            return base_args[0], offset
        return ExprOp('+', *base_args), offset
    return expr, 0
def merge_ptr_read(
known, ptrs)
Merge common memory parts in a multiple byte memory. @ptrs: memory bytes list @known: ptrs' associated boolean for present/unpresent memory part in the store
def merge_ptr_read(known, ptrs):
    """
    Merge common memory parts in a multiple byte memory.
    @ptrs: memory bytes list
    @known: ptrs' associated boolean for present/unpresent memory part in the
    store

    Return a list of (known flag, first ptr of the run, run size in bits),
    one entry per run of consecutive equal flags in @known.
    The input lists are left untouched.
    """
    assert known
    out = []
    last, value, size = known[0], ptrs[0], 8
    for index, part in enumerate(known[1:], 1):
        if part == last:
            # Same state as the previous byte: extend the current run
            size += 8
        else:
            out.append((last, value, size))
            last, value, size = part, ptrs[index], 8
    # Flush the last run explicitly instead of appending a sentinel to the
    # caller's lists
    out.append((last, value, size))
    return out
Classes
class MemArray
Link between base and its (offset, Expr)
Given an expression (say base), this structure will store every memory content relatively to an integer offset from base.
The value associated to a given offset is a description of the slice of a stored expression. The slice size depends on the configuration of the MemArray. For example, for a slice size of 8 bits, the assignment: - @32[EAX+0x10] = EBX
will store for the base EAX (as (byte index, expression) couples): - 0x10: (0, EBX) - 0x11: (1, EBX) - 0x12: (2, EBX) - 0x13: (3, EBX)
If the base is EAX+EBX, this structure can store the following contents: - @32[EAX+EBX] - @8[EAX+EBX+0x100] But not: - @32[EAX+0x10] (which is stored in another MemArray based on EAX) - @32[EAX+EBX+ECX] (which is stored in another MemArray based on EAX+EBX+ECX)
class MemArray(MutableMapping):
    """Link between base and its (offset, Expr)

    Given an expression (say *base*), this structure will store every memory
    content relatively to an integer offset from *base*.

    The value associated to a given offset is a description of the slice of a
    stored expression. The slice size depends on the configuration of the
    MemArray. For example, for a slice size of 8 bits, the assignment:
    - @32[EAX+0x10] = EBX

    will store for the base EAX (as (byte index, expression) couples):
    - 0x10: (0, EBX)
    - 0x11: (1, EBX)
    - 0x12: (2, EBX)
    - 0x13: (3, EBX)

    If the *base* is EAX+EBX, this structure can store the following contents:
    - @32[EAX+EBX]
    - @8[EAX+EBX+0x100]
    But not:
    - @32[EAX+0x10] (which is stored in another MemArray based on EAX)
    - @32[EAX+EBX+ECX] (which is stored in another MemArray based on
      EAX+EBX+ECX)
    """

    def __init__(self, base, expr_simp=expr_simp_explicit):
        """
        @base: expression used as symbolic base address
        @expr_simp: simplifier applied to the manipulated expressions
        """
        self._base = base
        self.expr_simp = expr_simp
        self._mask = int(base.mask)
        # offset (integer) -> (byte index in the expression, expression)
        self._offset_to_expr = {}

    @property
    def base(self):
        """Expression representing the symbolic base address"""
        return self._base

    @property
    def mask(self):
        """Mask offset"""
        return self._mask

    def __getitem__(self, offset):
        assert 0 <= offset <= self._mask
        return self._offset_to_expr.__getitem__(offset)

    def __setitem__(self, offset, value):
        raise RuntimeError("Use write api to update keys")

    def __delitem__(self, offset):
        assert 0 <= offset <= self._mask
        return self._offset_to_expr.__delitem__(offset)

    def __iter__(self):
        for offset, _ in self._offset_to_expr.iteritems():
            yield offset

    def __len__(self):
        return len(self._offset_to_expr)

    def __repr__(self):
        out = []
        out.append("Base: %s" % self.base)
        for offset, (index, value) in sorted(self._offset_to_expr.iteritems()):
            out.append("%16X %d %s" % (offset, index, value))
        return '\n'.join(out)

    def copy(self):
        """Copy object instance"""
        obj = MemArray(self.base, self.expr_simp)
        obj._offset_to_expr = self._offset_to_expr.copy()
        return obj

    @staticmethod
    def offset_to_ptr(base, offset):
        """
        Return an expression representing the @base + @offset
        @base: symbolic base address
        @offset: relative offset integer to the @base address
        """
        if base.is_id(INTERNAL_INTBASE_NAME):
            ptr = ExprInt(offset, base.size)
        elif offset == 0:
            ptr = base
        else:
            ptr = base + ExprInt(offset, base.size)
        return ptr.canonize()

    def read(self, offset, size):
        """
        Return memory at @offset with @size as an Expr list
        @offset: integer (in bytes)
        @size: integer (in bits), byte aligned

        Consider the following state:
        - 0x10: (0, EBX)
        - 0x11: (1, EBX)
        - 0x12: (2, EBX)
        - 0x13: (3, EBX)

        A read at 0x10 of 32 bits should return: EBX
        """
        assert size % 8 == 0
        # Parts is (Expr's byte offset, size in bytes, Expr)
        parts = []
        for index in xrange(size / 8):
            # Wrap read:
            # @32[EAX+0xFFFFFFFF] is ok and will read at 0xFFFFFFFF, 0, 1, 2
            request_offset = (offset + index) & self._mask
            if request_offset in self._offset_to_expr:
                # Known memory portion
                off, data = self._offset_to_expr[request_offset]
                parts.append((off, 1, data))
                continue

            # Unknown memory portion
            ptr = self.offset_to_ptr(self.base, request_offset)
            data = ExprMem(ptr, 8)
            parts.append((0, 1, data))

        # Group similar datas
        # XXX TODO: only little endian here
        index = 0
        while index + 1 < len(parts):
            off_a, size_a, data_a = parts[index]
            off_b, size_b, data_b = parts[index+1]
            if data_a == data_b and off_a + size_a == off_b:
                # Read consecutive bytes of a variable
                # [(0, 8, x), (1, 8, x)] => (0, 16, x)
                parts[index:index+2] = [(off_a, size_a + size_b, data_a)]
                continue
            if data_a.is_int() and data_b.is_int():
                # Read integer parts
                # [(0, 8, 0x11223344), (1, 8, 0x55667788)] => (0, 16, 0x7744)
                int1 = self.expr_simp(data_a[off_a*8:(off_a+size_a)*8])
                int2 = self.expr_simp(data_b[off_b*8:(off_b+size_b)*8])
                assert int1.is_int() and int2.is_int()
                int1, int2 = int(int1), int(int2)
                result = ExprInt((int2 << (size_a * 8)) | int1, (size_a + size_b) * 8)
                parts[index:index+2] = [(0, size_a + size_b, result)]
                continue
            if data_a.is_mem() and data_b.is_mem():
                # Read consecutive bytes of a memory variable
                ptr_base_a, ptr_offset_a = get_expr_base_offset(data_a.arg)
                ptr_base_b, ptr_offset_b = get_expr_base_offset(data_b.arg)
                if ptr_base_a != ptr_base_b:
                    index += 1
                    continue
                if (ptr_offset_a + off_a + size_a) & self._mask == (ptr_offset_b + off_b) & self._mask:
                    assert size_a <= data_a.size / 8 - off_a
                    assert size_b <= data_b.size / 8 - off_b
                    # Successive comparable symbolic pointers
                    # [(0, 8, @8[ptr]), (0, 8, @8[ptr+1])] => (0, 16, @16[ptr])
                    ptr = self.offset_to_ptr(ptr_base_a, (ptr_offset_a + off_a) & self._mask)

                    data = ExprMem(ptr, (size_a + size_b) * 8)
                    parts[index:index+2] = [(0, size_a + size_b, data)]

                    continue

            index += 1

        # Slice datas
        read_mem = []
        for off, bytesize, data in parts:
            if data.size / 8 != bytesize:
                data = data[off * 8: (off + bytesize) * 8]
            read_mem.append(data)

        return read_mem

    def write(self, offset, expr):
        """
        Write @expr at @offset
        @offset: integer (in bytes)
        @expr: Expr instance value
        """
        assert expr.size % 8 == 0
        assert offset <= self._mask
        for index in xrange(expr.size / 8):
            # Wrap write:
            # @32[EAX+0xFFFFFFFF] is ok and will write at 0xFFFFFFFF, 0, 1, 2
            request_offset = (offset + index) & self._mask
            # XXX TODO: only little endian here
            self._offset_to_expr[request_offset] = (index, expr)

            tmp = self.expr_simp(expr[index * 8: (index + 1) * 8])
            # Special case: Simplify slice of pointer (simplification is ok
            # here, as we won't store the simplified expression)
            if tmp.is_slice() and tmp.arg.is_mem() and tmp.start % 8 == 0:
                new_ptr = self.expr_simp(
                    tmp.arg.arg + ExprInt(tmp.start / 8, tmp.arg.arg.size)
                )
                tmp = ExprMem(new_ptr, tmp.stop - tmp.start)
            # Test if write to original value
            if tmp.is_mem():
                src_ptr, src_off = get_expr_base_offset(tmp.arg)
                if src_ptr == self.base and src_off == request_offset:
                    del self._offset_to_expr[request_offset]

    def _get_variable_parts(self, index, known_offsets, forward=True):
        """
        Find consecutive memory parts representing the same variable. The part
        starts at offset known_offsets[@index] and search is in offset direction
        determined by @forward
        Return the number of consecutive parts of the same variable.

        @index: index of the memory offset in known_offsets
        @known_offsets: sorted offsets
        @forward: Search in offset growing direction if True, else in reverse
        order
        """
        offset = known_offsets[index]
        value_byte_index, value = self._offset_to_expr[offset]
        assert value.size % 8 == 0

        if forward:
            start, end, step = value_byte_index + 1, value.size / 8, 1
        else:
            start, end, step = value_byte_index - 1, -1, -1

        partnum = 1
        for value_offset in xrange(start, end, step):
            offset += step
            # Check if next part is in known_offsets
            next_index = index + step * partnum
            if not 0 <= next_index < len(known_offsets):
                break

            offset_next = known_offsets[next_index]
            if offset_next != offset:
                break

            # Check if next part is a part of the searched value
            byte_index, value_next = self._offset_to_expr[offset_next]
            if byte_index != value_offset:
                break
            if value != value_next:
                break
            partnum += 1

        return partnum

    def _build_value_at_offset(self, value, offset, start, length):
        """
        Return a couple. The first element is the memory Expression representing
        the value at @offset, the second is its value.  The value is truncated
        at byte @start with @length

        @value: Expression to truncate
        @offset: offset in bytes of the variable (integer)
        @start: value's byte offset (integer)
        @length: length in bytes (integer)
        """

        ptr = self.offset_to_ptr(self.base, offset)
        size = length * 8
        if start == 0 and size == value.size:
            result = value
        else:
            result = self.expr_simp(value[start * 8: start * 8 + size])

        return ExprMem(ptr, size), result

    def memory(self):
        """
        Iterate on stored memory/values

        The goal here is to group entities.

        Consider the following state:
        EAX + 0x10 = (0, EDX)
        EAX + 0x11 = (1, EDX)
        EAX + 0x12 = (2, EDX)
        EAX + 0x13 = (3, EDX)

        The function should return:
        @32[EAX + 0x10] = EDX
        """

        if not self._offset_to_expr:
            # A plain return ends the generator; raising StopIteration here
            # is turned into a RuntimeError by PEP 479
            return
        known_offsets = sorted(self._offset_to_expr)
        index = 0
        # Test if the first element is the continuation of the last byte. If
        # yes, merge and output it first.
        min_int = 0
        max_int = (1 << self.base.size) - 1
        limit_index = len(known_offsets)

        first_element = None
        # Special case where a variable spreads on max_int/min_int
        if known_offsets[0] == min_int and known_offsets[-1] == max_int:
            min_offset, max_offset = known_offsets[0], known_offsets[-1]
            min_byte_index, min_value = self._offset_to_expr[min_offset]
            max_byte_index, max_value = self._offset_to_expr[max_offset]
            if min_value == max_value and max_byte_index + 1 == min_byte_index:
                # Look for current variable start
                partnum_before = self._get_variable_parts(len(known_offsets) - 1, known_offsets, False)
                # Look for current variable end
                partnum_after = self._get_variable_parts(0, known_offsets)

                partnum = partnum_before + partnum_after
                offset = known_offsets[-partnum_before]
                index_value, value = self._offset_to_expr[offset]

                mem, result = self._build_value_at_offset(value, offset, index_value, partnum)
                first_element = mem, result
                index = partnum_after
                limit_index = len(known_offsets) - partnum_before

        # Special cases are done, walk and merge variables
        while index < limit_index:
            offset = known_offsets[index]
            index_value, value = self._offset_to_expr[offset]
            partnum = self._get_variable_parts(index, known_offsets)
            mem, result = self._build_value_at_offset(value, offset, index_value, partnum)
            yield mem, result
            index += partnum

        if first_element is not None:
            yield first_element

    def dump(self):
        """Display MemArray content"""
        for mem, value in self.memory():
            # Parenthesised single argument: same output with the Python 2
            # print statement and the Python 3 print function
            print("%s = %s" % (mem, value))
Ancestors (in MRO)
- MemArray
- _abcoll.MutableMapping
- _abcoll.Mapping
- _abcoll.Sized
- _abcoll.Iterable
- _abcoll.Container
- __builtin__.object
Static methods
def offset_to_ptr(
base, offset)
Return an expression representing the @base + @offset @base: symbolic base address @offset: relative offset integer to the @base address
@staticmethod
def offset_to_ptr(base, offset):
    """
    Build the canonized pointer expression for @base + @offset
    @base: symbolic base address
    @offset: relative offset integer to the @base address
    """
    if base.is_id(INTERNAL_INTBASE_NAME):
        # Pure integer address: the base is only a placeholder
        return ExprInt(offset, base.size).canonize()
    if offset == 0:
        return base.canonize()
    return (base + ExprInt(offset, base.size)).canonize()
Instance variables
var base
Expression representing the symbolic base address
var expr_simp
var mask
Mask offset
Methods
def __init__(
self, base, expr_simp=<miasm2.expression.simplifications.ExpressionSimplifier object at 0x7f19b240b250>)
def __init__(self, base, expr_simp=expr_simp_explicit):
    """
    @base: expression used as symbolic base address
    @expr_simp: simplifier kept for later use on the stored expressions
    """
    # offset (integer) -> stored value; starts empty
    self._offset_to_expr = {}
    self._base, self.expr_simp = base, expr_simp
    # Offsets are wrapped using the base mask
    self._mask = int(base.mask)
def clear(
self)
D.clear() -> None. Remove all items from D.
def clear(self):
    """Remove every item, one popitem() at a time, until KeyError is raised"""
    while True:
        try:
            self.popitem()
        except KeyError:
            break
def copy(
self)
Copy object instance
def copy(self):
    """Return a new MemArray with the same base, simplifier and content"""
    duplicate = MemArray(self.base, self.expr_simp)
    # Shallow copy of the offset table
    duplicate._offset_to_expr = self._offset_to_expr.copy()
    return duplicate
def dump(
self)
Display MemArray content
def dump(self):
    """Display MemArray content

    Print one "<memory> = <value>" line per couple yielded by memory().
    """
    for mem, value in self.memory():
        # Parenthesised single argument: same output with the Python 2
        # print statement and the Python 3 print function
        print("%s = %s" % (mem, value))
def get(
self, key, default=None)
D.get(k[,d]) -> D[k] if k in D, else d. d defaults to None.
def get(self, key, default=None):
    """Return self[key], or @default when the lookup raises KeyError"""
    try:
        found = self[key]
    except KeyError:
        found = default
    return found
def items(
self)
D.items() -> list of D's (key, value) pairs, as 2-tuples
def items(self):
    """Return the list of (key, value) couples, in iteration order"""
    couples = []
    for key in self:
        couples.append((key, self[key]))
    return couples
def iteritems(
self)
D.iteritems() -> an iterator over the (key, value) items of D
def iteritems(self):
    """Lazily yield the (key, value) couples, in iteration order"""
    for key in self:
        couple = (key, self[key])
        yield couple
def iterkeys(
self)
D.iterkeys() -> an iterator over the keys of D
def iterkeys(self):
    """Return an iterator over the keys"""
    key_iterator = iter(self)
    return key_iterator
def itervalues(
self)
D.itervalues() -> an iterator over the values of D
def itervalues(self):
    """Lazily yield the value of each key, in iteration order"""
    for key in self:
        value = self[key]
        yield value
def keys(
self)
D.keys() -> list of D's keys
def keys(self):
    """Return the list of keys, in iteration order"""
    return [key for key in self]
def memory(
self)
Iterate on stored memory/values
The goal here is to group entities.
Consider the following state: EAX + 0x10 = (0, EDX) EAX + 0x11 = (1, EDX) EAX + 0x12 = (2, EDX) EAX + 0x13 = (3, EDX)
The function should return: @32[EAX + 0x10] = EDX
def memory(self):
    """
    Iterate on stored memory/values

    The goal here is to group entities.

    Consider the following state:
    EAX + 0x10 = (0, EDX)
    EAX + 0x11 = (1, EDX)
    EAX + 0x12 = (2, EDX)
    EAX + 0x13 = (3, EDX)

    The function should return:
    @32[EAX + 0x10] = EDX

    Yield (memory expression, value) couples. Nothing is yielded when no
    offset is stored.
    """

    if not self._offset_to_expr:
        # A plain return ends the generator; raising StopIteration here is
        # turned into a RuntimeError by PEP 479
        return
    # sorted() works whether keys() gives a list or a view
    known_offsets = sorted(self._offset_to_expr)
    index = 0
    # Test if the first element is the continuation of the last byte. If
    # yes, merge and output it first.
    min_int = 0
    max_int = (1 << self.base.size) - 1
    limit_index = len(known_offsets)

    first_element = None
    # Special case where a variable spreads on max_int/min_int
    if known_offsets[0] == min_int and known_offsets[-1] == max_int:
        min_offset, max_offset = known_offsets[0], known_offsets[-1]
        min_byte_index, min_value = self._offset_to_expr[min_offset]
        max_byte_index, max_value = self._offset_to_expr[max_offset]
        if min_value == max_value and max_byte_index + 1 == min_byte_index:
            # Look for current variable start
            partnum_before = self._get_variable_parts(len(known_offsets) - 1, known_offsets, False)
            # Look for current variable end
            partnum_after = self._get_variable_parts(0, known_offsets)

            partnum = partnum_before + partnum_after
            offset = known_offsets[-partnum_before]
            index_value, value = self._offset_to_expr[offset]

            mem, result = self._build_value_at_offset(value, offset, index_value, partnum)
            first_element = mem, result
            # Skip the parts already merged at both ends
            index = partnum_after
            limit_index = len(known_offsets) - partnum_before

    # Special cases are done, walk and merge variables
    while index < limit_index:
        offset = known_offsets[index]
        index_value, value = self._offset_to_expr[offset]
        partnum = self._get_variable_parts(index, known_offsets)
        mem, result = self._build_value_at_offset(value, offset, index_value, partnum)
        yield mem, result
        index += partnum

    # The wrapping variable, if any, is output last
    if first_element is not None:
        yield first_element
def pop(
self, key, default=<object object at 0x7f19b494d030>)
D.pop(k[,d]) -> v, remove specified key and return the corresponding value. If key is not found, d is returned if given, otherwise KeyError is raised.
def pop(self, key, default=__marker):
    """Remove @key and return its value

    When @key is missing, return @default if one was given, else let the
    KeyError propagate.
    """
    try:
        value = self[key]
    except KeyError:
        if default is not self.__marker:
            return default
        raise
    del self[key]
    return value
def popitem(
self)
D.popitem() -> (k, v), remove and return some (key, value) pair as a 2-tuple; but raise KeyError if D is empty.
def popitem(self):
    """Remove and return the first (key, value) couple given by iteration

    Raise KeyError if there is nothing to iterate on.
    """
    try:
        first_key = next(iter(self))
    except StopIteration:
        raise KeyError
    couple = (first_key, self[first_key])
    del self[first_key]
    return couple
def read(
self, offset, size)
Return memory at @offset with @size as an Expr list @offset: integer (in bytes) @size: integer (in bits), byte aligned
Consider the following state: - 0x10: (EBX, 0) - 0x11: (EBX, 1) - 0x12: (EBX, 2) - 0x13: (EBX, 3)
A read at 0x10 of 32 bits should return: EBX
def read(self, offset, size):
    """
    Return memory at @offset with @size as an Expr list
    @offset: integer (in bytes)
    @size: integer (in bits), byte aligned

    Consider the following state (offset: (byte index, expression)):
    - 0x10: (0, EBX)
    - 0x11: (1, EBX)
    - 0x12: (2, EBX)
    - 0x13: (3, EBX)

    A read at 0x10 of 32 bits should return: EBX

    Bytes absent from the store are returned as 8 bit ExprMem on
    base + offset. Adjacent bytes are merged when possible.
    """
    assert size % 8 == 0
    # Parts is (Expr's byte offset, size in bytes, Expr)
    parts = []
    for index in xrange(size / 8):
        # Wrap read:
        # @32[EAX+0xFFFFFFFF] is ok and will read at 0xFFFFFFFF, 0, 1, 2
        request_offset = (offset + index) & self._mask
        if request_offset in self._offset_to_expr:
            # Known memory portion
            off, data = self._offset_to_expr[request_offset]
            parts.append((off, 1, data))
            continue

        # Unknown memory portion
        ptr = self.offset_to_ptr(self.base, request_offset)
        data = ExprMem(ptr, 8)
        parts.append((0, 1, data))

    # Group similar datas
    # XXX TODO: only little endian here
    # After a merge, index is not incremented: the merged part is compared
    # again with its new neighbour
    index = 0
    while index + 1 < len(parts):
        off_a, size_a, data_a = parts[index]
        off_b, size_b, data_b = parts[index+1]
        if data_a == data_b and off_a + size_a == off_b:
            # Read consecutive bytes of a variable
            # [(0, 1, x), (1, 1, x)] => (0, 2, x)
            parts[index:index+2] = [(off_a, size_a + size_b, data_a)]
            continue
        if data_a.is_int() and data_b.is_int():
            # Read integer parts
            # [(0, 1, 0x11223344), (1, 1, 0x55667788)] => (0, 2, 0x7744)
            int1 = self.expr_simp(data_a[off_a*8:(off_a+size_a)*8])
            int2 = self.expr_simp(data_b[off_b*8:(off_b+size_b)*8])
            assert int1.is_int() and int2.is_int()
            int1, int2 = int(int1), int(int2)
            result = ExprInt((int2 << (size_a * 8)) | int1, (size_a + size_b) * 8)
            parts[index:index+2] = [(0, size_a + size_b, result)]
            continue
        if data_a.is_mem() and data_b.is_mem():
            # Read consecutive bytes of a memory variable
            ptr_base_a, ptr_offset_a = get_expr_base_offset(data_a.arg)
            ptr_base_b, ptr_offset_b = get_expr_base_offset(data_b.arg)
            if ptr_base_a != ptr_base_b:
                index += 1
                continue
            if (ptr_offset_a + off_a + size_a) & self._mask == (ptr_offset_b + off_b) & self._mask:
                assert size_a <= data_a.size / 8 - off_a
                assert size_b <= data_b.size / 8 - off_b
                # Successive comparable symbolic pointers
                # [(0, 1, @8[ptr]), (0, 1, @8[ptr+1])] => (0, 2, @16[ptr])
                ptr = self.offset_to_ptr(ptr_base_a, (ptr_offset_a + off_a) & self._mask)

                data = ExprMem(ptr, (size_a + size_b) * 8)
                parts[index:index+2] = [(0, size_a + size_b, data)]

                continue

        index += 1

    # Slice datas
    # A part not covering its whole expression is returned as a slice of it
    read_mem = []
    for off, bytesize, data in parts:
        if data.size / 8 != bytesize:
            data = data[off * 8: (off + bytesize) * 8]
        read_mem.append(data)

    return read_mem
def setdefault(
self, key, default=None)
D.setdefault(k[,d]) -> D.get(k,d), also set D[k]=d if k not in D
def setdefault(self, key, default=None):
    """Return self[key]; when @key is missing, store and return @default"""
    try:
        current = self[key]
    except KeyError:
        self[key] = default
        current = default
    return current
def update(
*args, **kwds)
D.update([E, ]**F) -> None. Update D from mapping/iterable E and F. If E present and has a .keys() method, does: for k in E: D[k] = E[k] If E present and lacks .keys() method, does: for (k, v) in E: D[k] = v In either case, this is followed by: for k, v in F.items(): D[k] = v
def update(*args, **kwds):
    """Update the mapping given as first argument

    The optional second positional argument is either a Mapping, an object
    with a keys() method, or an iterable of (key, value) couples. Keyword
    arguments are applied last.
    Raise TypeError unless 1 or 2 positional arguments are given.
    """
    argc = len(args)
    if argc > 2:
        raise TypeError("update() takes at most 2 positional "
                        "arguments ({} given)".format(argc))
    if not args:
        raise TypeError("update() takes at least 1 argument (0 given)")
    self = args[0]
    other = args[1] if argc >= 2 else ()

    if isinstance(other, Mapping):
        for key in other:
            self[key] = other[key]
    elif hasattr(other, "keys"):
        for key in other.keys():
            self[key] = other[key]
    else:
        for key, value in other:
            self[key] = value
    for key, value in kwds.items():
        self[key] = value
def values(
self)
D.values() -> list of D's values
def values(self):
    """Return the list of values, in key iteration order"""
    collected = []
    for key in self:
        collected.append(self[key])
    return collected
def write(
self, offset, expr)
Write @expr at @offset @offset: integer (in bytes) @expr: Expr instance value
def write(self, offset, expr):
    """
    Write @expr at @offset
    @offset: integer (in bytes)
    @expr: Expr instance value

    Each byte of @expr is recorded as (byte index, @expr) at its wrapped
    offset. A byte which simplifies to the memory located at this very
    address (same base, same offset) is removed from the store instead.
    """
    assert expr.size % 8 == 0
    assert offset <= self._mask
    for index in xrange(expr.size / 8):
        # Wrap write:
        # @32[EAX+0xFFFFFFFF] is ok and will write at 0xFFFFFFFF, 0, 1, 2
        request_offset = (offset + index) & self._mask
        # XXX TODO: only little endian here
        self._offset_to_expr[request_offset] = (index, expr)

        # tmp is only used for the test below, it is never stored
        tmp = self.expr_simp(expr[index * 8: (index + 1) * 8])
        # Special case: Simplify slice of pointer (simplification is ok
        # here, as we won't store the simplified expression)
        if tmp.is_slice() and tmp.arg.is_mem() and tmp.start % 8 == 0:
            new_ptr = self.expr_simp(
                tmp.arg.arg + ExprInt(tmp.start / 8, tmp.arg.arg.size)
            )
            tmp = ExprMem(new_ptr, tmp.stop - tmp.start)
        # Test if write to original value
        if tmp.is_mem():
            src_ptr, src_off = get_expr_base_offset(tmp.arg)
            if src_ptr == self.base and src_off == request_offset:
                # Writing a location with its own content: drop the entry
                del self._offset_to_expr[request_offset]
class MemSparse
Link a symbolic memory pointer to its MemArray.
For each symbolic memory object, this object will extract the memory pointer ptr. It then splits ptr into a symbolic and an integer part. For example, the memory @[ESP+4] will give ESP+4 for ptr. ptr is then split into its base ESP and its offset 4. Each symbolic base address uses a different MemArray.
Example: - @32[EAX+EBX] - @8[EAX+EBX+0x100] Will be stored in the same MemArray with a EAX+EBX base
class MemSparse(object):
    """Link a symbolic memory pointer to its MemArray.

    For each symbolic memory object, this object will extract the memory pointer
    *ptr*. It then splits *ptr* into a symbolic and an integer part. For
    example, the memory @[ESP+4] will give ESP+4 for *ptr*. *ptr* is then split
    into its base ESP and its offset 4. Each symbolic base address uses a
    different MemArray.

    Example:
    - @32[EAX+EBX]
    - @8[EAX+EBX+0x100]
    Will be stored in the same MemArray with a EAX+EBX base
    """

    def __init__(self, addrsize, expr_simp=expr_simp_explicit):
        """
        @addrsize: size (in bits) of the addresses manipulated by the MemSparse
        @expr_simp: an ExpressionSimplifier instance
        """
        self.addrsize = addrsize
        self.expr_simp = expr_simp
        self.base_to_memarray = {}

    def __contains__(self, expr):
        """
        Return True if the whole @expr is present
        For partial check, use 'contains_partial'
        """
        if not expr.is_mem():
            return False
        ptr = expr.arg
        base, offset = get_expr_base_offset(ptr)
        memarray = self.base_to_memarray.get(base, None)
        if memarray is None:
            return False
        for i in xrange(expr.size / 8):
            # Wrap the offset like read/write/delete do, so an access
            # crossing the end of the address space stays a valid key
            if (offset + i) & memarray.mask not in memarray:
                return False
        return True

    def contains_partial(self, expr):
        """
        Return True if a part of @expr is present in memory
        """
        if not expr.is_mem():
            return False
        ptr = expr.arg
        base, offset = get_expr_base_offset(ptr)
        memarray = self.base_to_memarray.get(base, None)
        if memarray is None:
            return False
        for i in xrange(expr.size / 8):
            # Wrap the offset like read/write/delete do
            if (offset + i) & memarray.mask in memarray:
                return True
        return False

    def clear(self):
        """Reset the current object content"""
        self.base_to_memarray.clear()

    def copy(self):
        """Copy the current object instance"""
        base_to_memarray = {}
        for base, memarray in self.base_to_memarray.iteritems():
            base_to_memarray[base] = memarray.copy()
        obj = MemSparse(self.addrsize, self.expr_simp)
        obj.base_to_memarray = base_to_memarray
        return obj

    def __delitem__(self, expr):
        """
        Delete a value @expr *fully* present in memory
        For partial delete, use delete_partial
        Raise KeyError if a part of @expr is missing
        """
        ptr = expr.arg
        base, offset = get_expr_base_offset(ptr)
        memarray = self.base_to_memarray.get(base, None)
        if memarray is None:
            raise KeyError
        # Check if whole entity is in the MemArray before deleting it
        for i in xrange(expr.size / 8):
            if (offset + i) & memarray.mask not in memarray:
                raise KeyError
        for i in xrange(expr.size / 8):
            del memarray[(offset + i) & memarray.mask]

    def delete_partial(self, expr):
        """
        Delete @expr from memory. Skip parts of @expr which are not present in
        memory.
        Raise KeyError if no MemArray exists for the base of @expr
        """
        ptr = expr.arg
        base, offset = get_expr_base_offset(ptr)
        memarray = self.base_to_memarray.get(base, None)
        if memarray is None:
            raise KeyError
        # Delete only the bytes which are present
        for i in xrange(expr.size / 8):
            real_offset = (offset + i) & memarray.mask
            if real_offset in memarray:
                del memarray[real_offset]

    def read(self, ptr, size):
        """
        Return the value associated with the Expr at address @ptr
        @ptr: Expr representing the memory address
        @size: memory size (in bits), byte aligned
        """
        assert size % 8 == 0
        base, offset = get_expr_base_offset(ptr)
        memarray = self.base_to_memarray.get(base, None)
        if memarray is not None:
            mems = memarray.read(offset, size)
            ret = ExprCompose(*mems)
        else:
            ret = ExprMem(ptr, size)
        return ret

    def write(self, ptr, expr):
        """
        Update the corresponding Expr @expr at address @ptr
        @ptr: Expr representing the memory address
        @expr: Expr instance
        """
        assert ptr.size == self.addrsize
        base, offset = get_expr_base_offset(ptr)
        memarray = self.base_to_memarray.get(base, None)
        if memarray is None:
            memarray = MemArray(base, self.expr_simp)
            self.base_to_memarray[base] = memarray
        memarray.write(offset, expr)

    def iteritems(self):
        """Iterate on stored memory variables and their values."""
        for _, memarray in sorted(self.base_to_memarray.iteritems()):
            for mem, value in memarray.memory():
                yield mem, value

    def items(self):
        """Return stored memory variables and their values."""
        return list(self.iteritems())

    def dump(self):
        """Display MemSparse content"""
        for mem, value in self.iteritems():
            # Parenthesised single argument: same output with the Python 2
            # print statement and the Python 3 print function
            print("%s = %s" % (mem, value))

    def __repr__(self):
        out = []
        for _, memarray in sorted(self.base_to_memarray.iteritems()):
            out.append(repr(memarray))
        return '\n'.join(out)
Ancestors (in MRO)
- MemSparse
- __builtin__.object
Instance variables
var addrsize
var base_to_memarray
var expr_simp
Methods
def __init__(
self, addrsize, expr_simp=<miasm2.expression.simplifications.ExpressionSimplifier object at 0x7f19b240b250>)
@addrsize: size (in bits) of the addresses manipulated by the MemSparse @expr_simp: an ExpressionSimplifier instance
def __init__(self, addrsize, expr_simp=expr_simp_explicit):
    """
    @addrsize: size (in bits) of the addresses manipulated by the MemSparse
    @expr_simp: an ExpressionSimplifier instance
    """
    # symbolic base -> MemArray; starts empty
    self.base_to_memarray = {}
    self.addrsize, self.expr_simp = addrsize, expr_simp
def clear(
self)
Reset the current object content
def clear(self):
    """Forget every stored MemArray"""
    memarrays = self.base_to_memarray
    memarrays.clear()
def contains_partial(
self, expr)
Return True if a part of @expr is present in memory
def contains_partial(self, expr):
    """
    Return True if a part of @expr is present in memory
    @expr: expression to look for; anything but a memory gives False
    """
    if not expr.is_mem():
        return False
    ptr = expr.arg
    base, offset = get_expr_base_offset(ptr)
    memarray = self.base_to_memarray.get(base, None)
    if memarray is None:
        return False
    for i in xrange(expr.size / 8):
        # Wrap the offset with the MemArray mask, as delete_partial does, so
        # an access crossing the end of the address space stays a valid key
        if (offset + i) & memarray.mask in memarray:
            return True
    return False
def copy(
self)
Copy the current object instance
def copy(self):
    """Return a new MemSparse holding a copy of each MemArray"""
    duplicate = MemSparse(self.addrsize, self.expr_simp)
    duplicate.base_to_memarray = dict(
        (base, memarray.copy())
        for base, memarray in self.base_to_memarray.iteritems()
    )
    return duplicate
def delete_partial(
self, expr)
Delete @expr from memory. Skip parts of @expr which are not present in memory.
def delete_partial(self, expr):
    """
    Delete @expr from memory. Skip parts of @expr which are not present in
    memory.
    Raise KeyError if no MemArray exists for the base of @expr
    """
    base, offset = get_expr_base_offset(expr.arg)
    memarray = self.base_to_memarray.get(base, None)
    if memarray is None:
        raise KeyError
    # Delete only the bytes which are present
    for byte_index in xrange(expr.size / 8):
        real_offset = (offset + byte_index) & memarray.mask
        if real_offset not in memarray:
            continue
        del memarray[real_offset]
def dump(
self)
Display MemSparse content
def dump(self):
    """Display MemSparse content

    Print one "<memory> = <value>" line per couple yielded by iteritems().
    """
    for mem, value in self.iteritems():
        # Parenthesised single argument: same output with the Python 2
        # print statement and the Python 3 print function
        print("%s = %s" % (mem, value))
def items(
self)
Return stored memory variables and their values.
def items(self):
    """Return the list of stored (memory, value) couples"""
    return [couple for couple in self.iteritems()]
def iteritems(
self)
Iterate on stored memory variables and their values.
def iteritems(self):
    """Yield the (memory, value) couples of each MemArray, the
    (base, MemArray) couples being walked in sorted order"""
    ordered = sorted(self.base_to_memarray.iteritems())
    for couple in ordered:
        memarray = couple[1]
        for mem, value in memarray.memory():
            yield mem, value
def read(
self, ptr, size)
Return the value associated with the Expr at address @ptr @ptr: Expr representing the memory address @size: memory size (in bits), byte aligned
def read(self, ptr, size):
    """
    Return the value stored at address @ptr
    @ptr: Expr representing the memory address
    @size: memory size (in bits), byte aligned
    """
    assert size % 8 == 0
    base, offset = get_expr_base_offset(ptr)
    memarray = self.base_to_memarray.get(base, None)
    if memarray is None:
        # Nothing known for this base: the memory itself is the value
        return ExprMem(ptr, size)
    return ExprCompose(*memarray.read(offset, size))
def write(
self, ptr, expr)
Update the corresponding Expr @expr at address @ptr @ptr: Expr representing the memory address @expr: Expr instance
def write(self, ptr, expr):
    """
    Store @expr at address @ptr
    @ptr: Expr representing the memory address
    @expr: Expr instance
    """
    assert ptr.size == self.addrsize
    base, offset = get_expr_base_offset(ptr)
    memarray = self.base_to_memarray.get(base, None)
    if memarray is None:
        # First write on this base: create its MemArray
        memarray = self.base_to_memarray[base] = MemArray(base, self.expr_simp)
    memarray.write(offset, expr)
class StateEngine
Stores an Engine state
class StateEngine(object):
    """Abstract holder of an Engine state"""

    def merge(self, other):
        """
        Build a new state out of self and @other.
        Must be provided by subclasses; this base version always raises
        NotImplementedError.
        @other: a StateEngine instance
        """
        error = NotImplementedError("Abstract method")
        raise error
Ancestors (in MRO)
- StateEngine
- __builtin__.object
Methods
def merge(
self, other)
Generate a new state, representing the merge of self and @other @other: a StateEngine instance
def merge(self, other):
    """
    Build a new state out of self and @other.
    Must be provided by subclasses; this base version always raises
    NotImplementedError.
    @other: a StateEngine instance
    """
    error = NotImplementedError("Abstract method")
    raise error
class SymbolMngr
Symbolic store manager (IDs and MEMs)
class SymbolMngr(object):
    """
    Symbolic store manager (IDs and MEMs)

    Identifiers are kept in the 'symbols_id' dictionary, memory values in a
    MemSparse instance ('symbols_mem').
    """

    def __init__(self, init=None, addrsize=None, expr_simp=expr_simp_explicit):
        """
        @init: optional object exposing iteritems() yielding (dst, value)
        pairs, each of which is written into the new store
        @addrsize: size of an address in bits (mandatory)
        @expr_simp: simplifier handed over to the memory store
        """
        assert addrsize is not None
        if init is None:
            init = {}
        self.addrsize = addrsize
        self.expr_simp = expr_simp
        self.symbols_id = {}
        self.symbols_mem = MemSparse(addrsize, expr_simp)
        self.mask = (1 << addrsize) - 1
        for expr, value in init.iteritems():
            self.write(expr, value)

    def __contains__(self, expr):
        # Anything which is neither an id nor a mem is never stored
        if expr.is_id():
            return self.symbols_id.__contains__(expr)
        if expr.is_mem():
            return self.symbols_mem.__contains__(expr)
        return False

    def __getitem__(self, expr):
        return self.read(expr)

    def __setitem__(self, expr, value):
        self.write(expr, value)

    def __delitem__(self, expr):
        if expr.is_id():
            del self.symbols_id[expr]
        elif expr.is_mem():
            del self.symbols_mem[expr]
        else:
            raise TypeError("Bad source expr")

    def copy(self):
        """Return a new SymbolMngr initialised from the content of this one"""
        # 'self' is usable as @init because it implements iteritems()
        obj = SymbolMngr(self, addrsize=self.addrsize, expr_simp=self.expr_simp)
        return obj

    def clear(self):
        """Forget the value of every variable (ids and memory)"""
        self.symbols_id.clear()
        self.symbols_mem.clear()

    def read(self, src):
        """
        Return the value corresponding to Expr @src
        An ExprId without a stored value evaluates to itself.
        @src: ExprId or ExprMem instance
        Raise TypeError for any other kind of expression.
        """
        if src.is_id():
            return self.symbols_id.get(src, src)
        elif src.is_mem():
            # Only byte aligned accesses are supported for now
            assert src.size % 8 == 0
            return self.symbols_mem.read(src.arg, src.size)
        else:
            raise TypeError("Bad source expr")

    def write(self, dst, src):
        """
        Update @dst with @src expression
        @dst: ExprId or ExprMem instance
        @src: Expression instance (same size as @dst)
        Raise TypeError if @dst is neither an ExprId nor an ExprMem.
        """
        assert dst.size == src.size
        if dst.is_id():
            if dst == src:
                # Writing an id with itself: drop any stored value instead
                # of recording the identity
                if dst in self.symbols_id:
                    del self.symbols_id[dst]
            else:
                self.symbols_id[dst] = src
        elif dst.is_mem():
            # Only byte aligned accesses are supported for now
            assert dst.size % 8 == 0
            self.symbols_mem.write(dst.arg, src)
        else:
            raise TypeError("Bad destination expr")

    def dump(self, ids=True, mems=True):
        """
        Display the store content
        @ids: display identifiers
        @mems: display memory
        """
        if ids:
            for variable, value in self.ids():
                print '%s = %s' % (variable, value)
        if mems:
            for mem, value in self.memory():
                print '%s = %s' % (mem, value)

    def __repr__(self):
        out = []
        for variable, value in self.iteritems():
            out.append('%s = %s' % (variable, value))
        return "\n".join(out)

    def iteritems(self):
        """Iterate on (ExprId/ExprMem, value) of the current state, ids first"""
        for variable, value in self.ids():
            yield variable, value
        for variable, value in self.memory():
            yield variable, value

    def items(self):
        """Return the list of (variable, value) of the current state"""
        return list(self.iteritems())

    def __iter__(self):
        for expr, _ in self.iteritems():
            yield expr

    def ids(self):
        """Iterate on variables and their values."""
        for expr, value in self.symbols_id.iteritems():
            yield expr, value

    def memory(self):
        """Iterate on memory variables and their values."""
        for mem, value in self.symbols_mem.iteritems():
            yield mem, value

    def keys(self):
        """Return the list of variables (ids and mems) of the current state"""
        return list(self)

    def get(self, expr, default=None):
        """
        Deprecated version of read
        @default is returned only when it is not None and the read value
        equals @expr itself.
        """
        warnings.warn('DEPRECATION WARNING: use "read(self, expr)" instead of get')
        ret = self.read(expr)
        if default is not None and ret == expr:
            return default
        return ret
Ancestors (in MRO)
- SymbolMngr
- __builtin__.object
Instance variables
var addrsize
var expr_simp
var mask
var symbols_id
var symbols_mem
Methods
def __init__(
self, init=None, addrsize=None, expr_simp=expr_simp_explicit)
def __init__(self, init=None, addrsize=None, expr_simp=expr_simp_explicit):
    """
    @init: optional object exposing iteritems() yielding (dst, value)
    pairs, each of which is written into the new store
    @addrsize: size of an address in bits (mandatory)
    @expr_simp: simplifier handed over to the memory store
    """
    assert addrsize is not None
    initial = {} if init is None else init
    self.addrsize = addrsize
    self.expr_simp = expr_simp
    self.symbols_id = {}
    self.symbols_mem = MemSparse(addrsize, expr_simp)
    self.mask = (1 << addrsize) - 1
    for dst, src in initial.iteritems():
        self.write(dst, src)
def clear(
self)
Forget the value of every variable
def clear(self):
    """Forget the value of every variable: ids first, then memory"""
    id_store = self.symbols_id
    id_store.clear()
    mem_store = self.symbols_mem
    mem_store.clear()
def copy(
self)
Copy object instance
def copy(self):
    """
    Return a new SymbolMngr sharing addrsize and expr_simp with this one,
    initialised from the current content (self is passed as @init).
    """
    return SymbolMngr(
        self,
        addrsize=self.addrsize,
        expr_simp=self.expr_simp
    )
def dump(
self, ids=True, mems=True)
Display memory content
def dump(self, ids=True, mems=True):
    """
    Display the store content, one 'variable = value' line per entry
    @ids: display identifiers
    @mems: display memory
    """
    # Single parenthesized argument: same output with the Python 2 print
    # statement, and still valid syntax with the print function
    if ids:
        for variable, value in self.ids():
            print('%s = %s' % (variable, value))
    if mems:
        for mem, value in self.memory():
            print('%s = %s' % (mem, value))
def get(
self, expr, default=None)
Deprecated version of read
def get(self, expr, default=None):
    """
    Deprecated version of read
    @default is returned only when it is not None and the value read back
    equals @expr itself; otherwise the read value is returned.
    """
    warnings.warn('DEPRECATION WARNING: use "read(self, expr)" instead of get')
    value = self.read(expr)
    if default is None or not value == expr:
        return value
    return default
def ids(
self)
Iterate on variables and their values.
def ids(self):
    """Generate (identifier, value) pairs out of the id store."""
    id_pairs = self.symbols_id.iteritems()
    for identifier, value in id_pairs:
        yield identifier, value
def items(
self)
Return variables/values of the current state
def items(self):
    """Return the list of (variable, value) pairs of the current state"""
    return [pair for pair in self.iteritems()]
def iteritems(
self)
ExprId/ExprMem iteritems of the current state
def iteritems(self):
    """Generate (ExprId/ExprMem, value) of the current state, ids first"""
    for source in (self.ids, self.memory):
        for variable, value in source():
            yield variable, value
def keys(
self)
Variables of the current state
def keys(self):
    """Return the list of variables obtained by iterating on the state"""
    return [variable for variable in self]
def memory(
self)
Iterate on memory variables and their values.
def memory(self):
    """Generate (memory variable, value) pairs out of the memory store."""
    mem_pairs = self.symbols_mem.iteritems()
    for location, value in mem_pairs:
        yield location, value
def read(
self, src)
Return the value corresponding to Expr @src @src: ExprId or ExprMem instance
def read(self, src):
    """
    Return the value corresponding to Expr @src
    An ExprId without a stored value evaluates to itself.
    @src: ExprId or ExprMem instance
    Raise TypeError for any other kind of expression.
    """
    if src.is_id():
        return self.symbols_id.get(src, src)
    if not src.is_mem():
        raise TypeError("Bad source expr")
    # Only byte aligned accesses are supported for now
    assert src.size % 8 == 0
    return self.symbols_mem.read(src.arg, src.size)
def write(
self, dst, src)
Update @dst with @src expression @dst: ExprId or ExprMem instance @src: Expression instance
def write(self, dst, src):
    """
    Update @dst with @src expression
    @dst: ExprId or ExprMem instance
    @src: Expression instance (same size as @dst)
    Raise TypeError if @dst is neither an ExprId nor an ExprMem.
    """
    assert dst.size == src.size
    if dst.is_id():
        if not dst == src:
            self.symbols_id[dst] = src
        elif dst in self.symbols_id:
            # Writing an id with itself: forget the stored value
            del self.symbols_id[dst]
        return
    if not dst.is_mem():
        raise TypeError("Bad destination expr")
    # Only byte aligned accesses are supported for now
    assert dst.size % 8 == 0
    self.symbols_mem.write(dst.arg, src)
class SymbolicExecutionEngine
Symbolic execution engine Allow IR code emulation in symbolic domain
Examples: from miasm2.ir.symbexec import SymbolicExecutionEngine from miasm2.ir.ir import AssignBlock
ir_arch = ir_x86_32()
init_state = {
    ir_arch.arch.regs.EAX: ir_arch.arch.regs.EBX,
    ExprMem(id_x+ExprInt(0x10, 32), 32): id_a,
}
sb_exec = SymbolicExecutionEngine(ir_arch, init_state)
>>> sb_exec.dump()
EAX                = EBX
@32[x + 0x10]      = a
>>> sb_exec.dump(mems=False)
EAX                = EBX
>>> print sb_exec.eval_expr(ir_arch.arch.regs.EAX + ir_arch.arch.regs.ECX)
EBX + ECX
Inspecting state: - dump - modified State manipulation: - '.state' (rw)
Evaluation (read only): - eval_expr - eval_assignblk Evaluation with state update: - eval_updt_expr - eval_updt_assignblk - eval_updt_irblock
Start a symbolic execution based on provisioned '.ir_arch' blocks: - run_block_at - run_at
class SymbolicExecutionEngine(object):
    """
    Symbolic execution engine
    Allow IR code emulation in symbolic domain

    Examples:
        from miasm2.ir.symbexec import SymbolicExecutionEngine
        from miasm2.ir.ir import AssignBlock

        ir_arch = ir_x86_32()

        init_state = {
            ir_arch.arch.regs.EAX: ir_arch.arch.regs.EBX,
            ExprMem(id_x+ExprInt(0x10, 32), 32): id_a,
        }

        sb_exec = SymbolicExecutionEngine(ir_arch, init_state)

        >>> sb_exec.dump()
        EAX                = EBX
        @32[x + 0x10]      = a
        >>> sb_exec.dump(mems=False)
        EAX                = EBX

        >>> print sb_exec.eval_expr(ir_arch.arch.regs.EAX + ir_arch.arch.regs.ECX)
        EBX + ECX

    Inspecting state:
        - dump
        - modified
    State manipulation:
        - '.state' (rw)

    Evaluation (read only):
        - eval_expr
        - eval_assignblk
    Evaluation with state update:
        - eval_updt_expr
        - eval_updt_assignblk
        - eval_updt_irblock

    Start a symbolic execution based on provisioned '.ir_arch' blocks:
        - run_block_at
        - run_at
    """

    # Class used by get_state() to snapshot the engine state
    StateEngine = SymbolicState

    def __init__(self, ir_arch, state=None,
                 func_read=None,
                 func_write=None,
                 sb_expr_simp=expr_simp_explicit):
        """
        @ir_arch: IR architecture; provides addrsize, loc_db and IRDst
        @state: optional initial state, iterated with iteritems() as
        (dst, src) pairs
        @func_read: deprecated memory read hook (see mem_read)
        @func_write: deprecated memory write hook (see mem_write)
        @sb_expr_simp: expression simplifier used during evaluation
        """

        # Dispatch table: expression class -> evaluation method
        self.expr_to_visitor = {
            ExprInt: self.eval_exprint,
            ExprId: self.eval_exprid,
            ExprLoc: self.eval_exprloc,
            ExprMem: self.eval_exprmem,
            ExprSlice: self.eval_exprslice,
            ExprCond: self.eval_exprcond,
            ExprOp: self.eval_exprop,
            ExprCompose: self.eval_exprcompose,
        }

        if state is None:
            state = {}

        self.symbols = SymbolMngr(addrsize=ir_arch.addrsize, expr_simp=sb_expr_simp)

        for dst, src in state.iteritems():
            self.symbols.write(dst, src)

        if func_read:
            warnings.warn('DEPRECATION WARNING: override function "mem_read(self, expr)" instead of func_read')
        if func_write:
            warnings.warn('DEPRECATION WARNING: override function "mem_write(self, dsr, src)" instead of func_write')

        self.func_read = func_read
        self.func_write = func_write
        self.ir_arch = ir_arch
        self.expr_simp = sb_expr_simp

    def get_state(self):
        """Return the current state of the SymbolicEngine"""
        state = self.StateEngine(dict(self.symbols))
        return state

    def set_state(self, state):
        """Restore the @state of the engine
        @state: StateEngine instance
        """
        # The previous symbol store is dropped and rebuilt from @state
        self.symbols = SymbolMngr(addrsize=self.ir_arch.addrsize, expr_simp=self.expr_simp)
        for dst, src in dict(state).iteritems():
            self.symbols[dst] = src

    state = property(get_state, set_state)

    def eval_expr_visitor(self, expr, cache=None):
        """
        [DEV]: Override to change the behavior of an Expr evaluation.
        This function recursively applies 'eval_expr*' to @expr.
        This function uses @cache to speedup re-evaluation of expression.
        Raise TypeError if the simplified expression class has no visitor.
        """
        if cache is None:
            cache = {}

        ret = cache.get(expr, None)
        if ret is not None:
            return ret

        # The simplified form may already be known even if @expr is not
        new_expr = self.expr_simp(expr)
        ret = cache.get(new_expr, None)
        if ret is not None:
            return ret

        func = self.expr_to_visitor.get(new_expr.__class__, None)
        if func is None:
            raise TypeError("Unknown expr type")

        ret = func(new_expr, cache=cache)
        ret = self.expr_simp(ret)
        assert ret is not None

        # Remember the result under both the original and simplified forms
        cache[expr] = ret
        cache[new_expr] = ret
        return ret

    def eval_exprint(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprInt using the current state"""
        return expr

    def eval_exprid(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprId using the current state"""
        ret = self.symbols.read(expr)
        return ret

    def eval_exprloc(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprLoc using the current state
        The location is replaced by an ExprInt when the loc_db knows its
        offset, and left untouched otherwise.
        """
        offset = self.ir_arch.loc_db.get_location_offset(expr.loc_key)
        if offset is not None:
            ret = ExprInt(offset, expr.size)
        else:
            ret = expr
        return ret

    def eval_exprmem(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprMem using the current state
        This function first evaluates the memory pointer value.
        Override 'mem_read' to modify the effective memory accesses
        """
        ptr = self.eval_expr_visitor(expr.arg, **kwargs)
        mem = ExprMem(ptr, expr.size)
        ret = self.mem_read(mem)
        return ret

    def eval_exprcond(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprCond using the current state"""
        cond = self.eval_expr_visitor(expr.cond, **kwargs)
        src1 = self.eval_expr_visitor(expr.src1, **kwargs)
        src2 = self.eval_expr_visitor(expr.src2, **kwargs)
        ret = ExprCond(cond, src1, src2)
        return ret

    def eval_exprslice(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprSlice using the current state"""
        arg = self.eval_expr_visitor(expr.arg, **kwargs)
        ret = ExprSlice(arg, expr.start, expr.stop)
        return ret

    def eval_exprop(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprOp using the current state"""
        args = []
        for oarg in expr.args:
            arg = self.eval_expr_visitor(oarg, **kwargs)
            args.append(arg)
        ret = ExprOp(expr.op, *args)
        return ret

    def eval_exprcompose(self, expr, **kwargs):
        """[DEV]: Evaluate an ExprCompose using the current state"""
        args = []
        for arg in expr.args:
            args.append(self.eval_expr_visitor(arg, **kwargs))
        ret = ExprCompose(*args)
        return ret

    def eval_expr(self, expr, eval_cache=None):
        """
        Evaluate @expr
        @expr: Expression instance to evaluate
        @eval_cache: None or dictionary linking variables to their values
        """
        if eval_cache is None:
            eval_cache = {}
        ret = self.eval_expr_visitor(expr, cache=eval_cache)
        assert ret is not None
        return ret

    def modified(self, init_state=None, ids=True, mems=True):
        """
        Return the modified variables.
        @init_state: a base dictionary linking variables to their initial values
        to diff. Can be None.
        @ids: include ids
        @mems: include memory
        """
        if init_state is None:
            init_state = {}
        if ids:
            for variable, value in self.symbols.symbols_id.iteritems():
                # Skip entries unchanged with regard to @init_state
                if variable in init_state and init_state[variable] == value:
                    continue
                yield variable, value
        if mems:
            for mem, value in self.symbols.memory():
                if mem in init_state and init_state[mem] == value:
                    continue
                yield mem, value

    def dump(self, ids=True, mems=True):
        """
        Display modified variables
        @ids: display modified ids
        @mems: display modified memory
        """

        for variable, value in self.modified(None, ids, mems):
            print "%-18s" % variable, "=", "%s" % value

    def eval_assignblk(self, assignblk):
        """
        Evaluate AssignBlock using the current state

        Returns a dictionary containing modified keys associated to their values
        @assignblk: AssignBlock instance
        Raise ValueError for a destination which is neither a mem nor an id.
        """
        pool_out = {}
        # One cache shared by every evaluation of this block
        eval_cache = {}
        for dst, src in assignblk.iteritems():
            src = self.eval_expr(src, eval_cache)
            if dst.is_mem():
                # The destination pointer is evaluated too
                ptr = self.eval_expr(dst.arg, eval_cache)
                tmp = ExprMem(ptr, dst.size)
                pool_out[tmp] = src
            elif dst.is_id():
                pool_out[dst] = src
            else:
                raise ValueError("Unknown destination type", str(dst))

        return pool_out

    def apply_change(self, dst, src):
        """
        Apply @dst = @src on the current state WITHOUT evaluating either side
        @dst: Expr, destination
        @src: Expr, source
        """
        if dst.is_mem():
            self.mem_write(dst, src)
        else:
            self.symbols.write(dst, src)

    def eval_updt_assignblk(self, assignblk):
        """
        Apply an AssignBlock on the current state
        Return the list of memory destinations which were written.
        @assignblk: AssignBlock instance
        """
        mem_dst = []
        # Every source is evaluated before any change is applied
        dst_src = self.eval_assignblk(assignblk)
        for dst, src in dst_src.iteritems():
            self.apply_change(dst, src)
            if dst.is_mem():
                mem_dst.append(dst)
        return mem_dst

    def eval_updt_irblock(self, irb, step=False):
        """
        Symbolic execution of the @irb on the current state
        Return the evaluation of ir_arch.IRDst once the block is applied.
        @irb: IRBlock instance
        @step: display intermediate steps
        """
        for assignblk in irb:
            if step:
                print 'Instr', assignblk.instr
                print 'Assignblk:'
                print assignblk
                print '_' * 80
            self.eval_updt_assignblk(assignblk)
            if step:
                self.dump(mems=False)
                self.dump(ids=False)
                print '_' * 80
        dst = self.eval_expr(self.ir_arch.IRDst)

        return dst

    def run_block_at(self, ircfg, addr, step=False):
        """
        Symbolic execution of the block at @addr
        Return the next destination, or @addr itself when @ircfg has no
        block for it.
        @ircfg: graph queried with get_block(@addr)
        @addr: address to execute (int or ExprInt or label)
        @step: display intermediate steps
        """
        irblock = ircfg.get_block(addr)
        if irblock is not None:
            addr = self.eval_updt_irblock(irblock, step=step)
        return addr

    def run_at(self, ircfg, addr, lbl_stop=None, step=False):
        """
        Symbolic execution starting at @addr
        Blocks are executed until no block is found for the current
        destination or the block located at @lbl_stop is reached.
        @ircfg: graph queried with get_block
        @addr: address to execute (int or ExprInt or label)
        @lbl_stop: LocKey to stop execution on
        @step: display intermediate steps
        """
        while True:
            irblock = ircfg.get_block(addr)
            if irblock is None:
                break
            if irblock.loc_key == lbl_stop:
                break
            addr = self.eval_updt_irblock(irblock, step=step)
        return addr

    def del_mem_above_stack(self, stack_ptr):
        """
        Remove all stored memory values with following properties:
        * pointer based on initial stack value
        * pointer below current stack pointer
        """
        stack_ptr = self.eval_expr(stack_ptr)
        base, stk_offset = get_expr_base_offset(stack_ptr)
        memarray = self.symbols.symbols_mem.base_to_memarray.get(base, None)
        if memarray:
            # Offsets are collected first: the MemArray is not modified
            # while it is being iterated
            to_del = set()
            for offset in memarray:
                # Top bit of the masked difference set: @offset is lower
                # than the stack offset
                if ((offset - stk_offset) & int(stack_ptr.mask)) >> (stack_ptr.size - 1) != 0:
                    to_del.add(offset)

            for offset in to_del:
                del memarray[offset]

    def eval_updt_expr(self, expr):
        """
        Evaluate @expr and apply side effect if needed (ie. if expr is an
        assignment). Return the evaluated value
        """

        # Update value if needed
        if expr.is_aff():
            # The returned value is computed before the state is updated
            ret = self.eval_expr(expr.src)
            self.eval_updt_assignblk(AssignBlock([expr]))
        else:
            ret = self.eval_expr(expr)

        return ret

    def _resolve_mem_parts(self, expr):
        """For a given ExprMem @expr, get known/unknown parts from the store.
        @expr: ExprMem instance

        Return a list of (known, value) where known is a bool representing if
        the value has been resolved from the store or not.
        """

        # Extract known parts in symbols
        assert expr.size % 8 == 0
        ptr = expr.arg
        known = []
        ptrs = []
        # Byte granularity: check each byte of the access against the store
        for index in xrange(expr.size / 8):
            offset = self.expr_simp(ptr + ExprInt(index, ptr.size))
            ptrs.append(offset)
            mem = ExprMem(offset, 8)
            known.append(mem in self.symbols)

        reads = merge_ptr_read(known, ptrs)
        out = []
        for is_known, ptr_value, size in reads:
            mem = ExprMem(ptr_value, size)
            if is_known:
                mem = self.symbols.read(mem)
            out.append((is_known, mem))
        return out

    def mem_read(self, expr):
        """
        [DEV]: Override to modify the effective memory reads

        Read symbolic value at ExprMem @expr
        The deprecated func_read hook, when set, is called with each
        unresolved memory part.
        @expr: ExprMem
        """

        parts = self._resolve_mem_parts(expr)

        out = []
        for known, part in parts:
            if not known and part.is_mem() and self.func_read is not None:
                ret = self.func_read(part)
            else:
                ret = part

            out.append(ret)
        ret = self.expr_simp(ExprCompose(*out))

        assert ret.size == expr.size
        return ret

    def mem_write(self, dst, src):
        """
        [DEV]: Override to modify the effective memory writes

        Write symbolic value @src at ExprMem @dst
        The deprecated func_write hook, when set, is called as
        func_write(engine, dst, src) instead of updating the store.
        @dst: destination ExprMem
        @src: source Expression
        """
        if self.func_write is not None:
            self.func_write(self, dst, src)
        else:
            self.symbols.write(dst, src)

    # Deprecated methods

    def apply_expr_on_state(self, expr, cache):
        """Deprecated version of eval_expr"""
        warnings.warn('DEPRECATION WARNING: use "eval_expr" instead of apply_expr_on_state')

        if cache is None:
            cache = {}
        ret = self.eval_expr(expr, eval_cache=cache)
        return ret

    def modified_mems(self, init_state=None):
        """Deprecated version of modified(ids=False)"""
        warnings.warn('DEPRECATION WARNING: use "modified(self, ids=False)" instead of modified_mems')
        for mem in self.modified(init_state=init_state, ids=False):
            yield mem

    def modified_regs(self, init_state=None):
        """Deprecated version of modified(mems=False)"""
        warnings.warn('DEPRECATION WARNING: use "modified(self, mems=False)" instead of modified_regs')
        for reg in self.modified(init_state=init_state, mems=False):
            yield reg

    def dump_id(self):
        """Deprecated version of dump(mems=False)"""
        warnings.warn('DEPRECATION WARNING: use "dump(self, mems=False)" instead of dump_id')
        self.dump(mems=False)

    def dump_mem(self):
        """Deprecated version of dump(ids=False)"""
        warnings.warn('DEPRECATION WARNING: use "dump(self, ids=False)" instead of dump_mem')
        self.dump(ids=False)

    def eval_ir_expr(self, assignblk):
        """Deprecated version of eval_assignblk(self, assignblk)
        Return an iterator on the (dst, src) pairs of the result.
        """
        warnings.warn('DEPRECATION WARNING: use "eval_assignblk(self, assignblk)" instead of eval_ir_expr')
        return self.eval_assignblk(assignblk).iteritems()

    def eval_ir(self, assignblk):
        """Deprecated version of eval_updt_assignblk(self, assignblk)"""
        # NOTE(review): the message names eval_assignblk while the call
        # below targets eval_updt_assignblk - confirm which is intended
        warnings.warn('DEPRECATION WARNING: use "eval_assignblk(self, assignblk)" instead of eval_ir')
        return self.eval_updt_assignblk(assignblk)

    def emulbloc(self, irb, step=False):
        """Deprecated version of eval_updt_irblock(self, irb, step=False)"""
        warnings.warn('DEPRECATION WARNING: use "eval_updt_irblock(self, irb, step=False)" instead of emulbloc')
        return self.eval_updt_irblock(irb, step)

    def emul_ir_bloc(self, _, addr, step=False):
        """Deprecated version of run_block_at"""
        warnings.warn('DEPRECATION WARNING: use "run_block_at(self, addr, step=False)" instead of emul_ir_bloc')
        # NOTE(review): run_block_at above is declared as
        # (self, ircfg, addr, step=False); @addr lands in the ircfg slot
        # here - confirm whether this wrapper still works
        return self.run_block_at(addr, step)

    def emul_ir_block(self, addr, step=False):
        """Deprecated version of run_block_at"""
        warnings.warn('DEPRECATION WARNING: use "run_block_at(self, addr, step=False)" instead of emul_ir_block')
        # NOTE(review): same argument shift as emul_ir_bloc - confirm
        return self.run_block_at(addr, step)

    def emul_ir_blocks(self, addr, lbl_stop=None, step=False):
        """Deprecated version of run_at"""
        warnings.warn('DEPRECATION WARNING: use "run_at(self, addr, lbl_stop=None, step=False):" instead of emul_ir_blocks')
        # NOTE(review): run_at above is declared as
        # (self, ircfg, addr, lbl_stop=None, step=False); @addr lands in
        # the ircfg slot here - confirm whether this wrapper still works
        return self.run_at(addr, lbl_stop, step)

    def emul_ir_blocs(self, _, addr, lbl_stop=None, step=False):
        """Deprecated version of run_at"""
        warnings.warn('DEPRECATION WARNING: use "run_at(self, addr, lbl_stop=None, step=False):" instead of emul_ir_blocs')
        # NOTE(review): same argument shift as emul_ir_blocks - confirm
        return self.run_at(addr, lbl_stop, step)

    def apply_expr(self, expr):
        """Deprecated version of eval_updt_expr"""
        warnings.warn('DEPRECATION WARNING: use "eval_updt_expr" instead of apply_expr')
        return self.eval_updt_expr(expr)

    def as_assignblock(self):
        """Return the current state as an AssignBlock"""
        warnings.warn('DEPRECATION WARNING: use "modified(ids=True, mems=True)" instead of as_assignblock')
        out = []
        for dst, src in self.modified(ids=True, mems=True):
            out.append((dst, src))
        return AssignBlock(dict(out))
Ancestors (in MRO)
- SymbolicExecutionEngine
- __builtin__.object
Class variables
var StateEngine
var state
Instance variables
var expr_simp
var expr_to_visitor
var func_read
var func_write
var ir_arch
var state
Return the current state of the SymbolicEngine
var symbols
Methods
def __init__(
self, ir_arch, state=None, func_read=None, func_write=None, sb_expr_simp=expr_simp_explicit)
def __init__(self, ir_arch, state=None,
             func_read=None,
             func_write=None,
             sb_expr_simp=expr_simp_explicit):
    """
    @ir_arch: IR architecture; its addrsize sizes the symbol store
    @state: optional initial state, iterated with iteritems() as
    (dst, src) pairs written into the symbol store
    @func_read: deprecated memory read hook (a warning is emitted if set)
    @func_write: deprecated memory write hook (a warning is emitted if set)
    @sb_expr_simp: expression simplifier used by the engine and its store
    """
    # Dispatch table: expression class -> evaluation method
    visitors = {}
    visitors[ExprInt] = self.eval_exprint
    visitors[ExprId] = self.eval_exprid
    visitors[ExprLoc] = self.eval_exprloc
    visitors[ExprMem] = self.eval_exprmem
    visitors[ExprSlice] = self.eval_exprslice
    visitors[ExprCond] = self.eval_exprcond
    visitors[ExprOp] = self.eval_exprop
    visitors[ExprCompose] = self.eval_exprcompose
    self.expr_to_visitor = visitors

    initial = {} if state is None else state

    self.symbols = SymbolMngr(addrsize=ir_arch.addrsize, expr_simp=sb_expr_simp)
    for target, value in initial.iteritems():
        self.symbols.write(target, value)

    if func_read:
        warnings.warn('DEPRECATION WARNING: override function "mem_read(self, expr)" instead of func_read')
    if func_write:
        warnings.warn('DEPRECATION WARNING: override function "mem_write(self, dsr, src)" instead of func_write')

    self.func_read = func_read
    self.func_write = func_write
    self.ir_arch = ir_arch
    self.expr_simp = sb_expr_simp
def apply_change(
self, dst, src)
Apply @dst = @src on the current state WITHOUT evaluating either side @dst: Expr, destination @src: Expr, source
def apply_change(self, dst, src):
    """
    Apply @dst = @src on the current state WITHOUT evaluating either side
    Memory destinations go through mem_write, anything else is written
    straight into the symbol store.
    @dst: Expr, destination
    @src: Expr, source
    """
    writer = self.mem_write if dst.is_mem() else self.symbols.write
    writer(dst, src)
def apply_expr(
self, expr)
Deprecated version of eval_updt_expr
def apply_expr(self, expr):
    """Deprecated version of eval_updt_expr: warn, then forward @expr"""
    warnings.warn('DEPRECATION WARNING: use "eval_updt_expr" instead of apply_expr')
    result = self.eval_updt_expr(expr)
    return result
def apply_expr_on_state(
self, expr, cache)
Deprecated version of eval_expr
def apply_expr_on_state(self, expr, cache):
    """
    Deprecated version of eval_expr
    @expr: expression to evaluate
    @cache: evaluation cache; a fresh dictionary is used when None
    """
    warnings.warn('DEPRECATION WARNING: use "eval_expr" instead of apply_expr_on_state')
    effective_cache = {} if cache is None else cache
    return self.eval_expr(expr, eval_cache=effective_cache)
def as_assignblock(
self)
Return the current state as an AssignBlock
def as_assignblock(self):
    """
    Return the current state as an AssignBlock (deprecated)
    The block is built from every (dst, src) yielded by modified().
    """
    warnings.warn('DEPRECATION WARNING: use "modified(ids=True, mems=True)" instead of as_assignblock')
    assignments = {}
    for dst, src in self.modified(ids=True, mems=True):
        assignments[dst] = src
    return AssignBlock(assignments)
def del_mem_above_stack(
self, stack_ptr)
Remove all stored memory values with the following properties: (1) the pointer is based on the initial stack value; (2) the pointer is below the current stack pointer.
def del_mem_above_stack(self, stack_ptr):
    """
    Remove all stored memory values with following properties:
    * pointer based on initial stack value
    * pointer below current stack pointer
    @stack_ptr: stack pointer expression, evaluated on the current state
    """
    stack_ptr = self.eval_expr(stack_ptr)
    base, stk_offset = get_expr_base_offset(stack_ptr)
    memarray = self.symbols.symbols_mem.base_to_memarray.get(base, None)
    if not memarray:
        return

    def is_below(offset):
        # Top bit of the masked difference set: @offset is lower than
        # the stack offset
        delta = (offset - stk_offset) & int(stack_ptr.mask)
        return delta >> (stack_ptr.size - 1) != 0

    # Collect first: the MemArray is not modified while being iterated
    doomed = set(offset for offset in memarray if is_below(offset))
    for offset in doomed:
        del memarray[offset]
def dump(
self, ids=True, mems=True)
Display modified variables @ids: display modified ids @mems: display modified memory
def dump(self, ids=True, mems=True):
    """
    Display modified variables
    @ids: display modified ids
    @mems: display modified memory
    """
    for variable, value in self.modified(None, ids, mems):
        # The variable is left-justified on 18 columns. One formatted string
        # gives the same text as the former multi-argument print statement
        # and is also valid with the print function.
        print("%-18s = %s" % (variable, value))
def dump_id(
self)
Deprecated version of dump(mems=False)
def dump_id(self):
    """Deprecated version of dump(mems=False): warn, then display ids only"""
    message = 'DEPRECATION WARNING: use "dump(self, mems=False)" instead of dump_id'
    warnings.warn(message)
    self.dump(mems=False)
def dump_mem(
self)
Deprecated version of dump(ids=False)
def dump_mem(self):
    """Deprecated version of dump(ids=False): warn, then display memory only"""
    message = 'DEPRECATION WARNING: use "dump(self, ids=False)" instead of dump_mem'
    warnings.warn(message)
    self.dump(ids=False)
def emul_ir_bloc(
self, _, addr, step=False)
Deprecated version of run_block_at
def emul_ir_bloc(self, _, addr, step=False):
    """
    Deprecated version of run_block_at
    The first argument is ignored; @addr and @step are forwarded
    positionally to run_block_at.
    """
    message = 'DEPRECATION WARNING: use "run_block_at(self, addr, step=False)" instead of emul_ir_bloc'
    warnings.warn(message)
    # NOTE(review): the class defines run_block_at(self, ircfg, addr,
    # step=False), so @addr lands in the ircfg slot here - confirm whether
    # this wrapper still works
    result = self.run_block_at(addr, step)
    return result
def emul_ir_block(
self, addr, step=False)
Deprecated version of run_block_at
def emul_ir_block(self, addr, step=False):
    """
    Deprecated version of run_block_at
    @addr and @step are forwarded positionally to run_block_at.
    """
    message = 'DEPRECATION WARNING: use "run_block_at(self, addr, step=False)" instead of emul_ir_block'
    warnings.warn(message)
    # NOTE(review): the class defines run_block_at(self, ircfg, addr,
    # step=False), so @addr lands in the ircfg slot here - confirm whether
    # this wrapper still works
    result = self.run_block_at(addr, step)
    return result
def emul_ir_blocks(
self, addr, lbl_stop=None, step=False)
Deprecated version of run_at
def emul_ir_blocks(self, addr, lbl_stop=None, step=False):
    """
    Deprecated version of run_at
    @addr, @lbl_stop and @step are forwarded positionally to run_at.
    """
    message = 'DEPRECATION WARNING: use "run_at(self, addr, lbl_stop=None, step=False):" instead of emul_ir_blocks'
    warnings.warn(message)
    # NOTE(review): the class defines run_at(self, ircfg, addr,
    # lbl_stop=None, step=False), so @addr lands in the ircfg slot here -
    # confirm whether this wrapper still works
    result = self.run_at(addr, lbl_stop, step)
    return result
def emul_ir_blocs(
self, _, addr, lbl_stop=None, step=False)
Deprecated version of run_at
def emul_ir_blocs(self, _, addr, lbl_stop=None, step=False):
    """
    Deprecated version of run_at
    The first argument is ignored; @addr, @lbl_stop and @step are
    forwarded positionally to run_at.
    """
    message = 'DEPRECATION WARNING: use "run_at(self, addr, lbl_stop=None, step=False):" instead of emul_ir_blocs'
    warnings.warn(message)
    # NOTE(review): the class defines run_at(self, ircfg, addr,
    # lbl_stop=None, step=False), so @addr lands in the ircfg slot here -
    # confirm whether this wrapper still works
    result = self.run_at(addr, lbl_stop, step)
    return result
def emulbloc(
self, irb, step=False)
Deprecated version of eval_updt_irblock(self, irb, step=False)
def emulbloc(self, irb, step=False):
    """
    Deprecated version of eval_updt_irblock(self, irb, step=False)
    @irb and @step are forwarded positionally.
    """
    message = 'DEPRECATION WARNING: use "eval_updt_irblock(self, irb, step=False)" instead of emulbloc'
    warnings.warn(message)
    result = self.eval_updt_irblock(irb, step)
    return result
def eval_assignblk(
self, assignblk)
Evaluate AssignBlock using the current state
Returns a dictionary containing modified keys associated to their values
@assignblk: AssignBlock instance
def eval_assignblk(self, assignblk):
    """
    Evaluate AssignBlock using the current state

    Returns a dictionary containing modified keys associated to their values.
    Memory destinations are keyed by an ExprMem built on the evaluated
    pointer; id destinations are keyed by themselves.
    @assignblk: AssignBlock instance
    Raise ValueError for a destination which is neither a mem nor an id.
    """
    # One cache shared by every evaluation of this block
    cache = {}
    result = {}
    for dst, src in assignblk.iteritems():
        value = self.eval_expr(src, cache)
        if dst.is_mem():
            key = ExprMem(self.eval_expr(dst.arg, cache), dst.size)
        elif dst.is_id():
            key = dst
        else:
            raise ValueError("Unknown destination type", str(dst))
        result[key] = value
    return result
def eval_expr(
self, expr, eval_cache=None)
Evaluate @expr @expr: Expression instance to evaluate @eval_cache: None or dictionary linking variables to their values
def eval_expr(self, expr, eval_cache=None):
    """
    Evaluate @expr
    @expr: Expression instance to evaluate
    @eval_cache: None or dictionary linking variables to their values;
    a fresh dictionary is used when None
    """
    cache = {} if eval_cache is None else eval_cache
    value = self.eval_expr_visitor(expr, cache=cache)
    assert value is not None
    return value
def eval_expr_visitor(
self, expr, cache=None)
[DEV]: Override to change the behavior of an Expr evaluation. This function recursively applies 'eval_expr*' to @expr. This function uses @cache to speedup re-evaluation of expression.
def eval_expr_visitor(self, expr, cache=None):
    """
    [DEV]: Override to change the behavior of an Expr evaluation.
    This function recursively applies 'eval_expr*' to @expr.
    This function uses @cache to speedup re-evaluation of expression.
    Raise TypeError if the simplified expression class has no visitor.
    """
    if cache is None:
        cache = {}

    # First chance: @expr itself is already known
    known = cache.get(expr, None)
    if known is not None:
        return known

    # Second chance: its simplified form is already known
    simplified = self.expr_simp(expr)
    known = cache.get(simplified, None)
    if known is not None:
        return known

    visitor = self.expr_to_visitor.get(simplified.__class__, None)
    if visitor is None:
        raise TypeError("Unknown expr type")

    value = self.expr_simp(visitor(simplified, cache=cache))
    assert value is not None

    # Remember the result under both the original and simplified forms
    cache[expr] = value
    cache[simplified] = value
    return value
def eval_exprcompose(
self, expr, **kwargs)
[DEV]: Evaluate an ExprCompose using the current state
def eval_exprcompose(self, expr, **kwargs):
    """
    [DEV]: Evaluate an ExprCompose using the current state
    Each argument is evaluated in order, then a new ExprCompose is built.
    """
    evaluated = [self.eval_expr_visitor(arg, **kwargs) for arg in expr.args]
    return ExprCompose(*evaluated)
def eval_exprcond(
self, expr, **kwargs)
[DEV]: Evaluate an ExprCond using the current state
def eval_exprcond(self, expr, **kwargs):
    """
    [DEV]: Evaluate an ExprCond using the current state
    The condition and both sources are evaluated (in that order), then a
    new ExprCond is built.
    """
    evaluate = self.eval_expr_visitor
    parts = [evaluate(sub, **kwargs) for sub in (expr.cond, expr.src1, expr.src2)]
    return ExprCond(*parts)
def eval_exprid(
self, expr, **kwargs)
[DEV]: Evaluate an ExprId using the current state
def eval_exprid(self, expr, **kwargs):
    """
    [DEV]: Evaluate an ExprId using the current state
    The value is read from the symbol store; extra keyword arguments are
    ignored.
    """
    return self.symbols.read(expr)
def eval_exprint(
self, expr, **kwargs)
[DEV]: Evaluate an ExprInt using the current state
def eval_exprint(self, expr, **kwargs):
    """
    [DEV]: Evaluate an ExprInt using the current state
    An integer evaluates to itself; extra keyword arguments are ignored.
    """
    result = expr
    return result
def eval_exprloc(
self, expr, **kwargs)
[DEV]: Evaluate an ExprLoc using the current state
def eval_exprloc(self, expr, **kwargs):
    """
    [DEV]: Evaluate an ExprLoc using the current state
    The location is replaced by an ExprInt of the same size when the loc_db
    knows its offset; otherwise @expr is returned untouched.
    """
    offset = self.ir_arch.loc_db.get_location_offset(expr.loc_key)
    if offset is None:
        return expr
    return ExprInt(offset, expr.size)
def eval_exprmem(
self, expr, **kwargs)
[DEV]: Evaluate an ExprMem using the current state. This function first evaluates the memory pointer value. Override 'mem_read' to modify the effective memory accesses
def eval_exprmem(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprMem using the current state
    This function first evaluates the memory pointer value, then reads an
    ExprMem of the same size at the resulting address through 'mem_read'.
    Override 'mem_read' to modify the effective memory accesses
    """
    address = self.eval_expr_visitor(expr.arg, **kwargs)
    return self.mem_read(ExprMem(address, expr.size))
def eval_exprop(
self, expr, **kwargs)
[DEV]: Evaluate an ExprOp using the current state
def eval_exprop(self, expr, **kwargs):
    """
    [DEV]: Evaluate an ExprOp using the current state
    Each argument is evaluated in order, then a new ExprOp with the same
    operator is built.
    """
    evaluated = [self.eval_expr_visitor(arg, **kwargs) for arg in expr.args]
    return ExprOp(expr.op, *evaluated)
def eval_exprslice(
self, expr, **kwargs)
[DEV]: Evaluate an ExprSlice using the current state
def eval_exprslice(self, expr, **kwargs):
    """
    [DEV]: Evaluate an ExprSlice using the current state
    The sliced argument is evaluated, then sliced again with the same
    start/stop bounds.
    """
    inner = self.eval_expr_visitor(expr.arg, **kwargs)
    return ExprSlice(inner, expr.start, expr.stop)
def eval_ir(
self, assignblk)
Deprecated version of eval_updt_assignblk(self, assignblk)
def eval_ir(self, assignblk):
    """Deprecated version of eval_updt_assignblk(self, assignblk)

    Emits a warning, then returns eval_updt_assignblk(@assignblk).
    @assignblk: AssignBlock instance
    """
    # The warning names eval_updt_assignblk: that is the method actually
    # called below (the state is updated, not only evaluated).
    warnings.warn('DEPRECATION WARNING: use "eval_updt_assignblk(self, assignblk)" instead of eval_ir')
    return self.eval_updt_assignblk(assignblk)
def eval_ir_expr(
self, assignblk)
Deprecated: use eval_assignblk(self, assignblk) instead; returns the items iterator of its result
def eval_ir_expr(self, assignblk):
    """Deprecated: use eval_assignblk(self, assignblk) instead

    Emits a warning, then returns the iteritems() iterator of
    eval_assignblk(@assignblk).
    """
    warnings.warn('DEPRECATION WARNING: use "eval_assignblk(self, assignblk)" instead of eval_ir_expr')
    evaluated = self.eval_assignblk(assignblk)
    return evaluated.iteritems()
def eval_updt_assignblk(
self, assignblk)
Apply an AssignBlock on the current state @assignblk: AssignBlock instance
def eval_updt_assignblk(self, assignblk):
    """
    Apply an AssignBlock on the current state
    @assignblk: AssignBlock instance

    All sources/destinations are evaluated first (eval_assignblk), then
    each change is applied. Returns the list of memory destinations
    (those for which is_mem() is true) that were written.
    """
    written_mems = []
    for dst, src in self.eval_assignblk(assignblk).iteritems():
        self.apply_change(dst, src)
        if not dst.is_mem():
            continue
        written_mems.append(dst)
    return written_mems
def eval_updt_expr(
self, expr)
Evaluate @expr and apply side effect if needed (ie. if expr is an assignment). Return the evaluated value
def eval_updt_expr(self, expr):
    """
    Evaluate @expr and apply side effect if needed (ie. if expr is an
    assignment). Return the evaluated value

    For an assignment, the returned value is the evaluation of its source,
    computed before the state is updated.
    """
    if not expr.is_aff():
        return self.eval_expr(expr)
    # Evaluate the source first: the update below may change the state
    value = self.eval_expr(expr.src)
    self.eval_updt_assignblk(AssignBlock([expr]))
    return value
def eval_updt_irblock(
self, irb, step=False)
Symbolic execution of the @irb on the current state @irb: irbloc instance @step: display intermediate steps
def eval_updt_irblock(self, irb, step=False):
    """
    Symbolic execution of the @irb on the current state
    @irb: irbloc instance
    @step: display intermediate steps

    Each AssignBlock of @irb is applied in order with eval_updt_assignblk.
    Returns the evaluation of self.ir_arch.IRDst once the block is done.
    """
    separator = '_' * 80
    for assignblk in irb:
        if step:
            # Single-argument print() calls: same output as print
            # statements on Python 2, and still valid syntax on Python 3
            print('Instr %s' % (assignblk.instr,))
            print('Assignblk:')
            print(assignblk)
            print(separator)
        self.eval_updt_assignblk(assignblk)
        if step:
            self.dump(mems=False)
            self.dump(ids=False)
            print(separator)
    return self.eval_expr(self.ir_arch.IRDst)
def get_state(
self)
Return the current state of the SymbolicEngine
def get_state(self):
    """Return the current state of the SymbolicEngine

    The symbols are copied into a plain dict and wrapped in
    self.StateEngine.
    """
    return self.StateEngine(dict(self.symbols))
def mem_read(
self, expr)
[DEV]: Override to modify the effective memory reads
Read symbolic value at ExprMem @expr @expr: ExprMem
def mem_read(self, expr):
    """
    [DEV]: Override to modify the effective memory reads

    Read symbolic value at ExprMem @expr
    @expr: ExprMem

    The access is split by _resolve_mem_parts into (known, part) couples.
    Parts that are not known, are memory expressions, and for which a
    func_read callback is set are resolved through that callback; every
    other part is used as is. The parts are recomposed and simplified.
    """
    pieces = []
    for known, part in self._resolve_mem_parts(expr):
        use_callback = (not known) and part.is_mem() and self.func_read is not None
        pieces.append(self.func_read(part) if use_callback else part)
    result = self.expr_simp(ExprCompose(*pieces))
    assert result.size == expr.size
    return result
def mem_write(
self, dst, src)
[DEV]: Override to modify the effective memory writes
Write symbolic value @src at ExprMem @dst @dst: destination ExprMem @src: source Expression
def mem_write(self, dst, src):
    """
    [DEV]: Override to modify the effective memory writes

    Write symbolic value @src at ExprMem @dst
    @dst: destination ExprMem
    @src: source Expression

    When a func_write callback is set it receives (engine, dst, src) and
    replaces the default write into self.symbols.
    """
    if self.func_write is None:
        self.symbols.write(dst, src)
        return
    self.func_write(self, dst, src)
def modified(
self, init_state=None, ids=True, mems=True)
Return the modified variables. @init_state: a base dictionary linking variables to their initial values to diff. Can be None. @ids: if True, include the ids @mems: if True, include the memory cells
def modified(self, init_state=None, ids=True, mems=True):
    """
    Return the modified variables.
    @init_state: a base dictionary linking variables to their initial values
    to diff. Can be None.
    @ids: if True, include the ids (self.symbols.symbols_id)
    @mems: if True, include the memory cells (self.symbols.memory())

    Generator of (variable, value) couples; entries whose value equals the
    one recorded in @init_state are skipped.
    """
    baseline = {} if init_state is None else init_state

    def unchanged(key, value):
        return key in baseline and baseline[key] == value

    if ids:
        for variable, value in self.symbols.symbols_id.iteritems():
            if not unchanged(variable, value):
                yield variable, value
    if mems:
        for mem, value in self.symbols.memory():
            if not unchanged(mem, value):
                yield mem, value
def modified_mems(
self, init_state=None)
Deprecated version of modified(ids=False)
def modified_mems(self, init_state=None):
    """Deprecated version of modified(ids=False)

    Generator: the warning is emitted when iteration starts, then every
    entry of modified(init_state=@init_state, ids=False) is yielded.
    """
    warnings.warn('DEPRECATION WARNING: use "modified(self, ids=False)" instead of modified_mems')
    entries = self.modified(init_state=init_state, ids=False)
    for entry in entries:
        yield entry
def modified_regs(
self, init_state=None)
Deprecated version of modified(mems=False)
def modified_regs(self, init_state=None):
    """Deprecated version of modified(mems=False)

    Generator: the warning is emitted when iteration starts, then every
    entry of modified(init_state=@init_state, mems=False) is yielded.
    """
    warnings.warn('DEPRECATION WARNING: use "modified(self, mems=False)" instead of modified_regs')
    entries = self.modified(init_state=init_state, mems=False)
    for entry in entries:
        yield entry
def run_at(
self, ircfg, addr, lbl_stop=None, step=False)
Symbolic execution starting at @addr @ircfg: object providing get_block(addr), used to fetch the blocks to execute @addr: address to execute (int or ExprInt or label) @lbl_stop: LocKey to stop execution on @step: display intermediate steps
def run_at(self, ircfg, addr, lbl_stop=None, step=False):
    """
    Symbolic execution starting at @addr
    @ircfg: object providing get_block(addr), used to fetch the blocks
    @addr: address to execute (int or ExprInt or label)
    @lbl_stop: LocKey to stop execution on
    @step: display intermediate steps

    Blocks are executed one after the other, each one giving the next
    address. Execution stops when no block is found at the current address
    or when the block's loc_key equals @lbl_stop. Returns the last address.
    """
    irblock = ircfg.get_block(addr)
    while irblock is not None and not (irblock.loc_key == lbl_stop):
        addr = self.eval_updt_irblock(irblock, step=step)
        irblock = ircfg.get_block(addr)
    return addr
def run_block_at(
self, ircfg, addr, step=False)
Symbolic execution of the block at @addr @ircfg: object providing get_block(addr), used to fetch the block @addr: address to execute (int or ExprInt or label) @step: display intermediate steps
def run_block_at(self, ircfg, addr, step=False):
    """
    Symbolic execution of the block at @addr
    @ircfg: object providing get_block(addr), used to fetch the block
    @addr: address to execute (int or ExprInt or label)
    @step: display intermediate steps

    Returns the destination computed by the block, or @addr unchanged when
    no block is found at that address.
    """
    irblock = ircfg.get_block(addr)
    if irblock is None:
        return addr
    return self.eval_updt_irblock(irblock, step=step)
def set_state(
self, state)
Restore the @state of the engine @state: StateEngine instance
def set_state(self, state):
    """Restore the @state of the engine
    @state: StateEngine instance

    The current symbols are discarded: a fresh SymbolMngr is created and
    filled with every (dst, src) couple of @state.
    """
    fresh = SymbolMngr(addrsize=self.ir_arch.addrsize,
                       expr_simp=self.expr_simp)
    self.symbols = fresh
    for dst, src in dict(state).iteritems():
        fresh[dst] = src
class SymbolicState
Stores a SymbolicExecutionEngine state
class SymbolicState(StateEngine):
    """Stores a SymbolicExecutionEngine state

    The (destination, source) couples are kept in a frozenset, which makes
    instances hashable and comparable.
    """

    def __init__(self, dct):
        # Frozen copy of the items of @dct
        self._symbols = frozenset(dct.items())

    def __hash__(self):
        return hash((self.__class__, self._symbols))

    def __eq__(self, other):
        if self is other:
            return True
        if self.__class__ != other.__class__:
            return False
        mine, theirs = self.symbols, other.symbols
        return mine == theirs

    def __ne__(self, other):
        return not self.__eq__(other)

    def __iter__(self):
        for couple in self._symbols:
            dst, src = couple
            yield dst, src

    def iteritems(self):
        """Iterate on stored memory/values"""
        return self.__iter__()

    def merge(self, other):
        """Merge two symbolic states
        Only equal expressions are kept in both states
        @other: second symbolic state
        """
        mine = self.symbols
        theirs = other.symbols
        shared = set(mine.keys()).intersection(theirs.keys())
        kept = dict(
            (dst, mine[dst]) for dst in shared if mine[dst] == theirs[dst]
        )
        return self.__class__(kept)

    @property
    def symbols(self):
        """Return the dictionary of known symbols"""
        return dict(self._symbols)
Ancestors (in MRO)
- SymbolicState
- StateEngine
- __builtin__.object
Instance variables
var symbols
Return the dictionary of known symbols
Methods
def __init__(
self, dct)
def __init__(self, dct):
    """Store a frozen copy of the items of @dct
    @dct: dictionary linking destinations to their values
    """
    items = dct.items()
    self._symbols = frozenset(items)
def iteritems(
self)
Iterate on stored memory/values
def iteritems(self):
    """Iterate on stored memory/values

    Alias of __iter__.
    """
    iterator = self.__iter__()
    return iterator
def merge(
self, other)
Inheritance:
StateEngine
.merge
Merge two symbolic states Only equal expressions are kept in both states @other: second symbolic state
def merge(self, other):
    """Merge two symbolic states
    Only equal expressions are kept in both states
    @other: second symbolic state

    Returns a new instance of the class of self holding the destinations
    present in both states with equal values.
    """
    mine = self.symbols
    theirs = other.symbols
    shared = set(mine.keys()).intersection(theirs.keys())
    kept = dict(
        (dst, mine[dst]) for dst in shared if mine[dst] == theirs[dst]
    )
    return self.__class__(kept)
class symbexec
DEPRECATED object Use SymbolicExecutionEngine instead of symbexec
class symbexec(SymbolicExecutionEngine):
    """
    DEPRECATED object
    Use SymbolicExecutionEngine instead of symbexec

    Instantiation emits a warning, then defers to
    SymbolicExecutionEngine.__init__ with the same arguments.
    """

    def __init__(self, ir_arch, known_symbols,
                 func_read=None,
                 func_write=None,
                 sb_expr_simp=expr_simp_explicit):
        warnings.warn("Deprecated API: use SymbolicExecutionEngine")
        parent_init = super(symbexec, self).__init__
        parent_init(ir_arch, known_symbols,
                    func_read,
                    func_write,
                    sb_expr_simp=sb_expr_simp)
Ancestors (in MRO)
- symbexec
- SymbolicExecutionEngine
- __builtin__.object
Class variables
Instance variables
Methods
def __init__(
self, ir_arch, known_symbols, func_read=None, func_write=None, sb_expr_simp=expr_simp_explicit)
Inheritance:
SymbolicExecutionEngine
.__init__
def __init__(self, ir_arch, known_symbols,
             func_read=None,
             func_write=None,
             sb_expr_simp=expr_simp_explicit):
    """Deprecated constructor: emits a warning, then forwards every
    argument to the __init__ of the parent class of symbexec"""
    warnings.warn("Deprecated API: use SymbolicExecutionEngine")
    parent_init = super(symbexec, self).__init__
    parent_init(ir_arch, known_symbols,
                func_read,
                func_write,
                sb_expr_simp=sb_expr_simp)
def apply_change(
self, dst, src)
Inheritance:
SymbolicExecutionEngine
.apply_change
Apply @dst = @src on the current state WITHOUT evaluating both side @dst: Expr, destination @src: Expr, source
def apply_change(self, dst, src):
    """
    Apply @dst = @src on the current state WITHOUT evaluating both side
    @dst: Expr, destination
    @src: Expr, source

    Memory destinations go through mem_write; any other destination is
    written directly into self.symbols.
    """
    writer = self.mem_write if dst.is_mem() else self.symbols.write
    writer(dst, src)
def apply_expr(
self, expr)
Inheritance:
SymbolicExecutionEngine
.apply_expr
Deprecated version of eval_updt_expr
def apply_expr(self, expr):
    """Deprecated version of eval_updt_expr

    Emits a warning, then returns eval_updt_expr(@expr).
    """
    message = 'DEPRECATION WARNING: use "eval_updt_expr" instead of apply_expr'
    warnings.warn(message)
    return self.eval_updt_expr(expr)
def apply_expr_on_state(
self, expr, cache)
Inheritance:
SymbolicExecutionEngine
.apply_expr_on_state
Deprecated version of eval_expr
def apply_expr_on_state(self, expr, cache):
    """Deprecated version of eval_expr

    Emits a warning, then returns eval_expr(@expr) using @cache as
    evaluation cache (a new dictionary when @cache is None).
    """
    warnings.warn('DEPRECATION WARNING: use "eval_expr" instead of apply_expr_on_state')
    effective_cache = {} if cache is None else cache
    return self.eval_expr(expr, eval_cache=effective_cache)
def as_assignblock(
self)
Inheritance:
SymbolicExecutionEngine
.as_assignblock
Return the current state as an AssignBlock
def as_assignblock(self):
    """Return the current state as an AssignBlock

    Emits a deprecation-style warning, then builds the AssignBlock from
    every (dst, src) couple yielded by modified(ids=True, mems=True).
    """
    warnings.warn('DEPRECATION WARNING: use "modified(ids=True, mems=True)" instead of as_assignblock')
    couples = [(dst, src) for dst, src in self.modified(ids=True, mems=True)]
    return AssignBlock(dict(couples))
def del_mem_above_stack(
self, stack_ptr)
Inheritance:
SymbolicExecutionEngine
.del_mem_above_stack
Remove all stored memory values with the following properties: (1) pointer based on initial stack value; (2) pointer below current stack pointer
def del_mem_above_stack(self, stack_ptr):
    """
    Remove all stored memory values with following properties:
    * pointer based on initial stack value
    * pointer below current stack pointer

    @stack_ptr: expression of the stack pointer; it is evaluated, then
    split into (base, offset) by get_expr_base_offset
    """
    stack_ptr = self.eval_expr(stack_ptr)
    base, stk_offset = get_expr_base_offset(stack_ptr)
    memarray = self.symbols.symbols_mem.base_to_memarray.get(base, None)
    if not memarray:
        return
    mask = int(stack_ptr.mask)
    top_bit = stack_ptr.size - 1
    # An offset is dropped when the top bit of (offset - stk_offset),
    # truncated to the pointer size, is set.
    # Collected first: entries are not deleted while iterating memarray
    doomed = set(
        offset for offset in memarray
        if ((offset - stk_offset) & mask) >> top_bit != 0
    )
    for offset in doomed:
        del memarray[offset]
def dump(
self, ids=True, mems=True)
Inheritance:
SymbolicExecutionEngine
.dump
Display modified variables @ids: display modified ids @mems: display modified memory
def dump(self, ids=True, mems=True):
    """
    Display modified variables
    @ids: display modified ids
    @mems: display modified memory

    One line is printed per couple yielded by modified(None, @ids, @mems):
    the variable left-justified on 18 columns, " = ", then the value.
    """
    for variable, value in self.modified(None, ids, mems):
        # Single-argument print(): same output as the former print
        # statement on Python 2, and still valid syntax on Python 3
        print("%-18s = %s" % (variable, value))
def dump_id(
self)
Inheritance:
SymbolicExecutionEngine
.dump_id
Deprecated version of dump(mems=False)
def dump_id(self):
    """Deprecated version of dump(mems=False)

    Emits a warning, then calls dump(mems=False).
    """
    message = 'DEPRECATION WARNING: use "dump(self, mems=False)" instead of dump_id'
    warnings.warn(message)
    self.dump(mems=False)
def dump_mem(
self)
Inheritance:
SymbolicExecutionEngine
.dump_mem
Deprecated version of dump(ids=False)
def dump_mem(self):
    """Deprecated version of dump(ids=False)

    Emits a warning, then calls dump(ids=False).
    """
    message = 'DEPRECATION WARNING: use "dump(self, ids=False)" instead of dump_mem'
    warnings.warn(message)
    self.dump(ids=False)
def emul_ir_bloc(
self, _, addr, step=False)
Inheritance:
SymbolicExecutionEngine
.emul_ir_bloc
Deprecated version of run_block_at
def emul_ir_bloc(self, _, addr, step=False):
    """Deprecated version of run_block_at

    @_: forwarded as the `ircfg` argument of run_block_at; it must provide
    get_block(addr)
    @addr: address to execute
    @step: display intermediate steps

    Emits a warning, then returns run_block_at(@_, @addr, @step).
    """
    warnings.warn('DEPRECATION WARNING: use "run_block_at(self, ircfg, addr, step=False)" instead of emul_ir_bloc')
    # run_block_at is declared as (self, ircfg, addr, step=False): calling
    # it with (addr, step) bound @addr to `ircfg` and @step to `addr`.
    return self.run_block_at(_, addr, step)
def emul_ir_block(
self, addr, step=False)
Inheritance:
SymbolicExecutionEngine
.emul_ir_block
Deprecated version of run_block_at
def emul_ir_block(self, addr, step=False):
    """Deprecated version of run_block_at

    Emits a warning, then forwards @addr and @step positionally to
    run_block_at and returns its result.
    """
    warnings.warn('DEPRECATION WARNING: use "run_block_at(self, addr, step=False)" instead of emul_ir_block')
    # NOTE(review): run_block_at is declared in this file as
    # run_block_at(self, ircfg, addr, step=False). With this positional
    # call @addr is bound to `ircfg` and @step to `addr`. No ircfg is
    # available in this signature - confirm, then fix or remove.
    return self.run_block_at(addr, step)
def emul_ir_blocks(
self, addr, lbl_stop=None, step=False)
Inheritance:
SymbolicExecutionEngine
.emul_ir_blocks
Deprecated version of run_at
def emul_ir_blocks(self, addr, lbl_stop=None, step=False):
    """Deprecated version of run_at

    Emits a warning, then forwards @addr, @lbl_stop and @step positionally
    to run_at and returns its result.
    """
    warnings.warn('DEPRECATION WARNING: use "run_at(self, addr, lbl_stop=None, step=False):" instead of emul_ir_blocks')
    # NOTE(review): run_at is declared in this file as
    # run_at(self, ircfg, addr, lbl_stop=None, step=False). With this
    # positional call @addr is bound to `ircfg`, @lbl_stop to `addr` and
    # @step to `lbl_stop`. No ircfg is available in this signature -
    # confirm, then fix or remove.
    return self.run_at(addr, lbl_stop, step)
def emul_ir_blocs(
self, _, addr, lbl_stop=None, step=False)
Inheritance:
SymbolicExecutionEngine
.emul_ir_blocs
Deprecated version of run_at
def emul_ir_blocs(self, _, addr, lbl_stop=None, step=False):
    """Deprecated version of run_at

    @_: forwarded as the `ircfg` argument of run_at; it must provide
    get_block(addr)
    @addr: address to execute
    @lbl_stop: LocKey to stop execution on
    @step: display intermediate steps

    Emits a warning, then returns run_at(@_, @addr, @lbl_stop, @step).
    """
    warnings.warn('DEPRECATION WARNING: use "run_at(self, ircfg, addr, lbl_stop=None, step=False)" instead of emul_ir_blocs')
    # run_at is declared as (self, ircfg, addr, lbl_stop=None, step=False):
    # calling it with (addr, lbl_stop, step) shifted every argument by one.
    return self.run_at(_, addr, lbl_stop, step)
def emulbloc(
self, irb, step=False)
Inheritance:
SymbolicExecutionEngine
.emulbloc
Deprecated version of eval_updt_irblock(self, irb, step=False)
def emulbloc(self, irb, step=False):
    """Deprecated version of eval_updt_irblock(self, irb, step=False)

    Emits a warning, then returns eval_updt_irblock(@irb, @step).
    """
    message = 'DEPRECATION WARNING: use "eval_updt_irblock(self, irb, step=False)" instead of emulbloc'
    warnings.warn(message)
    return self.eval_updt_irblock(irb, step)
def eval_assignblk(
self, assignblk)
Inheritance:
SymbolicExecutionEngine
.eval_assignblk
Evaluate AssignBlock using the current state
Returns a dictionary containing modified keys associated to their values
@assignblk: AssignBlock instance
def eval_assignblk(self, assignblk):
    """
    Evaluate AssignBlock using the current state

    Returns a dictionary containing modified keys associated to their values
    @assignblk: AssignBlock instance

    The state itself is not updated. One evaluation cache is shared by all
    the assignments of the block. Memory destinations are rebuilt from
    their evaluated pointer; a ValueError is raised for a destination that
    is neither a memory nor an id.
    """
    shared_cache = {}
    evaluated = {}
    for dst, src in assignblk.iteritems():
        value = self.eval_expr(src, shared_cache)
        if dst.is_mem():
            # The key is the memory access at the evaluated pointer
            pointer = self.eval_expr(dst.arg, shared_cache)
            evaluated[ExprMem(pointer, dst.size)] = value
            continue
        if not dst.is_id():
            raise ValueError("Unknown destination type", str(dst))
        evaluated[dst] = value
    return evaluated
def eval_expr(
self, expr, eval_cache=None)
Inheritance:
SymbolicExecutionEngine
.eval_expr
Evaluate @expr @expr: Expression instance to evaluate @eval_cache: None or dictionary linking variables to their values
def eval_expr(self, expr, eval_cache=None):
    """
    Evaluate @expr
    @expr: Expression instance to evaluate
    @eval_cache: None or dictionary linking variables to their values

    A new dictionary is used as cache when @eval_cache is None.
    """
    cache = {} if eval_cache is None else eval_cache
    result = self.eval_expr_visitor(expr, cache=cache)
    assert result is not None
    return result
def eval_expr_visitor(
self, expr, cache=None)
Inheritance:
SymbolicExecutionEngine
.eval_expr_visitor
[DEV]: Override to change the behavior of an Expr evaluation. This function recursively applies 'eval_expr*' to @expr. This function uses @cache to speedup re-evaluation of expression.
def eval_expr_visitor(self, expr, cache=None):
    """
    [DEV]: Override to change the behavior of an Expr evaluation.
    This function recursively applies 'eval_expr*' to @expr.
    This function uses @cache to speedup re-evaluation of expression.

    @expr is simplified, dispatched on its class through
    self.expr_to_visitor, and the result is simplified again. The result
    is cached under both @expr and its simplified form. A TypeError is
    raised when no handler is registered for the class.
    """
    memo = {} if cache is None else cache
    hit = memo.get(expr, None)
    if hit is not None:
        return hit

    simplified = self.expr_simp(expr)
    hit = memo.get(simplified, None)
    if hit is not None:
        return hit

    handler = self.expr_to_visitor.get(simplified.__class__, None)
    if handler is None:
        raise TypeError("Unknown expr type")

    result = self.expr_simp(handler(simplified, cache=memo))
    assert result is not None
    memo[expr] = result
    memo[simplified] = result
    return result
def eval_exprcompose(
self, expr, **kwargs)
Inheritance:
SymbolicExecutionEngine
.eval_exprcompose
[DEV]: Evaluate an ExprCompose using the current state
def eval_exprcompose(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprCompose using the current state

    Each argument of @expr is evaluated in order through
    eval_expr_visitor; the results are recomposed in the same order.
    """
    evaluated = [self.eval_expr_visitor(arg, **kwargs) for arg in expr.args]
    return ExprCompose(*evaluated)
def eval_exprcond(
self, expr, **kwargs)
Inheritance:
SymbolicExecutionEngine
.eval_exprcond
[DEV]: Evaluate an ExprCond using the current state
def eval_exprcond(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprCond using the current state

    The condition and both sources of @expr are evaluated through
    eval_expr_visitor (forwarding @kwargs), then recombined into a new
    ExprCond.
    """
    visit = self.eval_expr_visitor
    new_cond = visit(expr.cond, **kwargs)
    new_src1 = visit(expr.src1, **kwargs)
    new_src2 = visit(expr.src2, **kwargs)
    return ExprCond(new_cond, new_src1, new_src2)
def eval_exprid(
self, expr, **kwargs)
Inheritance:
SymbolicExecutionEngine
.eval_exprid
[DEV]: Evaluate an ExprId using the current state
def eval_exprid(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprId using the current state

    Returns whatever the symbol manager (self.symbols) reads for @expr.
    @kwargs is accepted for visitor compatibility and not used here.
    """
    return self.symbols.read(expr)
def eval_exprint(
self, expr, **kwargs)
Inheritance:
SymbolicExecutionEngine
.eval_exprint
[DEV]: Evaluate an ExprInt using the current state
def eval_exprint(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprInt using the current state

    @expr is returned unchanged; @kwargs is accepted for visitor
    compatibility and not used here.
    """
    return expr
def eval_exprloc(
self, expr, **kwargs)
Inheritance:
SymbolicExecutionEngine
.eval_exprloc
[DEV]: Evaluate an ExprLoc using the current state
def eval_exprloc(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprLoc using the current state

    The offset of @expr.loc_key is looked up in self.ir_arch.loc_db. When
    an offset is known, an ExprInt of that offset (with the size of @expr)
    is returned; otherwise @expr is returned as is.
    """
    offset = self.ir_arch.loc_db.get_location_offset(expr.loc_key)
    if offset is None:
        return expr
    return ExprInt(offset, expr.size)
def eval_exprmem(
self, expr, **kwargs)
Inheritance:
SymbolicExecutionEngine
.eval_exprmem
[DEV]: Evaluate an ExprMem using the current state This function first evaluate the memory pointer value. Override 'mem_read' to modify the effective memory accesses
def eval_exprmem(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprMem using the current state

    The memory pointer (@expr.arg) is evaluated first; the read itself is
    delegated to mem_read on an ExprMem built from the evaluated pointer
    and the size of @expr.
    Override 'mem_read' to modify the effective memory accesses
    """
    resolved_ptr = self.eval_expr_visitor(expr.arg, **kwargs)
    return self.mem_read(ExprMem(resolved_ptr, expr.size))
def eval_exprop(
self, expr, **kwargs)
Inheritance:
SymbolicExecutionEngine
.eval_exprop
[DEV]: Evaluate an ExprOp using the current state
def eval_exprop(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprOp using the current state

    Each argument of @expr is evaluated in order through
    eval_expr_visitor; the results are recombined under the same operator.
    """
    evaluated = [self.eval_expr_visitor(oarg, **kwargs) for oarg in expr.args]
    return ExprOp(expr.op, *evaluated)
def eval_exprslice(
self, expr, **kwargs)
Inheritance:
SymbolicExecutionEngine
.eval_exprslice
[DEV]: Evaluate an ExprSlice using the current state
def eval_exprslice(self, expr, **kwargs):
    """[DEV]: Evaluate an ExprSlice using the current state

    The sliced argument is evaluated through eval_expr_visitor and sliced
    again with the same start/stop bounds as @expr.
    """
    inner = self.eval_expr_visitor(expr.arg, **kwargs)
    return ExprSlice(inner, expr.start, expr.stop)
def eval_ir(
self, assignblk)
Inheritance:
SymbolicExecutionEngine
.eval_ir
Deprecated version of eval_updt_assignblk(self, assignblk)
def eval_ir(self, assignblk):
    """Deprecated version of eval_updt_assignblk(self, assignblk)

    Emits a warning, then returns eval_updt_assignblk(@assignblk).
    @assignblk: AssignBlock instance
    """
    # The warning names eval_updt_assignblk: that is the method actually
    # called below (the state is updated, not only evaluated).
    warnings.warn('DEPRECATION WARNING: use "eval_updt_assignblk(self, assignblk)" instead of eval_ir')
    return self.eval_updt_assignblk(assignblk)
def eval_ir_expr(
self, assignblk)
Inheritance:
SymbolicExecutionEngine
.eval_ir_expr
Deprecated: use eval_assignblk(self, assignblk) instead; returns the items iterator of its result
def eval_ir_expr(self, assignblk):
    """Deprecated: use eval_assignblk(self, assignblk) instead

    Emits a warning, then returns the iteritems() iterator of
    eval_assignblk(@assignblk).
    """
    warnings.warn('DEPRECATION WARNING: use "eval_assignblk(self, assignblk)" instead of eval_ir_expr')
    evaluated = self.eval_assignblk(assignblk)
    return evaluated.iteritems()
def eval_updt_assignblk(
self, assignblk)
Inheritance:
SymbolicExecutionEngine
.eval_updt_assignblk
Apply an AssignBlock on the current state @assignblk: AssignBlock instance
def eval_updt_assignblk(self, assignblk):
    """
    Apply an AssignBlock on the current state
    @assignblk: AssignBlock instance

    All sources/destinations are evaluated first (eval_assignblk), then
    each change is applied. Returns the list of memory destinations
    (those for which is_mem() is true) that were written.
    """
    written_mems = []
    for dst, src in self.eval_assignblk(assignblk).iteritems():
        self.apply_change(dst, src)
        if not dst.is_mem():
            continue
        written_mems.append(dst)
    return written_mems
def eval_updt_expr(
self, expr)
Inheritance:
SymbolicExecutionEngine
.eval_updt_expr
Evaluate @expr and apply side effect if needed (ie. if expr is an assignment). Return the evaluated value
def eval_updt_expr(self, expr):
    """
    Evaluate @expr and apply side effect if needed (ie. if expr is an
    assignment). Return the evaluated value

    For an assignment, the returned value is the evaluation of its source,
    computed before the state is updated.
    """
    if not expr.is_aff():
        return self.eval_expr(expr)
    # Evaluate the source first: the update below may change the state
    value = self.eval_expr(expr.src)
    self.eval_updt_assignblk(AssignBlock([expr]))
    return value
def eval_updt_irblock(
self, irb, step=False)
Inheritance:
SymbolicExecutionEngine
.eval_updt_irblock
Symbolic execution of the @irb on the current state @irb: irbloc instance @step: display intermediate steps
def eval_updt_irblock(self, irb, step=False):
    """
    Symbolic execution of the @irb on the current state
    @irb: irbloc instance
    @step: display intermediate steps

    Each AssignBlock of @irb is applied in order with eval_updt_assignblk.
    Returns the evaluation of self.ir_arch.IRDst once the block is done.
    """
    separator = '_' * 80
    for assignblk in irb:
        if step:
            # Single-argument print() calls: same output as print
            # statements on Python 2, and still valid syntax on Python 3
            print('Instr %s' % (assignblk.instr,))
            print('Assignblk:')
            print(assignblk)
            print(separator)
        self.eval_updt_assignblk(assignblk)
        if step:
            self.dump(mems=False)
            self.dump(ids=False)
            print(separator)
    return self.eval_expr(self.ir_arch.IRDst)
def get_state(
self)
Inheritance:
SymbolicExecutionEngine
.get_state
Return the current state of the SymbolicEngine
def get_state(self):
    """Return the current state of the SymbolicEngine

    The symbols are copied into a plain dict and wrapped in
    self.StateEngine.
    """
    return self.StateEngine(dict(self.symbols))
def mem_read(
self, expr)
Inheritance:
SymbolicExecutionEngine
.mem_read
[DEV]: Override to modify the effective memory reads
Read symbolic value at ExprMem @expr @expr: ExprMem
def mem_read(self, expr):
    """
    [DEV]: Override to modify the effective memory reads

    Read symbolic value at ExprMem @expr
    @expr: ExprMem

    The access is split by _resolve_mem_parts into (known, part) couples.
    Parts that are not known, are memory expressions, and for which a
    func_read callback is set are resolved through that callback; every
    other part is used as is. The parts are recomposed and simplified.
    """
    pieces = []
    for known, part in self._resolve_mem_parts(expr):
        use_callback = (not known) and part.is_mem() and self.func_read is not None
        pieces.append(self.func_read(part) if use_callback else part)
    result = self.expr_simp(ExprCompose(*pieces))
    assert result.size == expr.size
    return result
def mem_write(
self, dst, src)
Inheritance:
SymbolicExecutionEngine
.mem_write
[DEV]: Override to modify the effective memory writes
Write symbolic value @src at ExprMem @dst @dst: destination ExprMem @src: source Expression
def mem_write(self, dst, src):
    """
    [DEV]: Override to modify the effective memory writes

    Write symbolic value @src at ExprMem @dst
    @dst: destination ExprMem
    @src: source Expression

    When a func_write callback is set it receives (engine, dst, src) and
    replaces the default write into self.symbols.
    """
    if self.func_write is None:
        self.symbols.write(dst, src)
        return
    self.func_write(self, dst, src)
def modified(
self, init_state=None, ids=True, mems=True)
Inheritance:
SymbolicExecutionEngine
.modified
Return the modified variables. @init_state: a base dictionary linking variables to their initial values to diff. Can be None. @ids: if True, include the ids @mems: if True, include the memory cells
def modified(self, init_state=None, ids=True, mems=True):
    """
    Return the modified variables.
    @init_state: a base dictionary linking variables to their initial values
    to diff. Can be None.
    @ids: if True, include the ids (self.symbols.symbols_id)
    @mems: if True, include the memory cells (self.symbols.memory())

    Generator of (variable, value) couples; entries whose value equals the
    one recorded in @init_state are skipped.
    """
    baseline = {} if init_state is None else init_state

    def unchanged(key, value):
        return key in baseline and baseline[key] == value

    if ids:
        for variable, value in self.symbols.symbols_id.iteritems():
            if not unchanged(variable, value):
                yield variable, value
    if mems:
        for mem, value in self.symbols.memory():
            if not unchanged(mem, value):
                yield mem, value
def modified_mems(
self, init_state=None)
Inheritance:
SymbolicExecutionEngine
.modified_mems
Deprecated version of modified(ids=False)
def modified_mems(self, init_state=None):
    """Deprecated version of modified(ids=False)

    Generator: the warning is emitted when iteration starts, then every
    entry of modified(init_state=@init_state, ids=False) is yielded.
    """
    warnings.warn('DEPRECATION WARNING: use "modified(self, ids=False)" instead of modified_mems')
    entries = self.modified(init_state=init_state, ids=False)
    for entry in entries:
        yield entry
def modified_regs(
self, init_state=None)
Inheritance:
SymbolicExecutionEngine
.modified_regs
Deprecated version of modified(mems=False)
def modified_regs(self, init_state=None):
    """Deprecated version of modified(mems=False)

    Generator: the warning is emitted when iteration starts, then every
    entry of modified(init_state=@init_state, mems=False) is yielded.
    """
    warnings.warn('DEPRECATION WARNING: use "modified(self, mems=False)" instead of modified_regs')
    entries = self.modified(init_state=init_state, mems=False)
    for entry in entries:
        yield entry
def run_at(
self, ircfg, addr, lbl_stop=None, step=False)
Inheritance:
SymbolicExecutionEngine
.run_at
Symbolic execution starting at @addr @ircfg: object providing get_block(addr), used to fetch the blocks to execute @addr: address to execute (int or ExprInt or label) @lbl_stop: LocKey to stop execution on @step: display intermediate steps
def run_at(self, ircfg, addr, lbl_stop=None, step=False):
    """
    Symbolic execution starting at @addr
    @ircfg: object providing get_block(addr), used to fetch the blocks
    @addr: address to execute (int or ExprInt or label)
    @lbl_stop: LocKey to stop execution on
    @step: display intermediate steps

    Blocks are executed one after the other, each one giving the next
    address. Execution stops when no block is found at the current address
    or when the block's loc_key equals @lbl_stop. Returns the last address.
    """
    irblock = ircfg.get_block(addr)
    while irblock is not None and not (irblock.loc_key == lbl_stop):
        addr = self.eval_updt_irblock(irblock, step=step)
        irblock = ircfg.get_block(addr)
    return addr
def run_block_at(
self, ircfg, addr, step=False)
Inheritance:
SymbolicExecutionEngine
.run_block_at
Symbolic execution of the block at @addr @ircfg: object providing get_block(addr), used to fetch the block @addr: address to execute (int or ExprInt or label) @step: display intermediate steps
def run_block_at(self, ircfg, addr, step=False):
    """
    Symbolic execution of the block at @addr
    @ircfg: object providing get_block(addr), used to fetch the block
    @addr: address to execute (int or ExprInt or label)
    @step: display intermediate steps

    Returns the destination computed by the block, or @addr unchanged when
    no block is found at that address.
    """
    irblock = ircfg.get_block(addr)
    if irblock is None:
        return addr
    return self.eval_updt_irblock(irblock, step=step)
def set_state(
self, state)
Inheritance:
SymbolicExecutionEngine
.set_state
Restore the @state of the engine @state: StateEngine instance
def set_state(self, state):
    """Restore the @state of the engine
    @state: StateEngine instance

    The current symbols are discarded: a fresh SymbolMngr is created and
    filled with every (dst, src) couple of @state.
    """
    fresh = SymbolMngr(addrsize=self.ir_arch.addrsize,
                       expr_simp=self.expr_simp)
    self.symbols = fresh
    for dst, src in dict(state).iteritems():
        fresh[dst] = src