miasm2.core.bin_stream module
# # Copyright (C) 2011 EADS France, Fabrice Desclaux <fabrice.desclaux@eads.net> # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # class bin_stream(object): # Cache must be initialized by entering atomic mode _cache = None CACHE_SIZE = 10000 # By default, no atomic mode _atomic_mode = False def __init__(self, *args, **kargs): pass def __repr__(self): return "<%s !!>" % self.__class__.__name__ def hexdump(self, offset, l): return def enter_atomic_mode(self): """Enter atomic mode. In this mode, read may be cached""" assert not self._atomic_mode self._atomic_mode = True self._cache = {} def leave_atomic_mode(self): """Leave atomic mode""" assert self._atomic_mode self._atomic_mode = False self._cache = None def _getbytes(self, start, length): return self.bin[start:start + length] def getbytes(self, start, l=1): """Return the bytes from the bit stream @start: starting offset (in byte) @l: (optional) number of bytes to read Wrapper on _getbytes, with atomic mode handling. """ if self._atomic_mode: val = self._cache.get((start,l), None) if val is None: val = self._getbytes(start, l) self._cache[(start,l)] = val else: val = self._getbytes(start, l) return val def getbits(self, start, n): """Return the bits from the bit stream @start: the offset in bits @n: number of bits to read """ # Trivial case if n == 0: return 0 # Get initial bytes if n > self.getlen() * 8: raise IOError('not enough bits %r %r' % (n, len(self.bin) * 8)) byte_start = start / 8 byte_stop = (start + n + 7) / 8 temp = self.getbytes(byte_start, byte_stop - byte_start) if not temp: raise IOError('cannot get bytes') # Init start = start % 8 out = 0 while n: # Get needed bits, working on maximum 8 bits at a time cur_byte_idx = start / 8 new_bits = ord(temp[cur_byte_idx]) to_keep = 8 - start % 8 new_bits &= (1 << to_keep) - 1 cur_len = min(to_keep, n) new_bits >>= (to_keep - cur_len) # Update output out <<= cur_len out |= new_bits # Update counters n -= cur_len start += cur_len return out class bin_stream_str(bin_stream): def __init__(self, input_str="", offset=0L, shift=0): bin_stream.__init__(self) self.bin = input_str self.offset = offset self.shift = shift self.l = len(input_str) def _getbytes(self, start, l=1): if start + l + self.shift > self.l: raise IOError("not enough bytes in str") return super(bin_stream_str, self)._getbytes(start + self.shift, l) def readbs(self, l=1): if self.offset + l + self.shift > self.l: raise IOError("not enough bytes in str") self.offset += l return self.bin[self.offset - l + self.shift:self.offset + self.shift] def __str__(self): out = self.bin[self.offset + self.shift:] return out def setoffset(self, val): self.offset = val def getlen(self): return self.l - (self.offset + self.shift) class bin_stream_file(bin_stream): def __init__(self, binary, offset=0L, shift=0): bin_stream.__init__(self) self.bin = binary self.bin.seek(0, 2) self.shift = shift self.l = self.bin.tell() self.offset = offset def getoffset(self): return self.bin.tell() - self.shift def setoffset(self, val): self.bin.seek(val + self.shift) offset = property(getoffset, setoffset) def readbs(self, l=1): if self.offset + l + self.shift > self.l: raise IOError("not enough bytes in file") return self.bin.read(l) def __str__(self): return str(self.bin) def getlen(self): return self.l - (self.offset + self.shift) class bin_stream_container(bin_stream): def __init__(self, virt_view, offset=0L): bin_stream.__init__(self) self.bin = virt_view self.l = virt_view.max_addr() self.offset = offset def is_addr_in(self, ad): return self.bin.is_addr_in(ad) def getlen(self): return self.l def readbs(self, l=1): if self.offset + l > self.l: raise IOError("not enough bytes") self.offset += l return self.bin.get(self.offset - l, self.offset) def _getbytes(self, start, l=1): try: return self.bin.get(start, start + l) except ValueError: raise IOError("cannot get bytes") def __str__(self): out = self.bin.get(self.offset, self.offset + self.l) return out def setoffset(self, val): self.offset = val class bin_stream_pe(bin_stream_container): pass class bin_stream_elf(bin_stream_container): pass class bin_stream_vm(bin_stream): def __init__(self, vm, offset=0L, base_offset=0L): self.offset = offset self.base_offset = base_offset self.vm = vm def getlen(self): return 0xFFFFFFFFFFFFFFFF def _getbytes(self, start, l=1): try: s = self.vm.get_mem(start + self.base_offset, l) except: raise IOError('cannot get mem ad', hex(start)) return s def readbs(self, l=1): try: s = self.vm.get_mem(self.offset + self.base_offset, l) except: raise IOError('cannot get mem ad', hex(self.offset)) self.offset += l return s def setoffset(self, val): self.offset = val
Classes
class bin_stream
class bin_stream(object): # Cache must be initialized by entering atomic mode _cache = None CACHE_SIZE = 10000 # By default, no atomic mode _atomic_mode = False def __init__(self, *args, **kargs): pass def __repr__(self): return "<%s !!>" % self.__class__.__name__ def hexdump(self, offset, l): return def enter_atomic_mode(self): """Enter atomic mode. In this mode, read may be cached""" assert not self._atomic_mode self._atomic_mode = True self._cache = {} def leave_atomic_mode(self): """Leave atomic mode""" assert self._atomic_mode self._atomic_mode = False self._cache = None def _getbytes(self, start, length): return self.bin[start:start + length] def getbytes(self, start, l=1): """Return the bytes from the bit stream @start: starting offset (in byte) @l: (optional) number of bytes to read Wrapper on _getbytes, with atomic mode handling. """ if self._atomic_mode: val = self._cache.get((start,l), None) if val is None: val = self._getbytes(start, l) self._cache[(start,l)] = val else: val = self._getbytes(start, l) return val def getbits(self, start, n): """Return the bits from the bit stream @start: the offset in bits @n: number of bits to read """ # Trivial case if n == 0: return 0 # Get initial bytes if n > self.getlen() * 8: raise IOError('not enough bits %r %r' % (n, len(self.bin) * 8)) byte_start = start / 8 byte_stop = (start + n + 7) / 8 temp = self.getbytes(byte_start, byte_stop - byte_start) if not temp: raise IOError('cannot get bytes') # Init start = start % 8 out = 0 while n: # Get needed bits, working on maximum 8 bits at a time cur_byte_idx = start / 8 new_bits = ord(temp[cur_byte_idx]) to_keep = 8 - start % 8 new_bits &= (1 << to_keep) - 1 cur_len = min(to_keep, n) new_bits >>= (to_keep - cur_len) # Update output out <<= cur_len out |= new_bits # Update counters n -= cur_len start += cur_len return out
Ancestors (in MRO)
- bin_stream
- __builtin__.object
Class variables
var CACHE_SIZE
Methods
def __init__(
self, *args, **kargs)
def __init__(self, *args, **kargs): pass
def enter_atomic_mode(
self)
Enter atomic mode. In this mode, read may be cached
def enter_atomic_mode(self): """Enter atomic mode. In this mode, read may be cached""" assert not self._atomic_mode self._atomic_mode = True self._cache = {}
def getbits(
self, start, n)
Return the bits from the bit stream @start: the offset in bits @n: number of bits to read
def getbits(self, start, n): """Return the bits from the bit stream @start: the offset in bits @n: number of bits to read """ # Trivial case if n == 0: return 0 # Get initial bytes if n > self.getlen() * 8: raise IOError('not enough bits %r %r' % (n, len(self.bin) * 8)) byte_start = start / 8 byte_stop = (start + n + 7) / 8 temp = self.getbytes(byte_start, byte_stop - byte_start) if not temp: raise IOError('cannot get bytes') # Init start = start % 8 out = 0 while n: # Get needed bits, working on maximum 8 bits at a time cur_byte_idx = start / 8 new_bits = ord(temp[cur_byte_idx]) to_keep = 8 - start % 8 new_bits &= (1 << to_keep) - 1 cur_len = min(to_keep, n) new_bits >>= (to_keep - cur_len) # Update output out <<= cur_len out |= new_bits # Update counters n -= cur_len start += cur_len return out
def getbytes(
self, start, l=1)
Return the bytes from the bit stream @start: starting offset (in byte) @l: (optional) number of bytes to read
Wrapper on _getbytes, with atomic mode handling.
def getbytes(self, start, l=1): """Return the bytes from the bit stream @start: starting offset (in byte) @l: (optional) number of bytes to read Wrapper on _getbytes, with atomic mode handling. """ if self._atomic_mode: val = self._cache.get((start,l), None) if val is None: val = self._getbytes(start, l) self._cache[(start,l)] = val else: val = self._getbytes(start, l) return val
def hexdump(
self, offset, l)
def hexdump(self, offset, l): return
def leave_atomic_mode(
self)
Leave atomic mode
def leave_atomic_mode(self): """Leave atomic mode""" assert self._atomic_mode self._atomic_mode = False self._cache = None
class bin_stream_container
class bin_stream_container(bin_stream): def __init__(self, virt_view, offset=0L): bin_stream.__init__(self) self.bin = virt_view self.l = virt_view.max_addr() self.offset = offset def is_addr_in(self, ad): return self.bin.is_addr_in(ad) def getlen(self): return self.l def readbs(self, l=1): if self.offset + l > self.l: raise IOError("not enough bytes") self.offset += l return self.bin.get(self.offset - l, self.offset) def _getbytes(self, start, l=1): try: return self.bin.get(start, start + l) except ValueError: raise IOError("cannot get bytes") def __str__(self): out = self.bin.get(self.offset, self.offset + self.l) return out def setoffset(self, val): self.offset = val
Ancestors (in MRO)
- bin_stream_container
- bin_stream
- __builtin__.object
Class variables
Instance variables
var bin
var l
var offset
Methods
def __init__(
self, virt_view, offset=0L)
Inheritance:
bin_stream
.__init__
def __init__(self, virt_view, offset=0L): bin_stream.__init__(self) self.bin = virt_view self.l = virt_view.max_addr() self.offset = offset
def enter_atomic_mode(
self)
Inheritance:
bin_stream
.enter_atomic_mode
Enter atomic mode. In this mode, read may be cached
def enter_atomic_mode(self): """Enter atomic mode. In this mode, read may be cached""" assert not self._atomic_mode self._atomic_mode = True self._cache = {}
def getbits(
self, start, n)
Inheritance:
bin_stream
.getbits
Return the bits from the bit stream @start: the offset in bits @n: number of bits to read
def getbits(self, start, n): """Return the bits from the bit stream @start: the offset in bits @n: number of bits to read """ # Trivial case if n == 0: return 0 # Get initial bytes if n > self.getlen() * 8: raise IOError('not enough bits %r %r' % (n, len(self.bin) * 8)) byte_start = start / 8 byte_stop = (start + n + 7) / 8 temp = self.getbytes(byte_start, byte_stop - byte_start) if not temp: raise IOError('cannot get bytes') # Init start = start % 8 out = 0 while n: # Get needed bits, working on maximum 8 bits at a time cur_byte_idx = start / 8 new_bits = ord(temp[cur_byte_idx]) to_keep = 8 - start % 8 new_bits &= (1 << to_keep) - 1 cur_len = min(to_keep, n) new_bits >>= (to_keep - cur_len) # Update output out <<= cur_len out |= new_bits # Update counters n -= cur_len start += cur_len return out
def getbytes(
self, start, l=1)
Inheritance:
bin_stream
.getbytes
Return the bytes from the bit stream @start: starting offset (in byte) @l: (optional) number of bytes to read
Wrapper on _getbytes, with atomic mode handling.
def getbytes(self, start, l=1): """Return the bytes from the bit stream @start: starting offset (in byte) @l: (optional) number of bytes to read Wrapper on _getbytes, with atomic mode handling. """ if self._atomic_mode: val = self._cache.get((start,l), None) if val is None: val = self._getbytes(start, l) self._cache[(start,l)] = val else: val = self._getbytes(start, l) return val
def getlen(
self)
def getlen(self): return self.l
def is_addr_in(
self, ad)
def is_addr_in(self, ad): return self.bin.is_addr_in(ad)
def leave_atomic_mode(
self)
Inheritance:
bin_stream
.leave_atomic_mode
Leave atomic mode
def leave_atomic_mode(self): """Leave atomic mode""" assert self._atomic_mode self._atomic_mode = False self._cache = None
def readbs(
self, l=1)
def readbs(self, l=1): if self.offset + l > self.l: raise IOError("not enough bytes") self.offset += l return self.bin.get(self.offset - l, self.offset)
def setoffset(
self, val)
def setoffset(self, val): self.offset = val
class bin_stream_elf
class bin_stream_elf(bin_stream_container): pass
Ancestors (in MRO)
- bin_stream_elf
- bin_stream_container
- bin_stream
- __builtin__.object
Class variables
Instance variables
Methods
def __init__(
self, virt_view, offset=0L)
Inheritance:
bin_stream_container
.__init__
def __init__(self, virt_view, offset=0L): bin_stream.__init__(self) self.bin = virt_view self.l = virt_view.max_addr() self.offset = offset
def enter_atomic_mode(
self)
Inheritance:
bin_stream_container
.enter_atomic_mode
Enter atomic mode. In this mode, read may be cached
def enter_atomic_mode(self): """Enter atomic mode. In this mode, read may be cached""" assert not self._atomic_mode self._atomic_mode = True self._cache = {}
def getbits(
self, start, n)
Inheritance:
bin_stream_container
.getbits
Return the bits from the bit stream @start: the offset in bits @n: number of bits to read
def getbits(self, start, n): """Return the bits from the bit stream @start: the offset in bits @n: number of bits to read """ # Trivial case if n == 0: return 0 # Get initial bytes if n > self.getlen() * 8: raise IOError('not enough bits %r %r' % (n, len(self.bin) * 8)) byte_start = start / 8 byte_stop = (start + n + 7) / 8 temp = self.getbytes(byte_start, byte_stop - byte_start) if not temp: raise IOError('cannot get bytes') # Init start = start % 8 out = 0 while n: # Get needed bits, working on maximum 8 bits at a time cur_byte_idx = start / 8 new_bits = ord(temp[cur_byte_idx]) to_keep = 8 - start % 8 new_bits &= (1 << to_keep) - 1 cur_len = min(to_keep, n) new_bits >>= (to_keep - cur_len) # Update output out <<= cur_len out |= new_bits # Update counters n -= cur_len start += cur_len return out
def getbytes(
self, start, l=1)
Inheritance:
bin_stream_container
.getbytes
Return the bytes from the bit stream @start: starting offset (in byte) @l: (optional) number of bytes to read
Wrapper on _getbytes, with atomic mode handling.
def getbytes(self, start, l=1): """Return the bytes from the bit stream @start: starting offset (in byte) @l: (optional) number of bytes to read Wrapper on _getbytes, with atomic mode handling. """ if self._atomic_mode: val = self._cache.get((start,l), None) if val is None: val = self._getbytes(start, l) self._cache[(start,l)] = val else: val = self._getbytes(start, l) return val
def hexdump(
self, offset, l)
Inheritance:
bin_stream_container
.hexdump
def hexdump(self, offset, l): return
def is_addr_in(
self, ad)
Inheritance:
bin_stream_container
.is_addr_in
def is_addr_in(self, ad): return self.bin.is_addr_in(ad)
def leave_atomic_mode(
self)
Inheritance:
bin_stream_container
.leave_atomic_mode
Leave atomic mode
def leave_atomic_mode(self): """Leave atomic mode""" assert self._atomic_mode self._atomic_mode = False self._cache = None
def readbs(
self, l=1)
Inheritance:
bin_stream_container
.readbs
def readbs(self, l=1): if self.offset + l > self.l: raise IOError("not enough bytes") self.offset += l return self.bin.get(self.offset - l, self.offset)
def setoffset(
self, val)
Inheritance:
bin_stream_container
.setoffset
def setoffset(self, val): self.offset = val
class bin_stream_file
class bin_stream_file(bin_stream): def __init__(self, binary, offset=0L, shift=0): bin_stream.__init__(self) self.bin = binary self.bin.seek(0, 2) self.shift = shift self.l = self.bin.tell() self.offset = offset def getoffset(self): return self.bin.tell() - self.shift def setoffset(self, val): self.bin.seek(val + self.shift) offset = property(getoffset, setoffset) def readbs(self, l=1): if self.offset + l + self.shift > self.l: raise IOError("not enough bytes in file") return self.bin.read(l) def __str__(self): return str(self.bin) def getlen(self): return self.l - (self.offset + self.shift)
Ancestors (in MRO)
- bin_stream_file
- bin_stream
- __builtin__.object
Class variables
var offset
Instance variables
var bin
var l
var offset
var shift
Methods
def __init__(
self, binary, offset=0L, shift=0)
Inheritance:
bin_stream
.__init__
def __init__(self, binary, offset=0L, shift=0): bin_stream.__init__(self) self.bin = binary self.bin.seek(0, 2) self.shift = shift self.l = self.bin.tell() self.offset = offset
def enter_atomic_mode(
self)
Inheritance:
bin_stream
.enter_atomic_mode
Enter atomic mode. In this mode, read may be cached
def enter_atomic_mode(self): """Enter atomic mode. In this mode, read may be cached""" assert not self._atomic_mode self._atomic_mode = True self._cache = {}
def getbits(
self, start, n)
Inheritance:
bin_stream
.getbits
Return the bits from the bit stream @start: the offset in bits @n: number of bits to read
def getbits(self, start, n): """Return the bits from the bit stream @start: the offset in bits @n: number of bits to read """ # Trivial case if n == 0: return 0 # Get initial bytes if n > self.getlen() * 8: raise IOError('not enough bits %r %r' % (n, len(self.bin) * 8)) byte_start = start / 8 byte_stop = (start + n + 7) / 8 temp = self.getbytes(byte_start, byte_stop - byte_start) if not temp: raise IOError('cannot get bytes') # Init start = start % 8 out = 0 while n: # Get needed bits, working on maximum 8 bits at a time cur_byte_idx = start / 8 new_bits = ord(temp[cur_byte_idx]) to_keep = 8 - start % 8 new_bits &= (1 << to_keep) - 1 cur_len = min(to_keep, n) new_bits >>= (to_keep - cur_len) # Update output out <<= cur_len out |= new_bits # Update counters n -= cur_len start += cur_len return out
def getbytes(
self, start, l=1)
Inheritance:
bin_stream
.getbytes
Return the bytes from the bit stream @start: starting offset (in byte) @l: (optional) number of bytes to read
Wrapper on _getbytes, with atomic mode handling.
def getbytes(self, start, l=1): """Return the bytes from the bit stream @start: starting offset (in byte) @l: (optional) number of bytes to read Wrapper on _getbytes, with atomic mode handling. """ if self._atomic_mode: val = self._cache.get((start,l), None) if val is None: val = self._getbytes(start, l) self._cache[(start,l)] = val else: val = self._getbytes(start, l) return val
def getlen(
self)
def getlen(self): return self.l - (self.offset + self.shift)
def getoffset(
self)
def getoffset(self): return self.bin.tell() - self.shift
def leave_atomic_mode(
self)
Inheritance:
bin_stream
.leave_atomic_mode
Leave atomic mode
def leave_atomic_mode(self): """Leave atomic mode""" assert self._atomic_mode self._atomic_mode = False self._cache = None
def readbs(
self, l=1)
def readbs(self, l=1): if self.offset + l + self.shift > self.l: raise IOError("not enough bytes in file") return self.bin.read(l)
def setoffset(
self, val)
def setoffset(self, val): self.bin.seek(val + self.shift)
class bin_stream_pe
class bin_stream_pe(bin_stream_container): pass
Ancestors (in MRO)
- bin_stream_pe
- bin_stream_container
- bin_stream
- __builtin__.object
Class variables
Instance variables
Methods
def __init__(
self, virt_view, offset=0L)
Inheritance:
bin_stream_container
.__init__
def __init__(self, virt_view, offset=0L): bin_stream.__init__(self) self.bin = virt_view self.l = virt_view.max_addr() self.offset = offset
def enter_atomic_mode(
self)
Inheritance:
bin_stream_container
.enter_atomic_mode
Enter atomic mode. In this mode, read may be cached
def enter_atomic_mode(self): """Enter atomic mode. In this mode, read may be cached""" assert not self._atomic_mode self._atomic_mode = True self._cache = {}
def getbits(
self, start, n)
Inheritance:
bin_stream_container
.getbits
Return the bits from the bit stream @start: the offset in bits @n: number of bits to read
def getbits(self, start, n): """Return the bits from the bit stream @start: the offset in bits @n: number of bits to read """ # Trivial case if n == 0: return 0 # Get initial bytes if n > self.getlen() * 8: raise IOError('not enough bits %r %r' % (n, len(self.bin) * 8)) byte_start = start / 8 byte_stop = (start + n + 7) / 8 temp = self.getbytes(byte_start, byte_stop - byte_start) if not temp: raise IOError('cannot get bytes') # Init start = start % 8 out = 0 while n: # Get needed bits, working on maximum 8 bits at a time cur_byte_idx = start / 8 new_bits = ord(temp[cur_byte_idx]) to_keep = 8 - start % 8 new_bits &= (1 << to_keep) - 1 cur_len = min(to_keep, n) new_bits >>= (to_keep - cur_len) # Update output out <<= cur_len out |= new_bits # Update counters n -= cur_len start += cur_len return out
def getbytes(
self, start, l=1)
Inheritance:
bin_stream_container
.getbytes
Return the bytes from the bit stream @start: starting offset (in byte) @l: (optional) number of bytes to read
Wrapper on _getbytes, with atomic mode handling.
def getbytes(self, start, l=1): """Return the bytes from the bit stream @start: starting offset (in byte) @l: (optional) number of bytes to read Wrapper on _getbytes, with atomic mode handling. """ if self._atomic_mode: val = self._cache.get((start,l), None) if val is None: val = self._getbytes(start, l) self._cache[(start,l)] = val else: val = self._getbytes(start, l) return val
def hexdump(
self, offset, l)
Inheritance:
bin_stream_container
.hexdump
def hexdump(self, offset, l): return
def is_addr_in(
self, ad)
Inheritance:
bin_stream_container
.is_addr_in
def is_addr_in(self, ad): return self.bin.is_addr_in(ad)
def leave_atomic_mode(
self)
Inheritance:
bin_stream_container
.leave_atomic_mode
Leave atomic mode
def leave_atomic_mode(self): """Leave atomic mode""" assert self._atomic_mode self._atomic_mode = False self._cache = None
def readbs(
self, l=1)
Inheritance:
bin_stream_container
.readbs
def readbs(self, l=1): if self.offset + l > self.l: raise IOError("not enough bytes") self.offset += l return self.bin.get(self.offset - l, self.offset)
def setoffset(
self, val)
Inheritance:
bin_stream_container
.setoffset
def setoffset(self, val): self.offset = val
class bin_stream_str
class bin_stream_str(bin_stream): def __init__(self, input_str="", offset=0L, shift=0): bin_stream.__init__(self) self.bin = input_str self.offset = offset self.shift = shift self.l = len(input_str) def _getbytes(self, start, l=1): if start + l + self.shift > self.l: raise IOError("not enough bytes in str") return super(bin_stream_str, self)._getbytes(start + self.shift, l) def readbs(self, l=1): if self.offset + l + self.shift > self.l: raise IOError("not enough bytes in str") self.offset += l return self.bin[self.offset - l + self.shift:self.offset + self.shift] def __str__(self): out = self.bin[self.offset + self.shift:] return out def setoffset(self, val): self.offset = val def getlen(self): return self.l - (self.offset + self.shift)
Ancestors (in MRO)
- bin_stream_str
- bin_stream
- __builtin__.object
Class variables
Instance variables
var bin
var l
var offset
var shift
Methods
def __init__(
self, input_str='', offset=0L, shift=0)
Inheritance:
bin_stream
.__init__
def __init__(self, input_str="", offset=0L, shift=0): bin_stream.__init__(self) self.bin = input_str self.offset = offset self.shift = shift self.l = len(input_str)
def enter_atomic_mode(
self)
Inheritance:
bin_stream
.enter_atomic_mode
Enter atomic mode. In this mode, read may be cached
def enter_atomic_mode(self): """Enter atomic mode. In this mode, read may be cached""" assert not self._atomic_mode self._atomic_mode = True self._cache = {}
def getbits(
self, start, n)
Inheritance:
bin_stream
.getbits
Return the bits from the bit stream @start: the offset in bits @n: number of bits to read
def getbits(self, start, n): """Return the bits from the bit stream @start: the offset in bits @n: number of bits to read """ # Trivial case if n == 0: return 0 # Get initial bytes if n > self.getlen() * 8: raise IOError('not enough bits %r %r' % (n, len(self.bin) * 8)) byte_start = start / 8 byte_stop = (start + n + 7) / 8 temp = self.getbytes(byte_start, byte_stop - byte_start) if not temp: raise IOError('cannot get bytes') # Init start = start % 8 out = 0 while n: # Get needed bits, working on maximum 8 bits at a time cur_byte_idx = start / 8 new_bits = ord(temp[cur_byte_idx]) to_keep = 8 - start % 8 new_bits &= (1 << to_keep) - 1 cur_len = min(to_keep, n) new_bits >>= (to_keep - cur_len) # Update output out <<= cur_len out |= new_bits # Update counters n -= cur_len start += cur_len return out
def getbytes(
self, start, l=1)
Inheritance:
bin_stream
.getbytes
Return the bytes from the bit stream @start: starting offset (in byte) @l: (optional) number of bytes to read
Wrapper on _getbytes, with atomic mode handling.
def getbytes(self, start, l=1): """Return the bytes from the bit stream @start: starting offset (in byte) @l: (optional) number of bytes to read Wrapper on _getbytes, with atomic mode handling. """ if self._atomic_mode: val = self._cache.get((start,l), None) if val is None: val = self._getbytes(start, l) self._cache[(start,l)] = val else: val = self._getbytes(start, l) return val
def getlen(
self)
def getlen(self): return self.l - (self.offset + self.shift)
def leave_atomic_mode(
self)
Inheritance:
bin_stream
.leave_atomic_mode
Leave atomic mode
def leave_atomic_mode(self): """Leave atomic mode""" assert self._atomic_mode self._atomic_mode = False self._cache = None
def readbs(
self, l=1)
def readbs(self, l=1): if self.offset + l + self.shift > self.l: raise IOError("not enough bytes in str") self.offset += l return self.bin[self.offset - l + self.shift:self.offset + self.shift]
def setoffset(
self, val)
def setoffset(self, val): self.offset = val
class bin_stream_vm
class bin_stream_vm(bin_stream): def __init__(self, vm, offset=0L, base_offset=0L): self.offset = offset self.base_offset = base_offset self.vm = vm def getlen(self): return 0xFFFFFFFFFFFFFFFF def _getbytes(self, start, l=1): try: s = self.vm.get_mem(start + self.base_offset, l) except: raise IOError('cannot get mem ad', hex(start)) return s def readbs(self, l=1): try: s = self.vm.get_mem(self.offset + self.base_offset, l) except: raise IOError('cannot get mem ad', hex(self.offset)) self.offset += l return s def setoffset(self, val): self.offset = val
Ancestors (in MRO)
- bin_stream_vm
- bin_stream
- __builtin__.object
Class variables
Instance variables
var base_offset
var offset
var vm
Methods
def __init__(
self, vm, offset=0L, base_offset=0L)
Inheritance:
bin_stream
.__init__
def __init__(self, vm, offset=0L, base_offset=0L): self.offset = offset self.base_offset = base_offset self.vm = vm
def enter_atomic_mode(
self)
Inheritance:
bin_stream
.enter_atomic_mode
Enter atomic mode. In this mode, read may be cached
def enter_atomic_mode(self): """Enter atomic mode. In this mode, read may be cached""" assert not self._atomic_mode self._atomic_mode = True self._cache = {}
def getbits(
self, start, n)
Inheritance:
bin_stream
.getbits
Return the bits from the bit stream @start: the offset in bits @n: number of bits to read
def getbits(self, start, n): """Return the bits from the bit stream @start: the offset in bits @n: number of bits to read """ # Trivial case if n == 0: return 0 # Get initial bytes if n > self.getlen() * 8: raise IOError('not enough bits %r %r' % (n, len(self.bin) * 8)) byte_start = start / 8 byte_stop = (start + n + 7) / 8 temp = self.getbytes(byte_start, byte_stop - byte_start) if not temp: raise IOError('cannot get bytes') # Init start = start % 8 out = 0 while n: # Get needed bits, working on maximum 8 bits at a time cur_byte_idx = start / 8 new_bits = ord(temp[cur_byte_idx]) to_keep = 8 - start % 8 new_bits &= (1 << to_keep) - 1 cur_len = min(to_keep, n) new_bits >>= (to_keep - cur_len) # Update output out <<= cur_len out |= new_bits # Update counters n -= cur_len start += cur_len return out
def getbytes(
self, start, l=1)
Inheritance:
bin_stream
.getbytes
Return the bytes from the bit stream @start: starting offset (in byte) @l: (optional) number of bytes to read
Wrapper on _getbytes, with atomic mode handling.
def getbytes(self, start, l=1): """Return the bytes from the bit stream @start: starting offset (in byte) @l: (optional) number of bytes to read Wrapper on _getbytes, with atomic mode handling. """ if self._atomic_mode: val = self._cache.get((start,l), None) if val is None: val = self._getbytes(start, l) self._cache[(start,l)] = val else: val = self._getbytes(start, l) return val
def getlen(
self)
def getlen(self): return 0xFFFFFFFFFFFFFFFF
def leave_atomic_mode(
self)
Inheritance:
bin_stream
.leave_atomic_mode
Leave atomic mode
def leave_atomic_mode(self): """Leave atomic mode""" assert self._atomic_mode self._atomic_mode = False self._cache = None
def readbs(
self, l=1)
def readbs(self, l=1): try: s = self.vm.get_mem(self.offset + self.base_offset, l) except: raise IOError('cannot get mem ad', hex(self.offset)) self.offset += l return s
def setoffset(
self, val)
def setoffset(self, val): self.offset = val