import fcntl
import time
import threading
+import collections
from keep import *
import config
range_end = range_start + range_size
block_start = 0L
+ # range_start/block_start is the inclusive lower bound
+ # range_end/block_end is the exclusive upper bound
+
hi = len(data_locators)
lo = 0
i = int((hi + lo) / 2)
block_start = data_locators[i][OFFSET]
block_end = block_start + block_size
if debug: print '---'
- while not (range_start >= block_start and range_start <= block_end):
+
+ # perform a binary search for the first block
+ # assumes that all of the blocks are contiguous, so range_start is guaranteed
+ # to either fall into the range of a block or be outside the block range entirely
+ while not (range_start >= block_start and range_start < block_end):
if lo == i:
- break
+ # must be out of range, fail
+ return []
if range_start > block_start:
lo = i
else:
if range_end <= block_start:
# range ends before this block starts, so don't look at any more locators
break
- if range_start >= block_end:
+
+ #if range_start >= block_end:
# range starts after this block ends, so go to next block
- next
- elif range_start >= block_start and range_end <= block_end:
+ # we should always start at the first block due to the binary search above, so this test is redundant
+ #next
+
+ if range_start >= block_start and range_end <= block_end:
# range starts and ends in this block
resp.append([locator, block_size, range_start - block_start, range_size])
elif range_start >= block_start and range_end > block_end:
def seek(self, pos):
    """Set the position of the next read operation.

    The position is clamped into [0, size()]: negative values become 0
    and values past the end of the file become size().  (0L is a
    Python 2 long literal, matching the file's other offsets.)
    """
    self._filepos = min(max(pos, 0L), self.size())
- def tell(self, pos):
+ def tell(self):
return self._filepos
def size(self):
data = ''
for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, self._filepos, size):
- self._stream.seek(locator+segmentoffset)
- data += self._stream.read(segmentsize)
+ data += self._stream.readfrom(locator+segmentoffset, segmentsize)
self._filepos += len(data)
return data
if size == 0:
return ''
- data = []
+ data = ''
for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
data += self._stream.readfrom(locator+segmentoffset, segmentsize)
- return data.join()
+ return data
def readall(self, size=2**20):
while True:
break
yield data
- def bunzip2(self, size):
- decompressor = bz2.BZ2Decompressor()
- for segment in self.readall(size):
- data = decompressor.decompress(segment)
- if data and data != '':
- yield data
-
- def gunzip(self, size):
- decompressor = zlib.decompressobj(16+zlib.MAX_WBITS)
+ def decompress(self, decompress, size):
for segment in self.readall(size):
- data = decompressor.decompress(decompressor.unconsumed_tail + segment)
+ data = decompress(segment)
if data and data != '':
yield data
def readall_decompressed(self, size=2**20):
self.seek(0)
if re.search('\.bz2$', self._name):
- return self.bunzip2(size)
+ dc = bz2.BZ2Decompressor()
+ return self.decompress(lambda segment: dc.decompress(segment), size)
elif re.search('\.gz$', self._name):
- return self.gunzip(size)
+ dc = zlib.decompressobj(16+zlib.MAX_WBITS)
+ return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), size)
else:
return self.readall(size)
if decompress:
datasource = self.readall_decompressed()
else:
- self._stream.seek(self._pos + self._filepos)
datasource = self.readall()
data = ''
for newdata in datasource:
if data != '':
yield data
+ def as_manifest(self):
+ manifest_text = ['.']
+ manifest_text.extend([d[LOCATOR] for d in self._stream._data_locators])
+ manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments])
+ return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text()
class StreamReader(object):
- def __init__(self, tokens):
- self._tokens = tokens
- self._pos = 0L
-
+ def __init__(self, tokens, keep=None, debug=False, _empty=False):
self._stream_name = None
- self.data_locators = []
- self.files = {}
+ self._data_locators = []
+ self._files = collections.OrderedDict()
+ if keep != None:
+ self._keep = keep
+ else:
+ self._keep = Keep.global_client_object()
+
streamoffset = 0L
- for tok in self._tokens:
+ # parse stream
+ for tok in tokens:
+ if debug: print 'tok', tok
if self._stream_name == None:
self._stream_name = tok.replace('\\040', ' ')
continue
s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
if s:
blocksize = long(s.group(1))
- self.data_locators.append([tok, blocksize, streamoffset])
+ self._data_locators.append([tok, blocksize, streamoffset])
streamoffset += blocksize
continue
pos = long(s.group(1))
size = long(s.group(2))
name = s.group(3).replace('\\040', ' ')
- if name not in self.files:
- self.files[name] = StreamFileReader(self, [[pos, size, 0]], name)
+ if name not in self._files:
+ self._files[name] = StreamFileReader(self, [[pos, size, 0]], name)
else:
- n = self.files[name]
+ n = self._files[name]
n.segments.append([pos, size, n.size()])
continue
raise errors.SyntaxError("Invalid manifest format")
-
- def tokens(self):
- return self._tokens
def name(self):
    """Return this stream's name as parsed from the manifest.

    '\\040' escapes were already decoded to spaces when the token was
    parsed in __init__.
    """
    return self._stream_name
- def all_files(self):
- return self.files.values()
-
- def seek(self, pos):
- """Set the position of the next read operation."""
- self._pos = pos
+ def files(self):
+ return self._files
- def tell(self):
- return self._pos
+ def all_files(self):
+ return self._files.values()
def size(self):
- n = self.data_locators[-1]
- return n[self.OFFSET] + n[self.BLOCKSIZE]
+ n = self._data_locators[-1]
+ return n[OFFSET] + n[BLOCKSIZE]
def locators_and_ranges(self, range_start, range_size):
- return locators_and_ranges(self.data_locators, range_start, range_size)
-
- def read(self, size):
- """Read up to 'size' bytes from the stream, starting at the current file position"""
- if size == 0:
- return ''
- data = ''
- for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.data_locators, self._pos, size):
- data += Keep.get(locator)[segmentoffset:segmentoffset+segmentsize]
- self._pos += len(data)
- return data
+ return locators_and_ranges(self._data_locators, range_start, range_size)
def readfrom(self, start, size):
"""Read up to 'size' bytes from the stream, starting at 'start'"""
if size == 0:
return ''
data = ''
- for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.data_locators, start, size):
- data += Keep.get(locator)[segmentoffset:segmentoffset+segmentsize]
+ for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
+ data += self._keep.get(locator)[segmentoffset:segmentoffset+segmentsize]
return data
+
+ def manifest_text(self):
+ manifest_text = [self.name().replace(' ', '\\040')]
+ manifest_text.extend([d[LOCATOR] for d in self._data_locators])
+ manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name().replace(' ', '\\040'))
+ for seg in f.segments])
+ for f in self._files.values()])
+ return ' '.join(manifest_text) + '\n'