Top

miasm2.jitter.jitcore_cc_base module

#-*- coding:utf-8 -*-

import os
import tempfile
from distutils.sysconfig import get_python_inc

from miasm2.jitter.jitcore import JitCore
from miasm2.core.utils import keydefaultdict


def gen_core(arch, attrib):
    lib_dir = os.path.dirname(os.path.realpath(__file__))

    txt = ""
    txt += '#include "%s/queue.h"\n' % lib_dir
    txt += '#include "%s/op_semantics.h"\n' % lib_dir
    txt += '#include "%s/vm_mngr.h"\n' % lib_dir
    txt += '#include "%s/vm_mngr_py.h"\n' % lib_dir
    txt += '#include "%s/bn.h"\n' % lib_dir
    txt += '#include "%s/JitCore.h"\n' % lib_dir
    txt += '#include "%s/arch/JitCore_%s.h"\n' % (lib_dir, arch.name)

    txt += r'''
#define RAISE(errtype, msg) {PyObject* p; p = PyErr_Format( errtype, msg ); return p;}
'''
    return txt


class myresolver:

    def __init__(self, offset):
        self.offset = offset

    def ret(self):
        return "return PyLong_FromUnsignedLongLong(0x%X);" % self.offset


class resolver:

    def __init__(self):
        self.resolvers = keydefaultdict(myresolver)

    def get_resolver(self, offset):
        return self.resolvers[offset]


class JitCore_Cc_Base(JitCore):
    "JiT management, abstract class using a C compiler as backend"

    def __init__(self, ir_arch, bin_stream):
        self.jitted_block_delete_cb = self.deleteCB
        super(JitCore_Cc_Base, self).__init__(ir_arch, bin_stream)
        self.resolver = resolver()
        self.ir_arch = ir_arch
        self.states = {}
        self.tempdir = os.path.join(tempfile.gettempdir(), "miasm_cache")
        try:
            os.mkdir(self.tempdir, 0755)
        except OSError:
            pass
        if not os.access(self.tempdir, os.R_OK | os.W_OK):
            raise RuntimeError(
                'Cannot access cache directory %s ' % self.tempdir)
        self.exec_wrapper = None
        self.libs = None
        self.include_files = None

    def deleteCB(self, offset):
        raise NotImplementedError()

    def load(self):
        lib_dir = os.path.dirname(os.path.realpath(__file__))
        libs = [os.path.join(lib_dir, 'VmMngr.so'),
                os.path.join(lib_dir,
                             'arch/JitCore_%s.so' % (self.ir_arch.arch.name))]

        include_files = [os.path.dirname(__file__),
                         get_python_inc()]
        self.include_files = include_files
        self.libs = libs

    def init_codegen(self, codegen):
        """
        Get the code generator @codegen
        @codegen: an CGen instance
        """
        self.codegen = codegen

    def gen_c_code(self, block):
        """
        Return the C code corresponding to the @irblocks
        @irblocks: list of irblocks
        """
        f_declaration = 'int %s(block_id * BlockDst, JitCpu* jitcpu)' % self.FUNCNAME
        out = self.codegen.gen_c(block, log_mn=self.log_mn, log_regs=self.log_regs)
        out = [f_declaration + '{'] + out + ['}\n']
        c_code = out

        return self.gen_C_source(self.ir_arch, c_code)

    @staticmethod
    def gen_C_source(ir_arch, func_code):
        raise NotImplementedError()

Functions

def gen_core(

arch, attrib)

def gen_core(arch, attrib):
    lib_dir = os.path.dirname(os.path.realpath(__file__))

    txt = ""
    txt += '#include "%s/queue.h"\n' % lib_dir
    txt += '#include "%s/op_semantics.h"\n' % lib_dir
    txt += '#include "%s/vm_mngr.h"\n' % lib_dir
    txt += '#include "%s/vm_mngr_py.h"\n' % lib_dir
    txt += '#include "%s/bn.h"\n' % lib_dir
    txt += '#include "%s/JitCore.h"\n' % lib_dir
    txt += '#include "%s/arch/JitCore_%s.h"\n' % (lib_dir, arch.name)

    txt += r'''
#define RAISE(errtype, msg) {PyObject* p; p = PyErr_Format( errtype, msg ); return p;}
'''
    return txt

Classes

class JitCore_Cc_Base

JiT management, abstract class using a C compiler as backend

class JitCore_Cc_Base(JitCore):
    "JiT management, abstract class using a C compiler as backend"

    def __init__(self, ir_arch, bin_stream):
        self.jitted_block_delete_cb = self.deleteCB
        super(JitCore_Cc_Base, self).__init__(ir_arch, bin_stream)
        self.resolver = resolver()
        self.ir_arch = ir_arch
        self.states = {}
        self.tempdir = os.path.join(tempfile.gettempdir(), "miasm_cache")
        try:
            os.mkdir(self.tempdir, 0755)
        except OSError:
            pass
        if not os.access(self.tempdir, os.R_OK | os.W_OK):
            raise RuntimeError(
                'Cannot access cache directory %s ' % self.tempdir)
        self.exec_wrapper = None
        self.libs = None
        self.include_files = None

    def deleteCB(self, offset):
        raise NotImplementedError()

    def load(self):
        lib_dir = os.path.dirname(os.path.realpath(__file__))
        libs = [os.path.join(lib_dir, 'VmMngr.so'),
                os.path.join(lib_dir,
                             'arch/JitCore_%s.so' % (self.ir_arch.arch.name))]

        include_files = [os.path.dirname(__file__),
                         get_python_inc()]
        self.include_files = include_files
        self.libs = libs

    def init_codegen(self, codegen):
        """
        Get the code generator @codegen
        @codegen: an CGen instance
        """
        self.codegen = codegen

    def gen_c_code(self, block):
        """
        Return the C code corresponding to the @irblocks
        @irblocks: list of irblocks
        """
        f_declaration = 'int %s(block_id * BlockDst, JitCpu* jitcpu)' % self.FUNCNAME
        out = self.codegen.gen_c(block, log_mn=self.log_mn, log_regs=self.log_regs)
        out = [f_declaration + '{'] + out + ['}\n']
        c_code = out

        return self.gen_C_source(self.ir_arch, c_code)

    @staticmethod
    def gen_C_source(ir_arch, func_code):
        raise NotImplementedError()

Ancestors (in MRO)

Class variables

var FUNCNAME

var jitted_block_max_size

Static methods

def gen_C_source(

ir_arch, func_code)

@staticmethod
def gen_C_source(ir_arch, func_code):
    raise NotImplementedError()

Instance variables

var disasm_cb

var exec_wrapper

var include_files

var ir_arch

var jitted_block_delete_cb

var libs

var resolver

var states

var tempdir

Methods

def __init__(

self, ir_arch, bin_stream)

def __init__(self, ir_arch, bin_stream):
    self.jitted_block_delete_cb = self.deleteCB
    super(JitCore_Cc_Base, self).__init__(ir_arch, bin_stream)
    self.resolver = resolver()
    self.ir_arch = ir_arch
    self.states = {}
    self.tempdir = os.path.join(tempfile.gettempdir(), "miasm_cache")
    try:
        os.mkdir(self.tempdir, 0755)
    except OSError:
        pass
    if not os.access(self.tempdir, os.R_OK | os.W_OK):
        raise RuntimeError(
            'Cannot access cache directory %s ' % self.tempdir)
    self.exec_wrapper = None
    self.libs = None
    self.include_files = None

def add_block(

self, block)

Add a block to JiT and JiT it. @block: asm_bloc to add

def add_block(self, block):
    """Add a block to JiT and JiT it.
    @block: asm_bloc to add
    """
    irblocks = self.ir_arch.add_asmblock_to_ircfg(block, self.ircfg, gen_pc_updt = True)
    block.blocks = irblocks
    self.jit_irblocks(block.loc_key, irblocks)

def add_block_to_mem_interval(

self, vm, block)

Update vm to include block addresses in its memory range

def add_block_to_mem_interval(self, vm, block):
    "Update vm to include block addresses in its memory range"
    self.blocks_mem_interval += interval([(block.ad_min, block.ad_max - 1)])
    vm.reset_code_bloc_pool()
    for a, b in self.blocks_mem_interval:
        vm.add_code_bloc(a, b + 1)

def add_disassembly_splits(

self, *args)

The disassembly engine will stop on address in args if they are not at the block beginning

def add_disassembly_splits(self, *args):
    """The disassembly engine will stop on address in args if they
    are not at the block beginning"""
    self.split_dis.update(set(args))

def blocks_to_memrange(

self, blocks)

Return an interval instance standing for blocks addresses @blocks: list of AsmBlock instances

def blocks_to_memrange(self, blocks):
    """Return an interval instance standing for blocks addresses
    @blocks: list of AsmBlock instances
    """
    mem_range = interval()
    for block in blocks:
        mem_range += interval([(block.ad_min, block.ad_max - 1)])
    return mem_range

def clear_jitted_blocks(

self)

Reset all jitted blocks

def clear_jitted_blocks(self):
    "Reset all jitted blocks"
    self.offset_to_jitted_func.clear()
    self.loc_key_to_block.clear()
    self.blocks_mem_interval = interval()

def del_block_in_range(

self, ad1, ad2)

Find and remove jitted block in range [ad1, ad2]. Return the list of block removed. @ad1: First address @ad2: Last address

def del_block_in_range(self, ad1, ad2):
    """Find and remove jitted block in range [ad1, ad2].
    Return the list of block removed.
    @ad1: First address
    @ad2: Last address
    """
    # Find concerned blocks
    modified_blocks = set()
    for block in self.loc_key_to_block.values():
        if not block.lines:
            continue
        if block.ad_max <= ad1 or block.ad_min >= ad2:
            # Block not modified
            pass
        else:
            # Modified blocks
            modified_blocks.add(block)
    # Generate interval to delete
    del_interval = self.blocks_to_memrange(modified_blocks)
    # Remove interval from monitored interval list
    self.blocks_mem_interval -= del_interval
    # Remove modified blocks
    for block in modified_blocks:
        try:
            for irblock in block.blocks:
                # Remove offset -> jitted block link
                offset = self.ir_arch.loc_db.get_location_offset(irblock.loc_key)
                if offset in self.offset_to_jitted_func:
                    del(self.offset_to_jitted_func[offset])
        except AttributeError:
            # The block has never been translated in IR
            offset = self.ir_arch.loc_db.get_location_offset(block.loc_key)
            if offset in self.offset_to_jitted_func:
                del(self.offset_to_jitted_func[offset])
        # Remove label -> block link
        del(self.loc_key_to_block[block.loc_key])
    return modified_blocks

def deleteCB(

self, offset)

def deleteCB(self, offset):
    raise NotImplementedError()

def disasm_and_jit_block(

self, addr, vm)

Disassemble a new block and JiT it @addr: address of the block to disassemble (LocKey or int) @vm: VmMngr instance

def disasm_and_jit_block(self, addr, vm):
    """Disassemble a new block and JiT it
    @addr: address of the block to disassemble (LocKey or int)
    @vm: VmMngr instance
    """
    # Get the block
    if isinstance(addr, LocKey):
        addr = self.ir_arch.loc_db.get_location_offset(addr)
        if addr is None:
            raise RuntimeError("Unknown offset for LocKey")
    # Prepare disassembler
    self.mdis.lines_wd = self.options["jit_maxline"]
    # Disassemble it
    cur_block = self.mdis.dis_block(addr)
    if isinstance(cur_block, AsmBlockBad):
        return cur_block
    # Logging
    if self.log_newbloc:
        print cur_block.to_string(self.mdis.loc_db)
    # Update label -> block
    self.loc_key_to_block[cur_block.loc_key] = cur_block
    # Store min/max block address needed in jit automod code
    self.set_block_min_max(cur_block)
    # JiT it
    self.add_block(cur_block)
    # Update jitcode mem range
    self.add_block_to_mem_interval(vm, cur_block)
    return cur_block

def gen_c_code(

self, block)

Return the C code corresponding to the @irblocks @irblocks: list of irblocks

def gen_c_code(self, block):
    """
    Return the C code corresponding to the @irblocks
    @irblocks: list of irblocks
    """
    f_declaration = 'int %s(block_id * BlockDst, JitCpu* jitcpu)' % self.FUNCNAME
    out = self.codegen.gen_c(block, log_mn=self.log_mn, log_regs=self.log_regs)
    out = [f_declaration + '{'] + out + ['}\n']
    c_code = out
    return self.gen_C_source(self.ir_arch, c_code)

def hash_block(

self, block)

Build a hash of the block @block @block: asmblock

def hash_block(self, block):
    """
    Build a hash of the block @block
    @block: asmblock
    """
    block_raw = "".join(line.b for line in block.lines)
    offset = self.ir_arch.loc_db.get_location_offset(block.loc_key)
    block_hash = md5("%X_%s_%s_%s_%s" % (offset,
                                         self.arch_name,
                                         self.log_mn,
                                         self.log_regs,
                                         block_raw)).hexdigest()
    return block_hash

def init_codegen(

self, codegen)

Get the code generator @codegen @codegen: an CGen instance

def init_codegen(self, codegen):
    """
    Get the code generator @codegen
    @codegen: an CGen instance
    """
    self.codegen = codegen

def jit_irblocks(

self, label, irblocks)

JiT a group of irblocks. @label: the label of the irblocks @irblocks: a group of irblocks

def jit_irblocks(self, label, irblocks):
    """JiT a group of irblocks.
    @label: the label of the irblocks
    @irblocks: a group of irblocks
    """
    raise NotImplementedError("Abstract class")

def load(

self)

def load(self):
    lib_dir = os.path.dirname(os.path.realpath(__file__))
    libs = [os.path.join(lib_dir, 'VmMngr.so'),
            os.path.join(lib_dir,
                         'arch/JitCore_%s.so' % (self.ir_arch.arch.name))]
    include_files = [os.path.dirname(__file__),
                     get_python_inc()]
    self.include_files = include_files
    self.libs = libs

def remove_disassembly_splits(

self, *args)

The disassembly engine will no longer stop on address in args

def remove_disassembly_splits(self, *args):
    """The disassembly engine will no longer stop on address in args"""
    self.split_dis.difference_update(set(args))

def run_at(

self, cpu, offset, stop_offsets)

Run from the starting address @offset. Execution will stop if: - max_exec_per_call option is reached - a new, yet unknown, block is reached after the execution of block at address @offset - an address in @stop_offsets is reached @cpu: JitCpu instance @offset: starting address (int) @stop_offsets: set of address on which the jitter must stop

def run_at(self, cpu, offset, stop_offsets):
    """Run from the starting address @offset.
    Execution will stop if:
    - max_exec_per_call option is reached
    - a new, yet unknown, block is reached after the execution of block at
      address @offset
    - an address in @stop_offsets is reached
    @cpu: JitCpu instance
    @offset: starting address (int)
    @stop_offsets: set of address on which the jitter must stop
    """
    if offset is None:
        offset = getattr(cpu, self.ir_arch.pc.name)
    if offset not in self.offset_to_jitted_func:
        # Need to JiT the block
        cur_block = self.disasm_and_jit_block(offset, cpu.vmmngr)
        if isinstance(cur_block, AsmBlockBad):
            errno = cur_block.errno
            if errno == AsmBlockBad.ERROR_IO:
                cpu.vmmngr.set_exception(EXCEPT_ACCESS_VIOL)
            elif errno == AsmBlockBad.ERROR_CANNOT_DISASM:
                cpu.set_exception(EXCEPT_UNK_MNEMO)
            else:
                raise RuntimeError("Unhandled disasm result %r" % errno)
            return offset
    # Run the block and update cpu/vmmngr state
    return self.exec_wrapper(offset, cpu, self.offset_to_jitted_func.data,
                             stop_offsets,
                             self.options["max_exec_per_call"])

def set_block_min_max(

self, cur_block)

Update cur_block to set min/max address

def set_block_min_max(self, cur_block):
    "Update cur_block to set min/max address"
    if cur_block.lines:
        cur_block.ad_min = cur_block.lines[0].offset
        cur_block.ad_max = cur_block.lines[-1].offset + cur_block.lines[-1].l
    else:
        # 1 byte block for unknown mnemonic
        offset = ir_arch.loc_db.get_location_offset(cur_block.loc_key)
        cur_block.ad_min = offset
        cur_block.ad_max = offset+1

def set_options(

self, **kwargs)

Set options relative to the backend

def set_options(self, **kwargs):
    "Set options relative to the backend"
    self.options.update(kwargs)

def updt_automod_code(

self, vm)

Remove jitted code updated by memory write @vm: VmMngr instance

def updt_automod_code(self, vm):
    """Remove jitted code updated by memory write
    @vm: VmMngr instance
    """
    mem_range = []
    for addr_start, addr_stop in vm.get_memory_write():
        mem_range.append((addr_start, addr_stop))
    self.updt_automod_code_range(vm, mem_range)

def updt_automod_code_range(

self, vm, mem_range)

Remove jitted code in range @mem_range @vm: VmMngr instance @mem_range: list of start/stop addresses

def updt_automod_code_range(self, vm, mem_range):
    """Remove jitted code in range @mem_range
    @vm: VmMngr instance
    @mem_range: list of start/stop addresses
    """
    for addr_start, addr_stop in mem_range:
        self.del_block_in_range(addr_start, addr_stop)
    self.__updt_jitcode_mem_range(vm)
    vm.reset_memory_access()

class myresolver

class myresolver:

    def __init__(self, offset):
        self.offset = offset

    def ret(self):
        return "return PyLong_FromUnsignedLongLong(0x%X);" % self.offset

Ancestors (in MRO)

Instance variables

var offset

Methods

def __init__(

self, offset)

def __init__(self, offset):
    self.offset = offset

def ret(

self)

def ret(self):
    return "return PyLong_FromUnsignedLongLong(0x%X);" % self.offset

class resolver

class resolver:

    def __init__(self):
        self.resolvers = keydefaultdict(myresolver)

    def get_resolver(self, offset):
        return self.resolvers[offset]

Ancestors (in MRO)

Instance variables

var resolvers

Methods

def __init__(

self)

def __init__(self):
    self.resolvers = keydefaultdict(myresolver)

def get_resolver(

self, offset)

def get_resolver(self, offset):
    return self.resolvers[offset]