import fcntl
import time
import threading
+import collections
from keep import *
import config
import errors
+# Field indexes into the [locator, block size, offset] lists used below;
+# SEGMENTSIZE applies to the 4-element lists returned by locators_and_ranges.
+LOCATOR = 0
+BLOCKSIZE = 1
+OFFSET = 2
+SEGMENTSIZE = 3
+
+def locators_and_ranges(data_locators, range_start, range_size, debug=False):
+ '''
+ Get blocks that are covered by the range
+ data_locators: list of [locator, block_size, block_start]; assumes that blocks are in order and contiguous
+ range_start: start of range
+ range_size: size of range
+ returns a list of [block locator, block size, segment offset, segment size] entries that together cover the range
+ '''
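+ # A sketch of the expected behavior, using hypothetical locators and sizes:
+ # given data_locators = [['locA+5', 5, 0], ['locB+10', 10, 5]], the call
+ # locators_and_ranges(data_locators, 3, 5) should return
+ # [['locA+5', 5, 3, 2], ['locB+10', 10, 0, 3]],
+ # i.e. the last 2 bytes of the first block followed by the first 3 bytes
+ # of the second block.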
+ if range_size == 0:
+ return []
+ resp = []
+ range_start = long(range_start)
+ range_size = long(range_size)
+ range_end = range_start + range_size
+ block_start = 0L
+
+ # range_start/block_start is the inclusive lower bound
+ # range_end/block_end is the exclusive upper bound
+
+ hi = len(data_locators)
+ lo = 0
+ i = int((hi + lo) / 2)
+ block_size = data_locators[i][BLOCKSIZE]
+ block_start = data_locators[i][OFFSET]
+ block_end = block_start + block_size
+ if debug: print '---'
+
+ # perform a binary search for the first block
+ # assumes that all of the blocks are contiguous, so range_start is guaranteed
+ # to either fall into the range of a block or be outside the block range entirely
+ while not (range_start >= block_start and range_start < block_end):
+ if lo == i:
+ # must be out of range, fail
+ return []
+ if range_start > block_start:
+ lo = i
+ else:
+ hi = i
+ i = int((hi + lo) / 2)
+ if debug: print lo, i, hi
+ block_size = data_locators[i][BLOCKSIZE]
+ block_start = data_locators[i][OFFSET]
+ block_end = block_start + block_size
+
+ while i < len(data_locators):
+ locator, block_size, block_start = data_locators[i]
+ block_end = block_start + block_size
+ if debug:
+ print locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
+ if range_end <= block_start:
+ # range ends before this block starts, so don't look at any more locators
+ break
+
+ #if range_start >= block_end:
+ # range starts after this block ends, so go to next block
+ # we should always start at the first block due to the binary search above, so this test is redundant
+ #next
+
+ if range_start >= block_start and range_end <= block_end:
+ # range starts and ends in this block
+ resp.append([locator, block_size, range_start - block_start, range_size])
+ elif range_start >= block_start and range_end > block_end:
+ # range starts in this block and extends past its end
+ resp.append([locator, block_size, range_start - block_start, block_end - range_start])
+ elif range_start < block_start and range_end > block_end:
+ # range starts in a previous block and extends to further blocks
+ resp.append([locator, block_size, 0L, block_size])
+ elif range_start < block_start and range_end <= block_end:
+ # range starts in a previous block and ends in this block
+ resp.append([locator, block_size, 0L, range_end - block_start])
+ block_start = block_end
+ i += 1
+ return resp
+
+
class StreamFileReader(object):
- def __init__(self, stream, pos, size, name):
+ def __init__(self, stream, segments, name):
self._stream = stream
- self._pos = pos
- self._size = size
+ self.segments = segments
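+ # self.segments is a list of [position in stream, segment size, offset
+ # within this file] entries, kept in file order; locators_and_ranges()
+ # above is applied to them the same way as to data locators.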
self._name = name
- self._filepos = 0
+ self._filepos = 0L
def name(self):
return self._name
def decompressed_name(self):
return re.sub('\.(bz2|gz)$', '', self._name)
- def size(self):
- return self._size
-
def stream_name(self):
return self._stream.name()
- def read(self, size, **kwargs):
- self._stream.seek(self._pos + self._filepos)
- data = self._stream.read(min(size, self._size - self._filepos))
- self._filepos += len(data)
+ def seek(self, pos):
+ self._filepos = min(max(pos, 0L), self.size())
+
+ def tell(self):
+ return self._filepos
+
+ def size(self):
+ n = self.segments[-1]
+ return n[OFFSET] + n[BLOCKSIZE]
+
+ def read(self, size):
+ """Read up to 'size' bytes from the stream, starting at the current file position"""
+ if size == 0:
+ return ''
+
+ data = ''
+ for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, self._filepos, size):
+ data += self._stream.readfrom(locator+segmentoffset, segmentsize)
+ self._filepos += len(data)
+ return data
+
+ def readfrom(self, start, size):
+ """Read up to 'size' bytes from the stream, starting at 'start'"""
+ if size == 0:
+ return ''
+
+ data = ''
+ for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
+ data += self._stream.readfrom(locator+segmentoffset, segmentsize)
return data
- def readall(self, size=2**20, **kwargs):
+ def readall(self, size=2**20):
while True:
- data = self.read(size, **kwargs)
+ data = self.read(size)
if data == '':
break
yield data
- def seek(self, pos):
- self._filepos = pos
-
- def bunzip2(self, size):
- decompressor = bz2.BZ2Decompressor()
- for chunk in self.readall(size):
- data = decompressor.decompress(chunk)
- if data and data != '':
- yield data
-
- def gunzip(self, size):
- decompressor = zlib.decompressobj(16+zlib.MAX_WBITS)
- for chunk in self.readall(size):
- data = decompressor.decompress(decompressor.unconsumed_tail + chunk)
+ def decompress(self, decompress, size):
+ for segment in self.readall(size):
+ data = decompress(segment)
if data and data != '':
yield data
def readall_decompressed(self, size=2**20):
- self._stream.seek(self._pos + self._filepos)
+ self.seek(0)
if re.search('\.bz2$', self._name):
- return self.bunzip2(size)
+ dc = bz2.BZ2Decompressor()
+ return self.decompress(lambda segment: dc.decompress(segment), size)
elif re.search('\.gz$', self._name):
- return self.gunzip(size)
+ dc = zlib.decompressobj(16+zlib.MAX_WBITS)
+ return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), size)
else:
return self.readall(size)
if decompress:
datasource = self.readall_decompressed()
else:
- self._stream.seek(self._pos + self._filepos)
datasource = self.readall()
data = ''
for newdata in datasource:
yield data
def as_manifest(self):
- if self.size() == 0:
- return ("%s %s 0:0:%s\n"
- % (self._stream.name(), config.EMPTY_BLOCK_LOCATOR, self.name()))
- return string.join(self._stream.tokens_for_range(self._pos, self._size),
- " ") + "\n"
+ manifest_text = ['.']
+ manifest_text.extend([d[LOCATOR] for d in self._stream._data_locators])
+ manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments])
+ return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text()
class StreamReader(object):
- def __init__(self, tokens):
- self._tokens = tokens
- self._current_datablock_data = None
- self._current_datablock_pos = 0
- self._current_datablock_index = -1
- self._pos = 0
-
+ def __init__(self, tokens, keep=None, debug=False, _empty=False):
self._stream_name = None
- self.data_locators = []
- self.files = []
+ self._data_locators = []
+ self._files = collections.OrderedDict()
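+ # _data_locators holds [locator, block size, offset in stream] entries;
+ # _files maps each file name to its StreamFileReader, in manifest order.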
+
+ if keep != None:
+ self._keep = keep
+ else:
+ self._keep = Keep.global_client_object()
+
+ streamoffset = 0L
- for tok in self._tokens:
+ # parse stream
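+ # Each token is either the stream name, a block locator of the form
+ # <md5>+<size>[+hints], or a file segment of the form <pos>:<size>:<name>.
+ # For example (hypothetical content), the tokens of
+ # '. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt'
+ # describe a single 3-byte block containing the file 'foo.txt'.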
+ for tok in tokens:
+ if debug: print 'tok', tok
if self._stream_name == None:
self._stream_name = tok.replace('\\040', ' ')
- elif re.search(r'^[0-9a-f]{32}(\+\S+)*$', tok):
- self.data_locators += [tok]
- elif re.search(r'^\d+:\d+:\S+', tok):
- pos, size, name = tok.split(':',2)
- self.files += [[int(pos), int(size), name.replace('\\040', ' ')]]
- else:
- raise errors.SyntaxError("Invalid manifest format")
-
- def tokens(self):
- return self._tokens
-
- def tokens_for_range(self, range_start, range_size):
- resp = [self._stream_name]
- return_all_tokens = False
- block_start = 0
- token_bytes_skipped = 0
- for locator in self.data_locators:
- sizehint = re.search(r'\+(\d+)', locator)
- if not sizehint:
- return_all_tokens = True
- if return_all_tokens:
- resp += [locator]
- next
- blocksize = int(sizehint.group(0))
- if range_start + range_size <= block_start:
- break
- if range_start < block_start + blocksize:
- resp += [locator]
- else:
- token_bytes_skipped += blocksize
- block_start += blocksize
- for f in self.files:
- if ((f[0] < range_start + range_size)
- and
- (f[0] + f[1] > range_start)
- and
- f[1] > 0):
- resp += ["%d:%d:%s" % (f[0] - token_bytes_skipped, f[1], f[2])]
- return resp
+ continue
+
+ s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
+ if s:
+ blocksize = long(s.group(1))
+ self._data_locators.append([tok, blocksize, streamoffset])
+ streamoffset += blocksize
+ continue
+
+ s = re.search(r'^(\d+):(\d+):(\S+)', tok)
+ if s:
+ pos = long(s.group(1))
+ size = long(s.group(2))
+ name = s.group(3).replace('\\040', ' ')
+ if name not in self._files:
+ self._files[name] = StreamFileReader(self, [[pos, size, 0]], name)
+ else:
+ n = self._files[name]
+ n.segments.append([pos, size, n.size()])
+ continue
+
+ raise errors.SyntaxError("Invalid manifest format")
def name(self):
return self._stream_name
+ def files(self):
+ return self._files
+
def all_files(self):
- for f in self.files:
- pos, size, name = f
- yield StreamFileReader(self, pos, size, name)
-
- def nextdatablock(self):
- if self._current_datablock_index < 0:
- self._current_datablock_pos = 0
- self._current_datablock_index = 0
- else:
- self._current_datablock_pos += self.current_datablock_size()
- self._current_datablock_index += 1
- self._current_datablock_data = None
-
- def current_datablock_data(self):
- if self._current_datablock_data == None:
- self._current_datablock_data = Keep.get(self.data_locators[self._current_datablock_index])
- return self._current_datablock_data
-
- def current_datablock_size(self):
- if self._current_datablock_index < 0:
- self.nextdatablock()
- sizehint = re.search('\+(\d+)', self.data_locators[self._current_datablock_index])
- if sizehint:
- return int(sizehint.group(0))
- return len(self.current_datablock_data())
+ return self._files.values()
- def seek(self, pos):
- """Set the position of the next read operation."""
- self._pos = pos
-
- def really_seek(self):
- """Find and load the appropriate data block, so the byte at
- _pos is in memory.
- """
- if self._pos == self._current_datablock_pos:
- return True
- if (self._current_datablock_pos != None and
- self._pos >= self._current_datablock_pos and
- self._pos <= self._current_datablock_pos + self.current_datablock_size()):
- return True
- if self._pos < self._current_datablock_pos:
- self._current_datablock_index = -1
- self.nextdatablock()
- while (self._pos > self._current_datablock_pos and
- self._pos > self._current_datablock_pos + self.current_datablock_size()):
- self.nextdatablock()
+ def size(self):
+ n = self._data_locators[-1]
+ return n[OFFSET] + n[BLOCKSIZE]
- def read(self, size):
- """Read no more than size bytes -- but at least one byte,
- unless _pos is already at the end of the stream.
- """
+ def locators_and_ranges(self, range_start, range_size):
+ return locators_and_ranges(self._data_locators, range_start, range_size)
+
+ def readfrom(self, start, size):
+ """Read up to 'size' bytes from the stream, starting at 'start'"""
if size == 0:
return ''
- self.really_seek()
- while self._pos >= self._current_datablock_pos + self.current_datablock_size():
- self.nextdatablock()
- if self._current_datablock_index >= len(self.data_locators):
- return None
- data = self.current_datablock_data()[self._pos - self._current_datablock_pos : self._pos - self._current_datablock_pos + size]
- self._pos += len(data)
+ data = ''
+ for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
+ data += self._keep.get(locator)[segmentoffset:segmentoffset+segmentsize]
return data
+
+ def manifest_text(self):
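+ # Re-serialize this stream as a single manifest line, e.g. (reusing the
+ # hypothetical example above) '. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n'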
+ manifest_text = [self.name().replace(' ', '\\040')]
+ manifest_text.extend([d[LOCATOR] for d in self._data_locators])
+ manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name().replace(' ', '\\040'))
+ for seg in f.segments])
+ for f in self._files.values()])
+ return ' '.join(manifest_text) + '\n'