miasm2.analysis.dse module
Dynamic symbolic execution module.
Offers a way to have a symbolic execution along a concrete one. Basically, this is done through DSEEngine class, with scheme:
dse = DSEEngine(Machine("x86_32")) dse.attach(jitter)
The DSE state can be updated through:
- .update_state_from_concrete: update the values from the CPU, so the symbolic execution will be completely concrete from this point (until changes)
- .update_state: inject information, for instance RAX = symbolic_RAX
- .symbolize_memory: symbolize (using .memory_to_expr) memory areas (ie, reading from an address in one of these areas yield a symbol)
The DSE run can be instrumented through: - .add_handler: register an handler, modifying the state instead of the current execution. Can be used for stubbing external API - .add_lib_handler: register handlers for libraries - .add_instrumentation: register an handler, modifying the state but continuing the current execution. Can be used for logging facilities
On branch, if the decision is symbolic, one can also collect "path constraints" and inverse them to produce new inputs potentially reaching new paths.
Basically, this is done through DSEPathConstraint. In order to produce a new solution, one can extend this class, and override 'handle_solution' to produce a solution which fit its needs. It could avoid computing new solution by overriding 'produce_solution'.
If one is only interested in constraints associated to its path, the option "produce_solution" should be set to False, to speed up emulation. The constraints are accumulated in the .z3_cur z3.Solver object.
Here are a few remainings TODO: - handle endianess in check_state / atomic read: currently, but this is also true for others Miasm2 symbolic engines, the endianness is not take in account, and assumed to be Little Endian
- too many memory dependencies in constraint tracking: in order to let z3 find new solution, it does need information on memory values (for instance, a lookup in a table with a symbolic index). The estimated possible involved memory location could be too large to pass to the solver (threshold named MAX_MEMORY_INJECT). One possible solution, not yet implemented, is to call the solver for reducing the possible values thanks to its accumulated constraints.
"""Dynamic symbolic execution module. Offers a way to have a symbolic execution along a concrete one. Basically, this is done through DSEEngine class, with scheme: dse = DSEEngine(Machine("x86_32")) dse.attach(jitter) The DSE state can be updated through: - .update_state_from_concrete: update the values from the CPU, so the symbolic execution will be completely concrete from this point (until changes) - .update_state: inject information, for instance RAX = symbolic_RAX - .symbolize_memory: symbolize (using .memory_to_expr) memory areas (ie, reading from an address in one of these areas yield a symbol) The DSE run can be instrumented through: - .add_handler: register an handler, modifying the state instead of the current execution. Can be used for stubbing external API - .add_lib_handler: register handlers for libraries - .add_instrumentation: register an handler, modifying the state but continuing the current execution. Can be used for logging facilities On branch, if the decision is symbolic, one can also collect "path constraints" and inverse them to produce new inputs potentially reaching new paths. Basically, this is done through DSEPathConstraint. In order to produce a new solution, one can extend this class, and override 'handle_solution' to produce a solution which fit its needs. It could avoid computing new solution by overriding 'produce_solution'. If one is only interested in constraints associated to its path, the option "produce_solution" should be set to False, to speed up emulation. The constraints are accumulated in the .z3_cur z3.Solver object. Here are a few remainings TODO: - handle endianess in check_state / atomic read: currently, but this is also true for others Miasm2 symbolic engines, the endianness is not take in account, and assumed to be Little Endian - too many memory dependencies in constraint tracking: in order to let z3 find new solution, it does need information on memory values (for instance, a lookup in a table with a symbolic index). The estimated possible involved memory location could be too large to pass to the solver (threshold named MAX_MEMORY_INJECT). One possible solution, not yet implemented, is to call the solver for reducing the possible values thanks to its accumulated constraints. """ from collections import namedtuple try: import z3 except ImportError: z3 = None from miasm2.expression.expression import ExprMem, ExprInt, ExprCompose, \ ExprAff, ExprId, ExprLoc, LocKey from miasm2.core.bin_stream import bin_stream_vm from miasm2.jitter.emulatedsymbexec import EmulatedSymbExec from miasm2.expression.expression_helper import possible_values from miasm2.ir.translators import Translator from miasm2.analysis.expression_range import expr_range from miasm2.analysis.modularintervals import ModularIntervals from miasm2.core.locationdb import LocationDB DriftInfo = namedtuple("DriftInfo", ["symbol", "computed", "expected"]) class DriftException(Exception): """Raised when the emulation drift from the reference engine""" def __init__(self, info): super(DriftException, self).__init__() self.info = info def __str__(self): if len(self.info) == 1: return "Drift of %s: %s instead of %s" % ( self.info[0].symbol, self.info[0].computed, self.info[0].expected, ) else: return "Drift of:\n\t" + "\n\t".join("%s: %s instead of %s" % ( dinfo.symbol, dinfo.computed, dinfo.expected) for dinfo in self.info) class ESETrackModif(EmulatedSymbExec): """Extension of EmulatedSymbExec to be used by DSE engines Add the tracking of modified expressions, and the ability to symbolize memory areas """ def __init__(self, *args, **kwargs): super(ESETrackModif, self).__init__(*args, **kwargs) self.modified_expr = set() # Expr modified since the last reset self.dse_memory_range = [] # List/Intervals of memory addresses to # symbolize self.dse_memory_to_expr = None # function(addr) -> Expr used to # symbolize def _func_read(self, expr_mem): if not expr_mem.arg.is_int(): return expr_mem dst_addr = int(expr_mem.arg) if not self.dse_memory_range: # Trivial case (optimization) return super(ESETrackModif, self)._func_read(expr_mem) # Split access in atomic accesses out = [] for addr in xrange(dst_addr, dst_addr + (expr_mem.size / 8)): if addr in self.dse_memory_range: # Symbolize memory access out.append(self.dse_memory_to_expr(addr)) else: # Get concrete value atomic_access = ExprMem(ExprInt(addr, expr_mem.arg.size), 8) out.append(super(ESETrackModif, self)._func_read(atomic_access)) if len(out) == 1: # Trivial case (optimization) return out[0] # Simplify for constant merging (ex: {ExprInt(1, 8), ExprInt(2, 8)}) return self.expr_simp(ExprCompose(*out)) def reset_modified(self): """Reset modified expression tracker""" self.modified_expr.clear() def apply_change(self, dst, src): super(ESETrackModif, self).apply_change(dst, src) self.modified_expr.add(dst) class DSEEngine(object): """Dynamic Symbolic Execution Engine This class aims to be overrided for each specific purpose """ SYMB_ENGINE = ESETrackModif def __init__(self, machine): self.machine = machine self.loc_db = LocationDB() self.handler = {} # addr -> callback(DSEEngine instance) self.instrumentation = {} # addr -> callback(DSEEngine instance) self.addr_to_cacheblocks = {} # addr -> {label -> IRBlock} self.ir_arch = self.machine.ir(loc_db=self.loc_db) # corresponding IR self.ircfg = self.ir_arch.new_ircfg() # corresponding IR # Defined after attachment self.jitter = None # Jitload (concrete execution) self.symb = None # SymbolicExecutionEngine self.symb_concrete = None # Concrete SymbExec for path desambiguisation self.mdis = None # DisasmEngine def prepare(self): """Prepare the environment for attachment with a jitter""" # Disassembler self.mdis = self.machine.dis_engine(bin_stream_vm(self.jitter.vm), lines_wd=1, loc_db=self.loc_db) # Symbexec engine ## Prepare symbexec engines self.symb = self.SYMB_ENGINE(self.jitter.cpu, self.jitter.vm, self.ir_arch, {}) self.symb.enable_emulated_simplifications() self.symb_concrete = EmulatedSymbExec( self.jitter.cpu, self.jitter.vm, self.ir_arch, {} ) ### Avoid side effects on jitter while using 'symb_concrete' self.symb_concrete.func_write = None ## Update registers value self.symb.symbols[self.ir_arch.IRDst] = ExprInt( getattr(self.jitter.cpu, self.ir_arch.pc.name), self.ir_arch.IRDst.size ) # Avoid memory write self.symb.func_write = None # Activate callback on each instr self.jitter.jit.set_options(max_exec_per_call=1, jit_maxline=1) self.jitter.exec_cb = self.callback # Clean jit cache to avoid multi-line basic blocks already jitted self.jitter.jit.clear_jitted_blocks() def attach(self, emulator): """Attach the DSE to @emulator @emulator: jitload (or API equivalent) instance To attach *DURING A BREAKPOINT*, one may consider using the following snippet: def breakpoint(self, jitter): ... dse.attach(jitter) dse.update... ... # Additionnal call to the exec callback is necessary, as breakpoints are # honored AFTER exec callback jitter.exec_cb(jitter) return True Without it, one may encounteer a DriftException error due to a "desynchronization" between jitter and dse states. Indeed, on 'handle' call, the jitter must be one instruction AFTER the dse. """ self.jitter = emulator self.prepare() def handle(self, cur_addr): r"""Handle destination @cur_addr: Expr of the next address in concrete execution /!\ cur_addr may be a loc_key In this method, self.symb is in the "just before branching" state """ pass def add_handler(self, addr, callback): """Add a @callback for address @addr before any state update. The state IS NOT updated after returning from the callback @addr: int @callback: func(dse instance)""" self.handler[addr] = callback def add_lib_handler(self, libimp, namespace): """Add search for handler based on a @libimp libimp instance Known functions will be looked by {name}_symb in the @namespace """ # lambda cannot contain statement def default_func(dse): fname = "%s_symb" % libimp.fad2cname[dse.jitter.pc] raise RuntimeError("Symbolic stub '%s' not found" % fname) for addr, fname in libimp.fad2cname.iteritems(): fname = "%s_symb" % fname func = namespace.get(fname, None) if func is not None: self.add_handler(addr, func) else: self.add_handler(addr, default_func) def add_instrumentation(self, addr, callback): """Add a @callback for address @addr before any state update. The state IS updated after returning from the callback @addr: int @callback: func(dse instance)""" self.instrumentation[addr] = callback def _check_state(self): """Check the current state against the concrete one""" errors = [] # List of DriftInfo for symbol in self.symb.modified_expr: # Do not consider PC if symbol in [self.ir_arch.pc, self.ir_arch.IRDst]: continue # Consider only concrete values symb_value = self.eval_expr(symbol) if not symb_value.is_int(): continue symb_value = int(symb_value) # Check computed values against real ones if symbol.is_id(): if hasattr(self.jitter.cpu, symbol.name): value = getattr(self.jitter.cpu, symbol.name) if value != symb_value: errors.append(DriftInfo(symbol, symb_value, value)) elif symbol.is_mem() and symbol.arg.is_int(): value_chr = self.jitter.vm.get_mem(int(symbol.arg), symbol.size / 8) exp_value = int(value_chr[::-1].encode("hex"), 16) if exp_value != symb_value: errors.append(DriftInfo(symbol, symb_value, exp_value)) # Check for drift, and act accordingly if errors: raise DriftException(errors) def callback(self, _): """Called before each instruction""" # Assert synchronization with concrete execution self._check_state() # Call callbacks associated to the current address cur_addr = self.jitter.pc if isinstance(cur_addr, LocKey): lbl = self.ir_arch.loc_db.loc_key_to_label(cur_addr) cur_addr = lbl.offset if cur_addr in self.handler: self.handler[cur_addr](self) return True if cur_addr in self.instrumentation: self.instrumentation[cur_addr](self) # Handle current address self.handle(ExprInt(cur_addr, self.ir_arch.IRDst.size)) # Avoid memory issue in ExpressionSimplifier if len(self.symb.expr_simp.simplified_exprs) > 100000: self.symb.expr_simp.simplified_exprs.clear() # Get IR blocks if cur_addr in self.addr_to_cacheblocks: self.ircfg.blocks.clear() self.ircfg.blocks.update(self.addr_to_cacheblocks[cur_addr]) else: ## Reset cache structures self.ircfg.blocks.clear()# = {} ## Update current state asm_block = self.mdis.dis_block(cur_addr) self.ir_arch.add_asmblock_to_ircfg(asm_block, self.ircfg) self.addr_to_cacheblocks[cur_addr] = dict(self.ircfg.blocks) # Emulate the current instruction self.symb.reset_modified() # Is the symbolic execution going (potentially) to jump on a lbl_gen? if len(self.ircfg.blocks) == 1: self.symb.run_at(self.ircfg, cur_addr) else: # Emulation could stuck in generated IR blocks # But concrete execution callback is not enough precise to obtain # the full IR blocks path # -> Use a fully concrete execution to get back path # Update the concrete execution self._update_state_from_concrete_symb( self.symb_concrete, cpu=True, mem=True ) while True: next_addr_concrete = self.symb_concrete.run_block_at( self.ircfg, cur_addr ) self.symb.run_block_at(self.ircfg, cur_addr) if not (isinstance(next_addr_concrete, ExprLoc) and self.ir_arch.loc_db.get_location_offset( next_addr_concrete.loc_key ) is None): # Not a lbl_gen, exit break # Call handle with lbl_gen state self.handle(next_addr_concrete) cur_addr = next_addr_concrete # At this stage, symbolic engine is one instruction after the concrete # engine return True def _get_gpregs(self): """Return a dict of regs: value from the jitter This version use the regs associated to the attrib (!= cpu.get_gpreg()) """ out = {} regs = self.ir_arch.arch.regs.attrib_to_regs[self.ir_arch.attrib] for reg in regs: if hasattr(self.jitter.cpu, reg.name): out[reg.name] = getattr(self.jitter.cpu, reg.name) return out def take_snapshot(self): """Return a snapshot of the current state (including jitter state)""" snapshot = { "mem": self.jitter.vm.get_all_memory(), "regs": self._get_gpregs(), "symb": self.symb.symbols.copy(), } return snapshot def restore_snapshot(self, snapshot, memory=True): """Restore a @snapshot taken with .take_snapshot @snapshot: .take_snapshot output @memory: (optional) if set, also restore the memory """ # Restore memory if memory: self.jitter.vm.reset_memory_page_pool() self.jitter.vm.reset_code_bloc_pool() for addr, metadata in snapshot["mem"].iteritems(): self.jitter.vm.add_memory_page(addr, metadata["access"], metadata["data"]) # Restore registers self.jitter.pc = snapshot["regs"][self.ir_arch.pc.name] for reg, value in snapshot["regs"].iteritems(): setattr(self.jitter.cpu, reg, value) # Reset intern elements self.jitter.vm.set_exception(0) self.jitter.cpu.set_exception(0) self.jitter.bs._atomic_mode = False # Reset symb exec for key, _ in self.symb.symbols.items(): del self.symb.symbols[key] for expr, value in snapshot["symb"].items(): self.symb.symbols[expr] = value def update_state(self, assignblk): """From this point, assume @assignblk in the symbolic execution @assignblk: AssignBlock/{dst -> src} """ for dst, src in assignblk.iteritems(): self.symb.apply_change(dst, src) def _update_state_from_concrete_symb(self, symbexec, cpu=True, mem=False): if mem: # Values will be retrieved from the concrete execution if they are # not present symbexec.symbols.symbols_mem.base_to_memarray.clear() if cpu: regs = self.ir_arch.arch.regs.attrib_to_regs[self.ir_arch.attrib] for reg in regs: if hasattr(self.jitter.cpu, reg.name): value = ExprInt(getattr(self.jitter.cpu, reg.name), size=reg.size) symbexec.symbols[reg] = value def update_state_from_concrete(self, cpu=True, mem=False): r"""Update the symbolic state with concrete values from the concrete engine @cpu: (optional) if set, update registers' value @mem: (optional) if set, update memory value /!\ all current states will be loss. This function is usually called when states are no more synchronized (at the beginning, returning from an unstubbed syscall, ...) """ self._update_state_from_concrete_symb(self.symb, cpu, mem) def eval_expr(self, expr): """Return the evaluation of @expr: @expr: Expr instance""" return self.symb.eval_expr(expr) @staticmethod def memory_to_expr(addr): """Translate an address to its corresponding symbolic ID (8bits) @addr: int""" return ExprId("MEM_0x%x" % int(addr), 8) def symbolize_memory(self, memory_range): """Register a range of memory addresses to symbolize @memory_range: object with support of __in__ operation (intervals, list, ...) """ self.symb.dse_memory_range = memory_range self.symb.dse_memory_to_expr = self.memory_to_expr class DSEPathConstraint(DSEEngine): """Dynamic Symbolic Execution Engine keeping the path constraint Possible new "solutions" are produced along the path, by inversing concrete path constraint. Thus, a "solution" is a potential initial context leading to a new path. In order to produce a new solution, one can extend this class, and override 'handle_solution' to produce a solution which fit its needs. It could avoid computing new solution by overriding 'produce_solution'. If one is only interested in constraints associated to its path, the option "produce_solution" should be set to False, to speed up emulation. The constraints are accumulated in the .z3_cur z3.Solver object. """ # Maximum memory size to inject in constraints solving MAX_MEMORY_INJECT = 0x10000 # Produce solution strategies PRODUCE_NO_SOLUTION = 0 PRODUCE_SOLUTION_CODE_COV = 1 PRODUCE_SOLUTION_BRANCH_COV = 2 PRODUCE_SOLUTION_PATH_COV = 3 def __init__(self, machine, produce_solution=PRODUCE_SOLUTION_CODE_COV, known_solutions=None, **kwargs): """Init a DSEPathConstraint @machine: Machine of the targeted architecture instance @produce_solution: (optional) if set, new solutions will be computed""" super(DSEPathConstraint, self).__init__(machine, **kwargs) # Dependency check assert z3 is not None # Init PathConstraint specifics structures self.cur_solver = z3.Solver() self.new_solutions = {} # solution identifier -> solution's model self._known_solutions = set() # set of solution identifiers self.z3_trans = Translator.to_language("z3") self._produce_solution_strategy = produce_solution self._previous_addr = None self._history = None if produce_solution == self.PRODUCE_SOLUTION_PATH_COV: self._history = [] # List of addresses in the current path def take_snapshot(self, *args, **kwargs): snap = super(DSEPathConstraint, self).take_snapshot(*args, **kwargs) snap["new_solutions"] = {dst: src.copy for dst, src in self.new_solutions.iteritems()} snap["cur_constraints"] = self.cur_solver.assertions() if self._produce_solution_strategy == self.PRODUCE_SOLUTION_PATH_COV: snap["_history"] = list(self._history) elif self._produce_solution_strategy == self.PRODUCE_SOLUTION_BRANCH_COV: snap["_previous_addr"] = self._previous_addr return snap def restore_snapshot(self, snapshot, keep_known_solutions=True, **kwargs): """Restore a DSEPathConstraint snapshot @keep_known_solutions: if set, do not forget solutions already found. -> They will not appear in 'new_solutions' """ super(DSEPathConstraint, self).restore_snapshot(snapshot, **kwargs) self.new_solutions.clear() self.new_solutions.update(snapshot["new_solutions"]) self.cur_solver = z3.Solver() self.cur_solver.add(snapshot["cur_constraints"]) if not keep_known_solutions: self._known_solutions.clear() if self._produce_solution_strategy == self.PRODUCE_SOLUTION_PATH_COV: self._history = list(snapshot["_history"]) elif self._produce_solution_strategy == self.PRODUCE_SOLUTION_BRANCH_COV: self._previous_addr = snapshot["_previous_addr"] def _key_for_solution_strategy(self, destination): """Return the associated identifier for the current solution strategy""" if self._produce_solution_strategy == self.PRODUCE_NO_SOLUTION: # Never produce a solution return None elif self._produce_solution_strategy == self.PRODUCE_SOLUTION_CODE_COV: # Decision based on code coverage # -> produce a solution if the destination has never been seen key = destination elif self._produce_solution_strategy == self.PRODUCE_SOLUTION_BRANCH_COV: # Decision based on branch coverage # -> produce a solution if the current branch has never been take key = (self._previous_addr, destination) elif self._produce_solution_strategy == self.PRODUCE_SOLUTION_PATH_COV: # Decision based on path coverage # -> produce a solution if the current path has never been take key = tuple(self._history + [destination]) else: raise ValueError("Unknown produce solution strategy") return key def produce_solution(self, destination): """Called to determine if a solution for @destination should be test for satisfiability and computed @destination: Expr instance of the target @destination """ key = self._key_for_solution_strategy(destination) if key is None: return False return key not in self._known_solutions def handle_solution(self, model, destination): """Called when a new solution for destination @destination is founded @model: z3 model instance @destination: Expr instance for an addr which is not on the DSE path """ key = self._key_for_solution_strategy(destination) assert key is not None self.new_solutions[key] = model self._known_solutions.add(key) def handle_correct_destination(self, destination, path_constraints): """[DEV] Called by handle() to update internal structures giving the correct destination (the concrete execution one). """ # Update structure used by produce_solution() if self._produce_solution_strategy == self.PRODUCE_SOLUTION_PATH_COV: self._history.append(destination) elif self._produce_solution_strategy == self.PRODUCE_SOLUTION_BRANCH_COV: self._previous_addr = destination # Update current solver for cons in path_constraints: self.cur_solver.add(self.z3_trans.from_expr(cons)) def handle(self, cur_addr): cur_addr = self.ir_arch.loc_db.canonize_to_exprloc(cur_addr) symb_pc = self.eval_expr(self.ir_arch.IRDst) possibilities = possible_values(symb_pc) cur_path_constraint = set() # path_constraint for the concrete path if len(possibilities) == 1: dst = next(iter(possibilities)).value dst = self.ir_arch.loc_db.canonize_to_exprloc(dst) assert dst == cur_addr else: for possibility in possibilities: target_addr = self.ir_arch.loc_db.canonize_to_exprloc( possibility.value ) path_constraint = set() # Set of ExprAff for the possible path # Get constraint associated to the possible path memory_to_add = ModularIntervals(symb_pc.size) for cons in possibility.constraints: eaff = cons.to_constraint() # eaff.get_r(mem_read=True) is not enough # ExprAff consider a Memory access in dst as a write mem = eaff.dst.get_r(mem_read=True) mem.update(eaff.src.get_r(mem_read=True)) for expr in mem: if expr.is_mem(): addr_range = expr_range(expr.arg) # At upper bounds, add the size of the memory access # if addr (- [a, b], then @size[addr] reachables # values are in @8[a, b + size[ for start, stop in addr_range: stop += (expr.size / 8) - 1 full_range = ModularIntervals(symb_pc.size, [(start, stop)]) memory_to_add.update(full_range) path_constraint.add(eaff) if memory_to_add.length > self.MAX_MEMORY_INJECT: # TODO re-croncretize the constraint or z3-try raise RuntimeError("Not implemented: too long memory area") # Inject memory for start, stop in memory_to_add: for address in xrange(start, stop + 1): expr_mem = ExprMem(ExprInt(address, self.ir_arch.pc.size), 8) value = self.eval_expr(expr_mem) if not value.is_int(): raise TypeError("Rely on a symbolic memory case, " \ "address 0x%x" % address) path_constraint.add(ExprAff(expr_mem, value)) if target_addr == cur_addr: # Add path constraint cur_path_constraint = path_constraint elif self.produce_solution(target_addr): # Looking for a new solution self.cur_solver.push() for cons in path_constraint: trans = self.z3_trans.from_expr(cons) trans = z3.simplify(trans) self.cur_solver.add(trans) result = self.cur_solver.check() if result == z3.sat: model = self.cur_solver.model() self.handle_solution(model, target_addr) self.cur_solver.pop() self.handle_correct_destination(cur_addr, cur_path_constraint)
Classes
class DSEEngine
Dynamic Symbolic Execution Engine
This class aims to be overrided for each specific purpose
class DSEEngine(object): """Dynamic Symbolic Execution Engine This class aims to be overrided for each specific purpose """ SYMB_ENGINE = ESETrackModif def __init__(self, machine): self.machine = machine self.loc_db = LocationDB() self.handler = {} # addr -> callback(DSEEngine instance) self.instrumentation = {} # addr -> callback(DSEEngine instance) self.addr_to_cacheblocks = {} # addr -> {label -> IRBlock} self.ir_arch = self.machine.ir(loc_db=self.loc_db) # corresponding IR self.ircfg = self.ir_arch.new_ircfg() # corresponding IR # Defined after attachment self.jitter = None # Jitload (concrete execution) self.symb = None # SymbolicExecutionEngine self.symb_concrete = None # Concrete SymbExec for path desambiguisation self.mdis = None # DisasmEngine def prepare(self): """Prepare the environment for attachment with a jitter""" # Disassembler self.mdis = self.machine.dis_engine(bin_stream_vm(self.jitter.vm), lines_wd=1, loc_db=self.loc_db) # Symbexec engine ## Prepare symbexec engines self.symb = self.SYMB_ENGINE(self.jitter.cpu, self.jitter.vm, self.ir_arch, {}) self.symb.enable_emulated_simplifications() self.symb_concrete = EmulatedSymbExec( self.jitter.cpu, self.jitter.vm, self.ir_arch, {} ) ### Avoid side effects on jitter while using 'symb_concrete' self.symb_concrete.func_write = None ## Update registers value self.symb.symbols[self.ir_arch.IRDst] = ExprInt( getattr(self.jitter.cpu, self.ir_arch.pc.name), self.ir_arch.IRDst.size ) # Avoid memory write self.symb.func_write = None # Activate callback on each instr self.jitter.jit.set_options(max_exec_per_call=1, jit_maxline=1) self.jitter.exec_cb = self.callback # Clean jit cache to avoid multi-line basic blocks already jitted self.jitter.jit.clear_jitted_blocks() def attach(self, emulator): """Attach the DSE to @emulator @emulator: jitload (or API equivalent) instance To attach *DURING A BREAKPOINT*, one may consider using the following snippet: def breakpoint(self, jitter): ... dse.attach(jitter) dse.update... ... # Additionnal call to the exec callback is necessary, as breakpoints are # honored AFTER exec callback jitter.exec_cb(jitter) return True Without it, one may encounteer a DriftException error due to a "desynchronization" between jitter and dse states. Indeed, on 'handle' call, the jitter must be one instruction AFTER the dse. """ self.jitter = emulator self.prepare() def handle(self, cur_addr): r"""Handle destination @cur_addr: Expr of the next address in concrete execution /!\ cur_addr may be a loc_key In this method, self.symb is in the "just before branching" state """ pass def add_handler(self, addr, callback): """Add a @callback for address @addr before any state update. The state IS NOT updated after returning from the callback @addr: int @callback: func(dse instance)""" self.handler[addr] = callback def add_lib_handler(self, libimp, namespace): """Add search for handler based on a @libimp libimp instance Known functions will be looked by {name}_symb in the @namespace """ # lambda cannot contain statement def default_func(dse): fname = "%s_symb" % libimp.fad2cname[dse.jitter.pc] raise RuntimeError("Symbolic stub '%s' not found" % fname) for addr, fname in libimp.fad2cname.iteritems(): fname = "%s_symb" % fname func = namespace.get(fname, None) if func is not None: self.add_handler(addr, func) else: self.add_handler(addr, default_func) def add_instrumentation(self, addr, callback): """Add a @callback for address @addr before any state update. The state IS updated after returning from the callback @addr: int @callback: func(dse instance)""" self.instrumentation[addr] = callback def _check_state(self): """Check the current state against the concrete one""" errors = [] # List of DriftInfo for symbol in self.symb.modified_expr: # Do not consider PC if symbol in [self.ir_arch.pc, self.ir_arch.IRDst]: continue # Consider only concrete values symb_value = self.eval_expr(symbol) if not symb_value.is_int(): continue symb_value = int(symb_value) # Check computed values against real ones if symbol.is_id(): if hasattr(self.jitter.cpu, symbol.name): value = getattr(self.jitter.cpu, symbol.name) if value != symb_value: errors.append(DriftInfo(symbol, symb_value, value)) elif symbol.is_mem() and symbol.arg.is_int(): value_chr = self.jitter.vm.get_mem(int(symbol.arg), symbol.size / 8) exp_value = int(value_chr[::-1].encode("hex"), 16) if exp_value != symb_value: errors.append(DriftInfo(symbol, symb_value, exp_value)) # Check for drift, and act accordingly if errors: raise DriftException(errors) def callback(self, _): """Called before each instruction""" # Assert synchronization with concrete execution self._check_state() # Call callbacks associated to the current address cur_addr = self.jitter.pc if isinstance(cur_addr, LocKey): lbl = self.ir_arch.loc_db.loc_key_to_label(cur_addr) cur_addr = lbl.offset if cur_addr in self.handler: self.handler[cur_addr](self) return True if cur_addr in self.instrumentation: self.instrumentation[cur_addr](self) # Handle current address self.handle(ExprInt(cur_addr, self.ir_arch.IRDst.size)) # Avoid memory issue in ExpressionSimplifier if len(self.symb.expr_simp.simplified_exprs) > 100000: self.symb.expr_simp.simplified_exprs.clear() # Get IR blocks if cur_addr in self.addr_to_cacheblocks: self.ircfg.blocks.clear() self.ircfg.blocks.update(self.addr_to_cacheblocks[cur_addr]) else: ## Reset cache structures self.ircfg.blocks.clear()# = {} ## Update current state asm_block = self.mdis.dis_block(cur_addr) self.ir_arch.add_asmblock_to_ircfg(asm_block, self.ircfg) self.addr_to_cacheblocks[cur_addr] = dict(self.ircfg.blocks) # Emulate the current instruction self.symb.reset_modified() # Is the symbolic execution going (potentially) to jump on a lbl_gen? if len(self.ircfg.blocks) == 1: self.symb.run_at(self.ircfg, cur_addr) else: # Emulation could stuck in generated IR blocks # But concrete execution callback is not enough precise to obtain # the full IR blocks path # -> Use a fully concrete execution to get back path # Update the concrete execution self._update_state_from_concrete_symb( self.symb_concrete, cpu=True, mem=True ) while True: next_addr_concrete = self.symb_concrete.run_block_at( self.ircfg, cur_addr ) self.symb.run_block_at(self.ircfg, cur_addr) if not (isinstance(next_addr_concrete, ExprLoc) and self.ir_arch.loc_db.get_location_offset( next_addr_concrete.loc_key ) is None): # Not a lbl_gen, exit break # Call handle with lbl_gen state self.handle(next_addr_concrete) cur_addr = next_addr_concrete # At this stage, symbolic engine is one instruction after the concrete # engine return True def _get_gpregs(self): """Return a dict of regs: value from the jitter This version use the regs associated to the attrib (!= cpu.get_gpreg()) """ out = {} regs = self.ir_arch.arch.regs.attrib_to_regs[self.ir_arch.attrib] for reg in regs: if hasattr(self.jitter.cpu, reg.name): out[reg.name] = getattr(self.jitter.cpu, reg.name) return out def take_snapshot(self): """Return a snapshot of the current state (including jitter state)""" snapshot = { "mem": self.jitter.vm.get_all_memory(), "regs": self._get_gpregs(), "symb": self.symb.symbols.copy(), } return snapshot def restore_snapshot(self, snapshot, memory=True): """Restore a @snapshot taken with .take_snapshot @snapshot: .take_snapshot output @memory: (optional) if set, also restore the memory """ # Restore memory if memory: self.jitter.vm.reset_memory_page_pool() self.jitter.vm.reset_code_bloc_pool() for addr, metadata in snapshot["mem"].iteritems(): self.jitter.vm.add_memory_page(addr, metadata["access"], metadata["data"]) # Restore registers self.jitter.pc = snapshot["regs"][self.ir_arch.pc.name] for reg, value in snapshot["regs"].iteritems(): setattr(self.jitter.cpu, reg, value) # Reset intern elements self.jitter.vm.set_exception(0) self.jitter.cpu.set_exception(0) self.jitter.bs._atomic_mode = False # Reset symb exec for key, _ in self.symb.symbols.items(): del self.symb.symbols[key] for expr, value in snapshot["symb"].items(): self.symb.symbols[expr] = value def update_state(self, assignblk): """From this point, assume @assignblk in the symbolic execution @assignblk: AssignBlock/{dst -> src} """ for dst, src in assignblk.iteritems(): self.symb.apply_change(dst, src) def _update_state_from_concrete_symb(self, symbexec, cpu=True, mem=False): if mem: # Values will be retrieved from the concrete execution if they are # not present symbexec.symbols.symbols_mem.base_to_memarray.clear() if cpu: regs = self.ir_arch.arch.regs.attrib_to_regs[self.ir_arch.attrib] for reg in regs: if hasattr(self.jitter.cpu, reg.name): value = ExprInt(getattr(self.jitter.cpu, reg.name), size=reg.size) symbexec.symbols[reg] = value def update_state_from_concrete(self, cpu=True, mem=False): r"""Update the symbolic state with concrete values from the concrete engine @cpu: (optional) if set, update registers' value @mem: (optional) if set, update memory value /!\ all current states will be loss. This function is usually called when states are no more synchronized (at the beginning, returning from an unstubbed syscall, ...) """ self._update_state_from_concrete_symb(self.symb, cpu, mem) def eval_expr(self, expr): """Return the evaluation of @expr: @expr: Expr instance""" return self.symb.eval_expr(expr) @staticmethod def memory_to_expr(addr): """Translate an address to its corresponding symbolic ID (8bits) @addr: int""" return ExprId("MEM_0x%x" % int(addr), 8) def symbolize_memory(self, memory_range): """Register a range of memory addresses to symbolize @memory_range: object with support of __in__ operation (intervals, list, ...) """ self.symb.dse_memory_range = memory_range self.symb.dse_memory_to_expr = self.memory_to_expr
Ancestors (in MRO)
- DSEEngine
- __builtin__.object
Class variables
var SYMB_ENGINE
Static methods
def memory_to_expr(
addr)
Translate an address to its corresponding symbolic ID (8bits) @addr: int
@staticmethod def memory_to_expr(addr): """Translate an address to its corresponding symbolic ID (8bits) @addr: int""" return ExprId("MEM_0x%x" % int(addr), 8)
Instance variables
var addr_to_cacheblocks
var handler
var instrumentation
var ir_arch
var ircfg
var jitter
var loc_db
var machine
var mdis
var symb
var symb_concrete
Methods
def __init__(
self, machine)
def __init__(self, machine): self.machine = machine self.loc_db = LocationDB() self.handler = {} # addr -> callback(DSEEngine instance) self.instrumentation = {} # addr -> callback(DSEEngine instance) self.addr_to_cacheblocks = {} # addr -> {label -> IRBlock} self.ir_arch = self.machine.ir(loc_db=self.loc_db) # corresponding IR self.ircfg = self.ir_arch.new_ircfg() # corresponding IR # Defined after attachment self.jitter = None # Jitload (concrete execution) self.symb = None # SymbolicExecutionEngine self.symb_concrete = None # Concrete SymbExec for path desambiguisation self.mdis = None # DisasmEngine
def add_handler(
self, addr, callback)
Add a @callback for address @addr before any state update. The state IS NOT updated after returning from the callback @addr: int @callback: func(dse instance)
def add_handler(self, addr, callback): """Add a @callback for address @addr before any state update. The state IS NOT updated after returning from the callback @addr: int @callback: func(dse instance)""" self.handler[addr] = callback
def add_instrumentation(
self, addr, callback)
Add a @callback for address @addr before any state update. The state IS updated after returning from the callback @addr: int @callback: func(dse instance)
def add_instrumentation(self, addr, callback): """Add a @callback for address @addr before any state update. The state IS updated after returning from the callback @addr: int @callback: func(dse instance)""" self.instrumentation[addr] = callback
def add_lib_handler(
self, libimp, namespace)
Add search for handler based on a @libimp libimp instance
Known functions will be looked by {name}_symb in the @namespace
def add_lib_handler(self, libimp, namespace): """Add search for handler based on a @libimp libimp instance Known functions will be looked by {name}_symb in the @namespace """ # lambda cannot contain statement def default_func(dse): fname = "%s_symb" % libimp.fad2cname[dse.jitter.pc] raise RuntimeError("Symbolic stub '%s' not found" % fname) for addr, fname in libimp.fad2cname.iteritems(): fname = "%s_symb" % fname func = namespace.get(fname, None) if func is not None: self.add_handler(addr, func) else: self.add_handler(addr, default_func)
def attach(
self, emulator)
Attach the DSE to @emulator @emulator: jitload (or API equivalent) instance
To attach DURING A BREAKPOINT, one may consider using the following snippet:
def breakpoint(self, jitter): ... dse.attach(jitter) dse.update... ... # Additionnal call to the exec callback is necessary, as breakpoints are # honored AFTER exec callback jitter.exec_cb(jitter)
return True
Without it, one may encounteer a DriftException error due to a "desynchronization" between jitter and dse states. Indeed, on 'handle' call, the jitter must be one instruction AFTER the dse.
def attach(self, emulator): """Attach the DSE to @emulator @emulator: jitload (or API equivalent) instance To attach *DURING A BREAKPOINT*, one may consider using the following snippet: def breakpoint(self, jitter): ... dse.attach(jitter) dse.update... ... # Additionnal call to the exec callback is necessary, as breakpoints are # honored AFTER exec callback jitter.exec_cb(jitter) return True Without it, one may encounteer a DriftException error due to a "desynchronization" between jitter and dse states. Indeed, on 'handle' call, the jitter must be one instruction AFTER the dse. """ self.jitter = emulator self.prepare()
def callback(
self, _)
Called before each instruction
def callback(self, _): """Called before each instruction""" # Assert synchronization with concrete execution self._check_state() # Call callbacks associated to the current address cur_addr = self.jitter.pc if isinstance(cur_addr, LocKey): lbl = self.ir_arch.loc_db.loc_key_to_label(cur_addr) cur_addr = lbl.offset if cur_addr in self.handler: self.handler[cur_addr](self) return True if cur_addr in self.instrumentation: self.instrumentation[cur_addr](self) # Handle current address self.handle(ExprInt(cur_addr, self.ir_arch.IRDst.size)) # Avoid memory issue in ExpressionSimplifier if len(self.symb.expr_simp.simplified_exprs) > 100000: self.symb.expr_simp.simplified_exprs.clear() # Get IR blocks if cur_addr in self.addr_to_cacheblocks: self.ircfg.blocks.clear() self.ircfg.blocks.update(self.addr_to_cacheblocks[cur_addr]) else: ## Reset cache structures self.ircfg.blocks.clear()# = {} ## Update current state asm_block = self.mdis.dis_block(cur_addr) self.ir_arch.add_asmblock_to_ircfg(asm_block, self.ircfg) self.addr_to_cacheblocks[cur_addr] = dict(self.ircfg.blocks) # Emulate the current instruction self.symb.reset_modified() # Is the symbolic execution going (potentially) to jump on a lbl_gen? if len(self.ircfg.blocks) == 1: self.symb.run_at(self.ircfg, cur_addr) else: # Emulation could stuck in generated IR blocks # But concrete execution callback is not enough precise to obtain # the full IR blocks path # -> Use a fully concrete execution to get back path # Update the concrete execution self._update_state_from_concrete_symb( self.symb_concrete, cpu=True, mem=True ) while True: next_addr_concrete = self.symb_concrete.run_block_at( self.ircfg, cur_addr ) self.symb.run_block_at(self.ircfg, cur_addr) if not (isinstance(next_addr_concrete, ExprLoc) and self.ir_arch.loc_db.get_location_offset( next_addr_concrete.loc_key ) is None): # Not a lbl_gen, exit break # Call handle with lbl_gen state self.handle(next_addr_concrete) cur_addr = next_addr_concrete # At this stage, symbolic engine is one instruction after the concrete # engine return True
def eval_expr(
self, expr)
Return the evaluation of @expr: @expr: Expr instance
def eval_expr(self, expr): """Return the evaluation of @expr: @expr: Expr instance""" return self.symb.eval_expr(expr)
def handle(
self, cur_addr)
Handle destination @cur_addr: Expr of the next address in concrete execution /!\ cur_addr may be a loc_key
In this method, self.symb is in the "just before branching" state
def handle(self, cur_addr): r"""Handle destination @cur_addr: Expr of the next address in concrete execution /!\ cur_addr may be a loc_key In this method, self.symb is in the "just before branching" state """ pass
def prepare(
self)
Prepare the environment for attachment with a jitter
def prepare(self): """Prepare the environment for attachment with a jitter""" # Disassembler self.mdis = self.machine.dis_engine(bin_stream_vm(self.jitter.vm), lines_wd=1, loc_db=self.loc_db) # Symbexec engine ## Prepare symbexec engines self.symb = self.SYMB_ENGINE(self.jitter.cpu, self.jitter.vm, self.ir_arch, {}) self.symb.enable_emulated_simplifications() self.symb_concrete = EmulatedSymbExec( self.jitter.cpu, self.jitter.vm, self.ir_arch, {} ) ### Avoid side effects on jitter while using 'symb_concrete' self.symb_concrete.func_write = None ## Update registers value self.symb.symbols[self.ir_arch.IRDst] = ExprInt( getattr(self.jitter.cpu, self.ir_arch.pc.name), self.ir_arch.IRDst.size ) # Avoid memory write self.symb.func_write = None # Activate callback on each instr self.jitter.jit.set_options(max_exec_per_call=1, jit_maxline=1) self.jitter.exec_cb = self.callback # Clean jit cache to avoid multi-line basic blocks already jitted self.jitter.jit.clear_jitted_blocks()
def restore_snapshot(
self, snapshot, memory=True)
Restore a @snapshot taken with .take_snapshot @snapshot: .take_snapshot output @memory: (optional) if set, also restore the memory
def restore_snapshot(self, snapshot, memory=True): """Restore a @snapshot taken with .take_snapshot @snapshot: .take_snapshot output @memory: (optional) if set, also restore the memory """ # Restore memory if memory: self.jitter.vm.reset_memory_page_pool() self.jitter.vm.reset_code_bloc_pool() for addr, metadata in snapshot["mem"].iteritems(): self.jitter.vm.add_memory_page(addr, metadata["access"], metadata["data"]) # Restore registers self.jitter.pc = snapshot["regs"][self.ir_arch.pc.name] for reg, value in snapshot["regs"].iteritems(): setattr(self.jitter.cpu, reg, value) # Reset intern elements self.jitter.vm.set_exception(0) self.jitter.cpu.set_exception(0) self.jitter.bs._atomic_mode = False # Reset symb exec for key, _ in self.symb.symbols.items(): del self.symb.symbols[key] for expr, value in snapshot["symb"].items(): self.symb.symbols[expr] = value
def symbolize_memory(
self, memory_range)
Register a range of memory addresses to symbolize @memory_range: object with support of in operation (intervals, list, ...)
def symbolize_memory(self, memory_range): """Register a range of memory addresses to symbolize @memory_range: object with support of __in__ operation (intervals, list, ...) """ self.symb.dse_memory_range = memory_range self.symb.dse_memory_to_expr = self.memory_to_expr
def take_snapshot(
self)
Return a snapshot of the current state (including jitter state)
def take_snapshot(self): """Return a snapshot of the current state (including jitter state)""" snapshot = { "mem": self.jitter.vm.get_all_memory(), "regs": self._get_gpregs(), "symb": self.symb.symbols.copy(), } return snapshot
def update_state(
self, assignblk)
From this point, assume @assignblk in the symbolic execution @assignblk: AssignBlock/{dst -> src}
def update_state(self, assignblk): """From this point, assume @assignblk in the symbolic execution @assignblk: AssignBlock/{dst -> src} """ for dst, src in assignblk.iteritems(): self.symb.apply_change(dst, src)
def update_state_from_concrete(
self, cpu=True, mem=False)
Update the symbolic state with concrete values from the concrete engine
@cpu: (optional) if set, update registers' value @mem: (optional) if set, update memory value
/!\ all current states will be loss. This function is usually called when states are no more synchronized (at the beginning, returning from an unstubbed syscall, ...)
def update_state_from_concrete(self, cpu=True, mem=False): r"""Update the symbolic state with concrete values from the concrete engine @cpu: (optional) if set, update registers' value @mem: (optional) if set, update memory value /!\ all current states will be loss. This function is usually called when states are no more synchronized (at the beginning, returning from an unstubbed syscall, ...) """ self._update_state_from_concrete_symb(self.symb, cpu, mem)
class DSEPathConstraint
Dynamic Symbolic Execution Engine keeping the path constraint
Possible new "solutions" are produced along the path, by inversing concrete path constraint. Thus, a "solution" is a potential initial context leading to a new path.
In order to produce a new solution, one can extend this class, and override 'handle_solution' to produce a solution which fit its needs. It could avoid computing new solution by overriding 'produce_solution'.
If one is only interested in constraints associated to its path, the option "produce_solution" should be set to False, to speed up emulation. The constraints are accumulated in the .z3_cur z3.Solver object.
class DSEPathConstraint(DSEEngine): """Dynamic Symbolic Execution Engine keeping the path constraint Possible new "solutions" are produced along the path, by inversing concrete path constraint. Thus, a "solution" is a potential initial context leading to a new path. In order to produce a new solution, one can extend this class, and override 'handle_solution' to produce a solution which fit its needs. It could avoid computing new solution by overriding 'produce_solution'. If one is only interested in constraints associated to its path, the option "produce_solution" should be set to False, to speed up emulation. The constraints are accumulated in the .z3_cur z3.Solver object. """ # Maximum memory size to inject in constraints solving MAX_MEMORY_INJECT = 0x10000 # Produce solution strategies PRODUCE_NO_SOLUTION = 0 PRODUCE_SOLUTION_CODE_COV = 1 PRODUCE_SOLUTION_BRANCH_COV = 2 PRODUCE_SOLUTION_PATH_COV = 3 def __init__(self, machine, produce_solution=PRODUCE_SOLUTION_CODE_COV, known_solutions=None, **kwargs): """Init a DSEPathConstraint @machine: Machine of the targeted architecture instance @produce_solution: (optional) if set, new solutions will be computed""" super(DSEPathConstraint, self).__init__(machine, **kwargs) # Dependency check assert z3 is not None # Init PathConstraint specifics structures self.cur_solver = z3.Solver() self.new_solutions = {} # solution identifier -> solution's model self._known_solutions = set() # set of solution identifiers self.z3_trans = Translator.to_language("z3") self._produce_solution_strategy = produce_solution self._previous_addr = None self._history = None if produce_solution == self.PRODUCE_SOLUTION_PATH_COV: self._history = [] # List of addresses in the current path def take_snapshot(self, *args, **kwargs): snap = super(DSEPathConstraint, self).take_snapshot(*args, **kwargs) snap["new_solutions"] = {dst: src.copy for dst, src in self.new_solutions.iteritems()} snap["cur_constraints"] = self.cur_solver.assertions() if self._produce_solution_strategy == self.PRODUCE_SOLUTION_PATH_COV: snap["_history"] = list(self._history) elif self._produce_solution_strategy == self.PRODUCE_SOLUTION_BRANCH_COV: snap["_previous_addr"] = self._previous_addr return snap def restore_snapshot(self, snapshot, keep_known_solutions=True, **kwargs): """Restore a DSEPathConstraint snapshot @keep_known_solutions: if set, do not forget solutions already found. -> They will not appear in 'new_solutions' """ super(DSEPathConstraint, self).restore_snapshot(snapshot, **kwargs) self.new_solutions.clear() self.new_solutions.update(snapshot["new_solutions"]) self.cur_solver = z3.Solver() self.cur_solver.add(snapshot["cur_constraints"]) if not keep_known_solutions: self._known_solutions.clear() if self._produce_solution_strategy == self.PRODUCE_SOLUTION_PATH_COV: self._history = list(snapshot["_history"]) elif self._produce_solution_strategy == self.PRODUCE_SOLUTION_BRANCH_COV: self._previous_addr = snapshot["_previous_addr"] def _key_for_solution_strategy(self, destination): """Return the associated identifier for the current solution strategy""" if self._produce_solution_strategy == self.PRODUCE_NO_SOLUTION: # Never produce a solution return None elif self._produce_solution_strategy == self.PRODUCE_SOLUTION_CODE_COV: # Decision based on code coverage # -> produce a solution if the destination has never been seen key = destination elif self._produce_solution_strategy == self.PRODUCE_SOLUTION_BRANCH_COV: # Decision based on branch coverage # -> produce a solution if the current branch has never been take key = (self._previous_addr, destination) elif self._produce_solution_strategy == self.PRODUCE_SOLUTION_PATH_COV: # Decision based on path coverage # -> produce a solution if the current path has never been take key = tuple(self._history + [destination]) else: raise ValueError("Unknown produce solution strategy") return key def produce_solution(self, destination): """Called to determine if a solution for @destination should be test for satisfiability and computed @destination: Expr instance of the target @destination """ key = self._key_for_solution_strategy(destination) if key is None: return False return key not in self._known_solutions def handle_solution(self, model, destination): """Called when a new solution for destination @destination is founded @model: z3 model instance @destination: Expr instance for an addr which is not on the DSE path """ key = self._key_for_solution_strategy(destination) assert key is not None self.new_solutions[key] = model self._known_solutions.add(key) def handle_correct_destination(self, destination, path_constraints): """[DEV] Called by handle() to update internal structures giving the correct destination (the concrete execution one). """ # Update structure used by produce_solution() if self._produce_solution_strategy == self.PRODUCE_SOLUTION_PATH_COV: self._history.append(destination) elif self._produce_solution_strategy == self.PRODUCE_SOLUTION_BRANCH_COV: self._previous_addr = destination # Update current solver for cons in path_constraints: self.cur_solver.add(self.z3_trans.from_expr(cons)) def handle(self, cur_addr): cur_addr = self.ir_arch.loc_db.canonize_to_exprloc(cur_addr) symb_pc = self.eval_expr(self.ir_arch.IRDst) possibilities = possible_values(symb_pc) cur_path_constraint = set() # path_constraint for the concrete path if len(possibilities) == 1: dst = next(iter(possibilities)).value dst = self.ir_arch.loc_db.canonize_to_exprloc(dst) assert dst == cur_addr else: for possibility in possibilities: target_addr = self.ir_arch.loc_db.canonize_to_exprloc( possibility.value ) path_constraint = set() # Set of ExprAff for the possible path # Get constraint associated to the possible path memory_to_add = ModularIntervals(symb_pc.size) for cons in possibility.constraints: eaff = cons.to_constraint() # eaff.get_r(mem_read=True) is not enough # ExprAff consider a Memory access in dst as a write mem = eaff.dst.get_r(mem_read=True) mem.update(eaff.src.get_r(mem_read=True)) for expr in mem: if expr.is_mem(): addr_range = expr_range(expr.arg) # At upper bounds, add the size of the memory access # if addr (- [a, b], then @size[addr] reachables # values are in @8[a, b + size[ for start, stop in addr_range: stop += (expr.size / 8) - 1 full_range = ModularIntervals(symb_pc.size, [(start, stop)]) memory_to_add.update(full_range) path_constraint.add(eaff) if memory_to_add.length > self.MAX_MEMORY_INJECT: # TODO re-croncretize the constraint or z3-try raise RuntimeError("Not implemented: too long memory area") # Inject memory for start, stop in memory_to_add: for address in xrange(start, stop + 1): expr_mem = ExprMem(ExprInt(address, self.ir_arch.pc.size), 8) value = self.eval_expr(expr_mem) if not value.is_int(): raise TypeError("Rely on a symbolic memory case, " \ "address 0x%x" % address) path_constraint.add(ExprAff(expr_mem, value)) if target_addr == cur_addr: # Add path constraint cur_path_constraint = path_constraint elif self.produce_solution(target_addr): # Looking for a new solution self.cur_solver.push() for cons in path_constraint: trans = self.z3_trans.from_expr(cons) trans = z3.simplify(trans) self.cur_solver.add(trans) result = self.cur_solver.check() if result == z3.sat: model = self.cur_solver.model() self.handle_solution(model, target_addr) self.cur_solver.pop() self.handle_correct_destination(cur_addr, cur_path_constraint)
Ancestors (in MRO)
- DSEPathConstraint
- DSEEngine
- __builtin__.object
Class variables
var MAX_MEMORY_INJECT
var PRODUCE_NO_SOLUTION
var PRODUCE_SOLUTION_BRANCH_COV
var PRODUCE_SOLUTION_CODE_COV
var PRODUCE_SOLUTION_PATH_COV
Static methods
def memory_to_expr(
addr)
Inheritance:
DSEEngine
.memory_to_expr
Translate an address to its corresponding symbolic ID (8bits) @addr: int
@staticmethod def memory_to_expr(addr): """Translate an address to its corresponding symbolic ID (8bits) @addr: int""" return ExprId("MEM_0x%x" % int(addr), 8)
Instance variables
var cur_solver
var new_solutions
var z3_trans
Methods
def __init__(
self, machine, produce_solution=1, known_solutions=None, **kwargs)
Inheritance:
DSEEngine
.__init__
Init a DSEPathConstraint @machine: Machine of the targeted architecture instance @produce_solution: (optional) if set, new solutions will be computed
def __init__(self, machine, produce_solution=PRODUCE_SOLUTION_CODE_COV, known_solutions=None, **kwargs): """Init a DSEPathConstraint @machine: Machine of the targeted architecture instance @produce_solution: (optional) if set, new solutions will be computed""" super(DSEPathConstraint, self).__init__(machine, **kwargs) # Dependency check assert z3 is not None # Init PathConstraint specifics structures self.cur_solver = z3.Solver() self.new_solutions = {} # solution identifier -> solution's model self._known_solutions = set() # set of solution identifiers self.z3_trans = Translator.to_language("z3") self._produce_solution_strategy = produce_solution self._previous_addr = None self._history = None if produce_solution == self.PRODUCE_SOLUTION_PATH_COV: self._history = [] # List of addresses in the current path
def add_handler(
self, addr, callback)
Inheritance:
DSEEngine
.add_handler
Add a @callback for address @addr before any state update. The state IS NOT updated after returning from the callback @addr: int @callback: func(dse instance)
def add_handler(self, addr, callback): """Add a @callback for address @addr before any state update. The state IS NOT updated after returning from the callback @addr: int @callback: func(dse instance)""" self.handler[addr] = callback
def add_instrumentation(
self, addr, callback)
Inheritance:
DSEEngine
.add_instrumentation
Add a @callback for address @addr before any state update. The state IS updated after returning from the callback @addr: int @callback: func(dse instance)
def add_instrumentation(self, addr, callback): """Add a @callback for address @addr before any state update. The state IS updated after returning from the callback @addr: int @callback: func(dse instance)""" self.instrumentation[addr] = callback
def add_lib_handler(
self, libimp, namespace)
Inheritance:
DSEEngine
.add_lib_handler
Add search for handler based on a @libimp libimp instance
Known functions will be looked by {name}_symb in the @namespace
def add_lib_handler(self, libimp, namespace): """Add search for handler based on a @libimp libimp instance Known functions will be looked by {name}_symb in the @namespace """ # lambda cannot contain statement def default_func(dse): fname = "%s_symb" % libimp.fad2cname[dse.jitter.pc] raise RuntimeError("Symbolic stub '%s' not found" % fname) for addr, fname in libimp.fad2cname.iteritems(): fname = "%s_symb" % fname func = namespace.get(fname, None) if func is not None: self.add_handler(addr, func) else: self.add_handler(addr, default_func)
def attach(
self, emulator)
Attach the DSE to @emulator @emulator: jitload (or API equivalent) instance
To attach DURING A BREAKPOINT, one may consider using the following snippet:
def breakpoint(self, jitter): ... dse.attach(jitter) dse.update... ... # Additionnal call to the exec callback is necessary, as breakpoints are # honored AFTER exec callback jitter.exec_cb(jitter)
return True
Without it, one may encounteer a DriftException error due to a "desynchronization" between jitter and dse states. Indeed, on 'handle' call, the jitter must be one instruction AFTER the dse.
def attach(self, emulator): """Attach the DSE to @emulator @emulator: jitload (or API equivalent) instance To attach *DURING A BREAKPOINT*, one may consider using the following snippet: def breakpoint(self, jitter): ... dse.attach(jitter) dse.update... ... # Additionnal call to the exec callback is necessary, as breakpoints are # honored AFTER exec callback jitter.exec_cb(jitter) return True Without it, one may encounteer a DriftException error due to a "desynchronization" between jitter and dse states. Indeed, on 'handle' call, the jitter must be one instruction AFTER the dse. """ self.jitter = emulator self.prepare()
def callback(
self, _)
Inheritance:
DSEEngine
.callback
Called before each instruction
def callback(self, _): """Called before each instruction""" # Assert synchronization with concrete execution self._check_state() # Call callbacks associated to the current address cur_addr = self.jitter.pc if isinstance(cur_addr, LocKey): lbl = self.ir_arch.loc_db.loc_key_to_label(cur_addr) cur_addr = lbl.offset if cur_addr in self.handler: self.handler[cur_addr](self) return True if cur_addr in self.instrumentation: self.instrumentation[cur_addr](self) # Handle current address self.handle(ExprInt(cur_addr, self.ir_arch.IRDst.size)) # Avoid memory issue in ExpressionSimplifier if len(self.symb.expr_simp.simplified_exprs) > 100000: self.symb.expr_simp.simplified_exprs.clear() # Get IR blocks if cur_addr in self.addr_to_cacheblocks: self.ircfg.blocks.clear() self.ircfg.blocks.update(self.addr_to_cacheblocks[cur_addr]) else: ## Reset cache structures self.ircfg.blocks.clear()# = {} ## Update current state asm_block = self.mdis.dis_block(cur_addr) self.ir_arch.add_asmblock_to_ircfg(asm_block, self.ircfg) self.addr_to_cacheblocks[cur_addr] = dict(self.ircfg.blocks) # Emulate the current instruction self.symb.reset_modified() # Is the symbolic execution going (potentially) to jump on a lbl_gen? if len(self.ircfg.blocks) == 1: self.symb.run_at(self.ircfg, cur_addr) else: # Emulation could stuck in generated IR blocks # But concrete execution callback is not enough precise to obtain # the full IR blocks path # -> Use a fully concrete execution to get back path # Update the concrete execution self._update_state_from_concrete_symb( self.symb_concrete, cpu=True, mem=True ) while True: next_addr_concrete = self.symb_concrete.run_block_at( self.ircfg, cur_addr ) self.symb.run_block_at(self.ircfg, cur_addr) if not (isinstance(next_addr_concrete, ExprLoc) and self.ir_arch.loc_db.get_location_offset( next_addr_concrete.loc_key ) is None): # Not a lbl_gen, exit break # Call handle with lbl_gen state self.handle(next_addr_concrete) cur_addr = next_addr_concrete # At this stage, symbolic engine is one instruction after the concrete # engine return True
def eval_expr(
self, expr)
Inheritance:
DSEEngine
.eval_expr
Return the evaluation of @expr: @expr: Expr instance
def eval_expr(self, expr): """Return the evaluation of @expr: @expr: Expr instance""" return self.symb.eval_expr(expr)
def handle(
self, cur_addr)
Handle destination @cur_addr: Expr of the next address in concrete execution /!\ cur_addr may be a loc_key
In this method, self.symb is in the "just before branching" state
def handle(self, cur_addr): cur_addr = self.ir_arch.loc_db.canonize_to_exprloc(cur_addr) symb_pc = self.eval_expr(self.ir_arch.IRDst) possibilities = possible_values(symb_pc) cur_path_constraint = set() # path_constraint for the concrete path if len(possibilities) == 1: dst = next(iter(possibilities)).value dst = self.ir_arch.loc_db.canonize_to_exprloc(dst) assert dst == cur_addr else: for possibility in possibilities: target_addr = self.ir_arch.loc_db.canonize_to_exprloc( possibility.value ) path_constraint = set() # Set of ExprAff for the possible path # Get constraint associated to the possible path memory_to_add = ModularIntervals(symb_pc.size) for cons in possibility.constraints: eaff = cons.to_constraint() # eaff.get_r(mem_read=True) is not enough # ExprAff consider a Memory access in dst as a write mem = eaff.dst.get_r(mem_read=True) mem.update(eaff.src.get_r(mem_read=True)) for expr in mem: if expr.is_mem(): addr_range = expr_range(expr.arg) # At upper bounds, add the size of the memory access # if addr (- [a, b], then @size[addr] reachables # values are in @8[a, b + size[ for start, stop in addr_range: stop += (expr.size / 8) - 1 full_range = ModularIntervals(symb_pc.size, [(start, stop)]) memory_to_add.update(full_range) path_constraint.add(eaff) if memory_to_add.length > self.MAX_MEMORY_INJECT: # TODO re-croncretize the constraint or z3-try raise RuntimeError("Not implemented: too long memory area") # Inject memory for start, stop in memory_to_add: for address in xrange(start, stop + 1): expr_mem = ExprMem(ExprInt(address, self.ir_arch.pc.size), 8) value = self.eval_expr(expr_mem) if not value.is_int(): raise TypeError("Rely on a symbolic memory case, " \ "address 0x%x" % address) path_constraint.add(ExprAff(expr_mem, value)) if target_addr == cur_addr: # Add path constraint cur_path_constraint = path_constraint elif self.produce_solution(target_addr): # Looking for a new solution self.cur_solver.push() for cons in path_constraint: trans = self.z3_trans.from_expr(cons) trans = z3.simplify(trans) self.cur_solver.add(trans) result = self.cur_solver.check() if result == z3.sat: model = self.cur_solver.model() self.handle_solution(model, target_addr) self.cur_solver.pop() self.handle_correct_destination(cur_addr, cur_path_constraint)
def handle_correct_destination(
self, destination, path_constraints)
[DEV] Called by handle() to update internal structures giving the correct destination (the concrete execution one).
def handle_correct_destination(self, destination, path_constraints): """[DEV] Called by handle() to update internal structures giving the correct destination (the concrete execution one). """ # Update structure used by produce_solution() if self._produce_solution_strategy == self.PRODUCE_SOLUTION_PATH_COV: self._history.append(destination) elif self._produce_solution_strategy == self.PRODUCE_SOLUTION_BRANCH_COV: self._previous_addr = destination # Update current solver for cons in path_constraints: self.cur_solver.add(self.z3_trans.from_expr(cons))
def handle_solution(
self, model, destination)
Called when a new solution for destination @destination is founded @model: z3 model instance @destination: Expr instance for an addr which is not on the DSE path
def handle_solution(self, model, destination): """Called when a new solution for destination @destination is founded @model: z3 model instance @destination: Expr instance for an addr which is not on the DSE path """ key = self._key_for_solution_strategy(destination) assert key is not None self.new_solutions[key] = model self._known_solutions.add(key)
def prepare(
self)
Inheritance:
DSEEngine
.prepare
Prepare the environment for attachment with a jitter
def prepare(self): """Prepare the environment for attachment with a jitter""" # Disassembler self.mdis = self.machine.dis_engine(bin_stream_vm(self.jitter.vm), lines_wd=1, loc_db=self.loc_db) # Symbexec engine ## Prepare symbexec engines self.symb = self.SYMB_ENGINE(self.jitter.cpu, self.jitter.vm, self.ir_arch, {}) self.symb.enable_emulated_simplifications() self.symb_concrete = EmulatedSymbExec( self.jitter.cpu, self.jitter.vm, self.ir_arch, {} ) ### Avoid side effects on jitter while using 'symb_concrete' self.symb_concrete.func_write = None ## Update registers value self.symb.symbols[self.ir_arch.IRDst] = ExprInt( getattr(self.jitter.cpu, self.ir_arch.pc.name), self.ir_arch.IRDst.size ) # Avoid memory write self.symb.func_write = None # Activate callback on each instr self.jitter.jit.set_options(max_exec_per_call=1, jit_maxline=1) self.jitter.exec_cb = self.callback # Clean jit cache to avoid multi-line basic blocks already jitted self.jitter.jit.clear_jitted_blocks()
def produce_solution(
self, destination)
Called to determine if a solution for @destination should be test for satisfiability and computed @destination: Expr instance of the target @destination
def produce_solution(self, destination): """Called to determine if a solution for @destination should be test for satisfiability and computed @destination: Expr instance of the target @destination """ key = self._key_for_solution_strategy(destination) if key is None: return False return key not in self._known_solutions
def restore_snapshot(
self, snapshot, keep_known_solutions=True, **kwargs)
Inheritance:
DSEEngine
.restore_snapshot
Restore a DSEPathConstraint snapshot @keep_known_solutions: if set, do not forget solutions already found. -> They will not appear in 'new_solutions'
def restore_snapshot(self, snapshot, keep_known_solutions=True, **kwargs): """Restore a DSEPathConstraint snapshot @keep_known_solutions: if set, do not forget solutions already found. -> They will not appear in 'new_solutions' """ super(DSEPathConstraint, self).restore_snapshot(snapshot, **kwargs) self.new_solutions.clear() self.new_solutions.update(snapshot["new_solutions"]) self.cur_solver = z3.Solver() self.cur_solver.add(snapshot["cur_constraints"]) if not keep_known_solutions: self._known_solutions.clear() if self._produce_solution_strategy == self.PRODUCE_SOLUTION_PATH_COV: self._history = list(snapshot["_history"]) elif self._produce_solution_strategy == self.PRODUCE_SOLUTION_BRANCH_COV: self._previous_addr = snapshot["_previous_addr"]
def symbolize_memory(
self, memory_range)
Inheritance:
DSEEngine
.symbolize_memory
Register a range of memory addresses to symbolize @memory_range: object with support of in operation (intervals, list, ...)
def symbolize_memory(self, memory_range): """Register a range of memory addresses to symbolize @memory_range: object with support of __in__ operation (intervals, list, ...) """ self.symb.dse_memory_range = memory_range self.symb.dse_memory_to_expr = self.memory_to_expr
def take_snapshot(
self, *args, **kwargs)
Inheritance:
DSEEngine
.take_snapshot
Return a snapshot of the current state (including jitter state)
def take_snapshot(self, *args, **kwargs): snap = super(DSEPathConstraint, self).take_snapshot(*args, **kwargs) snap["new_solutions"] = {dst: src.copy for dst, src in self.new_solutions.iteritems()} snap["cur_constraints"] = self.cur_solver.assertions() if self._produce_solution_strategy == self.PRODUCE_SOLUTION_PATH_COV: snap["_history"] = list(self._history) elif self._produce_solution_strategy == self.PRODUCE_SOLUTION_BRANCH_COV: snap["_previous_addr"] = self._previous_addr return snap
def update_state(
self, assignblk)
Inheritance:
DSEEngine
.update_state
From this point, assume @assignblk in the symbolic execution @assignblk: AssignBlock/{dst -> src}
def update_state(self, assignblk): """From this point, assume @assignblk in the symbolic execution @assignblk: AssignBlock/{dst -> src} """ for dst, src in assignblk.iteritems(): self.symb.apply_change(dst, src)
def update_state_from_concrete(
self, cpu=True, mem=False)
Inheritance:
DSEEngine
.update_state_from_concrete
Update the symbolic state with concrete values from the concrete engine
@cpu: (optional) if set, update registers' value @mem: (optional) if set, update memory value
/!\ all current states will be loss. This function is usually called when states are no more synchronized (at the beginning, returning from an unstubbed syscall, ...)
def update_state_from_concrete(self, cpu=True, mem=False): r"""Update the symbolic state with concrete values from the concrete engine @cpu: (optional) if set, update registers' value @mem: (optional) if set, update memory value /!\ all current states will be loss. This function is usually called when states are no more synchronized (at the beginning, returning from an unstubbed syscall, ...) """ self._update_state_from_concrete_symb(self.symb, cpu, mem)
class DriftException
Raised when the emulation drift from the reference engine
class DriftException(Exception): """Raised when the emulation drift from the reference engine""" def __init__(self, info): super(DriftException, self).__init__() self.info = info def __str__(self): if len(self.info) == 1: return "Drift of %s: %s instead of %s" % ( self.info[0].symbol, self.info[0].computed, self.info[0].expected, ) else: return "Drift of:\n\t" + "\n\t".join("%s: %s instead of %s" % ( dinfo.symbol, dinfo.computed, dinfo.expected) for dinfo in self.info)
Ancestors (in MRO)
- DriftException
- exceptions.Exception
- exceptions.BaseException
- __builtin__.object
Class variables
var args
var message
Instance variables
var info
Methods
def __init__(
self, info)
def __init__(self, info): super(DriftException, self).__init__() self.info = info
class DriftInfo
DriftInfo(symbol, computed, expected)
Ancestors (in MRO)
- DriftInfo
- __builtin__.tuple
- __builtin__.object
Instance variables
var computed
Alias for field number 1
var expected
Alias for field number 2
var symbol
Alias for field number 0
class ESETrackModif
Extension of EmulatedSymbExec to be used by DSE engines
Add the tracking of modified expressions, and the ability to symbolize memory areas
class ESETrackModif(EmulatedSymbExec): """Extension of EmulatedSymbExec to be used by DSE engines Add the tracking of modified expressions, and the ability to symbolize memory areas """ def __init__(self, *args, **kwargs): super(ESETrackModif, self).__init__(*args, **kwargs) self.modified_expr = set() # Expr modified since the last reset self.dse_memory_range = [] # List/Intervals of memory addresses to # symbolize self.dse_memory_to_expr = None # function(addr) -> Expr used to # symbolize def _func_read(self, expr_mem): if not expr_mem.arg.is_int(): return expr_mem dst_addr = int(expr_mem.arg) if not self.dse_memory_range: # Trivial case (optimization) return super(ESETrackModif, self)._func_read(expr_mem) # Split access in atomic accesses out = [] for addr in xrange(dst_addr, dst_addr + (expr_mem.size / 8)): if addr in self.dse_memory_range: # Symbolize memory access out.append(self.dse_memory_to_expr(addr)) else: # Get concrete value atomic_access = ExprMem(ExprInt(addr, expr_mem.arg.size), 8) out.append(super(ESETrackModif, self)._func_read(atomic_access)) if len(out) == 1: # Trivial case (optimization) return out[0] # Simplify for constant merging (ex: {ExprInt(1, 8), ExprInt(2, 8)}) return self.expr_simp(ExprCompose(*out)) def reset_modified(self): """Reset modified expression tracker""" self.modified_expr.clear() def apply_change(self, dst, src): super(ESETrackModif, self).apply_change(dst, src) self.modified_expr.add(dst)
Ancestors (in MRO)
- ESETrackModif
- miasm2.jitter.emulatedsymbexec.EmulatedSymbExec
- miasm2.ir.symbexec.SymbolicExecutionEngine
- __builtin__.object
Class variables
var StateEngine
var x86_cpuid
Instance variables
var dse_memory_range
var dse_memory_to_expr
var modified_expr
var state
Return the current state of the SymbolicEngine
Methods
def __init__(
self, *args, **kwargs)
def __init__(self, *args, **kwargs): super(ESETrackModif, self).__init__(*args, **kwargs) self.modified_expr = set() # Expr modified since the last reset self.dse_memory_range = [] # List/Intervals of memory addresses to # symbolize self.dse_memory_to_expr = None # function(addr) -> Expr used to
def apply_change(
self, dst, src)
def apply_change(self, dst, src): super(ESETrackModif, self).apply_change(dst, src) self.modified_expr.add(dst)
def apply_expr(
self, expr)
Deprecated version of eval_updt_expr
def apply_expr(self, expr): """Deprecated version of eval_updt_expr""" warnings.warn('DEPRECATION WARNING: use "eval_updt_expr" instead of apply_expr') return self.eval_updt_expr(expr)
def apply_expr_on_state(
self, expr, cache)
Deprecated version of eval_expr
def apply_expr_on_state(self, expr, cache): """Deprecated version of eval_expr""" warnings.warn('DEPRECATION WARNING: use "eval_expr" instead of apply_expr_on_state') if cache is None: cache = {} ret = self.eval_expr(expr, eval_cache=cache) return ret
def as_assignblock(
self)
Return the current state as an AssignBlock
def as_assignblock(self): """Return the current state as an AssignBlock""" warnings.warn('DEPRECATION WARNING: use "modified(ids=True, mems=True)" instead of as_assignblock') out = [] for dst, src in self.modified(ids=True, mems=True): out.append((dst, src)) return AssignBlock(dict(out))
def del_mem_above_stack(
self, stack_ptr)
Remove all stored memory values with following properties: pointer based on initial stack value pointer below current stack pointer
def del_mem_above_stack(self, stack_ptr): """ Remove all stored memory values with following properties: * pointer based on initial stack value * pointer below current stack pointer """ stack_ptr = self.eval_expr(stack_ptr) base, stk_offset = get_expr_base_offset(stack_ptr) memarray = self.symbols.symbols_mem.base_to_memarray.get(base, None) if memarray: to_del = set() for offset in memarray: if ((offset - stk_offset) & int(stack_ptr.mask)) >> (stack_ptr.size - 1) != 0: to_del.add(offset) for offset in to_del: del memarray[offset]
def dump(
self, ids=True, mems=True)
Display modififed variables @ids: display modified ids @mems: display modified memory
def dump(self, ids=True, mems=True): """ Display modififed variables @ids: display modified ids @mems: display modified memory """ for variable, value in self.modified(None, ids, mems): print "%-18s" % variable, "=", "%s" % value
def dump_id(
self)
Deprecated version of dump(mems=False)
def dump_id(self): """Deprecated version of dump(mems=False)""" warnings.warn('DEPRECATION WARNING: use "dump(self, mems=False)" instead of dump_id') self.dump(mems=False)
def dump_mem(
self)
Deprecated version of dump(ids=False)
def dump_mem(self): """Deprecated version of dump(ids=False)""" warnings.warn('DEPRECATION WARNING: use "dump(self, ids=False)" instead of dump_mem') self.dump(ids=False)
def emul_ir_bloc(
self, _, addr, step=False)
Deprecated version of run_block_at
def emul_ir_bloc(self, _, addr, step=False): """Deprecated version of run_block_at""" warnings.warn('DEPRECATION WARNING: use "run_block_at(self, addr, step=False)" instead of emul_ir_bloc') return self.run_block_at(addr, step)
def emul_ir_block(
self, addr, step=False)
Deprecated version of run_block_at
def emul_ir_block(self, addr, step=False): """Deprecated version of run_block_at""" warnings.warn('DEPRECATION WARNING: use "run_block_at(self, addr, step=False)" instead of emul_ir_block') return self.run_block_at(addr, step)
def emul_ir_blocks(
self, addr, lbl_stop=None, step=False)
Deprecated version of run_at
def emul_ir_blocks(self, addr, lbl_stop=None, step=False): """Deprecated version of run_at""" warnings.warn('DEPRECATION WARNING: use "run_at(self, addr, lbl_stop=None, step=False):" instead of emul_ir_blocks') return self.run_at(addr, lbl_stop, step)
def emul_ir_blocs(
self, _, addr, lbl_stop=None, step=False)
Deprecated version of run_at
def emul_ir_blocs(self, _, addr, lbl_stop=None, step=False): """Deprecated version of run_at""" warnings.warn('DEPRECATION WARNING: use "run_at(self, addr, lbl_stop=None, step=False):" instead of emul_ir_blocs') return self.run_at(addr, lbl_stop, step)
def emulbloc(
self, irb, step=False)
Deprecated version of eval_updt_irblock(self, irb, step=False)
def emulbloc(self, irb, step=False): """Deprecated version of eval_updt_irblock(self, irb, step=False)""" warnings.warn('DEPRECATION WARNING: use "eval_updt_irblock(self, irb, step=False)" instead of emulbloc') return self.eval_updt_irblock(irb, step)
def enable_emulated_simplifications(
self)
Enable simplifications needing a CPU instance on associated ExpressionSimplifier
def enable_emulated_simplifications(self): """Enable simplifications needing a CPU instance on associated ExpressionSimplifier """ self.expr_simp.enable_passes({ m2_expr.ExprOp: [self._simp_handle_segm, self._simp_handle_x86_cpuid], })
def eval_assignblk(
self, assignblk)
Evaluate AssignBlock using the current state
Returns a dictionary containing modified keys associated to their values
@assignblk: AssignBlock instance
def eval_assignblk(self, assignblk): """ Evaluate AssignBlock using the current state Returns a dictionary containing modified keys associated to their values @assignblk: AssignBlock instance """ pool_out = {} eval_cache = {} for dst, src in assignblk.iteritems(): src = self.eval_expr(src, eval_cache) if dst.is_mem(): ptr = self.eval_expr(dst.arg, eval_cache) # Test if mem lookup is known tmp = ExprMem(ptr, dst.size) pool_out[tmp] = src elif dst.is_id(): pool_out[dst] = src else: raise ValueError("Unknown destination type", str(dst)) return pool_out
def eval_expr(
self, expr, eval_cache=None)
Evaluate @expr @expr: Expresion instance to evaluate @cache: None or dictionary linking variables to their values
def eval_expr(self, expr, eval_cache=None): """ Evaluate @expr @expr: Expresion instance to evaluate @cache: None or dictionary linking variables to their values """ if eval_cache is None: eval_cache = {} ret = self.eval_expr_visitor(expr, cache=eval_cache) assert ret is not None return ret
def eval_expr_visitor(
self, expr, cache=None)
[DEV]: Override to change the behavior of an Expr evaluation. This function recursively applies 'eval_expr*' to @expr. This function uses @cache to speedup re-evaluation of expression.
def eval_expr_visitor(self, expr, cache=None): """ [DEV]: Override to change the behavior of an Expr evaluation. This function recursively applies 'eval_expr*' to @expr. This function uses @cache to speedup re-evaluation of expression. """ if cache is None: cache = {} ret = cache.get(expr, None) if ret is not None: return ret new_expr = self.expr_simp(expr) ret = cache.get(new_expr, None) if ret is not None: return ret func = self.expr_to_visitor.get(new_expr.__class__, None) if func is None: raise TypeError("Unknown expr type") ret = func(new_expr, cache=cache) ret = self.expr_simp(ret) assert ret is not None cache[expr] = ret cache[new_expr] = ret return ret
def eval_exprcompose(
self, expr, **kwargs)
[DEV]: Evaluate an ExprCompose using the current state
def eval_exprcompose(self, expr, **kwargs): """[DEV]: Evaluate an ExprCompose using the current state""" args = [] for arg in expr.args: args.append(self.eval_expr_visitor(arg, **kwargs)) ret = ExprCompose(*args) return ret
def eval_exprcond(
self, expr, **kwargs)
[DEV]: Evaluate an ExprCond using the current state
def eval_exprcond(self, expr, **kwargs): """[DEV]: Evaluate an ExprCond using the current state""" cond = self.eval_expr_visitor(expr.cond, **kwargs) src1 = self.eval_expr_visitor(expr.src1, **kwargs) src2 = self.eval_expr_visitor(expr.src2, **kwargs) ret = ExprCond(cond, src1, src2) return ret
def eval_exprid(
self, expr, **kwargs)
[DEV]: Evaluate an ExprId using the current state
def eval_exprid(self, expr, **kwargs): """[DEV]: Evaluate an ExprId using the current state""" ret = self.symbols.read(expr) return ret
def eval_exprint(
self, expr, **kwargs)
[DEV]: Evaluate an ExprInt using the current state
def eval_exprint(self, expr, **kwargs): """[DEV]: Evaluate an ExprInt using the current state""" return expr
def eval_exprloc(
self, expr, **kwargs)
[DEV]: Evaluate an ExprLoc using the current state
def eval_exprloc(self, expr, **kwargs): """[DEV]: Evaluate an ExprLoc using the current state""" offset = self.ir_arch.loc_db.get_location_offset(expr.loc_key) if offset is not None: ret = ExprInt(offset, expr.size) else: ret = expr return ret
def eval_exprmem(
self, expr, **kwargs)
[DEV]: Evaluate an ExprMem using the current state This function first evaluate the memory pointer value. Override 'mem_read' to modify the effective memory accesses
def eval_exprmem(self, expr, **kwargs): """[DEV]: Evaluate an ExprMem using the current state This function first evaluate the memory pointer value. Override 'mem_read' to modify the effective memory accesses """ ptr = self.eval_expr_visitor(expr.arg, **kwargs) mem = ExprMem(ptr, expr.size) ret = self.mem_read(mem) return ret
def eval_exprop(
self, expr, **kwargs)
[DEV]: Evaluate an ExprOp using the current state
def eval_exprop(self, expr, **kwargs): """[DEV]: Evaluate an ExprOp using the current state""" args = [] for oarg in expr.args: arg = self.eval_expr_visitor(oarg, **kwargs) args.append(arg) ret = ExprOp(expr.op, *args) return ret
def eval_exprslice(
self, expr, **kwargs)
[DEV]: Evaluate an ExprSlice using the current state
def eval_exprslice(self, expr, **kwargs): """[DEV]: Evaluate an ExprSlice using the current state""" arg = self.eval_expr_visitor(expr.arg, **kwargs) ret = ExprSlice(arg, expr.start, expr.stop) return ret
def eval_ir(
self, assignblk)
Deprecated version of eval_updt_assignblk(self, assignblk)
def eval_ir(self, assignblk): """Deprecated version of eval_updt_assignblk(self, assignblk)""" warnings.warn('DEPRECATION WARNING: use "eval_assignblk(self, assignblk)" instead of eval_ir') return self.eval_updt_assignblk(assignblk)
def eval_ir_expr(
self, assignblk)
Deprecated version of eval_ir_expr(self, assignblk)
def eval_ir_expr(self, assignblk): """Deprecated version of eval_ir_expr(self, assignblk)""" warnings.warn('DEPRECATION WARNING: use "eval_assignblk(self, assignblk)" instead of eval_ir_expr') return self.eval_assignblk(assignblk).iteritems()
def eval_updt_assignblk(
self, assignblk)
Apply an AssignBlock on the current state @assignblk: AssignBlock instance
def eval_updt_assignblk(self, assignblk): """ Apply an AssignBlock on the current state @assignblk: AssignBlock instance """ mem_dst = [] dst_src = self.eval_assignblk(assignblk) for dst, src in dst_src.iteritems(): self.apply_change(dst, src) if dst.is_mem(): mem_dst.append(dst) return mem_dst
def eval_updt_expr(
self, expr)
Evaluate @expr and apply side effect if needed (ie. if expr is an assignment). Return the evaluated value
def eval_updt_expr(self, expr): """ Evaluate @expr and apply side effect if needed (ie. if expr is an assignment). Return the evaluated value """ # Update value if needed if expr.is_aff(): ret = self.eval_expr(expr.src) self.eval_updt_assignblk(AssignBlock([expr])) else: ret = self.eval_expr(expr) return ret
def eval_updt_irblock(
self, irb, step=False)
Symbolic execution of the @irb on the current state @irb: irbloc instance @step: display intermediate steps
def eval_updt_irblock(self, irb, step=False): """ Symbolic execution of the @irb on the current state @irb: irbloc instance @step: display intermediate steps """ for assignblk in irb: if step: print 'Instr', assignblk.instr print 'Assignblk:' print assignblk print '_' * 80 self.eval_updt_assignblk(assignblk) if step: self.dump(mems=False) self.dump(ids=False) print '_' * 80 dst = self.eval_expr(self.ir_arch.IRDst) return dst
def get_state(
self)
Return the current state of the SymbolicEngine
def get_state(self): """Return the current state of the SymbolicEngine""" state = self.StateEngine(dict(self.symbols)) return state
def mem_read(
self, expr)
[DEV]: Override to modify the effective memory reads
Read symbolic value at ExprMem @expr @expr: ExprMem
def mem_read(self, expr): """ [DEV]: Override to modify the effective memory reads Read symbolic value at ExprMem @expr @expr: ExprMem """ parts = self._resolve_mem_parts(expr) out = [] for known, part in parts: if not known and part.is_mem() and self.func_read is not None: ret = self.func_read(part) else: ret = part out.append(ret) ret = self.expr_simp(ExprCompose(*out)) assert ret.size == expr.size return ret
def mem_write(
self, dst, src)
[DEV]: Override to modify the effective memory writes
Write symbolic value @src at ExprMem @dst @dst: destination ExprMem @src: source Expression
def mem_write(self, dst, src): """ [DEV]: Override to modify the effective memory writes Write symbolic value @src at ExprMem @dst @dst: destination ExprMem @src: source Expression """ if self.func_write is not None: self.func_write(self, dst, src) else: self.symbols.write(dst, src)
def modified(
self, init_state=None, ids=True, mems=True)
Return the modified variables. @init_state: a base dictionary linking variables to their initial values to diff. Can be None. @ids: track ids only @mems: track mems only
def modified(self, init_state=None, ids=True, mems=True): """ Return the modified variables. @init_state: a base dictionary linking variables to their initial values to diff. Can be None. @ids: track ids only @mems: track mems only """ if init_state is None: init_state = {} if ids: for variable, value in self.symbols.symbols_id.iteritems(): if variable in init_state and init_state[variable] == value: continue yield variable, value if mems: for mem, value in self.symbols.memory(): if mem in init_state and init_state[mem] == value: continue yield mem, value
def modified_mems(
self, init_state=None)
Deprecated version of modified(ids=False)
def modified_mems(self, init_state=None): """Deprecated version of modified(ids=False)""" warnings.warn('DEPRECATION WARNING: use "modified(self, ids=False)" instead of modified_mems') for mem in self.modified(init_state=init_state, ids=False): yield mem
def modified_regs(
self, init_state=None)
Deprecated version of modified(mems=False)
def modified_regs(self, init_state=None): """Deprecated version of modified(mems=False)""" warnings.warn('DEPRECATION WARNING: use "modified(self, mems=False)" instead of modified_regs') for reg in self.modified(init_state=init_state, mems=False): yield reg
def reset_modified(
self)
Reset modified expression tracker
def reset_modified(self): """Reset modified expression tracker""" self.modified_expr.clear()
def reset_regs(
self)
Set registers value to 0. Ignore register aliases
def reset_regs(self): """Set registers value to 0. Ignore register aliases""" for reg in self.ir_arch.arch.regs.all_regs_ids_no_alias: self.symbols.symbols_id[reg] = m2_expr.ExprInt(0, size=reg.size)
def run_at(
self, ircfg, addr, lbl_stop=None, step=False)
Symbolic execution starting at @addr @addr: address to execute (int or ExprInt or label) @lbl_stop: LocKey to stop execution on @step: display intermediate steps
def run_at(self, ircfg, addr, lbl_stop=None, step=False): """ Symbolic execution starting at @addr @addr: address to execute (int or ExprInt or label) @lbl_stop: LocKey to stop execution on @step: display intermediate steps """ while True: irblock = ircfg.get_block(addr) if irblock is None: break if irblock.loc_key == lbl_stop: break addr = self.eval_updt_irblock(irblock, step=step) return addr
def run_block_at(
self, ircfg, addr, step=False)
Symbolic execution of the block at @addr @addr: address to execute (int or ExprInt or label) @step: display intermediate steps
def run_block_at(self, ircfg, addr, step=False): """ Symbolic execution of the block at @addr @addr: address to execute (int or ExprInt or label) @step: display intermediate steps """ irblock = ircfg.get_block(addr) if irblock is not None: addr = self.eval_updt_irblock(irblock, step=step) return addr
def set_state(
self, state)
Restaure the @state of the engine @state: StateEngine instance
def set_state(self, state): """Restaure the @state of the engine @state: StateEngine instance """ self.symbols = SymbolMngr(addrsize=self.ir_arch.addrsize, expr_simp=self.expr_simp) for dst, src in dict(state).iteritems(): self.symbols[dst] = src
def update_cpu_from_engine(
self)
Updates @cpu instance according to new CPU values
def update_cpu_from_engine(self): """Updates @cpu instance according to new CPU values""" for symbol in self.symbols: if isinstance(symbol, m2_expr.ExprId): if hasattr(self.cpu, symbol.name): value = self.symbols.symbols_id[symbol] if not isinstance(value, m2_expr.ExprInt): raise ValueError("A simplification is missing: %s" % value) setattr(self.cpu, symbol.name, value.arg.arg) else: raise NotImplementedError("Type not handled: %s" % symbol)
def update_engine_from_cpu(
self)
Updates CPU values according to @cpu instance
def update_engine_from_cpu(self): """Updates CPU values according to @cpu instance""" for symbol in self.symbols: if isinstance(symbol, m2_expr.ExprId): if hasattr(self.cpu, symbol.name): value = m2_expr.ExprInt(getattr(self.cpu, symbol.name), symbol.size) self.symbols.symbols_id[symbol] = value else: raise NotImplementedError("Type not handled: %s" % symbol)