Top

miasm2.core.asmblock module

#-*- coding:utf-8 -*-

import logging
import warnings
from collections import namedtuple

from miasm2.expression.expression import ExprId, ExprInt, ExprLoc, \
    get_expr_locs
from miasm2.expression.expression import LocKey
from miasm2.expression.simplifications import expr_simp
from miasm2.expression.modint import moduint, modint
from miasm2.core.utils import Disasm_Exception, pck
from miasm2.core.graph import DiGraph, DiGraphSimplifier, MatchGraphJoker
from miasm2.core.interval import interval
from miasm2.core.locationdb import LocationDB


# Logger of this module, registered under the name "asmblock".
# A dedicated stream handler is attached with a "LEVEL: message" format, and
# the logger threshold is WARNING: debug/info records emitted below are
# filtered out unless the level is lowered.
log_asmblock = logging.getLogger("asmblock")
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter("%(levelname)-5s: %(message)s"))
log_asmblock.addHandler(console_handler)
log_asmblock.setLevel(logging.WARNING)


def is_int(a):
    """Return True if @a is an integer-like value

    Accepted types are the native `int` / `long` and the `moduint` /
    `modint` types imported from miasm2.expression.modint.
    """
    # Types are checked one after the other, in the same order as before
    if isinstance(a, int):
        return True
    if isinstance(a, long):
        return True
    if isinstance(a, moduint):
        return True
    return isinstance(a, modint)



class AsmRaw(object):
    """Raw item which can be stored in a block's lines

    @raw: the payload, stored as given in the `raw` attribute
    """

    def __init__(self, raw=""):
        self.raw = raw

    def __str__(self):
        # The textual form is the repr() of the payload
        payload_repr = repr(self.raw)
        return payload_repr

    def to_string(self, loc_db):
        """Return the same text as str(); @loc_db is not used"""
        text = str(self)
        return text


class asm_raw(AsmRaw):
    """[DEPRECATED API] use 'AsmRaw' instead"""

    def __init__(self, raw=""):
        warnings.warn('DEPRECATION WARNING: use "AsmRaw" instead of "asm_raw"')
        # super() has to be given this very class to reach AsmRaw.__init__
        super(asm_raw, self).__init__(raw)


class AsmConstraint(object):
    """Constraint toward a destination: a LocKey and a constraint type

    The constraint type is one of the class constants `c_to` / `c_next`.
    """
    c_to = "c_to"
    c_next = "c_next"

    def __init__(self, loc_key, c_t=c_to):
        # Sanity check
        assert isinstance(loc_key, LocKey)

        self.loc_key, self.c_t = loc_key, c_t

    def get_label(self):
        """[DEPRECATED API] getter behind `.label`, use `.loc_key` instead"""
        warnings.warn('DEPRECATION WARNING: use ".loc_key" instead of ".label"')
        return self.loc_key

    def set_label(self, loc_key):
        """[DEPRECATED API] setter behind `.label`, use `.loc_key` instead"""
        warnings.warn('DEPRECATION WARNING: use ".loc_key" instead of ".label"')
        self.loc_key = loc_key

    label = property(get_label, set_label)

    def to_string(self, loc_db=None):
        """Return "<type>:<destination>"
        @loc_db: (optional) if set, its pretty_str is used to render the
        destination
        """
        if loc_db is not None:
            destination = loc_db.pretty_str(self.loc_key)
        else:
            destination = self.loc_key
        return "%s:%s" % (self.c_t, destination)

    def __str__(self):
        return self.to_string()


class asm_constraint(AsmConstraint):
    """[DEPRECATED API] use 'AsmConstraint' instead"""

    def __init__(self, loc_key, c_t=AsmConstraint.c_to):
        # Emit the deprecation notice, then defer to AsmConstraint.__init__
        warnings.warn('DEPRECATION WARNING: use "AsmConstraint" instead of "asm_constraint"')
        super(asm_constraint, self).__init__(loc_key, c_t)


class AsmConstraintNext(AsmConstraint):
    """AsmConstraint whose type is always AsmConstraint.c_next"""

    def __init__(self, loc_key):
        # The constraint type is forced, only the destination is configurable
        super(AsmConstraintNext, self).__init__(
            loc_key,
            c_t=AsmConstraint.c_next
        )


class asm_constraint_next(AsmConstraintNext):
    """[DEPRECATED API] use 'AsmConstraintNext' instead"""

    def __init__(self, loc_key):
        warnings.warn('DEPRECATION WARNING: use "AsmConstraintNext" instead of "asm_constraint_next"')
        # AsmConstraintNext.__init__ sets the constraint type to c_next;
        # going straight to AsmConstraint would fall back on its c_to default
        super(asm_constraint_next, self).__init__(loc_key)


class AsmConstraintTo(AsmConstraint):
    """AsmConstraint whose type is always AsmConstraint.c_to"""

    def __init__(self, loc_key):
        # The constraint type is forced, only the destination is configurable
        super(AsmConstraintTo, self).__init__(
            loc_key,
            c_t=AsmConstraint.c_to
        )

class asm_constraint_to(AsmConstraint):
    """[DEPRECATED API] use 'AsmConstraintTo' instead"""

    def __init__(self, loc_key):
        warnings.warn('DEPRECATION WARNING: use "AsmConstraintTo" instead of "asm_constraint_to"')
        # No constraint type is given: AsmConstraint.__init__ defaults it to
        # c_to, which is the type expected here
        super(asm_constraint_to, self).__init__(loc_key)


class AsmBlock(object):
    """Block of lines identified by a LocKey

    Attributes:
     - loc_key: (read-only) LocKey of the block
     - lines: list of the lines of the block, in order
     - bto: set of constraints toward the destinations of the block
     - alignment: alignment value given at construction
    """

    def __init__(self, loc_key, alignment=1):
        assert isinstance(loc_key, LocKey)

        self.bto = set()
        self.lines = []
        self._loc_key = loc_key
        self.alignment = alignment

    def get_label(self):
        """[DEPRECATED API] getter behind `.label`, use `.loc_key` instead"""
        warnings.warn('DEPRECATION WARNING: use ".loc_key" instead of ".label"')
        return self.loc_key

    loc_key = property(lambda self:self._loc_key)
    label = property(get_label)


    def to_string(self, loc_db=None):
        """Return the textual form of the block: its location, one line per
        element of `lines`, then its destinations (if any)
        @loc_db: (optional) if set, used to render locations
        """
        out = []
        if loc_db is None:
            out.append(str(self.loc_key))
        else:
            out.append(loc_db.pretty_str(self.loc_key))

        out.extend(instr.to_string(loc_db) for instr in self.lines)
        if self.bto:
            lbls = ["->"]
            for dst in self.bto:
                if dst is None:
                    lbls.append("Unknown? ")
                else:
                    lbls.append(dst.to_string(loc_db) + " ")
            # `bto` is a set: sorting makes the output order deterministic
            out.append('\t'.join(sorted(lbls)))
        return '\n'.join(out)

    def __str__(self):
        return self.to_string()

    def addline(self, l):
        """Append the line @l at the end of the block"""
        self.lines.append(l)

    def addto(self, c):
        """Add the constraint @c to the destinations of the block"""
        assert isinstance(self.bto, set)
        self.bto.add(c)

    def split(self, loc_db, offset):
        """Split the block at @offset

        The lines located before @offset stay in the current block, the
        others are moved to a new block, linked to the current one by a
        c_next constraint.
        @loc_db: LocationDB instance used to get (or create) the location
        of @offset
        @offset: offset of the first line of the new block

        Return the new AsmBlock, or None if no line starts at @offset.
        """
        loc_key = loc_db.get_or_create_offset_location(offset)
        log_asmblock.debug('split at %x', offset)
        offsets = [x.offset for x in self.lines]
        offset = loc_db.get_location_offset(loc_key)
        if offset not in offsets:
            log_asmblock.warning(
                'cannot split bloc at %X '
                'middle instruction? default middle', offset)
            return None
        new_bloc = AsmBlock(loc_key)
        i = offsets.index(offset)

        self.lines, new_bloc.lines = self.lines[:i], self.lines[i:]
        flow_mod_instr = self.get_flow_instr()
        log_asmblock.debug('flow mod %r', flow_mod_instr)
        c = AsmConstraint(loc_key, AsmConstraint.c_next)
        # move dst if flowgraph modifier was in original bloc
        # (usecase: split delayslot bloc)
        if flow_mod_instr:
            for xx in self.bto:
                log_asmblock.debug('lbl %s', xx)
            c_next = set(
                [x for x in self.bto if x.c_t == AsmConstraint.c_next])
            c_to = [x for x in self.bto if x.c_t != AsmConstraint.c_next]
            self.bto = set([c] + c_to)
            new_bloc.bto = c_next
        else:
            # The new block inherits every destination; the current block
            # only falls through to the new one
            new_bloc.bto = self.bto
            self.bto = set([c])
        return new_bloc

    def get_range(self):
        """Returns the offset hull of an AsmBlock"""
        if self.lines:
            return (self.lines[0].offset,
                    self.lines[-1].offset + self.lines[-1].l)
        return 0, 0

    def get_offsets(self):
        """Return the list of the offsets of the lines of the block"""
        return [x.offset for x in self.lines]

    def add_cst(self, loc_key, constraint_type):
        """
        Add constraint between current block and block at @loc_key
        @loc_key: LocKey instance of constraint target
        @constraint_type: AsmConstraint c_to/c_next
        """
        assert isinstance(loc_key, LocKey)
        c = AsmConstraint(loc_key, constraint_type)
        self.bto.add(c)

    def get_flow_instr(self):
        """Not fully functional: returns None

        NOTE(review): the indexes produced by the loop start at -1, which
        never satisfies the `0 <= i` bound check, so None is returned before
        any line is inspected and the NotImplementedError below is not
        reached. `split` relies on this result; the behavior is kept as is.
        """
        if not self.lines:
            return None
        for i in xrange(-1, -1 - self.lines[0].delayslot - 1, -1):
            if not 0 <= i < len(self.lines):
                return None
            l = self.lines[i]
            if l.splitflow() or l.breakflow():
                raise NotImplementedError('not fully functional')
        return None

    def get_subcall_instr(self):
        """Return the first line, starting from the end of the block, for
        which is_subcall() is true; None if there is none

        At most `delayslot + 1` trailing lines are inspected, `delayslot`
        being read on the first line of the block.
        """
        if not self.lines:
            return None
        delayslot = self.lines[0].delayslot
        end_index = len(self.lines) - 1
        ds_max_index = max(end_index - delayslot, 0)
        for i in xrange(end_index, ds_max_index - 1, -1):
            l = self.lines[i]
            if l.is_subcall():
                return l
        return None

    def get_next(self):
        """Return the destination LocKey of a c_next constraint of the
        block, None if there is none"""
        for constraint in self.bto:
            if constraint.c_t == AsmConstraint.c_next:
                return constraint.loc_key
        return None

    @staticmethod
    def _filter_constraint(constraints):
        """Sort and filter @constraints for AsmBlock.bto
        @constraints: non-empty set of AsmConstraint instance

        Always the same type -> one of the constraint
        c_next and c_to -> c_next
        """
        # Only one constraint
        if len(constraints) == 1:
            return next(iter(constraints))

        # Constraint type -> set of corresponding constraint
        cbytype = {}
        for cons in constraints:
            cbytype.setdefault(cons.c_t, set()).add(cons)

        # Only one type -> any constraint is OK
        if len(cbytype) == 1:
            return next(iter(constraints))

        # At least 2 types -> types = {c_next, c_to}
        # c_to is included in c_next
        return next(iter(cbytype[AsmConstraint.c_next]))

    def fix_constraints(self):
        """Fix next block constraints

        Keep a single constraint per destination, chosen by
        `_filter_constraint`.
        """
        # destination -> associated constraints
        dests = {}
        for constraint in self.bto:
            dests.setdefault(constraint.loc_key, set()).add(constraint)

        self.bto = set(self._filter_constraint(constraints)
                       for constraints in dests.itervalues())


class asm_bloc(AsmBlock):
    """[DEPRECATED API] use 'AsmBlock' instead"""

    def __init__(self, loc_key, alignment=1):
        warnings.warn('DEPRECATION WARNING: use "AsmBlock" instead of "asm_bloc"')
        # AsmBlock has to be the base class: object.__init__ does not accept
        # the (loc_key, alignment) arguments forwarded here
        super(asm_bloc, self).__init__(loc_key, alignment)


class AsmBlockBad(AsmBlock):

    """Stand for a *bad* ASM block (malformed, unreachable,
    not disassembled, ...)"""

    # Error codes
    ERROR_UNKNOWN = -1
    ERROR_CANNOT_DISASM = 0
    ERROR_NULL_STARTING_BLOCK = 1
    ERROR_FORBIDDEN = 2
    ERROR_IO = 3

    # Error code -> human readable description
    ERROR_TYPES = {
        ERROR_UNKNOWN: "Unknown error",
        ERROR_CANNOT_DISASM: "Unable to disassemble",
        ERROR_NULL_STARTING_BLOCK: "Null starting block",
        ERROR_FORBIDDEN: "Address forbidden by dont_dis",
        ERROR_IO: "IOError",
    }

    def __init__(self, loc_key=None, alignment=1, errno=ERROR_UNKNOWN, *args, **kwargs):
        """Instantiate an AsmBlockBad.
        @loc_key, @alignment: same as AsmBlock.__init__
        @errno: (optional) specify an error type associated with the block
        """
        super(AsmBlockBad, self).__init__(loc_key, alignment, *args, **kwargs)
        self._errno = errno

    @property
    def errno(self):
        """Error type associated with the block (read-only)"""
        return self._errno

    def __str__(self):
        # Unknown error codes are displayed as is
        reason = self.ERROR_TYPES.get(self._errno, self._errno)
        header = str(self.loc_key)
        details = "\tBad block: %s" % reason
        return header + "\n" + details

    def addline(self, *args, **kwargs):
        """Unsupported on a bad block, always raise RuntimeError"""
        raise RuntimeError("An AsmBlockBad cannot have line")

    def addto(self, *args, **kwargs):
        """Unsupported on a bad block, always raise RuntimeError"""
        raise RuntimeError("An AsmBlockBad cannot have bto")

    def split(self, *args, **kwargs):
        """Unsupported on a bad block, always raise RuntimeError"""
        raise RuntimeError("An AsmBlockBad cannot be splitted")


class asm_block_bad(AsmBlockBad):
    """[DEPRECATED API] use 'AsmBlockBad' instead"""

    def __init__(self, loc_key=None, alignment=1, errno=-1, *args, **kwargs):
        warnings.warn('DEPRECATION WARNING: use "AsmBlockBad" instead of "asm_block_bad"')
        # @errno has to be forwarded explicitly, otherwise the error type
        # given by the caller is lost
        super(asm_block_bad, self).__init__(loc_key, alignment, errno, *args, **kwargs)

class AsmSymbolPool(LocationDB):
    """[DEPRECATED API] use 'LocationDB' instead"""

    def __init__(self, *args, **kwargs):
        # Emit the deprecation notice, then build a regular LocationDB with
        # the very same arguments
        warnings.warn("Deprecated API, use 'LocationDB' instead")
        super(AsmSymbolPool, self).__init__(*args, **kwargs)

class asm_symbol_pool(AsmSymbolPool):
    """[DEPRECATED API] use 'LocationDB' instead"""

    def __init__(self):
        # AsmSymbolPool.__init__ emits its own deprecation notice as well, so
        # two warnings are issued per instantiation
        warnings.warn('DEPRECATION WARNING: use "LocationDB" instead of "asm_symbol_pool"')
        super(asm_symbol_pool, self).__init__()


class AsmCFG(DiGraph):

    """Directed graph standing for a ASM Control Flow Graph with:
     - nodes: LocKey, each one possibly associated with an AsmBlock
     - edges: constraints between blocks, synchronized with AsmBlock's "bto"

    Specialized the .dot export and force the relation between block to be uniq,
    and associated with a constraint.

    Offer helpers on AsmCFG management, such as research by loc_key, sanity
    checking and mnemonic size guessing.
    """

    # Internal structure for pending management
    AsmCFGPending = namedtuple("AsmCFGPending",
                               ["waiter", "constraint"])

    def __init__(self, loc_db=None, *args, **kwargs):
        super(AsmCFG, self).__init__(*args, **kwargs)
        # Edges -> constraint
        self.edges2constraint = {}
        # Expected LocKey -> set(AsmCFGPending(waiter, constraint))
        self._pendings = {}
        # Loc_Key2block built on the fly
        self._loc_key_to_block = {}
        # loc_db
        self.loc_db = loc_db


    def copy(self):
        """Copy the current graph instance"""
        graph = self.__class__(self.loc_db)
        return graph + self


    # Compatibility with old list API
    def append(self, *args, **kwargs):
        raise DeprecationWarning("AsmCFG is a graph, use add_node")

    def remove(self, *args, **kwargs):
        raise DeprecationWarning("AsmCFG is a graph, use del_node")

    def __getitem__(self, *args, **kwargs):
        raise DeprecationWarning("Order of AsmCFG elements is not reliable")

    def __contains__(self, _):
        """
        DEPRECATED. Use:
        - loc_key in AsmCFG.nodes() to test loc_key existence
        """
        raise RuntimeError("DEPRECATED")

    def __iter__(self):
        """
        DEPRECATED. Use:
        - AsmCFG.blocks() to iter on blocks
        - loc_key in AsmCFG.nodes() to test loc_key existence
        """
        raise RuntimeError("DEPRECATED")

    def __len__(self):
        """Return the number of blocks in AsmCFG"""
        return len(self._nodes)

    # Iterator on the AsmBlock instances known by the graph
    blocks = property(lambda x:x._loc_key_to_block.itervalues())

    # Manage graph with associated constraints
    def add_edge(self, src, dst, constraint):
        """Add an edge to the graph
        @src: LocKey instance, source
        @dst: LocKey instance, destination
        @constraint: constraint associated to this edge
        """
        # Sanity check
        assert isinstance(src, LocKey)
        assert isinstance(dst, LocKey)
        known_cst = self.edges2constraint.get((src, dst), None)
        if known_cst is not None:
            # The edge already exists: it must carry the same constraint
            assert known_cst == constraint
            return

        # Add the edge to src.bto if needed
        block_src = self.loc_key_to_block(src)
        if block_src:
            if dst not in [cons.loc_key for cons in block_src.bto]:
                block_src.bto.add(AsmConstraint(dst, constraint))

        # Add edge
        self.edges2constraint[(src, dst)] = constraint
        super(AsmCFG, self).add_edge(src, dst)

    def add_uniq_edge(self, src, dst, constraint):
        """
        Synonym for `add_edge`
        """
        self.add_edge(src, dst, constraint)

    def del_edge(self, src, dst):
        """Delete the edge @src->@dst and its associated constraint"""
        src_blk = self.loc_key_to_block(src)
        dst_blk = self.loc_key_to_block(dst)
        assert src_blk is not None
        assert dst_blk is not None
        # Delete from src.bto
        to_remove = [cons for cons in src_blk.bto if cons.loc_key == dst]
        if to_remove:
            assert len(to_remove) == 1
            src_blk.bto.remove(to_remove[0])

        # Del edge
        del self.edges2constraint[(src, dst)]
        super(AsmCFG, self).del_edge(src, dst)

    def del_block(self, block):
        """Remove @block (AsmBlock instance) and its node from the graph"""
        # NOTE(review): the node is removed before the loc_key -> block
        # mapping is dropped; presumably DiGraph.del_node goes through
        # del_edge, which needs this mapping - confirm before reordering
        super(AsmCFG, self).del_node(block.loc_key)
        del self._loc_key_to_block[block.loc_key]


    def add_node(self, node):
        """Add the node @node (LocKey instance) to the graph"""
        assert isinstance(node, LocKey)
        return super(AsmCFG, self).add_node(node)

    def add_block(self, block):
        """
        Add the block @block to the current instance, if it is not already in
        @block: AsmBlock instance

        Edges will be created for @block.bto, if destinations are already in
        this instance. If not, they will be resolved when adding these
        aforementionned destinations.
        `self.pendings` indicates which blocks are not yet resolved.

        """
        status = super(AsmCFG, self).add_node(block.loc_key)

        if not status:
            return status

        # Update waiters
        if block.loc_key in self._pendings:
            for bblpend in self._pendings[block.loc_key]:
                self.add_edge(bblpend.waiter.loc_key, block.loc_key, bblpend.constraint)
            del self._pendings[block.loc_key]

        # Synchronize edges with block destinations
        self._loc_key_to_block[block.loc_key] = block

        for constraint in block.bto:
            dst = self._loc_key_to_block.get(constraint.loc_key,
                                           None)
            if dst is None:
                # Block is yet unknown, add it to pendings
                to_add = self.AsmCFGPending(waiter=block,
                                            constraint=constraint.c_t)
                self._pendings.setdefault(constraint.loc_key,
                                          set()).add(to_add)
            else:
                # Block is already in known nodes
                self.add_edge(block.loc_key, dst.loc_key, constraint.c_t)

        return status

    def merge(self, graph):
        """Merge with @graph, taking in account constraints"""
        # Add known blocks
        for block in graph.blocks:
            self.add_block(block)
        # Add nodes not already in it (ie. not linked to a block)
        for node in graph.nodes():
            self.add_node(node)
        # -> add_edge(x, y, constraint)
        for edge in graph._edges:
            # May fail if there is an incompatibility in edges constraints
            # between the two graphs
            self.add_edge(*edge, constraint=graph.edges2constraint[edge])


    def node2lines(self, node):
        """Generator on the cell descriptions used to render @node in the
        .dot export: a header with the location, then either the error of a
        bad block or one entry per line of the block"""
        if self.loc_db is None:
            loc_key_name = str(node)
        else:
            loc_key_name = self.loc_db.pretty_str(node)
        yield self.DotCellDescription(text=loc_key_name,
                                      attr={'align': 'center',
                                            'colspan': 2,
                                            'bgcolor': 'grey'})
        block = self._loc_key_to_block.get(node, None)
        if block is None:
            # Node without block: nothing more to display.
            # A plain `return` ends the generator; raising StopIteration
            # by hand is turned into a RuntimeError by PEP 479
            return
        if isinstance(block, AsmBlockBad):
            yield [
                self.DotCellDescription(
                    text=block.ERROR_TYPES.get(block._errno,
                                               block._errno
                    ),
                    attr={})
            ]
            return
        for line in block.lines:
            if self._dot_offset:
                yield [self.DotCellDescription(text="%.8X" % line.offset,
                                               attr={}),
                       self.DotCellDescription(text=line.to_string(self.loc_db), attr={})]
            else:
                yield self.DotCellDescription(text=line.to_string(self.loc_db), attr={})

    def node_attr(self, node):
        """Return the .dot attributes of @node: bad blocks are filled in red"""
        block = self._loc_key_to_block.get(node, None)
        if isinstance(block, AsmBlockBad):
            return {'style': 'filled', 'fillcolor': 'red'}
        return {}

    def edge_attr(self, src, dst):
        """Return the .dot attributes of the edge @src->@dst

        The edge is blue if @src has at most one successor; otherwise red
        for a c_next constraint and limegreen for the others.
        """
        cst = self.edges2constraint.get((src, dst), None)
        edge_color = "blue"

        if len(self.successors(src)) > 1:
            if cst == AsmConstraint.c_next:
                edge_color = "red"
            else:
                edge_color = "limegreen"

        return {"color": edge_color}

    def dot(self, offset=False):
        """
        @offset: (optional) if set, add the corresponding offsets in each node
        """
        # Read back by node2lines during the export
        self._dot_offset = offset
        return super(AsmCFG, self).dot()

    # Helpers
    @property
    def pendings(self):
        """Dictionary of loc_key -> set(AsmCFGPending instance) indicating
        which loc_key are missing in the current instance.
        A loc_key is missing if a block which is already in nodes has constraints
        with him (thanks to its .bto) and the corresponding block is not yet in
        nodes
        """
        return self._pendings

    def label2block(self, loc_key):
        """Return the block corresponding to loc_key @loc_key
        @loc_key: LocKey instance"""
        warnings.warn('DEPRECATION WARNING: use "loc_key_to_block" instead of "label2block"')
        return self.loc_key_to_block(loc_key)

    def rebuild_edges(self):
        """Consider blocks '.bto' and rebuild edges according to them, ie:
        - update constraint type
        - add missing edge
        - remove no more used edge

        This method should be called if a block's '.bto' in nodes have been
        modified without notifying this instance to resynchronize edges.
        """
        for block in self.blocks:
            edges = []
            # Rebuild edges from bto
            for constraint in block.bto:
                dst = self._loc_key_to_block.get(constraint.loc_key,
                                                  None)
                if dst is None:
                    # Missing destination, add to pendings
                    self._pendings.setdefault(
                        constraint.loc_key,
                        set()
                    ).add(
                        self.AsmCFGPending(
                            block,
                            constraint.c_t
                        )
                    )
                    continue
                edge = (block.loc_key, dst.loc_key)
                edges.append(edge)
                if edge in self._edges:
                    # Already known edge, constraint may have changed
                    self.edges2constraint[edge] = constraint.c_t
                else:
                    # An edge is missing
                    self.add_edge(edge[0], edge[1], constraint.c_t)

            # Remove useless edges
            for succ in self.successors(block.loc_key):
                edge = (block.loc_key, succ)
                if edge not in edges:
                    self.del_edge(*edge)

    def get_bad_blocks(self):
        """Iterator on AsmBlockBad elements"""
        # A bad asm block is always a leaf
        for loc_key in self.leaves():
            block = self._loc_key_to_block.get(loc_key, None)
            if isinstance(block, AsmBlockBad):
                yield block

    def get_bad_blocks_predecessors(self, strict=False):
        """Iterator on loc_keys with an AsmBlockBad destination
        @strict: (optional) if set, return loc_key with only bad
        successors
        """
        # Avoid returning the same block
        done = set()
        for badblock in self.get_bad_blocks():
            for predecessor in self.predecessors_iter(badblock.loc_key):
                if predecessor not in done:
                    if (strict and
                        not all(isinstance(self._loc_key_to_block.get(block, None), AsmBlockBad)
                                for block in self.successors_iter(predecessor))):
                        continue
                    yield predecessor
                    done.add(predecessor)

    def getby_offset(self, offset):
        """Return asmblock containing @offset, None if there is none"""
        for block in self.blocks:
            if not block.lines:
                # Blocks without lines (AsmBlockBad for instance) do not
                # cover any offset
                continue
            if block.lines[0].offset <= offset < \
                    (block.lines[-1].offset + block.lines[-1].l):
                return block
        return None

    def loc_key_to_block(self, loc_key):
        """
        Return the asmblock corresponding to loc_key @loc_key, None if unknown
        loc_key
        @loc_key: LocKey instance
        """
        return self._loc_key_to_block.get(loc_key, None)

    def sanity_check(self):
        """Do sanity checks on blocks' constraints:
        * no pendings
        * no multiple next constraint to same block
        * no next constraint to self

        Raise a RuntimeError if a check fails.
        """

        if len(self._pendings) != 0:
            raise RuntimeError("Some blocks are missing: %s" % [
                str(loc_key) for loc_key in self._pendings
            ])

        next_edges = {edge: constraint
                      for edge, constraint in self.edges2constraint.iteritems()
                      if constraint == AsmConstraint.c_next}

        for loc_key in self._nodes:
            if loc_key not in self._loc_key_to_block:
                raise RuntimeError("Not supported yet: every node must have a corresponding AsmBlock")
            # No next constraint to self
            if (loc_key, loc_key) in next_edges:
                raise RuntimeError('Bad constraint: self in next')

            # No multiple next constraint to same block
            pred_next = list(ploc_key
                             for (ploc_key, dloc_key) in next_edges
                             if dloc_key == loc_key)

            if len(pred_next) > 1:
                raise RuntimeError("Too many next constraints for bloc %r"
                                   "(%s)" % (loc_key,
                                             pred_next))

    def guess_blocks_size(self, mnemo):
        """Asm and compute max block size
        Add a 'size' and 'max_size' attribute on each block
        @mnemo: metamn instance"""
        for block in self.blocks:
            size = 0
            for instr in block.lines:
                if isinstance(instr, AsmRaw):
                    # for special AsmRaw, only extract len
                    if isinstance(instr.raw, list):
                        data = None
                        if len(instr.raw) == 0:
                            l = 0
                        else:
                            # Element size is in bits; floor division keeps
                            # the length an integer
                            l = instr.raw[0].size // 8 * len(instr.raw)
                    elif isinstance(instr.raw, str):
                        data = instr.raw
                        l = len(data)
                    else:
                        raise NotImplementedError('asm raw')
                else:
                    # Assemble the instruction to retrieve its len.
                    # If the instruction uses symbol it will fail
                    # In this case, the max_instruction_len is used
                    try:
                        candidates = mnemo.asm(instr)
                        l = len(candidates[-1])
                    except Exception:
                        # Best effort; KeyboardInterrupt / SystemExit are
                        # no longer swallowed
                        l = mnemo.max_instruction_len
                    data = None
                instr.data = data
                instr.l = l
                size += l

            block.size = size
            block.max_size = size
            log_asmblock.info("size: %d max: %d", block.size, block.max_size)

    def apply_splitting(self, loc_db, dis_block_callback=None, **kwargs):
        """Consider @self' bto destinations and split block in @self if one of
        these destinations jumps in the middle of this block.
        In order to work, they must be only one block in @self per loc_key in
        @loc_db (which is true if @self come from the same disasmEngine).

        @loc_db: LocationDB instance associated with @self'loc_keys
        @dis_block_callback: (optional) if set, this callback will be called on
        new block destinations
        @kwargs: (optional) named arguments to pass to dis_block_callback
        """
        # Get all possible destinations not yet resolved, with a resolved
        # offset
        block_dst = []
        for loc_key in self.pendings:
            offset = loc_db.get_location_offset(loc_key)
            if offset is not None:
                block_dst.append(offset)

        todo = set(self.blocks)
        rebuild_needed = False

        while todo:
            # Find a block with a destination inside another one
            cur_block = todo.pop()
            range_start, range_stop = cur_block.get_range()

            for off in block_dst:
                if not (off > range_start and off < range_stop):
                    continue

                # `cur_block` must be split at offset `off`
                new_b = cur_block.split(loc_db, off)
                log_asmblock.debug("Split block %x", off)
                if new_b is None:
                    log_asmblock.error("Cannot split %x!!", off)
                    continue

                # Remove pending from cur_block
                # Links from new_b will be generated in rebuild_edges
                for dst in new_b.bto:
                    if dst.loc_key not in self.pendings:
                        continue
                    self.pendings[dst.loc_key] = set(pending for pending in self.pendings[dst.loc_key]
                                                     if pending.waiter != cur_block)

                # The new block destinations may need to be disassembled
                if dis_block_callback:
                    # Offsets are resolved with the @loc_db given by the
                    # caller, as for every other lookup of this method
                    offsets_to_dis = set(
                        loc_db.get_location_offset(constraint.loc_key)
                        for constraint in new_b.bto
                    )
                    dis_block_callback(cur_bloc=new_b,
                                       offsets_to_dis=offsets_to_dis,
                                       loc_db=loc_db, **kwargs)

                # Update structure
                rebuild_needed = True
                self.add_block(new_b)

                # The new block must be considered
                todo.add(new_b)
                range_start, range_stop = cur_block.get_range()

        # Rebuild edges to match new blocks'bto
        if rebuild_needed:
            self.rebuild_edges()

    def __str__(self):
        out = []
        for block in self.blocks:
            out.append(str(block))
        for loc_key_a, loc_key_b in self.edges():
            out.append("%s -> %s" % (loc_key_a, loc_key_b))
        return '\n'.join(out)

    def __repr__(self):
        return "<%s %s>" % (self.__class__.__name__, hex(id(self)))

# Out of _merge_blocks to be computed only once
# Node filter: the block associated with @loc_key must not be an AsmBlockBad
# and must hold at least one line.
# NOTE(review): a node without block makes loc_key_to_block return None, and
# the `.lines` access then fails - confirm such nodes cannot reach this filter
_acceptable_block = lambda graph, loc_key: (not isinstance(graph.loc_key_to_block(loc_key), AsmBlockBad) and
                                   len(graph.loc_key_to_block(loc_key).lines) > 0)
# Pattern matched by _merge_blocks: a `_parent` node linked to a `_son` node,
# both accepted by the filter above
_parent = MatchGraphJoker(restrict_in=False, filt=_acceptable_block)
_son = MatchGraphJoker(restrict_out=False, filt=_acceptable_block)
_expgraph = _parent >> _son


def _merge_blocks(dg, graph):
    """Graph simplification merging AsmBlock with one and only one son with this
    son if this son has one and only one parent

    @dg: not used by this pass
    @graph: AsmCFG instance, modified in place
    """

    # Location keys to ignore, because their blocks have been removed from
    # the graph
    to_ignore = set()

    for match in _expgraph.match(graph):

        # Get matching location keys
        lbl_block, lbl_succ = match[_parent], match[_son]

        # Ignore already deleted blocks. The test is done on location keys,
        # which is what `to_ignore` stores, and before the block lookups,
        # which return None for a removed block
        if (lbl_block in to_ignore or
            lbl_succ in to_ignore):
            continue

        block = graph.loc_key_to_block(lbl_block)
        succ = graph.loc_key_to_block(lbl_succ)

        # Remove block last instruction if needed
        last_instr = block.lines[-1]
        if last_instr.delayslot > 0:
            # TODO: delayslot
            raise RuntimeError("Not implemented yet")

        if last_instr.is_subcall():
            continue
        if last_instr.breakflow() and last_instr.dstflow():
            block.lines.pop()

        # Merge block: the parent takes the lines and the outgoing edges of
        # its son
        block.lines += succ.lines
        for nextb in graph.successors_iter(lbl_succ):
            graph.add_edge(lbl_block, nextb, graph.edges2constraint[(lbl_succ, nextb)])

        graph.del_block(succ)
        to_ignore.add(lbl_succ)


# Module-level simplifier, with the block merging pass enabled
bbl_simplifier = DiGraphSimplifier()
bbl_simplifier.enable_passes([_merge_blocks])


def conservative_asm(mnemo, instr, symbols, conservative):
    """
    Assemble @instr with @mnemo and pick one of the resulting candidates,
    keeping the original bytes of the instruction (`instr.b`) when possible.
    @mnemo: object whose asm(instr, symbols) returns the candidates
    @instr: instruction to assemble
    @symbols: forwarded to mnemo.asm
    @conservative: if set, and the original bytes are not among the
    candidates, prefer the first candidate having the same length

    Return a tuple (selected candidate, every candidate).
    Raise ValueError if there is no candidate.
    """
    candidates = mnemo.asm(instr, symbols)
    if not candidates:
        raise ValueError('cannot asm:%s' % str(instr))

    if hasattr(instr, "b"):
        original = instr.b
        # Original bytes are still a valid encoding: keep them
        if original in candidates:
            return original, candidates
        if conservative:
            same_size = next(
                (cand for cand in candidates if len(cand) == len(original)),
                None
            )
            if same_size is not None:
                return same_size, candidates

    # Default: first candidate
    return candidates[0], candidates


def fix_expr_val(expr, symbols):
    """Resolve an expression @expr using @symbols

    Every ExprId of @expr is replaced by an ExprInt holding the offset that
    @symbols gives for the identifier name, then the result is simplified.
    Raise RuntimeError if the simplified result is not an ExprInt
    """
    def resolve_identifier(node):
        # Only identifiers are rewritten, other nodes are kept as is
        if not isinstance(node, ExprId):
            return node
        # Example:
        # toto:
        # .dword label
        location = symbols.get_name_location(node.name)
        return ExprInt(symbols.get_location_offset(location), node.size)

    resolved = expr_simp(expr.visit(resolve_identifier))
    if isinstance(resolved, ExprInt):
        return resolved
    raise RuntimeError('Cannot resolve symbol %s' % expr)


def fix_loc_offset(loc_db, loc_key, offset, modified):
    """
    Make @offset the offset of @loc_key in @loc_db.
    Nothing is done when @loc_key already has this offset; otherwise the
    offset is forced and @loc_key is added to the @modified set
    @loc_db: current loc_db
    """
    already_set = loc_db.get_location_offset(loc_key) == offset
    if not already_set:
        loc_db.set_location_offset(loc_key, offset, force=True)
        modified.add(loc_key)


class BlockChain(object):

    """Manage blocks linked with an asm_constraint_next

    Attributes computed by place():
    - pinned_block_idx: index in @blocks of the block having an offset in
      @loc_db, None if there is none
    - max_size: upper bound of the chain size
    - offset_min, offset_max: bounds of the chain, only set when the chain is
      pinned
    """

    def __init__(self, loc_db, blocks):
        """
        @loc_db: location database giving / receiving the blocks' offsets
        @blocks: list of blocks, in chain order
        """
        self.loc_db = loc_db
        self.blocks = blocks
        self.place()

    @property
    def pinned(self):
        """Return True iff at least one block is pinned"""
        return self.pinned_block_idx is not None

    def _set_pinned_block_idx(self):
        """Set pinned_block_idx to the index of the only block having an
        offset in loc_db (None if no block has one)
        Raise ValueError if several blocks have an offset
        """
        self.pinned_block_idx = None
        for i, block in enumerate(self.blocks):
            loc_key = block.loc_key
            if self.loc_db.get_location_offset(loc_key) is not None:
                if self.pinned_block_idx is not None:
                    raise ValueError("Multiples pinned block detected")
                self.pinned_block_idx = i

    def place(self):
        """Compute BlockChain offset_min and offset_max using pinned block and
        blocks' size
        """
        self._set_pinned_block_idx()
        self.max_size = 0
        for block in self.blocks:
            # Worst case: alignment - 1 bytes lost for each block
            self.max_size += block.max_size + block.alignment - 1

        # Check if chain has one block pinned
        if not self.pinned:
            return

        loc = self.blocks[self.pinned_block_idx].loc_key
        offset_base = self.loc_db.get_location_offset(loc)
        assert(offset_base % self.blocks[self.pinned_block_idx].alignment == 0)

        # Blocks located *before* the pinned block lower the minimum offset.
        # (The previous slice, [:idx - 1:-1], selected the pinned block and
        # its followers instead as soon as idx > 0)
        self.offset_min = offset_base
        for block in self.blocks[:self.pinned_block_idx]:
            self.offset_min -= block.max_size + \
                (block.alignment - block.max_size) % block.alignment

        # The pinned block and its followers raise the maximum offset
        self.offset_max = offset_base
        for block in self.blocks[self.pinned_block_idx:]:
            self.offset_max += block.max_size + \
                (block.alignment - block.max_size) % block.alignment

    def merge(self, chain):
        """Best effort merge two block chains
        Return the list of resulting blockchains"""
        # The blocks of @chain are appended in place to this chain's list
        self.blocks += chain.blocks
        self.place()
        return [self]

    def fix_blocks(self, modified_loc_keys):
        """Propagate a pinned to its blocks' neighbour
        @modified_loc_keys: store new pinned loc_keys
        Return @modified_loc_keys
        Raise ValueError if the chain is not pinned, RuntimeError if the
        pinned offset does not respect the pinned block alignment
        """

        if not self.pinned:
            raise ValueError('Trying to fix unpinned block')

        # Propagate offset to blocks before pinned block
        pinned_block = self.blocks[self.pinned_block_idx]
        offset = self.loc_db.get_location_offset(pinned_block.loc_key)
        if offset % pinned_block.alignment != 0:
            raise RuntimeError('Bad alignment')

        # Walk backward from the pinned block: each block ends where its
        # follower starts
        for block in reversed(self.blocks[:self.pinned_block_idx]):
            new_offset = offset - block.size
            # NOTE(review): the rounding uses the pinned block alignment, as
            # the original code did; the block's own alignment may have been
            # intended -- to confirm
            new_offset = new_offset - new_offset % pinned_block.alignment
            fix_loc_offset(self.loc_db,
                           block.loc_key,
                           new_offset,
                           modified_loc_keys)
            offset = new_offset

        # Propagate offset to blocks after pinned block
        offset = self.loc_db.get_location_offset(pinned_block.loc_key) + pinned_block.size

        last_block = pinned_block
        for block in self.blocks[self.pinned_block_idx + 1:]:
            # Round up to the alignment of the preceding block
            offset += (- offset) % last_block.alignment
            fix_loc_offset(self.loc_db,
                           block.loc_key,
                           offset,
                           modified_loc_keys)
            offset += block.size
            last_block = block
        return modified_loc_keys


class BlockChainWedge(object):

    """Stand for wedges between blocks"""

    def __init__(self, loc_db, offset, size):
        """
        @loc_db: location database used when a chain is merged
        @offset: first offset of the wedge
        @size: size of the wedge
        """
        self.loc_db = loc_db
        self.offset = offset
        self.max_size = size
        # The wedge covers [offset, offset + size[
        self.offset_min, self.offset_max = offset, offset + size

    def merge(self, chain):
        """Best effort merge two block chains
        Return the list of resulting blockchains"""
        # Pin the first block of @chain right after the wedge, then let the
        # chain recompute its bounds
        head_loc_key = chain.blocks[0].loc_key
        self.loc_db.set_location_offset(head_loc_key, self.offset_max)
        chain.place()
        return [self, chain]


def group_constrained_blocks(loc_db, asmcfg):
    """
    Group the blocks of @asmcfg linked by asm_constraint_next and return the
    corresponding list of BlockChain
    @loc_db: location database given to each BlockChain
    @asmcfg: an AsmCfg instance
    """
    log_asmblock.info('group_constrained_blocks')

    # Blocks not yet attached to a chain
    pending = list(asmcfg.blocks)
    # Chains built so far, indexed by the loc_key of their first block
    chains_by_head = {}

    while pending:
        # Start a new chain from any pending block
        current_chain = [pending.pop()]

        # Follow the "next" constraints while they lead to a pending block
        while True:
            next_loc_key = current_chain[-1].get_next()
            if next_loc_key is None:
                break
            follower = asmcfg.loc_key_to_block(next_loc_key)
            if follower is None or follower not in pending:
                break
            current_chain.append(follower)
            pending.remove(follower)

        # The walk may have stopped on the head of an already built chain:
        # in this case, this chain is absorbed
        if next_loc_key is not None and next_loc_key in chains_by_head:
            current_chain += chains_by_head.pop(next_loc_key)

        chains_by_head[current_chain[0].loc_key] = current_chain

    return [BlockChain(loc_db, chains_by_head[head]) for head in chains_by_head]


def get_blockchains_address_interval(blockChains, dst_interval):
    """Return the interval covered by the pinned chains of @blockChains
    Raise ValueError if one of them is not included in @dst_interval"""

    allocated_interval = interval()
    for chain in blockChains:
        if chain.pinned:
            # offset_max is exclusive, interval bounds are inclusive
            span = interval([(chain.offset_min, chain.offset_max - 1)])
            if span not in dst_interval:
                raise ValueError('Chain placed out of destination interval')
            allocated_interval += span
    return allocated_interval


def resolve_symbol(blockChains, loc_db, dst_interval=None):
    """Place @blockChains in the @dst_interval

    @blockChains: list of BlockChain; sorted in place by decreasing max_size
    @loc_db: location database given to the wedges
    @dst_interval: (optional) allowed interval, [0, 0xFFFFFFFFFFFFFFFF] by
                   default
    Return the list of resulting BlockChain
    Raise RuntimeError if a chain cannot be placed
    """

    log_asmblock.info('resolve_symbol')
    if dst_interval is None:
        dst_interval = interval([(0, 0xFFFFFFFFFFFFFFFF)])

    forbidden_interval = interval(
        [(-1, 0xFFFFFFFFFFFFFFFF + 1)]) - dst_interval
    allocated_interval = get_blockchains_address_interval(blockChains,
                                                          dst_interval)
    log_asmblock.debug('allocated interval: %s', allocated_interval)

    # Elements with a known position: the pinned chains, plus one wedge for
    # each forbidden interval; kept sorted by starting offset
    fixed_chains = [chain for chain in blockChains if chain.pinned]
    fixed_chains.extend(
        BlockChainWedge(loc_db, offset=start, size=stop + 1 - start)
        for start, stop in forbidden_interval.intervals
    )
    fixed_chains.sort(key=lambda placed: placed.offset_min)

    # Try to place bigger blockChains first
    blockChains.sort(key=lambda candidate: -candidate.max_size)

    log_asmblock.debug("place chains")
    for chain in blockChains:
        if chain.pinned:
            continue
        # Search the first hole, between two consecutive fixed elements, big
        # enough for the chain
        for i in xrange(1, len(fixed_chains)):
            before = fixed_chains[i - 1]
            after = fixed_chains[i]
            if before.offset_max + chain.max_size < after.offset_min:
                fixed_chains[i - 1:i] = before.merge(chain)
                break
        else:
            raise RuntimeError('Cannot find enough space to place blocks')

    # Wedges are only placeholders: do not return them
    return [chain for chain in fixed_chains if isinstance(chain, BlockChain)]


def get_block_loc_keys(block):
    """Extract loc_keys used by @block

    Return the set gathering the results of get_expr_locs on the arguments of
    each instruction and on the expressions of each AsmRaw holding a list
    """
    found = set()
    for line in block.lines:
        if not isinstance(line, AsmRaw):
            exprs = line.args
        elif isinstance(line.raw, list):
            exprs = line.raw
        else:
            # Raw data without expressions: nothing to extract
            continue
        for expr in exprs:
            found.update(get_expr_locs(expr))
    return found


def assemble_block(mnemo, block, loc_db, conservative=False):
    """Assemble each line of @block using @loc_db
    The lines get their .offset and .data updated, the instructions also get
    their .l updated, and @block.size follows the instructions' new lengths
    @conservative: (optional) use original bytes when possible
    """
    # Offset of the current line, relative to the block start
    cursor = 0

    for line in block.lines:
        if isinstance(line, AsmRaw):
            if isinstance(line.raw, list):
                # Fix special AsmRaw: pack each resolved expression
                chunks = []
                for expr in line.raw:
                    value = fix_expr_val(expr, loc_db)
                    chunks.append(pck[value.size](value.arg))
                line.data = "".join(chunks)

            line.offset = cursor
        else:
            # Assemble an instruction
            original_args = list(line.args)
            line.offset = loc_db.get_location_offset(block.loc_key) + cursor

            # Replace instruction's arguments by resolved ones
            line.args = line.resolve_args_with_symbols(loc_db)

            if line.dstflow():
                line.fixDstOffset()

            previous_length = line.l
            encoding, _ = conservative_asm(mnemo, line, loc_db,
                                           conservative)

            # Restore original arguments
            line.args = original_args

            # We need to update the block size
            new_length = len(encoding)
            block.size = block.size - previous_length + new_length
            line.data = encoding
            line.l = new_length

        cursor += line.l


def asmblock_final(mnemo, asmcfg, blockChains, loc_db, conservative=False):
    """Resolve and assemble @blockChains using @loc_db until fixed point is
    reached

    @mnemo: forwarded to assemble_block
    @asmcfg: graph giving the blocks (.blocks, .loc_key_to_block)
    @blockChains: chains whose fix_blocks() propagates the pinned offsets
    @loc_db: location database
    @conservative: (optional) forwarded to assemble_block
    """

    log_asmblock.debug("asmbloc_final")

    # Init structures: for each loc_key, the blocks referencing it
    blocks_using_loc_key = {}
    for block in asmcfg.blocks:
        exprlocs = get_block_loc_keys(block)
        loc_keys = set(expr.loc_key for expr in exprlocs)
        for loc_key in loc_keys:
            blocks_using_loc_key.setdefault(loc_key, set()).add(block)

    # Init worklist
    blocks_to_rework = set(asmcfg.blocks)

    # Fix and re-assemble blocks until fixed point is reached
    while True:

        # Propagate pinned blocks into chains
        modified_loc_keys = set()
        for chain in blockChains:
            chain.fix_blocks(modified_loc_keys)

        for loc_key in modified_loc_keys:
            # Retrieve block with modified reference
            mod_block = asmcfg.loc_key_to_block(loc_key)
            if mod_block is not None:
                blocks_to_rework.add(mod_block)

            # Enqueue blocks referencing a modified loc_key
            blocks_to_rework.update(blocks_using_loc_key.get(loc_key, ()))

        # No more work
        if not blocks_to_rework:
            break

        # Re-assembling may change the blocks' sizes: the next loop turn
        # propagates the resulting offsets
        while blocks_to_rework:
            block = blocks_to_rework.pop()
            assemble_block(mnemo, block, loc_db, conservative)


def asmbloc_final(mnemo, blocks, blockChains, loc_db, conservative=False):
    """DEPRECATED: warn, then forward every argument to asmblock_final, which
    resolves and assembles @blockChains using @loc_db until fixed point is
    reached"""

    deprecation = 'DEPRECATION WARNING: use "asmblock_final" instead of "asmbloc_final"'
    warnings.warn(deprecation)
    asmblock_final(mnemo, blocks, blockChains, loc_db, conservative)

def asm_resolve_final(mnemo, asmcfg, loc_db, dst_interval=None):
    """Resolve and assemble @asmcfg using @loc_db into interval
    @dst_interval

    Return a dictionary {offset: bytes} with one entry for each assembled
    line having data
    Raise RuntimeError if two lines overlap
    """

    asmcfg.sanity_check()

    asmcfg.guess_blocks_size(mnemo)
    blockChains = group_constrained_blocks(loc_db, asmcfg)
    resolved_blockChains = resolve_symbol(
        blockChains,
        loc_db,
        dst_interval
    )

    asmblock_final(mnemo, asmcfg, resolved_blockChains, loc_db)
    patches = {}
    # Bytes already emitted, used to detect overlapping lines
    output_interval = interval()

    for block in asmcfg.blocks:
        offset = loc_db.get_location_offset(block.loc_key)
        for instr in block.lines:
            if not instr.data:
                # Empty line
                continue
            assert len(instr.data) == instr.l
            patches[offset] = instr.data
            instruction_interval = interval([(offset, offset + instr.l - 1)])
            if not (instruction_interval & output_interval).empty:
                raise RuntimeError("overlapping bytes %X" % int(offset))
            # Record the emitted bytes; without this the check above can
            # never trigger
            output_interval += instruction_interval
            instr.offset = offset
            offset += instr.l
    return patches


class disasmEngine(object):

    """Disassembly engine, taking care of disassembler options and multi-block
    strategy.

    Engine options:

    + Object supporting membership test (offset in ..)
     - dont_dis: stop the current disassembly branch if reached
     - split_dis: force a basic block end if reached,
                  with a next constraint on its successor
     - dont_dis_retcall_funcs: stop disassembly after a call to one
                               of the given functions

    + On/Off
     - follow_call: recursively disassemble CALL destinations
     - dontdis_retcall: stop on CALL return addresses
     - dont_dis_nulstart_bloc: stop if a block begin with a few \x00

    + Number
     - lines_wd: maximum block's size (in number of instruction)
     - blocs_wd: maximum number of distinct disassembled block

    + callback(arch, attrib, pool_bin, cur_bloc, offsets_to_dis,
               loc_db)
     - dis_block_callback: callback after each new disassembled block
    """

    def __init__(self, arch, attrib, bin_stream, **kwargs):
        """Instantiate a new disassembly engine
        @arch: targeted architecture
        @attrib: architecture attribute
        @bin_stream: bytes source
        @kwargs: (optional) custom options
        """
        self.arch = arch
        self.attrib = attrib
        self.bin_stream = bin_stream
        self.loc_db = LocationDB()

        # Setup options
        self.dont_dis = []
        self.split_dis = []
        self.follow_call = False
        self.dontdis_retcall = False
        self.lines_wd = None
        self.blocs_wd = None
        self.dis_block_callback = None
        self.dont_dis_nulstart_bloc = False
        self.dont_dis_retcall_funcs = set()

        # Override options if needed (any keyword becomes an attribute)
        self.__dict__.update(kwargs)

    def get_job_done(self):
        """DEPRECATED: warn and return an empty set"""
        warnings.warn("""DEPRECATION WARNING: "job_done" is not needed anymore, support is dropped.""")
        return set()

    def set_job_done(self, _):
        """DEPRECATED: warn and ignore the given value"""
        warnings.warn("""DEPRECATION WARNING: "job_done" is not needed anymore, support is dropped.""")
        return

    def get_dis_bloc_callback(self):
        """DEPRECATED: warn and return dis_block_callback"""
        warnings.warn("""DEPRECATION WARNING: "dis_bloc_callback" use dis_block_callback.""")
        return self.dis_block_callback

    def set_dis_bloc_callback(self, function):
        """DEPRECATED: warn and set dis_block_callback to @function"""
        warnings.warn("""DEPRECATION WARNING: "dis_bloc_callback" use dis_block_callback.""")
        self.dis_block_callback = function

    @property
    def symbol_pool(self):
        """DEPRECATED: warn and return loc_db"""
        warnings.warn("""DEPRECATION WARNING: use 'loc_db'""")
        return self.loc_db

    # Deprecated
    job_done = property(get_job_done, set_job_done)
    dis_bloc_callback = property(get_dis_bloc_callback, set_dis_bloc_callback)

    def _dis_block(self, offset, job_done=None):
        """Disassemble the block at offset @offset
        @job_done: a set of already disassembled addresses
        Return the created AsmBlock and future offsets to disassemble
        """

        if job_done is None:
            job_done = set()
        lines_cpt = 0
        in_delayslot = False
        delayslot_count = self.arch.delayslot
        offsets_to_dis = set()
        add_next_offset = False
        loc_key = self.loc_db.get_or_create_offset_location(offset)
        cur_block = AsmBlock(loc_key)
        log_asmblock.debug("dis at %X", int(offset))
        # Once a flow breaking instruction is met, only its delay slot
        # instructions are still disassembled
        while not in_delayslot or delayslot_count > 0:
            if in_delayslot:
                delayslot_count -= 1

            if offset in self.dont_dis:
                if not cur_block.lines:
                    job_done.add(offset)
                    # Block is empty -> bad block
                    cur_block = AsmBlockBad(loc_key, errno=AsmBlockBad.ERROR_FORBIDDEN)
                else:
                    # Block is not empty, stop the disassembly pass and add a
                    # constraint to the next block
                    loc_key_cst = self.loc_db.get_or_create_offset_location(offset)
                    cur_block.add_cst(loc_key_cst, AsmConstraint.c_next)
                break

            if lines_cpt > 0 and offset in self.split_dis:
                loc_key_cst = self.loc_db.get_or_create_offset_location(offset)
                cur_block.add_cst(loc_key_cst, AsmConstraint.c_next)
                offsets_to_dis.add(offset)
                break

            lines_cpt += 1
            if self.lines_wd is not None and lines_cpt > self.lines_wd:
                log_asmblock.debug("lines watchdog reached at %X", int(offset))
                break

            if offset in job_done:
                # Already disassembled: link to the existing code and stop
                loc_key_cst = self.loc_db.get_or_create_offset_location(offset)
                cur_block.add_cst(loc_key_cst, AsmConstraint.c_next)
                break

            off_i = offset
            error = None
            try:
                instr = self.arch.dis(self.bin_stream, self.attrib, offset)
            except Disasm_Exception as e:
                log_asmblock.warning(e)
                instr = None
                error = AsmBlockBad.ERROR_CANNOT_DISASM
            except IOError as e:
                log_asmblock.warning(e)
                instr = None
                error = AsmBlockBad.ERROR_IO


            if instr is None:
                log_asmblock.warning("cannot disasm at %X", int(off_i))
                if not cur_block.lines:
                    job_done.add(offset)
                    # Block is empty -> bad block
                    cur_block = AsmBlockBad(loc_key, errno=error)
                else:
                    # Block is not empty, stop the disassembly pass and add a
                    # constraint to the next block
                    loc_key_cst = self.loc_db.get_or_create_offset_location(off_i)
                    cur_block.add_cst(loc_key_cst, AsmConstraint.c_next)
                break

            # XXX TODO nul start block option
            if self.dont_dis_nulstart_bloc and instr.b.count('\x00') == instr.l:
                log_asmblock.warning("reach nul instr at %X", int(off_i))
                if not cur_block.lines:
                    # Block is empty -> bad block
                    cur_block = AsmBlockBad(loc_key, errno=AsmBlockBad.ERROR_NULL_STARTING_BLOCK)
                else:
                    # Block is not empty, stop the disassembly pass and add a
                    # constraint to the next block
                    loc_key_cst = self.loc_db.get_or_create_offset_location(off_i)
                    cur_block.add_cst(loc_key_cst, AsmConstraint.c_next)
                break

            # special case: flow graph modificator in delayslot
            if in_delayslot and instr and (instr.splitflow() or instr.breakflow()):
                add_next_offset = True
                break

            job_done.add(offset)
            log_asmblock.debug("dis at %X", int(offset))

            offset += instr.l
            log_asmblock.debug(instr)
            log_asmblock.debug(instr.args)

            cur_block.addline(instr)
            if not instr.breakflow():
                continue
            # test split
            if instr.splitflow() and not (instr.is_subcall() and self.dontdis_retcall):
                add_next_offset = True
            if instr.dstflow():
                instr.dstflow2label(self.loc_db)
                destinations = instr.getdstflow(self.loc_db)
                known_dsts = []
                for dst in destinations:
                    if not dst.is_loc():
                        continue
                    # Dedicated name: do not shadow the block's own loc_key
                    dst_loc_key = dst.loc_key
                    dst_offset = self.loc_db.get_location_offset(dst_loc_key)
                    known_dsts.append(dst_loc_key)
                    if dst_offset in self.dont_dis_retcall_funcs:
                        add_next_offset = False
                if (not instr.is_subcall()) or self.follow_call:
                    cur_block.bto.update([AsmConstraint(dst_loc_key, AsmConstraint.c_to) for dst_loc_key in known_dsts])

            # get in delayslot mode
            in_delayslot = True
            delayslot_count = instr.delayslot

        # Every destination constraint gives an offset to disassemble later
        # (the offset is whatever loc_db returns for the loc_key, None
        # included)
        for c in cur_block.bto:
            loc_key_offset = self.loc_db.get_location_offset(c.loc_key)
            offsets_to_dis.add(loc_key_offset)

        if add_next_offset:
            loc_key_cst = self.loc_db.get_or_create_offset_location(offset)
            cur_block.add_cst(loc_key_cst, AsmConstraint.c_next)
            offsets_to_dis.add(offset)

        # Fix multiple constraints
        cur_block.fix_constraints()

        if self.dis_block_callback is not None:
            self.dis_block_callback(mn=self.arch, attrib=self.attrib,
                                    pool_bin=self.bin_stream, cur_bloc=cur_block,
                                    offsets_to_dis=offsets_to_dis,
                                    loc_db=self.loc_db,
                                    # Deprecated API
                                    symbol_pool=self.loc_db)
        return cur_block, offsets_to_dis

    def dis_block(self, offset):
        """Disassemble the block at offset @offset and return the created
        AsmBlock
        @offset: targeted offset to disassemble
        """
        current_block, _ = self._dis_block(offset)
        return current_block

    def dis_bloc(self, offset):
        """
        DEPRECATED function
        Use dis_block instead of dis_bloc
        """
        warnings.warn('DEPRECATION WARNING: use "dis_block" instead of "dis_bloc"')
        return self.dis_block(offset)

    def dis_multiblock(self, offset, blocks=None):
        """Disassemble every block reachable from @offset regarding
        specific disasmEngine conditions
        Return an AsmCFG instance containing disassembled blocks
        @offset: starting offset
        @blocks: (optional) AsmCFG instance of already disassembled blocks to
                merge with
        """
        log_asmblock.info("dis bloc all")
        job_done = set()
        if blocks is None:
            blocks = AsmCFG(self.loc_db)
        todo = [offset]

        bloc_cpt = 0
        while len(todo):
            bloc_cpt += 1
            if self.blocs_wd is not None and bloc_cpt > self.blocs_wd:
                log_asmblock.debug("blocks watchdog reached at %X", int(offset))
                break

            target_offset = todo.pop(0)
            # _dis_block queues the offset of each destination loc_key as
            # given by loc_db, which may be None: skip it *before* the int()
            # conversion, which would raise TypeError
            if target_offset is None:
                continue
            target_offset = int(target_offset)
            if target_offset in job_done:
                continue
            cur_block, nexts = self._dis_block(target_offset, job_done)
            todo += nexts
            blocks.add_block(cur_block)

        blocks.apply_splitting(self.loc_db,
                               dis_block_callback=self.dis_block_callback,
                               mn=self.arch, attrib=self.attrib,
                               pool_bin=self.bin_stream)
        return blocks

    def dis_multibloc(self, offset, blocs=None):
        """
        DEPRECATED function
        Use dis_multiblock instead of dis_multibloc
        """
        warnings.warn('DEPRECATION WARNING: use "dis_multiblock" instead of "dis_multibloc"')
        return self.dis_multiblock(offset, blocs)

    def dis_instr(self, offset):
        """Disassemble one instruction at offset @offset and return the
        corresponding instruction instance
        @offset: targeted offset to disassemble
        """
        # Temporarily limit the blocks to a single line
        old_lineswd = self.lines_wd
        self.lines_wd = 1
        try:
            block = self.dis_block(offset)
        finally:
            self.lines_wd = old_lineswd

        instr = block.lines[0]
        return instr

Module variables

var bbl_simplifier

var console_handler

var log_asmblock

var pck

Functions

def asm_resolve_final(

mnemo, asmcfg, loc_db, dst_interval=None)

Resolve and assemble @asmcfg using @loc_db into interval @dst_interval

def asm_resolve_final(mnemo, asmcfg, loc_db, dst_interval=None):
    """Resolve and assemble @asmcfg using @loc_db into interval
    @dst_interval

    Return a dictionary {offset: bytes} with one entry for each assembled
    line having data
    Raise RuntimeError if two lines overlap
    """

    asmcfg.sanity_check()

    asmcfg.guess_blocks_size(mnemo)
    blockChains = group_constrained_blocks(loc_db, asmcfg)
    resolved_blockChains = resolve_symbol(
        blockChains,
        loc_db,
        dst_interval
    )

    asmblock_final(mnemo, asmcfg, resolved_blockChains, loc_db)
    patches = {}
    # Bytes already emitted, used to detect overlapping lines
    output_interval = interval()

    for block in asmcfg.blocks:
        offset = loc_db.get_location_offset(block.loc_key)
        for instr in block.lines:
            if not instr.data:
                # Empty line
                continue
            assert len(instr.data) == instr.l
            patches[offset] = instr.data
            instruction_interval = interval([(offset, offset + instr.l - 1)])
            if not (instruction_interval & output_interval).empty:
                raise RuntimeError("overlapping bytes %X" % int(offset))
            # Record the emitted bytes; without this the check above can
            # never trigger
            output_interval += instruction_interval
            instr.offset = offset
            offset += instr.l
    return patches

def asmbloc_final(

mnemo, blocks, blockChains, loc_db, conservative=False)

Resolve and assemble @blockChains using @loc_db until fixed point is reached

def asmbloc_final(mnemo, blocks, blockChains, loc_db, conservative=False):
    """DEPRECATED: warn, then forward every argument to asmblock_final, which
    resolves and assembles @blockChains using @loc_db until fixed point is
    reached"""

    deprecation = 'DEPRECATION WARNING: use "asmblock_final" instead of "asmbloc_final"'
    warnings.warn(deprecation)
    asmblock_final(mnemo, blocks, blockChains, loc_db, conservative)

def asmblock_final(

mnemo, asmcfg, blockChains, loc_db, conservative=False)

Resolve and assemble @blockChains using @loc_db until fixed point is reached

def asmblock_final(mnemo, asmcfg, blockChains, loc_db, conservative=False):
    """Resolve and assemble @blockChains using @loc_db until fixed point is
    reached

    @mnemo: forwarded to assemble_block
    @asmcfg: graph giving the blocks (.blocks, .loc_key_to_block)
    @blockChains: chains whose fix_blocks() propagates the pinned offsets
    @loc_db: location database
    @conservative: (optional) forwarded to assemble_block
    """

    log_asmblock.debug("asmbloc_final")

    # Init structures: for each loc_key, the blocks referencing it
    blocks_using_loc_key = {}
    for block in asmcfg.blocks:
        exprlocs = get_block_loc_keys(block)
        loc_keys = set(expr.loc_key for expr in exprlocs)
        for loc_key in loc_keys:
            blocks_using_loc_key.setdefault(loc_key, set()).add(block)

    # Init worklist
    blocks_to_rework = set(asmcfg.blocks)

    # Fix and re-assemble blocks until fixed point is reached
    while True:

        # Propagate pinned blocks into chains
        modified_loc_keys = set()
        for chain in blockChains:
            chain.fix_blocks(modified_loc_keys)

        for loc_key in modified_loc_keys:
            # Retrieve block with modified reference
            mod_block = asmcfg.loc_key_to_block(loc_key)
            if mod_block is not None:
                blocks_to_rework.add(mod_block)

            # Enqueue blocks referencing a modified loc_key
            blocks_to_rework.update(blocks_using_loc_key.get(loc_key, ()))

        # No more work
        if not blocks_to_rework:
            break

        # Re-assembling may change the blocks' sizes: the next loop turn
        # propagates the resulting offsets
        while blocks_to_rework:
            block = blocks_to_rework.pop()
            assemble_block(mnemo, block, loc_db, conservative)

def assemble_block(

mnemo, block, loc_db, conservative=False)

Assemble a @block using @loc_db. @conservative: (optional) use original bytes when possible

def assemble_block(mnemo, block, loc_db, conservative=False):
    """Assemble each line of @block using @loc_db
    The lines get their .offset and .data updated, the instructions also get
    their .l updated, and @block.size follows the instructions' new lengths
    @conservative: (optional) use original bytes when possible
    """
    # Offset of the current line, relative to the block start
    cursor = 0

    for line in block.lines:
        if isinstance(line, AsmRaw):
            if isinstance(line.raw, list):
                # Fix special AsmRaw: pack each resolved expression
                chunks = []
                for expr in line.raw:
                    value = fix_expr_val(expr, loc_db)
                    chunks.append(pck[value.size](value.arg))
                line.data = "".join(chunks)

            line.offset = cursor
        else:
            # Assemble an instruction
            original_args = list(line.args)
            line.offset = loc_db.get_location_offset(block.loc_key) + cursor

            # Replace instruction's arguments by resolved ones
            line.args = line.resolve_args_with_symbols(loc_db)

            if line.dstflow():
                line.fixDstOffset()

            previous_length = line.l
            encoding, _ = conservative_asm(mnemo, line, loc_db,
                                           conservative)

            # Restore original arguments
            line.args = original_args

            # We need to update the block size
            new_length = len(encoding)
            block.size = block.size - previous_length + new_length
            line.data = encoding
            line.l = new_length

        cursor += line.l

def conservative_asm(

mnemo, instr, symbols, conservative)

Asm instruction; Try to keep original instruction bytes if it exists

def conservative_asm(mnemo, instr, symbols, conservative):
    """
    Assemble @instr with @mnemo.asm and select one of the encodings.

    Selection order:
    - the bytes stored in @instr.b, if they are among the encodings
    - if @conservative, the first encoding as long as @instr.b
    - the first encoding
    Return a tuple (selected encoding, all the encodings)
    Raise ValueError if @mnemo.asm gives no encoding
    """
    encodings = mnemo.asm(instr, symbols)
    if not encodings:
        raise ValueError('cannot asm:%s' % str(instr))

    if hasattr(instr, "b"):
        original = instr.b
        if original in encodings:
            return original, encodings
        if conservative:
            wanted = len(original)
            same_size = next(
                (enc for enc in encodings if len(enc) == wanted), None
            )
            if same_size is not None:
                return same_size, encodings

    # No original bytes to preserve (or nothing compatible found)
    return encodings[0], encodings

def fix_expr_val(

expr, symbols)

Resolve an expression @expr using @symbols

def fix_expr_val(expr, symbols):
    """Resolve an expression @expr using @symbols

    Every ExprId of @expr is replaced by an ExprInt holding the offset that
    @symbols gives for the identifier name, then the result is simplified.
    Raise RuntimeError if the simplified result is not an ExprInt
    """
    def resolve_identifier(node):
        # Only identifiers are rewritten, other nodes are kept as is
        if not isinstance(node, ExprId):
            return node
        # Example:
        # toto:
        # .dword label
        location = symbols.get_name_location(node.name)
        return ExprInt(symbols.get_location_offset(location), node.size)

    resolved = expr_simp(expr.visit(resolve_identifier))
    if isinstance(resolved, ExprInt):
        return resolved
    raise RuntimeError('Cannot resolve symbol %s' % expr)

def fix_loc_offset(

loc_db, loc_key, offset, modified)

Fix the @loc_key offset to @offset. If the @offset has changed, add @loc_key to @modified. @loc_db: current loc_db

def fix_loc_offset(loc_db, loc_key, offset, modified):
    """
    Make sure @loc_key is located at @offset in @loc_db. When the recorded
    offset had to be changed, @loc_key is added to @modified
    @loc_db: current loc_db
    @loc_key: location to fix
    @offset: expected offset of @loc_key
    @modified: set collecting the updated locations
    """
    already_there = loc_db.get_location_offset(loc_key) == offset
    if not already_there:
        loc_db.set_location_offset(loc_key, offset, force=True)
        modified.add(loc_key)

def get_block_loc_keys(

block)

Extract loc_keys used by @block

def get_block_loc_keys(block):
    """Extract loc_keys used by @block
    @block: object whose .lines are either AsmRaw or instructions with .args
    Return the set of locations found by get_expr_locs in these lines
    """
    loc_keys = set()
    for line in block.lines:
        if not isinstance(line, AsmRaw):
            # Regular instruction: inspect its arguments
            exprs = line.args
        elif isinstance(line.raw, list):
            # Raw data given as a list of expressions
            exprs = line.raw
        else:
            # Raw data which is not a list holds no expression
            continue
        for expr in exprs:
            loc_keys.update(get_expr_locs(expr))
    return loc_keys

def get_blockchains_address_interval(

blockChains, dst_interval)

Compute the interval used by the pinned @blockChains Check if the placed chains are in the @dst_interval

def get_blockchains_address_interval(blockChains, dst_interval):
    """Compute the interval used by the pinned chains of @blockChains
    @blockChains: iterable of chains; the ones which are not pinned are ignored
    @dst_interval: interval in which every pinned chain must be included
    Raise ValueError if a pinned chain is out of @dst_interval
    """
    allocated_interval = interval()
    pinned_chains = (chain for chain in blockChains if chain.pinned)
    for chain in pinned_chains:
        # offset_max is exclusive, the interval bounds are inclusive
        used = interval([(chain.offset_min, chain.offset_max - 1)])
        if used not in dst_interval:
            raise ValueError('Chain placed out of destination interval')
        allocated_interval += used
    return allocated_interval

def group_constrained_blocks(

loc_db, asmcfg)

Return the BlockChains list built from grouped blocks in asmcfg linked by asm_constraint_next @asmcfg: an AsmCfg instance

def group_constrained_blocks(loc_db, asmcfg):
    """
    Build the list of BlockChain made of the blocks of @asmcfg which are
    linked together by a c_next constraint
    @loc_db: forwarded to the BlockChain constructor
    @asmcfg: an AsmCfg instance
    """
    log_asmblock.info('group_constrained_blocks')

    # Blocks which do not belong to a chain yet
    pending = list(asmcfg.blocks)
    # loc_key of the first block of a chain -> ordered blocks of the chain
    chains_by_head = {}

    while pending:
        # Start a new chain
        chain = [pending.pop()]

        # Follow the "next" constraints while the son is still pending
        while True:
            next_loc_key = chain[-1].get_next()
            if next_loc_key is None:
                break
            son = asmcfg.loc_key_to_block(next_loc_key)
            if son is None or son not in pending:
                break
            chain.append(son)
            pending.remove(son)

        # The son may be the head of a known chain: append that chain
        if next_loc_key is not None and next_loc_key in chains_by_head:
            chain += chains_by_head.pop(next_loc_key)

        chains_by_head[chain[0].loc_key] = chain

    return [BlockChain(loc_db, chains_by_head[head])
            for head in chains_by_head]

def is_int(

a)

def is_int(a):
    """Return True if @a is an int, a long, a moduint or a modint"""
    # Tests are done one at a time, in the same order as before
    if isinstance(a, int):
        return True
    if isinstance(a, long):
        return True
    if isinstance(a, moduint):
        return True
    return isinstance(a, modint)

def resolve_symbol(

blockChains, loc_db, dst_interval=None)

Place @blockChains in the @dst_interval

def resolve_symbol(blockChains, loc_db, dst_interval=None):
    """Place @blockChains in the @dst_interval
    @blockChains: list of chains; it is sorted in place by decreasing max_size
    @loc_db: forwarded to the BlockChainWedge constructor
    @dst_interval: (optional) allowed interval, [0, 0xFFFFFFFFFFFFFFFF] by
    default
    Return the list of resulting chains which are BlockChain instances
    Raise RuntimeError if a chain cannot be placed
    """

    log_asmblock.info('resolve_symbol')
    if dst_interval is None:
        dst_interval = interval([(0, 0xFFFFFFFFFFFFFFFF)])

    whole_space = interval([(-1, 0xFFFFFFFFFFFFFFFF + 1)])
    forbidden_interval = whole_space - dst_interval
    allocated_interval = get_blockchains_address_interval(blockChains,
                                                          dst_interval)
    log_asmblock.debug('allocated interval: %s', allocated_interval)

    # Pinned chains, plus one wedge for each forbidden interval
    anchors = [chain for chain in blockChains if chain.pinned]
    anchors.extend(
        BlockChainWedge(loc_db, offset=start, size=stop + 1 - start)
        for start, stop in forbidden_interval.intervals
    )

    # Try to place bigger blockChains first
    anchors.sort(key=lambda x: x.offset_min)
    blockChains.sort(key=lambda x: -x.max_size)

    placed = list(anchors)

    log_asmblock.debug("place chains")
    for chain in blockChains:
        if chain.pinned:
            continue
        for index in xrange(1, len(placed)):
            before = placed[index - 1]
            after = placed[index]

            # First gap between two neighbours able to host the chain
            if before.offset_max + chain.max_size < after.offset_min:
                placed[index - 1:index] = before.merge(chain)
                break
        else:
            # No gap was found
            raise RuntimeError('Cannot find enough space to place blocks')

    return [chain for chain in placed if isinstance(chain, BlockChain)]

Classes

class AsmBlock

class AsmBlock(object):

    """Block of assembly lines identified by a LocKey

    Attributes set by the constructor:
     - bto: set of constraints (AsmConstraint) toward the block destinations
     - lines: list of the lines of the block
     - alignment: alignment value given at creation
    """

    def __init__(self, loc_key, alignment=1):
        """Create an empty block
        @loc_key: LocKey instance identifying the block
        @alignment: (optional) value stored in .alignment
        """
        assert isinstance(loc_key, LocKey)

        self.bto = set()
        self.lines = []
        self._loc_key = loc_key
        self.alignment = alignment

    def get_label(self):
        """DEPRECATED accessor, use .loc_key instead"""
        warnings.warn('DEPRECATION WARNING: use ".loc_key" instead of ".label"')
        return self.loc_key

    # Read-only access to the LocKey of the block
    loc_key = property(lambda self:self._loc_key)
    # Deprecated alias of .loc_key
    label = property(get_label)


    def to_string(self, loc_db=None):
        """Return the text of the block: its location, one line per element
        of .lines, then its destinations if any
        @loc_db: (optional) used to pretty print the location, and forwarded
        to the to_string of lines and constraints
        """
        out = []
        if loc_db is None:
            out.append(str(self.loc_key))
        else:
            out.append(loc_db.pretty_str(self.loc_key))

        for instr in self.lines:
            out.append(instr.to_string(loc_db))
        if self.bto:
            lbls = ["->"]
            for dst in self.bto:
                if dst is None:
                    lbls.append("Unknown? ")
                else:
                    lbls.append(dst.to_string(loc_db) + " ")
            # Sorted to get an output independent of the set order
            lbls = '\t'.join(sorted(lbls))
            out.append(lbls)
        return '\n'.join(out)

    def __str__(self):
        return self.to_string()

    def addline(self, l):
        """Append the line @l to the block"""
        self.lines.append(l)

    def addto(self, c):
        """Add the constraint @c to the block destinations"""
        assert isinstance(self.bto, set)
        self.bto.add(c)

    def split(self, loc_db, offset):
        """Split the block at @offset: the current block keeps the lines
        located before @offset, the returned block gets the other ones
        @loc_db: used to get (or create) the location of the new block
        @offset: offset of the first line of the new block
        Return the new AsmBlock, or None if no line starts at @offset
        """
        loc_key = loc_db.get_or_create_offset_location(offset)
        log_asmblock.debug('split at %x', offset)
        offsets = [x.offset for x in self.lines]
        offset = loc_db.get_location_offset(loc_key)
        if offset not in offsets:
            log_asmblock.warning(
                'cannot split bloc at %X '
                'middle instruction? default middle', offset)
            return None
        new_bloc = AsmBlock(loc_key)
        i = offsets.index(offset)

        self.lines, new_bloc.lines = self.lines[:i], self.lines[i:]
        flow_mod_instr = self.get_flow_instr()
        log_asmblock.debug('flow mod %r', flow_mod_instr)
        c = AsmConstraint(loc_key, AsmConstraint.c_next)
        # move dst if flowgraph modifier was in original bloc
        # (usecase: split delayslot bloc)
        if flow_mod_instr:
            for xx in self.bto:
                log_asmblock.debug('lbl %s', xx)
            c_next = set(
                [x for x in self.bto if x.c_t == AsmConstraint.c_next])
            c_to = [x for x in self.bto if x.c_t != AsmConstraint.c_next]
            self.bto = set([c] + c_to)
            new_bloc.bto = c_next
        else:
            # The new block inherits every destination; the current block
            # only falls through to the new one
            new_bloc.bto = self.bto
            self.bto = set([c])
        return new_bloc

    def get_range(self):
        """Returns the offset hull of an AsmBlock"""
        if len(self.lines):
            return (self.lines[0].offset,
                    self.lines[-1].offset + self.lines[-1].l)
        else:
            return 0, 0

    def get_offsets(self):
        """Return the list of the offsets of the lines"""
        return [x.offset for x in self.lines]

    def add_cst(self, loc_key, constraint_type):
        """
        Add constraint between current block and block at @loc_key
        @loc_key: LocKey instance of constraint target
        @constraint_type: AsmConstraint c_to/c_next
        """
        assert isinstance(loc_key, LocKey)
        c = AsmConstraint(loc_key, constraint_type)
        self.bto.add(c)

    def get_flow_instr(self):
        # NOTE(review): @i only takes negative values (it starts at -1 and
        # decreases), so the bound check below fails on the first iteration
        # and None is returned; the splitflow()/breakflow() test is never
        # reached. Behavior left untouched, the intent is marked as
        # 'not fully functional'.
        if not self.lines:
            return None
        for i in xrange(-1, -1 - self.lines[0].delayslot - 1, -1):
            if not 0 <= i < len(self.lines):
                return None
            l = self.lines[i]
            if l.splitflow() or l.breakflow():
                raise NotImplementedError('not fully functional')

    def get_subcall_instr(self):
        """Return the first line for which is_subcall() is true, looking from
        the last line back over the `delayslot` previous ones (delayslot is
        read on the first line); None if there is none
        """
        if not self.lines:
            return None
        delayslot = self.lines[0].delayslot
        end_index = len(self.lines) - 1
        ds_max_index = max(end_index - delayslot, 0)
        for i in xrange(end_index, ds_max_index - 1, -1):
            l = self.lines[i]
            if l.is_subcall():
                return l
        return None

    def get_next(self):
        """Return the loc_key of a c_next constraint of the block, None if
        the block has no such constraint"""
        for constraint in self.bto:
            if constraint.c_t == AsmConstraint.c_next:
                return constraint.loc_key
        return None

    @staticmethod
    def _filter_constraint(constraints):
        """Sort and filter @constraints for AsmBlock.bto
        @constraints: non-empty set of AsmConstraint instance

        Always the same type -> one of the constraint
        c_next and c_to -> c_next
        """
        # Only one constraint
        if len(constraints) == 1:
            return next(iter(constraints))

        # Constraint type -> set of corresponding constraint
        cbytype = {}
        for cons in constraints:
            cbytype.setdefault(cons.c_t, set()).add(cons)

        # Only one type -> any constraint is OK
        if len(cbytype) == 1:
            return next(iter(constraints))

        # At least 2 types -> types = {c_next, c_to}
        # c_to is included in c_next
        return next(iter(cbytype[AsmConstraint.c_next]))

    def fix_constraints(self):
        """Fix next block constraints: keep a single constraint for each
        destination (see _filter_constraint)"""
        # destination -> associated constraints
        dests = {}
        for constraint in self.bto:
            dests.setdefault(constraint.loc_key, set()).add(constraint)

        self.bto = set(self._filter_constraint(constraints)
                       for constraints in dests.itervalues())

Ancestors (in MRO)

Class variables

var label

var loc_key

Instance variables

var alignment

var bto

var label

var lines

var loc_key

Methods

def __init__(

self, loc_key, alignment=1)

def __init__(self, loc_key, alignment=1):
    """Create an empty block
    @loc_key: LocKey instance identifying the block
    @alignment: (optional) value stored in .alignment
    """
    assert isinstance(loc_key, LocKey)
    self._loc_key = loc_key
    self.alignment = alignment
    # No line and no destination yet
    self.lines = []
    self.bto = set()

def add_cst(

self, loc_key, constraint_type)

Add constraint between current block and block at @loc_key @loc_key: LocKey instance of constraint target @constraint_type: AsmConstraint c_to/c_next

def add_cst(self, loc_key, constraint_type):
    """
    Register a new constraint from the current block toward @loc_key
    @loc_key: LocKey instance of constraint target
    @constraint_type: AsmConstraint c_to/c_next
    """
    assert isinstance(loc_key, LocKey)
    self.bto.add(AsmConstraint(loc_key, constraint_type))

def addline(

self, l)

def addline(self, l):
    """Append the line @l at the end of the block lines"""
    block_lines = self.lines
    block_lines.append(l)

def addto(

self, c)

def addto(self, c):
    """Add the constraint @c to the destinations (.bto) of the block"""
    assert isinstance(self.bto, set)
    destinations = self.bto
    destinations.add(c)

def fix_constraints(

self)

Fix next block constraints

def fix_constraints(self):
    """Fix next block constraints: keep only one constraint for each
    destination of the block, selected by _filter_constraint"""
    # destination -> associated constraints
    by_destination = {}
    for cons in self.bto:
        by_destination.setdefault(cons.loc_key, set()).add(cons)

    kept = set()
    for same_destination in by_destination.itervalues():
        kept.add(self._filter_constraint(same_destination))
    self.bto = kept

def get_flow_instr(

self)

def get_flow_instr(self):
    """Return None as the code stands (see the note below)

    NOTE(review): presumably meant to find a flow-modifying line among the
    last lines of the block, but @i only takes negative values (it starts
    at -1 and decreases), so the bound check fails on the first iteration
    and None is returned. The splitflow()/breakflow() test, which raises
    NotImplementedError, is never reached.
    """
    if not self.lines:
        return None
    for i in xrange(-1, -1 - self.lines[0].delayslot - 1, -1):
        # Always true for a negative @i
        if not 0 <= i < len(self.lines):
            return None
        l = self.lines[i]
        if l.splitflow() or l.breakflow():
            raise NotImplementedError('not fully functional')

def get_label(

self)

def get_label(self):
    """DEPRECATED: emit a warning, then return .loc_key"""
    message = 'DEPRECATION WARNING: use ".loc_key" instead of ".label"'
    warnings.warn(message)
    return self.loc_key

def get_next(

self)

def get_next(self):
    """Return the loc_key of a c_next constraint of the block, or None if
    the block has no such constraint"""
    next_targets = (cons.loc_key for cons in self.bto
                    if cons.c_t == AsmConstraint.c_next)
    return next(next_targets, None)

def get_offsets(

self)

def get_offsets(self):
    """Return the list of the offsets of the block lines, in order"""
    offsets = []
    for line in self.lines:
        offsets.append(line.offset)
    return offsets

def get_range(

self)

Returns the offset hull of an AsmBlock

def get_range(self):
    """Returns the offset hull of an AsmBlock: (offset of the first line,
    end of the last line), or (0, 0) for a block without line"""
    if not self.lines:
        return 0, 0
    first, last = self.lines[0], self.lines[-1]
    start = first.offset
    stop = last.offset + last.l
    return (start, stop)

def get_subcall_instr(

self)

def get_subcall_instr(self):
    """Return the first line for which is_subcall() is true, looking from
    the last line back over the `delayslot` previous ones (delayslot is read
    on the first line); None if there is none or if the block is empty
    """
    if not self.lines:
        return None
    last_index = len(self.lines) - 1
    lowest_index = max(last_index - self.lines[0].delayslot, 0)
    # Walk the candidate lines from the end of the block
    for line in reversed(self.lines[lowest_index:last_index + 1]):
        if line.is_subcall():
            return line
    return None

def split(

self, loc_db, offset)

def split(self, loc_db, offset):
    """Split the block at @offset: the current block keeps the lines
    located before @offset, the returned block gets the other ones
    @loc_db: used to get (or create) the location of the new block
    @offset: offset of the first line of the new block
    Return the new AsmBlock, or None if no line starts at @offset
    """
    loc_key = loc_db.get_or_create_offset_location(offset)
    log_asmblock.debug('split at %x', offset)
    offsets = [x.offset for x in self.lines]
    offset = loc_db.get_location_offset(loc_key)
    if offset not in offsets:
        log_asmblock.warning(
            'cannot split bloc at %X '
            'middle instruction? default middle', offset)
        return None
    new_bloc = AsmBlock(loc_key)
    i = offsets.index(offset)
    self.lines, new_bloc.lines = self.lines[:i], self.lines[i:]
    flow_mod_instr = self.get_flow_instr()
    log_asmblock.debug('flow mod %r', flow_mod_instr)
    c = AsmConstraint(loc_key, AsmConstraint.c_next)
    # move dst if flowgraph modifier was in original bloc
    # (usecase: split delayslot bloc)
    if flow_mod_instr:
        for xx in self.bto:
            log_asmblock.debug('lbl %s', xx)
        c_next = set(
            [x for x in self.bto if x.c_t == AsmConstraint.c_next])
        c_to = [x for x in self.bto if x.c_t != AsmConstraint.c_next]
        self.bto = set([c] + c_to)
        new_bloc.bto = c_next
    else:
        # The new block inherits every destination; the current block only
        # falls through to the new one
        new_bloc.bto = self.bto
        self.bto = set([c])
    return new_bloc

def to_string(

self, loc_db=None)

def to_string(self, loc_db=None):
    """Return the text of the block: its location, one line per element of
    .lines, then its destinations if any
    @loc_db: (optional) used to pretty print the location, and forwarded to
    the to_string of lines and constraints
    """
    if loc_db is None:
        header = str(self.loc_key)
    else:
        header = loc_db.pretty_str(self.loc_key)
    out = [header]
    out.extend(instr.to_string(loc_db) for instr in self.lines)
    if self.bto:
        destinations = ["->"]
        for dst in self.bto:
            if dst is None:
                text = "Unknown? "
            else:
                text = dst.to_string(loc_db) + " "
            destinations.append(text)
        # Sorted to get an output independent of the iteration order
        out.append('\t'.join(sorted(destinations)))
    return '\n'.join(out)

class AsmBlockBad

Stand for a bad ASM block (malformed, unreachable, not disassembled, ...)

class AsmBlockBad(AsmBlock):

    """AsmBlock standing for a *bad* ASM block (malformed, unreachable,
    not disassembled, ...). It carries an error code instead of lines."""

    # Known error codes
    ERROR_UNKNOWN = -1
    ERROR_CANNOT_DISASM = 0
    ERROR_NULL_STARTING_BLOCK = 1
    ERROR_FORBIDDEN = 2
    ERROR_IO = 3

    # Error code -> human readable description
    ERROR_TYPES = {
        ERROR_UNKNOWN: "Unknown error",
        ERROR_CANNOT_DISASM: "Unable to disassemble",
        ERROR_NULL_STARTING_BLOCK: "Null starting block",
        ERROR_FORBIDDEN: "Address forbidden by dont_dis",
        ERROR_IO: "IOError",
    }

    def __init__(self, loc_key=None, alignment=1, errno=ERROR_UNKNOWN, *args, **kwargs):
        """Instantiate an AsmBlockBad.
        @loc_key, @alignment: same as AsmBlock.__init__
        @errno: (optional) specify an error type associated with the block
        """
        parent = super(AsmBlockBad, self)
        parent.__init__(loc_key, alignment, *args, **kwargs)
        self._errno = errno

    @property
    def errno(self):
        """Error code given at creation (read-only)"""
        return self._errno

    def __str__(self):
        # Unknown codes are displayed as is
        description = self.ERROR_TYPES.get(self._errno, self._errno)
        header = str(self.loc_key)
        return header + "\n" + "\tBad block: %s" % description

    def addline(self, *args, **kwargs):
        """Unsupported: always raise RuntimeError"""
        raise RuntimeError("An AsmBlockBad cannot have line")

    def addto(self, *args, **kwargs):
        """Unsupported: always raise RuntimeError"""
        raise RuntimeError("An AsmBlockBad cannot have bto")

    def split(self, *args, **kwargs):
        """Unsupported: always raise RuntimeError"""
        raise RuntimeError("An AsmBlockBad cannot be splitted")

Ancestors (in MRO)

Class variables

var ERROR_CANNOT_DISASM

var ERROR_FORBIDDEN

var ERROR_IO

var ERROR_NULL_STARTING_BLOCK

var ERROR_TYPES

var ERROR_UNKNOWN

var errno

Instance variables

var alignment

Inheritance: AsmBlock.alignment

var bto

Inheritance: AsmBlock.bto

var errno

var label

Inheritance: AsmBlock.label

var lines

Inheritance: AsmBlock.lines

var loc_key

Inheritance: AsmBlock.loc_key

Methods

def __init__(

self, loc_key=None, alignment=1, errno=-1, *args, **kwargs)

Inheritance: AsmBlock.__init__

Instantiate an AsmBlockBad. @loc_key, @alignment: same as AsmBlock.__init__ @errno: (optional) specify an error type associated with the block

def __init__(self, loc_key=None, alignment=1, errno=ERROR_UNKNOWN, *args, **kwargs):
    """Instantiate an AsmBlockBad.
    @loc_key, @alignment: same as AsmBlock.__init__
    @errno: (optional) specify an error type associated with the block
    """
    parent = super(AsmBlockBad, self)
    parent.__init__(loc_key, alignment, *args, **kwargs)
    self._errno = errno

def add_cst(

self, loc_key, constraint_type)

Inheritance: AsmBlock.add_cst

Add constraint between current block and block at @loc_key @loc_key: LocKey instance of constraint target @constraint_type: AsmConstraint c_to/c_next

def add_cst(self, loc_key, constraint_type):
    """
    Register a new constraint from the current block toward @loc_key
    @loc_key: LocKey instance of constraint target
    @constraint_type: AsmConstraint c_to/c_next
    """
    assert isinstance(loc_key, LocKey)
    new_constraint = AsmConstraint(loc_key, constraint_type)
    destinations = self.bto
    destinations.add(new_constraint)

def addline(

self, *args, **kwargs)

Inheritance: AsmBlock.addline

def addline(self, *args, **kwargs):
    """Unsupported on a bad block: always raise RuntimeError"""
    reason = "An AsmBlockBad cannot have line"
    raise RuntimeError(reason)

def addto(

self, *args, **kwargs)

Inheritance: AsmBlock.addto

def addto(self, *args, **kwargs):
    """Unsupported on a bad block: always raise RuntimeError"""
    reason = "An AsmBlockBad cannot have bto"
    raise RuntimeError(reason)

def fix_constraints(

self)

Inheritance: AsmBlock.fix_constraints

Fix next block constraints

def fix_constraints(self):
    """Fix next block constraints: keep only one constraint for each
    destination of the block, selected by _filter_constraint"""
    # destination -> associated constraints
    grouped = {}
    for cons in self.bto:
        if cons.loc_key not in grouped:
            grouped[cons.loc_key] = set()
        grouped[cons.loc_key].add(cons)

    selected = set()
    for candidates in grouped.itervalues():
        selected.add(self._filter_constraint(candidates))
    self.bto = selected

def get_flow_instr(

self)

Inheritance: AsmBlock.get_flow_instr

def get_flow_instr(self):
    """Return None as the code stands (see the note below)

    NOTE(review): presumably meant to find a flow-modifying line among the
    last lines of the block, but @i only takes negative values (it starts
    at -1 and decreases), so the bound check fails on the first iteration
    and None is returned. The splitflow()/breakflow() test, which raises
    NotImplementedError, is never reached.
    """
    if not self.lines:
        return None
    for i in xrange(-1, -1 - self.lines[0].delayslot - 1, -1):
        # Always true for a negative @i
        if not 0 <= i < len(self.lines):
            return None
        l = self.lines[i]
        if l.splitflow() or l.breakflow():
            raise NotImplementedError('not fully functional')

def get_label(

self)

Inheritance: AsmBlock.get_label

def get_label(self):
    """DEPRECATED: emit a warning, then return .loc_key"""
    notice = 'DEPRECATION WARNING: use ".loc_key" instead of ".label"'
    warnings.warn(notice)
    return self.loc_key

def get_next(

self)

Inheritance: AsmBlock.get_next

def get_next(self):
    """Return the loc_key of a c_next constraint of the block, or None if
    the block has no such constraint"""
    followers = (cons.loc_key for cons in self.bto
                 if cons.c_t == AsmConstraint.c_next)
    return next(followers, None)

def get_offsets(

self)

Inheritance: AsmBlock.get_offsets

def get_offsets(self):
    """Return the list of the offsets of the block lines, in order"""
    result = []
    for entry in self.lines:
        result.append(entry.offset)
    return result

def get_range(

self)

Inheritance: AsmBlock.get_range

Returns the offset hull of an AsmBlock

def get_range(self):
    """Returns the offset hull of an AsmBlock: (offset of the first line,
    end of the last line), or (0, 0) for a block without line"""
    if not self.lines:
        return 0, 0
    head, tail = self.lines[0], self.lines[-1]
    begin = head.offset
    end = tail.offset + tail.l
    return (begin, end)

def get_subcall_instr(

self)

Inheritance: AsmBlock.get_subcall_instr

def get_subcall_instr(self):
    """Return the first line for which is_subcall() is true, looking from
    the last line back over the `delayslot` previous ones (delayslot is read
    on the first line); None if there is none or if the block is empty
    """
    if not self.lines:
        return None
    top = len(self.lines) - 1
    bottom = max(top - self.lines[0].delayslot, 0)
    # Candidates are visited from the end of the block
    for candidate in reversed(self.lines[bottom:top + 1]):
        if candidate.is_subcall():
            return candidate
    return None

def split(

self, *args, **kwargs)

Inheritance: AsmBlock.split

def split(self, *args, **kwargs):
    """Unsupported on a bad block: always raise RuntimeError"""
    reason = "An AsmBlockBad cannot be splitted"
    raise RuntimeError(reason)

def to_string(

self, loc_db=None)

Inheritance: AsmBlock.to_string

def to_string(self, loc_db=None):
    """Return the text of the block: its location, one line per element of
    .lines, then its destinations if any
    @loc_db: (optional) used to pretty print the location, and forwarded to
    the to_string of lines and constraints
    """
    if loc_db is None:
        title = str(self.loc_key)
    else:
        title = loc_db.pretty_str(self.loc_key)
    rows = [title]
    rows.extend(line.to_string(loc_db) for line in self.lines)
    if self.bto:
        targets = ["->"]
        for dst in self.bto:
            if dst is None:
                targets.append("Unknown? ")
                continue
            targets.append(dst.to_string(loc_db) + " ")
        # Sorted to get an output independent of the iteration order
        rows.append('\t'.join(sorted(targets)))
    return '\n'.join(rows)

class AsmCFG

Directed graph standing for an ASM Control Flow Graph with: - nodes: AsmBlock - edges: constraints between blocks, synchronized with AsmBlock's "bto"

Specializes the .dot export and forces the relation between blocks to be unique, and associated with a constraint.

Offers helpers for AsmCFG management, such as lookup by loc_key, sanity checking and mnemonic size guessing.

class AsmCFG(DiGraph):

    """Directed graph standing for a ASM Control Flow Graph with:
     - nodes: AsmBlock
     - edges: constraints between blocks, synchronized with AsmBlock's "bto"

    Specialized the .dot export and force the relation between block to be uniq,
    and associated with a constraint.

    Offer helpers on AsmCFG management, such as research by loc_key, sanity
    checking and mnemonic size guessing.
    """

    # Internal structure for pending management
    AsmCFGPending = namedtuple("AsmCFGPending",
                               ["waiter", "constraint"])

    def __init__(self, loc_db=None, *args, **kwargs):
        """Create an empty graph
        @loc_db: (optional) object kept in .loc_db, used for pretty printing
        @args, @kwargs: forwarded to the parent constructor
        """
        super(AsmCFG, self).__init__(*args, **kwargs)
        # loc_db
        self.loc_db = loc_db
        # Edge (src, dst) -> constraint
        self.edges2constraint = {}
        # LocKey -> block, built on the fly
        self._loc_key_to_block = {}
        # Expected LocKey -> set of AsmCFGPending
        self._pendings = {}


    def copy(self):
        """Copy the current graph instance: return the sum of a new empty
        instance of the same class (sharing .loc_db) and the current one"""
        fresh_graph = self.__class__(self.loc_db)
        return fresh_graph + self


    # Compatibility with old list API
    def append(self, *args, **kwargs):
        """Old list API, no longer supported: always raise"""
        reason = "AsmCFG is a graph, use add_node"
        raise DeprecationWarning(reason)

    def remove(self, *args, **kwargs):
        """Old list API, no longer supported: always raise"""
        reason = "AsmCFG is a graph, use del_node"
        raise DeprecationWarning(reason)

    def __getitem__(self, *args, **kwargs):
        """Old list API, no longer supported: always raise"""
        reason = "Order of AsmCFG elements is not reliable"
        raise DeprecationWarning(reason)

    def __contains__(self, _):
        """
        DEPRECATED, always raise RuntimeError. Use instead:
        - loc_key in AsmCFG.nodes() to test loc_key existence
        """
        reason = "DEPRECATED"
        raise RuntimeError(reason)

    def __iter__(self):
        """
        DEPRECATED, always raise RuntimeError. Use instead:
        - AsmCFG.blocks() to iter on blocks
        - loc_key in AsmCFG.nodes() to test loc_key existence
        """
        reason = "DEPRECATED"
        raise RuntimeError(reason)

    def __len__(self):
        """Return the number of nodes of the graph (size of ._nodes)"""
        node_count = len(self._nodes)
        return node_count

    @property
    def blocks(self):
        """Iterator on the blocks registered in this graph"""
        return self._loc_key_to_block.itervalues()

    # Manage graph with associated constraints
    def add_edge(self, src, dst, constraint):
        """Add the edge @src -> @dst to the graph and mirror it in the "bto"
        of the source block when this block is known
        @src: LocKey instance, source
        @dst: LocKey instance, destination
        @constraint: constraint associated to this edge
        An already known edge must be given with its current constraint
        """
        # Sanity check
        assert isinstance(src, LocKey)
        assert isinstance(dst, LocKey)

        edge = (src, dst)
        known_cst = self.edges2constraint.get(edge, None)
        if known_cst is not None:
            # Nothing to do, as long as the constraint did not change
            assert known_cst == constraint
            return

        # Add the edge to src.bto if needed
        block_src = self.loc_key_to_block(src)
        if block_src:
            known_destinations = [cons.loc_key for cons in block_src.bto]
            if dst not in known_destinations:
                block_src.bto.add(AsmConstraint(dst, constraint))

        # Add edge
        self.edges2constraint[edge] = constraint
        super(AsmCFG, self).add_edge(src, dst)

    def add_uniq_edge(self, src, dst, constraint):
        """
        Synonym for `add_edge`: the call is forwarded with the same arguments
        """
        edge_adder = self.add_edge
        edge_adder(src, dst, constraint)

    def del_edge(self, src, dst):
        """Delete the edge @src->@dst and its associated constraint
        @src, @dst: loc_keys of blocks which must both be known by the graph
        """
        source_block = self.loc_key_to_block(src)
        destination_block = self.loc_key_to_block(dst)
        assert source_block is not None
        assert destination_block is not None

        # Delete from src.bto
        matching = [cons for cons in source_block.bto if cons.loc_key == dst]
        if matching:
            assert len(matching) == 1
            source_block.bto.remove(matching[0])

        # Del edge
        del self.edges2constraint[(src, dst)]
        super(AsmCFG, self).del_edge(src, dst)

    def del_block(self, block):
        """Remove the node of @block from the graph, then forget the
        association between its loc_key and the block"""
        loc_key = block.loc_key
        super(AsmCFG, self).del_node(loc_key)
        del self._loc_key_to_block[block.loc_key]


    def add_node(self, node):
        """Add @node to the graph and return the result of the parent add_node
        @node: LocKey instance
        """
        assert isinstance(node, LocKey)
        parent = super(AsmCFG, self)
        return parent.add_node(node)

    def add_block(self, block):
        """
        Add the block @block to the current instance, if it is not already in
        @block: AsmBlock instance

        Edges will be created for @block.bto, if destinations are already in
        this instance. If not, they will be resolved when adding these
        aforementioned destinations.
        `self.pendings` indicates which blocks are not yet resolved.

        Return the status given by the parent add_node
        """
        status = super(AsmCFG, self).add_node(block.loc_key)
        if not status:
            return status

        # Update waiters: blocks which were expecting this one
        waiters = self._pendings.get(block.loc_key)
        if waiters is not None:
            for pending in waiters:
                self.add_edge(pending.waiter.loc_key, block.loc_key,
                              pending.constraint)
            del self._pendings[block.loc_key]

        # Synchronize edges with block destinations
        self._loc_key_to_block[block.loc_key] = block

        for constraint in block.bto:
            target = self._loc_key_to_block.get(constraint.loc_key, None)
            if target is not None:
                # Block is already in known nodes
                self.add_edge(block.loc_key, target.loc_key, constraint.c_t)
                continue
            # Block is yet unknown, add it to pendings
            awaited = self.AsmCFGPending(waiter=block,
                                         constraint=constraint.c_t)
            self._pendings.setdefault(constraint.loc_key, set()).add(awaited)

        return status

    def merge(self, graph):
        """Merge with @graph, taking in account constraints
        @graph: graph whose blocks, nodes and edges are added to this one
        """
        # Add known blocks
        for block in graph.blocks:
            self.add_block(block)

        # Add nodes not already in it (ie. not linked to a block)
        for node in graph.nodes():
            self.add_node(node)

        # May fail if there is an incompatibility in edges constraints
        # between the two graphs
        for edge in graph._edges:
            src, dst = edge
            self.add_edge(src, dst, constraint=graph.edges2constraint[edge])


    def node2lines(self, node):
        """Generate the cells describing @node for the .dot export
        @node: loc_key of the node to describe

        The first cell holds the location name. It is followed by:
        - nothing, if no block is associated with @node
        - the error description, for an AsmBlockBad
        - one row per line of the block otherwise (with the line offset if
          ._dot_offset is set)
        """
        if self.loc_db is None:
            loc_key_name = str(node)
        else:
            loc_key_name = self.loc_db.pretty_str(node)
        yield self.DotCellDescription(text=loc_key_name,
                                      attr={'align': 'center',
                                            'colspan': 2,
                                            'bgcolor': 'grey'})
        block = self._loc_key_to_block.get(node, None)
        if block is None:
            # A plain `return` ends the generator; raising StopIteration here
            # is turned into a RuntimeError by PEP 479 (Python >= 3.7)
            return
        if isinstance(block, AsmBlockBad):
            yield [
                self.DotCellDescription(
                    text=block.ERROR_TYPES.get(block._errno,
                                               block._errno
                    ),
                    attr={})
            ]
            return
        for line in block.lines:
            if self._dot_offset:
                yield [self.DotCellDescription(text="%.8X" % line.offset,
                                               attr={}),
                       self.DotCellDescription(text=line.to_string(self.loc_db), attr={})]
            else:
                yield self.DotCellDescription(text=line.to_string(self.loc_db), attr={})

    def node_attr(self, node):
        """Return the .dot attributes of @node: filled in red when its block
        is an AsmBlockBad, no attribute otherwise"""
        block = self._loc_key_to_block.get(node, None)
        if not isinstance(block, AsmBlockBad):
            return {}
        return {'style': 'filled', 'fillcolor': 'red'}

    def edge_attr(self, src, dst):
        """Return the .dot attributes of the edge @src -> @dst
        The color is blue when @src has at most one successor; otherwise red
        for a c_next constraint and limegreen for the other ones
        """
        cst = self.edges2constraint.get((src, dst), None)
        if len(self.successors(src)) <= 1:
            edge_color = "blue"
        elif cst == AsmConstraint.c_next:
            edge_color = "red"
        else:
            edge_color = "limegreen"
        return {"color": edge_color}

    def dot(self, offset=False):
        """
        Return the result of the parent dot export
        @offset: (optional) if set, add the corresponding offsets in each node
        """
        # Read back by node2lines during the export
        self._dot_offset = offset
        parent = super(AsmCFG, self)
        return parent.dot()

    # Helpers
    @property
    def pendings(self):
        """Dictionary of loc_key -> set(AsmCFGPending instance) indicating
        which loc_keys are missing in the current instance.
        A loc_key is missing if a block which is already in nodes has a
        constraint toward it (through its .bto) while the corresponding block
        is not yet in nodes
        """
        awaited = self._pendings
        return awaited

    def label2block(self, loc_key):
        """DEPRECATED: emit a warning, then return the result of
        loc_key_to_block for @loc_key
        @loc_key: LocKey instance"""
        notice = 'DEPRECATION WARNING: use "loc_key_to_block" instead of "label2block"'
        warnings.warn(notice)
        return self.loc_key_to_block(loc_key)

    def rebuild_edges(self):
        """Consider blocks '.bto' and rebuild edges according to them, ie:
        - update constraint type
        - add missing edge
        - remove no more used edge

        This method should be called if a block's '.bto' in nodes have been
        modified without notifying this instance to resynchronize edges.
        """
        for block in self.blocks:
            # Edges expected according to the block's bto
            expected = []
            for constraint in block.bto:
                target = self._loc_key_to_block.get(constraint.loc_key, None)
                if target is None:
                    # Missing destination, add to pendings
                    awaited = self.AsmCFGPending(block, constraint.c_t)
                    self._pendings.setdefault(constraint.loc_key,
                                              set()).add(awaited)
                    continue

                edge = (block.loc_key, target.loc_key)
                expected.append(edge)
                if edge not in self._edges:
                    # An edge is missing
                    self.add_edge(edge[0], edge[1], constraint.c_t)
                else:
                    # Already known edge, constraint may have changed
                    self.edges2constraint[edge] = constraint.c_t

            # Remove useless edges
            for succ in self.successors(block.loc_key):
                if (block.loc_key, succ) not in expected:
                    self.del_edge(block.loc_key, succ)

    def get_bad_blocks(self):
        """Iterate over the AsmBlockBad instances of the graph"""
        # Only leaves are inspected: a bad asm block is always a leaf
        for leaf in self.leaves():
            candidate = self._loc_key_to_block.get(leaf)
            if not isinstance(candidate, AsmBlockBad):
                continue
            yield candidate

    def get_bad_blocks_predecessors(self, strict=False):
        """Iterate over the loc_keys which have an AsmBlockBad as destination
        @strict: (optional) if set, only yield loc_keys whose successors are
        all bad blocks
        """
        # loc_keys already yielded, so that each one is reported once
        seen = set()
        for bad in self.get_bad_blocks():
            for pred in self.predecessors_iter(bad.loc_key):
                if pred in seen:
                    continue
                if strict:
                    only_bad = all(
                        isinstance(self._loc_key_to_block.get(succ, None),
                                   AsmBlockBad)
                        for succ in self.successors_iter(pred)
                    )
                    if not only_bad:
                        continue
                yield pred
                seen.add(pred)

    def getby_offset(self, offset):
        """Return the first asmblock whose lines cover @offset, None if there
        is no such block
        @offset: offset to look for"""
        for block in self.blocks:
            first = block.lines[0]
            last = block.lines[-1]
            # The block spans from its first line to the end of its last line
            end = last.offset + last.l
            if first.offset <= offset and offset < end:
                return block
        return None

    def loc_key_to_block(self, loc_key):
        """
        Look up the asmblock registered for @loc_key
        @loc_key: LocKey instance
        Return None when no block is known for @loc_key
        """
        known_blocks = self._loc_key_to_block
        return known_blocks.get(loc_key)

    def sanity_check(self):
        """Do sanity checks on blocks' constraints:
        * no pendings
        * every node has a corresponding block
        * no next constraint to self
        * no multiple next constraint to same block

        Raise a RuntimeError on the first violated check
        """

        if len(self._pendings) != 0:
            # A list is built explicitly so that the message shows the
            # loc_keys whatever `map` returns on the running interpreter
            raise RuntimeError("Some blocks are missing: %s" % [
                str(loc_key) for loc_key in self._pendings
            ])

        # Group, in a single pass, the sources of every 'next' edge by
        # destination
        next_preds = {}
        for (src, dst), constraint in self.edges2constraint.items():
            if constraint == AsmConstraint.c_next:
                next_preds.setdefault(dst, []).append(src)

        for loc_key in self._nodes:
            if loc_key not in self._loc_key_to_block:
                raise RuntimeError("Not supported yet: every node must have a corresponding AsmBlock")

            pred_next = next_preds.get(loc_key, [])

            # No next constraint to self
            if loc_key in pred_next:
                raise RuntimeError('Bad constraint: self in next')

            # No multiple next constraint to same block
            if len(pred_next) > 1:
                raise RuntimeError("Too many next constraints for bloc %r"
                                   "(%s)" % (loc_key,
                                             pred_next))

    def guess_blocks_size(self, mnemo):
        """Asm and compute max block size
        Add a 'size' and 'max_size' attribute on each block, and a 'data' and
        'l' attribute on each of its lines
        @mnemo: metamn instance
        Raise NotImplementedError for an AsmRaw whose content is neither a
        list nor a str"""
        for block in self.blocks:
            size = 0
            for instr in block.lines:
                if isinstance(instr, AsmRaw):
                    # for special AsmRaw, only extract len
                    if isinstance(instr.raw, list):
                        data = None
                        if len(instr.raw) == 0:
                            l = 0
                        else:
                            # Element size is in bits; floor division keeps
                            # the byte length an integer
                            l = instr.raw[0].size // 8 * len(instr.raw)
                    elif isinstance(instr.raw, str):
                        data = instr.raw
                        l = len(data)
                    else:
                        raise NotImplementedError('asm raw')
                else:
                    # Assemble the instruction to retrieve its len.
                    # If the instruction uses symbol it will fail
                    # In this case, the max_instruction_len is used
                    try:
                        candidates = mnemo.asm(instr)
                        l = len(candidates[-1])
                    except Exception:
                        # KeyboardInterrupt / SystemExit are deliberately not
                        # caught here
                        l = mnemo.max_instruction_len
                    data = None
                instr.data = data
                instr.l = l
                size += l

            block.size = size
            block.max_size = size
            log_asmblock.info("size: %d max: %d", block.size, block.max_size)

    def apply_splitting(self, loc_db, dis_block_callback=None, **kwargs):
        """Consider @self' bto destinations and split block in @self if one of
        these destinations jumps in the middle of this block.
        In order to work, there must be only one block in @self per loc_key in
        @loc_db (which is true if @self come from the same disasmEngine).

        @loc_db: LocationDB instance associated with @self'loc_keys
        @dis_block_callback: (optional) if set, this callback will be called on
        new block destinations
        @kwargs: (optional) named arguments to pass to dis_block_callback
        """
        # Get all possible destinations not yet resolved, with a resolved
        # offset
        block_dst = []
        for loc_key in self.pendings:
            offset = loc_db.get_location_offset(loc_key)
            if offset is not None:
                block_dst.append(offset)

        todo = set(self.blocks)
        rebuild_needed = False

        while todo:
            # Find a block with a destination inside another one
            cur_block = todo.pop()
            range_start, range_stop = cur_block.get_range()

            for off in block_dst:
                if not (off > range_start and off < range_stop):
                    continue

                # `cur_block` must be split at offset `off`
                new_b = cur_block.split(loc_db, off)
                log_asmblock.debug("Split block %x", off)
                if new_b is None:
                    log_asmblock.error("Cannot split %x!!", off)
                    continue

                # Remove pending from cur_block
                # Links from new_b will be generated in rebuild_edges
                for dst in new_b.bto:
                    if dst.loc_key not in self.pendings:
                        continue
                    self.pendings[dst.loc_key] = set(pending for pending in self.pendings[dst.loc_key]
                                                     if pending.waiter != cur_block)

                # The new block destinations may need to be disassembled
                if dis_block_callback:
                    # Offsets are resolved with the @loc_db argument, like
                    # every other lookup of this method; self.loc_db is
                    # optional and may be None
                    offsets_to_dis = set(
                        loc_db.get_location_offset(constraint.loc_key)
                        for constraint in new_b.bto
                    )
                    dis_block_callback(cur_bloc=new_b,
                                       offsets_to_dis=offsets_to_dis,
                                       loc_db=loc_db, **kwargs)

                # Update structure
                rebuild_needed = True
                self.add_block(new_b)

                # The new block must be considered
                todo.add(new_b)
                # `cur_block` has been shortened by the split
                range_start, range_stop = cur_block.get_range()

        # Rebuild edges to match new blocks'bto
        if rebuild_needed:
            self.rebuild_edges()

    def __str__(self):
        """Return one line group per block followed by one 'src -> dst' line
        per edge"""
        rendered = [str(block) for block in self.blocks]
        rendered.extend(
            "%s -> %s" % (src, dst) for src, dst in self.edges()
        )
        return '\n'.join(rendered)

    def __repr__(self):
        """Return '<ClassName 0x...>', built from the class name and the
        hexadecimal id() of the instance"""
        return "<%s %s>" % (self.__class__.__name__, hex(id(self)))

Ancestors (in MRO)

  • AsmCFG
  • miasm2.core.graph.DiGraph
  • __builtin__.object

Class variables

var AsmCFGPending

var DotCellDescription

var blocks

Instance variables

var blocks

var edges2constraint

var loc_db

var pendings

Dictionary of loc_key -> set(AsmCFGPending instance) indicating which loc_keys are missing in the current instance. A loc_key is missing if a block which is already in nodes has a constraint targeting it (through its .bto) and the corresponding block is not yet in nodes.

Methods

def __init__(

self, loc_db=None, *args, **kwargs)

def __init__(self, loc_db=None, *args, **kwargs):
    """Create an empty graph
    @loc_db: (optional) location database kept on the instance as `loc_db`
    @args, @kwargs: forwarded to the parent class constructor
    """
    super(AsmCFG, self).__init__(*args, **kwargs)
    # loc_db, may be None
    self.loc_db = loc_db
    # LocKey -> block, filled when blocks are added
    self._loc_key_to_block = {}
    # Expected LocKey -> set of AsmCFGPending waiting for it
    self._pendings = {}
    # Edge (src, dst) -> constraint
    self.edges2constraint = {}

def add_block(

self, block)

Add the block @block to the current instance, if it is not already in it. @block: AsmBlock instance

Edges will be created for @block.bto, if destinations are already in this instance. If not, they will be resolved when these destinations are added. self.pendings indicates which blocks are not yet resolved.

def add_block(self, block):
    """
    Register @block in the current instance, unless its loc_key is already a
    node
    @block: AsmBlock instance
    An edge is created for each constraint of @block.bto whose destination
    block is already known. The other destinations are recorded in
    `self.pendings` and linked when the corresponding block is added.
    Return the status of the node insertion
    """
    added = super(AsmCFG, self).add_node(block.loc_key)
    if not added:
        return added

    # Link the blocks which were waiting for this one
    if block.loc_key in self._pendings:
        for waiting in self._pendings[block.loc_key]:
            self.add_edge(waiting.waiter.loc_key, block.loc_key,
                          waiting.constraint)
        del self._pendings[block.loc_key]

    # Synchronize edges with block destinations
    self._loc_key_to_block[block.loc_key] = block
    for constraint in block.bto:
        target = self._loc_key_to_block.get(constraint.loc_key)
        if target is not None:
            # Destination already known: link it right now
            self.add_edge(block.loc_key, target.loc_key, constraint.c_t)
            continue
        # Destination still unknown: remember that @block waits for it
        pending = self.AsmCFGPending(waiter=block,
                                     constraint=constraint.c_t)
        self._pendings.setdefault(constraint.loc_key, set()).add(pending)
    return added

def add_edge(

self, src, dst, constraint)

Add an edge to the graph @src: LocKey instance, source @dst: LocKey instance, destination @constraint: constraint associated to this edge

def add_edge(self, src, dst, constraint):
    """Insert the edge @src -> @dst, tagged with @constraint
    @src: LocKey instance, source
    @dst: LocKey instance, destination
    @constraint: constraint associated to this edge
    If the edge is already known, only check that its constraint matches
    """
    # Sanity check
    assert isinstance(src, LocKey)
    assert isinstance(dst, LocKey)

    edge = (src, dst)
    previous = self.edges2constraint.get(edge)
    if previous is not None:
        assert previous == constraint
        return

    # Mirror the edge in the '.bto' of the source block, if there is one
    source_block = self.loc_key_to_block(src)
    if source_block:
        if not any(cons.loc_key == dst for cons in source_block.bto):
            source_block.bto.add(AsmConstraint(dst, constraint))

    # Record the constraint, then the edge itself
    self.edges2constraint[edge] = constraint
    super(AsmCFG, self).add_edge(src, dst)

def add_node(

self, node)

def add_node(self, node):
    """Add @node to the graph through the parent class
    @node: LocKey instance
    Return the value returned by the parent `add_node`"""
    assert isinstance(node, LocKey)
    status = super(AsmCFG, self).add_node(node)
    return status

def add_uniq_edge(

self, src, dst, constraint)

Synonym for add_edge

def add_uniq_edge(self, src, dst, constraint):
    """
    Synonym for `add_edge`
    @src, @dst, @constraint: forwarded unchanged to `add_edge`
    """
    # The result of add_edge is not propagated: this method returns None
    self.add_edge(src, dst, constraint)

def append(

self, *args, **kwargs)

def append(self, *args, **kwargs):
    """Removed API: always raise a DeprecationWarning
    @args, @kwargs: ignored"""
    error = DeprecationWarning("AsmCFG is a graph, use add_node")
    raise error

def apply_splitting(

self, loc_db, dis_block_callback=None, **kwargs)

Consider @self's bto destinations and split a block of @self if one of these destinations jumps into the middle of this block. In order to work, there must be only one block in @self per loc_key in @loc_db (which is true if @self comes from the same disasmEngine).

@loc_db: LocationDB instance associated with @self'loc_keys @dis_block_callback: (optional) if set, this callback will be called on new block destinations @kwargs: (optional) named arguments to pass to dis_block_callback

def apply_splitting(self, loc_db, dis_block_callback=None, **kwargs):
    """Consider @self' bto destinations and split block in @self if one of
    these destinations jumps in the middle of this block.
    In order to work, there must be only one block in @self per loc_key in
    @loc_db (which is true if @self come from the same disasmEngine).
    @loc_db: LocationDB instance associated with @self'loc_keys
    @dis_block_callback: (optional) if set, this callback will be called on
    new block destinations
    @kwargs: (optional) named arguments to pass to dis_block_callback
    """
    # Get all possible destinations not yet resolved, with a resolved
    # offset
    block_dst = []
    for loc_key in self.pendings:
        offset = loc_db.get_location_offset(loc_key)
        if offset is not None:
            block_dst.append(offset)
    todo = set(self.blocks)
    rebuild_needed = False
    while todo:
        # Find a block with a destination inside another one
        cur_block = todo.pop()
        range_start, range_stop = cur_block.get_range()
        for off in block_dst:
            if not (off > range_start and off < range_stop):
                continue
            # `cur_block` must be split at offset `off`
            new_b = cur_block.split(loc_db, off)
            log_asmblock.debug("Split block %x", off)
            if new_b is None:
                log_asmblock.error("Cannot split %x!!", off)
                continue
            # Remove pending from cur_block
            # Links from new_b will be generated in rebuild_edges
            for dst in new_b.bto:
                if dst.loc_key not in self.pendings:
                    continue
                self.pendings[dst.loc_key] = set(pending for pending in self.pendings[dst.loc_key]
                                                 if pending.waiter != cur_block)
            # The new block destinations may need to be disassembled
            if dis_block_callback:
                # Offsets are resolved with the @loc_db argument, like every
                # other lookup of this method; self.loc_db is optional and
                # may be None
                offsets_to_dis = set(
                    loc_db.get_location_offset(constraint.loc_key)
                    for constraint in new_b.bto
                )
                dis_block_callback(cur_bloc=new_b,
                                   offsets_to_dis=offsets_to_dis,
                                   loc_db=loc_db, **kwargs)
            # Update structure
            rebuild_needed = True
            self.add_block(new_b)
            # The new block must be considered
            todo.add(new_b)
            # `cur_block` has been shortened by the split
            range_start, range_stop = cur_block.get_range()
    # Rebuild edges to match new blocks'bto
    if rebuild_needed:
        self.rebuild_edges()

def compute_back_edges(

self, head)

Computes all back edges from a node to a dominator in the graph. :param head: head of graph :return: yield a back edge

def compute_back_edges(self, head):
    """
    Yield every edge going from a node to one of its dominators.
    :param head: head of graph
    :return: yield a back edge, as a (node, successor) tuple
    """
    dominators = self.compute_dominators(head)
    # Nodes are taken in forward depth-first order from the head
    for node in self.walk_depth_first_forward(head):
        for successor in self.successors_iter(node):
            # Only edges reaching a dominator of `node` are back edges
            if successor not in dominators[node]:
                continue
            yield (node, successor)

def compute_dominance_frontier(

self, head)

Compute the dominance frontier of the graph

Source: Cooper, Keith D., Timothy J. Harvey, and Ken Kennedy. "A simple, fast dominance algorithm." Software Practice & Experience 4 (2001), p. 9

def compute_dominance_frontier(self, head):
    """
    Compute the dominance frontier of the graph
    Source: Cooper, Keith D., Timothy J. Harvey, and Ken Kennedy.
    "A simple, fast dominance algorithm."
    Software Practice & Experience 4 (2001), p. 9
    :param head: head of graph
    :return: dictionary node -> set of nodes in its dominance frontier;
    nodes with an empty frontier are absent
    """
    idoms = self.compute_immediate_dominators(head)
    frontier = {}
    for node in idoms:
        # Only join points (at least two predecessors) contribute; the
        # number of predecessors is compared, not the container itself
        if len(self._nodes_pred[node]) >= 2:
            for predecessor in self.predecessors_iter(node):
                runner = predecessor
                # Predecessors without an immediate dominator are skipped
                if runner not in idoms:
                    continue
                # Walk up the dominator tree until the immediate dominator
                # of `node` is reached
                while runner != idoms[node]:
                    frontier.setdefault(runner, set()).add(node)
                    runner = idoms[runner]
    return frontier

def compute_dominator_tree(

self, head)

Computes the dominator tree of a graph :param head: head of graph :return: DiGraph

def compute_dominator_tree(self, head):
    """
    Build the dominator tree of the graph: one edge from each immediate
    dominator to the node it dominates
    :param head: head of graph
    :return: DiGraph
    """
    tree = DiGraph()
    for node, idom in self.compute_immediate_dominators(head).items():
        tree.add_edge(idom, node)
    return tree

def compute_dominators(

self, head)

Compute the dominators of the graph

def compute_dominators(self, head):
    """Compute the dominators of the graph, starting from @head
    Return the result of the generic computation run with the forward
    reachability helper and the predecessor / successor iterators"""
    reachable = self.reachable_sons
    toward_head = self.predecessors_iter
    away_from_head = self.successors_iter
    return self._compute_generic_dominators(head, reachable,
                                            toward_head, away_from_head)

def compute_immediate_dominators(

self, head)

Compute the immediate dominators of the graph

def compute_immediate_dominators(self, head):
    """Compute the immediate dominators of the graph
    Return a dictionary node -> immediate dominator; nodes for which none
    is found are absent"""
    dominators = self.compute_dominators(head)
    idoms = {}
    for node in dominators:
        # Strict dominators of `node`, in the order given by walk_dominators
        candidates = (
            pred for pred in self.walk_dominators(node, dominators)
            if pred in dominators[node] and node != pred
        )
        # Only the first candidate is kept
        for pred in candidates:
            idoms[node] = pred
            break
    return idoms

def compute_natural_loops(

self, head)

Computes all natural loops in the graph.

Source: Aho, Alfred V., Lam, Monica S., Sethi, R. and Jeffrey Ullman. "Compilers: Principles, Techniques, & Tools, Second Edition" Pearson/Addison Wesley (2007), Chapter 9.6.6 :param head: head of the graph :return: yield a tuple of the form (back edge, loop body)

def compute_natural_loops(self, head):
    """
    Yield the natural loop attached to each back edge of the graph.
    Source: Aho, Alfred V., Lam, Monica S., Sethi, R. and Jeffrey Ullman.
    "Compilers: Principles, Techniques, & Tools, Second Edition"
    Pearson/Addison Wesley (2007), Chapter 9.6.6
    :param head: head of the graph
    :return: yield a tuple of the form (back edge, loop body)
    """
    for tail, header in self.compute_back_edges(head):
        # The body is computed from the target of the back edge
        loop_body = self._compute_natural_loop_body(header, tail)
        yield ((tail, header), loop_body)

def compute_postdominators(

self, leaf)

Compute the postdominators of the graph

def compute_postdominators(self, leaf):
    """Compute the postdominators of the graph, starting from @leaf
    Return the result of the generic computation run with the backward
    reachability helper and the successor / predecessor iterators"""
    reachable = self.reachable_parents
    toward_leaf = self.successors_iter
    away_from_leaf = self.predecessors_iter
    return self._compute_generic_dominators(leaf, reachable,
                                            toward_leaf, away_from_leaf)

def compute_strongly_connected_components(

self)

Partitions the graph into strongly connected components.

Iterative implementation of Gabow's path-based SCC algorithm. Source: Gabow, Harold N. "Path-based depth-first search for strong and biconnected components." Information Processing Letters 74.3 (2000), pp. 109--110

The iterative implementation is inspired by Mark Dickinson's code: http://code.activestate.com/recipes/ 578507-strongly-connected-components-of-a-directed-graph/ :return: yield a strongly connected component

def compute_strongly_connected_components(self):
    """
    Partitions the graph into strongly connected components.
    Iterative implementation of Gabow's path-based SCC algorithm.
    Source: Gabow, Harold N.
    "Path-based depth-first search for strong and biconnected components."
    Information Processing Letters 74.3 (2000), pp. 109--110
    The iterative implementation is inspired by Mark Dickinson's
    code:
    http://code.activestate.com/recipes/
    578507-strongly-connected-components-of-a-directed-graph/
    :return: yield a strongly connected component, as a set of nodes
    """
    # Nodes of the current DFS path and of the components not emitted yet
    stack = []
    # Stack positions delimiting the candidate components
    boundaries = []
    # Starts at the number of nodes and is incremented before each use, so
    # the index given to an emitted node is above any stack position
    counter = len(self.nodes())
    # init index with 0
    index = {v: 0 for v in self.nodes()}
    # state machine for worklist algorithm
    VISIT, HANDLE_RECURSION, MERGE = 0, 1, 2
    NodeState = namedtuple('NodeState', ['state', 'node'])
    for node in self.nodes():
        # next node if node was already visited
        if index[node]:
            continue
        todo = [NodeState(VISIT, node)]
        done = set()
        while todo:
            current = todo.pop()
            if current.node in done:
                continue
            # node is unvisited
            if current.state == VISIT:
                stack.append(current.node)
                # Position of the node in the stack (1-based, so never 0)
                index[current.node] = len(stack)
                boundaries.append(index[current.node])
                # MERGE is pushed first: it is popped once every successor
                # pushed below has been handled
                todo.append(NodeState(MERGE, current.node))
                # follow successors
                for successor in self.successors_iter(current.node):
                    todo.append(NodeState(HANDLE_RECURSION, successor))
            # iterative handling of recursion algorithm
            elif current.state == HANDLE_RECURSION:
                # visit unvisited successor
                if index[current.node] == 0:
                    todo.append(NodeState(VISIT, current.node))
                else:
                    # contract cycle if necessary
                    while index[current.node] < boundaries[-1]:
                        boundaries.pop()
            # merge strongly connected component
            else:
                # The node still owns the top boundary: it is the root of a
                # component
                if index[current.node] == boundaries[-1]:
                    boundaries.pop()
                    counter += 1
                    scc = set()
                    # Pop the stack down to the root; the loop ends once the
                    # root itself has received `counter` as index
                    while index[current.node] <= len(stack):
                        popped = stack.pop()
                        index[popped] = counter
                        scc.add(popped)
                        # Note: the root (current.node) is the node marked
                        # as done, at each iteration
                        done.add(current.node)
                    yield scc

def copy(

self)

Copy the current graph instance

def copy(self):
    """Return a new graph of the same class, created with the same loc_db,
    to which the current instance has been added with `+`"""
    return self.__class__(self.loc_db) + self

def del_block(

self, block)

def del_block(self, block):
    """Remove the node of @block through the parent class, then forget the
    loc_key -> block association
    @block: block whose `loc_key` identifies the node
    Raise KeyError if @block is not registered"""
    super(AsmCFG, self).del_node(block.loc_key)
    self._loc_key_to_block.pop(block.loc_key)

def del_edge(

self, src, dst)

Delete the edge @src->@dst and its associated constraint

def del_edge(self, src, dst):
    """Remove the edge @src->@dst together with its constraint
    Both @src and @dst must have a registered block"""
    source = self.loc_key_to_block(src)
    target = self.loc_key_to_block(dst)
    assert source is not None
    assert target is not None

    # Drop the matching constraint from the '.bto' of the source block
    matching = [cons for cons in source.bto if cons.loc_key == dst]
    if matching:
        assert len(matching) == 1
        source.bto.remove(matching[0])

    # Forget the constraint, then the edge itself
    del self.edges2constraint[(src, dst)]
    super(AsmCFG, self).del_edge(src, dst)

def del_node(

self, node)

Delete the @node of the graph; Also delete every edge to/from this @node

def del_node(self, node):
    """Remove @node from the graph, along with every edge which starts from
    or ends on it"""
    if node in self._nodes:
        self._nodes.remove(node)

    remove_edge = self.del_edge
    # Incoming edges first, then outgoing ones
    for source in self.predecessors(node):
        remove_edge(source, node)
    for target in self.successors(node):
        remove_edge(node, target)

def discard_edge(

self, src, dst)

Remove the edge between @src and @dst if it exists

def discard_edge(self, src, dst):
    """Remove the edge between @src and @dst if it exists, do nothing
    otherwise"""
    if (src, dst) not in self._edges:
        return
    self.del_edge(src, dst)

def dot(

self, offset=False)

@offset: (optional) if set, add the corresponding offsets in each node

def dot(self, offset=False):
    """Return the result of the parent class `dot()` for this graph
    @offset: (optional) if set, add the corresponding offsets in each node
    """
    # Remember the choice on the instance: node2lines reads
    # self._dot_offset to decide whether each line is prefixed by its offset
    self._dot_offset = offset
    return super(AsmCFG, self).dot()

def edge_attr(

self, src, dst)

def edge_attr(self, src, dst):
    """Return the attributes of the edge @src -> @dst as a dictionary with
    a single "color" key:
    - "blue" when @src has at most one successor
    - "red" for a c_next constraint
    - "limegreen" otherwise"""
    constraint = self.edges2constraint.get((src, dst))
    if len(self.successors(src)) <= 1:
        color = "blue"
    elif constraint == AsmConstraint.c_next:
        color = "red"
    else:
        color = "limegreen"
    return {"color": color}

def edges(

self)

def edges(self):
    """Return the edges of the graph"""
    # The internal container itself is returned, not a copy
    return self._edges

def find_path(

self, src, dst, cycles_count=0, done=None)

def find_path(self, src, dst, cycles_count=0, done=None):
    """Return the paths from @src to @dst, each as a list of nodes, found by
    walking predecessors backward from @dst
    @cycles_count: (optional) a node visited more than @cycles_count times
    stops the exploration of the current path
    @done: (optional) node -> visit count, used by the recursion
    """
    if done is None:
        done = {}
    # Visited too many times: return an empty path, discarded by the caller
    if dst in done and done[dst] > cycles_count:
        return [[]]
    if src == dst:
        return [[src]]

    paths = []
    for pred in self.predecessors(dst):
        # Each branch gets its own copy of the visit counters
        visits = dict(done)
        visits[dst] = visits.get(dst, 0) + 1
        paths.extend(
            partial + [dst]
            for partial in self.find_path(src, pred, cycles_count, visits)
            if partial and partial[0] == src
        )
    return paths

def get_bad_blocks(

self)

Iterator on AsmBlockBad elements

def get_bad_blocks(self):
    """Iterate over the AsmBlockBad instances of the graph"""
    # Only leaves are inspected: a bad asm block is always a leaf
    for leaf in self.leaves():
        candidate = self._loc_key_to_block.get(leaf)
        if not isinstance(candidate, AsmBlockBad):
            continue
        yield candidate

def get_bad_blocks_predecessors(

self, strict=False)

Iterator on loc_keys with an AsmBlockBad destination @strict: (optional) if set, return loc_key with only bad successors

def get_bad_blocks_predecessors(self, strict=False):
    """Iterate over the loc_keys which have an AsmBlockBad as destination
    @strict: (optional) if set, only yield loc_keys whose successors are
    all bad blocks
    """
    # loc_keys already yielded, so that each one is reported once
    seen = set()
    for bad in self.get_bad_blocks():
        for pred in self.predecessors_iter(bad.loc_key):
            if pred in seen:
                continue
            if strict:
                only_bad = all(
                    isinstance(self._loc_key_to_block.get(succ, None),
                               AsmBlockBad)
                    for succ in self.successors_iter(pred)
                )
                if not only_bad:
                    continue
            yield pred
            seen.add(pred)

def getby_offset(

self, offset)

Return asmblock containing @offset

def getby_offset(self, offset):
    """Return the first asmblock whose lines cover @offset, None if there is
    no such block
    @offset: offset to look for"""
    for block in self.blocks:
        first = block.lines[0]
        last = block.lines[-1]
        # The block spans from its first line to the end of its last line
        end = last.offset + last.l
        if first.offset <= offset and offset < end:
            return block
    return None

def guess_blocks_size(

self, mnemo)

Asm and compute max block size Add a 'size' and 'max_size' attribute on each block @mnemo: metamn instance

def guess_blocks_size(self, mnemo):
    """Asm and compute max block size
    Add a 'size' and 'max_size' attribute on each block, and a 'data' and
    'l' attribute on each of its lines
    @mnemo: metamn instance
    Raise NotImplementedError for an AsmRaw whose content is neither a list
    nor a str"""
    for block in self.blocks:
        size = 0
        for instr in block.lines:
            if isinstance(instr, AsmRaw):
                # for special AsmRaw, only extract len
                if isinstance(instr.raw, list):
                    data = None
                    if len(instr.raw) == 0:
                        l = 0
                    else:
                        # Element size is in bits; floor division keeps the
                        # byte length an integer
                        l = instr.raw[0].size // 8 * len(instr.raw)
                elif isinstance(instr.raw, str):
                    data = instr.raw
                    l = len(data)
                else:
                    raise NotImplementedError('asm raw')
            else:
                # Assemble the instruction to retrieve its len.
                # If the instruction uses symbol it will fail
                # In this case, the max_instruction_len is used
                try:
                    candidates = mnemo.asm(instr)
                    l = len(candidates[-1])
                except Exception:
                    # KeyboardInterrupt / SystemExit are deliberately not
                    # caught here
                    l = mnemo.max_instruction_len
                data = None
            instr.data = data
            instr.l = l
            size += l
        block.size = size
        block.max_size = size
        log_asmblock.info("size: %d max: %d", block.size, block.max_size)

def has_loop(

self)

Return True if the graph contains at least a cycle

def has_loop(self):
    """Return True if the graph contains at least a cycle"""
    pending = list(self.nodes())
    # Nodes whose exploration is over
    finished = set()
    # Nodes of the DFS branch being explored
    on_path = set()
    while pending:
        node = pending.pop()
        if node in finished:
            continue
        if node not in on_path:
            # First visit: push the node back so that it is seen again once
            # its successors have been explored
            pending.append(node)
            on_path.add(node)
            pending.extend(self.successors(node))
            continue
        # Second visit: a successor still on the branch closes a cycle
        if any(succ in on_path for succ in self.successors_iter(node)):
            return True
        # A node cannot be both on the branch and finished
        on_path.remove(node)
        finished.add(node)
    return False

def heads(

self)

def heads(self):
    """Return the nodes produced by `heads_iter` as a list"""
    return list(self.heads_iter())

def heads_iter(

self)

def heads_iter(self):
    """Iterate over the nodes which have no predecessor"""
    predecessors = self._nodes_pred
    for node in self._nodes:
        if predecessors[node]:
            continue
        yield node

def label2block(

self, loc_key)

Return the block corresponding to loc_key @loc_key @loc_key: LocKey instance

def label2block(self, loc_key):
    """Deprecated alias of `loc_key_to_block`
    @loc_key: LocKey instance
    A deprecation warning is emitted, then the lookup is delegated"""
    message = 'DEPRECATION WARNING: use "loc_key_to_block" instead of "label2block"'
    warnings.warn(message)
    block = self.loc_key_to_block(loc_key)
    return block

def leaves(

self)

def leaves(self):
    """Return the nodes produced by `leaves_iter` as a list"""
    return list(self.leaves_iter())

def leaves_iter(

self)

def leaves_iter(self):
    """Iterate over the nodes which have no successor"""
    successors = self._nodes_succ
    for node in self._nodes:
        if successors[node]:
            continue
        yield node

def loc_key_to_block(

self, loc_key)

Return the asmblock corresponding to loc_key @loc_key, None if unknown loc_key @loc_key: LocKey instance

def loc_key_to_block(self, loc_key):
    """
    Look up the asmblock registered for @loc_key
    @loc_key: LocKey instance
    Return None when no block is known for @loc_key
    """
    known_blocks = self._loc_key_to_block
    return known_blocks.get(loc_key)

def merge(

self, graph)

Merge with @graph, taking in account constraints

def merge(self, graph):
    """Import the blocks, nodes and edges of @graph into the current
    instance, keeping the constraint of each edge
    @graph: graph to merge from"""
    # Blocks first, so that their nodes get registered with a block
    for block in graph.blocks:
        self.add_block(block)
    # Then the nodes which are not linked to a block
    for node in graph.nodes():
        self.add_node(node)
    # Finally the edges; add_edge may fail if both graphs disagree on the
    # constraint of a common edge
    for edge in graph._edges:
        constraint = graph.edges2constraint[edge]
        self.add_edge(*edge, constraint=constraint)

def node2lines(

self, node)

def node2lines(self, node):
    """Yield the dot cells used to render @node.
    The first element is a header cell holding the node name. If a block is
    associated to @node, it is followed either by a single error row (bad
    block) or by one element per line of the block.
    @node: a node of the graph
    """
    if self.loc_db is None:
        loc_key_name = str(node)
    else:
        loc_key_name = self.loc_db.pretty_str(node)
    yield self.DotCellDescription(text=loc_key_name,
                                  attr={'align': 'center',
                                        'colspan': 2,
                                        'bgcolor': 'grey'})
    block = self._loc_key_to_block.get(node, None)
    if block is None:
        # End the generator with a plain return: an explicit
        # 'raise StopIteration' is turned into a RuntimeError by PEP 479
        return
    if isinstance(block, AsmBlockBad):
        # Bad block: a single row with the error description (or the raw
        # errno when it has no known description)
        yield [
            self.DotCellDescription(
                text=block.ERROR_TYPES.get(block._errno,
                                           block._errno
                ),
                attr={})
        ]
        return
    for line in block.lines:
        if self._dot_offset:
            # Two columns: offset, then the line itself
            yield [self.DotCellDescription(text="%.8X" % line.offset,
                                           attr={}),
                   self.DotCellDescription(text=line.to_string(self.loc_db), attr={})]
        else:
            yield self.DotCellDescription(text=line.to_string(self.loc_db), attr={})

def node_attr(

self, node)

def node_attr(self, node):
    """Return the dot attributes of @node: a red filled style when its block
    is an AsmBlockBad, no attribute otherwise
    @node: a node of the graph
    """
    block = self._loc_key_to_block.get(node)
    if not isinstance(block, AsmBlockBad):
        return {}
    return {'style': 'filled', 'fillcolor': 'red'}

def nodeid(

self, node)

Returns uniq id for a @node @node: a node of the graph

def nodeid(self, node):
    """
    Build an identifier for @node from its hash, kept on 64 bits
    @node: a node of the graph
    """
    mask_64 = (1 << 64) - 1
    return hash(node) & mask_64

def nodes(

self)

def nodes(self):
    """Return the node collection of the graph (the internal object itself,
    not a copy)"""
    known_nodes = self._nodes
    return known_nodes

def predecessors(

self, node)

def predecessors(self, node):
    """Return, as a list, every node yielded by predecessors_iter(@node)"""
    return list(self.predecessors_iter(node))

def predecessors_iter(

self, node)

def predecessors_iter(self, node):
    """Yield each predecessor of @node; yield nothing if @node is unknown
    @node: a node of the graph
    """
    if node not in self._nodes_pred:
        # Plain return ends the generator; 'raise StopIteration' would be
        # turned into a RuntimeError by PEP 479
        return
    for n_pred in self._nodes_pred[node]:
        yield n_pred

def predecessors_stop_node_iter(

self, node, head)

def predecessors_stop_node_iter(self, node, head):
    """Yield each predecessor of @node, unless @node is @head in which case
    nothing is yielded
    @node: a node of the graph
    @head: the node at which the backward walk stops
    """
    if node == head:
        # Plain return ends the generator; 'raise StopIteration' would be
        # turned into a RuntimeError by PEP 479
        return
    for next_node in self.predecessors_iter(node):
        yield next_node

def reachable_parents(

self, leaf)

Compute all parents of node @leaf. Each parent is an immediate predecessor of an arbitrary, already yielded parent of @leaf

def reachable_parents(self, leaf):
    """Compute the parents of node @leaf by delegating to _reachable_nodes,
    stepping through the graph with predecessors_iter"""
    step_backward = self.predecessors_iter
    return self._reachable_nodes(leaf, step_backward)

def reachable_parents_stop_node(

self, leaf, head)

Compute all parents of node @leaf. Each parent is an immediate predecessor of an arbitrary, already yielded parent of @leaf. Do not compute reachables past @head node

def reachable_parents_stop_node(self, leaf, head):
    """Compute the parents of node @leaf by delegating to _reachable_nodes,
    stepping through the graph with predecessors_stop_node_iter so that
    nothing is explored past the @head node"""

    def bounded_predecessors(node_cur):
        # Predecessors of @node_cur, bounded by @head
        return self.predecessors_stop_node_iter(node_cur, head)

    return self._reachable_nodes(leaf, bounded_predecessors)

def reachable_sons(

self, head)

Compute all nodes reachable from node @head. Each son is an immediate successor of an arbitrary, already yielded son of @head

def reachable_sons(self, head):
    """Compute the nodes reachable from node @head by delegating to
    _reachable_nodes, stepping through the graph with successors_iter"""
    step_forward = self.successors_iter
    return self._reachable_nodes(head, step_forward)

def rebuild_edges(

self)

Consider blocks' '.bto' and rebuild edges according to them, i.e.: update the constraint type, add missing edges, and remove edges that are no longer used.

This method should be called if a block's '.bto' in nodes have been modified without notifying this instance to resynchronize edges.

def rebuild_edges(self):
    """Resynchronize the edges of the graph with the '.bto' of its blocks:
    - constraints toward an unknown destination are recorded as pendings
    - the constraint type of an already known edge is refreshed
    - edges missing from the graph are added
    - edges which no longer match any '.bto' entry are removed
    To be called when a block's '.bto' has been modified without notifying
    this instance.
    """
    for block in self.blocks:
        src = block.loc_key
        wanted = []

        for constraint in block.bto:
            target = self._loc_key_to_block.get(constraint.loc_key)
            if target is None:
                # Destination block not known yet: keep it as a pending
                waiting = self._pendings.setdefault(constraint.loc_key, set())
                waiting.add(self.AsmCFGPending(block, constraint.c_t))
                continue

            edge = (src, target.loc_key)
            wanted.append(edge)
            if edge not in self._edges:
                # The edge does not exist yet
                self.add_edge(edge[0], edge[1], constraint.c_t)
            else:
                # Known edge, only its constraint may have changed
                self.edges2constraint[edge] = constraint.c_t

        # Drop the outgoing edges which are not backed by '.bto' anymore
        for succ in self.successors(src):
            if (src, succ) not in wanted:
                self.del_edge(src, succ)

def remove(

self, *args, **kwargs)

def remove(self, *args, **kwargs):
    """Unsupported: always raise a DeprecationWarning pointing to del_node,
    whatever the arguments are"""
    reason = "AsmCFG is a graph, use del_node"
    raise DeprecationWarning(reason)

def sanity_check(

self)

Do sanity checks on blocks' constraints: no pendings; no multiple next constraint to same block; no next constraint to self.

def sanity_check(self):
    """Do sanity checks on blocks' constraints:
    * no pendings
    * no multiple next constraint to same block
    * no next constraint to self
    Raise a RuntimeError on the first check which fails.
    """
    if self._pendings:
        # Build a real list: formatting the result of map() only prints the
        # names when map() returns a list
        raise RuntimeError("Some blocks are missing: %s" % [
            str(loc_key) for loc_key in self._pendings.keys()
        ])

    # Group the sources of 'next' edges by destination once, instead of
    # rescanning every edge for every node
    next_preds = {}
    for (src, dst), constraint in self.edges2constraint.items():
        if constraint == AsmConstraint.c_next:
            next_preds.setdefault(dst, []).append(src)

    for loc_key in self._nodes:
        if loc_key not in self._loc_key_to_block:
            raise RuntimeError("Not supported yet: every node must have a corresponding AsmBlock")
        pred_next = next_preds.get(loc_key, [])
        # No next constraint to self
        if loc_key in pred_next:
            raise RuntimeError('Bad constraint: self in next')
        # No multiple next constraint to same block
        if len(pred_next) > 1:
            raise RuntimeError("Too many next constraints for bloc %r"
                               "(%s)" % (loc_key,
                                         pred_next))

def successors(

self, node)

def successors(self, node):
    """Return, as a list, every node yielded by successors_iter(@node)"""
    return list(self.successors_iter(node))

def successors_iter(

self, node)

def successors_iter(self, node):
    """Yield each successor of @node; yield nothing if @node is unknown
    @node: a node of the graph
    """
    if node not in self._nodes_succ:
        # Plain return ends the generator; 'raise StopIteration' would be
        # turned into a RuntimeError by PEP 479
        return
    for n_suc in self._nodes_succ[node]:
        yield n_suc

def walk_breadth_first_backward(

self, head)

Performs a breadth first search on the reversed graph from @head

def walk_breadth_first_backward(self, head):
    """Breadth first walk of the reversed graph, starting at @head"""
    step = self.predecessors_iter
    return self._walk_generic_first(head, 0, step)

def walk_breadth_first_forward(

self, head)

Performs a breadth first search on the graph from @head

def walk_breadth_first_forward(self, head):
    """Breadth first walk of the graph, starting at @head"""
    step = self.successors_iter
    return self._walk_generic_first(head, 0, step)

def walk_depth_first_backward(

self, head)

Performs a depth first search on the reversed graph from @head

def walk_depth_first_backward(self, head):
    """Depth first walk of the reversed graph, starting at @head"""
    step = self.predecessors_iter
    return self._walk_generic_first(head, -1, step)

def walk_depth_first_forward(

self, head)

Performs a depth first search on the graph from @head

def walk_depth_first_forward(self, head):
    """Depth first walk of the graph, starting at @head"""
    step = self.successors_iter
    return self._walk_generic_first(head, -1, step)

def walk_dominators(

self, node, dominators)

Return an iterator of the ordered list of @node's dominators. The function doesn't return the self reference in dominators. @node: The start node. @dominators: The dictionary containing at least node's dominators.

def walk_dominators(self, node, dominators):
    """Iterate over the ordered dominators of @node, the self reference
    being left out. Delegates to _walk_generic_dominator, stepping with
    predecessors_iter.
    @node: The start node
    @dominators: The dictionary containing at least node's dominators
    """
    step = self.predecessors_iter
    return self._walk_generic_dominator(node, dominators, step)

def walk_postdominators(

self, node, postdominators)

Return an iterator of the ordered list of @node's postdominators. The function doesn't return the self reference in postdominators. @node: The start node. @postdominators: The dictionary containing at least node's postdominators.

def walk_postdominators(self, node, postdominators):
    """Iterate over the ordered postdominators of @node, the self reference
    being left out. Delegates to _walk_generic_dominator, stepping with
    successors_iter.
    @node: The start node
    @postdominators: The dictionary containing at least node's
    postdominators
    """
    step = self.successors_iter
    return self._walk_generic_dominator(node, postdominators, step)

class AsmConstraint

class AsmConstraint(object):
    """Constraint toward a location: a LocKey (loc_key) paired with a
    constraint type (c_t), which defaults to c_to"""
    c_to = "c_to"
    c_next = "c_next"

    def __init__(self, loc_key, c_t=c_to):
        # Only LocKey instances are accepted
        assert isinstance(loc_key, LocKey)

        self.c_t = c_t
        self.loc_key = loc_key

    def get_label(self):
        """[DEPRECATED] Getter of '.label', returns '.loc_key'"""
        message = 'DEPRECATION WARNING: use ".loc_key" instead of ".label"'
        warnings.warn(message)
        return self.loc_key

    def set_label(self, loc_key):
        """[DEPRECATED] Setter of '.label', updates '.loc_key'"""
        message = 'DEPRECATION WARNING: use ".loc_key" instead of ".label"'
        warnings.warn(message)
        self.loc_key = loc_key

    # Deprecated alias of '.loc_key'
    label = property(get_label, set_label)

    def to_string(self, loc_db=None):
        """Return "<type>:<location>"; the location is rendered through
        @loc_db.pretty_str when @loc_db is provided"""
        if loc_db is not None:
            return "%s:%s" % (self.c_t, loc_db.pretty_str(self.loc_key))
        return "%s:%s" % (self.c_t, self.loc_key)

    def __str__(self):
        return self.to_string()

Ancestors (in MRO)

Class variables

var c_next

var c_to

var label

Instance variables

var c_t

var label

var loc_key

Methods

def __init__(

self, loc_key, c_t='c_to')

def __init__(self, loc_key, c_t=c_to):
    """Store the constraint type @c_t and its location @loc_key
    @loc_key: LocKey instance
    @c_t: constraint type
    """
    # Only LocKey instances are accepted
    assert isinstance(loc_key, LocKey)
    self.c_t = c_t
    self.loc_key = loc_key

def get_label(

self)

def get_label(self):
    """[DEPRECATED] Emit a deprecation warning and return '.loc_key'"""
    message = 'DEPRECATION WARNING: use ".loc_key" instead of ".label"'
    warnings.warn(message)
    return self.loc_key

def set_label(

self, loc_key)

def set_label(self, loc_key):
    """[DEPRECATED] Emit a deprecation warning and set '.loc_key' to
    @loc_key"""
    message = 'DEPRECATION WARNING: use ".loc_key" instead of ".label"'
    warnings.warn(message)
    self.loc_key = loc_key

def to_string(

self, loc_db=None)

def to_string(self, loc_db=None):
    """Return "<type>:<location>"; the location is rendered through
    @loc_db.pretty_str when @loc_db is provided"""
    if loc_db is not None:
        return "%s:%s" % (self.c_t, loc_db.pretty_str(self.loc_key))
    return "%s:%s" % (self.c_t, self.loc_key)

class AsmConstraintNext

class AsmConstraintNext(AsmConstraint):
    """AsmConstraint whose type is always AsmConstraint.c_next"""

    def __init__(self, loc_key):
        parent = super(AsmConstraintNext, self)
        parent.__init__(loc_key, c_t=AsmConstraint.c_next)

Ancestors (in MRO)

Class variables

var c_next

Inheritance: AsmConstraint.c_next

var c_to

Inheritance: AsmConstraint.c_to

Instance variables

var c_t

Inheritance: AsmConstraint.c_t

var label

Inheritance: AsmConstraint.label

var loc_key

Inheritance: AsmConstraint.loc_key

Methods

def __init__(

self, loc_key)

Inheritance: AsmConstraint.__init__

def __init__(self, loc_key):
    """Initialize the parent constraint on @loc_key with the type
    AsmConstraint.c_next"""
    next_type = AsmConstraint.c_next
    super(AsmConstraintNext, self).__init__(loc_key, c_t=next_type)

def get_label(

self)

Inheritance: AsmConstraint.get_label

def get_label(self):
    """[DEPRECATED] Emit a deprecation warning and return '.loc_key'"""
    message = 'DEPRECATION WARNING: use ".loc_key" instead of ".label"'
    warnings.warn(message)
    return self.loc_key

def set_label(

self, loc_key)

Inheritance: AsmConstraint.set_label

def set_label(self, loc_key):
    """[DEPRECATED] Emit a deprecation warning and set '.loc_key' to
    @loc_key"""
    message = 'DEPRECATION WARNING: use ".loc_key" instead of ".label"'
    warnings.warn(message)
    self.loc_key = loc_key

def to_string(

self, loc_db=None)

Inheritance: AsmConstraint.to_string

def to_string(self, loc_db=None):
    """Return "<type>:<location>"; the location is rendered through
    @loc_db.pretty_str when @loc_db is provided"""
    if loc_db is not None:
        return "%s:%s" % (self.c_t, loc_db.pretty_str(self.loc_key))
    return "%s:%s" % (self.c_t, self.loc_key)

class AsmConstraintTo

class AsmConstraintTo(AsmConstraint):
    """AsmConstraint whose type is always AsmConstraint.c_to"""

    def __init__(self, loc_key):
        parent = super(AsmConstraintTo, self)
        parent.__init__(loc_key, c_t=AsmConstraint.c_to)

Ancestors (in MRO)

Class variables

var c_next

Inheritance: AsmConstraint.c_next

var c_to

Inheritance: AsmConstraint.c_to

Instance variables

var c_t

Inheritance: AsmConstraint.c_t

var label

Inheritance: AsmConstraint.label

var loc_key

Inheritance: AsmConstraint.loc_key

Methods

def __init__(

self, loc_key)

Inheritance: AsmConstraint.__init__

def __init__(self, loc_key):
    """Initialize the parent constraint on @loc_key with the type
    AsmConstraint.c_to"""
    to_type = AsmConstraint.c_to
    super(AsmConstraintTo, self).__init__(loc_key, c_t=to_type)

def get_label(

self)

Inheritance: AsmConstraint.get_label

def get_label(self):
    """[DEPRECATED] Emit a deprecation warning and return '.loc_key'"""
    message = 'DEPRECATION WARNING: use ".loc_key" instead of ".label"'
    warnings.warn(message)
    return self.loc_key

def set_label(

self, loc_key)

Inheritance: AsmConstraint.set_label

def set_label(self, loc_key):
    """[DEPRECATED] Emit a deprecation warning and set '.loc_key' to
    @loc_key"""
    message = 'DEPRECATION WARNING: use ".loc_key" instead of ".label"'
    warnings.warn(message)
    self.loc_key = loc_key

def to_string(

self, loc_db=None)

Inheritance: AsmConstraint.to_string

def to_string(self, loc_db=None):
    """Return "<type>:<location>"; the location is rendered through
    @loc_db.pretty_str when @loc_db is provided"""
    if loc_db is not None:
        return "%s:%s" % (self.c_t, loc_db.pretty_str(self.loc_key))
    return "%s:%s" % (self.c_t, self.loc_key)

class AsmRaw

class AsmRaw(object):
    """Holder of a raw payload, rendered through its repr()"""

    def __init__(self, raw=""):
        self.raw = raw

    def __str__(self):
        return "%r" % (self.raw,)

    def to_string(self, loc_db):
        """Return str(self); @loc_db is accepted but not used"""
        rendered = str(self)
        return rendered

Ancestors (in MRO)

Instance variables

var raw

Methods

def __init__(

self, raw='')

def __init__(self, raw=""):
    # Keep the payload as-is; it defaults to an empty string
    self.raw = raw

def to_string(

self, loc_db)

def to_string(self, loc_db):
    """Return str(self); @loc_db is accepted but not used"""
    rendered = str(self)
    return rendered

class AsmSymbolPool

[DEPRECATED API] use 'LocationDB' instead

class AsmSymbolPool(LocationDB):
    """[DEPRECATED API] use 'LocationDB' instead"""

    def __init__(self, *args, **kwargs):
        # Warn, then forward every argument to LocationDB
        message = "Deprecated API, use 'LocationDB' instead"
        warnings.warn(message)
        super(AsmSymbolPool, self).__init__(*args, **kwargs)

Ancestors (in MRO)

  • AsmSymbolPool
  • miasm2.core.locationdb.LocationDB
  • __builtin__.object

Instance variables

var items

Return all loc_keys

var loc_keys

Return all loc_keys

var names

Return all known names

var offsets

Return all known offsets

Methods

def __init__(

self, *args, **kwargs)

def __init__(self, *args, **kwargs):
    """Emit a deprecation warning, then forward every argument to the
    parent initializer"""
    message = "Deprecated API, use 'LocationDB' instead"
    warnings.warn(message)
    super(AsmSymbolPool, self).__init__(*args, **kwargs)

def add_location(

self, name=None, offset=None, strict=True)

Add a new location in the locationDB. Returns the corresponding LocKey. If @name is set, also associate a name to this new location. If @offset is set, also associate an offset to this new location.

Strict mode (set by @strict, default): If a location with @offset or @name already exists, an error will be raised. Otherwise: If a location with @offset or @name already exists, the corresponding LocKey will be returned.

def add_location(self, name=None, offset=None, strict=True):
    """Add a new location in the locationDB. Returns the corresponding LocKey.
    If @name is set, also associate a name to this new location.
    If @offset is set, also associate an offset to this new location.
    Strict mode (set by @strict, default):
      If a location with @offset or @name already exists, an error will be
    raised.
    Otherwise:
      If a location with @offset or @name already exists, the corresponding
    LocKey will be returned.
    Raise a ValueError on collision in strict mode, and in non-strict mode
    when @name is already bound to a location with a different offset, or
    when @offset is already known and a @name is also provided.
    """
    # Deprecation handling
    if is_int(name):
        assert offset is None or offset == name
        warnings.warn("Deprecated API: use 'add_location(offset=)' instead."
                      " An additionnal 'name=' can be provided to also "
                      "associate a name (there is no more default name)")
        offset = name
        name = None

    # Argument cleaning
    offset_loc_key = None
    if offset is not None:
        offset = int(offset)
        offset_loc_key = self.get_offset_location(offset)

    # Test for collisions
    name_loc_key = None
    if name is not None:
        name_loc_key = self.get_name_location(name)

    if strict:
        if name_loc_key is not None:
            raise ValueError("An entry for %r already exists (%r), and "
                             "strict mode is enabled" % (
                                 name, name_loc_key
                             ))
        if offset_loc_key is not None:
            raise ValueError("An entry for 0x%x already exists (%r), and "
                             "strict mode is enabled" % (
                                 offset, offset_loc_key
                             ))
    else:
        # Non-strict mode
        if name_loc_key is not None:
            # Offset currently bound to the named location (loc_key ->
            # offset lookup, not the reverse one)
            known_offset = self.get_location_offset(name_loc_key)
            if offset is not None and known_offset != offset:
                # known_offset may be None, so it cannot go through '%x'
                if known_offset is None:
                    known_str = "None"
                else:
                    known_str = "0x%x" % known_offset
                raise ValueError(
                    "Location with name '%s' already have an offset: %s "
                    "(!= 0x%x)" % (name, known_str, offset)
                    )
            # Name already known, no conflicting offset -> nothing to do
            return name_loc_key

        elif offset_loc_key is not None:
            if name is not None:
                # This is an error. Already known names are handled above
                raise ValueError(
                    "Location with offset 0x%x already exists. "
                    "To add a name to this location, use the dedicated API "
                    "'add_location_name(%r, %r)'" % (
                        offset,
                        offset_loc_key,
                        name
                    ))
            # Offset already known, no name specified
            return offset_loc_key

    # No collision, this is a brand new location
    loc_key = LocKey(self._loc_key_num)
    self._loc_key_num += 1
    self._loc_keys.add(loc_key)

    if offset is not None:
        assert offset not in self._offset_to_loc_key
        self._offset_to_loc_key[offset] = loc_key
        self._loc_key_to_offset[loc_key] = offset

    if name is not None:
        self._name_to_loc_key[name] = loc_key
        self._loc_key_to_names[loc_key] = set([name])

    return loc_key

def add_location_name(

self, loc_key, name)

Associate a name @name to a given @loc_key @name: str instance @loc_key: LocKey instance

def add_location_name(self, loc_key, name):
    """Bind the name @name to the location @loc_key
    Raise a KeyError if @name is already bound to another loc_key
    @name: str instance
    @loc_key: LocKey instance
    """
    assert loc_key in self._loc_keys
    owner = self._name_to_loc_key.get(name)
    conflict = owner is not None and owner != loc_key
    if conflict:
        raise KeyError("%r is already associated to a different loc_key "
                       "(%r)" % (name, owner))
    names = self._loc_key_to_names.setdefault(loc_key, set())
    names.add(name)
    self._name_to_loc_key[name] = loc_key

def canonize_to_exprloc(

self, expr)

If expr is ExprInt, return ExprLoc with corresponding loc_key Else, return expr

@expr: Expr instance

def canonize_to_exprloc(self, expr):
    """
    Turn an ExprInt into the ExprLoc (same size) of the location found or
    created at the corresponding offset; any other expression is returned
    untouched
    @expr: Expr instance
    """
    if not expr.is_int():
        return expr
    loc_key = self.get_or_create_offset_location(int(expr))
    return ExprLoc(loc_key, expr.size)

def consistency_check(

self)

Ensure internal structures are consistent with each others

def consistency_check(self):
    """Ensure internal structures are consistent with each others
    An AssertionError is raised by the first check which fails.
    """
    # Every loc_key with names / with an offset must be a known loc_key
    assert set(self._loc_key_to_names).issubset(self._loc_keys)
    assert set(self._loc_key_to_offset).issubset(self._loc_keys)

    # loc_key -> offset must be the exact inverse of offset -> loc_key
    offset_by_loc_key = {
        loc_key: offset
        for offset, loc_key in self._offset_to_loc_key.items()
    }
    assert self._loc_key_to_offset == offset_by_loc_key

    # The names attached to loc_keys are exactly the known names
    all_names = set().union(*self._loc_key_to_names.values())
    assert all_names == set(self._name_to_loc_key)

    # Each name is listed by the loc_key it points to
    for name, loc_key in self._name_to_loc_key.items():
        assert name in self._loc_key_to_names[loc_key]

def del_loc_key_offset(

self, loc_key)

[DEPRECATED API], see 'unset_location_offset'

def del_loc_key_offset(self, loc_key):
    """[DEPRECATED API], see 'unset_location_offset'"""
    message = "Deprecated API: use 'unset_location_offset'"
    warnings.warn(message)
    self.unset_location_offset(loc_key)

def gen_loc_key(

self)

[DEPRECATED API], see 'add_location'

def gen_loc_key(self):
    """[DEPRECATED API], see 'add_location'"""
    message = "Deprecated API: use 'add_location'"
    warnings.warn(message)
    new_loc_key = self.add_location()
    return new_loc_key

def get_location_names(

self, loc_key)

Return the frozenset of names associated to @loc_key @loc_key: LocKey instance

def get_location_names(self, loc_key):
    """
    Return a frozenset of the names bound to @loc_key (empty when it has
    no name)
    @loc_key: LocKey instance
    """
    assert isinstance(loc_key, LocKey)
    names = self._loc_key_to_names.get(loc_key, set())
    return frozenset(names)

def get_location_offset(

self, loc_key)

Return the offset of @loc_key if any, None otherwise. @loc_key: LocKey instance

def get_location_offset(self, loc_key):
    """
    Look up the offset bound to @loc_key; the result is None when it has
    no offset
    @loc_key: LocKey instance
    """
    assert isinstance(loc_key, LocKey)
    offset = self._loc_key_to_offset.get(loc_key, None)
    return offset

def get_name_location(

self, name)

Return the LocKey of @name if any, None otherwise. @name: target name

def get_name_location(self, name):
    """
    Look up the LocKey bound to @name; the result is None when the name is
    unknown
    @name: target name
    """
    loc_key = self._name_to_loc_key.get(name, None)
    return loc_key

def get_name_offset(

self, name)

Return the offset of @name if any, None otherwise. @name: target name

def get_name_offset(self, name):
    """
    Look up the offset of the location named @name; the result is None
    when the name is unknown or when its location has no offset
    @name: target name
    """
    loc_key = self.get_name_location(name)
    return None if loc_key is None else self.get_location_offset(loc_key)

def get_offset_location(

self, offset)

Return the LocKey of @offset if any, None otherwise. @offset: target offset

def get_offset_location(self, offset):
    """
    Look up the LocKey bound to @offset; the result is None when no
    location is known at this offset
    @offset: target offset
    """
    loc_key = self._offset_to_loc_key.get(offset, None)
    return loc_key

def get_or_create_name_location(

self, name)

Return the LocKey of @name if any, create one otherwise. @name: target name

def get_or_create_name_location(self, name):
    """
    Return the LocKey already bound to @name, or the one of a location
    created with this name when there is none
    @name: target name
    """
    known = self._name_to_loc_key.get(name)
    if known is None:
        return self.add_location(name=name)
    return known

def get_or_create_offset_location(

self, offset)

Return the LocKey of @offset if any, create one otherwise. @offset: target offset

def get_or_create_offset_location(self, offset):
    """
    Return the LocKey already bound to @offset, or the one of a location
    created at this offset when there is none
    @offset: target offset
    """
    known = self._offset_to_loc_key.get(offset)
    if known is None:
        return self.add_location(offset=offset)
    return known

def getby_name(

self, name)

[DEPRECATED API], see 'get_name_location'

def getby_name(self, name):
    """[DEPRECATED API], see 'get_name_location'"""
    message = "Deprecated API: use 'get_name_location'"
    warnings.warn(message)
    loc_key = self.get_name_location(name)
    return loc_key

def getby_name_create(

self, name)

[DEPRECATED API], see 'get_or_create_name_location'

def getby_name_create(self, name):
    """[DEPRECATED API], see 'get_or_create_name_location'"""
    message = "Deprecated API: use 'get_or_create_name_location'"
    warnings.warn(message)
    loc_key = self.get_or_create_name_location(name)
    return loc_key

def getby_offset(

self, offset)

[DEPRECATED API], see 'get_offset_location'

def getby_offset(self, offset):
    """[DEPRECATED API], see 'get_offset_location'"""
    message = "Deprecated API: use 'get_offset_location'"
    warnings.warn(message)
    loc_key = self.get_offset_location(offset)
    return loc_key

def getby_offset_create(

self, offset)

[DEPRECATED API], see 'get_or_create_offset_location'

def getby_offset_create(self, offset):
    """[DEPRECATED API], see 'get_or_create_offset_location'"""
    message = "Deprecated API: use 'get_or_create_offset_location'"
    warnings.warn(message)
    loc_key = self.get_or_create_offset_location(offset)
    return loc_key

def loc_key_to_name(

self, loc_key)

[DEPRECATED API], see 'get_location_names'

def loc_key_to_name(self, loc_key):
    """[DEPRECATED API], see 'get_location_names'
    Return the first name of @loc_key, in sorted order"""
    message = "Deprecated API: use 'get_location_names'"
    warnings.warn(message)
    ordered_names = sorted(self.get_location_names(loc_key))
    return ordered_names[0]

def loc_key_to_offset(

self, loc_key)

[DEPRECATED API], see 'get_location_offset'

def loc_key_to_offset(self, loc_key):
    """[DEPRECATED API], see 'get_location_offset'"""
    message = "Deprecated API: use 'get_location_offset'"
    warnings.warn(message)
    offset = self.get_location_offset(loc_key)
    return offset

def merge(

self, location_db)

Merge with another LocationDB @location_db

WARNING: old reference to @location_db information (such as LocKeys) must be retrieved from the updated version of this instance. The dedicated "get_*" APIs may be used for this task

def merge(self, location_db):
    """Import every location of another LocationDB @location_db
    WARNING: LocKeys coming from @location_db are not kept as-is. The
    matching information must be retrieved from this instance afterwards,
    for instance through the dedicated "get_*" APIs
    """
    # The LocKeys of both instances will certainly collide, so each foreign
    # location is re-added here instead of being copied
    for foreign_loc_key in location_db.loc_keys:
        foreign_names = location_db.get_location_names(foreign_loc_key)
        foreign_offset = location_db.get_location_offset(foreign_loc_key)

        # One of the names (if any) is used to add the location
        init_name = list(foreign_names)[0] if foreign_names else None
        loc_key = self.add_location(offset=foreign_offset, name=init_name,
                                    strict=False)

        # The other names are attached afterwards, if not already there
        cur_names = self.get_location_names(loc_key)
        for name in foreign_names:
            if name in cur_names or name == init_name:
                continue
            self.add_location_name(loc_key, name=name)

def pretty_str(

self, loc_key)

Return a human readable version of @loc_key, according to information available in this LocationDB instance

def pretty_str(self, loc_key):
    """Build a human readable version of @loc_key from the information of
    this LocationDB instance: its names joined by ',' if it has any, else
    "loc_<offset in hex>" if it has an offset, else str(@loc_key)"""
    names = self.get_location_names(loc_key)
    if not names:
        offset = self.get_location_offset(loc_key)
        if offset is None:
            return str(loc_key)
        return "loc_%x" % offset
    return ",".join(names)

def remove_loc_key(

self, loc_key)

[DEPRECATED API], see 'remove_location'

def remove_loc_key(self, loc_key):
    """[DEPRECATED API], see 'remove_location'"""
    message = "Deprecated API: use 'remove_location'"
    warnings.warn(message)
    self.remove_location(loc_key)

def remove_location(

self, loc_key)

Delete the location corresponding to @loc_key @loc_key: LocKey instance

def remove_location(self, loc_key):
    """
    Forget the location @loc_key, along with its names and its offset
    Raise a KeyError if @loc_key is unknown
    @loc_key: LocKey instance
    """
    assert isinstance(loc_key, LocKey)
    if not (loc_key in self._loc_keys):
        raise KeyError("Unknown loc_key %r" % loc_key)

    # Unregister each name of the location
    for name in self._loc_key_to_names.pop(loc_key, []):
        del self._name_to_loc_key[name]

    # Unregister its offset, if any
    offset = self._loc_key_to_offset.pop(loc_key, None)
    self._offset_to_loc_key.pop(offset, None)

    self._loc_keys.remove(loc_key)

def remove_location_name(

self, loc_key, name)

Disassociate a name @name from a given @loc_key Fail if @name is not already associated to @loc_key @name: str instance @loc_key: LocKey instance

def remove_location_name(self, loc_key, name):
    """Unbind the name @name from the location @loc_key
    Raise a KeyError if @name is unknown, or bound to another loc_key
    @name: str instance
    @loc_key: LocKey instance
    """
    assert loc_key in self._loc_keys
    owner = self._name_to_loc_key.get(name)
    if owner is None:
        raise KeyError("%r is not already associated" % name)
    mismatch = owner != loc_key
    if mismatch:
        raise KeyError("%r is already associated to a different loc_key "
                       "(%r)" % (name, owner))
    del self._name_to_loc_key[name]
    names = self._loc_key_to_names[loc_key]
    names.remove(name)

def rename_location(

self, loc_key, newname)

[DEPRECATED API], see 'add_location_name' and 'remove_location_name'

def rename_location(self, loc_key, newname):
    """[DEPRECATED API], see 'add_location_name' and 'remove_location_name'
    Remove every name currently associated to @loc_key, then associate
    @newname to it
    @loc_key: LocKey instance
    @newname: the name to associate to @loc_key
    """
    warnings.warn("Deprecated API: use 'add_location_name' and "
                  "'remove_location_name'")
    for name in self.get_location_names(loc_key):
        self.remove_location_name(loc_key, name)
    # Bind the requested name, not the loop variable: the latter holds a
    # name which has just been removed, and is undefined if there was none
    self.add_location_name(loc_key, newname)

def set_location_offset(

self, loc_key, offset, force=False)

Associate the offset @offset to an LocKey @loc_key

If @force is set, override silently. Otherwise, if an offset is already associated to @loc_key, an error will be raised

def set_location_offset(self, loc_key, offset, force=False):
    """Bind the offset @offset to the LocKey @loc_key
    Raise a KeyError if @offset already belongs to another loc_key.
    When @loc_key already has a different offset, it is silently replaced
    if @force is set; otherwise a ValueError is raised
    """
    assert loc_key in self._loc_keys

    owner = self.get_offset_location(offset)
    if owner is not None and owner != loc_key:
        raise KeyError("%r is already associated to a different loc_key "
                       "(%r)" % (offset, owner))

    previous = self._loc_key_to_offset.get(loc_key)
    needs_override = previous is not None and previous != offset
    if needs_override and not force:
        raise ValueError(
            "%r already has an offset (0x%x). Use 'force=True'"
            " for silent overriding" % (
                loc_key, previous
            ))
    if needs_override:
        # Forget the previous offset before binding the new one
        self.unset_location_offset(loc_key)

    self._offset_to_loc_key[offset] = loc_key
    self._loc_key_to_offset[loc_key] = offset

def set_offset(

self, loc_key, offset)

[DEPRECATED API], see 'set_location_offset'

def set_offset(self, loc_key, offset):
    """[DEPRECATED API], see 'set_location_offset'
    Forwards to set_location_offset with force=True"""
    message = "Deprecated API: use 'set_location_offset'"
    warnings.warn(message)
    self.set_location_offset(loc_key, offset, force=True)

def str_loc_key(

self, loc_key)

[DEPRECATED API], see 'pretty_str'

def str_loc_key(self, loc_key):
    """[DEPRECATED API], see 'pretty_str'"""
    message = "Deprecated API: use 'pretty_str'"
    warnings.warn(message)
    rendered = self.pretty_str(loc_key)
    return rendered

def unset_location_offset(

self, loc_key)

Disassociate LocKey @loc_key's offset

Fail if there is no offset associated with it. @loc_key: LocKey

def unset_location_offset(self, loc_key):
    """Drop the offset currently bound to @loc_key

    @loc_key: LocKey, must already be registered in this database

    A ValueError is raised when @loc_key has no offset.
    """
    assert loc_key in self._loc_keys
    current = self._loc_key_to_offset.get(loc_key)
    if current is not None:
        # Remove both directions of the offset <-> loc_key association
        del self._offset_to_loc_key[current]
        del self._loc_key_to_offset[loc_key]
        return
    raise ValueError("%r already has no offset" % loc_key)

class BlockChain

Manage blocks linked with an asm_constraint_next

class BlockChain(object):

    """Manage blocks linked with an asm_constraint_next

    A chain is an ordered list of blocks. At most one of them may be
    "pinned", i.e. have its loc_key associated to an offset in @loc_db.
    """

    def __init__(self, loc_db, blocks):
        """
        @loc_db: location database, queried for the blocks' offsets
        @blocks: ordered list of the blocks forming the chain
        """
        self.loc_db = loc_db
        self.blocks = blocks
        self.place()

    @property
    def pinned(self):
        """Return True iff at least one block is pinned"""
        return self.pinned_block_idx is not None

    def _set_pinned_block_idx(self):
        """Store in `pinned_block_idx` the index of the pinned block, or None
        if no block of the chain has an offset.
        Raise a ValueError if several blocks are pinned.
        """
        self.pinned_block_idx = None
        for i, block in enumerate(self.blocks):
            loc_key = block.loc_key
            if self.loc_db.get_location_offset(loc_key) is not None:
                if self.pinned_block_idx is not None:
                    raise ValueError("Multiples pinned block detected")
                self.pinned_block_idx = i

    def place(self):
        """Compute BlockChain offset_min and offset_max using pinned block and
        blocks' size.
        `max_size` is always updated; `offset_min` / `offset_max` are only
        computed when the chain is pinned.
        """
        self._set_pinned_block_idx()
        self.max_size = 0
        for block in self.blocks:
            # Worst case: each block needs `alignment - 1` bytes of padding
            self.max_size += block.max_size + block.alignment - 1

        # Check if chain has one block pinned
        if not self.pinned:
            return

        loc = self.blocks[self.pinned_block_idx].loc_key
        offset_base = self.loc_db.get_location_offset(loc)
        assert(offset_base % self.blocks[self.pinned_block_idx].alignment == 0)

        # Blocks located *before* the pinned block lower the minimum offset.
        # (The former slice `[:idx - 1:-1]` selected the pinned block and its
        # successors instead.)
        self.offset_min = offset_base
        for block in reversed(self.blocks[:self.pinned_block_idx]):
            self.offset_min -= block.max_size + \
                (block.alignment - block.max_size) % block.alignment

        # The pinned block and its successors raise the maximum offset
        self.offset_max = offset_base
        for block in self.blocks[self.pinned_block_idx:]:
            self.offset_max += block.max_size + \
                (block.alignment - block.max_size) % block.alignment

    def merge(self, chain):
        """Best effort merge two block chains
        Return the list of resulting blockchains"""
        self.blocks += chain.blocks
        self.place()
        return [self]

    def fix_blocks(self, modified_loc_keys):
        """Propagate the pinned block's offset to its neighbouring blocks
        @modified_loc_keys: store new pinned loc_keys (handed to
        fix_loc_offset, and returned)

        Raise a ValueError if the chain is not pinned, a RuntimeError if the
        pinned offset does not respect the pinned block alignment.
        """

        if not self.pinned:
            raise ValueError('Trying to fix unpinned block')

        pinned_block = self.blocks[self.pinned_block_idx]
        offset = self.loc_db.get_location_offset(pinned_block.loc_key)
        if offset % pinned_block.alignment != 0:
            raise RuntimeError('Bad alignment')

        # Propagate offset to blocks before pinned block, walking backward
        # from the pinned block and accumulating the sizes on the way
        for block in reversed(self.blocks[:self.pinned_block_idx]):
            new_offset = offset - block.size
            # NOTE(review): rounding uses the pinned block alignment, as in
            # the original code; `block.alignment` may be intended -- confirm
            new_offset = new_offset - new_offset % pinned_block.alignment
            fix_loc_offset(self.loc_db,
                           block.loc_key,
                           new_offset,
                           modified_loc_keys)
            offset = new_offset

        # Propagate offset to blocks after pinned block
        offset = self.loc_db.get_location_offset(pinned_block.loc_key) + pinned_block.size

        last_block = pinned_block
        for block in self.blocks[self.pinned_block_idx + 1:]:
            # Pad up to the next multiple of the previous block alignment
            offset += (- offset) % last_block.alignment
            fix_loc_offset(self.loc_db,
                           block.loc_key,
                           offset,
                           modified_loc_keys)
            offset += block.size
            last_block = block
        return modified_loc_keys

Ancestors (in MRO)

Instance variables

var blocks

var loc_db

var pinned

Return True iff at least one block is pinned

Methods

def __init__(

self, loc_db, blocks)

def __init__(self, loc_db, blocks):
    """
    @loc_db: location database kept as `self.loc_db`
    @blocks: blocks of the chain, kept as `self.blocks`
    The chain is placed (see `place`) once both attributes are set.
    """
    self.loc_db, self.blocks = loc_db, blocks
    self.place()

def fix_blocks(

self, modified_loc_keys)

Propagate the pinned block's offset to its neighbouring blocks. @modified_loc_keys: store new pinned loc_keys

def fix_blocks(self, modified_loc_keys):
    """Propagate the pinned block's offset to its neighbouring blocks
    @modified_loc_keys: store new pinned loc_keys (handed to fix_loc_offset,
    and returned)

    Raise a ValueError if the chain is not pinned, a RuntimeError if the
    pinned offset does not respect the pinned block alignment.
    """
    if not self.pinned:
        raise ValueError('Trying to fix unpinned block')
    pinned_block = self.blocks[self.pinned_block_idx]
    offset = self.loc_db.get_location_offset(pinned_block.loc_key)
    if offset % pinned_block.alignment != 0:
        raise RuntimeError('Bad alignment')
    # Propagate offset to blocks before pinned block, walking backward from
    # the pinned block and accumulating the sizes on the way.
    # (The former slice `[:idx - 1:-1]` selected the pinned block and its
    # successors instead, and the offset was never updated.)
    for block in reversed(self.blocks[:self.pinned_block_idx]):
        new_offset = offset - block.size
        # NOTE(review): rounding uses the pinned block alignment, as in the
        # original code; `block.alignment` may be intended -- confirm
        new_offset = new_offset - new_offset % pinned_block.alignment
        fix_loc_offset(self.loc_db,
                       block.loc_key,
                       new_offset,
                       modified_loc_keys)
        offset = new_offset
    # Propagate offset to blocks after pinned block
    offset = self.loc_db.get_location_offset(pinned_block.loc_key) + pinned_block.size
    last_block = pinned_block
    for block in self.blocks[self.pinned_block_idx + 1:]:
        # Pad up to the next multiple of the previous block alignment
        offset += (- offset) % last_block.alignment
        fix_loc_offset(self.loc_db,
                       block.loc_key,
                       offset,
                       modified_loc_keys)
        offset += block.size
        last_block = block
    return modified_loc_keys

def merge(

self, chain)

Best effort merge two block chains Return the list of resulting blockchains

def merge(self, chain):
    """Append the blocks of @chain to this chain, then place it again
    Return the list of resulting blockchains (here, only this chain)"""
    self.blocks += chain.blocks
    self.place()
    resulting_chains = [self]
    return resulting_chains

def place(

self)

Compute BlockChain offset_min and offset_max using pinned block and blocks' size

def place(self):
    """Compute BlockChain offset_min and offset_max using pinned block and
    blocks' size.
    `max_size` is always updated; `offset_min` / `offset_max` are only
    computed when the chain is pinned.
    """
    self._set_pinned_block_idx()
    self.max_size = 0
    for block in self.blocks:
        # Worst case: each block needs `alignment - 1` bytes of padding
        self.max_size += block.max_size + block.alignment - 1
    # Check if chain has one block pinned
    if not self.pinned:
        return
    loc = self.blocks[self.pinned_block_idx].loc_key
    offset_base = self.loc_db.get_location_offset(loc)
    assert(offset_base % self.blocks[self.pinned_block_idx].alignment == 0)
    # Blocks located *before* the pinned block lower the minimum offset.
    # (The former slice `[:idx - 1:-1]` selected the pinned block and its
    # successors instead.)
    self.offset_min = offset_base
    for block in reversed(self.blocks[:self.pinned_block_idx]):
        self.offset_min -= block.max_size + \
            (block.alignment - block.max_size) % block.alignment
    # The pinned block and its successors raise the maximum offset
    self.offset_max = offset_base
    for block in self.blocks[self.pinned_block_idx:]:
        self.offset_max += block.max_size + \
            (block.alignment - block.max_size) % block.alignment

class BlockChainWedge

Stand for wedges between blocks

class BlockChainWedge(object):

    """Stand for wedges between blocks"""

    def __init__(self, loc_db, offset, size):
        """
        @loc_db: location database used when merging with a chain
        @offset: start offset of the wedge (also its `offset_min`)
        @size: size of the wedge (also its `max_size`)
        """
        self.loc_db = loc_db
        self.offset = self.offset_min = offset
        self.max_size = size
        self.offset_max = offset + size

    def merge(self, chain):
        """Pin the first block of @chain at the end of this wedge, then place
        @chain again
        Return the list of resulting blockchains"""
        head_block = chain.blocks[0]
        self.loc_db.set_location_offset(head_block.loc_key, self.offset_max)
        chain.place()
        return [self, chain]

Ancestors (in MRO)

Instance variables

var loc_db

var max_size

var offset

var offset_max

var offset_min

Methods

def __init__(

self, loc_db, offset, size)

def __init__(self, loc_db, offset, size):
    """
    @loc_db: location database used when merging with a chain
    @offset: start offset of the wedge (also its `offset_min`)
    @size: size of the wedge (also its `max_size`)
    """
    self.loc_db = loc_db
    self.offset = self.offset_min = offset
    self.max_size = size
    self.offset_max = offset + size

def merge(

self, chain)

Best effort merge two block chains Return the list of resulting blockchains

def merge(self, chain):
    """Pin the first block of @chain at the end of this wedge, then place
    @chain again
    Return the list of resulting blockchains"""
    head_block = chain.blocks[0]
    self.loc_db.set_location_offset(head_block.loc_key, self.offset_max)
    chain.place()
    return [self, chain]

class asm_bloc

class asm_bloc(AsmBlock):

    """Deprecated alias of AsmBlock, kept for backward compatibility"""

    def __init__(self, loc_key, alignment=1):
        """Warn about the deprecation, then build the block as AsmBlock does
        @loc_key, @alignment: forwarded to the parent constructor
        """
        # The class formerly derived from `object`, whose __init__ rejects
        # the (loc_key, alignment) arguments forwarded below
        warnings.warn('DEPRECATION WARNING: use "AsmBlock" instead of "asm_bloc"')
        super(asm_bloc, self).__init__(loc_key, alignment)

Ancestors (in MRO)

Methods

def __init__(

self, loc_key, alignment=1)

def __init__(self, loc_key, alignment=1):
    """Warn about the deprecation of "asm_bloc", then forward @loc_key and
    @alignment to the parent constructor"""
    message = 'DEPRECATION WARNING: use "AsmBlock" instead of "asm_bloc"'
    warnings.warn(message)
    super(asm_bloc, self).__init__(loc_key, alignment)

class asm_block_bad

class asm_block_bad(AsmBlockBad):

    """Deprecated alias of AsmBlockBad, kept for backward compatibility"""

    def __init__(self, loc_key=None, alignment=1, errno=-1, *args, **kwargs):
        """Warn about the deprecation, then build the block as AsmBlockBad
        does
        @loc_key, @alignment, @errno: forwarded to the parent constructor
        """
        warnings.warn('DEPRECATION WARNING: use "AsmBlockBad" instead of "asm_block_bad"')
        # @errno was formerly dropped instead of being forwarded.
        # NOTE(review): assumes AsmBlockBad.__init__ takes errno as its third
        # positional parameter, mirroring this signature -- confirm
        super(asm_block_bad, self).__init__(loc_key, alignment, errno,
                                            *args, **kwargs)

Ancestors (in MRO)

Class variables

var ERROR_CANNOT_DISASM

Inheritance: AsmBlockBad.ERROR_CANNOT_DISASM

var ERROR_FORBIDDEN

Inheritance: AsmBlockBad.ERROR_FORBIDDEN

var ERROR_IO

Inheritance: AsmBlockBad.ERROR_IO

var ERROR_NULL_STARTING_BLOCK

Inheritance: AsmBlockBad.ERROR_NULL_STARTING_BLOCK

var ERROR_TYPES

Inheritance: AsmBlockBad.ERROR_TYPES

var ERROR_UNKNOWN

Inheritance: AsmBlockBad.ERROR_UNKNOWN

Instance variables

var alignment

Inheritance: AsmBlockBad.alignment

var bto

Inheritance: AsmBlockBad.bto

var errno

Inheritance: AsmBlockBad.errno

var label

Inheritance: AsmBlockBad.label

var lines

Inheritance: AsmBlockBad.lines

var loc_key

Inheritance: AsmBlockBad.loc_key

Methods

def __init__(

self, loc_key=None, alignment=1, errno=-1, *args, **kwargs)

Inheritance: AsmBlockBad.__init__

Instantiate an AsmBlockBad. @loc_key, @alignment: same as AsmBlock.__init__ @errno: (optional) specify an error type associated with the block

def __init__(self, loc_key=None, alignment=1, errno=-1, *args, **kwargs):
    """Warn about the deprecation of "asm_block_bad", then forward
    @loc_key, @alignment and @errno to the parent constructor"""
    warnings.warn('DEPRECATION WARNING: use "AsmBlockBad" instead of "asm_block_bad"')
    # @errno was formerly dropped instead of being forwarded.
    # NOTE(review): assumes AsmBlockBad.__init__ takes errno as its third
    # positional parameter, mirroring this signature -- confirm
    super(asm_block_bad, self).__init__(loc_key, alignment, errno,
                                        *args, **kwargs)

def add_cst(

self, loc_key, constraint_type)

Inheritance: AsmBlockBad.add_cst

Add constraint between current block and block at @loc_key @loc_key: LocKey instance of constraint target @constraint_type: AsmConstraint c_to/c_next

def add_cst(self, loc_key, constraint_type):
    """
    Register in `self.bto` a new constraint from the current block to the
    block at @loc_key
    @loc_key: LocKey instance of constraint target
    @constraint_type: AsmConstraint c_to/c_next
    """
    assert isinstance(loc_key, LocKey)
    self.bto.add(AsmConstraint(loc_key, constraint_type))

def addline(

self, *args, **kwargs)

Inheritance: AsmBlockBad.addline

def addline(self, *args, **kwargs):
    """Always raise a RuntimeError: lines cannot be added to an AsmBlockBad"""
    reason = "An AsmBlockBad cannot have line"
    raise RuntimeError(reason)

def addto(

self, *args, **kwargs)

Inheritance: AsmBlockBad.addto

def addto(self, *args, **kwargs):
    """Always raise a RuntimeError: an AsmBlockBad has no destination"""
    reason = "An AsmBlockBad cannot have bto"
    raise RuntimeError(reason)

def fix_constraints(

self)

Inheritance: AsmBlockBad.fix_constraints

Fix next block constraints

def fix_constraints(self):
    """Fix next block constraints

    Group the constraints of `self.bto` by destination, then keep for each
    destination the single constraint chosen by `self._filter_constraint`.
    """
    # destination -> associated constraints
    dests = {}
    for constraint in self.bto:
        dests.setdefault(constraint.loc_key, set()).add(constraint)
    # `values()` is available on both Python 2 and 3, unlike `itervalues()`
    self.bto = set(self._filter_constraint(constraints)
                   for constraints in dests.values())

def get_flow_instr(

self)

Inheritance: AsmBlockBad.get_flow_instr

def get_flow_instr(self):
    """Inspect the trailing lines of the block (the last one, plus the
    delay slot count of the first line) for an instruction which splits or
    breaks the flow.

    Return None if the block has no line.

    NOTE(review): this helper is unfinished ('not fully functional'): as
    written it never returns an instruction -- confirm intent before use.
    """
    if not self.lines:
        return None
    # `i` takes the negative indexes -1, -2, ..., -(delayslot + 1)
    for i in xrange(-1, -1 - self.lines[0].delayslot - 1, -1):
        # NOTE(review): `i` is always negative here, so this bounds check
        # fails on the first iteration and None is returned; the code below
        # looks unreachable -- confirm intent
        if not 0 <= i < len(self.lines):
            return None
        l = self.lines[i]
        if l.splitflow() or l.breakflow():
            raise NotImplementedError('not fully functional')

def get_label(

self)

Inheritance: AsmBlockBad.get_label

def get_label(self):
    """Deprecated getter behind `.label`: warn, then return `self.loc_key`"""
    message = 'DEPRECATION WARNING: use ".loc_key" instead of ".label"'
    warnings.warn(message)
    return self.loc_key

def get_next(

self)

Inheritance: AsmBlockBad.get_next

def get_next(self):
    """Return the loc_key of the first `c_next` constraint found in
    `self.bto`, None if there is none"""
    next_targets = (
        constraint.loc_key
        for constraint in self.bto
        if constraint.c_t == AsmConstraint.c_next
    )
    return next(next_targets, None)

def get_offsets(

self)

Inheritance: AsmBlockBad.get_offsets

def get_offsets(self):
    """Return the list of the `offset` attribute of each line, in order"""
    offsets = []
    for line in self.lines:
        offsets.append(line.offset)
    return offsets

def get_range(

self)

Inheritance: AsmBlockBad.get_range

Returns the offset hull of an AsmBlock

def get_range(self):
    """Returns the offset hull of an AsmBlock

    This is the tuple (offset of the first line, offset of the last line
    plus its length `l`), or (0, 0) for a block without lines.
    """
    if len(self.lines) == 0:
        return 0, 0
    first, last = self.lines[0], self.lines[-1]
    return (first.offset, last.offset + last.l)

def get_subcall_instr(

self)

Inheritance: AsmBlockBad.get_subcall_instr

def get_subcall_instr(self):
    """Return the last sub-call instruction found among the trailing lines
    of the block (the last line, plus as many preceding lines as the delay
    slot count of the first line), None if there is none.
    """
    if not self.lines:
        return None
    delayslot = self.lines[0].delayslot
    end_index = len(self.lines) - 1
    ds_max_index = max(end_index - delayslot, 0)
    # Walk from the last line back to `ds_max_index` (included). A reversed
    # slice replaces the Python 2 only `xrange` index loop.
    for line in reversed(self.lines[ds_max_index:]):
        if line.is_subcall():
            return line
    return None

def split(

self, *args, **kwargs)

Inheritance: AsmBlockBad.split

def split(self, *args, **kwargs):
    """Always raise a RuntimeError: an AsmBlockBad cannot be split"""
    reason = "An AsmBlockBad cannot be splitted"
    raise RuntimeError(reason)

def to_string(

self, loc_db=None)

Inheritance: AsmBlockBad.to_string

def to_string(self, loc_db=None):
    """Return a multi-line textual form of the block: its location, one
    line per instruction, then its sorted destinations (if any)
    @loc_db: (optional) location database used to pretty print locations
    """
    if loc_db is not None:
        header = loc_db.pretty_str(self.loc_key)
    else:
        header = str(self.loc_key)
    parts = [header]
    parts.extend(instr.to_string(loc_db) for instr in self.lines)
    if self.bto:
        targets = ["->"]
        for dst in self.bto:
            if dst is not None:
                targets.append(dst.to_string(loc_db) + " ")
            else:
                targets.append("Unknown? ")
        parts.append('\t'.join(sorted(targets)))
    return '\n'.join(parts)

class asm_constraint

class asm_constraint(AsmConstraint):

    """Deprecated alias of AsmConstraint, kept for backward compatibility"""

    def __init__(self, loc_key, c_t=AsmConstraint.c_to):
        """Warn about the deprecation, then forward @loc_key and @c_t to the
        AsmConstraint constructor"""
        message = 'DEPRECATION WARNING: use "AsmConstraint" instead of "asm_constraint"'
        warnings.warn(message)
        super(asm_constraint, self).__init__(loc_key, c_t)

Ancestors (in MRO)

Class variables

var c_next

Inheritance: AsmConstraint.c_next

var c_to

Inheritance: AsmConstraint.c_to

Instance variables

var c_t

Inheritance: AsmConstraint.c_t

var label

Inheritance: AsmConstraint.label

var loc_key

Inheritance: AsmConstraint.loc_key

Methods

def __init__(

self, loc_key, c_t='c_to')

Inheritance: AsmConstraint.__init__

def __init__(self, loc_key, c_t=AsmConstraint.c_to):
    """Warn about the deprecation of "asm_constraint", then forward @loc_key
    and @c_t to the AsmConstraint constructor"""
    message = 'DEPRECATION WARNING: use "AsmConstraint" instead of "asm_constraint"'
    warnings.warn(message)
    super(asm_constraint, self).__init__(loc_key, c_t)

def get_label(

self)

Inheritance: AsmConstraint.get_label

def get_label(self):
    """Deprecated getter behind `.label`: warn, then return `self.loc_key`"""
    message = 'DEPRECATION WARNING: use ".loc_key" instead of ".label"'
    warnings.warn(message)
    return self.loc_key

def set_label(

self, loc_key)

Inheritance: AsmConstraint.set_label

def set_label(self, loc_key):
    """Deprecated setter behind `.label`: warn, then store @loc_key in
    `self.loc_key`"""
    message = 'DEPRECATION WARNING: use ".loc_key" instead of ".label"'
    warnings.warn(message)
    self.loc_key = loc_key

def to_string(

self, loc_db=None)

Inheritance: AsmConstraint.to_string

def to_string(self, loc_db=None):
    """Return "<constraint type>:<target>"
    @loc_db: (optional) location database; when given, the target is
    rendered with its `pretty_str`, otherwise the raw loc_key is used
    """
    if loc_db is not None:
        target = loc_db.pretty_str(self.loc_key)
    else:
        target = self.loc_key
    return "%s:%s" % (self.c_t, target)

class asm_constraint_next

class asm_constraint_next(AsmConstraint):

    """Deprecated way of building a `c_next` AsmConstraint"""

    def __init__(self, loc_key):
        """Warn about the deprecation, then build a `c_next` constraint
        targeting @loc_key"""
        warnings.warn('DEPRECATION WARNING: use "AsmConstraintNext" instead of "asm_constraint_next"')
        # The constraint type must be explicit: AsmConstraint defaults to
        # `c_to`, which is what this class formerly built
        super(asm_constraint_next, self).__init__(
            loc_key, c_t=AsmConstraint.c_next
        )

Ancestors (in MRO)

Class variables

var c_next

Inheritance: AsmConstraint.c_next

var c_to

Inheritance: AsmConstraint.c_to

Instance variables

var c_t

Inheritance: AsmConstraint.c_t

var label

Inheritance: AsmConstraint.label

var loc_key

Inheritance: AsmConstraint.loc_key

Methods

def __init__(

self, loc_key)

Inheritance: AsmConstraint.__init__

def __init__(self, loc_key):
    """Warn about the deprecation of "asm_constraint_next", then build a
    `c_next` constraint targeting @loc_key"""
    warnings.warn('DEPRECATION WARNING: use "AsmConstraintNext" instead of "asm_constraint_next"')
    # The constraint type must be explicit: AsmConstraint defaults to `c_to`,
    # which is what this constructor formerly built
    super(asm_constraint_next, self).__init__(
        loc_key, c_t=AsmConstraint.c_next
    )

def get_label(

self)

Inheritance: AsmConstraint.get_label

def get_label(self):
    """Deprecated accessor: emit a warning and give back `self.loc_key`"""
    warnings.warn(
        'DEPRECATION WARNING: use ".loc_key" instead of ".label"'
    )
    current = self.loc_key
    return current

def set_label(

self, loc_key)

Inheritance: AsmConstraint.set_label

def set_label(self, loc_key):
    """Deprecated mutator: emit a warning and replace `self.loc_key` with
    @loc_key"""
    warnings.warn(
        'DEPRECATION WARNING: use ".loc_key" instead of ".label"'
    )
    self.loc_key = loc_key

def to_string(

self, loc_db=None)

Inheritance: AsmConstraint.to_string

def to_string(self, loc_db=None):
    """Render the constraint as "<constraint type>:<target>"
    @loc_db: (optional) location database providing `pretty_str`; without
    it, the raw loc_key is used as target
    """
    if loc_db is None:
        destination = self.loc_key
    else:
        destination = loc_db.pretty_str(self.loc_key)
    return "%s:%s" % (self.c_t, destination)

class asm_constraint_to

class asm_constraint_to(AsmConstraint):

    """Deprecated way of building an AsmConstraint with its default type"""

    def __init__(self, loc_key):
        """Warn about the deprecation, then forward @loc_key to the
        AsmConstraint constructor"""
        message = 'DEPRECATION WARNING: use "AsmConstraintTo" instead of "asm_constraint_to"'
        warnings.warn(message)
        super(asm_constraint_to, self).__init__(loc_key)

Ancestors (in MRO)

Class variables

var c_next

Inheritance: AsmConstraint.c_next

var c_to

Inheritance: AsmConstraint.c_to

Instance variables

var c_t

Inheritance: AsmConstraint.c_t

var label

Inheritance: AsmConstraint.label

var loc_key

Inheritance: AsmConstraint.loc_key

Methods

def __init__(

self, loc_key)

Inheritance: AsmConstraint.__init__

def __init__(self, loc_key):
    """Warn about the deprecation of "asm_constraint_to", then forward
    @loc_key to the AsmConstraint constructor"""
    message = 'DEPRECATION WARNING: use "AsmConstraintTo" instead of "asm_constraint_to"'
    warnings.warn(message)
    super(asm_constraint_to, self).__init__(loc_key)

def get_label(

self)

Inheritance: AsmConstraint.get_label

def get_label(self):
    """Legacy `.label` getter: warn about its deprecation, then return the
    value of `self.loc_key`"""
    notice = 'DEPRECATION WARNING: use ".loc_key" instead of ".label"'
    warnings.warn(notice)
    return self.loc_key

def set_label(

self, loc_key)

Inheritance: AsmConstraint.set_label

def set_label(self, loc_key):
    """Legacy `.label` setter: warn about its deprecation, then assign
    @loc_key to `self.loc_key`"""
    notice = 'DEPRECATION WARNING: use ".loc_key" instead of ".label"'
    warnings.warn(notice)
    self.loc_key = loc_key

def to_string(

self, loc_db=None)

Inheritance: AsmConstraint.to_string

def to_string(self, loc_db=None):
    """Format the constraint as "<constraint type>:<target>"
    @loc_db: (optional) location database; its `pretty_str` renders the
    target when given, else the raw loc_key is printed
    """
    pattern = "%s:%s"
    if loc_db is None:
        return pattern % (self.c_t, self.loc_key)
    return pattern % (self.c_t, loc_db.pretty_str(self.loc_key))

class asm_raw

class asm_raw(AsmRaw):

    """Deprecated alias of AsmRaw, kept for backward compatibility"""

    def __init__(self, raw=""):
        """Warn about the deprecation, then forward @raw to the AsmRaw
        constructor"""
        warnings.warn('DEPRECATION WARNING: use "AsmRaw" instead of "asm_raw"')
        # super() must name this very class (it formerly named `asm_label`)
        super(asm_raw, self).__init__(raw)

Ancestors (in MRO)

Instance variables

var raw

Inheritance: AsmRaw.raw

Methods

def __init__(

self, raw='')

Inheritance: AsmRaw.__init__

def __init__(self, raw=""):
    """Warn about the deprecation of "asm_raw", then forward @raw to the
    AsmRaw constructor"""
    warnings.warn('DEPRECATION WARNING: use "AsmRaw" instead of "asm_raw"')
    # super() must name the enclosing class (it formerly named `asm_label`)
    super(asm_raw, self).__init__(raw)

def to_string(

self, loc_db)

Inheritance: AsmRaw.to_string

def to_string(self, loc_db):
    """Return `str(self)`; @loc_db is accepted but not used"""
    text = str(self)
    return text

class asm_symbol_pool

class asm_symbol_pool(AsmSymbolPool):

    """Deprecated alias of AsmSymbolPool, kept for backward compatibility"""

    def __init__(self):
        """Warn about the deprecation, then run the parent constructor"""
        message = 'DEPRECATION WARNING: use "LocationDB" instead of "asm_symbol_pool"'
        warnings.warn(message)
        super(asm_symbol_pool, self).__init__()

Ancestors (in MRO)

Instance variables

var items

Inheritance: AsmSymbolPool.items

Return all loc_keys

var loc_keys

Inheritance: AsmSymbolPool.loc_keys

Return all loc_keys

var names

Inheritance: AsmSymbolPool.names

Return all known names

var offsets

Inheritance: AsmSymbolPool.offsets

Return all known offsets

Methods

def __init__(

self)

Inheritance: AsmSymbolPool.__init__

def __init__(self):
    """Warn about the deprecation of "asm_symbol_pool", then run the parent
    constructor"""
    message = 'DEPRECATION WARNING: use "LocationDB" instead of "asm_symbol_pool"'
    warnings.warn(message)
    super(asm_symbol_pool, self).__init__()

def add_location(

self, name=None, offset=None, strict=True)

Inheritance: AsmSymbolPool.add_location

Add a new location in the locationDB. Returns the corresponding LocKey. If @name is set, also associate a name to this new location. If @offset is set, also associate an offset to this new location.

Strict mode (set by @strict, default): If a location with @offset or @name already exists, an error will be raised. Otherwise: If a location with @offset or @name already exists, the corresponding LocKey will be returned.

def add_location(self, name=None, offset=None, strict=True):
    """Add a new location in the locationDB. Returns the corresponding LocKey.
    If @name is set, also associate a name to this new location.
    If @offset is set, also associate an offset to this new location.

    Strict mode (set by @strict, default):
      If a location with @offset or @name already exists, a ValueError is
      raised.
    Otherwise:
      If a location with @offset or @name already exists, the corresponding
      LocKey is returned. A ValueError is raised if the request conflicts
      with the existing location (@name known with another offset or without
      offset while @offset is set; @offset known while a new @name is set).
    """

    # Deprecation handling
    if is_int(name):
        assert offset is None or offset == name
        warnings.warn("Deprecated API: use 'add_location(offset=)' instead."
                      " An additionnal 'name=' can be provided to also "
                      "associate a name (there is no more default name)")
        offset = name
        name = None

    # Argument cleaning
    offset_loc_key = None
    if offset is not None:
        offset = int(offset)
        offset_loc_key = self.get_offset_location(offset)

    # Test for collisions
    name_loc_key = None
    if name is not None:
        name_loc_key = self.get_name_location(name)

    if strict:
        if name_loc_key is not None:
            raise ValueError("An entry for %r already exists (%r), and "
                             "strict mode is enabled" % (
                                 name, name_loc_key
                             ))
        if offset_loc_key is not None:
            raise ValueError("An entry for 0x%x already exists (%r), and "
                             "strict mode is enabled" % (
                                 offset, offset_loc_key
                             ))
    else:
        # Non-strict mode
        if name_loc_key is not None:
            # Offset currently bound to the named location (the lookup was
            # formerly done with 'get_offset_location', which expects an
            # offset and thus never found anything)
            known_offset = self.get_location_offset(name_loc_key)
            if offset is not None and known_offset != offset:
                if known_offset is None:
                    raise ValueError(
                        "Location with name '%s' already exists without an "
                        "offset. To set its offset, use the dedicated API "
                        "'set_location_offset(%r, 0x%x)'" % (
                            name, name_loc_key, offset
                        ))
                raise ValueError(
                    "Location with name '%s' already have an offset: 0x%x "
                    "(!= 0x%x)" % (name, known_offset, offset)
                    )
            # Name already known, no conflicting offset -> nothing to do
            return name_loc_key

        elif offset_loc_key is not None:
            if name is not None:
                # This is an error. Already known names are handled above
                raise ValueError(
                    "Location with offset 0x%x already exists. "
                    "To add a name to this location, use the dedicated API "
                    "'add_location_name(%r, %r)'" % (
                        offset,
                        offset_loc_key,
                        name
                    ))
            # Offset already known, no name specified
            return offset_loc_key

    # No collision, this is a brand new location
    loc_key = LocKey(self._loc_key_num)
    self._loc_key_num += 1
    self._loc_keys.add(loc_key)

    if offset is not None:
        assert offset not in self._offset_to_loc_key
        self._offset_to_loc_key[offset] = loc_key
        self._loc_key_to_offset[loc_key] = offset

    if name is not None:
        self._name_to_loc_key[name] = loc_key
        self._loc_key_to_names[loc_key] = set([name])

    return loc_key

def add_location_name(

self, loc_key, name)

Inheritance: AsmSymbolPool.add_location_name

Associate a name @name to a given @loc_key @name: str instance @loc_key: LocKey instance

def add_location_name(self, loc_key, name):
    """Attach the name @name to the location @loc_key
    @name: str instance
    @loc_key: LocKey instance, must already be registered

    A KeyError is raised when @name already designates another loc_key.
    """
    assert loc_key in self._loc_keys
    owner = self._name_to_loc_key.get(name)
    if owner is not None and owner != loc_key:
        raise KeyError("%r is already associated to a different loc_key "
                       "(%r)" % (name, owner))
    names = self._loc_key_to_names.setdefault(loc_key, set())
    names.add(name)
    self._name_to_loc_key[name] = loc_key

def canonize_to_exprloc(

self, expr)

Inheritance: AsmSymbolPool.canonize_to_exprloc

If expr is ExprInt, return ExprLoc with corresponding loc_key Else, return expr

@expr: Expr instance

def canonize_to_exprloc(self, expr):
    """
    Turn an integer expression into the ExprLoc of the location at this
    offset (created if needed), with the same size as @expr.
    Any other expression is returned unchanged.
    @expr: Expr instance
    """
    if not expr.is_int():
        return expr
    loc_key = self.get_or_create_offset_location(int(expr))
    return ExprLoc(loc_key, expr.size)

def consistency_check(

self)

Inheritance: AsmSymbolPool.consistency_check

Ensure internal structures are consistent with each other

def consistency_check(self):
    """Ensure internal structures are consistent with each other

    An AssertionError is raised on the first inconsistency found.
    """
    # Only registered loc_keys may have names or an offset
    assert set(self._loc_key_to_names).issubset(self._loc_keys)
    assert set(self._loc_key_to_offset).issubset(self._loc_keys)
    # Both offset mappings must be the exact inverse of each other
    # (`items()` / `values()` are available on both Python 2 and 3)
    assert self._loc_key_to_offset == {
        loc_key: offset
        for offset, loc_key in self._offset_to_loc_key.items()
    }
    # Every name attached to a loc_key must be known, and conversely
    all_names = set().union(*self._loc_key_to_names.values())
    assert all_names == set(self._name_to_loc_key)
    for name, loc_key in self._name_to_loc_key.items():
        assert name in self._loc_key_to_names[loc_key]

def del_loc_key_offset(

self, loc_key)

Inheritance: AsmSymbolPool.del_loc_key_offset

[DEPRECATED API], see 'unset_location_offset'

def del_loc_key_offset(self, loc_key):
    """[DEPRECATED API], see 'unset_location_offset'

    Emit a deprecation warning, then forward @loc_key to
    'unset_location_offset'.
    """
    deprecation = "Deprecated API: use 'unset_location_offset'"
    warnings.warn(deprecation)
    self.unset_location_offset(loc_key)

def gen_loc_key(

self)

Inheritance: AsmSymbolPool.gen_loc_key

[DEPRECATED API], see 'add_location'

def gen_loc_key(self):
    """[DEPRECATED API], see 'add_location'

    Emit a deprecation warning, then return the result of 'add_location'
    called without argument.
    """
    warnings.warn("Deprecated API: use 'add_location'")
    new_loc_key = self.add_location()
    return new_loc_key

def get_location_names(

self, loc_key)

Inheritance: AsmSymbolPool.get_location_names

Return the frozenset of names associated to @loc_key @loc_key: LocKey instance

def get_location_names(self, loc_key):
    """
    Return the names associated to @loc_key as a frozenset (empty when
    @loc_key has no name)
    @loc_key: LocKey instance
    """
    assert isinstance(loc_key, LocKey)
    names = self._loc_key_to_names.get(loc_key, set())
    return frozenset(names)

def get_location_offset(

self, loc_key)

Inheritance: AsmSymbolPool.get_location_offset

Return the offset of @loc_key if any, None otherwise. @loc_key: LocKey instance

def get_location_offset(self, loc_key):
    """
    Look up the offset bound to @loc_key; None is returned when there is
    none.
    @loc_key: LocKey instance
    """
    assert isinstance(loc_key, LocKey)
    offset = self._loc_key_to_offset.get(loc_key)
    return offset

def get_name_location(

self, name)

Inheritance: AsmSymbolPool.get_name_location

Return the LocKey of @name if any, None otherwise. @name: target name

def get_name_location(self, name):
    """
    Look up the LocKey designated by @name; None is returned when the name
    is unknown.
    @name: target name
    """
    loc_key = self._name_to_loc_key.get(name)
    return loc_key

def get_name_offset(

self, name)

Inheritance: AsmSymbolPool.get_name_offset

Return the offset of @name if any, None otherwise. @name: target name

def get_name_offset(self, name):
    """
    Look up the offset of the location designated by @name; None is
    returned when the name is unknown or its location has no offset.
    @name: target name
    """
    loc_key = self.get_name_location(name)
    if loc_key is not None:
        return self.get_location_offset(loc_key)
    return None

def get_offset_location(

self, offset)

Inheritance: AsmSymbolPool.get_offset_location

Return the LocKey of @offset if any, None otherwise. @offset: target offset

def get_offset_location(self, offset):
    """
    Look up the LocKey bound to @offset; None is returned when no location
    has this offset.
    @offset: target offset
    """
    loc_key = self._offset_to_loc_key.get(offset)
    return loc_key

def get_or_create_name_location(

self, name)

Inheritance: AsmSymbolPool.get_or_create_name_location

Return the LocKey of @name if any, create one otherwise. @name: target name

def get_or_create_name_location(self, name):
    """
    Return the LocKey designated by @name; when the name is unknown, a new
    location carrying it is added and its LocKey returned.
    @name: target name
    """
    known = self._name_to_loc_key.get(name)
    if known is None:
        return self.add_location(name=name)
    return known

def get_or_create_offset_location(

self, offset)

Inheritance: AsmSymbolPool.get_or_create_offset_location

Return the LocKey of @offset if any, create one otherwise. @offset: target offset

def get_or_create_offset_location(self, offset):
    """
    Return the LocKey bound to @offset; when no location has this offset,
    a new one is added and its LocKey returned.
    @offset: target offset
    """
    known = self._offset_to_loc_key.get(offset)
    if known is None:
        return self.add_location(offset=offset)
    return known

def getby_name(

self, name)

Inheritance: AsmSymbolPool.getby_name

[DEPRECATED API], see 'get_name_location'

def getby_name(self, name):
    """[DEPRECATED API], see 'get_name_location'

    Emit a deprecation warning, then return what 'get_name_location' returns
    for @name.
    """
    warnings.warn("Deprecated API: use 'get_name_location'")
    loc_key = self.get_name_location(name)
    return loc_key

def getby_name_create(

self, name)

Inheritance: AsmSymbolPool.getby_name_create

[DEPRECATED API], see 'get_or_create_name_location'

def getby_name_create(self, name):
    """[DEPRECATED API], see 'get_or_create_name_location'

    Emit a deprecation warning, then return what
    'get_or_create_name_location' returns for @name.
    """
    warnings.warn("Deprecated API: use 'get_or_create_name_location'")
    loc_key = self.get_or_create_name_location(name)
    return loc_key

def getby_offset(

self, offset)

Inheritance: AsmSymbolPool.getby_offset

[DEPRECATED API], see 'get_offset_location'

def getby_offset(self, offset):
    """[DEPRECATED API], see 'get_offset_location'

    Emit a deprecation warning, then return what 'get_offset_location'
    returns for @offset.
    """
    warnings.warn("Deprecated API: use 'get_offset_location'")
    loc_key = self.get_offset_location(offset)
    return loc_key

def getby_offset_create(

self, offset)

Inheritance: AsmSymbolPool.getby_offset_create

[DEPRECATED API], see 'get_or_create_offset_location'

def getby_offset_create(self, offset):
    """[DEPRECATED API], see 'get_or_create_offset_location'

    Emit a deprecation warning, then return what
    'get_or_create_offset_location' returns for @offset.
    """
    warnings.warn("Deprecated API: use 'get_or_create_offset_location'")
    loc_key = self.get_or_create_offset_location(offset)
    return loc_key

def loc_key_to_name(

self, loc_key)

Inheritance: AsmSymbolPool.loc_key_to_name

[DEPRECATED API], see 'get_location_names'

def loc_key_to_name(self, loc_key):
    """[DEPRECATED API], see 'get_location_names'

    Emit a deprecation warning, then return the first name of @loc_key, in
    sorted order.
    """
    warnings.warn("Deprecated API: use 'get_location_names'")
    ordered_names = sorted(self.get_location_names(loc_key))
    return ordered_names[0]

def loc_key_to_offset(

self, loc_key)

Inheritance: AsmSymbolPool.loc_key_to_offset

[DEPRECATED API], see 'get_location_offset'

def loc_key_to_offset(self, loc_key):
    """[DEPRECATED API], see 'get_location_offset'

    Emit a deprecation warning, then return what 'get_location_offset'
    returns for @loc_key.
    """
    warnings.warn("Deprecated API: use 'get_location_offset'")
    offset = self.get_location_offset(loc_key)
    return offset

def merge(

self, location_db)

Inheritance: AsmSymbolPool.merge

Merge with another LocationDB @location_db

WARNING: old reference to @location_db information (such as LocKeys) must be retrieved from the updated version of this instance. The dedicated "get_*" APIs may be used for this task

def merge(self, location_db):
    """Import the locations of another LocationDB @location_db

    WARNING: old reference to @location_db information (such as LocKeys)
    must be retrieved from the updated version of this instance. The
    dedicated "get_*" APIs may be used for this task
    """
    # LocKeys of both databases will certainly collide, so each foreign
    # location is re-added here through its offset and names
    for foreign_loc_key in location_db.loc_keys:
        foreign_names = location_db.get_location_names(foreign_loc_key)
        foreign_offset = location_db.get_location_offset(foreign_loc_key)

        # One of the names (if any) is used to add / retrieve the location
        init_name = list(foreign_names)[0] if foreign_names else None
        loc_key = self.add_location(offset=foreign_offset, name=init_name,
                                    strict=False)

        # The remaining, still unknown, names are attached afterwards
        cur_names = self.get_location_names(loc_key)
        for name in foreign_names:
            if name in cur_names or name == init_name:
                continue
            self.add_location_name(loc_key, name=name)

def pretty_str(

self, loc_key)

Inheritance: AsmSymbolPool.pretty_str

Return a human readable version of @loc_key, according to information available in this LocationDB instance

def pretty_str(self, loc_key):
    """Build a human readable representation of @loc_key from the
    information available in this LocationDB instance

    Names win (comma separated); otherwise the offset is used as
    "loc_<hex offset>"; otherwise str(@loc_key) is returned
    """
    names = self.get_location_names(loc_key)
    if not names:
        offset = self.get_location_offset(loc_key)
        if offset is None:
            return str(loc_key)
        return "loc_%x" % offset
    return ",".join(names)

def remove_loc_key(

self, loc_key)

Inheritance: AsmSymbolPool.remove_loc_key

[DEPRECATED API], see 'remove_location'

def remove_loc_key(self, loc_key):
    """[DEPRECATED API], see 'remove_location'

    Emit a deprecation warning and forward @loc_key to remove_location
    """
    deprecation_msg = "Deprecated API: use 'remove_location'"
    warnings.warn(deprecation_msg)
    self.remove_location(loc_key)

def remove_location(

self, loc_key)

Inheritance: AsmSymbolPool.remove_location

Delete the location corresponding to @loc_key @loc_key: LocKey instance

def remove_location(self, loc_key):
    """
    Drop the location @loc_key together with its names and its offset
    @loc_key: LocKey instance
    Raise KeyError if @loc_key is not known to this instance
    """
    assert isinstance(loc_key, LocKey)
    if loc_key not in self._loc_keys:
        raise KeyError("Unknown loc_key %r" % loc_key)

    # Forget every name bound to this location
    for bound_name in self._loc_key_to_names.pop(loc_key, []):
        del self._name_to_loc_key[bound_name]

    # Forget the offset, in both directions
    bound_offset = self._loc_key_to_offset.pop(loc_key, None)
    self._offset_to_loc_key.pop(bound_offset, None)

    self._loc_keys.remove(loc_key)

def remove_location_name(

self, loc_key, name)

Inheritance: AsmSymbolPool.remove_location_name

Disassociate a name @name from a given @loc_key Fail if @name is not already associated to @loc_key @name: str instance @loc_key: LocKey instance

def remove_location_name(self, loc_key, name):
    """Detach the name @name from the location @loc_key
    @name: str instance
    @loc_key: LocKey instance, must belong to this instance
    Raise KeyError if @name is bound to nothing, or to another loc_key
    """
    assert loc_key in self._loc_keys
    owner = self._name_to_loc_key.get(name)
    if owner is None:
        raise KeyError("%r is not already associated" % name)
    elif owner != loc_key:
        raise KeyError("%r is already associated to a different loc_key "
                       "(%r)" % (name, owner))
    # Drop both directions of the name <-> loc_key association
    del self._name_to_loc_key[name]
    self._loc_key_to_names[loc_key].remove(name)

def rename_location(

self, loc_key, newname)

Inheritance: AsmSymbolPool.rename_location

[DEPRECATED API], see 'add_location_name' and 'remove_location_name'

def rename_location(self, loc_key, newname):
    """[DEPRECATED API], see 'add_location_name' and 'remove_location_name'

    Remove every name currently associated with @loc_key, then associate
    @newname with it
    @loc_key: LocKey instance
    @newname: str instance, the only name left on @loc_key afterwards
    """
    warnings.warn("Deprecated API: use 'add_location_name' and "
                  "'remove_location_name'")
    # Iterate over a snapshot: removing a name may shrink the very
    # collection returned by get_location_names while it is being walked
    for name in list(self.get_location_names(loc_key)):
        self.remove_location_name(loc_key, name)
    # Bind the requested name (not the last removed one)
    self.add_location_name(loc_key, newname)

def set_location_offset(

self, loc_key, offset, force=False)

Inheritance: AsmSymbolPool.set_location_offset

Associate the offset @offset to an LocKey @loc_key

If @force is set, override silently. Otherwise, if an offset is already associated to @loc_key, an error will be raised

def set_location_offset(self, loc_key, offset, force=False):
    """Bind the offset @offset to the LocKey @loc_key

    @loc_key: LocKey known to this instance
    @offset: offset to associate
    @force: if set, silently replace an offset already bound to @loc_key;
            otherwise a ValueError is raised in that situation
    Raise KeyError if @offset already belongs to another LocKey
    """
    assert loc_key in self._loc_keys

    # An offset cannot be shared between two distinct locations
    offset_owner = self.get_offset_location(offset)
    if offset_owner is not None and offset_owner != loc_key:
        raise KeyError("%r is already associated to a different loc_key "
                       "(%r)" % (offset, offset_owner))

    # Replacing a different, already bound offset requires @force
    previous_offset = self._loc_key_to_offset.get(loc_key)
    if previous_offset is not None and previous_offset != offset:
        if force:
            self.unset_location_offset(loc_key)
        else:
            raise ValueError(
                "%r already has an offset (0x%x). Use 'force=True'"
                " for silent overriding" % (loc_key, previous_offset)
            )

    self._offset_to_loc_key[offset] = loc_key
    self._loc_key_to_offset[loc_key] = offset

def set_offset(

self, loc_key, offset)

Inheritance: AsmSymbolPool.set_offset

[DEPRECATED API], see 'set_location_offset'

def set_offset(self, loc_key, offset):
    """[DEPRECATED API], see 'set_location_offset'

    Emit a deprecation warning, then call set_location_offset with
    force=True
    """
    deprecation_msg = "Deprecated API: use 'set_location_offset'"
    warnings.warn(deprecation_msg)
    self.set_location_offset(loc_key, offset, force=True)

def str_loc_key(

self, loc_key)

Inheritance: AsmSymbolPool.str_loc_key

[DEPRECATED API], see 'pretty_str'

def str_loc_key(self, loc_key):
    """[DEPRECATED API], see 'pretty_str'

    Emit a deprecation warning and return pretty_str(@loc_key)
    """
    warnings.warn("Deprecated API: use 'pretty_str'")
    readable = self.pretty_str(loc_key)
    return readable

def unset_location_offset(

self, loc_key)

Inheritance: AsmSymbolPool.unset_location_offset

Disassociate LocKey @loc_key's offset

Fail if there is already no offset associated with it @loc_key: LocKey

def unset_location_offset(self, loc_key):
    """Remove the offset bound to the LocKey @loc_key
    @loc_key: LocKey known to this instance
    Raise ValueError if @loc_key has no offset
    """
    assert loc_key in self._loc_keys
    current_offset = self._loc_key_to_offset.get(loc_key)
    if current_offset is None:
        raise ValueError("%r already has no offset" % loc_key)
    # Drop both directions of the offset <-> loc_key association
    del self._offset_to_loc_key[current_offset]
    del self._loc_key_to_offset[loc_key]

class disasmEngine

Disassembly engine, taking care of disassembler options and multi-block strategy.

Engine options:

  • Object supporting membership test (offset in ..)
  • dont_dis: stop the current disassembly branch if reached
  • split_dis: force a basic block end if reached, with a next constraint on its successor
  • dont_dis_retcall_funcs: stop disassembly after a call to one of the given functions

  • On/Off

  • follow_call: recursively disassemble CALL destinations
  • dontdis_retcall: stop on CALL return addresses
  • dont_dis_nulstart_bloc: stop if a block begins with a few \x00 bytes

  • Number

  • lines_wd: maximum block's size (in number of instruction)
  • blocs_wd: maximum number of distinct disassembled block

  • callback(arch, attrib, pool_bin, cur_bloc, offsets_to_dis, loc_db)

  • dis_block_callback: callback after each new disassembled block
class disasmEngine(object):

    """Disassembly engine, taking care of disassembler options and multi-block
    strategy.

    Engine options:

    + Object supporting membership test (offset in ..)
     - dont_dis: stop the current disassembly branch if reached
     - split_dis: force a basic block end if reached,
                  with a next constraint on its successor
     - dont_dis_retcall_funcs: stop disassembly after a call to one
                               of the given functions

    + On/Off
     - follow_call: recursively disassemble CALL destinations
     - dontdis_retcall: stop on CALL return addresses
     - dont_dis_nulstart_bloc: stop on an instruction whose bytes are all
                               \\x00

    + Number
     - lines_wd: maximum block's size (in number of instruction)
     - blocs_wd: maximum number of distinct disassembled block

    + callback, invoked with the keyword arguments mn, attrib, pool_bin,
      cur_bloc, offsets_to_dis, loc_db and symbol_pool (deprecated alias
      of loc_db)
     - dis_block_callback: callback after each new disassembled block
    """

    def __init__(self, arch, attrib, bin_stream, **kwargs):
        """Instantiate a new disassembly engine
        @arch: targeted architecture
        @attrib: architecture attribute
        @bin_stream: bytes source
        @kwargs: (optional) custom options, stored as instance attributes
        """
        self.arch = arch
        self.attrib = attrib
        self.bin_stream = bin_stream
        self.loc_db = LocationDB()

        # Setup options
        self.dont_dis = []
        self.split_dis = []
        self.follow_call = False
        self.dontdis_retcall = False
        self.lines_wd = None
        self.blocs_wd = None
        self.dis_block_callback = None
        self.dont_dis_nulstart_bloc = False
        self.dont_dis_retcall_funcs = set()

        # Override options if needed
        self.__dict__.update(kwargs)

    def get_job_done(self):
        """[DEPRECATED] Warn and return an empty set"""
        warnings.warn("""DEPRECATION WARNING: "job_done" is not needed anymore, support is dropped.""")
        return set()

    def set_job_done(self, _):
        """[DEPRECATED] Warn and ignore the given value"""
        warnings.warn("""DEPRECATION WARNING: "job_done" is not needed anymore, support is dropped.""")
        return

    def get_dis_bloc_callback(self):
        """[DEPRECATED] Warn and return dis_block_callback"""
        warnings.warn("""DEPRECATION WARNING: "dis_bloc_callback" use dis_block_callback.""")
        return self.dis_block_callback

    def set_dis_bloc_callback(self, function):
        """[DEPRECATED] Warn and store @function as dis_block_callback"""
        warnings.warn("""DEPRECATION WARNING: "dis_bloc_callback" use dis_block_callback.""")
        self.dis_block_callback = function

    @property
    def symbol_pool(self):
        """[DEPRECATED] Warn and return loc_db"""
        warnings.warn("""DEPRECATION WARNING: use 'loc_db'""")
        return self.loc_db

    # Deprecated
    job_done = property(get_job_done, set_job_done)
    dis_bloc_callback = property(get_dis_bloc_callback, set_dis_bloc_callback)

    def _dis_block(self, offset, job_done=None):
        """Disassemble the block at offset @offset
        @job_done: a set of already disassembled addresses, updated in place
                   (a fresh set is used if None)
        Return the created AsmBlock and future offsets to disassemble, as a
        tuple (block, set of offsets)
        """

        if job_done is None:
            job_done = set()
        lines_cpt = 0
        in_delayslot = False
        delayslot_count = self.arch.delayslot
        offsets_to_dis = set()
        add_next_offset = False
        loc_key = self.loc_db.get_or_create_offset_location(offset)
        cur_block = AsmBlock(loc_key)
        log_asmblock.debug("dis at %X", int(offset))
        # Once a flow-breaking instruction has been added, keep going only
        # for the instructions of its delay slot
        while not in_delayslot or delayslot_count > 0:
            if in_delayslot:
                delayslot_count -= 1

            if offset in self.dont_dis:
                if not cur_block.lines:
                    job_done.add(offset)
                    # Block is empty -> bad block
                    cur_block = AsmBlockBad(loc_key, errno=AsmBlockBad.ERROR_FORBIDDEN)
                else:
                    # Block is not empty, stop the disassembly pass and add a
                    # constraint to the next block
                    loc_key_cst = self.loc_db.get_or_create_offset_location(offset)
                    cur_block.add_cst(loc_key_cst, AsmConstraint.c_next)
                break

            if lines_cpt > 0 and offset in self.split_dis:
                # Forced split: end the block here and schedule its successor
                loc_key_cst = self.loc_db.get_or_create_offset_location(offset)
                cur_block.add_cst(loc_key_cst, AsmConstraint.c_next)
                offsets_to_dis.add(offset)
                break

            lines_cpt += 1
            if self.lines_wd is not None and lines_cpt > self.lines_wd:
                log_asmblock.debug("lines watchdog reached at %X", int(offset))
                break

            if offset in job_done:
                # Already disassembled address: link to it and stop
                loc_key_cst = self.loc_db.get_or_create_offset_location(offset)
                cur_block.add_cst(loc_key_cst, AsmConstraint.c_next)
                break

            off_i = offset
            error = None
            try:
                instr = self.arch.dis(self.bin_stream, self.attrib, offset)
            except Disasm_Exception as e:
                log_asmblock.warning(e)
                instr = None
                error = AsmBlockBad.ERROR_CANNOT_DISASM
            except IOError as e:
                log_asmblock.warning(e)
                instr = None
                error = AsmBlockBad.ERROR_IO


            if instr is None:
                log_asmblock.warning("cannot disasm at %X", int(off_i))
                if not cur_block.lines:
                    job_done.add(offset)
                    # Block is empty -> bad block
                    cur_block = AsmBlockBad(loc_key, errno=error)
                else:
                    # Block is not empty, stop the disassembly pass and add a
                    # constraint to the next block
                    loc_key_cst = self.loc_db.get_or_create_offset_location(off_i)
                    cur_block.add_cst(loc_key_cst, AsmConstraint.c_next)
                break

            # XXX TODO nul start block option
            if self.dont_dis_nulstart_bloc and instr.b.count('\x00') == instr.l:
                log_asmblock.warning("reach nul instr at %X", int(off_i))
                if not cur_block.lines:
                    # Block is empty -> bad block
                    cur_block = AsmBlockBad(loc_key, errno=AsmBlockBad.ERROR_NULL_STARTING_BLOCK)
                else:
                    # Block is not empty, stop the disassembly pass and add a
                    # constraint to the next block
                    loc_key_cst = self.loc_db.get_or_create_offset_location(off_i)
                    cur_block.add_cst(loc_key_cst, AsmConstraint.c_next)
                break

            # special case: flow graph modificator in delayslot
            if in_delayslot and instr and (instr.splitflow() or instr.breakflow()):
                add_next_offset = True
                break

            job_done.add(offset)
            log_asmblock.debug("dis at %X", int(offset))

            offset += instr.l
            log_asmblock.debug(instr)
            log_asmblock.debug(instr.args)

            cur_block.addline(instr)
            if not instr.breakflow():
                continue
            # test split
            if instr.splitflow() and not (instr.is_subcall() and self.dontdis_retcall):
                add_next_offset = True
            if instr.dstflow():
                instr.dstflow2label(self.loc_db)
                destinations = instr.getdstflow(self.loc_db)
                known_dsts = []
                # A dedicated name is used for destinations so that the
                # block's own loc_key is not overwritten
                for dst in destinations:
                    if not dst.is_loc():
                        continue
                    dst_loc_key = dst.loc_key
                    loc_key_offset = self.loc_db.get_location_offset(dst_loc_key)
                    known_dsts.append(dst_loc_key)
                    if loc_key_offset in self.dont_dis_retcall_funcs:
                        add_next_offset = False
                if (not instr.is_subcall()) or self.follow_call:
                    cur_block.bto.update([AsmConstraint(dst_loc_key, AsmConstraint.c_to) for dst_loc_key in known_dsts])

            # get in delayslot mode
            in_delayslot = True
            delayslot_count = instr.delayslot

        # Every constraint target is a candidate for further disassembly; the
        # offset may be None when the location has no offset
        for c in cur_block.bto:
            loc_key_offset = self.loc_db.get_location_offset(c.loc_key)
            offsets_to_dis.add(loc_key_offset)

        if add_next_offset:
            loc_key_cst = self.loc_db.get_or_create_offset_location(offset)
            cur_block.add_cst(loc_key_cst, AsmConstraint.c_next)
            offsets_to_dis.add(offset)

        # Fix multiple constraints
        cur_block.fix_constraints()

        if self.dis_block_callback is not None:
            self.dis_block_callback(mn=self.arch, attrib=self.attrib,
                                    pool_bin=self.bin_stream, cur_bloc=cur_block,
                                    offsets_to_dis=offsets_to_dis,
                                    loc_db=self.loc_db,
                                    # Deprecated API
                                    symbol_pool=self.loc_db)
        return cur_block, offsets_to_dis

    def dis_block(self, offset):
        """Disassemble the block at offset @offset and return the created
        AsmBlock
        @offset: targeted offset to disassemble
        """
        current_block, _ = self._dis_block(offset)
        return current_block

    def dis_bloc(self, offset):
        """
        DEPRECATED function
        Use dis_block instead of dis_bloc
        """
        warnings.warn('DEPRECATION WARNING: use "dis_block" instead of "dis_bloc"')
        return self.dis_block(offset)

    def dis_multiblock(self, offset, blocks=None):
        """Disassemble every block reachable from @offset regarding
        specific disasmEngine conditions
        Return an AsmCFG instance containing disassembled blocks
        @offset: starting offset
        @blocks: (optional) AsmCFG instance of already disassembled blocks to
                merge with
        """
        log_asmblock.info("dis bloc all")
        job_done = set()
        if blocks is None:
            blocks = AsmCFG(self.loc_db)
        todo = [offset]

        bloc_cpt = 0
        while len(todo):
            bloc_cpt += 1
            if self.blocs_wd is not None and bloc_cpt > self.blocs_wd:
                log_asmblock.debug("blocks watchdog reached at %X", int(offset))
                break

            target_offset = todo.pop(0)
            # _dis_block may report None for a destination without offset:
            # skip it before the int() conversion, which would raise on None
            if target_offset is None:
                continue
            target_offset = int(target_offset)
            if target_offset in job_done:
                continue
            cur_block, nexts = self._dis_block(target_offset, job_done)
            todo += nexts
            blocks.add_block(cur_block)

        blocks.apply_splitting(self.loc_db,
                               dis_block_callback=self.dis_block_callback,
                               mn=self.arch, attrib=self.attrib,
                               pool_bin=self.bin_stream)
        return blocks

    def dis_multibloc(self, offset, blocs=None):
        """
        DEPRECATED function
        Use dis_multiblock instead of dis_multibloc
        """
        warnings.warn('DEPRECATION WARNING: use "dis_multiblock" instead of "dis_multibloc"')
        return self.dis_multiblock(offset, blocs)

    def dis_instr(self, offset):
        """Disassemble one instruction at offset @offset and return the
        corresponding instruction instance
        @offset: targeted offset to disassemble
        """
        # Temporarily limit blocks to a single line; always restore the
        # previous watchdog value
        old_lineswd = self.lines_wd
        self.lines_wd = 1
        try:
            block = self.dis_block(offset)
        finally:
            self.lines_wd = old_lineswd

        instr = block.lines[0]
        return instr

Ancestors (in MRO)

Class variables

var dis_bloc_callback

var job_done

Instance variables

var arch

var attrib

var bin_stream

var blocs_wd

var dis_bloc_callback

var dis_block_callback

var dont_dis

var dont_dis_nulstart_bloc

var dont_dis_retcall_funcs

var dontdis_retcall

var follow_call

var job_done

var lines_wd

var loc_db

var split_dis

var symbol_pool

Methods

def __init__(

self, arch, attrib, bin_stream, **kwargs)

Instantiate a new disassembly engine @arch: targeted architecture @attrib: architecture attribute @bin_stream: bytes source @kwargs: (optional) custom options

def __init__(self, arch, attrib, bin_stream, **kwargs):
    """Build a new disassembly engine
    @arch: targeted architecture
    @attrib: architecture attribute
    @bin_stream: bytes source
    @kwargs: (optional) custom options, written over the defaults below
    """
    self.arch, self.attrib, self.bin_stream = arch, attrib, bin_stream
    self.loc_db = LocationDB()
    # Default options: offset collections
    self.dont_dis, self.split_dis = [], []
    # Default options: switches, off by default
    self.follow_call = False
    self.dontdis_retcall = False
    # Default options: no watchdog, no callback
    self.lines_wd = self.blocs_wd = None
    self.dis_block_callback = None
    self.dont_dis_nulstart_bloc = False
    self.dont_dis_retcall_funcs = set()
    # Caller supplied options take precedence
    self.__dict__.update(kwargs)

def dis_bloc(

self, offset)

DEPRECATED function Use dis_block instead of dis_bloc

def dis_bloc(self, offset):
    """
    DEPRECATED function, use dis_block instead of dis_bloc
    Emit a deprecation warning and return dis_block(@offset)
    """
    deprecation_msg = 'DEPRECATION WARNING: use "dis_block" instead of "dis_bloc"'
    warnings.warn(deprecation_msg)
    block = self.dis_block(offset)
    return block

def dis_block(

self, offset)

Disassemble the block at offset @offset and return the created AsmBlock @offset: targeted offset to disassemble

def dis_block(self, offset):
    """Disassemble a single block starting at @offset
    @offset: targeted offset to disassemble
    Return the block produced by _dis_block; the offsets it reports for
    further disassembly are discarded
    """
    block, _pending_offsets = self._dis_block(offset)
    return block

def dis_instr(

self, offset)

Disassemble one instruction at offset @offset and return the corresponding instruction instance @offset: targeted offset to disassemble

def dis_instr(self, offset):
    """Disassemble a single instruction at @offset
    @offset: targeted offset to disassemble
    Return the first line of the block obtained from dis_block
    """
    # Limit the block to one line for this call only; the previous
    # watchdog value is restored even if dis_block raises
    saved_lines_wd, self.lines_wd = self.lines_wd, 1
    try:
        block = self.dis_block(offset)
    finally:
        self.lines_wd = saved_lines_wd
    return block.lines[0]

def dis_multibloc(

self, offset, blocs=None)

DEPRECATED function Use dis_multiblock instead of dis_multibloc

def dis_multibloc(self, offset, blocs=None):
    """
    DEPRECATED function, use dis_multiblock instead of dis_multibloc
    Emit a deprecation warning and return dis_multiblock(@offset, @blocs)
    """
    deprecation_msg = 'DEPRECATION WARNING: use "dis_multiblock" instead of "dis_multibloc"'
    warnings.warn(deprecation_msg)
    cfg = self.dis_multiblock(offset, blocs)
    return cfg

def dis_multiblock(

self, offset, blocks=None)

Disassemble every block reachable from @offset regarding specific disasmEngine conditions Return an AsmCFG instance containing disassembled blocks @offset: starting offset @blocks: (optional) AsmCFG instance of already disassembled blocks to merge with

def dis_multiblock(self, offset, blocks=None):
    """Disassemble every block reachable from @offset regarding
    specific disasmEngine conditions
    Return an AsmCFG instance containing disassembled blocks
    @offset: starting offset
    @blocks: (optional) AsmCFG instance of already disassembled blocks to
            merge with
    """
    log_asmblock.info("dis bloc all")
    job_done = set()
    if blocks is None:
        blocks = AsmCFG(self.loc_db)
    todo = [offset]
    bloc_cpt = 0
    while len(todo):
        bloc_cpt += 1
        if self.blocs_wd is not None and bloc_cpt > self.blocs_wd:
            log_asmblock.debug("blocks watchdog reached at %X", int(offset))
            break
        target_offset = todo.pop(0)
        # A pending offset may be None: skip it before the int()
        # conversion, which would raise TypeError on None
        if target_offset is None:
            continue
        target_offset = int(target_offset)
        if target_offset in job_done:
            continue
        cur_block, nexts = self._dis_block(target_offset, job_done)
        todo += nexts
        blocks.add_block(cur_block)
    # Let the CFG split the blocks once everything has been disassembled
    blocks.apply_splitting(self.loc_db,
                           dis_block_callback=self.dis_block_callback,
                           mn=self.arch, attrib=self.attrib,
                           pool_bin=self.bin_stream)
    return blocks

def get_dis_bloc_callback(

self)

def get_dis_bloc_callback(self):
    """[DEPRECATED] Getter of the "dis_bloc_callback" alias: emit a
    deprecation warning and return dis_block_callback"""
    deprecation_msg = 'DEPRECATION WARNING: "dis_bloc_callback" use dis_block_callback.'
    warnings.warn(deprecation_msg)
    return self.dis_block_callback

def get_job_done(

self)

def get_job_done(self):
    """[DEPRECATED] Getter of "job_done": emit a deprecation warning and
    return a new empty set"""
    deprecation_msg = 'DEPRECATION WARNING: "job_done" is not needed anymore, support is dropped.'
    warnings.warn(deprecation_msg)
    return set()

def set_dis_bloc_callback(

self, function)

def set_dis_bloc_callback(self, function):
    """[DEPRECATED] Setter of the "dis_bloc_callback" alias: emit a
    deprecation warning and store @function as dis_block_callback"""
    deprecation_msg = 'DEPRECATION WARNING: "dis_bloc_callback" use dis_block_callback.'
    warnings.warn(deprecation_msg)
    self.dis_block_callback = function

def set_job_done(

self, _)

def set_job_done(self, _):
    """[DEPRECATED] Setter of "job_done": emit a deprecation warning and
    discard the given value"""
    deprecation_msg = 'DEPRECATION WARNING: "job_done" is not needed anymore, support is dropped.'
    warnings.warn(deprecation_msg)