Top

miasm2.analysis.data_analysis module

from miasm2.expression.expression \
    import get_expr_mem, get_list_rw, ExprId, ExprInt
from miasm2.ir.symbexec import SymbolicExecutionEngine


def get_node_name(label, i, n):
    n_name = (label, i, n)
    return n_name


def intra_block_flow_raw(ir_arch, ircfg, flow_graph, irb, in_nodes, out_nodes):
    """
    Create data flow for an irbloc using raw IR expressions
    """
    current_nodes = {}
    for i, assignblk in enumerate(irb):
        dict_rw = assignblk.get_rw(cst_read=True)
        current_nodes.update(out_nodes)

        # gen mem arg to mem node links
        all_mems = set()
        for node_w, nodes_r in dict_rw.iteritems():
            for n in nodes_r.union([node_w]):
                all_mems.update(get_expr_mem(n))
            if not all_mems:
                continue

            for n in all_mems:
                node_n_w = get_node_name(irb.loc_key, i, n)
                if not n in nodes_r:
                    continue
                o_r = n.arg.get_r(mem_read=False, cst_read=True)
                for n_r in o_r:
                    if n_r in current_nodes:
                        node_n_r = current_nodes[n_r]
                    else:
                        node_n_r = get_node_name(irb.loc_key, i, n_r)
                        current_nodes[n_r] = node_n_r
                        in_nodes[n_r] = node_n_r
                    flow_graph.add_uniq_edge(node_n_r, node_n_w)

        # gen data flow links
        for node_w, nodes_r in dict_rw.iteritems():
            for n_r in nodes_r:
                if n_r in current_nodes:
                    node_n_r = current_nodes[n_r]
                else:
                    node_n_r = get_node_name(irb.loc_key, i, n_r)
                    current_nodes[n_r] = node_n_r
                    in_nodes[n_r] = node_n_r

                flow_graph.add_node(node_n_r)

                node_n_w = get_node_name(irb.loc_key, i + 1, node_w)
                out_nodes[node_w] = node_n_w

                flow_graph.add_node(node_n_w)
                flow_graph.add_uniq_edge(node_n_r, node_n_w)



def inter_block_flow_link(ir_arch, ircfg, flow_graph, irb_in_nodes, irb_out_nodes, todo, link_exec_to_data):
    lbl, current_nodes, exec_nodes = todo
    current_nodes = dict(current_nodes)

    # link current nodes to bloc in_nodes
    if not lbl in ircfg.blocks:
        print "cannot find bloc!!", lbl
        return set()
    irb = ircfg.blocks[lbl]
    to_del = set()
    for n_r, node_n_r in irb_in_nodes[irb.loc_key].items():
        if not n_r in current_nodes:
            continue
        flow_graph.add_uniq_edge(current_nodes[n_r], node_n_r)
        to_del.add(n_r)

    # if link exec to data, all nodes depends on exec nodes
    if link_exec_to_data:
        for n_x_r in exec_nodes:
            for n_r, node_n_r in irb_in_nodes[irb.loc_key].items():
                if not n_x_r in current_nodes:
                    continue
                if isinstance(n_r, ExprInt):
                    continue
                flow_graph.add_uniq_edge(current_nodes[n_x_r], node_n_r)

    # update current nodes using bloc out_nodes
    for n_w, node_n_w in irb_out_nodes[irb.loc_key].items():
        current_nodes[n_w] = node_n_w

    # get nodes involved in exec flow
    x_nodes = tuple(sorted(list(irb.dst.get_r())))

    todo = set()
    for lbl_dst in ircfg.successors(irb.loc_key):
        todo.add((lbl_dst, tuple(current_nodes.items()), x_nodes))

    return todo


def create_implicit_flow(ir_arch, flow_graph, irb_in_nodes, irb_out_ndes):

    # first fix IN/OUT
    # If a son read a node which in not in OUT, add it
    todo = set(ir_arch.blocks.keys())
    while todo:
        lbl = todo.pop()
        irb = ir_arch.blocks[lbl]
        for lbl_son in ir_arch.graph.successors(irb.loc_key):
            if not lbl_son in ir_arch.blocks:
                print "cannot find bloc!!", lbl
                continue
            irb_son = ir_arch.blocks[lbl_son]
            for n_r in irb_in_nodes[irb_son.loc_key]:
                if n_r in irb_out_nodes[irb.loc_key]:
                    continue
                if not isinstance(n_r, ExprId):
                    continue

                node_n_w = irb.loc_key, len(irb), n_r
                irb_out_nodes[irb.loc_key][n_r] = node_n_w
                if not n_r in irb_in_nodes[irb.loc_key]:
                    irb_in_nodes[irb.loc_key][n_r] = irb.loc_key, 0, n_r
                node_n_r = irb_in_nodes[irb.loc_key][n_r]
                for lbl_p in ir_arch.graph.predecessors(irb.loc_key):
                    todo.add(lbl_p)

                flow_graph.add_uniq_edge(node_n_r, node_n_w)


def inter_block_flow(ir_arch, ircfg, flow_graph, irb_0, irb_in_nodes, irb_out_nodes, link_exec_to_data=True):

    todo = set()
    done = set()
    todo.add((irb_0, (), ()))

    while todo:
        state = todo.pop()
        if state in done:
            continue
        done.add(state)
        out = inter_block_flow_link(ir_arch, ircfg, flow_graph, irb_in_nodes, irb_out_nodes, state, link_exec_to_data)
        todo.update(out)


class symb_exec_func:

    """
    This algorithm will do symbolic execution on a function, trying to propagate
    states between basic blocks in order to extract inter-blocs dataflow. The
    algorithm tries to merge states from blocks with multiple parents.

    There is no real magic here, loops and complex merging will certainly fail.
    """

    def __init__(self, ir_arch):
        self.todo = set()
        self.stateby_ad = {}
        self.cpt = {}
        self.states_var_done = set()
        self.states_done = set()
        self.total_done = 0
        self.ir_arch = ir_arch

    def add_state(self, parent, ad, state):
        variables = dict(state.symbols.items())

        # get bloc dead, and remove from state
        b = self.ir_arch.get_block(ad)
        if b is None:
            raise ValueError("unknown bloc! %s" % ad)
        variables = variables.items()

        s = parent, ad, tuple(sorted(variables))
        self.todo.add(s)

    def get_next_state(self):
        state = self.todo.pop()
        return state

    def do_step(self):
        if len(self.todo) == 0:
            return None
        if self.total_done > 600:
            print "symbexec watchdog!"
            return None
        self.total_done += 1
        print 'CPT', self.total_done
        while self.todo:
            state = self.get_next_state()
            parent, ad, s = state
            self.states_done.add(state)
            self.states_var_done.add(state)

            sb = SymbolicExecutionEngine(self.ir_arch, dict(s))

            return parent, ad, sb
        return None

Functions

def create_implicit_flow(

ir_arch, flow_graph, irb_in_nodes, irb_out_ndes)

def create_implicit_flow(ir_arch, flow_graph, irb_in_nodes, irb_out_ndes):

    # first fix IN/OUT
    # If a son read a node which in not in OUT, add it
    todo = set(ir_arch.blocks.keys())
    while todo:
        lbl = todo.pop()
        irb = ir_arch.blocks[lbl]
        for lbl_son in ir_arch.graph.successors(irb.loc_key):
            if not lbl_son in ir_arch.blocks:
                print "cannot find bloc!!", lbl
                continue
            irb_son = ir_arch.blocks[lbl_son]
            for n_r in irb_in_nodes[irb_son.loc_key]:
                if n_r in irb_out_nodes[irb.loc_key]:
                    continue
                if not isinstance(n_r, ExprId):
                    continue

                node_n_w = irb.loc_key, len(irb), n_r
                irb_out_nodes[irb.loc_key][n_r] = node_n_w
                if not n_r in irb_in_nodes[irb.loc_key]:
                    irb_in_nodes[irb.loc_key][n_r] = irb.loc_key, 0, n_r
                node_n_r = irb_in_nodes[irb.loc_key][n_r]
                for lbl_p in ir_arch.graph.predecessors(irb.loc_key):
                    todo.add(lbl_p)

                flow_graph.add_uniq_edge(node_n_r, node_n_w)

def get_node_name(

label, i, n)

def get_node_name(label, i, n):
    n_name = (label, i, n)
    return n_name

def inter_block_flow(

ir_arch, ircfg, flow_graph, irb_0, irb_in_nodes, irb_out_nodes, link_exec_to_data=True)

def inter_block_flow(ir_arch, ircfg, flow_graph, irb_0, irb_in_nodes, irb_out_nodes, link_exec_to_data=True):

    todo = set()
    done = set()
    todo.add((irb_0, (), ()))

    while todo:
        state = todo.pop()
        if state in done:
            continue
        done.add(state)
        out = inter_block_flow_link(ir_arch, ircfg, flow_graph, irb_in_nodes, irb_out_nodes, state, link_exec_to_data)
        todo.update(out)

def intra_block_flow_raw(

ir_arch, ircfg, flow_graph, irb, in_nodes, out_nodes)

Create data flow for an irbloc using raw IR expressions

def intra_block_flow_raw(ir_arch, ircfg, flow_graph, irb, in_nodes, out_nodes):
    """
    Create data flow for an irbloc using raw IR expressions
    """
    current_nodes = {}
    for i, assignblk in enumerate(irb):
        dict_rw = assignblk.get_rw(cst_read=True)
        current_nodes.update(out_nodes)

        # gen mem arg to mem node links
        all_mems = set()
        for node_w, nodes_r in dict_rw.iteritems():
            for n in nodes_r.union([node_w]):
                all_mems.update(get_expr_mem(n))
            if not all_mems:
                continue

            for n in all_mems:
                node_n_w = get_node_name(irb.loc_key, i, n)
                if not n in nodes_r:
                    continue
                o_r = n.arg.get_r(mem_read=False, cst_read=True)
                for n_r in o_r:
                    if n_r in current_nodes:
                        node_n_r = current_nodes[n_r]
                    else:
                        node_n_r = get_node_name(irb.loc_key, i, n_r)
                        current_nodes[n_r] = node_n_r
                        in_nodes[n_r] = node_n_r
                    flow_graph.add_uniq_edge(node_n_r, node_n_w)

        # gen data flow links
        for node_w, nodes_r in dict_rw.iteritems():
            for n_r in nodes_r:
                if n_r in current_nodes:
                    node_n_r = current_nodes[n_r]
                else:
                    node_n_r = get_node_name(irb.loc_key, i, n_r)
                    current_nodes[n_r] = node_n_r
                    in_nodes[n_r] = node_n_r

                flow_graph.add_node(node_n_r)

                node_n_w = get_node_name(irb.loc_key, i + 1, node_w)
                out_nodes[node_w] = node_n_w

                flow_graph.add_node(node_n_w)
                flow_graph.add_uniq_edge(node_n_r, node_n_w)

Classes

class symb_exec_func

This algorithm will do symbolic execution on a function, trying to propagate states between basic blocks in order to extract inter-blocs dataflow. The algorithm tries to merge states from blocks with multiple parents.

There is no real magic here, loops and complex merging will certainly fail.

class symb_exec_func:

    """
    This algorithm will do symbolic execution on a function, trying to propagate
    states between basic blocks in order to extract inter-blocs dataflow. The
    algorithm tries to merge states from blocks with multiple parents.

    There is no real magic here, loops and complex merging will certainly fail.
    """

    def __init__(self, ir_arch):
        self.todo = set()
        self.stateby_ad = {}
        self.cpt = {}
        self.states_var_done = set()
        self.states_done = set()
        self.total_done = 0
        self.ir_arch = ir_arch

    def add_state(self, parent, ad, state):
        variables = dict(state.symbols.items())

        # get bloc dead, and remove from state
        b = self.ir_arch.get_block(ad)
        if b is None:
            raise ValueError("unknown bloc! %s" % ad)
        variables = variables.items()

        s = parent, ad, tuple(sorted(variables))
        self.todo.add(s)

    def get_next_state(self):
        state = self.todo.pop()
        return state

    def do_step(self):
        if len(self.todo) == 0:
            return None
        if self.total_done > 600:
            print "symbexec watchdog!"
            return None
        self.total_done += 1
        print 'CPT', self.total_done
        while self.todo:
            state = self.get_next_state()
            parent, ad, s = state
            self.states_done.add(state)
            self.states_var_done.add(state)

            sb = SymbolicExecutionEngine(self.ir_arch, dict(s))

            return parent, ad, sb
        return None

Ancestors (in MRO)

Instance variables

var cpt

var ir_arch

var stateby_ad

var states_done

var states_var_done

var todo

var total_done

Methods

def __init__(

self, ir_arch)

def __init__(self, ir_arch):
    self.todo = set()
    self.stateby_ad = {}
    self.cpt = {}
    self.states_var_done = set()
    self.states_done = set()
    self.total_done = 0
    self.ir_arch = ir_arch

def add_state(

self, parent, ad, state)

def add_state(self, parent, ad, state):
    variables = dict(state.symbols.items())
    # get bloc dead, and remove from state
    b = self.ir_arch.get_block(ad)
    if b is None:
        raise ValueError("unknown bloc! %s" % ad)
    variables = variables.items()
    s = parent, ad, tuple(sorted(variables))
    self.todo.add(s)

def do_step(

self)

def do_step(self):
    if len(self.todo) == 0:
        return None
    if self.total_done > 600:
        print "symbexec watchdog!"
        return None
    self.total_done += 1
    print 'CPT', self.total_done
    while self.todo:
        state = self.get_next_state()
        parent, ad, s = state
        self.states_done.add(state)
        self.states_var_done.add(state)
        sb = SymbolicExecutionEngine(self.ir_arch, dict(s))
        return parent, ad, sb
    return None

def get_next_state(

self)

def get_next_state(self):
    state = self.todo.pop()
    return state