miasm2.core.asmblock module
#-*- coding:utf-8 -*- import logging import warnings from collections import namedtuple from miasm2.expression.expression import ExprId, ExprInt, ExprLoc, \ get_expr_locs from miasm2.expression.expression import LocKey from miasm2.expression.simplifications import expr_simp from miasm2.expression.modint import moduint, modint from miasm2.core.utils import Disasm_Exception, pck from miasm2.core.graph import DiGraph, DiGraphSimplifier, MatchGraphJoker from miasm2.core.interval import interval from miasm2.core.locationdb import LocationDB log_asmblock = logging.getLogger("asmblock") console_handler = logging.StreamHandler() console_handler.setFormatter(logging.Formatter("%(levelname)-5s: %(message)s")) log_asmblock.addHandler(console_handler) log_asmblock.setLevel(logging.WARNING) def is_int(a): return isinstance(a, int) or isinstance(a, long) or \ isinstance(a, moduint) or isinstance(a, modint) class AsmRaw(object): def __init__(self, raw=""): self.raw = raw def __str__(self): return repr(self.raw) def to_string(self, loc_db): return str(self) class asm_raw(AsmRaw): def __init__(self, raw=""): warnings.warn('DEPRECATION WARNING: use "AsmRaw" instead of "asm_raw"') super(asm_label, self).__init__(raw) class AsmConstraint(object): c_to = "c_to" c_next = "c_next" def __init__(self, loc_key, c_t=c_to): # Sanity check assert isinstance(loc_key, LocKey) self.loc_key = loc_key self.c_t = c_t def get_label(self): warnings.warn('DEPRECATION WARNING: use ".loc_key" instead of ".label"') return self.loc_key def set_label(self, loc_key): warnings.warn('DEPRECATION WARNING: use ".loc_key" instead of ".label"') self.loc_key = loc_key label = property(get_label, set_label) def to_string(self, loc_db=None): if loc_db is None: return "%s:%s" % (self.c_t, self.loc_key) else: return "%s:%s" % ( self.c_t, loc_db.pretty_str(self.loc_key) ) def __str__(self): return self.to_string() class asm_constraint(AsmConstraint): def __init__(self, loc_key, c_t=AsmConstraint.c_to): warnings.warn('DEPRECATION WARNING: use "AsmConstraint" instead of "asm_constraint"') super(asm_constraint, self).__init__(loc_key, c_t) class AsmConstraintNext(AsmConstraint): def __init__(self, loc_key): super(AsmConstraintNext, self).__init__( loc_key, c_t=AsmConstraint.c_next ) class asm_constraint_next(AsmConstraint): def __init__(self, loc_key): warnings.warn('DEPRECATION WARNING: use "AsmConstraintNext" instead of "asm_constraint_next"') super(asm_constraint_next, self).__init__(loc_key) class AsmConstraintTo(AsmConstraint): def __init__(self, loc_key): super(AsmConstraintTo, self).__init__( loc_key, c_t=AsmConstraint.c_to ) class asm_constraint_to(AsmConstraint): def __init__(self, loc_key): warnings.warn('DEPRECATION WARNING: use "AsmConstraintTo" instead of "asm_constraint_to"') super(asm_constraint_to, self).__init__(loc_key) class AsmBlock(object): def __init__(self, loc_key, alignment=1): assert isinstance(loc_key, LocKey) self.bto = set() self.lines = [] self._loc_key = loc_key self.alignment = alignment def get_label(self): warnings.warn('DEPRECATION WARNING: use ".loc_key" instead of ".label"') return self.loc_key loc_key = property(lambda self:self._loc_key) label = property(get_label) def to_string(self, loc_db=None): out = [] if loc_db is None: out.append(str(self.loc_key)) else: out.append(loc_db.pretty_str(self.loc_key)) for instr in self.lines: out.append(instr.to_string(loc_db)) if self.bto: lbls = ["->"] for dst in self.bto: if dst is None: lbls.append("Unknown? ") else: lbls.append(dst.to_string(loc_db) + " ") lbls = '\t'.join(sorted(lbls)) out.append(lbls) return '\n'.join(out) def __str__(self): return self.to_string() def addline(self, l): self.lines.append(l) def addto(self, c): assert isinstance(self.bto, set) self.bto.add(c) def split(self, loc_db, offset): loc_key = loc_db.get_or_create_offset_location(offset) log_asmblock.debug('split at %x', offset) i = -1 offsets = [x.offset for x in self.lines] offset = loc_db.get_location_offset(loc_key) if offset not in offsets: log_asmblock.warning( 'cannot split bloc at %X ' % offset + 'middle instruction? default middle') offsets.sort() return None new_bloc = AsmBlock(loc_key) i = offsets.index(offset) self.lines, new_bloc.lines = self.lines[:i], self.lines[i:] flow_mod_instr = self.get_flow_instr() log_asmblock.debug('flow mod %r', flow_mod_instr) c = AsmConstraint(loc_key, AsmConstraint.c_next) # move dst if flowgraph modifier was in original bloc # (usecase: split delayslot bloc) if flow_mod_instr: for xx in self.bto: log_asmblock.debug('lbl %s', xx) c_next = set( [x for x in self.bto if x.c_t == AsmConstraint.c_next]) c_to = [x for x in self.bto if x.c_t != AsmConstraint.c_next] self.bto = set([c] + c_to) new_bloc.bto = c_next else: new_bloc.bto = self.bto self.bto = set([c]) return new_bloc def get_range(self): """Returns the offset hull of an AsmBlock""" if len(self.lines): return (self.lines[0].offset, self.lines[-1].offset + self.lines[-1].l) else: return 0, 0 def get_offsets(self): return [x.offset for x in self.lines] def add_cst(self, loc_key, constraint_type): """ Add constraint between current block and block at @loc_key @loc_key: LocKey instance of constraint target @constraint_type: AsmConstraint c_to/c_next """ assert isinstance(loc_key, LocKey) c = AsmConstraint(loc_key, constraint_type) self.bto.add(c) def get_flow_instr(self): if not self.lines: return None for i in xrange(-1, -1 - self.lines[0].delayslot - 1, -1): if not 0 <= i < len(self.lines): return None l = self.lines[i] if l.splitflow() or l.breakflow(): raise NotImplementedError('not fully functional') def get_subcall_instr(self): if not self.lines: return None delayslot = self.lines[0].delayslot end_index = len(self.lines) - 1 ds_max_index = max(end_index - delayslot, 0) for i in xrange(end_index, ds_max_index - 1, -1): l = self.lines[i] if l.is_subcall(): return l return None def get_next(self): for constraint in self.bto: if constraint.c_t == AsmConstraint.c_next: return constraint.loc_key return None @staticmethod def _filter_constraint(constraints): """Sort and filter @constraints for AsmBlock.bto @constraints: non-empty set of AsmConstraint instance Always the same type -> one of the constraint c_next and c_to -> c_next """ # Only one constraint if len(constraints) == 1: return next(iter(constraints)) # Constraint type -> set of corresponding constraint cbytype = {} for cons in constraints: cbytype.setdefault(cons.c_t, set()).add(cons) # Only one type -> any constraint is OK if len(cbytype) == 1: return next(iter(constraints)) # At least 2 types -> types = {c_next, c_to} # c_to is included in c_next return next(iter(cbytype[AsmConstraint.c_next])) def fix_constraints(self): """Fix next block constraints""" # destination -> associated constraints dests = {} for constraint in self.bto: dests.setdefault(constraint.loc_key, set()).add(constraint) self.bto = set(self._filter_constraint(constraints) for constraints in dests.itervalues()) class asm_bloc(object): def __init__(self, loc_key, alignment=1): warnings.warn('DEPRECATION WARNING: use "AsmBlock" instead of "asm_bloc"') super(asm_bloc, self).__init__(loc_key, alignment) class AsmBlockBad(AsmBlock): """Stand for a *bad* ASM block (malformed, unreachable, not disassembled, ...)""" ERROR_UNKNOWN = -1 ERROR_CANNOT_DISASM = 0 ERROR_NULL_STARTING_BLOCK = 1 ERROR_FORBIDDEN = 2 ERROR_IO = 3 ERROR_TYPES = { ERROR_UNKNOWN: "Unknown error", ERROR_CANNOT_DISASM: "Unable to disassemble", ERROR_NULL_STARTING_BLOCK: "Null starting block", ERROR_FORBIDDEN: "Address forbidden by dont_dis", ERROR_IO: "IOError", } def __init__(self, loc_key=None, alignment=1, errno=ERROR_UNKNOWN, *args, **kwargs): """Instanciate an AsmBlock_bad. @loc_key, @alignement: same as AsmBlock.__init__ @errno: (optional) specify a error type associated with the block """ super(AsmBlockBad, self).__init__(loc_key, alignment, *args, **kwargs) self._errno = errno errno = property(lambda self: self._errno) def __str__(self): error_txt = self.ERROR_TYPES.get(self._errno, self._errno) return "\n".join([str(self.loc_key), "\tBad block: %s" % error_txt]) def addline(self, *args, **kwargs): raise RuntimeError("An AsmBlockBad cannot have line") def addto(self, *args, **kwargs): raise RuntimeError("An AsmBlockBad cannot have bto") def split(self, *args, **kwargs): raise RuntimeError("An AsmBlockBad cannot be splitted") class asm_block_bad(AsmBlockBad): def __init__(self, loc_key=None, alignment=1, errno=-1, *args, **kwargs): warnings.warn('DEPRECATION WARNING: use "AsmBlockBad" instead of "asm_block_bad"') super(asm_block_bad, self).__init__(loc_key, alignment, *args, **kwargs) class AsmSymbolPool(LocationDB): """[DEPRECATED API] use 'LocationDB' instead""" def __init__(self, *args, **kwargs): warnings.warn("Deprecated API, use 'LocationDB' instead") super(AsmSymbolPool, self).__init__(*args, **kwargs) class asm_symbol_pool(AsmSymbolPool): def __init__(self): warnings.warn('DEPRECATION WARNING: use "LocationDB" instead of "asm_symbol_pool"') super(asm_symbol_pool, self).__init__() class AsmCFG(DiGraph): """Directed graph standing for a ASM Control Flow Graph with: - nodes: AsmBlock - edges: constraints between blocks, synchronized with AsmBlock's "bto" Specialized the .dot export and force the relation between block to be uniq, and associated with a constraint. Offer helpers on AsmCFG management, such as research by loc_key, sanity checking and mnemonic size guessing. """ # Internal structure for pending management AsmCFGPending = namedtuple("AsmCFGPending", ["waiter", "constraint"]) def __init__(self, loc_db=None, *args, **kwargs): super(AsmCFG, self).__init__(*args, **kwargs) # Edges -> constraint self.edges2constraint = {} # Expected LocKey -> set( (src, dst), constraint ) self._pendings = {} # Loc_Key2block built on the fly self._loc_key_to_block = {} # loc_db self.loc_db = loc_db def copy(self): """Copy the current graph instance""" graph = self.__class__(self.loc_db) return graph + self # Compatibility with old list API def append(self, *args, **kwargs): raise DeprecationWarning("AsmCFG is a graph, use add_node") def remove(self, *args, **kwargs): raise DeprecationWarning("AsmCFG is a graph, use del_node") def __getitem__(self, *args, **kwargs): raise DeprecationWarning("Order of AsmCFG elements is not reliable") def __contains__(self, _): """ DEPRECATED. Use: - loc_key in AsmCFG.nodes() to test loc_key existence """ raise RuntimeError("DEPRECATED") def __iter__(self): """ DEPRECATED. Use: - AsmCFG.blocks() to iter on blocks - loc_key in AsmCFG.nodes() to test loc_key existence """ raise RuntimeError("DEPRECATED") def __len__(self): """Return the number of blocks in AsmCFG""" return len(self._nodes) blocks = property(lambda x:x._loc_key_to_block.itervalues()) # Manage graph with associated constraints def add_edge(self, src, dst, constraint): """Add an edge to the graph @src: LocKey instance, source @dst: LocKey instance, destination @constraint: constraint associated to this edge """ # Sanity check assert isinstance(src, LocKey) assert isinstance(dst, LocKey) known_cst = self.edges2constraint.get((src, dst), None) if known_cst is not None: assert known_cst == constraint return # Add the edge to src.bto if needed block_src = self.loc_key_to_block(src) if block_src: if dst not in [cons.loc_key for cons in block_src.bto]: block_src.bto.add(AsmConstraint(dst, constraint)) # Add edge self.edges2constraint[(src, dst)] = constraint super(AsmCFG, self).add_edge(src, dst) def add_uniq_edge(self, src, dst, constraint): """ Synonym for `add_edge` """ self.add_edge(src, dst, constraint) def del_edge(self, src, dst): """Delete the edge @src->@dst and its associated constraint""" src_blk = self.loc_key_to_block(src) dst_blk = self.loc_key_to_block(dst) assert src_blk is not None assert dst_blk is not None # Delete from src.bto to_remove = [cons for cons in src_blk.bto if cons.loc_key == dst] if to_remove: assert len(to_remove) == 1 src_blk.bto.remove(to_remove[0]) # Del edge del self.edges2constraint[(src, dst)] super(AsmCFG, self).del_edge(src, dst) def del_block(self, block): super(AsmCFG, self).del_node(block.loc_key) del self._loc_key_to_block[block.loc_key] def add_node(self, node): assert isinstance(node, LocKey) return super(AsmCFG, self).add_node(node) def add_block(self, block): """ Add the block @block to the current instance, if it is not already in @block: AsmBlock instance Edges will be created for @block.bto, if destinations are already in this instance. If not, they will be resolved when adding these aforementionned destinations. `self.pendings` indicates which blocks are not yet resolved. """ status = super(AsmCFG, self).add_node(block.loc_key) if not status: return status # Update waiters if block.loc_key in self._pendings: for bblpend in self._pendings[block.loc_key]: self.add_edge(bblpend.waiter.loc_key, block.loc_key, bblpend.constraint) del self._pendings[block.loc_key] # Synchronize edges with block destinations self._loc_key_to_block[block.loc_key] = block for constraint in block.bto: dst = self._loc_key_to_block.get(constraint.loc_key, None) if dst is None: # Block is yet unknown, add it to pendings to_add = self.AsmCFGPending(waiter=block, constraint=constraint.c_t) self._pendings.setdefault(constraint.loc_key, set()).add(to_add) else: # Block is already in known nodes self.add_edge(block.loc_key, dst.loc_key, constraint.c_t) return status def merge(self, graph): """Merge with @graph, taking in account constraints""" # Add known blocks for block in graph.blocks: self.add_block(block) # Add nodes not already in it (ie. not linked to a block) for node in graph.nodes(): self.add_node(node) # -> add_edge(x, y, constraint) for edge in graph._edges: # May fail if there is an incompatibility in edges constraints # between the two graphs self.add_edge(*edge, constraint=graph.edges2constraint[edge]) def node2lines(self, node): if self.loc_db is None: loc_key_name = str(node) else: loc_key_name = self.loc_db.pretty_str(node) yield self.DotCellDescription(text=loc_key_name, attr={'align': 'center', 'colspan': 2, 'bgcolor': 'grey'}) block = self._loc_key_to_block.get(node, None) if block is None: raise StopIteration if isinstance(block, AsmBlockBad): yield [ self.DotCellDescription( text=block.ERROR_TYPES.get(block._errno, block._errno ), attr={}) ] raise StopIteration for line in block.lines: if self._dot_offset: yield [self.DotCellDescription(text="%.8X" % line.offset, attr={}), self.DotCellDescription(text=line.to_string(self.loc_db), attr={})] else: yield self.DotCellDescription(text=line.to_string(self.loc_db), attr={}) def node_attr(self, node): block = self._loc_key_to_block.get(node, None) if isinstance(block, AsmBlockBad): return {'style': 'filled', 'fillcolor': 'red'} return {} def edge_attr(self, src, dst): cst = self.edges2constraint.get((src, dst), None) edge_color = "blue" if len(self.successors(src)) > 1: if cst == AsmConstraint.c_next: edge_color = "red" else: edge_color = "limegreen" return {"color": edge_color} def dot(self, offset=False): """ @offset: (optional) if set, add the corresponding offsets in each node """ self._dot_offset = offset return super(AsmCFG, self).dot() # Helpers @property def pendings(self): """Dictionary of loc_key -> set(AsmCFGPending instance) indicating which loc_key are missing in the current instance. A loc_key is missing if a block which is already in nodes has constraints with him (thanks to its .bto) and the corresponding block is not yet in nodes """ return self._pendings def label2block(self, loc_key): """Return the block corresponding to loc_key @loc_key @loc_key: LocKey instance""" warnings.warn('DEPRECATION WARNING: use "loc_key_to_block" instead of "label2block"') return self.loc_key_to_block(loc_key) def rebuild_edges(self): """Consider blocks '.bto' and rebuild edges according to them, ie: - update constraint type - add missing edge - remove no more used edge This method should be called if a block's '.bto' in nodes have been modified without notifying this instance to resynchronize edges. """ for block in self.blocks: edges = [] # Rebuild edges from bto for constraint in block.bto: dst = self._loc_key_to_block.get(constraint.loc_key, None) if dst is None: # Missing destination, add to pendings self._pendings.setdefault( constraint.loc_key, set() ).add( self.AsmCFGPending( block, constraint.c_t ) ) continue edge = (block.loc_key, dst.loc_key) edges.append(edge) if edge in self._edges: # Already known edge, constraint may have changed self.edges2constraint[edge] = constraint.c_t else: # An edge is missing self.add_edge(edge[0], edge[1], constraint.c_t) # Remove useless edges for succ in self.successors(block.loc_key): edge = (block.loc_key, succ) if edge not in edges: self.del_edge(*edge) def get_bad_blocks(self): """Iterator on AsmBlockBad elements""" # A bad asm block is always a leaf for loc_key in self.leaves(): block = self._loc_key_to_block.get(loc_key, None) if isinstance(block, AsmBlockBad): yield block def get_bad_blocks_predecessors(self, strict=False): """Iterator on loc_keys with an AsmBlockBad destination @strict: (optional) if set, return loc_key with only bad successors """ # Avoid returning the same block done = set() for badblock in self.get_bad_blocks(): for predecessor in self.predecessors_iter(badblock.loc_key): if predecessor not in done: if (strict and not all(isinstance(self._loc_key_to_block.get(block, None), AsmBlockBad) for block in self.successors_iter(predecessor))): continue yield predecessor done.add(predecessor) def getby_offset(self, offset): """Return asmblock containing @offset""" for block in self.blocks: if block.lines[0].offset <= offset < \ (block.lines[-1].offset + block.lines[-1].l): return block return None def loc_key_to_block(self, loc_key): """ Return the asmblock corresponding to loc_key @loc_key, None if unknown loc_key @loc_key: LocKey instance """ return self._loc_key_to_block.get(loc_key, None) def sanity_check(self): """Do sanity checks on blocks' constraints: * no pendings * no multiple next constraint to same block * no next constraint to self """ if len(self._pendings) != 0: raise RuntimeError("Some blocks are missing: %s" % map( str, self._pendings.keys() )) next_edges = {edge: constraint for edge, constraint in self.edges2constraint.iteritems() if constraint == AsmConstraint.c_next} for loc_key in self._nodes: if loc_key not in self._loc_key_to_block: raise RuntimeError("Not supported yet: every node must have a corresponding AsmBlock") # No next constraint to self if (loc_key, loc_key) in next_edges: raise RuntimeError('Bad constraint: self in next') # No multiple next constraint to same block pred_next = list(ploc_key for (ploc_key, dloc_key) in next_edges if dloc_key == loc_key) if len(pred_next) > 1: raise RuntimeError("Too many next constraints for bloc %r" "(%s)" % (loc_key, pred_next)) def guess_blocks_size(self, mnemo): """Asm and compute max block size Add a 'size' and 'max_size' attribute on each block @mnemo: metamn instance""" for block in self.blocks: size = 0 for instr in block.lines: if isinstance(instr, AsmRaw): # for special AsmRaw, only extract len if isinstance(instr.raw, list): data = None if len(instr.raw) == 0: l = 0 else: l = instr.raw[0].size / 8 * len(instr.raw) elif isinstance(instr.raw, str): data = instr.raw l = len(data) else: raise NotImplementedError('asm raw') else: # Assemble the instruction to retrieve its len. # If the instruction uses symbol it will fail # In this case, the max_instruction_len is used try: candidates = mnemo.asm(instr) l = len(candidates[-1]) except: l = mnemo.max_instruction_len data = None instr.data = data instr.l = l size += l block.size = size block.max_size = size log_asmblock.info("size: %d max: %d", block.size, block.max_size) def apply_splitting(self, loc_db, dis_block_callback=None, **kwargs): """Consider @self' bto destinations and split block in @self if one of these destinations jumps in the middle of this block. In order to work, they must be only one block in @self per loc_key in @loc_db (which is true if @self come from the same disasmEngine). @loc_db: LocationDB instance associated with @self'loc_keys @dis_block_callback: (optional) if set, this callback will be called on new block destinations @kwargs: (optional) named arguments to pass to dis_block_callback """ # Get all possible destinations not yet resolved, with a resolved # offset block_dst = [] for loc_key in self.pendings: offset = loc_db.get_location_offset(loc_key) if offset is not None: block_dst.append(offset) todo = set(self.blocks) rebuild_needed = False while todo: # Find a block with a destination inside another one cur_block = todo.pop() range_start, range_stop = cur_block.get_range() for off in block_dst: if not (off > range_start and off < range_stop): continue # `cur_block` must be splitted at offset `off`from miasm2.core.locationdb import LocationDB new_b = cur_block.split(loc_db, off) log_asmblock.debug("Split block %x", off) if new_b is None: log_asmblock.error("Cannot split %x!!", off) continue # Remove pending from cur_block # Links from new_b will be generated in rebuild_edges for dst in new_b.bto: if dst.loc_key not in self.pendings: continue self.pendings[dst.loc_key] = set(pending for pending in self.pendings[dst.loc_key] if pending.waiter != cur_block) # The new block destinations may need to be disassembled if dis_block_callback: offsets_to_dis = set( self.loc_db.get_location_offset(constraint.loc_key) for constraint in new_b.bto ) dis_block_callback(cur_bloc=new_b, offsets_to_dis=offsets_to_dis, loc_db=loc_db, **kwargs) # Update structure rebuild_needed = True self.add_block(new_b) # The new block must be considered todo.add(new_b) range_start, range_stop = cur_block.get_range() # Rebuild edges to match new blocks'bto if rebuild_needed: self.rebuild_edges() def __str__(self): out = [] for block in self.blocks: out.append(str(block)) for loc_key_a, loc_key_b in self.edges(): out.append("%s -> %s" % (loc_key_a, loc_key_b)) return '\n'.join(out) def __repr__(self): return "<%s %s>" % (self.__class__.__name__, hex(id(self))) # Out of _merge_blocks to be computed only once _acceptable_block = lambda graph, loc_key: (not isinstance(graph.loc_key_to_block(loc_key), AsmBlockBad) and len(graph.loc_key_to_block(loc_key).lines) > 0) _parent = MatchGraphJoker(restrict_in=False, filt=_acceptable_block) _son = MatchGraphJoker(restrict_out=False, filt=_acceptable_block) _expgraph = _parent >> _son def _merge_blocks(dg, graph): """Graph simplification merging AsmBlock with one and only one son with this son if this son has one and only one parent""" # Blocks to ignore, because they have been removed from the graph to_ignore = set() for match in _expgraph.match(graph): # Get matching blocks lbl_block, lbl_succ = match[_parent], match[_son] block = graph.loc_key_to_block(lbl_block) succ = graph.loc_key_to_block(lbl_succ) # Ignore already deleted blocks if (block in to_ignore or succ in to_ignore): continue # Remove block last instruction if needed last_instr = block.lines[-1] if last_instr.delayslot > 0: # TODO: delayslot raise RuntimeError("Not implemented yet") if last_instr.is_subcall(): continue if last_instr.breakflow() and last_instr.dstflow(): block.lines.pop() # Merge block block.lines += succ.lines for nextb in graph.successors_iter(lbl_succ): graph.add_edge(lbl_block, nextb, graph.edges2constraint[(lbl_succ, nextb)]) graph.del_block(succ) to_ignore.add(lbl_succ) bbl_simplifier = DiGraphSimplifier() bbl_simplifier.enable_passes([_merge_blocks]) def conservative_asm(mnemo, instr, symbols, conservative): """ Asm instruction; Try to keep original instruction bytes if it exists """ candidates = mnemo.asm(instr, symbols) if not candidates: raise ValueError('cannot asm:%s' % str(instr)) if not hasattr(instr, "b"): return candidates[0], candidates if instr.b in candidates: return instr.b, candidates if conservative: for c in candidates: if len(c) == len(instr.b): return c, candidates return candidates[0], candidates def fix_expr_val(expr, symbols): """Resolve an expression @expr using @symbols""" def expr_calc(e): if isinstance(e, ExprId): # Example: # toto: # .dword label loc_key = symbols.get_name_location(e.name) offset = symbols.get_location_offset(loc_key) e = ExprInt(offset, e.size) return e result = expr.visit(expr_calc) result = expr_simp(result) if not isinstance(result, ExprInt): raise RuntimeError('Cannot resolve symbol %s' % expr) return result def fix_loc_offset(loc_db, loc_key, offset, modified): """ Fix the @loc_key offset to @offset. If the @offset has changed, add @loc_key to @modified @loc_db: current loc_db """ loc_offset = loc_db.get_location_offset(loc_key) if loc_offset == offset: return loc_db.set_location_offset(loc_key, offset, force=True) modified.add(loc_key) class BlockChain(object): """Manage blocks linked with an asm_constraint_next""" def __init__(self, loc_db, blocks): self.loc_db = loc_db self.blocks = blocks self.place() @property def pinned(self): """Return True iff at least one block is pinned""" return self.pinned_block_idx is not None def _set_pinned_block_idx(self): self.pinned_block_idx = None for i, block in enumerate(self.blocks): loc_key = block.loc_key if self.loc_db.get_location_offset(loc_key) is not None: if self.pinned_block_idx is not None: raise ValueError("Multiples pinned block detected") self.pinned_block_idx = i def place(self): """Compute BlockChain min_offset and max_offset using pinned block and blocks' size """ self._set_pinned_block_idx() self.max_size = 0 for block in self.blocks: self.max_size += block.max_size + block.alignment - 1 # Check if chain has one block pinned if not self.pinned: return loc = self.blocks[self.pinned_block_idx].loc_key offset_base = self.loc_db.get_location_offset(loc) assert(offset_base % self.blocks[self.pinned_block_idx].alignment == 0) self.offset_min = offset_base for block in self.blocks[:self.pinned_block_idx - 1:-1]: self.offset_min -= block.max_size + \ (block.alignment - block.max_size) % block.alignment self.offset_max = offset_base for block in self.blocks[self.pinned_block_idx:]: self.offset_max += block.max_size + \ (block.alignment - block.max_size) % block.alignment def merge(self, chain): """Best effort merge two block chains Return the list of resulting blockchains""" self.blocks += chain.blocks self.place() return [self] def fix_blocks(self, modified_loc_keys): """Propagate a pinned to its blocks' neighbour @modified_loc_keys: store new pinned loc_keys""" if not self.pinned: raise ValueError('Trying to fix unpinned block') # Propagate offset to blocks before pinned block pinned_block = self.blocks[self.pinned_block_idx] offset = self.loc_db.get_location_offset(pinned_block.loc_key) if offset % pinned_block.alignment != 0: raise RuntimeError('Bad alignment') for block in self.blocks[:self.pinned_block_idx - 1:-1]: new_offset = offset - block.size new_offset = new_offset - new_offset % pinned_block.alignment fix_loc_offset(self.loc_db, block.loc_key, new_offset, modified_loc_keys) # Propagate offset to blocks after pinned block offset = self.loc_db.get_location_offset(pinned_block.loc_key) + pinned_block.size last_block = pinned_block for block in self.blocks[self.pinned_block_idx + 1:]: offset += (- offset) % last_block.alignment fix_loc_offset(self.loc_db, block.loc_key, offset, modified_loc_keys) offset += block.size last_block = block return modified_loc_keys class BlockChainWedge(object): """Stand for wedges between blocks""" def __init__(self, loc_db, offset, size): self.loc_db = loc_db self.offset = offset self.max_size = size self.offset_min = offset self.offset_max = offset + size def merge(self, chain): """Best effort merge two block chains Return the list of resulting blockchains""" self.loc_db.set_location_offset(chain.blocks[0].loc_key, self.offset_max) chain.place() return [self, chain] def group_constrained_blocks(loc_db, asmcfg): """ Return the BlockChains list built from grouped blocks in asmcfg linked by asm_constraint_next @asmcfg: an AsmCfg instance """ log_asmblock.info('group_constrained_blocks') # Group adjacent asmcfg remaining_blocks = list(asmcfg.blocks) known_block_chains = {} while remaining_blocks: # Create a new block chain block_list = [remaining_blocks.pop()] # Find sons in remainings blocks linked with a next constraint while True: # Get next block next_loc_key = block_list[-1].get_next() if next_loc_key is None or asmcfg.loc_key_to_block(next_loc_key) is None: break next_block = asmcfg.loc_key_to_block(next_loc_key) # Add the block at the end of the current chain if next_block not in remaining_blocks: break block_list.append(next_block) remaining_blocks.remove(next_block) # Check if son is in a known block group if next_loc_key is not None and next_loc_key in known_block_chains: block_list += known_block_chains[next_loc_key] del known_block_chains[next_loc_key] known_block_chains[block_list[0].loc_key] = block_list out_block_chains = [] for loc_key in known_block_chains: chain = BlockChain(loc_db, known_block_chains[loc_key]) out_block_chains.append(chain) return out_block_chains def get_blockchains_address_interval(blockChains, dst_interval): """Compute the interval used by the pinned @blockChains Check if the placed chains are in the @dst_interval""" allocated_interval = interval() for chain in blockChains: if not chain.pinned: continue chain_interval = interval([(chain.offset_min, chain.offset_max - 1)]) if chain_interval not in dst_interval: raise ValueError('Chain placed out of destination interval') allocated_interval += chain_interval return allocated_interval def resolve_symbol(blockChains, loc_db, dst_interval=None): """Place @blockChains in the @dst_interval""" log_asmblock.info('resolve_symbol') if dst_interval is None: dst_interval = interval([(0, 0xFFFFFFFFFFFFFFFF)]) forbidden_interval = interval( [(-1, 0xFFFFFFFFFFFFFFFF + 1)]) - dst_interval allocated_interval = get_blockchains_address_interval(blockChains, dst_interval) log_asmblock.debug('allocated interval: %s', allocated_interval) pinned_chains = [chain for chain in blockChains if chain.pinned] # Add wedge in forbidden intervals for start, stop in forbidden_interval.intervals: wedge = BlockChainWedge( loc_db, offset=start, size=stop + 1 - start) pinned_chains.append(wedge) # Try to place bigger blockChains first pinned_chains.sort(key=lambda x: x.offset_min) blockChains.sort(key=lambda x: -x.max_size) fixed_chains = list(pinned_chains) log_asmblock.debug("place chains") for chain in blockChains: if chain.pinned: continue fixed = False for i in xrange(1, len(fixed_chains)): prev_chain = fixed_chains[i - 1] next_chain = fixed_chains[i] if prev_chain.offset_max + chain.max_size < next_chain.offset_min: new_chains = prev_chain.merge(chain) fixed_chains[i - 1:i] = new_chains fixed = True break if not fixed: raise RuntimeError('Cannot find enough space to place blocks') return [chain for chain in fixed_chains if isinstance(chain, BlockChain)] def get_block_loc_keys(block): """Extract loc_keys used by @block""" symbols = set() for instr in block.lines: if isinstance(instr, AsmRaw): if isinstance(instr.raw, list): for expr in instr.raw: symbols.update(get_expr_locs(expr)) else: for arg in instr.args: symbols.update(get_expr_locs(arg)) return symbols def assemble_block(mnemo, block, loc_db, conservative=False): """Assemble a @block using @loc_db @conservative: (optional) use original bytes when possible """ offset_i = 0 for instr in block.lines: if isinstance(instr, AsmRaw): if isinstance(instr.raw, list): # Fix special AsmRaw data = "" for expr in instr.raw: expr_int = fix_expr_val(expr, loc_db) data += pck[expr_int.size](expr_int.arg) instr.data = data instr.offset = offset_i offset_i += instr.l continue # Assemble an instruction saved_args = list(instr.args) instr.offset = loc_db.get_location_offset(block.loc_key) + offset_i # Replace instruction's arguments by resolved ones instr.args = instr.resolve_args_with_symbols(loc_db) if instr.dstflow(): instr.fixDstOffset() old_l = instr.l cached_candidate, _ = conservative_asm(mnemo, instr, loc_db, conservative) # Restore original arguments instr.args = saved_args # We need to update the block size block.size = block.size - old_l + len(cached_candidate) instr.data = cached_candidate instr.l = len(cached_candidate) offset_i += instr.l def asmblock_final(mnemo, asmcfg, blockChains, loc_db, conservative=False): """Resolve and assemble @blockChains using @loc_db until fixed point is reached""" log_asmblock.debug("asmbloc_final") # Init structures blocks_using_loc_key = {} for block in asmcfg.blocks: exprlocs = get_block_loc_keys(block) loc_keys = set(expr.loc_key for expr in exprlocs) for loc_key in loc_keys: blocks_using_loc_key.setdefault(loc_key, set()).add(block) block2chain = {} for chain in blockChains: for block in chain.blocks: block2chain[block] = chain # Init worklist blocks_to_rework = set(asmcfg.blocks) # Fix and re-assemble blocks until fixed point is reached while True: # Propagate pinned blocks into chains modified_loc_keys = set() for chain in blockChains: chain.fix_blocks(modified_loc_keys) for loc_key in modified_loc_keys: # Retrive block with modified reference mod_block = asmcfg.loc_key_to_block(loc_key) if mod_block is not None: blocks_to_rework.add(mod_block) # Enqueue blocks referencing a modified loc_key if loc_key not in blocks_using_loc_key: continue for block in blocks_using_loc_key[loc_key]: blocks_to_rework.add(block) # No more work if not blocks_to_rework: break while blocks_to_rework: block = blocks_to_rework.pop() assemble_block(mnemo, block, loc_db, conservative) def asmbloc_final(mnemo, blocks, blockChains, loc_db, conservative=False): """Resolve and assemble @blockChains using @loc_db until fixed point is reached""" warnings.warn('DEPRECATION WARNING: use "asmblock_final" instead of "asmbloc_final"') asmblock_final(mnemo, blocks, blockChains, loc_db, conservative) def asm_resolve_final(mnemo, asmcfg, loc_db, dst_interval=None): """Resolve and assemble @asmcfg using @loc_db into interval @dst_interval""" asmcfg.sanity_check() asmcfg.guess_blocks_size(mnemo) blockChains = group_constrained_blocks(loc_db, asmcfg) resolved_blockChains = resolve_symbol( blockChains, loc_db, dst_interval ) asmblock_final(mnemo, asmcfg, resolved_blockChains, loc_db) patches = {} output_interval = interval() for block in asmcfg.blocks: offset = loc_db.get_location_offset(block.loc_key) for instr in block.lines: if not instr.data: # Empty line continue assert len(instr.data) == instr.l patches[offset] = instr.data instruction_interval = interval([(offset, offset + instr.l - 1)]) if not (instruction_interval & output_interval).empty: raise RuntimeError("overlapping bytes %X" % int(offset)) instr.offset = offset offset += instr.l return patches class disasmEngine(object): """Disassembly engine, taking care of disassembler options and mutli-block strategy. Engine options: + Object supporting membership test (offset in ..) - dont_dis: stop the current disassembly branch if reached - split_dis: force a basic block end if reached, with a next constraint on its successor - dont_dis_retcall_funcs: stop disassembly after a call to one of the given functions + On/Off - follow_call: recursively disassemble CALL destinations - dontdis_retcall: stop on CALL return addresses - dont_dis_nulstart_bloc: stop if a block begin with a few \x00 + Number - lines_wd: maximum block's size (in number of instruction) - blocs_wd: maximum number of distinct disassembled block + callback(arch, attrib, pool_bin, cur_bloc, offsets_to_dis, loc_db) - dis_block_callback: callback after each new disassembled block """ def __init__(self, arch, attrib, bin_stream, **kwargs): """Instanciate a new disassembly engine @arch: targeted architecture @attrib: architecture attribute @bin_stream: bytes source @kwargs: (optional) custom options """ self.arch = arch self.attrib = attrib self.bin_stream = bin_stream self.loc_db = LocationDB() # Setup options self.dont_dis = [] self.split_dis = [] self.follow_call = False self.dontdis_retcall = False self.lines_wd = None self.blocs_wd = None self.dis_block_callback = None self.dont_dis_nulstart_bloc = False self.dont_dis_retcall_funcs = set() # Override options if needed self.__dict__.update(kwargs) def get_job_done(self): warnings.warn("""DEPRECATION WARNING: "job_done" is not needed anymore, support is dropped.""") return set() def set_job_done(self, _): warnings.warn("""DEPRECATION WARNING: "job_done" is not needed anymore, support is dropped.""") return def get_dis_bloc_callback(self): warnings.warn("""DEPRECATION WARNING: "dis_bloc_callback" use dis_block_callback.""") return self.dis_block_callback def set_dis_bloc_callback(self, function): warnings.warn("""DEPRECATION WARNING: "dis_bloc_callback" use dis_block_callback.""") self.dis_block_callback = function @property def symbol_pool(self): warnings.warn("""DEPRECATION WARNING: use 'loc_db'""") return self.loc_db # Deprecated job_done = property(get_job_done, set_job_done) dis_bloc_callback = property(get_dis_bloc_callback, set_dis_bloc_callback) def _dis_block(self, offset, job_done=None): """Disassemble the block at offset @offset @job_done: a set of already disassembled addresses Return the created AsmBlock and future offsets to disassemble """ if job_done is None: job_done = set() lines_cpt = 0 in_delayslot = False delayslot_count = self.arch.delayslot offsets_to_dis = set() add_next_offset = False loc_key = self.loc_db.get_or_create_offset_location(offset) cur_block = AsmBlock(loc_key) log_asmblock.debug("dis at %X", int(offset)) while not in_delayslot or delayslot_count > 0: if in_delayslot: delayslot_count -= 1 if offset in self.dont_dis: if not cur_block.lines: job_done.add(offset) # Block is empty -> bad block cur_block = AsmBlockBad(loc_key, errno=AsmBlockBad.ERROR_FORBIDDEN) else: # Block is not empty, stop the desassembly pass and add a # constraint to the next block loc_key_cst = self.loc_db.get_or_create_offset_location(offset) cur_block.add_cst(loc_key_cst, AsmConstraint.c_next) break if lines_cpt > 0 and offset in self.split_dis: loc_key_cst = self.loc_db.get_or_create_offset_location(offset) cur_block.add_cst(loc_key_cst, AsmConstraint.c_next) offsets_to_dis.add(offset) break lines_cpt += 1 if self.lines_wd is not None and lines_cpt > self.lines_wd: log_asmblock.debug("lines watchdog reached at %X", int(offset)) break if offset in job_done: loc_key_cst = self.loc_db.get_or_create_offset_location(offset) cur_block.add_cst(loc_key_cst, AsmConstraint.c_next) break off_i = offset error = None try: instr = self.arch.dis(self.bin_stream, self.attrib, offset) except Disasm_Exception as e: log_asmblock.warning(e) instr = None error = AsmBlockBad.ERROR_CANNOT_DISASM except IOError as e: log_asmblock.warning(e) instr = None error = AsmBlockBad.ERROR_IO if instr is None: log_asmblock.warning("cannot disasm at %X", int(off_i)) if not cur_block.lines: job_done.add(offset) # Block is empty -> bad block cur_block = AsmBlockBad(loc_key, errno=error) else: # Block is not empty, stop the desassembly pass and add a # constraint to the next block loc_key_cst = self.loc_db.get_or_create_offset_location(off_i) cur_block.add_cst(loc_key_cst, AsmConstraint.c_next) break # XXX TODO nul start block option if self.dont_dis_nulstart_bloc and instr.b.count('\x00') == instr.l: log_asmblock.warning("reach nul instr at %X", int(off_i)) if not cur_block.lines: # Block is empty -> bad block cur_block = AsmBlockBad(loc_key, errno=AsmBlockBad.ERROR_NULL_STARTING_BLOCK) else: # Block is not empty, stop the desassembly pass and add a # constraint to the next block loc_key_cst = self.loc_db.get_or_create_offset_location(off_i) cur_block.add_cst(loc_key_cst, AsmConstraint.c_next) break # special case: flow graph modificator in delayslot if in_delayslot and instr and (instr.splitflow() or instr.breakflow()): add_next_offset = True break job_done.add(offset) log_asmblock.debug("dis at %X", int(offset)) offset += instr.l log_asmblock.debug(instr) log_asmblock.debug(instr.args) cur_block.addline(instr) if not instr.breakflow(): continue # test split if instr.splitflow() and not (instr.is_subcall() and self.dontdis_retcall): add_next_offset = True if instr.dstflow(): instr.dstflow2label(self.loc_db) destinations = instr.getdstflow(self.loc_db) known_dsts = [] for dst in destinations: if not dst.is_loc(): continue loc_key = dst.loc_key loc_key_offset = self.loc_db.get_location_offset(loc_key) known_dsts.append(loc_key) if loc_key_offset in self.dont_dis_retcall_funcs: add_next_offset = False if (not instr.is_subcall()) or self.follow_call: cur_block.bto.update([AsmConstraint(loc_key, AsmConstraint.c_to) for loc_key in known_dsts]) # get in delayslot mode in_delayslot = True delayslot_count = instr.delayslot for c in cur_block.bto: loc_key_offset = self.loc_db.get_location_offset(c.loc_key) offsets_to_dis.add(loc_key_offset) if add_next_offset: loc_key_cst = self.loc_db.get_or_create_offset_location(offset) cur_block.add_cst(loc_key_cst, AsmConstraint.c_next) offsets_to_dis.add(offset) # Fix multiple constraints cur_block.fix_constraints() if self.dis_block_callback is not None: self.dis_block_callback(mn=self.arch, attrib=self.attrib, pool_bin=self.bin_stream, cur_bloc=cur_block, offsets_to_dis=offsets_to_dis, loc_db=self.loc_db, # Deprecated API symbol_pool=self.loc_db) return cur_block, offsets_to_dis def dis_block(self, offset): """Disassemble the block at offset @offset and return the created AsmBlock @offset: targeted offset to disassemble """ current_block, _ = self._dis_block(offset) return current_block def dis_bloc(self, offset): """ DEPRECATED function Use dis_block instead of dis_bloc """ warnings.warn('DEPRECATION WARNING: use "dis_block" instead of "dis_bloc"') return self.dis_block(offset) def dis_multiblock(self, offset, blocks=None): """Disassemble every block reachable from @offset regarding specific disasmEngine conditions Return an AsmCFG instance containing disassembled blocks @offset: starting offset @blocks: (optional) AsmCFG instance of already disassembled blocks to merge with """ log_asmblock.info("dis bloc all") job_done = set() if blocks is None: blocks = AsmCFG(self.loc_db) todo = [offset] bloc_cpt = 0 while len(todo): bloc_cpt += 1 if self.blocs_wd is not None and bloc_cpt > self.blocs_wd: log_asmblock.debug("blocks watchdog reached at %X", int(offset)) break target_offset = int(todo.pop(0)) if (target_offset is None or target_offset in job_done): continue cur_block, nexts = self._dis_block(target_offset, job_done) todo += nexts blocks.add_block(cur_block) blocks.apply_splitting(self.loc_db, dis_block_callback=self.dis_block_callback, mn=self.arch, attrib=self.attrib, pool_bin=self.bin_stream) return blocks def dis_multibloc(self, offset, blocs=None): """ DEPRECATED function Use dis_multiblock instead of dis_multibloc """ warnings.warn('DEPRECATION WARNING: use "dis_multiblock" instead of "dis_multibloc"') return self.dis_multiblock(offset, blocs) def dis_instr(self, offset): """Disassemble one instruction at offset @offset and return the corresponding instruction instance @offset: targeted offset to disassemble """ old_lineswd = self.lines_wd self.lines_wd = 1 try: block = self.dis_block(offset) finally: self.lines_wd = old_lineswd instr = block.lines[0] return instr
Module variables
var bbl_simplifier
var console_handler
var log_asmblock
var pck
Functions
def asm_resolve_final(
mnemo, asmcfg, loc_db, dst_interval=None)
Resolve and assemble @asmcfg using @loc_db into interval @dst_interval
def asm_resolve_final(mnemo, asmcfg, loc_db, dst_interval=None): """Resolve and assemble @asmcfg using @loc_db into interval @dst_interval""" asmcfg.sanity_check() asmcfg.guess_blocks_size(mnemo) blockChains = group_constrained_blocks(loc_db, asmcfg) resolved_blockChains = resolve_symbol( blockChains, loc_db, dst_interval ) asmblock_final(mnemo, asmcfg, resolved_blockChains, loc_db) patches = {} output_interval = interval() for block in asmcfg.blocks: offset = loc_db.get_location_offset(block.loc_key) for instr in block.lines: if not instr.data: # Empty line continue assert len(instr.data) == instr.l patches[offset] = instr.data instruction_interval = interval([(offset, offset + instr.l - 1)]) if not (instruction_interval & output_interval).empty: raise RuntimeError("overlapping bytes %X" % int(offset)) instr.offset = offset offset += instr.l return patches
def asmbloc_final(
mnemo, blocks, blockChains, loc_db, conservative=False)
Resolve and assemble @blockChains using @loc_db until fixed point is reached
def asmbloc_final(mnemo, blocks, blockChains, loc_db, conservative=False): """Resolve and assemble @blockChains using @loc_db until fixed point is reached""" warnings.warn('DEPRECATION WARNING: use "asmblock_final" instead of "asmbloc_final"') asmblock_final(mnemo, blocks, blockChains, loc_db, conservative)
def asmblock_final(
mnemo, asmcfg, blockChains, loc_db, conservative=False)
Resolve and assemble @blockChains using @loc_db until fixed point is reached
def asmblock_final(mnemo, asmcfg, blockChains, loc_db, conservative=False): """Resolve and assemble @blockChains using @loc_db until fixed point is reached""" log_asmblock.debug("asmbloc_final") # Init structures blocks_using_loc_key = {} for block in asmcfg.blocks: exprlocs = get_block_loc_keys(block) loc_keys = set(expr.loc_key for expr in exprlocs) for loc_key in loc_keys: blocks_using_loc_key.setdefault(loc_key, set()).add(block) block2chain = {} for chain in blockChains: for block in chain.blocks: block2chain[block] = chain # Init worklist blocks_to_rework = set(asmcfg.blocks) # Fix and re-assemble blocks until fixed point is reached while True: # Propagate pinned blocks into chains modified_loc_keys = set() for chain in blockChains: chain.fix_blocks(modified_loc_keys) for loc_key in modified_loc_keys: # Retrive block with modified reference mod_block = asmcfg.loc_key_to_block(loc_key) if mod_block is not None: blocks_to_rework.add(mod_block) # Enqueue blocks referencing a modified loc_key if loc_key not in blocks_using_loc_key: continue for block in blocks_using_loc_key[loc_key]: blocks_to_rework.add(block) # No more work if not blocks_to_rework: break while blocks_to_rework: block = blocks_to_rework.pop() assemble_block(mnemo, block, loc_db, conservative)
def assemble_block(
mnemo, block, loc_db, conservative=False)
Assemble a @block using @loc_db @conservative: (optional) use original bytes when possible
def assemble_block(mnemo, block, loc_db, conservative=False): """Assemble a @block using @loc_db @conservative: (optional) use original bytes when possible """ offset_i = 0 for instr in block.lines: if isinstance(instr, AsmRaw): if isinstance(instr.raw, list): # Fix special AsmRaw data = "" for expr in instr.raw: expr_int = fix_expr_val(expr, loc_db) data += pck[expr_int.size](expr_int.arg) instr.data = data instr.offset = offset_i offset_i += instr.l continue # Assemble an instruction saved_args = list(instr.args) instr.offset = loc_db.get_location_offset(block.loc_key) + offset_i # Replace instruction's arguments by resolved ones instr.args = instr.resolve_args_with_symbols(loc_db) if instr.dstflow(): instr.fixDstOffset() old_l = instr.l cached_candidate, _ = conservative_asm(mnemo, instr, loc_db, conservative) # Restore original arguments instr.args = saved_args # We need to update the block size block.size = block.size - old_l + len(cached_candidate) instr.data = cached_candidate instr.l = len(cached_candidate) offset_i += instr.l
def conservative_asm(
mnemo, instr, symbols, conservative)
Asm instruction; Try to keep original instruction bytes if it exists
def conservative_asm(mnemo, instr, symbols, conservative): """ Asm instruction; Try to keep original instruction bytes if it exists """ candidates = mnemo.asm(instr, symbols) if not candidates: raise ValueError('cannot asm:%s' % str(instr)) if not hasattr(instr, "b"): return candidates[0], candidates if instr.b in candidates: return instr.b, candidates if conservative: for c in candidates: if len(c) == len(instr.b): return c, candidates return candidates[0], candidates
def fix_expr_val(
expr, symbols)
Resolve an expression @expr using @symbols
def fix_expr_val(expr, symbols): """Resolve an expression @expr using @symbols""" def expr_calc(e): if isinstance(e, ExprId): # Example: # toto: # .dword label loc_key = symbols.get_name_location(e.name) offset = symbols.get_location_offset(loc_key) e = ExprInt(offset, e.size) return e result = expr.visit(expr_calc) result = expr_simp(result) if not isinstance(result, ExprInt): raise RuntimeError('Cannot resolve symbol %s' % expr) return result
def fix_loc_offset(
loc_db, loc_key, offset, modified)
Fix the @loc_key offset to @offset. If the @offset has changed, add @loc_key to @modified @loc_db: current loc_db
def fix_loc_offset(loc_db, loc_key, offset, modified): """ Fix the @loc_key offset to @offset. If the @offset has changed, add @loc_key to @modified @loc_db: current loc_db """ loc_offset = loc_db.get_location_offset(loc_key) if loc_offset == offset: return loc_db.set_location_offset(loc_key, offset, force=True) modified.add(loc_key)
def get_block_loc_keys(
block)
Extract loc_keys used by @block
def get_block_loc_keys(block): """Extract loc_keys used by @block""" symbols = set() for instr in block.lines: if isinstance(instr, AsmRaw): if isinstance(instr.raw, list): for expr in instr.raw: symbols.update(get_expr_locs(expr)) else: for arg in instr.args: symbols.update(get_expr_locs(arg)) return symbols
def get_blockchains_address_interval(
blockChains, dst_interval)
Compute the interval used by the pinned @blockChains Check if the placed chains are in the @dst_interval
def get_blockchains_address_interval(blockChains, dst_interval): """Compute the interval used by the pinned @blockChains Check if the placed chains are in the @dst_interval""" allocated_interval = interval() for chain in blockChains: if not chain.pinned: continue chain_interval = interval([(chain.offset_min, chain.offset_max - 1)]) if chain_interval not in dst_interval: raise ValueError('Chain placed out of destination interval') allocated_interval += chain_interval return allocated_interval
def group_constrained_blocks(
loc_db, asmcfg)
Return the BlockChains list built from grouped blocks in asmcfg linked by asm_constraint_next @asmcfg: an AsmCfg instance
def group_constrained_blocks(loc_db, asmcfg): """ Return the BlockChains list built from grouped blocks in asmcfg linked by asm_constraint_next @asmcfg: an AsmCfg instance """ log_asmblock.info('group_constrained_blocks') # Group adjacent asmcfg remaining_blocks = list(asmcfg.blocks) known_block_chains = {} while remaining_blocks: # Create a new block chain block_list = [remaining_blocks.pop()] # Find sons in remainings blocks linked with a next constraint while True: # Get next block next_loc_key = block_list[-1].get_next() if next_loc_key is None or asmcfg.loc_key_to_block(next_loc_key) is None: break next_block = asmcfg.loc_key_to_block(next_loc_key) # Add the block at the end of the current chain if next_block not in remaining_blocks: break block_list.append(next_block) remaining_blocks.remove(next_block) # Check if son is in a known block group if next_loc_key is not None and next_loc_key in known_block_chains: block_list += known_block_chains[next_loc_key] del known_block_chains[next_loc_key] known_block_chains[block_list[0].loc_key] = block_list out_block_chains = [] for loc_key in known_block_chains: chain = BlockChain(loc_db, known_block_chains[loc_key]) out_block_chains.append(chain) return out_block_chains
def is_int(
a)
def is_int(a): return isinstance(a, int) or isinstance(a, long) or \ isinstance(a, moduint) or isinstance(a, modint)
def resolve_symbol(
blockChains, loc_db, dst_interval=None)
Place @blockChains in the @dst_interval
def resolve_symbol(blockChains, loc_db, dst_interval=None): """Place @blockChains in the @dst_interval""" log_asmblock.info('resolve_symbol') if dst_interval is None: dst_interval = interval([(0, 0xFFFFFFFFFFFFFFFF)]) forbidden_interval = interval( [(-1, 0xFFFFFFFFFFFFFFFF + 1)]) - dst_interval allocated_interval = get_blockchains_address_interval(blockChains, dst_interval) log_asmblock.debug('allocated interval: %s', allocated_interval) pinned_chains = [chain for chain in blockChains if chain.pinned] # Add wedge in forbidden intervals for start, stop in forbidden_interval.intervals: wedge = BlockChainWedge( loc_db, offset=start, size=stop + 1 - start) pinned_chains.append(wedge) # Try to place bigger blockChains first pinned_chains.sort(key=lambda x: x.offset_min) blockChains.sort(key=lambda x: -x.max_size) fixed_chains = list(pinned_chains) log_asmblock.debug("place chains") for chain in blockChains: if chain.pinned: continue fixed = False for i in xrange(1, len(fixed_chains)): prev_chain = fixed_chains[i - 1] next_chain = fixed_chains[i] if prev_chain.offset_max + chain.max_size < next_chain.offset_min: new_chains = prev_chain.merge(chain) fixed_chains[i - 1:i] = new_chains fixed = True break if not fixed: raise RuntimeError('Cannot find enough space to place blocks') return [chain for chain in fixed_chains if isinstance(chain, BlockChain)]
Classes
class AsmBlock
class AsmBlock(object): def __init__(self, loc_key, alignment=1): assert isinstance(loc_key, LocKey) self.bto = set() self.lines = [] self._loc_key = loc_key self.alignment = alignment def get_label(self): warnings.warn('DEPRECATION WARNING: use ".loc_key" instead of ".label"') return self.loc_key loc_key = property(lambda self:self._loc_key) label = property(get_label) def to_string(self, loc_db=None): out = [] if loc_db is None: out.append(str(self.loc_key)) else: out.append(loc_db.pretty_str(self.loc_key)) for instr in self.lines: out.append(instr.to_string(loc_db)) if self.bto: lbls = ["->"] for dst in self.bto: if dst is None: lbls.append("Unknown? ") else: lbls.append(dst.to_string(loc_db) + " ") lbls = '\t'.join(sorted(lbls)) out.append(lbls) return '\n'.join(out) def __str__(self): return self.to_string() def addline(self, l): self.lines.append(l) def addto(self, c): assert isinstance(self.bto, set) self.bto.add(c) def split(self, loc_db, offset): loc_key = loc_db.get_or_create_offset_location(offset) log_asmblock.debug('split at %x', offset) i = -1 offsets = [x.offset for x in self.lines] offset = loc_db.get_location_offset(loc_key) if offset not in offsets: log_asmblock.warning( 'cannot split bloc at %X ' % offset + 'middle instruction? default middle') offsets.sort() return None new_bloc = AsmBlock(loc_key) i = offsets.index(offset) self.lines, new_bloc.lines = self.lines[:i], self.lines[i:] flow_mod_instr = self.get_flow_instr() log_asmblock.debug('flow mod %r', flow_mod_instr) c = AsmConstraint(loc_key, AsmConstraint.c_next) # move dst if flowgraph modifier was in original bloc # (usecase: split delayslot bloc) if flow_mod_instr: for xx in self.bto: log_asmblock.debug('lbl %s', xx) c_next = set( [x for x in self.bto if x.c_t == AsmConstraint.c_next]) c_to = [x for x in self.bto if x.c_t != AsmConstraint.c_next] self.bto = set([c] + c_to) new_bloc.bto = c_next else: new_bloc.bto = self.bto self.bto = set([c]) return new_bloc def get_range(self): """Returns the offset hull of an AsmBlock""" if len(self.lines): return (self.lines[0].offset, self.lines[-1].offset + self.lines[-1].l) else: return 0, 0 def get_offsets(self): return [x.offset for x in self.lines] def add_cst(self, loc_key, constraint_type): """ Add constraint between current block and block at @loc_key @loc_key: LocKey instance of constraint target @constraint_type: AsmConstraint c_to/c_next """ assert isinstance(loc_key, LocKey) c = AsmConstraint(loc_key, constraint_type) self.bto.add(c) def get_flow_instr(self): if not self.lines: return None for i in xrange(-1, -1 - self.lines[0].delayslot - 1, -1): if not 0 <= i < len(self.lines): return None l = self.lines[i] if l.splitflow() or l.breakflow(): raise NotImplementedError('not fully functional') def get_subcall_instr(self): if not self.lines: return None delayslot = self.lines[0].delayslot end_index = len(self.lines) - 1 ds_max_index = max(end_index - delayslot, 0) for i in xrange(end_index, ds_max_index - 1, -1): l = self.lines[i] if l.is_subcall(): return l return None def get_next(self): for constraint in self.bto: if constraint.c_t == AsmConstraint.c_next: return constraint.loc_key return None @staticmethod def _filter_constraint(constraints): """Sort and filter @constraints for AsmBlock.bto @constraints: non-empty set of AsmConstraint instance Always the same type -> one of the constraint c_next and c_to -> c_next """ # Only one constraint if len(constraints) == 1: return next(iter(constraints)) # Constraint type -> set of corresponding constraint cbytype = {} for cons in constraints: cbytype.setdefault(cons.c_t, set()).add(cons) # Only one type -> any constraint is OK if len(cbytype) == 1: return next(iter(constraints)) # At least 2 types -> types = {c_next, c_to} # c_to is included in c_next return next(iter(cbytype[AsmConstraint.c_next])) def fix_constraints(self): """Fix next block constraints""" # destination -> associated constraints dests = {} for constraint in self.bto: dests.setdefault(constraint.loc_key, set()).add(constraint) self.bto = set(self._filter_constraint(constraints) for constraints in dests.itervalues())
Ancestors (in MRO)
- AsmBlock
- __builtin__.object
Class variables
var label
var loc_key
Instance variables
var alignment
var bto
var label
var lines
var loc_key
Methods
def __init__(
self, loc_key, alignment=1)
def __init__(self, loc_key, alignment=1): assert isinstance(loc_key, LocKey) self.bto = set() self.lines = [] self._loc_key = loc_key self.alignment = alignment
def add_cst(
self, loc_key, constraint_type)
Add constraint between current block and block at @loc_key @loc_key: LocKey instance of constraint target @constraint_type: AsmConstraint c_to/c_next
def add_cst(self, loc_key, constraint_type): """ Add constraint between current block and block at @loc_key @loc_key: LocKey instance of constraint target @constraint_type: AsmConstraint c_to/c_next """ assert isinstance(loc_key, LocKey) c = AsmConstraint(loc_key, constraint_type) self.bto.add(c)
def addline(
self, l)
def addline(self, l): self.lines.append(l)
def addto(
self, c)
def addto(self, c): assert isinstance(self.bto, set) self.bto.add(c)
def fix_constraints(
self)
Fix next block constraints
def fix_constraints(self): """Fix next block constraints""" # destination -> associated constraints dests = {} for constraint in self.bto: dests.setdefault(constraint.loc_key, set()).add(constraint) self.bto = set(self._filter_constraint(constraints) for constraints in dests.itervalues())
def get_flow_instr(
self)
def get_flow_instr(self): if not self.lines: return None for i in xrange(-1, -1 - self.lines[0].delayslot - 1, -1): if not 0 <= i < len(self.lines): return None l = self.lines[i] if l.splitflow() or l.breakflow(): raise NotImplementedError('not fully functional')
def get_label(
self)
def get_label(self): warnings.warn('DEPRECATION WARNING: use ".loc_key" instead of ".label"') return self.loc_key
def get_next(
self)
def get_next(self): for constraint in self.bto: if constraint.c_t == AsmConstraint.c_next: return constraint.loc_key return None
def get_offsets(
self)
def get_offsets(self): return [x.offset for x in self.lines]
def get_range(
self)
Returns the offset hull of an AsmBlock
def get_range(self): """Returns the offset hull of an AsmBlock""" if len(self.lines): return (self.lines[0].offset, self.lines[-1].offset + self.lines[-1].l) else: return 0, 0
def get_subcall_instr(
self)
def get_subcall_instr(self): if not self.lines: return None delayslot = self.lines[0].delayslot end_index = len(self.lines) - 1 ds_max_index = max(end_index - delayslot, 0) for i in xrange(end_index, ds_max_index - 1, -1): l = self.lines[i] if l.is_subcall(): return l return None
def split(
self, loc_db, offset)
def split(self, loc_db, offset): loc_key = loc_db.get_or_create_offset_location(offset) log_asmblock.debug('split at %x', offset) i = -1 offsets = [x.offset for x in self.lines] offset = loc_db.get_location_offset(loc_key) if offset not in offsets: log_asmblock.warning( 'cannot split bloc at %X ' % offset + 'middle instruction? default middle') offsets.sort() return None new_bloc = AsmBlock(loc_key) i = offsets.index(offset) self.lines, new_bloc.lines = self.lines[:i], self.lines[i:] flow_mod_instr = self.get_flow_instr() log_asmblock.debug('flow mod %r', flow_mod_instr) c = AsmConstraint(loc_key, AsmConstraint.c_next) # move dst if flowgraph modifier was in original bloc # (usecase: split delayslot bloc) if flow_mod_instr: for xx in self.bto: log_asmblock.debug('lbl %s', xx) c_next = set( [x for x in self.bto if x.c_t == AsmConstraint.c_next]) c_to = [x for x in self.bto if x.c_t != AsmConstraint.c_next] self.bto = set([c] + c_to) new_bloc.bto = c_next else: new_bloc.bto = self.bto self.bto = set([c]) return new_bloc
def to_string(
self, loc_db=None)
def to_string(self, loc_db=None): out = [] if loc_db is None: out.append(str(self.loc_key)) else: out.append(loc_db.pretty_str(self.loc_key)) for instr in self.lines: out.append(instr.to_string(loc_db)) if self.bto: lbls = ["->"] for dst in self.bto: if dst is None: lbls.append("Unknown? ") else: lbls.append(dst.to_string(loc_db) + " ") lbls = '\t'.join(sorted(lbls)) out.append(lbls) return '\n'.join(out)
class AsmBlockBad
Stand for a bad ASM block (malformed, unreachable, not disassembled, ...)
class AsmBlockBad(AsmBlock): """Stand for a *bad* ASM block (malformed, unreachable, not disassembled, ...)""" ERROR_UNKNOWN = -1 ERROR_CANNOT_DISASM = 0 ERROR_NULL_STARTING_BLOCK = 1 ERROR_FORBIDDEN = 2 ERROR_IO = 3 ERROR_TYPES = { ERROR_UNKNOWN: "Unknown error", ERROR_CANNOT_DISASM: "Unable to disassemble", ERROR_NULL_STARTING_BLOCK: "Null starting block", ERROR_FORBIDDEN: "Address forbidden by dont_dis", ERROR_IO: "IOError", } def __init__(self, loc_key=None, alignment=1, errno=ERROR_UNKNOWN, *args, **kwargs): """Instanciate an AsmBlock_bad. @loc_key, @alignement: same as AsmBlock.__init__ @errno: (optional) specify a error type associated with the block """ super(AsmBlockBad, self).__init__(loc_key, alignment, *args, **kwargs) self._errno = errno errno = property(lambda self: self._errno) def __str__(self): error_txt = self.ERROR_TYPES.get(self._errno, self._errno) return "\n".join([str(self.loc_key), "\tBad block: %s" % error_txt]) def addline(self, *args, **kwargs): raise RuntimeError("An AsmBlockBad cannot have line") def addto(self, *args, **kwargs): raise RuntimeError("An AsmBlockBad cannot have bto") def split(self, *args, **kwargs): raise RuntimeError("An AsmBlockBad cannot be splitted")
Ancestors (in MRO)
- AsmBlockBad
- AsmBlock
- __builtin__.object
Class variables
var ERROR_CANNOT_DISASM
var ERROR_FORBIDDEN
var ERROR_IO
var ERROR_NULL_STARTING_BLOCK
var ERROR_TYPES
var ERROR_UNKNOWN
var errno
Instance variables
var errno
Methods
def __init__(
self, loc_key=None, alignment=1, errno=-1, *args, **kwargs)
Inheritance:
AsmBlock
.__init__
Instanciate an AsmBlock_bad. @loc_key, @alignement: same as AsmBlock.init @errno: (optional) specify a error type associated with the block
def __init__(self, loc_key=None, alignment=1, errno=ERROR_UNKNOWN, *args, **kwargs): """Instanciate an AsmBlock_bad. @loc_key, @alignement: same as AsmBlock.__init__ @errno: (optional) specify a error type associated with the block """ super(AsmBlockBad, self).__init__(loc_key, alignment, *args, **kwargs) self._errno = errno
def add_cst(
self, loc_key, constraint_type)
Add constraint between current block and block at @loc_key @loc_key: LocKey instance of constraint target @constraint_type: AsmConstraint c_to/c_next
def add_cst(self, loc_key, constraint_type): """ Add constraint between current block and block at @loc_key @loc_key: LocKey instance of constraint target @constraint_type: AsmConstraint c_to/c_next """ assert isinstance(loc_key, LocKey) c = AsmConstraint(loc_key, constraint_type) self.bto.add(c)
def addline(
self, *args, **kwargs)
def addline(self, *args, **kwargs): raise RuntimeError("An AsmBlockBad cannot have line")
def addto(
self, *args, **kwargs)
def addto(self, *args, **kwargs): raise RuntimeError("An AsmBlockBad cannot have bto")
def fix_constraints(
self)
Inheritance:
AsmBlock
.fix_constraints
Fix next block constraints
def fix_constraints(self): """Fix next block constraints""" # destination -> associated constraints dests = {} for constraint in self.bto: dests.setdefault(constraint.loc_key, set()).add(constraint) self.bto = set(self._filter_constraint(constraints) for constraints in dests.itervalues())
def get_flow_instr(
self)
Inheritance:
AsmBlock
.get_flow_instr
def get_flow_instr(self): if not self.lines: return None for i in xrange(-1, -1 - self.lines[0].delayslot - 1, -1): if not 0 <= i < len(self.lines): return None l = self.lines[i] if l.splitflow() or l.breakflow(): raise NotImplementedError('not fully functional')
def get_label(
self)
Inheritance:
AsmBlock
.get_label
def get_label(self): warnings.warn('DEPRECATION WARNING: use ".loc_key" instead of ".label"') return self.loc_key
def get_next(
self)
Inheritance:
AsmBlock
.get_next
def get_next(self): for constraint in self.bto: if constraint.c_t == AsmConstraint.c_next: return constraint.loc_key return None
def get_offsets(
self)
Inheritance:
AsmBlock
.get_offsets
def get_offsets(self): return [x.offset for x in self.lines]
def get_range(
self)
Inheritance:
AsmBlock
.get_range
Returns the offset hull of an AsmBlock
def get_range(self): """Returns the offset hull of an AsmBlock""" if len(self.lines): return (self.lines[0].offset, self.lines[-1].offset + self.lines[-1].l) else: return 0, 0
def get_subcall_instr(
self)
Inheritance:
AsmBlock
.get_subcall_instr
def get_subcall_instr(self): if not self.lines: return None delayslot = self.lines[0].delayslot end_index = len(self.lines) - 1 ds_max_index = max(end_index - delayslot, 0) for i in xrange(end_index, ds_max_index - 1, -1): l = self.lines[i] if l.is_subcall(): return l return None
def split(
self, *args, **kwargs)
def split(self, *args, **kwargs): raise RuntimeError("An AsmBlockBad cannot be splitted")
def to_string(
self, loc_db=None)
Inheritance:
AsmBlock
.to_string
def to_string(self, loc_db=None): out = [] if loc_db is None: out.append(str(self.loc_key)) else: out.append(loc_db.pretty_str(self.loc_key)) for instr in self.lines: out.append(instr.to_string(loc_db)) if self.bto: lbls = ["->"] for dst in self.bto: if dst is None: lbls.append("Unknown? ") else: lbls.append(dst.to_string(loc_db) + " ") lbls = '\t'.join(sorted(lbls)) out.append(lbls) return '\n'.join(out)
class AsmCFG
Directed graph standing for a ASM Control Flow Graph with: - nodes: AsmBlock - edges: constraints between blocks, synchronized with AsmBlock's "bto"
Specialized the .dot export and force the relation between block to be uniq, and associated with a constraint.
Offer helpers on AsmCFG management, such as research by loc_key, sanity checking and mnemonic size guessing.
class AsmCFG(DiGraph): """Directed graph standing for a ASM Control Flow Graph with: - nodes: AsmBlock - edges: constraints between blocks, synchronized with AsmBlock's "bto" Specialized the .dot export and force the relation between block to be uniq, and associated with a constraint. Offer helpers on AsmCFG management, such as research by loc_key, sanity checking and mnemonic size guessing. """ # Internal structure for pending management AsmCFGPending = namedtuple("AsmCFGPending", ["waiter", "constraint"]) def __init__(self, loc_db=None, *args, **kwargs): super(AsmCFG, self).__init__(*args, **kwargs) # Edges -> constraint self.edges2constraint = {} # Expected LocKey -> set( (src, dst), constraint ) self._pendings = {} # Loc_Key2block built on the fly self._loc_key_to_block = {} # loc_db self.loc_db = loc_db def copy(self): """Copy the current graph instance""" graph = self.__class__(self.loc_db) return graph + self # Compatibility with old list API def append(self, *args, **kwargs): raise DeprecationWarning("AsmCFG is a graph, use add_node") def remove(self, *args, **kwargs): raise DeprecationWarning("AsmCFG is a graph, use del_node") def __getitem__(self, *args, **kwargs): raise DeprecationWarning("Order of AsmCFG elements is not reliable") def __contains__(self, _): """ DEPRECATED. Use: - loc_key in AsmCFG.nodes() to test loc_key existence """ raise RuntimeError("DEPRECATED") def __iter__(self): """ DEPRECATED. Use: - AsmCFG.blocks() to iter on blocks - loc_key in AsmCFG.nodes() to test loc_key existence """ raise RuntimeError("DEPRECATED") def __len__(self): """Return the number of blocks in AsmCFG""" return len(self._nodes) blocks = property(lambda x:x._loc_key_to_block.itervalues()) # Manage graph with associated constraints def add_edge(self, src, dst, constraint): """Add an edge to the graph @src: LocKey instance, source @dst: LocKey instance, destination @constraint: constraint associated to this edge """ # Sanity check assert isinstance(src, LocKey) assert isinstance(dst, LocKey) known_cst = self.edges2constraint.get((src, dst), None) if known_cst is not None: assert known_cst == constraint return # Add the edge to src.bto if needed block_src = self.loc_key_to_block(src) if block_src: if dst not in [cons.loc_key for cons in block_src.bto]: block_src.bto.add(AsmConstraint(dst, constraint)) # Add edge self.edges2constraint[(src, dst)] = constraint super(AsmCFG, self).add_edge(src, dst) def add_uniq_edge(self, src, dst, constraint): """ Synonym for `add_edge` """ self.add_edge(src, dst, constraint) def del_edge(self, src, dst): """Delete the edge @src->@dst and its associated constraint""" src_blk = self.loc_key_to_block(src) dst_blk = self.loc_key_to_block(dst) assert src_blk is not None assert dst_blk is not None # Delete from src.bto to_remove = [cons for cons in src_blk.bto if cons.loc_key == dst] if to_remove: assert len(to_remove) == 1 src_blk.bto.remove(to_remove[0]) # Del edge del self.edges2constraint[(src, dst)] super(AsmCFG, self).del_edge(src, dst) def del_block(self, block): super(AsmCFG, self).del_node(block.loc_key) del self._loc_key_to_block[block.loc_key] def add_node(self, node): assert isinstance(node, LocKey) return super(AsmCFG, self).add_node(node) def add_block(self, block): """ Add the block @block to the current instance, if it is not already in @block: AsmBlock instance Edges will be created for @block.bto, if destinations are already in this instance. If not, they will be resolved when adding these aforementionned destinations. `self.pendings` indicates which blocks are not yet resolved. """ status = super(AsmCFG, self).add_node(block.loc_key) if not status: return status # Update waiters if block.loc_key in self._pendings: for bblpend in self._pendings[block.loc_key]: self.add_edge(bblpend.waiter.loc_key, block.loc_key, bblpend.constraint) del self._pendings[block.loc_key] # Synchronize edges with block destinations self._loc_key_to_block[block.loc_key] = block for constraint in block.bto: dst = self._loc_key_to_block.get(constraint.loc_key, None) if dst is None: # Block is yet unknown, add it to pendings to_add = self.AsmCFGPending(waiter=block, constraint=constraint.c_t) self._pendings.setdefault(constraint.loc_key, set()).add(to_add) else: # Block is already in known nodes self.add_edge(block.loc_key, dst.loc_key, constraint.c_t) return status def merge(self, graph): """Merge with @graph, taking in account constraints""" # Add known blocks for block in graph.blocks: self.add_block(block) # Add nodes not already in it (ie. not linked to a block) for node in graph.nodes(): self.add_node(node) # -> add_edge(x, y, constraint) for edge in graph._edges: # May fail if there is an incompatibility in edges constraints # between the two graphs self.add_edge(*edge, constraint=graph.edges2constraint[edge]) def node2lines(self, node): if self.loc_db is None: loc_key_name = str(node) else: loc_key_name = self.loc_db.pretty_str(node) yield self.DotCellDescription(text=loc_key_name, attr={'align': 'center', 'colspan': 2, 'bgcolor': 'grey'}) block = self._loc_key_to_block.get(node, None) if block is None: raise StopIteration if isinstance(block, AsmBlockBad): yield [ self.DotCellDescription( text=block.ERROR_TYPES.get(block._errno, block._errno ), attr={}) ] raise StopIteration for line in block.lines: if self._dot_offset: yield [self.DotCellDescription(text="%.8X" % line.offset, attr={}), self.DotCellDescription(text=line.to_string(self.loc_db), attr={})] else: yield self.DotCellDescription(text=line.to_string(self.loc_db), attr={}) def node_attr(self, node): block = self._loc_key_to_block.get(node, None) if isinstance(block, AsmBlockBad): return {'style': 'filled', 'fillcolor': 'red'} return {} def edge_attr(self, src, dst): cst = self.edges2constraint.get((src, dst), None) edge_color = "blue" if len(self.successors(src)) > 1: if cst == AsmConstraint.c_next: edge_color = "red" else: edge_color = "limegreen" return {"color": edge_color} def dot(self, offset=False): """ @offset: (optional) if set, add the corresponding offsets in each node """ self._dot_offset = offset return super(AsmCFG, self).dot() # Helpers @property def pendings(self): """Dictionary of loc_key -> set(AsmCFGPending instance) indicating which loc_key are missing in the current instance. A loc_key is missing if a block which is already in nodes has constraints with him (thanks to its .bto) and the corresponding block is not yet in nodes """ return self._pendings def label2block(self, loc_key): """Return the block corresponding to loc_key @loc_key @loc_key: LocKey instance""" warnings.warn('DEPRECATION WARNING: use "loc_key_to_block" instead of "label2block"') return self.loc_key_to_block(loc_key) def rebuild_edges(self): """Consider blocks '.bto' and rebuild edges according to them, ie: - update constraint type - add missing edge - remove no more used edge This method should be called if a block's '.bto' in nodes have been modified without notifying this instance to resynchronize edges. """ for block in self.blocks: edges = [] # Rebuild edges from bto for constraint in block.bto: dst = self._loc_key_to_block.get(constraint.loc_key, None) if dst is None: # Missing destination, add to pendings self._pendings.setdefault( constraint.loc_key, set() ).add( self.AsmCFGPending( block, constraint.c_t ) ) continue edge = (block.loc_key, dst.loc_key) edges.append(edge) if edge in self._edges: # Already known edge, constraint may have changed self.edges2constraint[edge] = constraint.c_t else: # An edge is missing self.add_edge(edge[0], edge[1], constraint.c_t) # Remove useless edges for succ in self.successors(block.loc_key): edge = (block.loc_key, succ) if edge not in edges: self.del_edge(*edge) def get_bad_blocks(self): """Iterator on AsmBlockBad elements""" # A bad asm block is always a leaf for loc_key in self.leaves(): block = self._loc_key_to_block.get(loc_key, None) if isinstance(block, AsmBlockBad): yield block def get_bad_blocks_predecessors(self, strict=False): """Iterator on loc_keys with an AsmBlockBad destination @strict: (optional) if set, return loc_key with only bad successors """ # Avoid returning the same block done = set() for badblock in self.get_bad_blocks(): for predecessor in self.predecessors_iter(badblock.loc_key): if predecessor not in done: if (strict and not all(isinstance(self._loc_key_to_block.get(block, None), AsmBlockBad) for block in self.successors_iter(predecessor))): continue yield predecessor done.add(predecessor) def getby_offset(self, offset): """Return asmblock containing @offset""" for block in self.blocks: if block.lines[0].offset <= offset < \ (block.lines[-1].offset + block.lines[-1].l): return block return None def loc_key_to_block(self, loc_key): """ Return the asmblock corresponding to loc_key @loc_key, None if unknown loc_key @loc_key: LocKey instance """ return self._loc_key_to_block.get(loc_key, None) def sanity_check(self): """Do sanity checks on blocks' constraints: * no pendings * no multiple next constraint to same block * no next constraint to self """ if len(self._pendings) != 0: raise RuntimeError("Some blocks are missing: %s" % map( str, self._pendings.keys() )) next_edges = {edge: constraint for edge, constraint in self.edges2constraint.iteritems() if constraint == AsmConstraint.c_next} for loc_key in self._nodes: if loc_key not in self._loc_key_to_block: raise RuntimeError("Not supported yet: every node must have a corresponding AsmBlock") # No next constraint to self if (loc_key, loc_key) in next_edges: raise RuntimeError('Bad constraint: self in next') # No multiple next constraint to same block pred_next = list(ploc_key for (ploc_key, dloc_key) in next_edges if dloc_key == loc_key) if len(pred_next) > 1: raise RuntimeError("Too many next constraints for bloc %r" "(%s)" % (loc_key, pred_next)) def guess_blocks_size(self, mnemo): """Asm and compute max block size Add a 'size' and 'max_size' attribute on each block @mnemo: metamn instance""" for block in self.blocks: size = 0 for instr in block.lines: if isinstance(instr, AsmRaw): # for special AsmRaw, only extract len if isinstance(instr.raw, list): data = None if len(instr.raw) == 0: l = 0 else: l = instr.raw[0].size / 8 * len(instr.raw) elif isinstance(instr.raw, str): data = instr.raw l = len(data) else: raise NotImplementedError('asm raw') else: # Assemble the instruction to retrieve its len. # If the instruction uses symbol it will fail # In this case, the max_instruction_len is used try: candidates = mnemo.asm(instr) l = len(candidates[-1]) except: l = mnemo.max_instruction_len data = None instr.data = data instr.l = l size += l block.size = size block.max_size = size log_asmblock.info("size: %d max: %d", block.size, block.max_size) def apply_splitting(self, loc_db, dis_block_callback=None, **kwargs): """Consider @self' bto destinations and split block in @self if one of these destinations jumps in the middle of this block. In order to work, they must be only one block in @self per loc_key in @loc_db (which is true if @self come from the same disasmEngine). @loc_db: LocationDB instance associated with @self'loc_keys @dis_block_callback: (optional) if set, this callback will be called on new block destinations @kwargs: (optional) named arguments to pass to dis_block_callback """ # Get all possible destinations not yet resolved, with a resolved # offset block_dst = [] for loc_key in self.pendings: offset = loc_db.get_location_offset(loc_key) if offset is not None: block_dst.append(offset) todo = set(self.blocks) rebuild_needed = False while todo: # Find a block with a destination inside another one cur_block = todo.pop() range_start, range_stop = cur_block.get_range() for off in block_dst: if not (off > range_start and off < range_stop): continue # `cur_block` must be splitted at offset `off`from miasm2.core.locationdb import LocationDB new_b = cur_block.split(loc_db, off) log_asmblock.debug("Split block %x", off) if new_b is None: log_asmblock.error("Cannot split %x!!", off) continue # Remove pending from cur_block # Links from new_b will be generated in rebuild_edges for dst in new_b.bto: if dst.loc_key not in self.pendings: continue self.pendings[dst.loc_key] = set(pending for pending in self.pendings[dst.loc_key] if pending.waiter != cur_block) # The new block destinations may need to be disassembled if dis_block_callback: offsets_to_dis = set( self.loc_db.get_location_offset(constraint.loc_key) for constraint in new_b.bto ) dis_block_callback(cur_bloc=new_b, offsets_to_dis=offsets_to_dis, loc_db=loc_db, **kwargs) # Update structure rebuild_needed = True self.add_block(new_b) # The new block must be considered todo.add(new_b) range_start, range_stop = cur_block.get_range() # Rebuild edges to match new blocks'bto if rebuild_needed: self.rebuild_edges() def __str__(self): out = [] for block in self.blocks: out.append(str(block)) for loc_key_a, loc_key_b in self.edges(): out.append("%s -> %s" % (loc_key_a, loc_key_b)) return '\n'.join(out) def __repr__(self): return "<%s %s>" % (self.__class__.__name__, hex(id(self)))
Ancestors (in MRO)
- AsmCFG
- miasm2.core.graph.DiGraph
- __builtin__.object
Class variables
var AsmCFGPending
var DotCellDescription
var blocks
Instance variables
var blocks
var edges2constraint
var loc_db
var pendings
Dictionary of loc_key -> set(AsmCFGPending instance) indicating which loc_key are missing in the current instance. A loc_key is missing if a block which is already in nodes has constraints with him (thanks to its .bto) and the corresponding block is not yet in nodes
Methods
def __init__(
self, loc_db=None, *args, **kwargs)
def __init__(self, loc_db=None, *args, **kwargs): super(AsmCFG, self).__init__(*args, **kwargs) # Edges -> constraint self.edges2constraint = {} # Expected LocKey -> set( (src, dst), constraint ) self._pendings = {} # Loc_Key2block built on the fly self._loc_key_to_block = {} # loc_db self.loc_db = loc_db
def add_block(
self, block)
Add the block @block to the current instance, if it is not already in @block: AsmBlock instance
Edges will be created for @block.bto, if destinations are already in
this instance. If not, they will be resolved when adding these
aforementionned destinations.
self.pendings
indicates which blocks are not yet resolved.
def add_block(self, block): """ Add the block @block to the current instance, if it is not already in @block: AsmBlock instance Edges will be created for @block.bto, if destinations are already in this instance. If not, they will be resolved when adding these aforementionned destinations. `self.pendings` indicates which blocks are not yet resolved. """ status = super(AsmCFG, self).add_node(block.loc_key) if not status: return status # Update waiters if block.loc_key in self._pendings: for bblpend in self._pendings[block.loc_key]: self.add_edge(bblpend.waiter.loc_key, block.loc_key, bblpend.constraint) del self._pendings[block.loc_key] # Synchronize edges with block destinations self._loc_key_to_block[block.loc_key] = block for constraint in block.bto: dst = self._loc_key_to_block.get(constraint.loc_key, None) if dst is None: # Block is yet unknown, add it to pendings to_add = self.AsmCFGPending(waiter=block, constraint=constraint.c_t) self._pendings.setdefault(constraint.loc_key, set()).add(to_add) else: # Block is already in known nodes self.add_edge(block.loc_key, dst.loc_key, constraint.c_t) return status
def add_edge(
self, src, dst, constraint)
Add an edge to the graph @src: LocKey instance, source @dst: LocKey instance, destination @constraint: constraint associated to this edge
def add_edge(self, src, dst, constraint): """Add an edge to the graph @src: LocKey instance, source @dst: LocKey instance, destination @constraint: constraint associated to this edge """ # Sanity check assert isinstance(src, LocKey) assert isinstance(dst, LocKey) known_cst = self.edges2constraint.get((src, dst), None) if known_cst is not None: assert known_cst == constraint return # Add the edge to src.bto if needed block_src = self.loc_key_to_block(src) if block_src: if dst not in [cons.loc_key for cons in block_src.bto]: block_src.bto.add(AsmConstraint(dst, constraint)) # Add edge self.edges2constraint[(src, dst)] = constraint super(AsmCFG, self).add_edge(src, dst)
def add_node(
self, node)
def add_node(self, node): assert isinstance(node, LocKey) return super(AsmCFG, self).add_node(node)
def add_uniq_edge(
self, src, dst, constraint)
Synonym for add_edge
def add_uniq_edge(self, src, dst, constraint): """ Synonym for `add_edge` """ self.add_edge(src, dst, constraint)
def append(
self, *args, **kwargs)
def append(self, *args, **kwargs): raise DeprecationWarning("AsmCFG is a graph, use add_node")
def apply_splitting(
self, loc_db, dis_block_callback=None, **kwargs)
Consider @self' bto destinations and split block in @self if one of these destinations jumps in the middle of this block. In order to work, they must be only one block in @self per loc_key in @loc_db (which is true if @self come from the same disasmEngine).
@loc_db: LocationDB instance associated with @self'loc_keys @dis_block_callback: (optional) if set, this callback will be called on new block destinations @kwargs: (optional) named arguments to pass to dis_block_callback
def apply_splitting(self, loc_db, dis_block_callback=None, **kwargs): """Consider @self' bto destinations and split block in @self if one of these destinations jumps in the middle of this block. In order to work, they must be only one block in @self per loc_key in @loc_db (which is true if @self come from the same disasmEngine). @loc_db: LocationDB instance associated with @self'loc_keys @dis_block_callback: (optional) if set, this callback will be called on new block destinations @kwargs: (optional) named arguments to pass to dis_block_callback """ # Get all possible destinations not yet resolved, with a resolved # offset block_dst = [] for loc_key in self.pendings: offset = loc_db.get_location_offset(loc_key) if offset is not None: block_dst.append(offset) todo = set(self.blocks) rebuild_needed = False while todo: # Find a block with a destination inside another one cur_block = todo.pop() range_start, range_stop = cur_block.get_range() for off in block_dst: if not (off > range_start and off < range_stop): continue # `cur_block` must be splitted at offset `off`from miasm2.core.locationdb import LocationDB new_b = cur_block.split(loc_db, off) log_asmblock.debug("Split block %x", off) if new_b is None: log_asmblock.error("Cannot split %x!!", off) continue # Remove pending from cur_block # Links from new_b will be generated in rebuild_edges for dst in new_b.bto: if dst.loc_key not in self.pendings: continue self.pendings[dst.loc_key] = set(pending for pending in self.pendings[dst.loc_key] if pending.waiter != cur_block) # The new block destinations may need to be disassembled if dis_block_callback: offsets_to_dis = set( self.loc_db.get_location_offset(constraint.loc_key) for constraint in new_b.bto ) dis_block_callback(cur_bloc=new_b, offsets_to_dis=offsets_to_dis, loc_db=loc_db, **kwargs) # Update structure rebuild_needed = True self.add_block(new_b) # The new block must be considered todo.add(new_b) range_start, range_stop = cur_block.get_range() # Rebuild edges to match new blocks'bto if rebuild_needed: self.rebuild_edges()
def compute_back_edges(
self, head)
Computes all back edges from a node to a dominator in the graph. :param head: head of graph :return: yield a back edge
def compute_back_edges(self, head): """ Computes all back edges from a node to a dominator in the graph. :param head: head of graph :return: yield a back edge """ dominators = self.compute_dominators(head) # traverse graph for node in self.walk_depth_first_forward(head): for successor in self.successors_iter(node): # check for a back edge to a dominator if successor in dominators[node]: edge = (node, successor) yield edge
def compute_dominance_frontier(
self, head)
Compute the dominance frontier of the graph
Source: Cooper, Keith D., Timothy J. Harvey, and Ken Kennedy. "A simple, fast dominance algorithm." Software Practice & Experience 4 (2001), p. 9
def compute_dominance_frontier(self, head): """ Compute the dominance frontier of the graph Source: Cooper, Keith D., Timothy J. Harvey, and Ken Kennedy. "A simple, fast dominance algorithm." Software Practice & Experience 4 (2001), p. 9 """ idoms = self.compute_immediate_dominators(head) frontier = {} for node in idoms: if self._nodes_pred[node] >= 2: for predecessor in self.predecessors_iter(node): runner = predecessor if runner not in idoms: continue while runner != idoms[node]: if runner not in frontier: frontier[runner] = set() frontier[runner].add(node) runner = idoms[runner] return frontier
def compute_dominator_tree(
self, head)
Computes the dominator tree of a graph :param head: head of graph :return: DiGraph
def compute_dominator_tree(self, head): """ Computes the dominator tree of a graph :param head: head of graph :return: DiGraph """ idoms = self.compute_immediate_dominators(head) dominator_tree = DiGraph() for node in idoms: dominator_tree.add_edge(idoms[node], node) return dominator_tree
def compute_dominators(
self, head)
Compute the dominators of the graph
def compute_dominators(self, head): """Compute the dominators of the graph""" return self._compute_generic_dominators(head, self.reachable_sons, self.predecessors_iter, self.successors_iter)
def compute_immediate_dominators(
self, head)
Compute the immediate dominators of the graph
def compute_immediate_dominators(self, head): """Compute the immediate dominators of the graph""" dominators = self.compute_dominators(head) idoms = {} for node in dominators: for predecessor in self.walk_dominators(node, dominators): if predecessor in dominators[node] and node != predecessor: idoms[node] = predecessor break return idoms
def compute_natural_loops(
self, head)
Computes all natural loops in the graph.
Source: Aho, Alfred V., Lam, Monica S., Sethi, R. and Jeffrey Ullman. "Compilers: Principles, Techniques, & Tools, Second Edition" Pearson/Addison Wesley (2007), Chapter 9.6.6 :param head: head of the graph :return: yield a tuple of the form (back edge, loop body)
def compute_natural_loops(self, head): """ Computes all natural loops in the graph. Source: Aho, Alfred V., Lam, Monica S., Sethi, R. and Jeffrey Ullman. "Compilers: Principles, Techniques, & Tools, Second Edition" Pearson/Addison Wesley (2007), Chapter 9.6.6 :param head: head of the graph :return: yield a tuple of the form (back edge, loop body) """ for a, b in self.compute_back_edges(head): body = self._compute_natural_loop_body(b, a) yield ((a, b), body)
def compute_postdominators(
self, leaf)
Compute the postdominators of the graph
def compute_postdominators(self, leaf): """Compute the postdominators of the graph""" return self._compute_generic_dominators(leaf, self.reachable_parents, self.successors_iter, self.predecessors_iter)
def compute_strongly_connected_components(
self)
Partitions the graph into strongly connected components.
Iterative implementation of Gabow's path-based SCC algorithm. Source: Gabow, Harold N. "Path-based depth-first search for strong and biconnected components." Information Processing Letters 74.3 (2000), pp. 109--110
The iterative implementation is inspired by Mark Dickinson's code: http://code.activestate.com/recipes/ 578507-strongly-connected-components-of-a-directed-graph/ :return: yield a strongly connected component
def compute_strongly_connected_components(self): """ Partitions the graph into strongly connected components. Iterative implementation of Gabow's path-based SCC algorithm. Source: Gabow, Harold N. "Path-based depth-first search for strong and biconnected components." Information Processing Letters 74.3 (2000), pp. 109--110 The iterative implementation is inspired by Mark Dickinson's code: http://code.activestate.com/recipes/ 578507-strongly-connected-components-of-a-directed-graph/ :return: yield a strongly connected component """ stack = [] boundaries = [] counter = len(self.nodes()) # init index with 0 index = {v: 0 for v in self.nodes()} # state machine for worklist algorithm VISIT, HANDLE_RECURSION, MERGE = 0, 1, 2 NodeState = namedtuple('NodeState', ['state', 'node']) for node in self.nodes(): # next node if node was already visited if index[node]: continue todo = [NodeState(VISIT, node)] done = set() while todo: current = todo.pop() if current.node in done: continue # node is unvisited if current.state == VISIT: stack.append(current.node) index[current.node] = len(stack) boundaries.append(index[current.node]) todo.append(NodeState(MERGE, current.node)) # follow successors for successor in self.successors_iter(current.node): todo.append(NodeState(HANDLE_RECURSION, successor)) # iterative handling of recursion algorithm elif current.state == HANDLE_RECURSION: # visit unvisited successor if index[current.node] == 0: todo.append(NodeState(VISIT, current.node)) else: # contract cycle if necessary while index[current.node] < boundaries[-1]: boundaries.pop() # merge strongly connected component else: if index[current.node] == boundaries[-1]: boundaries.pop() counter += 1 scc = set() while index[current.node] <= len(stack): popped = stack.pop() index[popped] = counter scc.add(popped) done.add(current.node) yield scc
def copy(
self)
Copy the current graph instance
def copy(self): """Copy the current graph instance""" graph = self.__class__(self.loc_db) return graph + self
def del_block(
self, block)
def del_block(self, block): super(AsmCFG, self).del_node(block.loc_key) del self._loc_key_to_block[block.loc_key]
def del_edge(
self, src, dst)
Delete the edge @src->@dst and its associated constraint
def del_edge(self, src, dst): """Delete the edge @src->@dst and its associated constraint""" src_blk = self.loc_key_to_block(src) dst_blk = self.loc_key_to_block(dst) assert src_blk is not None assert dst_blk is not None # Delete from src.bto to_remove = [cons for cons in src_blk.bto if cons.loc_key == dst] if to_remove: assert len(to_remove) == 1 src_blk.bto.remove(to_remove[0]) # Del edge del self.edges2constraint[(src, dst)] super(AsmCFG, self).del_edge(src, dst)
def del_node(
self, node)
Delete the @node of the graph; Also delete every edge to/from this @node
def del_node(self, node): """Delete the @node of the graph; Also delete every edge to/from this @node""" if node in self._nodes: self._nodes.remove(node) for pred in self.predecessors(node): self.del_edge(pred, node) for succ in self.successors(node): self.del_edge(node, succ)
def discard_edge(
self, src, dst)
Remove edge between @src and @dst if it exits
def discard_edge(self, src, dst): """Remove edge between @src and @dst if it exits""" if (src, dst) in self._edges: self.del_edge(src, dst)
def dot(
self, offset=False)
@offset: (optional) if set, add the corresponding offsets in each node
def dot(self, offset=False): """ @offset: (optional) if set, add the corresponding offsets in each node """ self._dot_offset = offset return super(AsmCFG, self).dot()
def edge_attr(
self, src, dst)
def edge_attr(self, src, dst): cst = self.edges2constraint.get((src, dst), None) edge_color = "blue" if len(self.successors(src)) > 1: if cst == AsmConstraint.c_next: edge_color = "red" else: edge_color = "limegreen" return {"color": edge_color}
def edges(
self)
def edges(self): return self._edges
def find_path(
self, src, dst, cycles_count=0, done=None)
def find_path(self, src, dst, cycles_count=0, done=None): if done is None: done = {} if dst in done and done[dst] > cycles_count: return [[]] if src == dst: return [[src]] out = [] for node in self.predecessors(dst): done_n = dict(done) done_n[dst] = done_n.get(dst, 0) + 1 for path in self.find_path(src, node, cycles_count, done_n): if path and path[0] == src: out.append(path + [dst]) return out
def get_bad_blocks(
self)
Iterator on AsmBlockBad elements
def get_bad_blocks(self): """Iterator on AsmBlockBad elements""" # A bad asm block is always a leaf for loc_key in self.leaves(): block = self._loc_key_to_block.get(loc_key, None) if isinstance(block, AsmBlockBad): yield block
def get_bad_blocks_predecessors(
self, strict=False)
Iterator on loc_keys with an AsmBlockBad destination @strict: (optional) if set, return loc_key with only bad successors
def get_bad_blocks_predecessors(self, strict=False): """Iterator on loc_keys with an AsmBlockBad destination @strict: (optional) if set, return loc_key with only bad successors """ # Avoid returning the same block done = set() for badblock in self.get_bad_blocks(): for predecessor in self.predecessors_iter(badblock.loc_key): if predecessor not in done: if (strict and not all(isinstance(self._loc_key_to_block.get(block, None), AsmBlockBad) for block in self.successors_iter(predecessor))): continue yield predecessor done.add(predecessor)
def getby_offset(
self, offset)
Return asmblock containing @offset
def getby_offset(self, offset): """Return asmblock containing @offset""" for block in self.blocks: if block.lines[0].offset <= offset < \ (block.lines[-1].offset + block.lines[-1].l): return block return None
def guess_blocks_size(
self, mnemo)
Asm and compute max block size Add a 'size' and 'max_size' attribute on each block @mnemo: metamn instance
def guess_blocks_size(self, mnemo): """Asm and compute max block size Add a 'size' and 'max_size' attribute on each block @mnemo: metamn instance""" for block in self.blocks: size = 0 for instr in block.lines: if isinstance(instr, AsmRaw): # for special AsmRaw, only extract len if isinstance(instr.raw, list): data = None if len(instr.raw) == 0: l = 0 else: l = instr.raw[0].size / 8 * len(instr.raw) elif isinstance(instr.raw, str): data = instr.raw l = len(data) else: raise NotImplementedError('asm raw') else: # Assemble the instruction to retrieve its len. # If the instruction uses symbol it will fail # In this case, the max_instruction_len is used try: candidates = mnemo.asm(instr) l = len(candidates[-1]) except: l = mnemo.max_instruction_len data = None instr.data = data instr.l = l size += l block.size = size block.max_size = size log_asmblock.info("size: %d max: %d", block.size, block.max_size)
def has_loop(
self)
Return True if the graph contains at least a cycle
def has_loop(self): """Return True if the graph contains at least a cycle""" todo = list(self.nodes()) # tested nodes done = set() # current DFS nodes current = set() while todo: node = todo.pop() if node in done: continue if node in current: # DFS branch end for succ in self.successors_iter(node): if succ in current: return True # A node cannot be in current AND in done current.remove(node) done.add(node) else: # Launch DFS from node todo.append(node) current.add(node) todo += self.successors(node) return False
def heads(
self)
def heads(self): return [x for x in self.heads_iter()]
def heads_iter(
self)
def heads_iter(self): for node in self._nodes: if not self._nodes_pred[node]: yield node
def label2block(
self, loc_key)
Return the block corresponding to loc_key @loc_key @loc_key: LocKey instance
def label2block(self, loc_key): """Return the block corresponding to loc_key @loc_key @loc_key: LocKey instance""" warnings.warn('DEPRECATION WARNING: use "loc_key_to_block" instead of "label2block"') return self.loc_key_to_block(loc_key)
def leaves(
self)
def leaves(self): return [x for x in self.leaves_iter()]
def leaves_iter(
self)
def leaves_iter(self): for node in self._nodes: if not self._nodes_succ[node]: yield node
def loc_key_to_block(
self, loc_key)
Return the asmblock corresponding to loc_key @loc_key, None if unknown loc_key @loc_key: LocKey instance
def loc_key_to_block(self, loc_key): """ Return the asmblock corresponding to loc_key @loc_key, None if unknown loc_key @loc_key: LocKey instance """ return self._loc_key_to_block.get(loc_key, None)
def merge(
self, graph)
Merge with @graph, taking in account constraints
def merge(self, graph): """Merge with @graph, taking in account constraints""" # Add known blocks for block in graph.blocks: self.add_block(block) # Add nodes not already in it (ie. not linked to a block) for node in graph.nodes(): self.add_node(node) # -> add_edge(x, y, constraint) for edge in graph._edges: # May fail if there is an incompatibility in edges constraints # between the two graphs self.add_edge(*edge, constraint=graph.edges2constraint[edge])
def node2lines(
self, node)
def node2lines(self, node): if self.loc_db is None: loc_key_name = str(node) else: loc_key_name = self.loc_db.pretty_str(node) yield self.DotCellDescription(text=loc_key_name, attr={'align': 'center', 'colspan': 2, 'bgcolor': 'grey'}) block = self._loc_key_to_block.get(node, None) if block is None: raise StopIteration if isinstance(block, AsmBlockBad): yield [ self.DotCellDescription( text=block.ERROR_TYPES.get(block._errno, block._errno ), attr={}) ] raise StopIteration for line in block.lines: if self._dot_offset: yield [self.DotCellDescription(text="%.8X" % line.offset, attr={}), self.DotCellDescription(text=line.to_string(self.loc_db), attr={})] else: yield self.DotCellDescription(text=line.to_string(self.loc_db), attr={})
def node_attr(
self, node)
def node_attr(self, node): block = self._loc_key_to_block.get(node, None) if isinstance(block, AsmBlockBad): return {'style': 'filled', 'fillcolor': 'red'} return {}
def nodeid(
self, node)
Returns uniq id for a @node @node: a node of the graph
def nodeid(self, node): """ Returns uniq id for a @node @node: a node of the graph """ return hash(node) & 0xFFFFFFFFFFFFFFFF
def nodes(
self)
def nodes(self): return self._nodes
def predecessors(
self, node)
def predecessors(self, node): return [x for x in self.predecessors_iter(node)]
def predecessors_iter(
self, node)
def predecessors_iter(self, node): if not node in self._nodes_pred: raise StopIteration for n_pred in self._nodes_pred[node]: yield n_pred
def predecessors_stop_node_iter(
self, node, head)
def predecessors_stop_node_iter(self, node, head): if node == head: raise StopIteration for next_node in self.predecessors_iter(node): yield next_node
def reachable_parents(
self, leaf)
Compute all parents of node @leaf. Each parent is an immediate predecessor of an arbitrary, already yielded parent of @leaf
def reachable_parents(self, leaf): """Compute all parents of node @leaf. Each parent is an immediate predecessor of an arbitrary, already yielded parent of @leaf""" return self._reachable_nodes(leaf, self.predecessors_iter)
def reachable_parents_stop_node(
self, leaf, head)
Compute all parents of node @leaf. Each parent is an immediate predecessor of an arbitrary, already yielded parent of @leaf. Do not compute reachables past @head node
def reachable_parents_stop_node(self, leaf, head): """Compute all parents of node @leaf. Each parent is an immediate predecessor of an arbitrary, already yielded parent of @leaf. Do not compute reachables past @head node""" return self._reachable_nodes( leaf, lambda node_cur: self.predecessors_stop_node_iter( node_cur, head ) )
def reachable_sons(
self, head)
Compute all nodes reachable from node @head. Each son is an immediate successor of an arbitrary, already yielded son of @head
def reachable_sons(self, head): """Compute all nodes reachable from node @head. Each son is an immediate successor of an arbitrary, already yielded son of @head""" return self._reachable_nodes(head, self.successors_iter)
def rebuild_edges(
self)
Consider blocks '.bto' and rebuild edges according to them, ie: - update constraint type - add missing edge - remove no more used edge
This method should be called if a block's '.bto' in nodes have been modified without notifying this instance to resynchronize edges.
def rebuild_edges(self): """Consider blocks '.bto' and rebuild edges according to them, ie: - update constraint type - add missing edge - remove no more used edge This method should be called if a block's '.bto' in nodes have been modified without notifying this instance to resynchronize edges. """ for block in self.blocks: edges = [] # Rebuild edges from bto for constraint in block.bto: dst = self._loc_key_to_block.get(constraint.loc_key, None) if dst is None: # Missing destination, add to pendings self._pendings.setdefault( constraint.loc_key, set() ).add( self.AsmCFGPending( block, constraint.c_t ) ) continue edge = (block.loc_key, dst.loc_key) edges.append(edge) if edge in self._edges: # Already known edge, constraint may have changed self.edges2constraint[edge] = constraint.c_t else: # An edge is missing self.add_edge(edge[0], edge[1], constraint.c_t) # Remove useless edges for succ in self.successors(block.loc_key): edge = (block.loc_key, succ) if edge not in edges: self.del_edge(*edge)
def remove(
self, *args, **kwargs)
def remove(self, *args, **kwargs): raise DeprecationWarning("AsmCFG is a graph, use del_node")
def sanity_check(
self)
Do sanity checks on blocks' constraints: no pendings no multiple next constraint to same block * no next constraint to self
def sanity_check(self): """Do sanity checks on blocks' constraints: * no pendings * no multiple next constraint to same block * no next constraint to self """ if len(self._pendings) != 0: raise RuntimeError("Some blocks are missing: %s" % map( str, self._pendings.keys() )) next_edges = {edge: constraint for edge, constraint in self.edges2constraint.iteritems() if constraint == AsmConstraint.c_next} for loc_key in self._nodes: if loc_key not in self._loc_key_to_block: raise RuntimeError("Not supported yet: every node must have a corresponding AsmBlock") # No next constraint to self if (loc_key, loc_key) in next_edges: raise RuntimeError('Bad constraint: self in next') # No multiple next constraint to same block pred_next = list(ploc_key for (ploc_key, dloc_key) in next_edges if dloc_key == loc_key) if len(pred_next) > 1: raise RuntimeError("Too many next constraints for bloc %r" "(%s)" % (loc_key, pred_next))
def successors(
self, node)
def successors(self, node): return [x for x in self.successors_iter(node)]
def successors_iter(
self, node)
def successors_iter(self, node): if not node in self._nodes_succ: raise StopIteration for n_suc in self._nodes_succ[node]: yield n_suc
def walk_breadth_first_backward(
self, head)
Performs a breadth first search on the reversed graph from @head
def walk_breadth_first_backward(self, head): """Performs a breadth first search on the reversed graph from @head""" return self._walk_generic_first(head, 0, self.predecessors_iter)
def walk_breadth_first_forward(
self, head)
Performs a breadth first search on the graph from @head
def walk_breadth_first_forward(self, head): """Performs a breadth first search on the graph from @head""" return self._walk_generic_first(head, 0, self.successors_iter)
def walk_depth_first_backward(
self, head)
Performs a depth first search on the reversed graph from @head
def walk_depth_first_backward(self, head): """Performs a depth first search on the reversed graph from @head""" return self._walk_generic_first(head, -1, self.predecessors_iter)
def walk_depth_first_forward(
self, head)
Performs a depth first search on the graph from @head
def walk_depth_first_forward(self, head): """Performs a depth first search on the graph from @head""" return self._walk_generic_first(head, -1, self.successors_iter)
def walk_dominators(
self, node, dominators)
Return an iterator of the ordered list of @node's dominators The function doesn't return the self reference in dominators. @node: The start node @dominators: The dictionary containing at least node's dominators
def walk_dominators(self, node, dominators): """Return an iterator of the ordered list of @node's dominators The function doesn't return the self reference in dominators. @node: The start node @dominators: The dictionary containing at least node's dominators """ return self._walk_generic_dominator(node, dominators, self.predecessors_iter)
def walk_postdominators(
self, node, postdominators)
Return an iterator of the ordered list of @node's postdominators The function doesn't return the self reference in postdominators. @node: The start node @postdominators: The dictionary containing at least node's postdominators
def walk_postdominators(self, node, postdominators): """Return an iterator of the ordered list of @node's postdominators The function doesn't return the self reference in postdominators. @node: The start node @postdominators: The dictionary containing at least node's postdominators """ return self._walk_generic_dominator(node, postdominators, self.successors_iter)
class AsmConstraint
class AsmConstraint(object): c_to = "c_to" c_next = "c_next" def __init__(self, loc_key, c_t=c_to): # Sanity check assert isinstance(loc_key, LocKey) self.loc_key = loc_key self.c_t = c_t def get_label(self): warnings.warn('DEPRECATION WARNING: use ".loc_key" instead of ".label"') return self.loc_key def set_label(self, loc_key): warnings.warn('DEPRECATION WARNING: use ".loc_key" instead of ".label"') self.loc_key = loc_key label = property(get_label, set_label) def to_string(self, loc_db=None): if loc_db is None: return "%s:%s" % (self.c_t, self.loc_key) else: return "%s:%s" % ( self.c_t, loc_db.pretty_str(self.loc_key) ) def __str__(self): return self.to_string()
Ancestors (in MRO)
- AsmConstraint
- __builtin__.object
Class variables
var c_next
var c_to
var label
Instance variables
var c_t
var label
var loc_key
Methods
def __init__(
self, loc_key, c_t='c_to')
def __init__(self, loc_key, c_t=c_to): # Sanity check assert isinstance(loc_key, LocKey) self.loc_key = loc_key self.c_t = c_t
def get_label(
self)
def get_label(self): warnings.warn('DEPRECATION WARNING: use ".loc_key" instead of ".label"') return self.loc_key
def set_label(
self, loc_key)
def set_label(self, loc_key): warnings.warn('DEPRECATION WARNING: use ".loc_key" instead of ".label"') self.loc_key = loc_key
def to_string(
self, loc_db=None)
def to_string(self, loc_db=None): if loc_db is None: return "%s:%s" % (self.c_t, self.loc_key) else: return "%s:%s" % ( self.c_t, loc_db.pretty_str(self.loc_key) )
class AsmConstraintNext
class AsmConstraintNext(AsmConstraint): def __init__(self, loc_key): super(AsmConstraintNext, self).__init__( loc_key, c_t=AsmConstraint.c_next )
Ancestors (in MRO)
- AsmConstraintNext
- AsmConstraint
- __builtin__.object
Class variables
Instance variables
Methods
def __init__(
self, loc_key)
Inheritance:
AsmConstraint
.__init__
def __init__(self, loc_key): super(AsmConstraintNext, self).__init__( loc_key, c_t=AsmConstraint.c_next )
def get_label(
self)
Inheritance:
AsmConstraint
.get_label
def get_label(self): warnings.warn('DEPRECATION WARNING: use ".loc_key" instead of ".label"') return self.loc_key
def set_label(
self, loc_key)
Inheritance:
AsmConstraint
.set_label
def set_label(self, loc_key): warnings.warn('DEPRECATION WARNING: use ".loc_key" instead of ".label"') self.loc_key = loc_key
def to_string(
self, loc_db=None)
Inheritance:
AsmConstraint
.to_string
def to_string(self, loc_db=None): if loc_db is None: return "%s:%s" % (self.c_t, self.loc_key) else: return "%s:%s" % ( self.c_t, loc_db.pretty_str(self.loc_key) )
class AsmConstraintTo
class AsmConstraintTo(AsmConstraint): def __init__(self, loc_key): super(AsmConstraintTo, self).__init__( loc_key, c_t=AsmConstraint.c_to )
Ancestors (in MRO)
- AsmConstraintTo
- AsmConstraint
- __builtin__.object
Class variables
Instance variables
Methods
def __init__(
self, loc_key)
Inheritance:
AsmConstraint
.__init__
def __init__(self, loc_key): super(AsmConstraintTo, self).__init__( loc_key, c_t=AsmConstraint.c_to )
def get_label(
self)
Inheritance:
AsmConstraint
.get_label
def get_label(self): warnings.warn('DEPRECATION WARNING: use ".loc_key" instead of ".label"') return self.loc_key
def set_label(
self, loc_key)
Inheritance:
AsmConstraint
.set_label
def set_label(self, loc_key): warnings.warn('DEPRECATION WARNING: use ".loc_key" instead of ".label"') self.loc_key = loc_key
def to_string(
self, loc_db=None)
Inheritance:
AsmConstraint
.to_string
def to_string(self, loc_db=None): if loc_db is None: return "%s:%s" % (self.c_t, self.loc_key) else: return "%s:%s" % ( self.c_t, loc_db.pretty_str(self.loc_key) )
class AsmRaw
class AsmRaw(object): def __init__(self, raw=""): self.raw = raw def __str__(self): return repr(self.raw) def to_string(self, loc_db): return str(self)
Ancestors (in MRO)
- AsmRaw
- __builtin__.object
Instance variables
var raw
Methods
def __init__(
self, raw='')
def __init__(self, raw=""): self.raw = raw
def to_string(
self, loc_db)
def to_string(self, loc_db): return str(self)
class AsmSymbolPool
[DEPRECATED API] use 'LocationDB' instead
class AsmSymbolPool(LocationDB): """[DEPRECATED API] use 'LocationDB' instead""" def __init__(self, *args, **kwargs): warnings.warn("Deprecated API, use 'LocationDB' instead") super(AsmSymbolPool, self).__init__(*args, **kwargs)
Ancestors (in MRO)
- AsmSymbolPool
- miasm2.core.locationdb.LocationDB
- __builtin__.object
Instance variables
var items
Return all loc_keys
var loc_keys
Return all loc_keys
var names
Return all known names
var offsets
Return all known offsets
Methods
def __init__(
self, *args, **kwargs)
def __init__(self, *args, **kwargs): warnings.warn("Deprecated API, use 'LocationDB' instead") super(AsmSymbolPool, self).__init__(*args, **kwargs)
def add_location(
self, name=None, offset=None, strict=True)
Add a new location in the locationDB. Returns the corresponding LocKey. If @name is set, also associate a name to this new location. If @offset is set, also associate an offset to this new location.
Strict mode (set by @strict, default): If a location with @offset or @name already exists, an error will be raised. Otherwise: If a location with @offset or @name already exists, the corresponding LocKey will be returned.
def add_location(self, name=None, offset=None, strict=True): """Add a new location in the locationDB. Returns the corresponding LocKey. If @name is set, also associate a name to this new location. If @offset is set, also associate an offset to this new location. Strict mode (set by @strict, default): If a location with @offset or @name already exists, an error will be raised. Otherwise: If a location with @offset or @name already exists, the corresponding LocKey will be returned. """ # Deprecation handling if is_int(name): assert offset is None or offset == name warnings.warn("Deprecated API: use 'add_location(offset=)' instead." " An additionnal 'name=' can be provided to also " "associate a name (there is no more default name)") offset = name name = None # Argument cleaning offset_loc_key = None if offset is not None: offset = int(offset) offset_loc_key = self.get_offset_location(offset) # Test for collisions name_loc_key = None if name is not None: name_loc_key = self.get_name_location(name) if strict: if name_loc_key is not None: raise ValueError("An entry for %r already exists (%r), and " "strict mode is enabled" % ( name, name_loc_key )) if offset_loc_key is not None: raise ValueError("An entry for 0x%x already exists (%r), and " "strict mode is enabled" % ( offset, offset_loc_key )) else: # Non-strict mode if name_loc_key is not None: known_offset = self.get_offset_location(name_loc_key) if known_offset != offset: raise ValueError( "Location with name '%s' already have an offset: 0x%x " "(!= 0x%x)" % (name, offset, known_offset) ) # Name already known, same offset -> nothing to do return name_loc_key elif offset_loc_key is not None: if name is not None: # This is an error. Check for already known name are checked above raise ValueError( "Location with offset 0x%x already exists." "To add a name to this location, use the dedicated API" "'add_location_name(%r, %r)'" % ( offset_loc_key, name )) # Offset already known, no name specified return offset_loc_key # No collision, this is a brand new location loc_key = LocKey(self._loc_key_num) self._loc_key_num += 1 self._loc_keys.add(loc_key) if offset is not None: assert offset not in self._offset_to_loc_key self._offset_to_loc_key[offset] = loc_key self._loc_key_to_offset[loc_key] = offset if name is not None: self._name_to_loc_key[name] = loc_key self._loc_key_to_names[loc_key] = set([name]) return loc_key
def add_location_name(
self, loc_key, name)
Associate a name @name to a given @loc_key @name: str instance @loc_key: LocKey instance
def add_location_name(self, loc_key, name): """Associate a name @name to a given @loc_key @name: str instance @loc_key: LocKey instance """ assert loc_key in self._loc_keys already_existing_loc = self._name_to_loc_key.get(name) if already_existing_loc is not None and already_existing_loc != loc_key: raise KeyError("%r is already associated to a different loc_key " "(%r)" % (name, already_existing_loc)) self._loc_key_to_names.setdefault(loc_key, set()).add(name) self._name_to_loc_key[name] = loc_key
def canonize_to_exprloc(
self, expr)
If expr is ExprInt, return ExprLoc with corresponding loc_key Else, return expr
@expr: Expr instance
def canonize_to_exprloc(self, expr): """ If expr is ExprInt, return ExprLoc with corresponding loc_key Else, return expr @expr: Expr instance """ if expr.is_int(): loc_key = self.get_or_create_offset_location(int(expr)) ret = ExprLoc(loc_key, expr.size) return ret return expr
def consistency_check(
self)
Ensure internal structures are consistent with each others
def consistency_check(self): """Ensure internal structures are consistent with each others""" assert set(self._loc_key_to_names).issubset(self._loc_keys) assert set(self._loc_key_to_offset).issubset(self._loc_keys) assert self._loc_key_to_offset == {v: k for k, v in self._offset_to_loc_key.iteritems()} assert reduce( lambda x, y:x.union(y), self._loc_key_to_names.itervalues(), set(), ) == set(self._name_to_loc_key) for name, loc_key in self._name_to_loc_key.iteritems(): assert name in self._loc_key_to_names[loc_key]
def del_loc_key_offset(
self, loc_key)
[DEPRECATED API], see 'unset_location_offset'
def del_loc_key_offset(self, loc_key): """[DEPRECATED API], see 'unset_location_offset'""" warnings.warn("Deprecated API: use 'unset_location_offset'") self.unset_location_offset(loc_key)
def gen_loc_key(
self)
[DEPRECATED API], see 'add_location'
def gen_loc_key(self): """[DEPRECATED API], see 'add_location'""" warnings.warn("Deprecated API: use 'add_location'") return self.add_location()
def get_location_names(
self, loc_key)
Return the frozenset of names associated to @loc_key @loc_key: LocKey instance
def get_location_names(self, loc_key): """ Return the frozenset of names associated to @loc_key @loc_key: LocKey instance """ assert isinstance(loc_key, LocKey) return frozenset(self._loc_key_to_names.get(loc_key, set()))
def get_location_offset(
self, loc_key)
Return the offset of @loc_key if any, None otherwise. @loc_key: LocKey instance
def get_location_offset(self, loc_key): """ Return the offset of @loc_key if any, None otherwise. @loc_key: LocKey instance """ assert isinstance(loc_key, LocKey) return self._loc_key_to_offset.get(loc_key)
def get_name_location(
self, name)
Return the LocKey of @name if any, None otherwise. @name: target name
def get_name_location(self, name): """ Return the LocKey of @name if any, None otherwise. @name: target name """ return self._name_to_loc_key.get(name)
def get_name_offset(
self, name)
Return the offset of @name if any, None otherwise. @name: target name
def get_name_offset(self, name): """ Return the offset of @name if any, None otherwise. @name: target name """ loc_key = self.get_name_location(name) if loc_key is None: return None return self.get_location_offset(loc_key)
def get_offset_location(
self, offset)
Return the LocKey of @offset if any, None otherwise. @name: target offset
def get_offset_location(self, offset): """ Return the LocKey of @offset if any, None otherwise. @name: target offset """ return self._offset_to_loc_key.get(offset)
def get_or_create_name_location(
self, name)
Return the LocKey of @name if any, create one otherwise. @name: target name
def get_or_create_name_location(self, name): """ Return the LocKey of @name if any, create one otherwise. @name: target name """ loc_key = self._name_to_loc_key.get(name) if loc_key is not None: return loc_key return self.add_location(name=name)
def get_or_create_offset_location(
self, offset)
Return the LocKey of @offset if any, create one otherwise. @offset: target offset
def get_or_create_offset_location(self, offset): """ Return the LocKey of @offset if any, create one otherwise. @offset: target offset """ loc_key = self._offset_to_loc_key.get(offset) if loc_key is not None: return loc_key return self.add_location(offset=offset)
def getby_name(
self, name)
[DEPRECATED API], see 'get_name_location'
def getby_name(self, name): """[DEPRECATED API], see 'get_name_location'""" warnings.warn("Deprecated API: use 'get_name_location'") return self.get_name_location(name)
def getby_name_create(
self, name)
[DEPRECATED API], see 'get_or_create_name_location'
def getby_name_create(self, name): """[DEPRECATED API], see 'get_or_create_name_location'""" warnings.warn("Deprecated API: use 'get_or_create_name_location'") return self.get_or_create_name_location(name)
def getby_offset(
self, offset)
[DEPRECATED API], see 'get_offset_location'
def getby_offset(self, offset): """[DEPRECATED API], see 'get_offset_location'""" warnings.warn("Deprecated API: use 'get_offset_location'") return self.get_offset_location(offset)
def getby_offset_create(
self, offset)
[DEPRECATED API], see 'get_or_create_offset_location'
def getby_offset_create(self, offset): """[DEPRECATED API], see 'get_or_create_offset_location'""" warnings.warn("Deprecated API: use 'get_or_create_offset_location'") return self.get_or_create_offset_location(offset)
def loc_key_to_name(
self, loc_key)
[DEPRECATED API], see 'get_location_names'
def loc_key_to_name(self, loc_key): """[DEPRECATED API], see 'get_location_names'""" warnings.warn("Deprecated API: use 'get_location_names'") return sorted(self.get_location_names(loc_key))[0]
def loc_key_to_offset(
self, loc_key)
[DEPRECATED API], see 'get_location_offset'
def loc_key_to_offset(self, loc_key): """[DEPRECATED API], see 'get_location_offset'""" warnings.warn("Deprecated API: use 'get_location_offset'") return self.get_location_offset(loc_key)
def merge(
self, location_db)
Merge with another LocationDB @location_db
WARNING: old reference to @location_db information (such as LocKeys) must be retrieved from the updated version of this instance. The dedicated "get_*" APIs may be used for this task
def merge(self, location_db): """Merge with another LocationDB @location_db WARNING: old reference to @location_db information (such as LocKeys) must be retrieved from the updated version of this instance. The dedicated "get_*" APIs may be used for this task """ # A simple merge is not doable here, because LocKey will certainly # collides for foreign_loc_key in location_db.loc_keys: foreign_names = location_db.get_location_names(foreign_loc_key) foreign_offset = location_db.get_location_offset(foreign_loc_key) if foreign_names: init_name = list(foreign_names)[0] else: init_name = None loc_key = self.add_location(offset=foreign_offset, name=init_name, strict=False) cur_names = self.get_location_names(loc_key) for name in foreign_names: if name not in cur_names and name != init_name: self.add_location_name(loc_key, name=name)
def pretty_str(
self, loc_key)
Return a human readable version of @loc_key, according to information available in this LocationDB instance
def pretty_str(self, loc_key): """Return a human readable version of @loc_key, according to information available in this LocationDB instance""" names = self.get_location_names(loc_key) if names: return ",".join(names) offset = self.get_location_offset(loc_key) if offset is not None: return "loc_%x" % offset return str(loc_key)
def remove_loc_key(
self, loc_key)
[DEPRECATED API], see 'remove_location'
def remove_loc_key(self, loc_key): """[DEPRECATED API], see 'remove_location'""" warnings.warn("Deprecated API: use 'remove_location'") self.remove_location(loc_key)
def remove_location(
self, loc_key)
Delete the location corresponding to @loc_key @loc_key: LocKey instance
def remove_location(self, loc_key): """ Delete the location corresponding to @loc_key @loc_key: LocKey instance """ assert isinstance(loc_key, LocKey) if loc_key not in self._loc_keys: raise KeyError("Unknown loc_key %r" % loc_key) names = self._loc_key_to_names.pop(loc_key, []) for name in names: del self._name_to_loc_key[name] offset = self._loc_key_to_offset.pop(loc_key, None) self._offset_to_loc_key.pop(offset, None) self._loc_keys.remove(loc_key)
def remove_location_name(
self, loc_key, name)
Disassociate a name @name from a given @loc_key Fail if @name is not already associated to @loc_key @name: str instance @loc_key: LocKey instance
def remove_location_name(self, loc_key, name): """Disassociate a name @name from a given @loc_key Fail if @name is not already associated to @loc_key @name: str instance @loc_key: LocKey instance """ assert loc_key in self._loc_keys already_existing_loc = self._name_to_loc_key.get(name) if already_existing_loc is None: raise KeyError("%r is not already associated" % name) if already_existing_loc != loc_key: raise KeyError("%r is already associated to a different loc_key " "(%r)" % (name, already_existing_loc)) del self._name_to_loc_key[name] self._loc_key_to_names[loc_key].remove(name)
def rename_location(
self, loc_key, newname)
[DEPRECATED API], see 'add_name_location' and 'remove_location_name'
def rename_location(self, loc_key, newname): """[DEPRECATED API], see 'add_name_location' and 'remove_location_name' """ warnings.warn("Deprecated API: use 'add_location_name' and " "'remove_location_name'") for name in self.get_location_names(loc_key): self.remove_location_name(loc_key, name) self.add_location_name(loc_key, name)
def set_location_offset(
self, loc_key, offset, force=False)
Associate the offset @offset to an LocKey @loc_key
If @force is set, override silently. Otherwise, if an offset is already associated to @loc_key, an error will be raised
def set_location_offset(self, loc_key, offset, force=False): """Associate the offset @offset to an LocKey @loc_key If @force is set, override silently. Otherwise, if an offset is already associated to @loc_key, an error will be raised """ assert loc_key in self._loc_keys already_existing_loc = self.get_offset_location(offset) if already_existing_loc is not None and already_existing_loc != loc_key: raise KeyError("%r is already associated to a different loc_key " "(%r)" % (offset, already_existing_loc)) already_existing_off = self._loc_key_to_offset.get(loc_key) if (already_existing_off is not None and already_existing_off != offset): if not force: raise ValueError( "%r already has an offset (0x%x). Use 'force=True'" " for silent overriding" % ( loc_key, already_existing_off )) else: self.unset_location_offset(loc_key) self._offset_to_loc_key[offset] = loc_key self._loc_key_to_offset[loc_key] = offset
def set_offset(
self, loc_key, offset)
[DEPRECATED API], see 'set_location_offset'
def set_offset(self, loc_key, offset): """[DEPRECATED API], see 'set_location_offset'""" warnings.warn("Deprecated API: use 'set_location_offset'") self.set_location_offset(loc_key, offset, force=True)
def str_loc_key(
self, loc_key)
[DEPRECATED API], see 'pretty_str'
def str_loc_key(self, loc_key): """[DEPRECATED API], see 'pretty_str'""" warnings.warn("Deprecated API: use 'pretty_str'") return self.pretty_str(loc_key)
def unset_location_offset(
self, loc_key)
Disassociate LocKey @loc_key's offset
Fail if there is already no offset associate with it @loc_key: LocKey
def unset_location_offset(self, loc_key): """Disassociate LocKey @loc_key's offset Fail if there is already no offset associate with it @loc_key: LocKey """ assert loc_key in self._loc_keys already_existing_off = self._loc_key_to_offset.get(loc_key) if already_existing_off is None: raise ValueError("%r already has no offset" % (loc_key)) del self._offset_to_loc_key[already_existing_off] del self._loc_key_to_offset[loc_key]
class BlockChain
Manage blocks linked with an asm_constraint_next
class BlockChain(object): """Manage blocks linked with an asm_constraint_next""" def __init__(self, loc_db, blocks): self.loc_db = loc_db self.blocks = blocks self.place() @property def pinned(self): """Return True iff at least one block is pinned""" return self.pinned_block_idx is not None def _set_pinned_block_idx(self): self.pinned_block_idx = None for i, block in enumerate(self.blocks): loc_key = block.loc_key if self.loc_db.get_location_offset(loc_key) is not None: if self.pinned_block_idx is not None: raise ValueError("Multiples pinned block detected") self.pinned_block_idx = i def place(self): """Compute BlockChain min_offset and max_offset using pinned block and blocks' size """ self._set_pinned_block_idx() self.max_size = 0 for block in self.blocks: self.max_size += block.max_size + block.alignment - 1 # Check if chain has one block pinned if not self.pinned: return loc = self.blocks[self.pinned_block_idx].loc_key offset_base = self.loc_db.get_location_offset(loc) assert(offset_base % self.blocks[self.pinned_block_idx].alignment == 0) self.offset_min = offset_base for block in self.blocks[:self.pinned_block_idx - 1:-1]: self.offset_min -= block.max_size + \ (block.alignment - block.max_size) % block.alignment self.offset_max = offset_base for block in self.blocks[self.pinned_block_idx:]: self.offset_max += block.max_size + \ (block.alignment - block.max_size) % block.alignment def merge(self, chain): """Best effort merge two block chains Return the list of resulting blockchains""" self.blocks += chain.blocks self.place() return [self] def fix_blocks(self, modified_loc_keys): """Propagate a pinned to its blocks' neighbour @modified_loc_keys: store new pinned loc_keys""" if not self.pinned: raise ValueError('Trying to fix unpinned block') # Propagate offset to blocks before pinned block pinned_block = self.blocks[self.pinned_block_idx] offset = self.loc_db.get_location_offset(pinned_block.loc_key) if offset % pinned_block.alignment != 0: raise RuntimeError('Bad alignment') for block in self.blocks[:self.pinned_block_idx - 1:-1]: new_offset = offset - block.size new_offset = new_offset - new_offset % pinned_block.alignment fix_loc_offset(self.loc_db, block.loc_key, new_offset, modified_loc_keys) # Propagate offset to blocks after pinned block offset = self.loc_db.get_location_offset(pinned_block.loc_key) + pinned_block.size last_block = pinned_block for block in self.blocks[self.pinned_block_idx + 1:]: offset += (- offset) % last_block.alignment fix_loc_offset(self.loc_db, block.loc_key, offset, modified_loc_keys) offset += block.size last_block = block return modified_loc_keys
Ancestors (in MRO)
- BlockChain
- __builtin__.object
Instance variables
var blocks
var loc_db
var pinned
Return True iff at least one block is pinned
Methods
def __init__(
self, loc_db, blocks)
def __init__(self, loc_db, blocks): self.loc_db = loc_db self.blocks = blocks self.place()
def fix_blocks(
self, modified_loc_keys)
Propagate a pinned to its blocks' neighbour @modified_loc_keys: store new pinned loc_keys
def fix_blocks(self, modified_loc_keys): """Propagate a pinned to its blocks' neighbour @modified_loc_keys: store new pinned loc_keys""" if not self.pinned: raise ValueError('Trying to fix unpinned block') # Propagate offset to blocks before pinned block pinned_block = self.blocks[self.pinned_block_idx] offset = self.loc_db.get_location_offset(pinned_block.loc_key) if offset % pinned_block.alignment != 0: raise RuntimeError('Bad alignment') for block in self.blocks[:self.pinned_block_idx - 1:-1]: new_offset = offset - block.size new_offset = new_offset - new_offset % pinned_block.alignment fix_loc_offset(self.loc_db, block.loc_key, new_offset, modified_loc_keys) # Propagate offset to blocks after pinned block offset = self.loc_db.get_location_offset(pinned_block.loc_key) + pinned_block.size last_block = pinned_block for block in self.blocks[self.pinned_block_idx + 1:]: offset += (- offset) % last_block.alignment fix_loc_offset(self.loc_db, block.loc_key, offset, modified_loc_keys) offset += block.size last_block = block return modified_loc_keys
def merge(
self, chain)
Best effort merge two block chains Return the list of resulting blockchains
def merge(self, chain): """Best effort merge two block chains Return the list of resulting blockchains""" self.blocks += chain.blocks self.place() return [self]
def place(
self)
Compute BlockChain min_offset and max_offset using pinned block and blocks' size
def place(self): """Compute BlockChain min_offset and max_offset using pinned block and blocks' size """ self._set_pinned_block_idx() self.max_size = 0 for block in self.blocks: self.max_size += block.max_size + block.alignment - 1 # Check if chain has one block pinned if not self.pinned: return loc = self.blocks[self.pinned_block_idx].loc_key offset_base = self.loc_db.get_location_offset(loc) assert(offset_base % self.blocks[self.pinned_block_idx].alignment == 0) self.offset_min = offset_base for block in self.blocks[:self.pinned_block_idx - 1:-1]: self.offset_min -= block.max_size + \ (block.alignment - block.max_size) % block.alignment self.offset_max = offset_base for block in self.blocks[self.pinned_block_idx:]: self.offset_max += block.max_size + \ (block.alignment - block.max_size) % block.alignment
class BlockChainWedge
Stand for wedges between blocks
class BlockChainWedge(object): """Stand for wedges between blocks""" def __init__(self, loc_db, offset, size): self.loc_db = loc_db self.offset = offset self.max_size = size self.offset_min = offset self.offset_max = offset + size def merge(self, chain): """Best effort merge two block chains Return the list of resulting blockchains""" self.loc_db.set_location_offset(chain.blocks[0].loc_key, self.offset_max) chain.place() return [self, chain]
Ancestors (in MRO)
- BlockChainWedge
- __builtin__.object
Instance variables
var loc_db
var max_size
var offset
var offset_max
var offset_min
Methods
def __init__(
self, loc_db, offset, size)
def __init__(self, loc_db, offset, size): self.loc_db = loc_db self.offset = offset self.max_size = size self.offset_min = offset self.offset_max = offset + size
def merge(
self, chain)
Best effort merge two block chains Return the list of resulting blockchains
def merge(self, chain): """Best effort merge two block chains Return the list of resulting blockchains""" self.loc_db.set_location_offset(chain.blocks[0].loc_key, self.offset_max) chain.place() return [self, chain]
class asm_bloc
class asm_bloc(object): def __init__(self, loc_key, alignment=1): warnings.warn('DEPRECATION WARNING: use "AsmBlock" instead of "asm_bloc"') super(asm_bloc, self).__init__(loc_key, alignment)
Ancestors (in MRO)
- asm_bloc
- __builtin__.object
Methods
def __init__(
self, loc_key, alignment=1)
def __init__(self, loc_key, alignment=1): warnings.warn('DEPRECATION WARNING: use "AsmBlock" instead of "asm_bloc"') super(asm_bloc, self).__init__(loc_key, alignment)
class asm_block_bad
class asm_block_bad(AsmBlockBad): def __init__(self, loc_key=None, alignment=1, errno=-1, *args, **kwargs): warnings.warn('DEPRECATION WARNING: use "AsmBlockBad" instead of "asm_block_bad"') super(asm_block_bad, self).__init__(loc_key, alignment, *args, **kwargs)
Ancestors (in MRO)
- asm_block_bad
- AsmBlockBad
- AsmBlock
- __builtin__.object
Class variables
Instance variables
Methods
def __init__(
self, loc_key=None, alignment=1, errno=-1, *args, **kwargs)
Inheritance:
AsmBlockBad
.__init__
Instanciate an AsmBlock_bad. @loc_key, @alignement: same as AsmBlock.init @errno: (optional) specify a error type associated with the block
def __init__(self, loc_key=None, alignment=1, errno=-1, *args, **kwargs): warnings.warn('DEPRECATION WARNING: use "AsmBlockBad" instead of "asm_block_bad"') super(asm_block_bad, self).__init__(loc_key, alignment, *args, **kwargs)
def add_cst(
self, loc_key, constraint_type)
Inheritance:
AsmBlockBad
.add_cst
Add constraint between current block and block at @loc_key @loc_key: LocKey instance of constraint target @constraint_type: AsmConstraint c_to/c_next
def add_cst(self, loc_key, constraint_type): """ Add constraint between current block and block at @loc_key @loc_key: LocKey instance of constraint target @constraint_type: AsmConstraint c_to/c_next """ assert isinstance(loc_key, LocKey) c = AsmConstraint(loc_key, constraint_type) self.bto.add(c)
def addline(
self, *args, **kwargs)
Inheritance:
AsmBlockBad
.addline
def addline(self, *args, **kwargs): raise RuntimeError("An AsmBlockBad cannot have line")
def addto(
self, *args, **kwargs)
Inheritance:
AsmBlockBad
.addto
def addto(self, *args, **kwargs): raise RuntimeError("An AsmBlockBad cannot have bto")
def fix_constraints(
self)
Inheritance:
AsmBlockBad
.fix_constraints
Fix next block constraints
def fix_constraints(self): """Fix next block constraints""" # destination -> associated constraints dests = {} for constraint in self.bto: dests.setdefault(constraint.loc_key, set()).add(constraint) self.bto = set(self._filter_constraint(constraints) for constraints in dests.itervalues())
def get_flow_instr(
self)
Inheritance:
AsmBlockBad
.get_flow_instr
def get_flow_instr(self): if not self.lines: return None for i in xrange(-1, -1 - self.lines[0].delayslot - 1, -1): if not 0 <= i < len(self.lines): return None l = self.lines[i] if l.splitflow() or l.breakflow(): raise NotImplementedError('not fully functional')
def get_label(
self)
Inheritance:
AsmBlockBad
.get_label
def get_label(self): warnings.warn('DEPRECATION WARNING: use ".loc_key" instead of ".label"') return self.loc_key
def get_next(
self)
Inheritance:
AsmBlockBad
.get_next
def get_next(self): for constraint in self.bto: if constraint.c_t == AsmConstraint.c_next: return constraint.loc_key return None
def get_offsets(
self)
Inheritance:
AsmBlockBad
.get_offsets
def get_offsets(self): return [x.offset for x in self.lines]
def get_range(
self)
Inheritance:
AsmBlockBad
.get_range
Returns the offset hull of an AsmBlock
def get_range(self): """Returns the offset hull of an AsmBlock""" if len(self.lines): return (self.lines[0].offset, self.lines[-1].offset + self.lines[-1].l) else: return 0, 0
def get_subcall_instr(
self)
Inheritance:
AsmBlockBad
.get_subcall_instr
def get_subcall_instr(self): if not self.lines: return None delayslot = self.lines[0].delayslot end_index = len(self.lines) - 1 ds_max_index = max(end_index - delayslot, 0) for i in xrange(end_index, ds_max_index - 1, -1): l = self.lines[i] if l.is_subcall(): return l return None
def split(
self, *args, **kwargs)
Inheritance:
AsmBlockBad
.split
def split(self, *args, **kwargs): raise RuntimeError("An AsmBlockBad cannot be splitted")
def to_string(
self, loc_db=None)
Inheritance:
AsmBlockBad
.to_string
def to_string(self, loc_db=None): out = [] if loc_db is None: out.append(str(self.loc_key)) else: out.append(loc_db.pretty_str(self.loc_key)) for instr in self.lines: out.append(instr.to_string(loc_db)) if self.bto: lbls = ["->"] for dst in self.bto: if dst is None: lbls.append("Unknown? ") else: lbls.append(dst.to_string(loc_db) + " ") lbls = '\t'.join(sorted(lbls)) out.append(lbls) return '\n'.join(out)
class asm_constraint
class asm_constraint(AsmConstraint): def __init__(self, loc_key, c_t=AsmConstraint.c_to): warnings.warn('DEPRECATION WARNING: use "AsmConstraint" instead of "asm_constraint"') super(asm_constraint, self).__init__(loc_key, c_t)
Ancestors (in MRO)
- asm_constraint
- AsmConstraint
- __builtin__.object
Class variables
Instance variables
Methods
def __init__(
self, loc_key, c_t='c_to')
Inheritance:
AsmConstraint
.__init__
def __init__(self, loc_key, c_t=AsmConstraint.c_to): warnings.warn('DEPRECATION WARNING: use "AsmConstraint" instead of "asm_constraint"') super(asm_constraint, self).__init__(loc_key, c_t)
def get_label(
self)
Inheritance:
AsmConstraint
.get_label
def get_label(self): warnings.warn('DEPRECATION WARNING: use ".loc_key" instead of ".label"') return self.loc_key
def set_label(
self, loc_key)
Inheritance:
AsmConstraint
.set_label
def set_label(self, loc_key): warnings.warn('DEPRECATION WARNING: use ".loc_key" instead of ".label"') self.loc_key = loc_key
def to_string(
self, loc_db=None)
Inheritance:
AsmConstraint
.to_string
def to_string(self, loc_db=None): if loc_db is None: return "%s:%s" % (self.c_t, self.loc_key) else: return "%s:%s" % ( self.c_t, loc_db.pretty_str(self.loc_key) )
class asm_constraint_next
class asm_constraint_next(AsmConstraint): def __init__(self, loc_key): warnings.warn('DEPRECATION WARNING: use "AsmConstraintNext" instead of "asm_constraint_next"') super(asm_constraint_next, self).__init__(loc_key)
Ancestors (in MRO)
- asm_constraint_next
- AsmConstraint
- __builtin__.object
Class variables
Instance variables
Methods
def __init__(
self, loc_key)
Inheritance:
AsmConstraint
.__init__
def __init__(self, loc_key): warnings.warn('DEPRECATION WARNING: use "AsmConstraintNext" instead of "asm_constraint_next"') super(asm_constraint_next, self).__init__(loc_key)
def get_label(
self)
Inheritance:
AsmConstraint
.get_label
def get_label(self): warnings.warn('DEPRECATION WARNING: use ".loc_key" instead of ".label"') return self.loc_key
def set_label(
self, loc_key)
Inheritance:
AsmConstraint
.set_label
def set_label(self, loc_key): warnings.warn('DEPRECATION WARNING: use ".loc_key" instead of ".label"') self.loc_key = loc_key
def to_string(
self, loc_db=None)
Inheritance:
AsmConstraint
.to_string
def to_string(self, loc_db=None): if loc_db is None: return "%s:%s" % (self.c_t, self.loc_key) else: return "%s:%s" % ( self.c_t, loc_db.pretty_str(self.loc_key) )
class asm_constraint_to
class asm_constraint_to(AsmConstraint): def __init__(self, loc_key): warnings.warn('DEPRECATION WARNING: use "AsmConstraintTo" instead of "asm_constraint_to"') super(asm_constraint_to, self).__init__(loc_key)
Ancestors (in MRO)
- asm_constraint_to
- AsmConstraint
- __builtin__.object
Class variables
Instance variables
Methods
def __init__(
self, loc_key)
Inheritance:
AsmConstraint
.__init__
def __init__(self, loc_key): warnings.warn('DEPRECATION WARNING: use "AsmConstraintTo" instead of "asm_constraint_to"') super(asm_constraint_to, self).__init__(loc_key)
def get_label(
self)
Inheritance:
AsmConstraint
.get_label
def get_label(self): warnings.warn('DEPRECATION WARNING: use ".loc_key" instead of ".label"') return self.loc_key
def set_label(
self, loc_key)
Inheritance:
AsmConstraint
.set_label
def set_label(self, loc_key): warnings.warn('DEPRECATION WARNING: use ".loc_key" instead of ".label"') self.loc_key = loc_key
def to_string(
self, loc_db=None)
Inheritance:
AsmConstraint
.to_string
def to_string(self, loc_db=None): if loc_db is None: return "%s:%s" % (self.c_t, self.loc_key) else: return "%s:%s" % ( self.c_t, loc_db.pretty_str(self.loc_key) )
class asm_raw
class asm_raw(AsmRaw): def __init__(self, raw=""): warnings.warn('DEPRECATION WARNING: use "AsmRaw" instead of "asm_raw"') super(asm_label, self).__init__(raw)
Ancestors (in MRO)
Instance variables
Methods
def __init__(
self, raw='')
def __init__(self, raw=""): warnings.warn('DEPRECATION WARNING: use "AsmRaw" instead of "asm_raw"') super(asm_label, self).__init__(raw)
def to_string(
self, loc_db)
def to_string(self, loc_db): return str(self)
class asm_symbol_pool
class asm_symbol_pool(AsmSymbolPool): def __init__(self): warnings.warn('DEPRECATION WARNING: use "LocationDB" instead of "asm_symbol_pool"') super(asm_symbol_pool, self).__init__()
Ancestors (in MRO)
- asm_symbol_pool
- AsmSymbolPool
- miasm2.core.locationdb.LocationDB
- __builtin__.object
Instance variables
Methods
def __init__(
self)
Inheritance:
AsmSymbolPool
.__init__
def __init__(self): warnings.warn('DEPRECATION WARNING: use "LocationDB" instead of "asm_symbol_pool"') super(asm_symbol_pool, self).__init__()
def add_location(
self, name=None, offset=None, strict=True)
Inheritance:
AsmSymbolPool
.add_location
Add a new location in the locationDB. Returns the corresponding LocKey. If @name is set, also associate a name to this new location. If @offset is set, also associate an offset to this new location.
Strict mode (set by @strict, default): If a location with @offset or @name already exists, an error will be raised. Otherwise: If a location with @offset or @name already exists, the corresponding LocKey will be returned.
def add_location(self, name=None, offset=None, strict=True): """Add a new location in the locationDB. Returns the corresponding LocKey. If @name is set, also associate a name to this new location. If @offset is set, also associate an offset to this new location. Strict mode (set by @strict, default): If a location with @offset or @name already exists, an error will be raised. Otherwise: If a location with @offset or @name already exists, the corresponding LocKey will be returned. """ # Deprecation handling if is_int(name): assert offset is None or offset == name warnings.warn("Deprecated API: use 'add_location(offset=)' instead." " An additionnal 'name=' can be provided to also " "associate a name (there is no more default name)") offset = name name = None # Argument cleaning offset_loc_key = None if offset is not None: offset = int(offset) offset_loc_key = self.get_offset_location(offset) # Test for collisions name_loc_key = None if name is not None: name_loc_key = self.get_name_location(name) if strict: if name_loc_key is not None: raise ValueError("An entry for %r already exists (%r), and " "strict mode is enabled" % ( name, name_loc_key )) if offset_loc_key is not None: raise ValueError("An entry for 0x%x already exists (%r), and " "strict mode is enabled" % ( offset, offset_loc_key )) else: # Non-strict mode if name_loc_key is not None: known_offset = self.get_offset_location(name_loc_key) if known_offset != offset: raise ValueError( "Location with name '%s' already have an offset: 0x%x " "(!= 0x%x)" % (name, offset, known_offset) ) # Name already known, same offset -> nothing to do return name_loc_key elif offset_loc_key is not None: if name is not None: # This is an error. Check for already known name are checked above raise ValueError( "Location with offset 0x%x already exists." "To add a name to this location, use the dedicated API" "'add_location_name(%r, %r)'" % ( offset_loc_key, name )) # Offset already known, no name specified return offset_loc_key # No collision, this is a brand new location loc_key = LocKey(self._loc_key_num) self._loc_key_num += 1 self._loc_keys.add(loc_key) if offset is not None: assert offset not in self._offset_to_loc_key self._offset_to_loc_key[offset] = loc_key self._loc_key_to_offset[loc_key] = offset if name is not None: self._name_to_loc_key[name] = loc_key self._loc_key_to_names[loc_key] = set([name]) return loc_key
def add_location_name(
self, loc_key, name)
Inheritance:
AsmSymbolPool
.add_location_name
Associate a name @name to a given @loc_key @name: str instance @loc_key: LocKey instance
def add_location_name(self, loc_key, name): """Associate a name @name to a given @loc_key @name: str instance @loc_key: LocKey instance """ assert loc_key in self._loc_keys already_existing_loc = self._name_to_loc_key.get(name) if already_existing_loc is not None and already_existing_loc != loc_key: raise KeyError("%r is already associated to a different loc_key " "(%r)" % (name, already_existing_loc)) self._loc_key_to_names.setdefault(loc_key, set()).add(name) self._name_to_loc_key[name] = loc_key
def canonize_to_exprloc(
self, expr)
Inheritance:
AsmSymbolPool
.canonize_to_exprloc
If expr is ExprInt, return ExprLoc with corresponding loc_key Else, return expr
@expr: Expr instance
def canonize_to_exprloc(self, expr): """ If expr is ExprInt, return ExprLoc with corresponding loc_key Else, return expr @expr: Expr instance """ if expr.is_int(): loc_key = self.get_or_create_offset_location(int(expr)) ret = ExprLoc(loc_key, expr.size) return ret return expr
def consistency_check(
self)
Inheritance:
AsmSymbolPool
.consistency_check
Ensure internal structures are consistent with each others
def consistency_check(self): """Ensure internal structures are consistent with each others""" assert set(self._loc_key_to_names).issubset(self._loc_keys) assert set(self._loc_key_to_offset).issubset(self._loc_keys) assert self._loc_key_to_offset == {v: k for k, v in self._offset_to_loc_key.iteritems()} assert reduce( lambda x, y:x.union(y), self._loc_key_to_names.itervalues(), set(), ) == set(self._name_to_loc_key) for name, loc_key in self._name_to_loc_key.iteritems(): assert name in self._loc_key_to_names[loc_key]
def del_loc_key_offset(
self, loc_key)
Inheritance:
AsmSymbolPool
.del_loc_key_offset
[DEPRECATED API], see 'unset_location_offset'
def del_loc_key_offset(self, loc_key): """[DEPRECATED API], see 'unset_location_offset'""" warnings.warn("Deprecated API: use 'unset_location_offset'") self.unset_location_offset(loc_key)
def gen_loc_key(
self)
Inheritance:
AsmSymbolPool
.gen_loc_key
[DEPRECATED API], see 'add_location'
def gen_loc_key(self): """[DEPRECATED API], see 'add_location'""" warnings.warn("Deprecated API: use 'add_location'") return self.add_location()
def get_location_names(
self, loc_key)
Inheritance:
AsmSymbolPool
.get_location_names
Return the frozenset of names associated to @loc_key @loc_key: LocKey instance
def get_location_names(self, loc_key): """ Return the frozenset of names associated to @loc_key @loc_key: LocKey instance """ assert isinstance(loc_key, LocKey) return frozenset(self._loc_key_to_names.get(loc_key, set()))
def get_location_offset(
self, loc_key)
Inheritance:
AsmSymbolPool
.get_location_offset
Return the offset of @loc_key if any, None otherwise. @loc_key: LocKey instance
def get_location_offset(self, loc_key): """ Return the offset of @loc_key if any, None otherwise. @loc_key: LocKey instance """ assert isinstance(loc_key, LocKey) return self._loc_key_to_offset.get(loc_key)
def get_name_location(
self, name)
Inheritance:
AsmSymbolPool
.get_name_location
Return the LocKey of @name if any, None otherwise. @name: target name
def get_name_location(self, name): """ Return the LocKey of @name if any, None otherwise. @name: target name """ return self._name_to_loc_key.get(name)
def get_name_offset(
self, name)
Inheritance:
AsmSymbolPool
.get_name_offset
Return the offset of @name if any, None otherwise. @name: target name
def get_name_offset(self, name): """ Return the offset of @name if any, None otherwise. @name: target name """ loc_key = self.get_name_location(name) if loc_key is None: return None return self.get_location_offset(loc_key)
def get_offset_location(
self, offset)
Inheritance:
AsmSymbolPool
.get_offset_location
Return the LocKey of @offset if any, None otherwise. @name: target offset
def get_offset_location(self, offset): """ Return the LocKey of @offset if any, None otherwise. @name: target offset """ return self._offset_to_loc_key.get(offset)
def get_or_create_name_location(
self, name)
Inheritance:
AsmSymbolPool
.get_or_create_name_location
Return the LocKey of @name if any, create one otherwise. @name: target name
def get_or_create_name_location(self, name): """ Return the LocKey of @name if any, create one otherwise. @name: target name """ loc_key = self._name_to_loc_key.get(name) if loc_key is not None: return loc_key return self.add_location(name=name)
def get_or_create_offset_location(
self, offset)
Inheritance:
AsmSymbolPool
.get_or_create_offset_location
Return the LocKey of @offset if any, create one otherwise. @offset: target offset
def get_or_create_offset_location(self, offset): """ Return the LocKey of @offset if any, create one otherwise. @offset: target offset """ loc_key = self._offset_to_loc_key.get(offset) if loc_key is not None: return loc_key return self.add_location(offset=offset)
def getby_name(
self, name)
Inheritance:
AsmSymbolPool
.getby_name
[DEPRECATED API], see 'get_name_location'
def getby_name(self, name): """[DEPRECATED API], see 'get_name_location'""" warnings.warn("Deprecated API: use 'get_name_location'") return self.get_name_location(name)
def getby_name_create(
self, name)
Inheritance:
AsmSymbolPool
.getby_name_create
[DEPRECATED API], see 'get_or_create_name_location'
def getby_name_create(self, name): """[DEPRECATED API], see 'get_or_create_name_location'""" warnings.warn("Deprecated API: use 'get_or_create_name_location'") return self.get_or_create_name_location(name)
def getby_offset(
self, offset)
Inheritance:
AsmSymbolPool
.getby_offset
[DEPRECATED API], see 'get_offset_location'
def getby_offset(self, offset): """[DEPRECATED API], see 'get_offset_location'""" warnings.warn("Deprecated API: use 'get_offset_location'") return self.get_offset_location(offset)
def getby_offset_create(
self, offset)
Inheritance:
AsmSymbolPool
.getby_offset_create
[DEPRECATED API], see 'get_or_create_offset_location'
def getby_offset_create(self, offset): """[DEPRECATED API], see 'get_or_create_offset_location'""" warnings.warn("Deprecated API: use 'get_or_create_offset_location'") return self.get_or_create_offset_location(offset)
def loc_key_to_name(
self, loc_key)
Inheritance:
AsmSymbolPool
.loc_key_to_name
[DEPRECATED API], see 'get_location_names'
def loc_key_to_name(self, loc_key): """[DEPRECATED API], see 'get_location_names'""" warnings.warn("Deprecated API: use 'get_location_names'") return sorted(self.get_location_names(loc_key))[0]
def loc_key_to_offset(
self, loc_key)
Inheritance:
AsmSymbolPool
.loc_key_to_offset
[DEPRECATED API], see 'get_location_offset'
def loc_key_to_offset(self, loc_key): """[DEPRECATED API], see 'get_location_offset'""" warnings.warn("Deprecated API: use 'get_location_offset'") return self.get_location_offset(loc_key)
def merge(
self, location_db)
Inheritance:
AsmSymbolPool
.merge
Merge with another LocationDB @location_db
WARNING: old reference to @location_db information (such as LocKeys) must be retrieved from the updated version of this instance. The dedicated "get_*" APIs may be used for this task
def merge(self, location_db): """Merge with another LocationDB @location_db WARNING: old reference to @location_db information (such as LocKeys) must be retrieved from the updated version of this instance. The dedicated "get_*" APIs may be used for this task """ # A simple merge is not doable here, because LocKey will certainly # collides for foreign_loc_key in location_db.loc_keys: foreign_names = location_db.get_location_names(foreign_loc_key) foreign_offset = location_db.get_location_offset(foreign_loc_key) if foreign_names: init_name = list(foreign_names)[0] else: init_name = None loc_key = self.add_location(offset=foreign_offset, name=init_name, strict=False) cur_names = self.get_location_names(loc_key) for name in foreign_names: if name not in cur_names and name != init_name: self.add_location_name(loc_key, name=name)
def pretty_str(
self, loc_key)
Inheritance:
AsmSymbolPool
.pretty_str
Return a human readable version of @loc_key, according to information available in this LocationDB instance
def pretty_str(self, loc_key): """Return a human readable version of @loc_key, according to information available in this LocationDB instance""" names = self.get_location_names(loc_key) if names: return ",".join(names) offset = self.get_location_offset(loc_key) if offset is not None: return "loc_%x" % offset return str(loc_key)
def remove_loc_key(
self, loc_key)
Inheritance:
AsmSymbolPool
.remove_loc_key
[DEPRECATED API], see 'remove_location'
def remove_loc_key(self, loc_key): """[DEPRECATED API], see 'remove_location'""" warnings.warn("Deprecated API: use 'remove_location'") self.remove_location(loc_key)
def remove_location(
self, loc_key)
Inheritance:
AsmSymbolPool
.remove_location
Delete the location corresponding to @loc_key @loc_key: LocKey instance
def remove_location(self, loc_key): """ Delete the location corresponding to @loc_key @loc_key: LocKey instance """ assert isinstance(loc_key, LocKey) if loc_key not in self._loc_keys: raise KeyError("Unknown loc_key %r" % loc_key) names = self._loc_key_to_names.pop(loc_key, []) for name in names: del self._name_to_loc_key[name] offset = self._loc_key_to_offset.pop(loc_key, None) self._offset_to_loc_key.pop(offset, None) self._loc_keys.remove(loc_key)
def remove_location_name(
self, loc_key, name)
Inheritance:
AsmSymbolPool
.remove_location_name
Disassociate a name @name from a given @loc_key Fail if @name is not already associated to @loc_key @name: str instance @loc_key: LocKey instance
def remove_location_name(self, loc_key, name): """Disassociate a name @name from a given @loc_key Fail if @name is not already associated to @loc_key @name: str instance @loc_key: LocKey instance """ assert loc_key in self._loc_keys already_existing_loc = self._name_to_loc_key.get(name) if already_existing_loc is None: raise KeyError("%r is not already associated" % name) if already_existing_loc != loc_key: raise KeyError("%r is already associated to a different loc_key " "(%r)" % (name, already_existing_loc)) del self._name_to_loc_key[name] self._loc_key_to_names[loc_key].remove(name)
def rename_location(
self, loc_key, newname)
Inheritance:
AsmSymbolPool
.rename_location
[DEPRECATED API], see 'add_name_location' and 'remove_location_name'
def rename_location(self, loc_key, newname): """[DEPRECATED API], see 'add_name_location' and 'remove_location_name' """ warnings.warn("Deprecated API: use 'add_location_name' and " "'remove_location_name'") for name in self.get_location_names(loc_key): self.remove_location_name(loc_key, name) self.add_location_name(loc_key, name)
def set_location_offset(
self, loc_key, offset, force=False)
Inheritance:
AsmSymbolPool
.set_location_offset
Associate the offset @offset to an LocKey @loc_key
If @force is set, override silently. Otherwise, if an offset is already associated to @loc_key, an error will be raised
def set_location_offset(self, loc_key, offset, force=False): """Associate the offset @offset to an LocKey @loc_key If @force is set, override silently. Otherwise, if an offset is already associated to @loc_key, an error will be raised """ assert loc_key in self._loc_keys already_existing_loc = self.get_offset_location(offset) if already_existing_loc is not None and already_existing_loc != loc_key: raise KeyError("%r is already associated to a different loc_key " "(%r)" % (offset, already_existing_loc)) already_existing_off = self._loc_key_to_offset.get(loc_key) if (already_existing_off is not None and already_existing_off != offset): if not force: raise ValueError( "%r already has an offset (0x%x). Use 'force=True'" " for silent overriding" % ( loc_key, already_existing_off )) else: self.unset_location_offset(loc_key) self._offset_to_loc_key[offset] = loc_key self._loc_key_to_offset[loc_key] = offset
def set_offset(
self, loc_key, offset)
Inheritance:
AsmSymbolPool
.set_offset
[DEPRECATED API], see 'set_location_offset'
def set_offset(self, loc_key, offset): """[DEPRECATED API], see 'set_location_offset'""" warnings.warn("Deprecated API: use 'set_location_offset'") self.set_location_offset(loc_key, offset, force=True)
def str_loc_key(
self, loc_key)
Inheritance:
AsmSymbolPool
.str_loc_key
[DEPRECATED API], see 'pretty_str'
def str_loc_key(self, loc_key): """[DEPRECATED API], see 'pretty_str'""" warnings.warn("Deprecated API: use 'pretty_str'") return self.pretty_str(loc_key)
def unset_location_offset(
self, loc_key)
Inheritance:
AsmSymbolPool
.unset_location_offset
Disassociate LocKey @loc_key's offset
Fail if there is already no offset associate with it @loc_key: LocKey
def unset_location_offset(self, loc_key): """Disassociate LocKey @loc_key's offset Fail if there is already no offset associate with it @loc_key: LocKey """ assert loc_key in self._loc_keys already_existing_off = self._loc_key_to_offset.get(loc_key) if already_existing_off is None: raise ValueError("%r already has no offset" % (loc_key)) del self._offset_to_loc_key[already_existing_off] del self._loc_key_to_offset[loc_key]
class disasmEngine
Disassembly engine, taking care of disassembler options and mutli-block strategy.
Engine options:
- Object supporting membership test (offset in ..)
- dont_dis: stop the current disassembly branch if reached
- split_dis: force a basic block end if reached, with a next constraint on its successor
-
dont_dis_retcall_funcs: stop disassembly after a call to one of the given functions
-
On/Off
- follow_call: recursively disassemble CALL destinations
- dontdis_retcall: stop on CALL return addresses
-
dont_dis_nulstart_bloc: stop if a block begin with a few
-
Number
- lines_wd: maximum block's size (in number of instruction)
-
blocs_wd: maximum number of distinct disassembled block
-
callback(arch, attrib, pool_bin, cur_bloc, offsets_to_dis, loc_db)
- dis_block_callback: callback after each new disassembled block
class disasmEngine(object): """Disassembly engine, taking care of disassembler options and mutli-block strategy. Engine options: + Object supporting membership test (offset in ..) - dont_dis: stop the current disassembly branch if reached - split_dis: force a basic block end if reached, with a next constraint on its successor - dont_dis_retcall_funcs: stop disassembly after a call to one of the given functions + On/Off - follow_call: recursively disassemble CALL destinations - dontdis_retcall: stop on CALL return addresses - dont_dis_nulstart_bloc: stop if a block begin with a few \x00 + Number - lines_wd: maximum block's size (in number of instruction) - blocs_wd: maximum number of distinct disassembled block + callback(arch, attrib, pool_bin, cur_bloc, offsets_to_dis, loc_db) - dis_block_callback: callback after each new disassembled block """ def __init__(self, arch, attrib, bin_stream, **kwargs): """Instanciate a new disassembly engine @arch: targeted architecture @attrib: architecture attribute @bin_stream: bytes source @kwargs: (optional) custom options """ self.arch = arch self.attrib = attrib self.bin_stream = bin_stream self.loc_db = LocationDB() # Setup options self.dont_dis = [] self.split_dis = [] self.follow_call = False self.dontdis_retcall = False self.lines_wd = None self.blocs_wd = None self.dis_block_callback = None self.dont_dis_nulstart_bloc = False self.dont_dis_retcall_funcs = set() # Override options if needed self.__dict__.update(kwargs) def get_job_done(self): warnings.warn("""DEPRECATION WARNING: "job_done" is not needed anymore, support is dropped.""") return set() def set_job_done(self, _): warnings.warn("""DEPRECATION WARNING: "job_done" is not needed anymore, support is dropped.""") return def get_dis_bloc_callback(self): warnings.warn("""DEPRECATION WARNING: "dis_bloc_callback" use dis_block_callback.""") return self.dis_block_callback def set_dis_bloc_callback(self, function): warnings.warn("""DEPRECATION WARNING: "dis_bloc_callback" use dis_block_callback.""") self.dis_block_callback = function @property def symbol_pool(self): warnings.warn("""DEPRECATION WARNING: use 'loc_db'""") return self.loc_db # Deprecated job_done = property(get_job_done, set_job_done) dis_bloc_callback = property(get_dis_bloc_callback, set_dis_bloc_callback) def _dis_block(self, offset, job_done=None): """Disassemble the block at offset @offset @job_done: a set of already disassembled addresses Return the created AsmBlock and future offsets to disassemble """ if job_done is None: job_done = set() lines_cpt = 0 in_delayslot = False delayslot_count = self.arch.delayslot offsets_to_dis = set() add_next_offset = False loc_key = self.loc_db.get_or_create_offset_location(offset) cur_block = AsmBlock(loc_key) log_asmblock.debug("dis at %X", int(offset)) while not in_delayslot or delayslot_count > 0: if in_delayslot: delayslot_count -= 1 if offset in self.dont_dis: if not cur_block.lines: job_done.add(offset) # Block is empty -> bad block cur_block = AsmBlockBad(loc_key, errno=AsmBlockBad.ERROR_FORBIDDEN) else: # Block is not empty, stop the desassembly pass and add a # constraint to the next block loc_key_cst = self.loc_db.get_or_create_offset_location(offset) cur_block.add_cst(loc_key_cst, AsmConstraint.c_next) break if lines_cpt > 0 and offset in self.split_dis: loc_key_cst = self.loc_db.get_or_create_offset_location(offset) cur_block.add_cst(loc_key_cst, AsmConstraint.c_next) offsets_to_dis.add(offset) break lines_cpt += 1 if self.lines_wd is not None and lines_cpt > self.lines_wd: log_asmblock.debug("lines watchdog reached at %X", int(offset)) break if offset in job_done: loc_key_cst = self.loc_db.get_or_create_offset_location(offset) cur_block.add_cst(loc_key_cst, AsmConstraint.c_next) break off_i = offset error = None try: instr = self.arch.dis(self.bin_stream, self.attrib, offset) except Disasm_Exception as e: log_asmblock.warning(e) instr = None error = AsmBlockBad.ERROR_CANNOT_DISASM except IOError as e: log_asmblock.warning(e) instr = None error = AsmBlockBad.ERROR_IO if instr is None: log_asmblock.warning("cannot disasm at %X", int(off_i)) if not cur_block.lines: job_done.add(offset) # Block is empty -> bad block cur_block = AsmBlockBad(loc_key, errno=error) else: # Block is not empty, stop the desassembly pass and add a # constraint to the next block loc_key_cst = self.loc_db.get_or_create_offset_location(off_i) cur_block.add_cst(loc_key_cst, AsmConstraint.c_next) break # XXX TODO nul start block option if self.dont_dis_nulstart_bloc and instr.b.count('\x00') == instr.l: log_asmblock.warning("reach nul instr at %X", int(off_i)) if not cur_block.lines: # Block is empty -> bad block cur_block = AsmBlockBad(loc_key, errno=AsmBlockBad.ERROR_NULL_STARTING_BLOCK) else: # Block is not empty, stop the desassembly pass and add a # constraint to the next block loc_key_cst = self.loc_db.get_or_create_offset_location(off_i) cur_block.add_cst(loc_key_cst, AsmConstraint.c_next) break # special case: flow graph modificator in delayslot if in_delayslot and instr and (instr.splitflow() or instr.breakflow()): add_next_offset = True break job_done.add(offset) log_asmblock.debug("dis at %X", int(offset)) offset += instr.l log_asmblock.debug(instr) log_asmblock.debug(instr.args) cur_block.addline(instr) if not instr.breakflow(): continue # test split if instr.splitflow() and not (instr.is_subcall() and self.dontdis_retcall): add_next_offset = True if instr.dstflow(): instr.dstflow2label(self.loc_db) destinations = instr.getdstflow(self.loc_db) known_dsts = [] for dst in destinations: if not dst.is_loc(): continue loc_key = dst.loc_key loc_key_offset = self.loc_db.get_location_offset(loc_key) known_dsts.append(loc_key) if loc_key_offset in self.dont_dis_retcall_funcs: add_next_offset = False if (not instr.is_subcall()) or self.follow_call: cur_block.bto.update([AsmConstraint(loc_key, AsmConstraint.c_to) for loc_key in known_dsts]) # get in delayslot mode in_delayslot = True delayslot_count = instr.delayslot for c in cur_block.bto: loc_key_offset = self.loc_db.get_location_offset(c.loc_key) offsets_to_dis.add(loc_key_offset) if add_next_offset: loc_key_cst = self.loc_db.get_or_create_offset_location(offset) cur_block.add_cst(loc_key_cst, AsmConstraint.c_next) offsets_to_dis.add(offset) # Fix multiple constraints cur_block.fix_constraints() if self.dis_block_callback is not None: self.dis_block_callback(mn=self.arch, attrib=self.attrib, pool_bin=self.bin_stream, cur_bloc=cur_block, offsets_to_dis=offsets_to_dis, loc_db=self.loc_db, # Deprecated API symbol_pool=self.loc_db) return cur_block, offsets_to_dis def dis_block(self, offset): """Disassemble the block at offset @offset and return the created AsmBlock @offset: targeted offset to disassemble """ current_block, _ = self._dis_block(offset) return current_block def dis_bloc(self, offset): """ DEPRECATED function Use dis_block instead of dis_bloc """ warnings.warn('DEPRECATION WARNING: use "dis_block" instead of "dis_bloc"') return self.dis_block(offset) def dis_multiblock(self, offset, blocks=None): """Disassemble every block reachable from @offset regarding specific disasmEngine conditions Return an AsmCFG instance containing disassembled blocks @offset: starting offset @blocks: (optional) AsmCFG instance of already disassembled blocks to merge with """ log_asmblock.info("dis bloc all") job_done = set() if blocks is None: blocks = AsmCFG(self.loc_db) todo = [offset] bloc_cpt = 0 while len(todo): bloc_cpt += 1 if self.blocs_wd is not None and bloc_cpt > self.blocs_wd: log_asmblock.debug("blocks watchdog reached at %X", int(offset)) break target_offset = int(todo.pop(0)) if (target_offset is None or target_offset in job_done): continue cur_block, nexts = self._dis_block(target_offset, job_done) todo += nexts blocks.add_block(cur_block) blocks.apply_splitting(self.loc_db, dis_block_callback=self.dis_block_callback, mn=self.arch, attrib=self.attrib, pool_bin=self.bin_stream) return blocks def dis_multibloc(self, offset, blocs=None): """ DEPRECATED function Use dis_multiblock instead of dis_multibloc """ warnings.warn('DEPRECATION WARNING: use "dis_multiblock" instead of "dis_multibloc"') return self.dis_multiblock(offset, blocs) def dis_instr(self, offset): """Disassemble one instruction at offset @offset and return the corresponding instruction instance @offset: targeted offset to disassemble """ old_lineswd = self.lines_wd self.lines_wd = 1 try: block = self.dis_block(offset) finally: self.lines_wd = old_lineswd instr = block.lines[0] return instr
Ancestors (in MRO)
- disasmEngine
- __builtin__.object
Class variables
var dis_bloc_callback
var job_done
Instance variables
var arch
var attrib
var bin_stream
var blocs_wd
var dis_bloc_callback
var dis_block_callback
var dont_dis
var dont_dis_nulstart_bloc
var dont_dis_retcall_funcs
var dontdis_retcall
var follow_call
var job_done
var lines_wd
var loc_db
var split_dis
var symbol_pool
Methods
def __init__(
self, arch, attrib, bin_stream, **kwargs)
Instanciate a new disassembly engine @arch: targeted architecture @attrib: architecture attribute @bin_stream: bytes source @kwargs: (optional) custom options
def __init__(self, arch, attrib, bin_stream, **kwargs): """Instanciate a new disassembly engine @arch: targeted architecture @attrib: architecture attribute @bin_stream: bytes source @kwargs: (optional) custom options """ self.arch = arch self.attrib = attrib self.bin_stream = bin_stream self.loc_db = LocationDB() # Setup options self.dont_dis = [] self.split_dis = [] self.follow_call = False self.dontdis_retcall = False self.lines_wd = None self.blocs_wd = None self.dis_block_callback = None self.dont_dis_nulstart_bloc = False self.dont_dis_retcall_funcs = set() # Override options if needed self.__dict__.update(kwargs)
def dis_bloc(
self, offset)
DEPRECATED function Use dis_block instead of dis_bloc
def dis_bloc(self, offset): """ DEPRECATED function Use dis_block instead of dis_bloc """ warnings.warn('DEPRECATION WARNING: use "dis_block" instead of "dis_bloc"') return self.dis_block(offset)
def dis_block(
self, offset)
Disassemble the block at offset @offset and return the created AsmBlock @offset: targeted offset to disassemble
def dis_block(self, offset): """Disassemble the block at offset @offset and return the created AsmBlock @offset: targeted offset to disassemble """ current_block, _ = self._dis_block(offset) return current_block
def dis_instr(
self, offset)
Disassemble one instruction at offset @offset and return the corresponding instruction instance @offset: targeted offset to disassemble
def dis_instr(self, offset): """Disassemble one instruction at offset @offset and return the corresponding instruction instance @offset: targeted offset to disassemble """ old_lineswd = self.lines_wd self.lines_wd = 1 try: block = self.dis_block(offset) finally: self.lines_wd = old_lineswd instr = block.lines[0] return instr
def dis_multibloc(
self, offset, blocs=None)
DEPRECATED function Use dis_multiblock instead of dis_multibloc
def dis_multibloc(self, offset, blocs=None): """ DEPRECATED function Use dis_multiblock instead of dis_multibloc """ warnings.warn('DEPRECATION WARNING: use "dis_multiblock" instead of "dis_multibloc"') return self.dis_multiblock(offset, blocs)
def dis_multiblock(
self, offset, blocks=None)
Disassemble every block reachable from @offset regarding specific disasmEngine conditions Return an AsmCFG instance containing disassembled blocks @offset: starting offset @blocks: (optional) AsmCFG instance of already disassembled blocks to merge with
def dis_multiblock(self, offset, blocks=None): """Disassemble every block reachable from @offset regarding specific disasmEngine conditions Return an AsmCFG instance containing disassembled blocks @offset: starting offset @blocks: (optional) AsmCFG instance of already disassembled blocks to merge with """ log_asmblock.info("dis bloc all") job_done = set() if blocks is None: blocks = AsmCFG(self.loc_db) todo = [offset] bloc_cpt = 0 while len(todo): bloc_cpt += 1 if self.blocs_wd is not None and bloc_cpt > self.blocs_wd: log_asmblock.debug("blocks watchdog reached at %X", int(offset)) break target_offset = int(todo.pop(0)) if (target_offset is None or target_offset in job_done): continue cur_block, nexts = self._dis_block(target_offset, job_done) todo += nexts blocks.add_block(cur_block) blocks.apply_splitting(self.loc_db, dis_block_callback=self.dis_block_callback, mn=self.arch, attrib=self.attrib, pool_bin=self.bin_stream) return blocks
def get_dis_bloc_callback(
self)
def get_dis_bloc_callback(self): warnings.warn("""DEPRECATION WARNING: "dis_bloc_callback" use dis_block_callback.""") return self.dis_block_callback
def get_job_done(
self)
def get_job_done(self): warnings.warn("""DEPRECATION WARNING: "job_done" is not needed anymore, support is dropped.""") return set()
def set_dis_bloc_callback(
self, function)
def set_dis_bloc_callback(self, function): warnings.warn("""DEPRECATION WARNING: "dis_bloc_callback" use dis_block_callback.""") self.dis_block_callback = function
def set_job_done(
self, _)
def set_job_done(self, _): warnings.warn("""DEPRECATION WARNING: "job_done" is not needed anymore, support is dropped.""") return