miasm2.ir.ir module
#-*- coding:utf-8 -*-

#
# Copyright (C) 2013 Fabrice Desclaux
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import warnings

from itertools import chain

import miasm2.expression.expression as m2_expr
from miasm2.expression.expression_helper import get_missing_interval
from miasm2.core.asmblock import AsmBlock, AsmConstraint
from miasm2.core.graph import DiGraph


def _expr_loc_to_symb(expr, loc_db):
    """Return an ExprId naming @expr if it is a location, else @expr itself
    @expr: Expr instance
    @loc_db: LocationDB instance, or None to fall back on str(@expr)
    """
    if not expr.is_loc():
        return expr
    if loc_db is None:
        name = str(expr)
    else:
        names = loc_db.get_location_names(expr.loc_key)
        if not names:
            name = loc_db.pretty_str(expr.loc_key)
        else:
            # Use only one name for readability
            name = sorted(names)[0]
    return m2_expr.ExprId(name, expr.size)


class AssignBlock(object):
    """Represent parallel IR assignment, such as:
    EAX = EBX
    EBX = EAX

    -> Exchange between EBX and EAX

    AssignBlock can be seen as a dictionary where keys are the destinations
    (ExprId or ExprMem), and values their corresponding sources.

    Also provides common manipulation on these assignments.

    """
    __slots__ = ["_assigns", "_instr"]

    def __init__(self, irs=None, instr=None):
        """Create a new AssignBlock
        @irs: (optional) sequence of ExprAff, or dictionary dst (Expr) -> src
              (Expr)
        @instr: (optional) associate an instruction with this AssignBlock

        """
        if irs is None:
            irs = []
        self._instr = instr
        self._assigns = {} # ExprAff.dst -> ExprAff.src

        # Concurrent assignments are handled in _set
        if hasattr(irs, "iteritems"):
            for dst, src in irs.iteritems():
                self._set(dst, src)
        else:
            for expraff in irs:
                self._set(expraff.dst, expraff.src)

    @property
    def instr(self):
        """Return the associated instruction, if any"""
        return self._instr

    def _set(self, dst, src):
        """
        Register the assignment @dst = @src in this block.

        Special cases:
        * if dst is an ExprSlice, expand it to affect the full Expression
        * if dst already known, sources are merged

        Raises RuntimeError on size mismatch or when two assignments touch
        the same bits, TypeError when the final destination is neither an
        ExprId nor an ExprMem.
        """
        if dst.size != src.size:
            raise RuntimeError(
                "sanitycheck: args must have same size! %s" %
                ([(str(arg), arg.size) for arg in [dst, src]]))

        if isinstance(dst, m2_expr.ExprSlice):
            # Complete the source with missing slice parts
            new_dst = dst.arg
            rest = [(m2_expr.ExprSlice(dst.arg, r[0], r[1]), r[0], r[1])
                    for r in dst.slice_rest()]
            all_a = [(src, dst.start, dst.stop)] + rest
            all_a.sort(key=lambda x: x[1])
            args = [expr for (expr, _, _) in all_a]
            new_src = m2_expr.ExprCompose(*args)
        else:
            new_dst, new_src = dst, src

        if new_dst in self._assigns and isinstance(new_src, m2_expr.ExprCompose):
            if not isinstance(self[new_dst], m2_expr.ExprCompose):
                # prev_RAX = 0x1122334455667788
                # input_RAX[0:8] = 0x89
                # final_RAX -> ? (assignment are in parallel)
                raise RuntimeError("Concurent access on same bit not allowed")

            # Consider slice grouping
            expr_list = [(new_dst, new_src),
                         (new_dst, self[new_dst])]
            # Find collision: gather every (expr, start, stop) actually
            # modified by the new and by the already known source
            e_colision = set()
            for target, value in expr_list:
                e_colision.update(self.get_modified_slice(target, value))

            # Sort interval collision
            known_intervals = sorted([(x[1], x[2]) for x in e_colision])

            for i, (_, stop) in enumerate(known_intervals[:-1]):
                if stop > known_intervals[i + 1][0]:
                    raise RuntimeError(
                        "Concurent access on same bit not allowed")

            # Fill with missing data
            missing_i = get_missing_interval(known_intervals, 0, new_dst.size)
            remaining = ((m2_expr.ExprSlice(new_dst, *interval),
                          interval[0],
                          interval[1])
                         for interval in missing_i)

            # Build the merging expression
            args = list(e_colision.union(remaining))
            args.sort(key=lambda x: x[1])
            starts = [start for (_, start, _) in args]
            assert len(set(starts)) == len(starts)
            args = [expr for (expr, _, _) in args]
            new_src = m2_expr.ExprCompose(*args)

        # Sanity check
        if not isinstance(new_dst, (m2_expr.ExprId, m2_expr.ExprMem)):
            raise TypeError("Destination cannot be a %s" % type(new_dst))

        self._assigns[new_dst] = new_src

    def __setitem__(self, dst, src):
        raise RuntimeError('AssignBlock is immutable')

    def __getitem__(self, key):
        return self._assigns[key]

    def __contains__(self, key):
        return key in self._assigns

    def iteritems(self):
        """Iterate on (destination, source) couples"""
        for dst, src in self._assigns.iteritems():
            yield dst, src

    def items(self):
        """Return the list of (destination, source) couples"""
        return [(dst, src) for dst, src in self.iteritems()]

    def itervalues(self):
        """Iterate on sources"""
        for src in self._assigns.itervalues():
            yield src

    def keys(self):
        """Return the destinations"""
        return self._assigns.keys()

    def values(self):
        """Return the sources"""
        return self._assigns.values()

    def __iter__(self):
        for dst in self._assigns:
            yield dst

    def __delitem__(self, _):
        raise RuntimeError('AssignBlock is immutable')

    def update(self, _):
        raise RuntimeError('AssignBlock is immutable')

    def __eq__(self, other):
        if set(self.keys()) != set(other.keys()):
            return False
        return all(other[dst] == src for dst, src in self.iteritems())

    def __ne__(self, other):
        return not self.__eq__(other)

    def __len__(self):
        return len(self._assigns)

    def get(self, key, default=None):
        """Return the source assigned to @key, or @default if @key is not a
        destination of this block
        @key: destination (Expr)
        @default: (optional) value returned when @key is absent
        """
        return self._assigns.get(key, default)

    @staticmethod
    def get_modified_slice(dst, src):
        """Return the list of (expr, start, stop) parts of the ExprCompose
        @src which are not the untouched slice of @dst at the same position
        @dst: destination expression
        @src: ExprCompose instance (ValueError is raised otherwise)
        """
        if not isinstance(src, m2_expr.ExprCompose):
            raise ValueError("Get mod slice not on expraff slice", str(src))
        modified_s = []
        for index, arg in src.iter_args():
            if not (isinstance(arg, m2_expr.ExprSlice) and
                    arg.arg == dst and
                    index == arg.start and
                    index+arg.size == arg.stop):
                # If x is not the initial expression
                modified_s.append((arg, index, index+arg.size))
        return modified_s

    def get_w(self):
        """Return a set of elements written"""
        return set(self.keys())

    def get_rw(self, mem_read=False, cst_read=False):
        """Return a dictionary associating written expressions to a set of
        their read requirements
        @mem_read: (optional) mem_read argument of `get_r`
        @cst_read: (optional) cst_read argument of `get_r`
        """
        out = {}
        for dst, src in self.iteritems():
            src_read = src.get_r(mem_read=mem_read, cst_read=cst_read)
            if isinstance(dst, m2_expr.ExprMem) and mem_read:
                # Read on destination happens only with ExprMem
                src_read.update(dst.arg.get_r(mem_read=mem_read,
                                              cst_read=cst_read))
            out[dst] = src_read
        return out

    def get_r(self, mem_read=False, cst_read=False):
        """Return the set of elements read
        @mem_read: (optional) mem_read argument of `get_r`
        @cst_read: (optional) cst_read argument of `get_r`
        """
        return set(
            chain.from_iterable(self.get_rw(mem_read=mem_read,
                                            cst_read=cst_read).itervalues()))

    def __str__(self):
        out = []
        for dst, src in sorted(self._assigns.iteritems()):
            out.append("%s = %s" % (dst, src))
        return "\n".join(out)

    def dst2ExprAff(self, dst):
        """Return an ExprAff corresponding to @dst equation
        @dst: Expr instance"""
        return m2_expr.ExprAff(dst, self[dst])

    def simplify(self, simplifier):
        """
        Return a new AssignBlock with expression simplified

        Assignments whose destination equals their source (before
        simplification) are dropped.

        @simplifier: ExpressionSimplifier instance
        """
        new_assignblk = {}
        for dst, src in self.iteritems():
            if dst == src:
                continue
            new_src = simplifier(src)
            new_dst = simplifier(dst)
            new_assignblk[new_dst] = new_src
        return AssignBlock(irs=new_assignblk, instr=self.instr)

    def to_string(self, loc_db=None):
        """Return a printable version of the block, location expressions
        being replaced by their symbolic name
        @loc_db: (optional) LocationDB instance used to name locations
        """
        out = []
        for dst, src in self.iteritems():
            new_src = src.visit(lambda expr:_expr_loc_to_symb(expr, loc_db))
            new_dst = dst.visit(lambda expr:_expr_loc_to_symb(expr, loc_db))
            line = "%s = %s" % (new_dst, new_src)
            out.append(line)
            out.append("")
        return "\n".join(out)


class IRBlock(object):
    """Intermediate representation block object.

    Stand for an intermediate representation basic block.
    """

    __slots__ = ["_loc_key", "_assignblks", "_dst", "_dst_linenb"]

    def __init__(self, loc_key, assignblks):
        """
        @loc_key: LocKey of the IR basic block
        @assignblks: list of AssignBlock
        """

        assert isinstance(loc_key, m2_expr.LocKey)
        self._loc_key = loc_key
        # Materialize first: a one-shot iterable must not be consumed by the
        # type checks below
        assignblks = tuple(assignblks)
        for assignblk in assignblks:
            assert isinstance(assignblk, AssignBlock)
        self._assignblks = assignblks
        self._dst = None
        self._dst_linenb = None

    def __eq__(self, other):
        if self.__class__ is not other.__class__:
            return False
        if self.loc_key != other.loc_key:
            return False
        if len(self.assignblks) != len(other.assignblks):
            return False
        for assignblk1, assignblk2 in zip(self.assignblks, other.assignblks):
            if assignblk1 != assignblk2:
                return False
        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    def get_label(self):
        """DEPRECATED: use `loc_key` instead"""
        warnings.warn('DEPRECATION WARNING: use ".loc_key" instead of ".label"')
        return self.loc_key

    loc_key = property(lambda self:self._loc_key)
    label = property(get_label)

    @property
    def assignblks(self):
        """Tuple of the AssignBlock of this block"""
        return self._assignblks

    @property
    def irs(self):
        """DEPRECATED: use `assignblks` instead"""
        warnings.warn('DEPRECATION WARNING: use "irblock.assignblks" instead of "irblock.irs"')
        return self._assignblks

    def __iter__(self):
        """Iterate on assignblks"""
        return self._assignblks.__iter__()

    def __getitem__(self, index):
        """Getitem on assignblks"""
        return self._assignblks.__getitem__(index)

    def __len__(self):
        """Length of assignblks"""
        return self._assignblks.__len__()

    def is_dst_set(self):
        """Return True if a (non None) IRDst value is already cached"""
        return self._dst is not None

    def cache_dst(self):
        """Look for the IRDst assignment, cache its source and the index of
        its AssignBlock, and return the source (None if there is none).
        Raises ValueError if IRDst is assigned more than once."""
        final_dst = None
        final_linenb = None
        for linenb, assignblk in enumerate(self):
            for dst, src in assignblk.iteritems():
                if dst.is_id("IRDst"):
                    if final_dst is not None:
                        raise ValueError('Multiple destinations!')
                    final_dst = src
                    final_linenb = linenb
        self._dst = final_dst
        self._dst_linenb = final_linenb
        return final_dst

    @property
    def dst(self):
        """Return the value of IRDst for the IRBlock"""
        if self.is_dst_set():
            return self._dst
        return self.cache_dst()

    def set_dst(self, value):
        """Generate a new IRBlock with a dst (IRBlock) fixed to @value"""
        irs = []
        dst_found = False
        for assignblk in self:
            new_assignblk = {}
            for dst, src in assignblk.iteritems():
                if dst.is_id("IRDst"):
                    assert dst_found is False
                    dst_found = True
                    new_assignblk[dst] = value
                else:
                    new_assignblk[dst] = src
            irs.append(AssignBlock(new_assignblk, assignblk.instr))
        return IRBlock(self.loc_key, irs)

    @property
    def dst_linenb(self):
        """Line number of the IRDst setting statement in the current irs"""
        if not self.is_dst_set():
            self.cache_dst()
        return self._dst_linenb

    def __str__(self):
        out = []
        out.append(str(self.loc_key))
        for assignblk in self:
            for dst, src in assignblk.iteritems():
                out.append('\t%s = %s' % (dst, src))
            out.append("")
        return "\n".join(out)

    def modify_exprs(self, mod_dst=None, mod_src=None):
        """
        Generate a new IRBlock with its AssignBlock expressions modified
        according to @mod_dst and @mod_src
        @mod_dst: function called to modify Expression destination
        @mod_src: function called to modify Expression source
        """

        if mod_dst is None:
            mod_dst = lambda expr:expr
        if mod_src is None:
            mod_src = lambda expr:expr

        assignblks = []
        for assignblk in self:
            new_assignblk = {}
            for dst, src in assignblk.iteritems():
                new_assignblk[mod_dst(dst)] = mod_src(src)
            assignblks.append(AssignBlock(new_assignblk, assignblk.instr))
        return IRBlock(self.loc_key, assignblks)

    def to_string(self, loc_db=None):
        """Return a printable version of the block
        @loc_db: (optional) LocationDB instance used to name locations
        """
        out = []
        if loc_db is None:
            node_name = "%s:" % self.loc_key
        else:
            names = loc_db.get_location_names(self.loc_key)
            if not names:
                node_name = "%s:" % loc_db.pretty_str(self.loc_key)
            else:
                node_name = "".join("%s:\n" % name for name in names)
        out.append(node_name)

        for assignblk in self:
            out.append(assignblk.to_string(loc_db))
        return '\n'.join(out)

    def simplify(self, simplifier):
        """
        Simplify expressions in each assignblock
        Return a couple (modified, new IRBlock)
        @simplifier: ExpressionSimplifier instance
        """
        modified = False
        assignblks = []
        for assignblk in self:
            new_assignblk = assignblk.simplify(simplifier)
            if assignblk != new_assignblk:
                modified = True
            assignblks.append(new_assignblk)
        return modified, IRBlock(self.loc_key, assignblks)


class irbloc(IRBlock):
    """
    DEPRECATED object
    Use IRBlock instead of irbloc
    """

    def __init__(self, loc_key, irs, lines=None):
        warnings.warn('DEPRECATION WARNING: use "IRBlock" instead of "irblock"')
        super(irbloc, self).__init__(loc_key, irs)


class IRCFG(DiGraph):

    """DiGraph for IR instances"""

    def __init__(self, irdst, loc_db, blocks=None, *args, **kwargs):
        """Instantiate an IRCFG
        @irdst: value exposed through the `IRDst` property
        @loc_db: LocationDB instance
        @blocks: IR blocks
        """
        self.loc_db = loc_db
        if blocks is None:
            blocks = {}
        self._blocks = blocks
        self._irdst = irdst
        super(IRCFG, self).__init__(*args, **kwargs)

    @property
    def IRDst(self):
        return self._irdst

    @property
    def blocks(self):
        return self._blocks

    def add_irblock(self, irblock):
        """
        Add the @irblock to the current IRCFG
        @irblock: IRBlock instance
        """
        self.blocks[irblock.loc_key] = irblock
        self.add_node(irblock.loc_key)

        for dst in self.dst_trackback(irblock):
            if dst.is_int():
                dst_loc_key = self.loc_db.get_or_create_offset_location(int(dst))
                dst = m2_expr.ExprLoc(dst_loc_key, irblock.dst.size)
            if dst.is_loc():
                self.add_uniq_edge(irblock.loc_key, dst.loc_key)

    def node2lines(self, node):
        """Generate the dot cells describing @node
        @node: loc_key of the node
        """
        if self.loc_db is None:
            node_name = str(node)
        else:
            names = self.loc_db.get_location_names(node)
            if not names:
                node_name = self.loc_db.pretty_str(node)
            else:
                node_name = "".join("%s:\n" % name for name in names)
        yield self.DotCellDescription(
            text="%s" % node_name,
            attr={
                'align': 'center',
                'colspan': 2,
                'bgcolor': 'grey',
            }
        )
        if node not in self._blocks:
            yield [self.DotCellDescription(text="NOT PRESENT", attr={})]
            # Plain return ends the generator (raising StopIteration inside
            # a generator is an error under PEP 479)
            return
        for i, assignblk in enumerate(self._blocks[node]):
            for dst, src in assignblk.iteritems():

                new_src = src.visit(lambda expr:_expr_loc_to_symb(expr, self.loc_db))
                new_dst = dst.visit(lambda expr:_expr_loc_to_symb(expr, self.loc_db))
                line = "%s = %s" % (new_dst, new_src)
                if self._dot_offset:
                    yield [self.DotCellDescription(text="%-4d" % i, attr={}),
                           self.DotCellDescription(text=line, attr={})]
                else:
                    yield self.DotCellDescription(text=line, attr={})
            yield self.DotCellDescription(text="", attr={})

    def edge_attr(self, src, dst):
        """Return the dot attributes of the edge @src -> @dst"""
        if src not in self._blocks or dst not in self._blocks:
            return {}
        src_irdst = self._blocks[src].dst
        edge_color = "blue"
        if isinstance(src_irdst, m2_expr.ExprCond):
            src1, src2 = src_irdst.src1, src_irdst.src2
            if src1.is_loc(dst):
                edge_color = "limegreen"
            elif src2.is_loc(dst):
                edge_color = "red"
        return {"color": edge_color}

    def node_attr(self, node):
        """Return the dot attributes of @node"""
        if node not in self._blocks:
            return {'style': 'filled', 'fillcolor': 'red'}
        return {}

    def dot(self, offset=False):
        """
        @offset: (optional) if set, add the corresponding line number in each
        node
        """
        self._dot_offset = offset
        return super(IRCFG, self).dot()

    def get_loc_key(self, addr):
        """Transforms an ExprId/ExprInt/loc_key/int into a loc_key
        @addr: an ExprId/ExprInt/loc_key/int"""

        if isinstance(addr, m2_expr.LocKey):
            return addr
        elif isinstance(addr, m2_expr.ExprLoc):
            return addr.loc_key

        try:
            addr = int(addr)
        except (ValueError, TypeError):
            return None

        return self.loc_db.get_offset_location(addr)

    def get_or_create_loc_key(self, addr):
        """Transforms an ExprId/ExprInt/loc_key/int into a loc_key
        If the offset @addr is not in the LocationDB, create it
        @addr: an ExprId/ExprInt/loc_key/int"""

        loc_key = self.get_loc_key(addr)
        if loc_key is not None:
            return loc_key

        return self.loc_db.add_location(offset=int(addr))

    def get_block(self, addr):
        """Returns the irbloc associated to an ExprId/ExprInt/loc_key/int
        @addr: an ExprId/ExprInt/loc_key/int"""

        loc_key = self.get_loc_key(addr)
        if loc_key is None:
            return None
        return self.blocks.get(loc_key, None)

    def getby_offset(self, offset):
        """Return the set of IRBlock having an instruction covering @offset"""
        out = set()
        for irb in self.blocks.values():
            for assignblk in irb:
                instr = assignblk.instr
                if instr is None:
                    continue
                if instr.offset <= offset < instr.offset + instr.l:
                    out.add(irb)
        return out

    def simplify(self, simplifier):
        """
        Simplify expressions in each irblocks
        Return True if at least one AssignBlock has been modified
        @simplifier: ExpressionSimplifier instance
        """
        modified = False
        for loc_key, block in self.blocks.iteritems():
            assignblks = []
            for assignblk in block:
                new_assignblk = assignblk.simplify(simplifier)
                if assignblk != new_assignblk:
                    modified = True
                assignblks.append(new_assignblk)
            self.blocks[loc_key] = IRBlock(loc_key, assignblks)
        return modified

    def replace_expr_in_ir(self, block, replaced):
        # NOTE(review): AssignBlock.__delitem__ / __setitem__ raise
        # RuntimeError ('AssignBlock is immutable'), so this raises as soon
        # as an AssignBlock of @block is not empty
        for assignblk in block:
            for dst, src in assignblk.items():
                del assignblk[dst]
                assignblk[dst.replace_expr(replaced)] = src.replace_expr(replaced)

    def get_rw(self, regs_ids=None):
        """
        Calls get_rw(irb) for each bloc
        @regs_ids : ids of registers used in IR
        """
        # NOTE(review): IRBlock defines no get_rw method in this module;
        # confirm the expected callee
        if regs_ids is None:
            regs_ids = []
        for irblock in self.blocks.values():
            irblock.get_rw(regs_ids)

    def _extract_dst(self, todo, done):
        """
        Naive extraction of @todo destinations
        WARNING: @todo and @done are modified
        """
        out = set()
        while todo:
            dst = todo.pop()
            if dst.is_loc():
                done.add(dst)
            elif dst.is_mem() or dst.is_int():
                done.add(dst)
            elif dst.is_cond():
                todo.add(dst.src1)
                todo.add(dst.src2)
            elif dst.is_id():
                out.add(dst)
            else:
                done.add(dst)
        return out

    def dst_trackback(self, irb):
        """
        Naive backtracking of IRDst
        @irb: irbloc instance
        """
        todo = set([irb.dst])
        done = set()

        for assignblk in reversed(irb):
            if not todo:
                break
            out = self._extract_dst(todo, done)
            found = set()
            follow = set()
            for dst in out:
                if dst in assignblk:
                    follow.add(assignblk[dst])
                    found.add(dst)

            follow.update(out.difference(found))
            todo = follow

        return done


class DiGraphIR(IRCFG):
    """
    DEPRECATED object
    Use IRCFG instead of DiGraphIR
    """

    def __init__(self, irdst, loc_db, blocks=None, *args, **kwargs):
        warnings.warn('DEPRECATION WARNING: use "IRCFG" instead of "DiGraphIR"')
        # Delegate to IRCFG.__init__ (super() must be given this class, not
        # the parent) and forward the caller's @blocks
        super(DiGraphIR, self).__init__(irdst, loc_db, blocks, *args, **kwargs)


class IntermediateRepresentation(object):
    """
    Intermediate representation object

    Allow native assembly to intermediate representation traduction
    """

    def __init__(self, arch, attrib, loc_db):
        self.pc = arch.getpc(attrib)
        self.sp = arch.getsp(attrib)
        self.arch = arch
        self.attrib = attrib
        self.loc_db = loc_db
        self.IRDst = None

    def get_ir(self, instr):
        raise NotImplementedError("Abstract Method")

    def new_ircfg(self, *args, **kwargs):
        """
        Return a new instance of IRCFG
        """
        return IRCFG(self.IRDst, self.loc_db, *args, **kwargs)

    def new_ircfg_from_asmcfg(self, asmcfg, *args, **kwargs):
        """
        Return a new instance of IRCFG from an @asmcfg
        @asmcfg: AsmCFG instance
        """

        ircfg = IRCFG(self.IRDst, self.loc_db, *args, **kwargs)
        for block in asmcfg.blocks:
            self.add_asmblock_to_ircfg(block, ircfg)
        return ircfg

    def instr2ir(self, instr):
        """Return a couple (AssignBlock, extra IRBlocks) built from the
        result of get_ir(@instr), each AssignBlock being tied to @instr"""
        ir_bloc_cur, extra_irblocks = self.get_ir(instr)
        for index, irb in enumerate(extra_irblocks):
            irs = []
            for assignblk in irb:
                irs.append(AssignBlock(assignblk, instr))
            extra_irblocks[index] = IRBlock(irb.loc_key, irs)
        assignblk = AssignBlock(ir_bloc_cur, instr)
        return assignblk, extra_irblocks

    def add_instr_to_ircfg(self, instr, ircfg, loc_key=None, gen_pc_updt=False):
        """
        Add the native instruction @instr to the @ircfg
        @instr: instruction instance
        @ircfg: IRCFG instance
        @loc_key: loc_key instance of the instruction destination
        @gen_pc_updt: insert PC update effects between instructions
        """

        if loc_key is None:
            offset = getattr(instr, "offset", None)
            loc_key = self.loc_db.add_location(offset=offset)
        block = AsmBlock(loc_key)
        block.lines = [instr]
        self.add_asmblock_to_ircfg(block, ircfg, gen_pc_updt)
        return loc_key

    def gen_pc_update(self, assignments, instr):
        """Append to @assignments an AssignBlock setting PC to @instr offset"""
        offset = m2_expr.ExprInt(instr.offset, self.pc.size)
        assignments.append(AssignBlock({self.pc:offset}, instr))

    def add_instr_to_current_state(self, instr, block, assignments, ir_blocks_all, gen_pc_updt):
        """
        Add the IR effects of an instruction to the current state.

        Returns a bool:
        * True if the current assignments list must be split
        * False in other cases.

        @instr: native instruction
        @block: native block source
        @assignments: list of current AssignBlocks
        @ir_blocks_all: list of additional effects
        @gen_pc_updt: insert PC update effects between instructions
        """
        if gen_pc_updt is not False:
            self.gen_pc_update(assignments, instr)

        assignblk, ir_blocks_extra = self.instr2ir(instr)
        assignments.append(assignblk)
        ir_blocks_all += ir_blocks_extra
        if ir_blocks_extra:
            return True
        return False

    def add_asmblock_to_ircfg(self, block, ircfg, gen_pc_updt=False):
        """
        Add a native block to the current IR
        @block: native assembly block
        @ircfg: IRCFG instance
        @gen_pc_updt: insert PC update effects between instructions
        """

        loc_key = block.loc_key
        ir_blocks_all = []

        assignments = []
        for instr in block.lines:
            if loc_key is None:
                assignments = []
                loc_key = self.get_loc_key_for_instr(instr)
            split = self.add_instr_to_current_state(
                instr, block, assignments,
                ir_blocks_all, gen_pc_updt
            )
            if split:
                ir_blocks_all.append(IRBlock(loc_key, assignments))
                loc_key = None
                assignments = []
        if loc_key is not None:
            ir_blocks_all.append(IRBlock(loc_key, assignments))

        new_ir_blocks_all = self.post_add_asmblock_to_ircfg(block, ircfg, ir_blocks_all)
        for irblock in new_ir_blocks_all:
            ircfg.add_irblock(irblock)
        return new_ir_blocks_all

    def add_block(self, block, gen_pc_updt=False):
        """
        DEPRECATED function
        Use add_asmblock_to_ircfg instead of add_block
        """
        warnings.warn("""DEPRECATION WARNING
        ircfg is now out of IntermediateRepresentation
        Use:
        ircfg = ir_arch.new_ircfg()
        ir_arch.add_asmblock_to_ircfg(block, ircfg)
        """)
        raise RuntimeError("API Deprecated")

    def add_bloc(self, block, gen_pc_updt=False):
        """
        DEPRECATED function
        Use add_asmblock_to_ircfg instead of add_bloc
        """
        self.add_block(block, gen_pc_updt)

    def get_next_loc_key(self, instr):
        """Return the loc_key of the offset following @instr"""
        loc_key = self.loc_db.get_or_create_offset_location(instr.offset + instr.l)
        return loc_key

    def get_loc_key_for_instr(self, instr):
        """Returns the loc_key associated to an instruction
        @instr: current instruction"""
        return self.loc_db.get_or_create_offset_location(instr.offset)

    def gen_loc_key_and_expr(self, size):
        """
        Return a loc_key and its corresponding ExprLoc
        @size: size of expression
        """
        loc_key = self.loc_db.add_location()
        return loc_key, m2_expr.ExprLoc(loc_key, size)

    def expr_fix_regs_for_mode(self, expr, *args, **kwargs):
        return expr

    def expraff_fix_regs_for_mode(self, expr, *args, **kwargs):
        return expr

    def irbloc_fix_regs_for_mode(self, irblock, *args, **kwargs):
        return irblock

    def is_pc_written(self, block):
        """Return the first Assignblk of the @block in which PC is written
        (None if there is none)
        @block: IRBlock instance"""
        all_pc = self.arch.pc.values()
        for assignblk in block:
            # An AssignBlock has no single `dst`: look at each destination
            if any(dst in all_pc for dst in assignblk):
                return assignblk
        return None

    def set_empty_dst_to_next(self, block, ir_blocks):
        """Give an IRDst, pointing to the location following @block, to each
        IRBlock of @ir_blocks which has none (@ir_blocks is updated in place)
        @block: native assembly block
        @ir_blocks: list of IRBlock
        """
        for index, irblock in enumerate(ir_blocks):
            if irblock.dst is not None:
                continue
            next_loc_key = block.get_next()
            if next_loc_key is None:
                loc_key = None
                if block.lines:
                    line = block.lines[-1]
                    if line.offset is not None:
                        loc_key = self.loc_db.get_or_create_offset_location(line.offset + line.l)
                if loc_key is None:
                    loc_key = self.loc_db.add_location()
                block.add_cst(loc_key, AsmConstraint.c_next)
            else:
                loc_key = next_loc_key
            dst = m2_expr.ExprLoc(loc_key, self.pc.size)
            if irblock.assignblks:
                instr = irblock.assignblks[-1].instr
            else:
                instr = None
            assignblk = AssignBlock({self.IRDst: dst}, instr)
            ir_blocks[index] = IRBlock(irblock.loc_key, list(irblock.assignblks) + [assignblk])

    def post_add_asmblock_to_ircfg(self, block, ircfg, ir_blocks):
        """Complete missing IRDst of @ir_blocks, apply the mode fixups, add
        the resulting IRBlocks to @ircfg and return them as a list"""
        self.set_empty_dst_to_next(block, ir_blocks)

        new_irblocks = []
        for irblock in ir_blocks:
            new_irblock = self.irbloc_fix_regs_for_mode(irblock, self.attrib)
            ircfg.add_irblock(new_irblock)
            new_irblocks.append(new_irblock)
        return new_irblocks


class ir(IntermediateRepresentation):
    """
    DEPRECATED object
    Use IntermediateRepresentation instead of ir
    """

    def __init__(self, loc_key, irs, lines=None):
        warnings.warn('DEPRECATION WARNING: use "IntermediateRepresentation" instead of "ir"')
        # NOTE(review): arguments are forwarded positionally to
        # IntermediateRepresentation.__init__(arch, attrib, loc_db)
        super(ir, self).__init__(loc_key, irs, lines)
Classes
class AssignBlock
Represent parallel IR assignment, such as: EAX = EBX EBX = EAX
-> Exchange between EBX and EAX
AssignBlock can be seen as a dictionary where keys are the destinations (ExprId or ExprMem), and values their corresponding sources.
Also provides common manipulation on these assignments.
class AssignBlock(object):
    """Represent parallel IR assignment, such as:
    EAX = EBX
    EBX = EAX

    -> Exchange between EBX and EAX

    AssignBlock can be seen as a dictionary where keys are the destinations
    (ExprId or ExprMem), and values their corresponding sources.

    Also provides common manipulation on these assignments.

    """
    __slots__ = ["_assigns", "_instr"]

    def __init__(self, irs=None, instr=None):
        """Create a new AssignBlock
        @irs: (optional) sequence of ExprAff, or dictionary dst (Expr) -> src
              (Expr)
        @instr: (optional) associate an instruction with this AssignBlock

        """
        if irs is None:
            irs = []
        self._instr = instr
        self._assigns = {} # ExprAff.dst -> ExprAff.src

        # Concurrent assignments are handled in _set
        if hasattr(irs, "iteritems"):
            for dst, src in irs.iteritems():
                self._set(dst, src)
        else:
            for expraff in irs:
                self._set(expraff.dst, expraff.src)

    @property
    def instr(self):
        """Return the associated instruction, if any"""
        return self._instr

    def _set(self, dst, src):
        """
        Register the assignment @dst = @src in this block.

        Special cases:
        * if dst is an ExprSlice, expand it to affect the full Expression
        * if dst already known, sources are merged

        Raises RuntimeError on size mismatch or when two assignments touch
        the same bits, TypeError when the final destination is neither an
        ExprId nor an ExprMem.
        """
        if dst.size != src.size:
            raise RuntimeError(
                "sanitycheck: args must have same size! %s" %
                ([(str(arg), arg.size) for arg in [dst, src]]))

        if isinstance(dst, m2_expr.ExprSlice):
            # Complete the source with missing slice parts
            new_dst = dst.arg
            rest = [(m2_expr.ExprSlice(dst.arg, r[0], r[1]), r[0], r[1])
                    for r in dst.slice_rest()]
            all_a = [(src, dst.start, dst.stop)] + rest
            all_a.sort(key=lambda x: x[1])
            args = [expr for (expr, _, _) in all_a]
            new_src = m2_expr.ExprCompose(*args)
        else:
            new_dst, new_src = dst, src

        if new_dst in self._assigns and isinstance(new_src, m2_expr.ExprCompose):
            if not isinstance(self[new_dst], m2_expr.ExprCompose):
                # prev_RAX = 0x1122334455667788
                # input_RAX[0:8] = 0x89
                # final_RAX -> ? (assignment are in parallel)
                raise RuntimeError("Concurent access on same bit not allowed")

            # Consider slice grouping
            expr_list = [(new_dst, new_src),
                         (new_dst, self[new_dst])]
            # Find collision: gather every (expr, start, stop) actually
            # modified by the new and by the already known source
            e_colision = set()
            for target, value in expr_list:
                e_colision.update(self.get_modified_slice(target, value))

            # Sort interval collision
            known_intervals = sorted([(x[1], x[2]) for x in e_colision])

            for i, (_, stop) in enumerate(known_intervals[:-1]):
                if stop > known_intervals[i + 1][0]:
                    raise RuntimeError(
                        "Concurent access on same bit not allowed")

            # Fill with missing data
            missing_i = get_missing_interval(known_intervals, 0, new_dst.size)
            remaining = ((m2_expr.ExprSlice(new_dst, *interval),
                          interval[0],
                          interval[1])
                         for interval in missing_i)

            # Build the merging expression
            args = list(e_colision.union(remaining))
            args.sort(key=lambda x: x[1])
            starts = [start for (_, start, _) in args]
            assert len(set(starts)) == len(starts)
            args = [expr for (expr, _, _) in args]
            new_src = m2_expr.ExprCompose(*args)

        # Sanity check
        if not isinstance(new_dst, (m2_expr.ExprId, m2_expr.ExprMem)):
            raise TypeError("Destination cannot be a %s" % type(new_dst))

        self._assigns[new_dst] = new_src

    def __setitem__(self, dst, src):
        raise RuntimeError('AssignBlock is immutable')

    def __getitem__(self, key):
        return self._assigns[key]

    def __contains__(self, key):
        return key in self._assigns

    def iteritems(self):
        """Iterate on (destination, source) couples"""
        for dst, src in self._assigns.iteritems():
            yield dst, src

    def items(self):
        """Return the list of (destination, source) couples"""
        return [(dst, src) for dst, src in self.iteritems()]

    def itervalues(self):
        """Iterate on sources"""
        for src in self._assigns.itervalues():
            yield src

    def keys(self):
        """Return the destinations"""
        return self._assigns.keys()

    def values(self):
        """Return the sources"""
        return self._assigns.values()

    def __iter__(self):
        for dst in self._assigns:
            yield dst

    def __delitem__(self, _):
        raise RuntimeError('AssignBlock is immutable')

    def update(self, _):
        raise RuntimeError('AssignBlock is immutable')

    def __eq__(self, other):
        if set(self.keys()) != set(other.keys()):
            return False
        return all(other[dst] == src for dst, src in self.iteritems())

    def __ne__(self, other):
        return not self.__eq__(other)

    def __len__(self):
        return len(self._assigns)

    def get(self, key, default=None):
        """Return the source assigned to @key, or @default if @key is not a
        destination of this block
        @key: destination (Expr)
        @default: (optional) value returned when @key is absent
        """
        return self._assigns.get(key, default)

    @staticmethod
    def get_modified_slice(dst, src):
        """Return the list of (expr, start, stop) parts of the ExprCompose
        @src which are not the untouched slice of @dst at the same position
        @dst: destination expression
        @src: ExprCompose instance (ValueError is raised otherwise)
        """
        if not isinstance(src, m2_expr.ExprCompose):
            raise ValueError("Get mod slice not on expraff slice", str(src))
        modified_s = []
        for index, arg in src.iter_args():
            if not (isinstance(arg, m2_expr.ExprSlice) and
                    arg.arg == dst and
                    index == arg.start and
                    index+arg.size == arg.stop):
                # If x is not the initial expression
                modified_s.append((arg, index, index+arg.size))
        return modified_s

    def get_w(self):
        """Return a set of elements written"""
        return set(self.keys())

    def get_rw(self, mem_read=False, cst_read=False):
        """Return a dictionary associating written expressions to a set of
        their read requirements
        @mem_read: (optional) mem_read argument of `get_r`
        @cst_read: (optional) cst_read argument of `get_r`
        """
        out = {}
        for dst, src in self.iteritems():
            src_read = src.get_r(mem_read=mem_read, cst_read=cst_read)
            if isinstance(dst, m2_expr.ExprMem) and mem_read:
                # Read on destination happens only with ExprMem
                src_read.update(dst.arg.get_r(mem_read=mem_read,
                                              cst_read=cst_read))
            out[dst] = src_read
        return out

    def get_r(self, mem_read=False, cst_read=False):
        """Return the set of elements read
        @mem_read: (optional) mem_read argument of `get_r`
        @cst_read: (optional) cst_read argument of `get_r`
        """
        return set(
            chain.from_iterable(self.get_rw(mem_read=mem_read,
                                            cst_read=cst_read).itervalues()))

    def __str__(self):
        out = []
        for dst, src in sorted(self._assigns.iteritems()):
            out.append("%s = %s" % (dst, src))
        return "\n".join(out)

    def dst2ExprAff(self, dst):
        """Return an ExprAff corresponding to @dst equation
        @dst: Expr instance"""
        return m2_expr.ExprAff(dst, self[dst])

    def simplify(self, simplifier):
        """
        Return a new AssignBlock with expression simplified

        Assignments whose destination equals their source (before
        simplification) are dropped.

        @simplifier: ExpressionSimplifier instance
        """
        new_assignblk = {}
        for dst, src in self.iteritems():
            if dst == src:
                continue
            new_src = simplifier(src)
            new_dst = simplifier(dst)
            new_assignblk[new_dst] = new_src
        return AssignBlock(irs=new_assignblk, instr=self.instr)

    def to_string(self, loc_db=None):
        """Return a printable version of the block, location expressions
        being replaced by their symbolic name
        @loc_db: (optional) LocationDB instance used to name locations
        """
        out = []
        for dst, src in self.iteritems():
            new_src = src.visit(lambda expr:_expr_loc_to_symb(expr, loc_db))
            new_dst = dst.visit(lambda expr:_expr_loc_to_symb(expr, loc_db))
            line = "%s = %s" % (new_dst, new_src)
            out.append(line)
            out.append("")
        return "\n".join(out)
Ancestors (in MRO)
- AssignBlock
- __builtin__.object
Static methods
def get_modified_slice(
dst, src)
Return an Expr list of extra expressions needed during the object instantiation
@staticmethod
def get_modified_slice(dst, src):
    """Collect the parts of the ExprCompose @src that really modify @dst.

    Return a list of (expr, start, stop) tuples, one per argument of @src
    which is not the ExprSlice of @dst located at the very same position.
    @dst: destination expression
    @src: ExprCompose instance; ValueError is raised for anything else
    """
    if not isinstance(src, m2_expr.ExprCompose):
        raise ValueError("Get mod slice not on expraff slice", str(src))

    def _is_untouched(start, part):
        # True when @part is exactly @dst[start:start + part.size]
        return (isinstance(part, m2_expr.ExprSlice) and
                part.arg == dst and
                start == part.start and
                start + part.size == part.stop)

    changed = []
    for start, part in src.iter_args():
        if _is_untouched(start, part):
            continue
        changed.append((part, start, start + part.size))
    return changed
Instance variables
var instr
Return the associated instruction, if any
Methods
def __init__(
self, irs=None, instr=None)
Create a new AssignBlock @irs: (optional) sequence of ExprAff, or dictionary dst (Expr) -> src (Expr) @instr: (optional) associate an instruction with this AssignBlock
def __init__(self, irs=None, instr=None): """Create a new AssignBlock @irs: (optional) sequence of ExprAff, or dictionnary dst (Expr) -> src (Expr) @instr: (optional) associate an instruction with this AssignBlock """ if irs is None: irs = [] self._instr = instr self._assigns = {} # ExprAff.dst -> ExprAff.src # Concurrent assignments are handled in _set if hasattr(irs, "iteritems"): for dst, src in irs.iteritems(): self._set(dst, src) else: for expraff in irs: self._set(expraff.dst, expraff.src)
def dst2ExprAff(
self, dst)
Return an ExprAff corresponding to @dst equation @dst: Expr instance
def dst2ExprAff(self, dst):
    """Build the ExprAff standing for the assignment of @dst in this block
    @dst: Expr instance, must be a destination of the block"""
    source = self[dst]
    return m2_expr.ExprAff(dst, source)
def get(
self, key, default)
def get(self, key, default):
    """Return the source assigned to @key, or @default if @key is not a
    destination of this AssignBlock"""
    assignments = self._assigns
    return assignments.get(key, default)
def get_r(
self, mem_read=False, cst_read=False)
Return the set of elements read
@mem_read: (optional) mem_read argument of get_r
@cst_read: (optional) cst_read argument of get_r
def get_r(self, mem_read=False, cst_read=False):
    """Return the set of elements read by this AssignBlock
    @mem_read: (optional) mem_read argument of `get_r`
    @cst_read: (optional) cst_read argument of `get_r`
    """
    reads_by_dst = self.get_rw(mem_read=mem_read, cst_read=cst_read)
    # Merge the read sets of every destination
    all_reads = set()
    for reads in reads_by_dst.itervalues():
        all_reads.update(reads)
    return all_reads
def get_rw(
self, mem_read=False, cst_read=False)
Return a dictionary associating written expressions to a set of
their read requirements
@mem_read: (optional) mem_read argument of get_r
@cst_read: (optional) cst_read argument of get_r
def get_rw(self, mem_read=False, cst_read=False):
    """Map each written expression to the set of expressions it requires
    @mem_read: (optional) mem_read argument of `get_r`
    @cst_read: (optional) cst_read argument of `get_r`
    """
    requirements = {}
    for written, value in self.iteritems():
        reads = value.get_r(mem_read=mem_read, cst_read=cst_read)
        # Read on destination happens only with ExprMem (its address)
        if isinstance(written, m2_expr.ExprMem) and mem_read:
            address_reads = written.arg.get_r(mem_read=mem_read,
                                              cst_read=cst_read)
            reads.update(address_reads)
        requirements[written] = reads
    return requirements
def get_w(
self)
Return a set of elements written
def get_w(self):
    """Return the set of written elements (the destinations)"""
    written = set()
    written.update(self.keys())
    return written
def items(
self)
def items(self):
    """Return the list of (destination, source) couples"""
    couples = []
    for dst, src in self.iteritems():
        couples.append((dst, src))
    return couples
def iteritems(
self)
def iteritems(self):
    """Iterate on the (destination, source) couples"""
    for couple in self._assigns.iteritems():
        dst, src = couple
        yield dst, src
def itervalues(
self)
def itervalues(self):
    """Iterate on the sources"""
    sources = self._assigns.itervalues()
    for source in sources:
        yield source
def keys(
self)
def keys(self):
    """Return the destinations, as given by the underlying dictionary"""
    assignments = self._assigns
    return assignments.keys()
def simplify(
self, simplifier)
Return a new AssignBlock with expression simplified
@simplifier: ExpressionSimplifier instance
def simplify(self, simplifier):
    """
    Return a new AssignBlock whose expressions went through @simplifier
    @simplifier: ExpressionSimplifier instance
    """
    simplified = {}
    for dst, src in self.iteritems():
        # Assignments of an expression to itself are dropped
        if not (dst == src):
            simple_src = simplifier(src)
            simplified[simplifier(dst)] = simple_src
    return AssignBlock(irs=simplified, instr=self.instr)
def to_string(
self, loc_db=None)
def to_string(self, loc_db=None):
    """Return the textual form of the AssignBlock, one "dst = src" line per
    assignment followed by an empty line
    @loc_db: (optional) LocationDB instance used to name locations
    """
    def to_symb(expr):
        return _expr_loc_to_symb(expr, loc_db)

    lines = []
    for dst, src in self.iteritems():
        shown_src = src.visit(to_symb)
        shown_dst = dst.visit(to_symb)
        lines.append("%s = %s" % (shown_dst, shown_src))
    lines.append("")
    return "\n".join(lines)
def update(
self, _)
def update(self, _):
    """Always raise RuntimeError: an AssignBlock cannot be updated"""
    message = 'AssignBlock is immutable'
    raise RuntimeError(message)
def values(
self)
def values(self):
    """Return the sources, as given by the underlying dictionary"""
    assignments = self._assigns
    return assignments.values()
class DiGraphIR
DEPRECATED object. Use IRCFG instead of DiGraphIR.
class DiGraphIR(IRCFG):
    """
    DEPRECATED object
    Use IRCFG instead of DiGraphIR
    """

    def __init__(self, irdst, loc_db, blocks=None, *args, **kwargs):
        """Instantiate a DiGraphIR (deprecated alias of IRCFG)
        @irdst: forwarded to IRCFG
        @loc_db: LocationDB instance
        @blocks: (optional) IR blocks
        A deprecation warning is emitted on each instantiation.
        """
        warnings.warn('DEPRECATION WARNING: use "IRCFG" instead of "DiGraphIR"')
        # Start the MRO lookup after DiGraphIR so that IRCFG.__init__ runs,
        # and forward @blocks instead of replacing it with None
        super(DiGraphIR, self).__init__(irdst, loc_db, blocks, *args, **kwargs)
Ancestors (in MRO)
Class variables
Instance variables
Methods
def __init__(
self, irdst, loc_db, blocks=None, *args, **kwargs)
Instantiate an IRCFG @loc_db: LocationDB instance @blocks: IR blocks
def __init__(self, irdst, loc_db, blocks=None, *args, **kwargs):
    """Instantiate a DiGraphIR (deprecated alias of IRCFG)
    @irdst: forwarded to IRCFG
    @loc_db: LocationDB instance
    @blocks: (optional) IR blocks
    A deprecation warning is emitted on each instantiation.
    """
    warnings.warn('DEPRECATION WARNING: use "IRCFG" instead of "DiGraphIR"')
    # Start the MRO lookup after DiGraphIR so that IRCFG.__init__ runs,
    # and forward @blocks instead of replacing it with None
    super(DiGraphIR, self).__init__(irdst, loc_db, blocks, *args, **kwargs)
def add_edge(
self, src, dst)
def add_edge(self, src, dst):
    """Add the edge @src -> @dst; nodes missing from the graph are added
    first"""
    for endpoint in (src, dst):
        if endpoint not in self._nodes:
            self.add_node(endpoint)
    self._edges.append((src, dst))
    self._nodes_succ[src].append(dst)
    self._nodes_pred[dst].append(src)
def add_irblock(
self, irblock)
Inheritance:
IRCFG
.add_irblock
Add the @irblock to the current IRCFG @irblock: IRBlock instance
def add_irblock(self, irblock):
    """
    Register @irblock in the IRCFG and link it to its known destinations
    @irblock: IRBlock instance
    """
    src_key = irblock.loc_key
    self.blocks[src_key] = irblock
    self.add_node(src_key)

    for target in self.dst_trackback(irblock):
        if target.is_int():
            # Integer destinations are turned into locations
            target_key = self.loc_db.get_or_create_offset_location(int(target))
            target = m2_expr.ExprLoc(target_key, irblock.dst.size)
        if target.is_loc():
            self.add_uniq_edge(src_key, target.loc_key)
def add_node(
self, node)
Add the node @node to the graph. If the node was already present, return False. Otherwise, return True
def add_node(self, node):
    """Add the node @node to the graph.
    Return True if the node has been added, False if it was already present
    """
    known = self._nodes
    if node not in known:
        known.add(node)
        self._nodes_succ[node] = []
        self._nodes_pred[node] = []
        return True
    return False
def add_uniq_edge(
self, src, dst)
Inheritance:
IRCFG
.add_uniq_edge
Add an edge from @src to @dst if it doesn't already exist
def add_uniq_edge(self, src, dst):
    """Add the edge @src -> @dst unless @dst is already a successor of
    @src"""
    successors = self._nodes_succ
    if src in successors and dst in successors[src]:
        return
    self.add_edge(src, dst)
def compute_back_edges(
self, head)
Inheritance:
IRCFG
.compute_back_edges
Computes all back edges from a node to a dominator in the graph. :param head: head of graph :return: yield a back edge
def compute_back_edges(self, head):
    """
    Yield every edge going from a node to one of its dominators.
    :param head: head of graph
    :return: yield a back edge, as a (node, successor) tuple
    """
    dominators = self.compute_dominators(head)

    # Depth first traversal of the nodes reachable from head
    for node in self.walk_depth_first_forward(head):
        for successor in self.successors_iter(node):
            if successor not in dominators[node]:
                continue
            # successor dominates node: back edge
            yield (node, successor)
def compute_dominance_frontier(
self, head)
Inheritance:
IRCFG
.compute_dominance_frontier
Compute the dominance frontier of the graph
Source: Cooper, Keith D., Timothy J. Harvey, and Ken Kennedy. "A simple, fast dominance algorithm." Software Practice & Experience 4 (2001), p. 9
def compute_dominance_frontier(self, head):
    """
    Compute the dominance frontier of the graph

    Source: Cooper, Keith D., Timothy J. Harvey, and Ken Kennedy.
    "A simple, fast dominance algorithm."
    Software Practice & Experience 4 (2001), p. 9

    @head: head of the graph
    Return a dictionary node -> set of the nodes in its dominance frontier;
    nodes with an empty frontier are absent from the dictionary.
    """
    idoms = self.compute_immediate_dominators(head)
    frontier = {}

    for node in idoms:
        # Only join points (at least two predecessors) are considered.
        # The number of predecessors is compared, not the list itself.
        if len(self._nodes_pred[node]) < 2:
            continue
        for predecessor in self.predecessors_iter(node):
            runner = predecessor
            if runner not in idoms:
                continue
            # Climb the immediate dominators up to the one of @node
            while runner != idoms[node]:
                frontier.setdefault(runner, set()).add(node)
                runner = idoms[runner]
    return frontier
def compute_dominator_tree(
self, head)
Inheritance:
IRCFG
.compute_dominator_tree
Computes the dominator tree of a graph :param head: head of graph :return: DiGraph
def compute_dominator_tree(self, head):
    """
    Build the dominator tree of the graph: one edge from each immediate
    dominator to the node it dominates.
    :param head: head of graph
    :return: DiGraph
    """
    tree = DiGraph()
    immediate = self.compute_immediate_dominators(head)
    for node, idom in immediate.items():
        tree.add_edge(idom, node)
    return tree
def compute_dominators(
self, head)
Inheritance:
IRCFG
.compute_dominators
Compute the dominators of the graph
def compute_dominators(self, head):
    """Compute the dominators of the graph, starting from @head"""
    reachable = self.reachable_sons
    backward = self.predecessors_iter
    forward = self.successors_iter
    return self._compute_generic_dominators(head, reachable, backward, forward)
def compute_immediate_dominators(
self, head)
Inheritance:
IRCFG
.compute_immediate_dominators
Compute the immediate dominators of the graph
def compute_immediate_dominators(self, head):
    """Compute the immediate dominators of the graph
    Return a dictionary node -> immediate dominator; nodes for which no
    candidate is found are absent from the dictionary.
    """
    dominators = self.compute_dominators(head)
    not_found = object()
    idoms = {}

    for node in dominators:
        # First walked node which dominates @node and differs from it
        candidates = (
            walked for walked in self.walk_dominators(node, dominators)
            if walked in dominators[node] and node != walked
        )
        closest = next(candidates, not_found)
        if closest is not not_found:
            idoms[node] = closest
    return idoms
def compute_natural_loops(
self, head)
Inheritance:
IRCFG
.compute_natural_loops
Computes all natural loops in the graph.
Source: Aho, Alfred V., Lam, Monica S., Sethi, R. and Jeffrey Ullman. "Compilers: Principles, Techniques, & Tools, Second Edition" Pearson/Addison Wesley (2007), Chapter 9.6.6 :param head: head of the graph :return: yield a tuple of the form (back edge, loop body)
def compute_natural_loops(self, head):
    """
    Computes all natural loops in the graph.

    Source: Aho, Alfred V., Lam, Monica S., Sethi, R. and Jeffrey Ullman.
    "Compilers: Principles, Techniques, & Tools, Second Edition"
    Pearson/Addison Wesley (2007), Chapter 9.6.6
    :param head: head of the graph
    :return: yield a tuple of the form (back edge, loop body)
    """
    for back_edge in self.compute_back_edges(head):
        tail, header = back_edge
        loop_body = self._compute_natural_loop_body(header, tail)
        yield ((tail, header), loop_body)
def compute_postdominators(
self, leaf)
Inheritance:
IRCFG
.compute_postdominators
Compute the postdominators of the graph
def compute_postdominators(self, leaf):
    """Compute the postdominators of the graph, starting from @leaf"""
    reachable = self.reachable_parents
    forward = self.successors_iter
    backward = self.predecessors_iter
    return self._compute_generic_dominators(leaf, reachable, forward, backward)
def compute_strongly_connected_components(
self)
Inheritance:
IRCFG
.compute_strongly_connected_components
Partitions the graph into strongly connected components.
Iterative implementation of Gabow's path-based SCC algorithm. Source: Gabow, Harold N. "Path-based depth-first search for strong and biconnected components." Information Processing Letters 74.3 (2000), pp. 109--110
The iterative implementation is inspired by Mark Dickinson's code: http://code.activestate.com/recipes/ 578507-strongly-connected-components-of-a-directed-graph/ :return: yield a strongly connected component
def compute_strongly_connected_components(self):
    """
    Partitions the graph into strongly connected components.

    Iterative implementation of Gabow's path-based SCC algorithm.
    Source: Gabow, Harold N.
    "Path-based depth-first search for strong and biconnected components."
    Information Processing Letters 74.3 (2000), pp. 109--110

    The iterative implementation is inspired by Mark Dickinson's
    code:
    http://code.activestate.com/recipes/
    578507-strongly-connected-components-of-a-directed-graph/
    :return: yield a strongly connected component, as a set of nodes
    """
    # Nodes of the current DFS path which are not yet assigned to a component
    stack = []
    # Stack positions (1-based) delimiting the candidate components
    boundaries = []
    # Component identifiers start above the number of nodes, so that they
    # are always greater than any position in the stack
    counter = len(self.nodes())

    # init index with 0 (0 means "not visited yet")
    index = {v: 0 for v in self.nodes()}

    # state machine for worklist algorithm
    VISIT, HANDLE_RECURSION, MERGE = 0, 1, 2
    NodeState = namedtuple('NodeState', ['state', 'node'])

    for node in self.nodes():
        # next node if node was already visited
        if index[node]:
            continue

        todo = [NodeState(VISIT, node)]
        done = set()

        while todo:
            current = todo.pop()

            if current.node in done:
                continue

            # node is unvisited
            if current.state == VISIT:
                stack.append(current.node)
                # The index of a node on the stack is its 1-based position
                index[current.node] = len(stack)
                boundaries.append(index[current.node])

                # MERGE is pushed first, so it is handled after the
                # successors pushed below
                todo.append(NodeState(MERGE, current.node))
                # follow successors
                for successor in self.successors_iter(current.node):
                    todo.append(NodeState(HANDLE_RECURSION, successor))

            # iterative handling of recursion algorithm
            elif current.state == HANDLE_RECURSION:
                # visit unvisited successor
                if index[current.node] == 0:
                    todo.append(NodeState(VISIT, current.node))
                else:
                    # contract cycle if necessary
                    while index[current.node] < boundaries[-1]:
                        boundaries.pop()

            # merge strongly connected component
            else:
                if index[current.node] == boundaries[-1]:
                    boundaries.pop()
                    counter += 1
                    scc = set()

                    # Pop the stack down to current.node (included); popped
                    # nodes are tagged with the component identifier
                    while index[current.node] <= len(stack):
                        popped = stack.pop()
                        index[popped] = counter
                        scc.add(popped)

                        done.add(current.node)

                    yield scc
def copy(
self)
Copy the current graph instance
def copy(self):
    """Return a copy of the graph: a new instance of the same class, merged
    with the current one through the + operator"""
    return self.__class__() + self
def del_edge(
self, src, dst)
def del_edge(self, src, dst):
    """Remove one occurrence of the edge @src -> @dst from the graph"""
    edge = (src, dst)
    self._edges.remove(edge)
    src_successors = self._nodes_succ[src]
    src_successors.remove(dst)
    dst_predecessors = self._nodes_pred[dst]
    dst_predecessors.remove(src)
def del_node(
self, node)
Delete the @node of the graph; Also delete every edge to/from this @node
def del_node(self, node):
    """Remove @node from the graph, along with every edge to/from it"""
    self._nodes.discard(node)

    # predecessors/successors return lists, so edges can be deleted while
    # looping on them
    for parent in self.predecessors(node):
        self.del_edge(parent, node)
    for child in self.successors(node):
        self.del_edge(node, child)
def discard_edge(
self, src, dst)
Inheritance:
IRCFG
.discard_edge
Remove edge between @src and @dst if it exists
def discard_edge(self, src, dst):
    """Remove the edge @src -> @dst; do nothing if there is no such edge"""
    if (src, dst) not in self._edges:
        return
    self.del_edge(src, dst)
def dot(
self, offset=False)
@offset: (optional) if set, add the corresponding line number in each node
def dot(self, offset=False):
    """
    Return the dot representation built by the parent class
    @offset: (optional) if set, add the corresponding line number in each
    node
    """
    # Remembered on the instance so that node2lines can read it
    self._dot_offset = offset
    parent = super(IRCFG, self)
    return parent.dot()
def dst_trackback(
self, irb)
Inheritance:
IRCFG
.dst_trackback
Naive backtracking of IRDst @irb: IRBlock instance
def dst_trackback(self, irb):
    """
    Naive backtracking of IRDst: walk the assignment blocks of @irb
    backward, replacing each tracked expression by its source
    @irb: IRBlock instance
    Return the set filled by _extract_dst during the walk.
    """
    pending = {irb.dst}
    resolved = set()

    for assignblk in reversed(irb):
        if not pending:
            break
        unresolved = self._extract_dst(pending, resolved)
        # Expressions assigned in this block are replaced by their source
        assigned = set(dst for dst in unresolved if dst in assignblk)
        pending = set(assignblk[dst] for dst in assigned)
        # The other ones are tracked further up, unchanged
        pending.update(unresolved.difference(assigned))

    return resolved
def edge_attr(
self, src, dst)
def edge_attr(self, src, dst):
    """Return the dot attributes of the edge @src -> @dst
    The color is "limegreen" / "red" when @dst is the first / second source
    of a conditional IRDst, "blue" otherwise. An empty dictionary is
    returned if one of the nodes has no associated block.
    """
    blocks = self._blocks
    if not (src in blocks and dst in blocks):
        return {}

    irdst = blocks[src].dst
    color = "blue"
    if isinstance(irdst, m2_expr.ExprCond):
        if irdst.src1.is_loc(dst):
            color = "limegreen"
        elif irdst.src2.is_loc(dst):
            color = "red"
    return {"color": color}
def find_path(
self, src, dst, cycles_count=0, done=None)
def find_path(self, src, dst, cycles_count=0, done=None):
    """Return the list of paths (lists of nodes) going from @src to @dst
    @cycles_count: (optional) limit on how many times a node may be
    crossed again
    @done: (optional) dictionary node -> number of times it was crossed,
    used by the recursive calls
    """
    if done is None:
        done = {}
    if done.get(dst, cycles_count) > cycles_count:
        return [[]]
    if src == dst:
        return [[src]]

    crossings = done.get(dst, 0) + 1
    paths = []
    for parent in self.predecessors(dst):
        # Each branch gets its own copy of the crossing counters
        branch_done = dict(done)
        branch_done[dst] = crossings
        for prefix in self.find_path(src, parent, cycles_count, branch_done):
            # Keep only the paths which really start from @src
            if prefix and prefix[0] == src:
                paths.append(prefix + [dst])
    return paths
def get_block(
self, addr)
Returns the irbloc associated to an ExprId/ExprInt/loc_key/int @addr: an ExprId/ExprInt/loc_key/int
def get_block(self, addr):
    """Return the IRBlock associated to an ExprId/ExprInt/loc_key/int, or
    None if there is no such block
    @addr: an ExprId/ExprInt/loc_key/int"""
    loc_key = self.get_loc_key(addr)
    return None if loc_key is None else self.blocks.get(loc_key, None)
def get_loc_key(
self, addr)
Inheritance:
IRCFG
.get_loc_key
Transforms an ExprId/ExprInt/loc_key/int into a loc_key @addr: an ExprId/ExprInt/loc_key/int
def get_loc_key(self, addr):
    """Transform an ExprId/ExprInt/loc_key/int into a loc_key
    Return None if @addr cannot be converted to an integer.
    @addr: an ExprId/ExprInt/loc_key/int"""
    if isinstance(addr, m2_expr.LocKey):
        return addr
    if isinstance(addr, m2_expr.ExprLoc):
        return addr.loc_key

    try:
        offset = int(addr)
    except (ValueError, TypeError):
        return None
    return self.loc_db.get_offset_location(offset)
def get_or_create_loc_key(
self, addr)
Inheritance:
IRCFG
.get_or_create_loc_key
Transforms an ExprId/ExprInt/loc_key/int into a loc_key If the offset @addr is not in the LocationDB, create it @addr: an ExprId/ExprInt/loc_key/int
def get_or_create_loc_key(self, addr):
    """Transform an ExprId/ExprInt/loc_key/int into a loc_key
    If get_loc_key finds nothing for @addr, a location is added to the
    LocationDB at offset int(@addr)
    @addr: an ExprId/ExprInt/loc_key/int"""
    known = self.get_loc_key(addr)
    if known is None:
        return self.loc_db.add_location(offset=int(addr))
    return known
def get_rw(
self, regs_ids=None)
Calls get_rw(irb) for each block @regs_ids : ids of registers used in IR
def get_rw(self, regs_ids=None):
    """
    Call get_rw(@regs_ids) on each block of the graph
    @regs_ids : ids of registers used in IR (empty list by default)
    """
    regs_ids = [] if regs_ids is None else regs_ids
    for block in self.blocks.values():
        block.get_rw(regs_ids)
def getby_offset(
self, offset)
Inheritance:
IRCFG
.getby_offset
def getby_offset(self, offset):
    """Return the set of blocks holding at least one instruction which
    covers @offset, that is: instr.offset <= @offset < instr.offset + instr.l
    """
    matching = set()
    for irblock in self.blocks.values():
        for assignblk in irblock:
            instr = assignblk.instr
            # Assignment blocks without instruction cannot match
            if instr is not None and instr.offset <= offset < instr.offset + instr.l:
                matching.add(irblock)
    return matching
def has_loop(
self)
Return True if the graph contains at least a cycle
def has_loop(self):
    """Return True if the graph contains at least a cycle"""
    pending = list(self.nodes())
    # fully tested nodes
    finished = set()
    # nodes of the DFS currently in progress
    in_progress = set()

    while pending:
        node = pending.pop()
        if node in finished:
            continue

        if node not in in_progress:
            # Launch DFS from node: it is pushed back, below its
            # successors, to detect the end of its branch
            pending.append(node)
            in_progress.add(node)
            pending.extend(self.successors(node))
            continue

        # DFS branch end
        if any(succ in in_progress for succ in self.successors_iter(node)):
            return True
        # A node cannot be in in_progress AND in finished
        in_progress.remove(node)
        finished.add(node)

    return False
def heads(
self)
def heads(self):
    """Return the list of the nodes yielded by heads_iter"""
    return list(self.heads_iter())
def heads_iter(
self)
Inheritance:
IRCFG
.heads_iter
def heads_iter(self):
    """Iterate on the nodes which have no predecessor"""
    for node in self._nodes:
        if self._nodes_pred[node]:
            continue
        yield node
def leaves(
self)
def leaves(self):
    """Return the list of the nodes yielded by leaves_iter"""
    return list(self.leaves_iter())
def leaves_iter(
self)
Inheritance:
IRCFG
.leaves_iter
def leaves_iter(self):
    """Iterate on the nodes which have no successor"""
    for node in self._nodes:
        if self._nodes_succ[node]:
            continue
        yield node
def merge(
self, graph)
Merge the current graph with @graph @graph: DiGraph instance
def merge(self, graph):
    """Add every node and every edge of @graph to the current graph
    @graph: DiGraph instance
    """
    for foreign_node in graph._nodes:
        self.add_node(foreign_node)
    for foreign_edge in graph._edges:
        self.add_edge(*foreign_edge)
def node2lines(
self, node)
Inheritance:
IRCFG
.node2lines
def node2lines(self, node):
    """Yield the dot cells describing @node
    @node: LocKey of the node to render
    The first cell is the header (location names). If no block is known for
    @node, a single "NOT PRESENT" row follows. Otherwise one cell is yielded
    per assignment (preceded by its assignment block number if _dot_offset
    is set), and an empty cell closes each assignment block.
    """
    if self.loc_db is None:
        node_name = str(node)
    else:
        names = self.loc_db.get_location_names(node)
        if not names:
            node_name = self.loc_db.pretty_str(node)
        else:
            node_name = "".join("%s:\n" % name for name in names)
    yield self.DotCellDescription(
        text="%s" % node_name,
        attr={
            'align': 'center',
            'colspan': 2,
            'bgcolor': 'grey',
        }
    )
    if node not in self._blocks:
        yield [self.DotCellDescription(text="NOT PRESENT", attr={})]
        # A plain return ends the generator; raising StopIteration here
        # is turned into a RuntimeError by PEP 479
        return

    def to_symb(expr):
        return _expr_loc_to_symb(expr, self.loc_db)

    for i, assignblk in enumerate(self._blocks[node]):
        for dst, src in assignblk.iteritems():
            new_src = src.visit(to_symb)
            new_dst = dst.visit(to_symb)
            line = "%s = %s" % (new_dst, new_src)
            if self._dot_offset:
                yield [self.DotCellDescription(text="%-4d" % i, attr={}),
                       self.DotCellDescription(text=line, attr={})]
            else:
                yield self.DotCellDescription(text=line, attr={})
        yield self.DotCellDescription(text="", attr={})
def node_attr(
self, node)
def node_attr(self, node):
    """Return the dot attributes of @node: nodes without an associated
    block are filled in red, the other ones have no specific attribute"""
    if node in self._blocks:
        return {}
    return {'style': 'filled', 'fillcolor': 'red'}
def nodeid(
self, node)
Returns a unique id for @node @node: a node of the graph
def nodeid(self, node): """ Returns uniq id for a @node @node: a node of the graph """ return hash(node) & 0xFFFFFFFFFFFFFFFF
def predecessors(
self, node)
Inheritance:
IRCFG
.predecessors
def predecessors(self, node):
    """Return the list of the predecessors of @node"""
    return list(self.predecessors_iter(node))
def predecessors_iter(
self, node)
Inheritance:
IRCFG
.predecessors_iter
def predecessors_iter(self, node):
    """Iterate on the predecessors of @node
    Nothing is yielded if @node is unknown.
    """
    if node not in self._nodes_pred:
        # A plain return ends the generator; raising StopIteration here
        # is turned into a RuntimeError by PEP 479
        return
    for n_pred in self._nodes_pred[node]:
        yield n_pred
def predecessors_stop_node_iter(
self, node, head)
Inheritance:
IRCFG
.predecessors_stop_node_iter
def predecessors_stop_node_iter(self, node, head):
    """Iterate on the predecessors of @node, except when @node is @head:
    nothing is yielded in that case
    """
    if node == head:
        # A plain return ends the generator; raising StopIteration here
        # is turned into a RuntimeError by PEP 479
        return
    for next_node in self.predecessors_iter(node):
        yield next_node
def reachable_parents(
self, leaf)
Inheritance:
IRCFG
.reachable_parents
Compute all parents of node @leaf. Each parent is an immediate predecessor of an arbitrary, already yielded parent of @leaf
def reachable_parents(self, leaf):
    """Compute all parents of node @leaf, following the predecessors.
    Each parent is an immediate predecessor of an arbitrary, already
    yielded parent of @leaf"""
    backward = self.predecessors_iter
    return self._reachable_nodes(leaf, backward)
def reachable_parents_stop_node(
self, leaf, head)
Inheritance:
IRCFG
.reachable_parents_stop_node
Compute all parents of node @leaf. Each parent is an immediate predecessor of an arbitrary, already yielded parent of @leaf. Do not compute reachables past @head node
def reachable_parents_stop_node(self, leaf, head):
    """Compute all parents of node @leaf. Each parent is an immediate
    predecessor of an arbitrary, already yielded parent of @leaf.
    Do not compute reachables past @head node"""

    def parents_up_to_head(node_cur):
        return self.predecessors_stop_node_iter(node_cur, head)

    return self._reachable_nodes(leaf, parents_up_to_head)
def reachable_sons(
self, head)
Inheritance:
IRCFG
.reachable_sons
Compute all nodes reachable from node @head. Each son is an immediate successor of an arbitrary, already yielded son of @head
def reachable_sons(self, head):
    """Compute all nodes reachable from node @head, following the
    successors. Each son is an immediate successor of an arbitrary, already
    yielded son of @head"""
    forward = self.successors_iter
    return self._reachable_nodes(head, forward)
def replace_expr_in_ir(
self, block, replaced)
Inheritance:
IRCFG
.replace_expr_in_ir
def replace_expr_in_ir(self, block, replaced):
    """Apply replace_expr(@replaced) to each destination and source of the
    assignment blocks of @block, in place
    @block: iterable of assignment blocks
    @replaced: argument given to replace_expr
    """
    for assignblk in block:
        for old_dst, old_src in assignblk.items():
            # The former entry is dropped before the rewritten one is set
            del assignblk[old_dst]
            new_src = old_src.replace_expr(replaced)
            new_dst = old_dst.replace_expr(replaced)
            assignblk[new_dst] = new_src
def simplify(
self, simplifier)
Simplify expressions in each IRBlock @simplifier: ExpressionSimplifier instance
def simplify(self, simplifier):
    """
    Replace each block by a new IRBlock made of its simplified assignment
    blocks
    @simplifier: ExpressionSimplifier instance
    Return True if at least one assignment block has been modified.
    """
    changed = False
    for loc_key, irblock in self.blocks.iteritems():
        simplified = []
        for original in irblock:
            candidate = original.simplify(simplifier)
            simplified.append(candidate)
            if original != candidate:
                changed = True
        # Only the value of an existing key is replaced during the iteration
        self.blocks[loc_key] = IRBlock(loc_key, simplified)
    return changed
def successors(
self, node)
Inheritance:
IRCFG
.successors
def successors(self, node):
    """Return the list of the successors of @node"""
    return list(self.successors_iter(node))
def successors_iter(
self, node)
Inheritance:
IRCFG
.successors_iter
def successors_iter(self, node):
    """Iterate on the successors of @node
    Nothing is yielded if @node is unknown.
    """
    if node not in self._nodes_succ:
        # A plain return ends the generator; raising StopIteration here
        # is turned into a RuntimeError by PEP 479
        return
    for n_suc in self._nodes_succ[node]:
        yield n_suc
def walk_breadth_first_backward(
self, head)
Inheritance:
IRCFG
.walk_breadth_first_backward
Performs a breadth first search on the reversed graph from @head
def walk_breadth_first_backward(self, head):
    """Performs a breadth first search on the reversed graph from @head"""
    backward = self.predecessors_iter
    return self._walk_generic_first(head, 0, backward)
def walk_breadth_first_forward(
self, head)
Inheritance:
IRCFG
.walk_breadth_first_forward
Performs a breadth first search on the graph from @head
def walk_breadth_first_forward(self, head):
    """Performs a breadth first search on the graph from @head"""
    forward = self.successors_iter
    return self._walk_generic_first(head, 0, forward)
def walk_depth_first_backward(
self, head)
Inheritance:
IRCFG
.walk_depth_first_backward
Performs a depth first search on the reversed graph from @head
def walk_depth_first_backward(self, head):
    """Performs a depth first search on the reversed graph from @head"""
    backward = self.predecessors_iter
    return self._walk_generic_first(head, -1, backward)
def walk_depth_first_forward(
self, head)
Inheritance:
IRCFG
.walk_depth_first_forward
Performs a depth first search on the graph from @head
def walk_depth_first_forward(self, head):
    """Performs a depth first search on the graph from @head"""
    forward = self.successors_iter
    return self._walk_generic_first(head, -1, forward)
def walk_dominators(
self, node, dominators)
Inheritance:
IRCFG
.walk_dominators
Return an iterator of the ordered list of @node's dominators The function doesn't return the self reference in dominators. @node: The start node @dominators: The dictionary containing at least node's dominators
def walk_dominators(self, node, dominators):
    """Return an iterator of the ordered list of @node's dominators
    The function doesn't return the self reference in dominators.
    @node: The start node
    @dominators: The dictionary containing at least node's dominators
    """
    backward = self.predecessors_iter
    return self._walk_generic_dominator(node, dominators, backward)
def walk_postdominators(
self, node, postdominators)
Inheritance:
IRCFG
.walk_postdominators
Return an iterator of the ordered list of @node's postdominators The function doesn't return the self reference in postdominators. @node: The start node @postdominators: The dictionary containing at least node's postdominators
def walk_postdominators(self, node, postdominators):
    """Return an iterator of the ordered list of @node's postdominators
    The function doesn't return the self reference in postdominators.
    @node: The start node
    @postdominators: The dictionary containing at least node's
    postdominators
    """
    forward = self.successors_iter
    return self._walk_generic_dominator(node, postdominators, forward)
class IRBlock
Intermediate representation block object.
Stand for an intermediate representation basic block.
class IRBlock(object):
    """Intermediate representation block object.

    Stand for an intermediate representation basic block: a location
    (loc_key) and an ordered, immutable sequence of AssignBlock.
    """

    __slots__ = ["_loc_key", "_assignblks", "_dst", "_dst_linenb"]

    def __init__(self, loc_key, assignblks):
        """
        @loc_key: LocKey of the IR basic block
        @assignblks: list of AssignBlock
        """

        assert isinstance(loc_key, m2_expr.LocKey)
        self._loc_key = loc_key
        for assignblk in assignblks:
            assert isinstance(assignblk, AssignBlock)
        # Stored as a tuple: the sequence of assignment blocks is frozen
        self._assignblks = tuple(assignblks)
        # IRDst value and line number, lazily computed by cache_dst
        self._dst = None
        self._dst_linenb = None

    def __eq__(self, other):
        """Two IRBlock are equal if they have the same class, the same
        loc_key and pairwise equal assignment blocks"""
        if self.__class__ is not other.__class__:
            return False
        if self.loc_key != other.loc_key:
            return False
        if len(self.assignblks) != len(other.assignblks):
            return False
        for assignblk1, assignblk2 in zip(self.assignblks, other.assignblks):
            if assignblk1 != assignblk2:
                return False
        return True

    def __ne__(self, other):
        """Negation of __eq__"""
        return not self.__eq__(other)

    def get_label(self):
        """DEPRECATED: use the loc_key property instead"""
        warnings.warn('DEPRECATION WARNING: use ".loc_key" instead of ".label"')
        return self.loc_key

    # Read-only access to the location of the block
    loc_key = property(lambda self:self._loc_key)
    # Deprecated alias of loc_key (emits a warning)
    label = property(get_label)

    @property
    def assignblks(self):
        """Tuple of the AssignBlock of the block"""
        return self._assignblks

    @property
    def irs(self):
        """DEPRECATED: use the assignblks property instead"""
        warnings.warn('DEPRECATION WARNING: use "irblock.assignblks" instead of "irblock.irs"')
        return self._assignblks

    def __iter__(self):
        """Iterate on assignblks"""
        return self._assignblks.__iter__()

    def __getitem__(self, index):
        """Getitem on assignblks"""
        return self._assignblks.__getitem__(index)

    def __len__(self):
        """Length of assignblks"""
        return self._assignblks.__len__()

    def is_dst_set(self):
        """Return True if a (not None) IRDst value is already cached"""
        return self._dst is not None

    def cache_dst(self):
        """Look for the assignment of IRDst, cache its source and the index
        of its AssignBlock, then return the source (None if IRDst is never
        assigned).
        Raise ValueError if IRDst is assigned more than once."""
        final_dst = None
        final_linenb = None
        for linenb, assignblk in enumerate(self):
            for dst, src in assignblk.iteritems():
                if dst.is_id("IRDst"):
                    if final_dst is not None:
                        raise ValueError('Multiple destinations!')
                    final_dst = src
                    final_linenb = linenb
        self._dst = final_dst
        self._dst_linenb = final_linenb
        return final_dst

    @property
    def dst(self):
        """Return the value of IRDst for the IRBlock"""
        if self.is_dst_set():
            return self._dst
        return self.cache_dst()

    def set_dst(self, value):
        """Generate a new IRBlock with a dst (IRBlock) fixed to @value"""
        irs = []
        dst_found = False
        for assignblk in self:
            new_assignblk = {}
            for dst, src in assignblk.iteritems():
                if dst.is_id("IRDst"):
                    # IRDst is expected to be assigned only once
                    assert dst_found is False
                    dst_found = True
                    new_assignblk[dst] = value
                else:
                    new_assignblk[dst] = src
            irs.append(AssignBlock(new_assignblk, assignblk.instr))
        return IRBlock(self.loc_key, irs)

    @property
    def dst_linenb(self):
        """Line number of the IRDst setting statement in the current irs"""
        if not self.is_dst_set():
            self.cache_dst()
        return self._dst_linenb

    def __str__(self):
        """Return the loc_key followed by the tab-indented assignments; an
        empty line closes each AssignBlock"""
        out = []
        out.append(str(self.loc_key))
        for assignblk in self:
            for dst, src in assignblk.iteritems():
                out.append('\t%s = %s' % (dst, src))
            out.append("")
        return "\n".join(out)

    def modify_exprs(self, mod_dst=None, mod_src=None):
        """
        Generate a new IRBlock with its AssignBlock expressions modified
        according to @mod_dst and @mod_src
        @mod_dst: function called to modify Expression destination
        @mod_src: function called to modify Expression source
        """

        # Missing modifiers default to the identity
        if mod_dst is None:
            mod_dst = lambda expr:expr
        if mod_src is None:
            mod_src = lambda expr:expr

        assignblks = []
        for assignblk in self:
            new_assignblk = {}
            for dst, src in assignblk.iteritems():
                new_assignblk[mod_dst(dst)] = mod_src(src)
            assignblks.append(AssignBlock(new_assignblk, assignblk.instr))
        return IRBlock(self.loc_key, assignblks)

    def to_string(self, loc_db=None):
        """Return the textual form of the block: its name(s), then the
        to_string output of each AssignBlock
        @loc_db: (optional) LocationDB instance used to name locations
        """
        out = []
        if loc_db is None:
            node_name = "%s:" % self.loc_key
        else:
            names = loc_db.get_location_names(self.loc_key)
            if not names:
                node_name = "%s:" % loc_db.pretty_str(self.loc_key)
            else:
                node_name = "".join("%s:\n" % name for name in names)
        out.append(node_name)

        for assignblk in self:
            out.append(assignblk.to_string(loc_db))
        return '\n'.join(out)

    def simplify(self, simplifier):
        """
        Simplify expressions in each assignblock
        @simplifier: ExpressionSimplifier instance
        Return a (modified, new IRBlock) tuple.
        """
        modified = False
        assignblks = []
        for assignblk in self:
            new_assignblk = assignblk.simplify(simplifier)
            if assignblk != new_assignblk:
                modified = True
            assignblks.append(new_assignblk)
        return modified, IRBlock(self.loc_key, assignblks)
Ancestors (in MRO)
- IRBlock
- __builtin__.object
Class variables
var label
var loc_key
Instance variables
var assignblks
var dst
Return the value of IRDst for the IRBlock
var dst_linenb
Line number of the IRDst setting statement in the current irs
var irs
var label
var loc_key
Methods
def __init__(
self, loc_key, assignblks)
@loc_key: LocKey of the IR basic block @assignblks: list of AssignBlock
def __init__(self, loc_key, assignblks):
    """
    Build an IRBlock located at @loc_key
    @loc_key: LocKey of the IR basic block
    @assignblks: list of AssignBlock
    """
    assert isinstance(loc_key, m2_expr.LocKey)
    self._loc_key = loc_key

    for candidate in assignblks:
        assert isinstance(candidate, AssignBlock)
    # The assignment blocks are frozen in a tuple
    self._assignblks = tuple(assignblks)

    # IRDst value and line number are computed on demand by cache_dst
    self._dst, self._dst_linenb = None, None
def cache_dst(
self)
def cache_dst(self):
    """Look for the assignment of IRDst, cache its source and the index of
    its assignment block, then return the source (None if IRDst is never
    assigned).
    Raise ValueError if IRDst is assigned more than once."""
    found_dst = None
    found_line = None
    for line, assignblk in enumerate(self):
        for dst, src in assignblk.iteritems():
            if not dst.is_id("IRDst"):
                continue
            if found_dst is not None:
                raise ValueError('Multiple destinations!')
            found_dst, found_line = src, line
    self._dst = found_dst
    self._dst_linenb = found_line
    return found_dst
def get_label(
self)
def get_label(self):
    """DEPRECATED: emit a warning and return the loc_key of the block"""
    message = 'DEPRECATION WARNING: use ".loc_key" instead of ".label"'
    warnings.warn(message)
    return self.loc_key
def is_dst_set(
self)
def is_dst_set(self):
    """Return True if a (not None) IRDst value is already cached"""
    cached = self._dst
    return cached is not None
def modify_exprs(
self, mod_dst=None, mod_src=None)
Generate a new IRBlock with its AssignBlock expressions modified according to @mod_dst and @mod_src @mod_dst: function called to modify Expression destination @mod_src: function called to modify Expression source
def modify_exprs(self, mod_dst=None, mod_src=None):
    """
    Generate a new IRBlock whose destinations went through @mod_dst and
    whose sources went through @mod_src
    @mod_dst: (optional) function called to modify Expression destination
    @mod_src: (optional) function called to modify Expression source
    A missing function leaves the corresponding expressions unchanged.
    """
    def unchanged(expr):
        return expr

    on_dst = unchanged if mod_dst is None else mod_dst
    on_src = unchanged if mod_src is None else mod_src

    rebuilt = []
    for assignblk in self:
        updated = {}
        for dst, src in assignblk.iteritems():
            # The source is processed before the destination
            new_src = on_src(src)
            updated[on_dst(dst)] = new_src
        rebuilt.append(AssignBlock(updated, assignblk.instr))
    return IRBlock(self.loc_key, rebuilt)
def set_dst(
self, value)
Generate a new IRBlock with a dst (IRBlock) fixed to @value
def set_dst(self, value):
    """Generate a new IRBlock in which IRDst is assigned to @value; the
    other assignments are kept as they are"""
    rebuilt = []
    replaced = False
    for assignblk in self:
        updated = {}
        for dst, src in assignblk.iteritems():
            if dst.is_id("IRDst"):
                # IRDst is expected to be assigned only once
                assert replaced is False
                replaced = True
                src = value
            updated[dst] = src
        rebuilt.append(AssignBlock(updated, assignblk.instr))
    return IRBlock(self.loc_key, rebuilt)
def simplify(
self, simplifier)
Simplify expressions in each assignblock @simplifier: ExpressionSimplifier instance
def simplify(self, simplifier):
    """
    Simplify expressions in each assignblock
    @simplifier: ExpressionSimplifier instance
    Return a tuple (modified, new IRBlock), where modified is True if at
    least one assignment block differs from its simplified version.
    """
    changed = False
    simplified = []
    for original in self:
        candidate = original.simplify(simplifier)
        simplified.append(candidate)
        if original != candidate:
            changed = True
    return changed, IRBlock(self.loc_key, simplified)
def to_string(
self, loc_db=None)
def to_string(self, loc_db=None):
    """Return the textual form of the block: its name(s), then the
    to_string output of each assignment block
    @loc_db: (optional) LocationDB instance used to name locations
    """
    if loc_db is None:
        header = "%s:" % self.loc_key
    else:
        names = loc_db.get_location_names(self.loc_key)
        if names:
            # One "name:" line per known name
            header = "".join("%s:\n" % name for name in names)
        else:
            header = "%s:" % loc_db.pretty_str(self.loc_key)

    lines = [header]
    lines.extend(assignblk.to_string(loc_db) for assignblk in self)
    return '\n'.join(lines)
class IRCFG
DiGraph for IR instances
class IRCFG(DiGraph):
    """DiGraph for IR instances

    Nodes are loc_keys; the IRBlock attached to a loc_key (if any) is stored
    in `blocks`.
    """

    def __init__(self, irdst, loc_db, blocks=None, *args, **kwargs):
        """Instantiate an IRCFG
        @irdst: value exposed through the IRDst property
        @loc_db: LocationDB instance
        @blocks: (optional) dictionary loc_key -> IRBlock
        Remaining arguments are forwarded to DiGraph.__init__
        """
        self.loc_db = loc_db
        if blocks is None:
            blocks = {}
        self._blocks = blocks
        self._irdst = irdst
        super(IRCFG, self).__init__(*args, **kwargs)

    @property
    def IRDst(self):
        """Value given as @irdst at construction"""
        return self._irdst

    @property
    def blocks(self):
        """Dictionary loc_key -> IRBlock"""
        return self._blocks

    def add_irblock(self, irblock):
        """
        Add the @irblock to the current IRCFG
        @irblock: IRBlock instance
        """
        self.blocks[irblock.loc_key] = irblock
        self.add_node(irblock.loc_key)

        for dst in self.dst_trackback(irblock):
            if dst.is_int():
                # Integer destinations are turned into offset locations
                dst_loc_key = self.loc_db.get_or_create_offset_location(int(dst))
                dst = m2_expr.ExprLoc(dst_loc_key, irblock.dst.size)
            if dst.is_loc():
                self.add_uniq_edge(irblock.loc_key, dst.loc_key)

    def node2lines(self, node):
        """
        Generate the dot cells describing @node: a header cell, then one
        cell (or [index, cell] pair when offsets are enabled) per
        assignment, with an empty cell after each assignblock
        @node: loc_key of the node
        """
        if self.loc_db is None:
            node_name = str(node)
        else:
            names = self.loc_db.get_location_names(node)
            if not names:
                node_name = self.loc_db.pretty_str(node)
            else:
                node_name = "".join("%s:\n" % name for name in names)
        yield self.DotCellDescription(
            text="%s" % node_name,
            attr={
                'align': 'center',
                'colspan': 2,
                'bgcolor': 'grey',
            }
        )
        if node not in self._blocks:
            yield [self.DotCellDescription(text="NOT PRESENT", attr={})]
            # Plain return: `raise StopIteration` inside a generator is
            # turned into RuntimeError by PEP 479
            return
        # _dot_offset is only set by dot(); default to no offsets
        with_offset = getattr(self, "_dot_offset", False)
        for i, assignblk in enumerate(self._blocks[node]):
            for dst, src in assignblk.iteritems():
                new_src = src.visit(lambda expr: _expr_loc_to_symb(expr, self.loc_db))
                new_dst = dst.visit(lambda expr: _expr_loc_to_symb(expr, self.loc_db))
                line = "%s = %s" % (new_dst, new_src)
                if with_offset:
                    yield [self.DotCellDescription(text="%-4d" % i, attr={}),
                           self.DotCellDescription(text=line, attr={})]
                else:
                    yield self.DotCellDescription(text=line, attr={})
            yield self.DotCellDescription(text="", attr={})

    def edge_attr(self, src, dst):
        """
        Return the dot attributes of the edge @src -> @dst: green / red for
        the src1 / src2 branch of a conditional IRDst, blue otherwise, and
        no attribute when one of the blocks is unknown
        """
        if src not in self._blocks or dst not in self._blocks:
            return {}
        src_irdst = self._blocks[src].dst
        edge_color = "blue"
        if isinstance(src_irdst, m2_expr.ExprCond):
            src1, src2 = src_irdst.src1, src_irdst.src2
            if src1.is_loc(dst):
                edge_color = "limegreen"
            elif src2.is_loc(dst):
                edge_color = "red"
        return {"color": edge_color}

    def node_attr(self, node):
        """Return the dot attributes of @node (red fill if it has no block)"""
        if node not in self._blocks:
            return {'style': 'filled', 'fillcolor': 'red'}
        return {}

    def dot(self, offset=False):
        """
        @offset: (optional) if set, add the corresponding line number in each
        node
        """
        self._dot_offset = offset
        return super(IRCFG, self).dot()

    def get_loc_key(self, addr):
        """Transforms an ExprId/ExprInt/loc_key/int into a loc_key
        @addr: an ExprId/ExprInt/loc_key/int
        Return None when @addr cannot be converted to an int"""
        if isinstance(addr, m2_expr.LocKey):
            return addr
        elif isinstance(addr, m2_expr.ExprLoc):
            return addr.loc_key

        try:
            addr = int(addr)
        except (ValueError, TypeError):
            return None

        return self.loc_db.get_offset_location(addr)

    def get_or_create_loc_key(self, addr):
        """Transforms an ExprId/ExprInt/loc_key/int into a loc_key
        If the offset @addr is not in the LocationDB, create it
        @addr: an ExprId/ExprInt/loc_key/int"""
        loc_key = self.get_loc_key(addr)
        if loc_key is not None:
            return loc_key
        return self.loc_db.add_location(offset=int(addr))

    def get_block(self, addr):
        """Returns the irbloc associated to an ExprId/ExprInt/loc_key/int
        (None if there is none)
        @addr: an ExprId/ExprInt/loc_key/int"""
        loc_key = self.get_loc_key(addr)
        if loc_key is None:
            return None
        return self.blocks.get(loc_key, None)

    def getby_offset(self, offset):
        """
        Return the set of IRBlocks holding an assignblock whose instruction
        covers @offset (instr.offset <= @offset < instr.offset + instr.l)
        """
        out = set()
        for irb in self.blocks.values():
            for assignblk in irb:
                instr = assignblk.instr
                if instr is None:
                    continue
                if instr.offset <= offset < instr.offset + instr.l:
                    out.add(irb)
        return out

    def simplify(self, simplifier):
        """
        Simplify expressions in each irblocks
        @simplifier: ExpressionSimplifier instance
        Return True if at least one assignblock has been modified
        """
        modified = False
        for loc_key, block in self.blocks.iteritems():
            assignblks = []
            for assignblk in block:
                new_assignblk = assignblk.simplify(simplifier)
                if assignblk != new_assignblk:
                    modified = True
                assignblks.append(new_assignblk)
            # Only the value of an existing key is replaced
            self.blocks[loc_key] = IRBlock(loc_key, assignblks)
        return modified

    def replace_expr_in_ir(self, block, replaced):
        """
        Apply replace_expr(@replaced) to every destination and source of
        the assignblocks of @block, in place
        """
        for assignblk in block:
            for dst, src in assignblk.items():
                del assignblk[dst]
                assignblk[dst.replace_expr(replaced)] = src.replace_expr(replaced)

    def get_rw(self, regs_ids=None):
        """
        Calls get_rw(irb) for each bloc
        @regs_ids : ids of registers used in IR
        """
        if regs_ids is None:
            regs_ids = []
        for irblock in self.blocks.values():
            irblock.get_rw(regs_ids)

    def _extract_dst(self, todo, done):
        """
        Naive extraction of @todo destinations
        WARNING: @todo and @done are modified
        Return the set of ExprId destinations still to be resolved
        """
        out = set()
        while todo:
            dst = todo.pop()
            if dst.is_loc():
                done.add(dst)
            elif dst.is_mem() or dst.is_int():
                done.add(dst)
            elif dst.is_cond():
                # Both branches of a condition are candidates
                todo.add(dst.src1)
                todo.add(dst.src2)
            elif dst.is_id():
                out.add(dst)
            else:
                done.add(dst)
        return out

    def dst_trackback(self, irb):
        """
        Naive backtracking of IRDst
        @irb: irbloc instance
        Return the set of destination expressions found
        """
        todo = set([irb.dst])
        done = set()

        for assignblk in reversed(irb):
            if not todo:
                break
            out = self._extract_dst(todo, done)
            found = set()
            follow = set()
            for dst in out:
                if dst in assignblk:
                    follow.add(assignblk[dst])
                    found.add(dst)

            # Identifiers not assigned here are searched in earlier blocks
            follow.update(out.difference(found))
            todo = follow

        return done
Ancestors (in MRO)
- IRCFG
- miasm2.core.graph.DiGraph
- __builtin__.object
Class variables
var DotCellDescription
Instance variables
var IRDst
var blocks
var loc_db
Methods
def __init__(
self, irdst, loc_db, blocks=None, *args, **kwargs)
Instantiate an IRCFG @loc_db: LocationDB instance @blocks: IR blocks
def __init__(self, irdst, loc_db, blocks=None, *args, **kwargs):
    """Instantiate an IRCFG
    @irdst: value stored in the `_irdst` attribute
    @loc_db: LocationDB instance
    @blocks: (optional) IR blocks; a fresh dictionary is used when omitted
    Remaining arguments are forwarded to the parent constructor
    """
    self.loc_db = loc_db
    self._blocks = {} if blocks is None else blocks
    self._irdst = irdst
    parent = super(IRCFG, self)
    parent.__init__(*args, **kwargs)
def add_edge(
self, src, dst)
def add_edge(self, src, dst):
    """
    Add an edge from @src to @dst, registering missing endpoints as nodes
    first. Duplicate edges are not filtered.
    """
    for endpoint in (src, dst):
        if endpoint not in self._nodes:
            self.add_node(endpoint)
    edge = (src, dst)
    self._edges.append(edge)
    self._nodes_succ[src].append(dst)
    self._nodes_pred[dst].append(src)
def add_irblock(
self, irblock)
Add the @irblock to the current IRCFG @irblock: IRBlock instance
def add_irblock(self, irblock):
    """
    Register @irblock in the current IRCFG and link it to the destinations
    found by dst_trackback
    @irblock: IRBlock instance
    """
    src_key = irblock.loc_key
    self.blocks[src_key] = irblock
    self.add_node(src_key)

    for target in self.dst_trackback(irblock):
        if target.is_int():
            # Integer destinations are turned into offset locations
            offset_key = self.loc_db.get_or_create_offset_location(int(target))
            target = m2_expr.ExprLoc(offset_key, irblock.dst.size)
        if target.is_loc():
            self.add_uniq_edge(src_key, target.loc_key)
def add_node(
self, node)
Add the node @node to the graph. If the node was already present, return False. Otherwise, return True
def add_node(self, node):
    """Register @node in the graph.
    Return True if the node has been added, False if it was already there
    """
    is_new = node not in self._nodes
    if is_new:
        self._nodes.add(node)
        self._nodes_succ[node] = []
        self._nodes_pred[node] = []
    return is_new
def add_uniq_edge(
self, src, dst)
Add an edge from @src to @dst if it doesn't already exist
def add_uniq_edge(self, src, dst):
    """Add the edge @src -> @dst unless @dst is already a successor of @src"""
    if src in self._nodes_succ and dst in self._nodes_succ[src]:
        return
    self.add_edge(src, dst)
def compute_back_edges(
self, head)
Computes all back edges from a node to a dominator in the graph. :param head: head of graph :return: yield a back edge
def compute_back_edges(self, head):
    """
    Yield every back edge of the graph, i.e. every edge going from a node
    to one of its dominators.
    :param head: head of graph
    :return: yield a back edge as a (node, dominator) tuple
    """
    dominators = self.compute_dominators(head)

    for node in self.walk_depth_first_forward(head):
        for successor in self.successors_iter(node):
            if successor not in dominators[node]:
                continue
            # successor dominates node: this edge closes a loop
            yield node, successor
def compute_dominance_frontier(
self, head)
Compute the dominance frontier of the graph
Source: Cooper, Keith D., Timothy J. Harvey, and Ken Kennedy. "A simple, fast dominance algorithm." Software Practice & Experience 4 (2001), p. 9
def compute_dominance_frontier(self, head):
    """
    Compute the dominance frontier of the graph

    Source: Cooper, Keith D., Timothy J. Harvey, and Ken Kennedy.
    "A simple, fast dominance algorithm."
    Software Practice & Experience 4 (2001), p. 9

    @head: head of the graph
    Return a dictionary node -> set of nodes in its dominance frontier;
    nodes with an empty frontier are absent from the dictionary.
    """
    idoms = self.compute_immediate_dominators(head)
    frontier = {}

    for node in idoms:
        # Only join points (two predecessors or more) contribute.
        # The predecessor *count* is tested: comparing the list itself to
        # an int is always true on Python 2 and a TypeError on Python 3.
        if len(self._nodes_pred[node]) >= 2:
            for predecessor in self.predecessors_iter(node):
                runner = predecessor
                if runner not in idoms:
                    continue
                # Walk up the dominator tree until node's immediate dominator
                while runner != idoms[node]:
                    frontier.setdefault(runner, set()).add(node)
                    runner = idoms[runner]
    return frontier
def compute_dominator_tree(
self, head)
Computes the dominator tree of a graph :param head: head of graph :return: DiGraph
def compute_dominator_tree(self, head):
    """
    Build the dominator tree of the graph: one edge from each immediate
    dominator to the node it dominates.
    :param head: head of graph
    :return: DiGraph
    """
    idoms = self.compute_immediate_dominators(head)
    tree = DiGraph()
    for node in idoms:
        parent = idoms[node]
        tree.add_edge(parent, node)
    return tree
def compute_dominators(
self, head)
Compute the dominators of the graph
def compute_dominators(self, head):
    """Compute the dominators of the graph, starting from @head"""
    reachable_cb = self.reachable_sons
    prev_cb = self.predecessors_iter
    next_cb = self.successors_iter
    return self._compute_generic_dominators(head, reachable_cb, prev_cb, next_cb)
def compute_immediate_dominators(
self, head)
Compute the immediate dominators of the graph
def compute_immediate_dominators(self, head):
    """
    Compute the immediate dominators of the graph
    @head: head of the graph
    Return a dictionary node -> immediate dominator; nodes without any
    strict dominator are absent.
    """
    dominators = self.compute_dominators(head)
    idoms = {}

    for node in dominators:
        for candidate in self.walk_dominators(node, dominators):
            if candidate not in dominators[node]:
                continue
            if node == candidate:
                continue
            # First strict dominator met during the walk
            idoms[node] = candidate
            break
    return idoms
def compute_natural_loops(
self, head)
Computes all natural loops in the graph.
Source: Aho, Alfred V., Lam, Monica S., Sethi, R. and Jeffrey Ullman. "Compilers: Principles, Techniques, & Tools, Second Edition" Pearson/Addison Wesley (2007), Chapter 9.6.6 :param head: head of the graph :return: yield a tuple of the form (back edge, loop body)
def compute_natural_loops(self, head):
    """
    Computes all natural loops in the graph.

    Source: Aho, Alfred V., Lam, Monica S., Sethi, R. and Jeffrey Ullman.
    "Compilers: Principles, Techniques, & Tools, Second Edition"
    Pearson/Addison Wesley (2007), Chapter 9.6.6
    :param head: head of the graph
    :return: yield a tuple of the form (back edge, loop body)
    """
    for tail, header in self.compute_back_edges(head):
        # The body is computed from the loop header back to the edge tail
        loop_body = self._compute_natural_loop_body(header, tail)
        yield (tail, header), loop_body
def compute_postdominators(
self, leaf)
Compute the postdominators of the graph
def compute_postdominators(self, leaf):
    """Compute the postdominators of the graph, starting from @leaf"""
    reachable_cb = self.reachable_parents
    prev_cb = self.successors_iter
    next_cb = self.predecessors_iter
    return self._compute_generic_dominators(leaf, reachable_cb, prev_cb, next_cb)
def compute_strongly_connected_components(
self)
Partitions the graph into strongly connected components.
Iterative implementation of Gabow's path-based SCC algorithm. Source: Gabow, Harold N. "Path-based depth-first search for strong and biconnected components." Information Processing Letters 74.3 (2000), pp. 109--110
The iterative implementation is inspired by Mark Dickinson's code: http://code.activestate.com/recipes/ 578507-strongly-connected-components-of-a-directed-graph/ :return: yield a strongly connected component
def compute_strongly_connected_components(self):
    """
    Partitions the graph into strongly connected components.

    Iterative implementation of Gabow's path-based SCC algorithm.
    Source: Gabow, Harold N.
    "Path-based depth-first search for strong and biconnected components."
    Information Processing Letters 74.3 (2000), pp. 109--110

    The iterative implementation is inspired by Mark Dickinson's
    code:
    http://code.activestate.com/recipes/578507-strongly-connected-components-of-a-directed-graph/
    :return: yield a strongly connected component (as a set of nodes)
    """
    # Nodes on the current search path
    stack = []
    # Stack indexes delimiting the components still being built
    boundaries = []
    # Finished components get indexes above len(nodes), so they can never
    # be mistaken for a position in `stack`
    counter = len(self.nodes())

    # init index with 0
    index = {v: 0 for v in self.nodes()}

    # state machine for worklist algorithm
    VISIT, HANDLE_RECURSION, MERGE = 0, 1, 2
    NodeState = namedtuple('NodeState', ['state', 'node'])

    for node in self.nodes():
        # next node if node was already visited
        if index[node]:
            continue

        todo = [NodeState(VISIT, node)]
        done = set()

        while todo:
            current = todo.pop()

            if current.node in done:
                continue

            # node is unvisited
            if current.state == VISIT:
                stack.append(current.node)
                index[current.node] = len(stack)
                boundaries.append(index[current.node])

                # MERGE is pushed first, so it is handled after all the
                # successors pushed below
                todo.append(NodeState(MERGE, current.node))
                # follow successors
                for successor in self.successors_iter(current.node):
                    todo.append(NodeState(HANDLE_RECURSION, successor))

            # iterative handling of recursion algorithm
            elif current.state == HANDLE_RECURSION:
                # visit unvisited successor
                if index[current.node] == 0:
                    todo.append(NodeState(VISIT, current.node))
                else:
                    # contract cycle if necessary
                    while index[current.node] < boundaries[-1]:
                        boundaries.pop()

            # merge strongly connected component
            else:
                if index[current.node] == boundaries[-1]:
                    boundaries.pop()
                    counter += 1
                    scc = set()

                    # Pop every node stacked at or above the current one
                    while index[current.node] <= len(stack):
                        popped = stack.pop()
                        index[popped] = counter
                        scc.add(popped)

                        done.add(current.node)

                    yield scc
def copy(
self)
Copy the current graph instance
def copy(self):
    """Return a copy of the graph: an empty instance of the same class
    to which the current graph is added"""
    return self.__class__() + self
def del_edge(
self, src, dst)
def del_edge(self, src, dst):
    """
    Remove one occurrence of the edge @src -> @dst from the edge list and
    from the successor / predecessor tables. The edge must exist.
    """
    edge = (src, dst)
    self._edges.remove(edge)
    successors_of_src = self._nodes_succ[src]
    successors_of_src.remove(dst)
    predecessors_of_dst = self._nodes_pred[dst]
    predecessors_of_dst.remove(src)
def del_node(
self, node)
Delete the @node of the graph; Also delete every edge to/from this @node
def del_node(self, node):
    """Remove @node from the graph, along with every edge entering or
    leaving it"""
    if node in self._nodes:
        self._nodes.remove(node)
    # predecessors() / successors() return fresh lists, so edges can be
    # removed while looping
    for parent in self.predecessors(node):
        self.del_edge(parent, node)
    for child in self.successors(node):
        self.del_edge(node, child)
def discard_edge(
self, src, dst)
Remove the edge between @src and @dst if it exists
def discard_edge(self, src, dst):
    """Remove the edge between @src and @dst if it exists"""
    if (src, dst) not in self._edges:
        return
    self.del_edge(src, dst)
def dot(
self, offset=False)
@offset: (optional) if set, add the corresponding line number in each node
def dot(self, offset=False):
    """
    Render the graph through the parent class dot() method
    @offset: (optional) if set, add the corresponding line number in each
    node
    """
    # Read back by node2lines while the parent renders each node
    self._dot_offset = offset
    parent = super(IRCFG, self)
    return parent.dot()
def dst_trackback(
self, irb)
Naive backtracking of IRDst @irb: irbloc instance
def dst_trackback(self, irb):
    """
    Naive backtracking of IRDst: walk the assignblocks of @irb from last
    to first, replacing the identifiers still unresolved by the source
    assigned to them
    @irb: irbloc instance
    Return the set of destinations collected by _extract_dst
    """
    pending = set([irb.dst])
    resolved = set()

    for assignblk in reversed(irb):
        if not pending:
            break
        unresolved = self._extract_dst(pending, resolved)
        next_round = set()
        for dst in unresolved:
            if dst in assignblk:
                next_round.add(assignblk[dst])
            else:
                # Not assigned here: keep looking in earlier assignblocks
                next_round.add(dst)
        pending = next_round

    return resolved
def edge_attr(
self, src, dst)
def edge_attr(self, src, dst):
    """
    Return the dot attributes of the edge @src -> @dst
    No attribute is returned when one of the two blocks is unknown.
    Otherwise the color is green when @dst is the src1 branch of a
    conditional IRDst, red when it is the src2 branch, blue by default.
    """
    if not (src in self._blocks and dst in self._blocks):
        return {}
    irdst = self._blocks[src].dst
    color = "blue"
    if isinstance(irdst, m2_expr.ExprCond):
        taken, fallthrough = irdst.src1, irdst.src2
        if taken.is_loc(dst):
            color = "limegreen"
        elif fallthrough.is_loc(dst):
            color = "red"
    return {"color": color}
def edges(
self)
def edges(self):
    """Return the internal list of (src, dst) edges (not a copy)"""
    edge_list = self._edges
    return edge_list
def find_path(
self, src, dst, cycles_count=0, done=None)
def find_path(self, src, dst, cycles_count=0, done=None):
    """
    Search the paths from @src to @dst, walking backward from @dst
    @src: first node of the paths
    @dst: last node of the paths
    @cycles_count: (optional) number of extra visits allowed per node
    @done: (optional) dictionary node -> number of visits so far
    Return a list of paths, each path being a list of nodes
    """
    visits = {} if done is None else done
    if dst in visits and visits[dst] > cycles_count:
        # Visit budget exhausted: dead end, reported as an empty path
        return [[]]
    if src == dst:
        return [[src]]

    paths = []
    for parent in self.predecessors(dst):
        child_visits = dict(visits)
        child_visits[dst] = child_visits.get(dst, 0) + 1
        for prefix in self.find_path(src, parent, cycles_count, child_visits):
            # Dead ends are empty and partial paths do not start at src
            if prefix and prefix[0] == src:
                paths.append(prefix + [dst])
    return paths
def get_block(
self, addr)
Returns the irbloc associated to an ExprId/ExprInt/loc_key/int @addr: an ExprId/ExprInt/loc_key/int
def get_block(self, addr):
    """Return the irbloc associated to an ExprId/ExprInt/loc_key/int, or
    None when @addr has no loc_key or no block
    @addr: an ExprId/ExprInt/loc_key/int"""
    key = self.get_loc_key(addr)
    return None if key is None else self.blocks.get(key, None)
def get_loc_key(
self, addr)
Transforms an ExprId/ExprInt/loc_key/int into a loc_key @addr: an ExprId/ExprInt/loc_key/int
def get_loc_key(self, addr):
    """Transform an ExprId/ExprInt/loc_key/int into a loc_key
    @addr: an ExprId/ExprInt/loc_key/int
    Return None when @addr cannot be converted to an int; otherwise the
    result of the LocationDB offset lookup"""
    if isinstance(addr, m2_expr.LocKey):
        return addr
    if isinstance(addr, m2_expr.ExprLoc):
        return addr.loc_key

    try:
        offset = int(addr)
    except (ValueError, TypeError):
        return None
    return self.loc_db.get_offset_location(offset)
def get_or_create_loc_key(
self, addr)
Transforms an ExprId/ExprInt/loc_key/int into a loc_key If the offset @addr is not in the LocationDB, create it @addr: an ExprId/ExprInt/loc_key/int
def get_or_create_loc_key(self, addr):
    """Transform an ExprId/ExprInt/loc_key/int into a loc_key
    If the offset @addr is not in the LocationDB, create it
    @addr: an ExprId/ExprInt/loc_key/int"""
    known = self.get_loc_key(addr)
    if known is None:
        return self.loc_db.add_location(offset=int(addr))
    return known
def get_rw(
self, regs_ids=None)
Calls get_rw(irb) for each bloc @regs_ids : ids of registers used in IR
def get_rw(self, regs_ids=None):
    """
    Call get_rw(@regs_ids) on each block of the graph
    @regs_ids : (optional) ids of registers used in IR; the same empty
    list is given to every block when omitted
    """
    ids = [] if regs_ids is None else regs_ids
    for irblock in self.blocks.values():
        irblock.get_rw(ids)
def getby_offset(
self, offset)
def getby_offset(self, offset):
    """
    Return the set of IRBlocks holding at least one assignblock whose
    instruction covers @offset, i.e.
    instr.offset <= @offset < instr.offset + instr.l
    Assignblocks without instruction are ignored.
    """
    matches = set()
    for irb in self.blocks.values():
        for assignblk in irb:
            instr = assignblk.instr
            if instr is not None and instr.offset <= offset < instr.offset + instr.l:
                matches.add(irb)
    return matches
def has_loop(
self)
Return True if the graph contains at least a cycle
def has_loop(self):
    """Return True if the graph contains at least a cycle"""
    pending = list(self.nodes())
    # Nodes fully explored
    finished = set()
    # Nodes whose exploration has started but is not finished
    in_progress = set()

    while pending:
        node = pending.pop()
        if node in finished:
            continue

        if node not in in_progress:
            # First visit: revisit the node once its successors are handled
            pending.append(node)
            in_progress.add(node)
            pending += self.successors(node)
            continue

        # Second visit: a successor still in progress closes a cycle
        for succ in self.successors_iter(node):
            if succ in in_progress:
                return True
        # A node cannot be both in progress and finished
        in_progress.remove(node)
        finished.add(node)

    return False
def heads(
self)
def heads(self):
    """Return the list of nodes yielded by heads_iter"""
    return list(self.heads_iter())
def heads_iter(
self)
def heads_iter(self):
    """Yield every node without predecessor"""
    for node in self._nodes:
        if self._nodes_pred[node]:
            continue
        yield node
def leaves(
self)
def leaves(self):
    """Return the list of nodes yielded by leaves_iter"""
    return list(self.leaves_iter())
def leaves_iter(
self)
def leaves_iter(self):
    """Yield every node without successor"""
    for node in self._nodes:
        if self._nodes_succ[node]:
            continue
        yield node
def merge(
self, graph)
Merge the current graph with @graph @graph: DiGraph instance
def merge(self, graph):
    """Add every node, then every edge, of @graph to the current graph
    @graph: DiGraph instance
    """
    for node in graph._nodes:
        self.add_node(node)
    for src, dst in graph._edges:
        self.add_edge(src, dst)
def node2lines(
self, node)
def node2lines(self, node):
    """
    Generate the dot cells describing @node
    @node: loc_key of the node

    The first cell is a header holding the node name(s). When @node has no
    block, a single "NOT PRESENT" row follows. Otherwise each assignment
    yields one cell (or an [index, cell] pair when offsets are enabled by
    dot()), and an empty cell closes each assignblock.
    """
    if self.loc_db is None:
        node_name = str(node)
    else:
        names = self.loc_db.get_location_names(node)
        if not names:
            node_name = self.loc_db.pretty_str(node)
        else:
            node_name = "".join("%s:\n" % name for name in names)
    yield self.DotCellDescription(
        text="%s" % node_name,
        attr={
            'align': 'center',
            'colspan': 2,
            'bgcolor': 'grey',
        }
    )
    if node not in self._blocks:
        yield [self.DotCellDescription(text="NOT PRESENT", attr={})]
        # Plain return: `raise StopIteration` inside a generator is turned
        # into RuntimeError by PEP 479
        return
    # _dot_offset is only set by dot(); default to no offsets otherwise
    with_offset = getattr(self, "_dot_offset", False)
    for i, assignblk in enumerate(self._blocks[node]):
        for dst, src in assignblk.iteritems():
            new_src = src.visit(lambda expr: _expr_loc_to_symb(expr, self.loc_db))
            new_dst = dst.visit(lambda expr: _expr_loc_to_symb(expr, self.loc_db))
            line = "%s = %s" % (new_dst, new_src)
            if with_offset:
                yield [self.DotCellDescription(text="%-4d" % i, attr={}),
                       self.DotCellDescription(text=line, attr={})]
            else:
                yield self.DotCellDescription(text=line, attr={})
        yield self.DotCellDescription(text="", attr={})
def node_attr(
self, node)
def node_attr(self, node):
    """Return the dot attributes of @node: nodes without an IR block are
    filled in red, the others have no specific attribute"""
    if node in self._blocks:
        return {}
    return {'style': 'filled', 'fillcolor': 'red'}
def nodeid(
self, node)
Returns a unique id for a @node @node: a node of the graph
def nodeid(self, node):
    """
    Return an id for @node: its hash truncated to 64 bits, which makes it
    non-negative
    @node: a node of the graph
    """
    mask_64 = 0xFFFFFFFFFFFFFFFF
    return hash(node) & mask_64
def nodes(
self)
def nodes(self):
    """Return the internal collection of nodes (not a copy)"""
    node_set = self._nodes
    return node_set
def predecessors(
self, node)
def predecessors(self, node):
    """Return the predecessors of @node as a new list"""
    return list(self.predecessors_iter(node))
def predecessors_iter(
self, node)
def predecessors_iter(self, node):
    """
    Yield the predecessors of @node, in insertion order
    Nothing is yielded when @node is unknown.
    """
    if node not in self._nodes_pred:
        # Plain return: `raise StopIteration` inside a generator is turned
        # into RuntimeError by PEP 479
        return
    for n_pred in self._nodes_pred[node]:
        yield n_pred
def predecessors_stop_node_iter(
self, node, head)
def predecessors_stop_node_iter(self, node, head):
    """
    Yield the predecessors of @node, except when @node is @head: nothing is
    yielded then, so a backward walk does not go past @head
    """
    if node == head:
        # Plain return: `raise StopIteration` inside a generator is turned
        # into RuntimeError by PEP 479
        return
    for next_node in self.predecessors_iter(node):
        yield next_node
def reachable_parents(
self, leaf)
Compute all parents of node @leaf. Each parent is an immediate predecessor of an arbitrary, already yielded parent of @leaf
def reachable_parents(self, leaf):
    """Compute all parents of node @leaf.
    Each parent is an immediate predecessor of an arbitrary, already
    yielded parent of @leaf"""
    step_backward = self.predecessors_iter
    return self._reachable_nodes(leaf, step_backward)
def reachable_parents_stop_node(
self, leaf, head)
Compute all parents of node @leaf. Each parent is an immediate predecessor of an arbitrary, already yielded parent of @leaf. Do not compute reachables past @head node
def reachable_parents_stop_node(self, leaf, head):
    """Compute all parents of node @leaf.
    Each parent is an immediate predecessor of an arbitrary, already
    yielded parent of @leaf.
    Do not compute reachables past @head node"""

    def bounded_predecessors(node_cur):
        return self.predecessors_stop_node_iter(node_cur, head)

    return self._reachable_nodes(leaf, bounded_predecessors)
def reachable_sons(
self, head)
Compute all nodes reachable from node @head. Each son is an immediate successor of an arbitrary, already yielded son of @head
def reachable_sons(self, head):
    """Compute all nodes reachable from node @head.
    Each son is an immediate successor of an arbitrary, already yielded
    son of @head"""
    step_forward = self.successors_iter
    return self._reachable_nodes(head, step_forward)
def replace_expr_in_ir(
self, block, replaced)
def replace_expr_in_ir(self, block, replaced):
    """
    Apply replace_expr(@replaced) to every destination and source of the
    assignblocks of @block, updating the assignblocks in place
    @block: iterable of assignblocks
    @replaced: argument forwarded to the replace_expr method of each
    expression
    """
    for assignblk in block:
        for old_dst, old_src in assignblk.items():
            # The old entry is removed before its replacement is computed
            del assignblk[old_dst]
            new_src = old_src.replace_expr(replaced)
            new_dst = old_dst.replace_expr(replaced)
            assignblk[new_dst] = new_src
def simplify(
self, simplifier)
Simplify expressions in each irblocks @simplifier: ExpressionSimplifier instance
def simplify(self, simplifier):
    """
    Simplify the expressions of every irblock, replacing each entry of
    `blocks` by a new IRBlock built from the simplified assignblocks
    @simplifier: ExpressionSimplifier instance
    Return True if at least one assignblock has been modified
    """
    changed = False
    for loc_key, block in self.blocks.iteritems():
        simplified = []
        for assignblk in block:
            candidate = assignblk.simplify(simplifier)
            simplified.append(candidate)
            if assignblk != candidate:
                changed = True
        # Only the value of an existing key is replaced
        self.blocks[loc_key] = IRBlock(loc_key, simplified)
    return changed
def successors(
self, node)
def successors(self, node):
    """Return the successors of @node as a new list"""
    return list(self.successors_iter(node))
def successors_iter(
self, node)
def successors_iter(self, node):
    """
    Yield the successors of @node, in insertion order
    Nothing is yielded when @node is unknown.
    """
    if node not in self._nodes_succ:
        # Plain return: `raise StopIteration` inside a generator is turned
        # into RuntimeError by PEP 479
        return
    for n_suc in self._nodes_succ[node]:
        yield n_suc
def walk_breadth_first_backward(
self, head)
Performs a breadth first search on the reversed graph from @head
def walk_breadth_first_backward(self, head):
    """Performs a breadth first search on the reversed graph from @head"""
    # 0 is the value _walk_generic_first receives for breadth first walks
    mode = 0
    return self._walk_generic_first(head, mode, self.predecessors_iter)
def walk_breadth_first_forward(
self, head)
Performs a breadth first search on the graph from @head
def walk_breadth_first_forward(self, head):
    """Performs a breadth first search on the graph from @head"""
    # 0 is the value _walk_generic_first receives for breadth first walks
    mode = 0
    return self._walk_generic_first(head, mode, self.successors_iter)
def walk_depth_first_backward(
self, head)
Performs a depth first search on the reversed graph from @head
def walk_depth_first_backward(self, head):
    """Performs a depth first search on the reversed graph from @head"""
    # -1 is the value _walk_generic_first receives for depth first walks
    mode = -1
    return self._walk_generic_first(head, mode, self.predecessors_iter)
def walk_depth_first_forward(
self, head)
Performs a depth first search on the graph from @head
def walk_depth_first_forward(self, head):
    """Performs a depth first search on the graph from @head"""
    # -1 is the value _walk_generic_first receives for depth first walks
    mode = -1
    return self._walk_generic_first(head, mode, self.successors_iter)
def walk_dominators(
self, node, dominators)
Return an iterator of the ordered list of @node's dominators The function doesn't return the self reference in dominators. @node: The start node @dominators: The dictionary containing at least node's dominators
def walk_dominators(self, node, dominators):
    """Return an iterator of the ordered list of @node's dominators
    The function doesn't return the self reference in dominators.
    @node: The start node
    @dominators: The dictionary containing at least node's dominators
    """
    step_backward = self.predecessors_iter
    return self._walk_generic_dominator(node, dominators, step_backward)
def walk_postdominators(
self, node, postdominators)
Return an iterator of the ordered list of @node's postdominators The function doesn't return the self reference in postdominators. @node: The start node @postdominators: The dictionary containing at least node's postdominators
def walk_postdominators(self, node, postdominators):
    """Return an iterator of the ordered list of @node's postdominators
    The function doesn't return the self reference in postdominators.
    @node: The start node
    @postdominators: The dictionary containing at least node's
    postdominators
    """
    step_forward = self.successors_iter
    return self._walk_generic_dominator(node, postdominators, step_forward)
class IntermediateRepresentation
Intermediate representation object
Allows translation of native assembly into the intermediate representation
class IntermediateRepresentation(object):
    """
    Intermediate representation object

    Allows translation of native assembly into the intermediate
    representation. Architecture specific subclasses are expected to
    implement get_ir.
    """

    def __init__(self, arch, attrib, loc_db):
        """
        @arch: architecture object, providing getpc / getsp
        @attrib: architecture attribute, forwarded to getpc / getsp
        @loc_db: LocationDB instance
        """
        self.pc = arch.getpc(attrib)
        self.sp = arch.getsp(attrib)
        self.arch = arch
        self.attrib = attrib
        self.loc_db = loc_db
        # Not set here; used as the IRDst of the IRCFG built by new_ircfg
        self.IRDst = None

    def get_ir(self, instr):
        """
        Abstract method. instr2ir expects it to return a tuple
        (assignments of @instr, list of extra IR blocks)
        """
        raise NotImplementedError("Abstract Method")

    def new_ircfg(self, *args, **kwargs):
        """
        Return a new instance of IRCFG
        """
        return IRCFG(self.IRDst, self.loc_db, *args, **kwargs)

    def new_ircfg_from_asmcfg(self, asmcfg, *args, **kwargs):
        """
        Return a new instance of IRCFG from an @asmcfg
        @asmcfg: AsmCFG instance
        """

        ircfg = IRCFG(self.IRDst, self.loc_db, *args, **kwargs)
        for block in asmcfg.blocks:
            self.add_asmblock_to_ircfg(block, ircfg)
        return ircfg

    def instr2ir(self, instr):
        """
        Translate @instr and attach it to the resulting AssignBlocks
        @instr: native instruction
        Return a tuple (AssignBlock of the instruction, list of extra
        IRBlocks)
        """
        ir_bloc_cur, extra_irblocks = self.get_ir(instr)
        # Rebuild the extra blocks so each assignblock references @instr
        for index, irb in enumerate(extra_irblocks):
            irs = []
            for assignblk in irb:
                irs.append(AssignBlock(assignblk, instr))
            extra_irblocks[index] = IRBlock(irb.loc_key, irs)
        assignblk = AssignBlock(ir_bloc_cur, instr)
        return assignblk, extra_irblocks

    def add_instr_to_ircfg(self, instr, ircfg, loc_key=None, gen_pc_updt=False):
        """
        Add the native instruction @instr to the @ircfg
        @instr: instruction instance
        @ircfg: IRCFG instance
        @loc_key: loc_key instance of the instruction destination
        @gen_pc_updt: insert PC update effects between instructions
        Return the loc_key used for the instruction
        """

        if loc_key is None:
            offset = getattr(instr, "offset", None)
            loc_key = self.loc_db.add_location(offset=offset)
        # Wrap the instruction in a single-line native block
        block = AsmBlock(loc_key)
        block.lines = [instr]
        self.add_asmblock_to_ircfg(block, ircfg, gen_pc_updt)
        return loc_key

    def gen_pc_update(self, assignments, instr):
        """
        Append to @assignments an AssignBlock setting PC to the offset of
        @instr
        """
        offset = m2_expr.ExprInt(instr.offset, self.pc.size)
        assignments.append(AssignBlock({self.pc:offset}, instr))

    def add_instr_to_current_state(self, instr, block, assignments, ir_blocks_all, gen_pc_updt):
        """
        Add the IR effects of an instruction to the current state.

        Returns a bool:
        * True if the current assignments list must be split
        * False in other cases.

        @instr: native instruction
        @block: native block source
        @assignments: list of current AssignBlocks
        @ir_blocks_all: list of additional effects
        @gen_pc_updt: insert PC update effects between instructions
        """
        if gen_pc_updt is not False:
            self.gen_pc_update(assignments, instr)

        assignblk, ir_blocks_extra = self.instr2ir(instr)
        assignments.append(assignblk)
        ir_blocks_all += ir_blocks_extra
        # Extra blocks mean the current block must end with this instruction
        if ir_blocks_extra:
            return True
        return False

    def add_asmblock_to_ircfg(self, block, ircfg, gen_pc_updt=False):
        """
        Add a native block to the current IR
        @block: native assembly block
        @ircfg: IRCFG instance
        @gen_pc_updt: insert PC update effects between instructions
        Return the list of IR blocks returned by post_add_asmblock_to_ircfg
        """

        loc_key = block.loc_key
        ir_blocks_all = []

        assignments = []
        for instr in block.lines:
            if loc_key is None:
                # A split occurred: start a new IR block at this instruction
                assignments = []
                loc_key = self.get_loc_key_for_instr(instr)
            split = self.add_instr_to_current_state(
                instr, block, assignments,
                ir_blocks_all, gen_pc_updt
            )
            if split:
                ir_blocks_all.append(IRBlock(loc_key, assignments))
                loc_key = None
                assignments = []
        if loc_key is not None:
            ir_blocks_all.append(IRBlock(loc_key, assignments))

        new_ir_blocks_all = self.post_add_asmblock_to_ircfg(block, ircfg, ir_blocks_all)
        for irblock in new_ir_blocks_all:
            ircfg.add_irblock(irblock)
        return new_ir_blocks_all

    def add_block(self, block, gen_pc_updt=False):
        """
        DEPRECATED function
        Use add_asmblock_to_ircfg on an IRCFG created by new_ircfg instead

        Always raises RuntimeError after emitting the warning.
        """
        warnings.warn("""DEPRECATION WARNING ircfg is now out of IntermediateRepresentation Use: ircfg = ir_arch.new_ircfg() ir_arch.add_asmblock_to_ircfg(block, ircfg) """)
        raise RuntimeError("API Deprecated")

    def add_bloc(self, block, gen_pc_updt=False):
        """
        DEPRECATED function
        Use add_asmblock_to_ircfg instead of add_bloc
        """
        self.add_block(block, gen_pc_updt)

    def get_next_loc_key(self, instr):
        """
        Return the loc_key of the offset following @instr
        (instr.offset + instr.l), creating it if needed
        """
        loc_key = self.loc_db.get_or_create_offset_location(instr.offset + instr.l)
        return loc_key

    def get_loc_key_for_instr(self, instr):
        """Returns the loc_key associated to an instruction
        @instr: current instruction"""
        return self.loc_db.get_or_create_offset_location(instr.offset)

    def gen_loc_key_and_expr(self, size):
        """
        Return a loc_key and its corresponding ExprLoc
        @size: size of expression
        """
        loc_key = self.loc_db.add_location()
        return loc_key, m2_expr.ExprLoc(loc_key, size)

    def expr_fix_regs_for_mode(self, expr, *args, **kwargs):
        """Default implementation: return @expr unchanged"""
        return expr

    def expraff_fix_regs_for_mode(self, expr, *args, **kwargs):
        """Default implementation: return @expr unchanged"""
        return expr

    def irbloc_fix_regs_for_mode(self, irblock, *args, **kwargs):
        """Default implementation: return @irblock unchanged"""
        return irblock

    def is_pc_written(self, block):
        """Return the first Assignblk of the @block in which PC is written
        (None if there is none)
        @block: IRBlock instance"""
        all_pc = self.arch.pc.values()
        for assignblk in block:
            # NOTE(review): relies on assignblk exposing a `dst` attribute;
            # confirm against AssignBlock
            if assignblk.dst in all_pc:
                return assignblk
        return None

    def set_empty_dst_to_next(self, block, ir_blocks):
        """
        Give every IR block of @ir_blocks without destination an IRDst
        assignment to the location following @block; @ir_blocks is updated
        in place
        @block: native assembly block
        @ir_blocks: list of IRBlocks
        """
        for index, irblock in enumerate(ir_blocks):
            if irblock.dst is not None:
                continue
            next_loc_key = block.get_next()
            if next_loc_key is None:
                loc_key = None
                if block.lines:
                    line = block.lines[-1]
                    if line.offset is not None:
                        # Location right after the last instruction
                        loc_key = self.loc_db.get_or_create_offset_location(line.offset + line.l)
                if loc_key is None:
                    # No usable offset: use a fresh location
                    loc_key = self.loc_db.add_location()
                block.add_cst(loc_key, AsmConstraint.c_next)
            else:
                loc_key = next_loc_key
            dst = m2_expr.ExprLoc(loc_key, self.pc.size)
            if irblock.assignblks:
                instr = irblock.assignblks[-1].instr
            else:
                instr = None
            assignblk = AssignBlock({self.IRDst: dst}, instr)
            ir_blocks[index] = IRBlock(irblock.loc_key, list(irblock.assignblks) + [assignblk])

    def post_add_asmblock_to_ircfg(self, block, ircfg, ir_blocks):
        """
        Finalize @ir_blocks: set missing destinations, apply
        irbloc_fix_regs_for_mode, and add each resulting block to @ircfg
        Return the list of resulting IR blocks
        """
        self.set_empty_dst_to_next(block, ir_blocks)

        new_irblocks = []
        for irblock in ir_blocks:
            new_irblock = self.irbloc_fix_regs_for_mode(irblock, self.attrib)
            ircfg.add_irblock(new_irblock)
            new_irblocks.append(new_irblock)
        return new_irblocks
Ancestors (in MRO)
- IntermediateRepresentation
- __builtin__.object
Instance variables
var IRDst
var arch
var attrib
var loc_db
var pc
var sp
Methods
def __init__(
self, arch, attrib, loc_db)
def __init__(self, arch, attrib, loc_db):
    """
    @arch: architecture object, providing getpc / getsp
    @attrib: architecture attribute, forwarded to getpc / getsp
    @loc_db: LocationDB instance
    """
    program_counter = arch.getpc(attrib)
    stack_pointer = arch.getsp(attrib)
    self.pc, self.sp = program_counter, stack_pointer
    self.arch, self.attrib, self.loc_db = arch, attrib, loc_db
    self.IRDst = None
def add_asmblock_to_ircfg(
self, block, ircfg, gen_pc_updt=False)
Add a native block to the current IR @block: native assembly block @ircfg: IRCFG instance @gen_pc_updt: insert PC update effects between instructions
def add_asmblock_to_ircfg(self, block, ircfg, gen_pc_updt=False):
    """
    Translate a native block and add the resulting IR blocks to @ircfg
    @block: native assembly block
    @ircfg: IRCFG instance
    @gen_pc_updt: insert PC update effects between instructions
    Return the list of IR blocks returned by post_add_asmblock_to_ircfg
    """
    current_key = block.loc_key
    produced = []
    pending = []

    for instr in block.lines:
        if current_key is None:
            # The previous instruction ended its block: open a new one here
            pending = []
            current_key = self.get_loc_key_for_instr(instr)
        must_split = self.add_instr_to_current_state(
            instr, block, pending, produced, gen_pc_updt
        )
        if not must_split:
            continue
        produced.append(IRBlock(current_key, pending))
        current_key = None
        pending = []

    if current_key is not None:
        produced.append(IRBlock(current_key, pending))

    fixed = self.post_add_asmblock_to_ircfg(block, ircfg, produced)
    for irblock in fixed:
        ircfg.add_irblock(irblock)
    return fixed
def add_bloc(
self, block, gen_pc_updt=False)
DEPRECATED function Use add_block instead of add_bloc
def add_bloc(self, block, gen_pc_updt=False):
    """
    DEPRECATED function
    Use add_block instead of add_bloc

    @block: forwarded unchanged to add_block
    @gen_pc_updt: forwarded unchanged to add_block
    """
    self.add_block(block, gen_pc_updt)
def add_block(
self, block, gen_pc_updt=False)
DEPRECATED function Use add_asmblock_to_ircfg instead of add_block
def add_block(self, block, gen_pc_updt=False):
    """
    DEPRECATED function
    Use add_asmblock_to_ircfg instead of add_block

    Always emits a warning and then raises RuntimeError: the arguments
    are never used.
    """
    warnings.warn("""DEPRECATION WARNING
    ircfg is now out of IntermediateRepresentation
    Use:
    ircfg = ir_arch.new_ircfg()
    ir_arch.add_asmblock_to_ircfg(block, ircfg)
    """)
    raise RuntimeError("API Deprecated")
def add_instr_to_current_state(
self, instr, block, assignments, ir_blocks_all, gen_pc_updt)
Add the IR effects of an instruction to the current state.
Returns a bool: True if the current assignments list must be split, False in other cases.
@instr: native instruction @block: native block source @assignments: list of current AssignBlocks @ir_blocks_all: list of additional effects @gen_pc_updt: insert PC update effects between instructions
def add_instr_to_current_state(self, instr, block, assignments, ir_blocks_all, gen_pc_updt):
    """
    Append the IR effects of @instr to the state being built.

    Returns a bool:
    * True if the current assignments list must be split
    * False in other cases.

    @instr: native instruction
    @block: native block source
    @assignments: list of current AssignBlocks (extended in place)
    @ir_blocks_all: list of additional effects (extended in place)
    @gen_pc_updt: insert PC update effects between instructions
    """
    # Identity test on purpose: any value other than False enables the update
    if gen_pc_updt is not False:
        self.gen_pc_update(assignments, instr)

    instr_assignblk, extra_irblocks = self.instr2ir(instr)
    assignments.append(instr_assignblk)
    ir_blocks_all.extend(extra_irblocks)
    # Extra IRBlocks mean the instruction ends the current IRBlock
    return bool(extra_irblocks)
def add_instr_to_ircfg(
self, instr, ircfg, loc_key=None, gen_pc_updt=False)
Add the native instruction @instr to the @ircfg @instr: instruction instance @ircfg: IRCFG instance @loc_key: loc_key instance of the instruction destination @gen_pc_updt: insert PC update effects between instructions
def add_instr_to_ircfg(self, instr, ircfg, loc_key=None, gen_pc_updt=False):
    """
    Wrap @instr in a one-instruction AsmBlock and add it to @ircfg
    @instr: instruction instance
    @ircfg: IRCFG instance
    @loc_key: loc_key instance of the instruction destination
    @gen_pc_updt: insert PC update effects between instructions
    Return the loc_key used for the block
    """
    if loc_key is None:
        # No loc_key given: create one, using the instruction offset if any
        loc_key = self.loc_db.add_location(offset=getattr(instr, "offset", None))
    single_block = AsmBlock(loc_key)
    single_block.lines = [instr]
    self.add_asmblock_to_ircfg(single_block, ircfg, gen_pc_updt)
    return loc_key
def expr_fix_regs_for_mode(
self, expr, *args, **kwargs)
def expr_fix_regs_for_mode(self, expr, *args, **kwargs):
    """
    Default hook: return @expr untouched; extra arguments are ignored
    """
    return expr
def expraff_fix_regs_for_mode(
self, expr, *args, **kwargs)
def expraff_fix_regs_for_mode(self, expr, *args, **kwargs):
    """
    Default hook: return @expr untouched; extra arguments are ignored
    """
    return expr
def gen_loc_key_and_expr(
self, size)
Return a loc_key and its corresponding ExprLoc @size: size of expression
def gen_loc_key_and_expr(self, size):
    """
    Return a new loc_key and its corresponding ExprLoc
    @size: size of expression
    """
    fresh_loc_key = self.loc_db.add_location()
    loc_expr = m2_expr.ExprLoc(fresh_loc_key, size)
    return fresh_loc_key, loc_expr
def gen_pc_update(
self, assignments, instr)
def gen_pc_update(self, assignments, instr):
    """
    Append to @assignments an AssignBlock setting PC to @instr's offset
    @assignments: list of AssignBlocks, extended in place
    @instr: native instruction providing the offset
    """
    pc_value = m2_expr.ExprInt(instr.offset, self.pc.size)
    pc_assignblk = AssignBlock({self.pc: pc_value}, instr)
    assignments.append(pc_assignblk)
def get_ir(
self, instr)
def get_ir(self, instr):
    """
    Abstract hook: this base implementation always raises
    NotImplementedError
    @instr: native instruction
    """
    raise NotImplementedError("Abstract Method")
def get_loc_key_for_instr(
self, instr)
Returns the loc_key associated to an instruction @instr: current instruction
def get_loc_key_for_instr(self, instr):
    """
    Return the loc_key of @instr, looked up (or created) from its offset
    @instr: current instruction
    """
    instr_offset = instr.offset
    return self.loc_db.get_or_create_offset_location(instr_offset)
def get_next_loc_key(
self, instr)
def get_next_loc_key(self, instr):
    """
    Return the loc_key located at @instr.offset + @instr.l
    @instr: current instruction
    """
    next_offset = instr.offset + instr.l
    return self.loc_db.get_or_create_offset_location(next_offset)
def instr2ir(
self, instr)
def instr2ir(self, instr):
    """
    Translate @instr with get_ir and tag every AssignBlock with @instr
    @instr: native instruction
    Return (AssignBlock of the instruction, list of extra IRBlocks); the
    list returned by get_ir is updated in place
    """
    cur_assigns, extra_irblocks = self.get_ir(instr)

    for index, irblock in enumerate(extra_irblocks):
        tagged = [AssignBlock(assignblk, instr) for assignblk in irblock]
        extra_irblocks[index] = IRBlock(irblock.loc_key, tagged)

    return AssignBlock(cur_assigns, instr), extra_irblocks
def irbloc_fix_regs_for_mode(
self, irblock, *args, **kwargs)
def irbloc_fix_regs_for_mode(self, irblock, *args, **kwargs):
    """
    Default hook: return @irblock untouched; extra arguments are ignored
    """
    return irblock
def is_pc_written(
self, block)
Return the first Assignblk of the @block in which PC is written @block: IRBlock instance
def is_pc_written(self, block):
    """
    Return the first Assignblk of @block whose dst is one of the
    architecture PC registers, or None if there is none
    @block: IRBlock instance
    """
    pc_regs = self.arch.pc.values()
    return next(
        (assignblk for assignblk in block if assignblk.dst in pc_regs),
        None
    )
def new_ircfg(
self, *args, **kwargs)
Return a new instance of IRCFG
def new_ircfg(self, *args, **kwargs):
    """
    Return a new instance of IRCFG built from self.IRDst and self.loc_db
    Extra positional / keyword arguments are forwarded to IRCFG
    """
    ircfg = IRCFG(self.IRDst, self.loc_db, *args, **kwargs)
    return ircfg
def new_ircfg_from_asmcfg(
self, asmcfg, *args, **kwargs)
Return a new instance of IRCFG from an @asmcfg @asmcfg: AsmCFG instance
def new_ircfg_from_asmcfg(self, asmcfg, *args, **kwargs):
    """
    Return a new instance of IRCFG filled with every block of @asmcfg
    @asmcfg: AsmCFG instance
    Extra positional / keyword arguments are forwarded to IRCFG
    """
    new_graph = IRCFG(self.IRDst, self.loc_db, *args, **kwargs)
    for asmblock in asmcfg.blocks:
        self.add_asmblock_to_ircfg(asmblock, new_graph)
    return new_graph
def post_add_asmblock_to_ircfg(
self, block, ircfg, ir_blocks)
def post_add_asmblock_to_ircfg(self, block, ircfg, ir_blocks):
    """
    Finalize the IRBlocks generated for @block and add them to @ircfg
    @block: native assembly block
    @ircfg: IRCFG instance
    @ir_blocks: list of IRBlocks (completed in place with missing dst)
    Return the list of IRBlocks after irbloc_fix_regs_for_mode
    """
    self.set_empty_dst_to_next(block, ir_blocks)

    fixed_irblocks = []
    for irblock in ir_blocks:
        fixed = self.irbloc_fix_regs_for_mode(irblock, self.attrib)
        ircfg.add_irblock(fixed)
        fixed_irblocks.append(fixed)
    return fixed_irblocks
def set_empty_dst_to_next(
self, block, ir_blocks)
def set_empty_dst_to_next(self, block, ir_blocks):
    """
    Give a destination to every IRBlock of @ir_blocks which has none
    @block: native assembly block the IRBlocks come from
    @ir_blocks: list of IRBlocks, updated in place
    """
    for index, irblock in enumerate(ir_blocks):
        if irblock.dst is not None:
            continue

        loc_key = block.get_next()
        if loc_key is None:
            # No successor known: use the end of the last instruction when
            # it has an offset, else a brand new location
            if block.lines:
                last_line = block.lines[-1]
                if last_line.offset is not None:
                    loc_key = self.loc_db.get_or_create_offset_location(
                        last_line.offset + last_line.l
                    )
            if loc_key is None:
                loc_key = self.loc_db.add_location()
            block.add_cst(loc_key, AsmConstraint.c_next)

        dst = m2_expr.ExprLoc(loc_key, self.pc.size)
        # Attach the new assignment to the last known instruction, if any
        instr = irblock.assignblks[-1].instr if irblock.assignblks else None
        dst_assignblk = AssignBlock({self.IRDst: dst}, instr)
        ir_blocks[index] = IRBlock(
            irblock.loc_key,
            list(irblock.assignblks) + [dst_assignblk]
        )
class ir
DEPRECATED object Use IntermediateRepresentation instead of ir
class ir(IntermediateRepresentation):
    """
    DEPRECATED object
    Use IntermediateRepresentation instead of ir
    """

    def __init__(self, loc_key, irs, lines=None):
        # NOTE(review): the three arguments are forwarded positionally to
        # IntermediateRepresentation.__init__, whose parameters are named
        # (arch, attrib, loc_db); names kept here for compatibility
        message = 'DEPRECATION WARNING: use "IntermediateRepresentation" instead of "ir"'
        warnings.warn(message)
        super(ir, self).__init__(loc_key, irs, lines)
Ancestors (in MRO)
- ir
- IntermediateRepresentation
- __builtin__.object
Instance variables
Methods
def __init__(
self, loc_key, irs, lines=None)
Inheritance:
IntermediateRepresentation
.__init__
def __init__(self, loc_key, irs, lines=None):
    """
    Warn about the deprecated "ir" class, then run the parent constructor
    The three arguments are forwarded positionally, unchanged
    """
    message = 'DEPRECATION WARNING: use "IntermediateRepresentation" instead of "ir"'
    warnings.warn(message)
    super(ir, self).__init__(loc_key, irs, lines)
def add_asmblock_to_ircfg(
self, block, ircfg, gen_pc_updt=False)
Inheritance:
IntermediateRepresentation
.add_asmblock_to_ircfg
Add a native block to the current IR @block: native assembly block @ircfg: IRCFG instance @gen_pc_updt: insert PC update effects between instructions
def add_asmblock_to_ircfg(self, block, ircfg, gen_pc_updt=False):
    """
    Translate a native block to IRBlocks and register them in @ircfg
    @block: native assembly block
    @ircfg: IRCFG instance
    @gen_pc_updt: insert PC update effects between instructions
    Return the IRBlocks returned by post_add_asmblock_to_ircfg
    """
    irblocks = []
    pending = []
    cur_loc_key = block.loc_key
    for instr in block.lines:
        if cur_loc_key is None:
            # The previous instruction closed its IRBlock: start a new one
            pending = []
            cur_loc_key = self.get_loc_key_for_instr(instr)
        must_split = self.add_instr_to_current_state(
            instr, block, pending, irblocks, gen_pc_updt
        )
        if not must_split:
            continue
        irblocks.append(IRBlock(cur_loc_key, pending))
        cur_loc_key = None
        pending = []

    # Flush the IRBlock still open after the last instruction
    if cur_loc_key is not None:
        irblocks.append(IRBlock(cur_loc_key, pending))

    final_irblocks = self.post_add_asmblock_to_ircfg(block, ircfg, irblocks)
    # NOTE(review): the post-processing hook may already have registered
    # these blocks; the explicit registration is kept as in the original
    for irblock in final_irblocks:
        ircfg.add_irblock(irblock)
    return final_irblocks
def add_bloc(
self, block, gen_pc_updt=False)
Inheritance:
IntermediateRepresentation
.add_bloc
DEPRECATED function Use add_block instead of add_bloc
def add_bloc(self, block, gen_pc_updt=False):
    """
    DEPRECATED function
    Use add_block instead of add_bloc

    @block: forwarded unchanged to add_block
    @gen_pc_updt: forwarded unchanged to add_block
    """
    self.add_block(block, gen_pc_updt)
def add_block(
self, block, gen_pc_updt=False)
Inheritance:
IntermediateRepresentation
.add_block
DEPRECATED function Use add_asmblock_to_ircfg instead of add_block
def add_block(self, block, gen_pc_updt=False):
    """
    DEPRECATED function
    Use add_asmblock_to_ircfg instead of add_block

    Always emits a warning and then raises RuntimeError: the arguments
    are never used.
    """
    warnings.warn("""DEPRECATION WARNING
    ircfg is now out of IntermediateRepresentation
    Use:
    ircfg = ir_arch.new_ircfg()
    ir_arch.add_asmblock_to_ircfg(block, ircfg)
    """)
    raise RuntimeError("API Deprecated")
def add_instr_to_current_state(
self, instr, block, assignments, ir_blocks_all, gen_pc_updt)
Inheritance:
IntermediateRepresentation
.add_instr_to_current_state
Add the IR effects of an instruction to the current state.
Returns a bool: True if the current assignments list must be split, False in other cases.
@instr: native instruction @block: native block source @assignments: list of current AssignBlocks @ir_blocks_all: list of additional effects @gen_pc_updt: insert PC update effects between instructions
def add_instr_to_current_state(self, instr, block, assignments, ir_blocks_all, gen_pc_updt):
    """
    Append the IR effects of @instr to the state being built.

    Returns a bool:
    * True if the current assignments list must be split
    * False in other cases.

    @instr: native instruction
    @block: native block source
    @assignments: list of current AssignBlocks (extended in place)
    @ir_blocks_all: list of additional effects (extended in place)
    @gen_pc_updt: insert PC update effects between instructions
    """
    # Identity test on purpose: any value other than False enables the update
    if gen_pc_updt is not False:
        self.gen_pc_update(assignments, instr)

    instr_assignblk, extra_irblocks = self.instr2ir(instr)
    assignments.append(instr_assignblk)
    ir_blocks_all.extend(extra_irblocks)
    # Extra IRBlocks mean the instruction ends the current IRBlock
    return bool(extra_irblocks)
def add_instr_to_ircfg(
self, instr, ircfg, loc_key=None, gen_pc_updt=False)
Inheritance:
IntermediateRepresentation
.add_instr_to_ircfg
Add the native instruction @instr to the @ircfg @instr: instruction instance @ircfg: IRCFG instance @loc_key: loc_key instance of the instruction destination @gen_pc_updt: insert PC update effects between instructions
def add_instr_to_ircfg(self, instr, ircfg, loc_key=None, gen_pc_updt=False):
    """
    Wrap @instr in a one-instruction AsmBlock and add it to @ircfg
    @instr: instruction instance
    @ircfg: IRCFG instance
    @loc_key: loc_key instance of the instruction destination
    @gen_pc_updt: insert PC update effects between instructions
    Return the loc_key used for the block
    """
    if loc_key is None:
        # No loc_key given: create one, using the instruction offset if any
        loc_key = self.loc_db.add_location(offset=getattr(instr, "offset", None))
    single_block = AsmBlock(loc_key)
    single_block.lines = [instr]
    self.add_asmblock_to_ircfg(single_block, ircfg, gen_pc_updt)
    return loc_key
def expr_fix_regs_for_mode(
self, expr, *args, **kwargs)
Inheritance:
IntermediateRepresentation
.expr_fix_regs_for_mode
def expr_fix_regs_for_mode(self, expr, *args, **kwargs):
    """
    Default hook: return @expr untouched; extra arguments are ignored
    """
    return expr
def expraff_fix_regs_for_mode(
self, expr, *args, **kwargs)
Inheritance:
IntermediateRepresentation
.expraff_fix_regs_for_mode
def expraff_fix_regs_for_mode(self, expr, *args, **kwargs):
    """
    Default hook: return @expr untouched; extra arguments are ignored
    """
    return expr
def gen_loc_key_and_expr(
self, size)
Inheritance:
IntermediateRepresentation
.gen_loc_key_and_expr
Return a loc_key and its corresponding ExprLoc @size: size of expression
def gen_loc_key_and_expr(self, size):
    """
    Return a new loc_key and its corresponding ExprLoc
    @size: size of expression
    """
    fresh_loc_key = self.loc_db.add_location()
    loc_expr = m2_expr.ExprLoc(fresh_loc_key, size)
    return fresh_loc_key, loc_expr
def gen_pc_update(
self, assignments, instr)
Inheritance:
IntermediateRepresentation
.gen_pc_update
def gen_pc_update(self, assignments, instr):
    """
    Append to @assignments an AssignBlock setting PC to @instr's offset
    @assignments: list of AssignBlocks, extended in place
    @instr: native instruction providing the offset
    """
    pc_value = m2_expr.ExprInt(instr.offset, self.pc.size)
    pc_assignblk = AssignBlock({self.pc: pc_value}, instr)
    assignments.append(pc_assignblk)
def get_ir(
self, instr)
Inheritance:
IntermediateRepresentation
.get_ir
def get_ir(self, instr):
    """
    Abstract hook: this base implementation always raises
    NotImplementedError
    @instr: native instruction
    """
    raise NotImplementedError("Abstract Method")
def get_loc_key_for_instr(
self, instr)
Inheritance:
IntermediateRepresentation
.get_loc_key_for_instr
Returns the loc_key associated to an instruction @instr: current instruction
def get_loc_key_for_instr(self, instr):
    """
    Return the loc_key of @instr, looked up (or created) from its offset
    @instr: current instruction
    """
    instr_offset = instr.offset
    return self.loc_db.get_or_create_offset_location(instr_offset)
def get_next_loc_key(
self, instr)
Inheritance:
IntermediateRepresentation
.get_next_loc_key
def get_next_loc_key(self, instr):
    """
    Return the loc_key located at @instr.offset + @instr.l
    @instr: current instruction
    """
    next_offset = instr.offset + instr.l
    return self.loc_db.get_or_create_offset_location(next_offset)
def instr2ir(
self, instr)
Inheritance:
IntermediateRepresentation
.instr2ir
def instr2ir(self, instr):
    """
    Translate @instr with get_ir and tag every AssignBlock with @instr
    @instr: native instruction
    Return (AssignBlock of the instruction, list of extra IRBlocks); the
    list returned by get_ir is updated in place
    """
    cur_assigns, extra_irblocks = self.get_ir(instr)

    for index, irblock in enumerate(extra_irblocks):
        tagged = [AssignBlock(assignblk, instr) for assignblk in irblock]
        extra_irblocks[index] = IRBlock(irblock.loc_key, tagged)

    return AssignBlock(cur_assigns, instr), extra_irblocks
def irbloc_fix_regs_for_mode(
self, irblock, *args, **kwargs)
Inheritance:
IntermediateRepresentation
.irbloc_fix_regs_for_mode
def irbloc_fix_regs_for_mode(self, irblock, *args, **kwargs):
    """
    Default hook: return @irblock untouched; extra arguments are ignored
    """
    return irblock
def is_pc_written(
self, block)
Inheritance:
IntermediateRepresentation
.is_pc_written
Return the first Assignblk of the @block in which PC is written @block: IRBlock instance
def is_pc_written(self, block):
    """
    Return the first Assignblk of @block whose dst is one of the
    architecture PC registers, or None if there is none
    @block: IRBlock instance
    """
    pc_regs = self.arch.pc.values()
    return next(
        (assignblk for assignblk in block if assignblk.dst in pc_regs),
        None
    )
def new_ircfg(
self, *args, **kwargs)
Inheritance:
IntermediateRepresentation
.new_ircfg
Return a new instance of IRCFG
def new_ircfg(self, *args, **kwargs):
    """
    Return a new instance of IRCFG built from self.IRDst and self.loc_db
    Extra positional / keyword arguments are forwarded to IRCFG
    """
    ircfg = IRCFG(self.IRDst, self.loc_db, *args, **kwargs)
    return ircfg
def new_ircfg_from_asmcfg(
self, asmcfg, *args, **kwargs)
Inheritance:
IntermediateRepresentation
.new_ircfg_from_asmcfg
Return a new instance of IRCFG from an @asmcfg @asmcfg: AsmCFG instance
def new_ircfg_from_asmcfg(self, asmcfg, *args, **kwargs):
    """
    Return a new instance of IRCFG filled with every block of @asmcfg
    @asmcfg: AsmCFG instance
    Extra positional / keyword arguments are forwarded to IRCFG
    """
    new_graph = IRCFG(self.IRDst, self.loc_db, *args, **kwargs)
    for asmblock in asmcfg.blocks:
        self.add_asmblock_to_ircfg(asmblock, new_graph)
    return new_graph
def post_add_asmblock_to_ircfg(
self, block, ircfg, ir_blocks)
Inheritance:
IntermediateRepresentation
.post_add_asmblock_to_ircfg
def post_add_asmblock_to_ircfg(self, block, ircfg, ir_blocks):
    """
    Finalize the IRBlocks generated for @block and add them to @ircfg
    @block: native assembly block
    @ircfg: IRCFG instance
    @ir_blocks: list of IRBlocks (completed in place with missing dst)
    Return the list of IRBlocks after irbloc_fix_regs_for_mode
    """
    self.set_empty_dst_to_next(block, ir_blocks)

    fixed_irblocks = []
    for irblock in ir_blocks:
        fixed = self.irbloc_fix_regs_for_mode(irblock, self.attrib)
        ircfg.add_irblock(fixed)
        fixed_irblocks.append(fixed)
    return fixed_irblocks
def set_empty_dst_to_next(
self, block, ir_blocks)
Inheritance:
IntermediateRepresentation
.set_empty_dst_to_next
def set_empty_dst_to_next(self, block, ir_blocks):
    """
    Give a destination to every IRBlock of @ir_blocks which has none
    @block: native assembly block the IRBlocks come from
    @ir_blocks: list of IRBlocks, updated in place
    """
    for index, irblock in enumerate(ir_blocks):
        if irblock.dst is not None:
            continue

        loc_key = block.get_next()
        if loc_key is None:
            # No successor known: use the end of the last instruction when
            # it has an offset, else a brand new location
            if block.lines:
                last_line = block.lines[-1]
                if last_line.offset is not None:
                    loc_key = self.loc_db.get_or_create_offset_location(
                        last_line.offset + last_line.l
                    )
            if loc_key is None:
                loc_key = self.loc_db.add_location()
            block.add_cst(loc_key, AsmConstraint.c_next)

        dst = m2_expr.ExprLoc(loc_key, self.pc.size)
        # Attach the new assignment to the last known instruction, if any
        instr = irblock.assignblks[-1].instr if irblock.assignblks else None
        dst_assignblk = AssignBlock({self.IRDst: dst}, instr)
        ir_blocks[index] = IRBlock(
            irblock.loc_key,
            list(irblock.assignblks) + [dst_assignblk]
        )
class irbloc
DEPRECATED object Use IRBlock instead of irbloc
class irbloc(IRBlock):
    """
    DEPRECATED object
    Use IRBlock instead of irbloc
    """

    def __init__(self, loc_key, irs, lines=None):
        """
        @loc_key: LocKey of the IR basic block
        @irs: list of AssignBlock
        @lines: unused; not forwarded to IRBlock
        """
        # The deprecated name is this class, "irbloc" (not "irblock")
        warnings.warn('DEPRECATION WARNING: use "IRBlock" instead of "irbloc"')
        super(irbloc, self).__init__(loc_key, irs)
Ancestors (in MRO)
Instance variables
var dst_linenb
Inheritance:
IRBlock
.dst_linenb
Line number of the IRDst setting statement in the current irs
Methods
def __init__(
self, loc_key, irs, lines=None)
@loc_key: LocKey of the IR basic block @assignblks: list of AssignBlock
def __init__(self, loc_key, irs, lines=None):
    """
    @loc_key: LocKey of the IR basic block
    @irs: list of AssignBlock
    @lines: unused; not forwarded to IRBlock
    """
    # The deprecated name is the "irbloc" class (not "irblock")
    warnings.warn('DEPRECATION WARNING: use "IRBlock" instead of "irbloc"')
    super(irbloc, self).__init__(loc_key, irs)
def cache_dst(
self)
Inheritance:
IRBlock
.cache_dst
def cache_dst(self):
    """
    Find the source assigned to "IRDst" and cache it in self._dst, with
    the index of its AssignBlock in self._dst_linenb (both None when
    "IRDst" is never assigned)
    Raise ValueError when a second "IRDst" assignment is met after a
    non-None one
    Return the cached destination
    """
    found_dst = None
    found_linenb = None
    for linenb, assignblk in enumerate(self):
        for dst, src in assignblk.iteritems():
            if not dst.is_id("IRDst"):
                continue
            if found_dst is not None:
                raise ValueError('Multiple destinations!')
            found_dst, found_linenb = src, linenb
    self._dst = found_dst
    self._dst_linenb = found_linenb
    return found_dst
def get_label(
self)
Inheritance:
IRBlock
.get_label
def get_label(self):
    """
    DEPRECATED accessor: warn, then return self.loc_key
    """
    message = 'DEPRECATION WARNING: use ".loc_key" instead of ".label"'
    warnings.warn(message)
    return self.loc_key
def is_dst_set(
self)
Inheritance:
IRBlock
.is_dst_set
def is_dst_set(self):
    """
    Return True when self._dst holds anything but None
    """
    return not (self._dst is None)
def modify_exprs(
self, mod_dst=None, mod_src=None)
Inheritance:
IRBlock
.modify_exprs
Generate a new IRBlock with its AssignBlock expressions modified according to @mod_dst and @mod_src @mod_dst: function called to modify Expression destination @mod_src: function called to modify Expression source
def modify_exprs(self, mod_dst=None, mod_src=None):
    """
    Build a new IRBlock whose AssignBlock expressions are rewritten
    through @mod_dst and @mod_src
    @mod_dst: function called to modify Expression destination
    @mod_src: function called to modify Expression source
    A missing function leaves the matching expressions unchanged
    """
    def keep(expr):
        return expr

    if mod_dst is None:
        mod_dst = keep
    if mod_src is None:
        mod_src = keep

    rewritten_blks = []
    for assignblk in self:
        rewritten = {}
        for dst, src in assignblk.iteritems():
            rewritten[mod_dst(dst)] = mod_src(src)
        # The instruction attached to each AssignBlock is carried over
        rewritten_blks.append(AssignBlock(rewritten, assignblk.instr))
    return IRBlock(self.loc_key, rewritten_blks)
def set_dst(
self, value)
Generate a new IRBlock with a dst (IRBlock) fixed to @value
def set_dst(self, value):
    """
    Build a new IRBlock in which the source assigned to "IRDst" is
    replaced by @value; every other assignment is copied as is
    """
    new_assignblks = []
    irdst_seen = False
    for assignblk in self:
        updated = {}
        for dst, src in assignblk.iteritems():
            if not dst.is_id("IRDst"):
                updated[dst] = src
                continue
            # At most one "IRDst" assignment is expected in the block
            assert irdst_seen is False
            irdst_seen = True
            updated[dst] = value
        new_assignblks.append(AssignBlock(updated, assignblk.instr))
    return IRBlock(self.loc_key, new_assignblks)
def simplify(
self, simplifier)
Simplify expressions in each assignblock @simplifier: ExpressionSimplifier instance
def simplify(self, simplifier):
    """
    Simplify expressions in each assignblock
    @simplifier: ExpressionSimplifier instance
    Return (modified, new IRBlock); modified is True when at least one
    simplified AssignBlock differs from its original
    """
    changed = False
    simplified_blks = []
    for assignblk in self:
        candidate = assignblk.simplify(simplifier)
        if assignblk != candidate:
            changed = True
        simplified_blks.append(candidate)
    return changed, IRBlock(self.loc_key, simplified_blks)
def to_string(
self, loc_db=None)
Inheritance:
IRBlock
.to_string
def to_string(self, loc_db=None):
    """
    Return a printable form of the block: a header built from the
    loc_key, then one entry per AssignBlock, joined by newlines
    @loc_db: (optional) used to resolve the loc_key to names; also
    forwarded to each AssignBlock's to_string
    """
    if loc_db is None:
        header = "%s:" % self.loc_key
    else:
        names = loc_db.get_location_names(self.loc_key)
        if names:
            # One "name:" line per known name
            header = "".join("%s:\n" % name for name in names)
        else:
            header = "%s:" % loc_db.pretty_str(self.loc_key)

    parts = [header]
    parts.extend(assignblk.to_string(loc_db) for assignblk in self)
    return '\n'.join(parts)