-import gflags
-import httplib
-import httplib2
+import collections
+import hashlib
import os
-import pprint
-import sys
-import types
-import subprocess
-import json
-import UserDict
import re
-import hashlib
-import string
-import bz2
-import zlib
-import fcntl
-import time
import threading
-import collections
+import functools
+import copy
+from ._ranges import locators_and_ranges, Range
+from .arvfile import StreamFileReader
+from arvados.retry import retry_method
from keep import *
import config
import errors
-
-LOCATOR = 0
-BLOCKSIZE = 1
-OFFSET = 2
-SEGMENTSIZE = 3
-
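# Example (not part of the patch): the removed constants above index plain
# lists; the replacement code below uses Range objects from ._ranges with
# named attributes instead.  A minimal sketch of the correspondence, assuming
# Range(locator, range_start, range_size, segment_offset=0) stores each
# argument under an attribute of the same name:
block = ['acbd18db4cc2f85cedef654fccc4a4d8+3', 3, 0]    # [LOCATOR, BLOCKSIZE, OFFSET]
r = Range('acbd18db4cc2f85cedef654fccc4a4d8+3', 0, 3)   # locator, range_start, range_size
assert block[LOCATOR] == r.locator
assert block[OFFSET] == r.range_start       # block start within the stream
assert block[BLOCKSIZE] == r.range_size     # block length in bytes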
-def locators_and_ranges(data_locators, range_start, range_size, debug=False):
- '''
- Get blocks that are covered by the range
- data_locators: list of [locator, block_size, block_start], assumes that blocks are in order and contigous
- range_start: start of range
- range_size: size of range
- returns list of [block locator, blocksize, segment offset, segment size] that satisfies the range
- '''
- if range_size == 0:
- return []
- resp = []
- range_start = long(range_start)
- range_size = long(range_size)
- range_end = range_start + range_size
- block_start = 0L
-
- # range_start/block_start is the inclusive lower bound
- # range_end/block_end is the exclusive upper bound
-
- hi = len(data_locators)
- lo = 0
- i = int((hi + lo) / 2)
- block_size = data_locators[i][BLOCKSIZE]
- block_start = data_locators[i][OFFSET]
- block_end = block_start + block_size
- if debug: print '---'
-
- # perform a binary search for the first block
- # assumes that all of the blocks are contigious, so range_start is guaranteed
- # to either fall into the range of a block or be outside the block range entirely
- while not (range_start >= block_start and range_start < block_end):
- if lo == i:
- # must be out of range, fail
- return []
- if range_start > block_start:
- lo = i
- else:
- hi = i
- i = int((hi + lo) / 2)
- if debug: print lo, i, hi
- block_size = data_locators[i][BLOCKSIZE]
- block_start = data_locators[i][OFFSET]
- block_end = block_start + block_size
-
- while i < len(data_locators):
- locator, block_size, block_start = data_locators[i]
- block_end = block_start + block_size
- if debug:
- print locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
- if range_end <= block_start:
- # range ends before this block starts, so don't look at any more locators
- break
-
- #if range_start >= block_end:
- # range starts after this block ends, so go to next block
- # we should always start at the first block due to the binary above, so this test is redundant
- #next
-
- if range_start >= block_start and range_end <= block_end:
- # range starts and ends in this block
- resp.append([locator, block_size, range_start - block_start, range_size])
- elif range_start >= block_start and range_end > block_end:
- # range starts in this block
- resp.append([locator, block_size, range_start - block_start, block_end - range_start])
- elif range_start < block_start and range_end > block_end:
- # range starts in a previous block and extends to further blocks
- resp.append([locator, block_size, 0L, block_size])
- elif range_start < block_start and range_end <= block_end:
- # range starts in a previous block and ends in this block
- resp.append([locator, block_size, 0L, range_end - block_start])
- block_start = block_end
- i += 1
- return resp
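# Worked example (not part of the patch): two contiguous 10-byte blocks, and a
# 10-byte range starting at offset 5.  The binary search lands on the first
# block, and the range covers its tail plus the head of the second:
#
#   data_locators = [['blockA+10', 10, 0],
#                    ['blockB+10', 10, 10]]
#   locators_and_ranges(data_locators, 5, 10)
#   # => [['blockA+10', 10, 5, 5],   # last 5 bytes of blockA
#   #     ['blockB+10', 10, 0, 5]]   # first 5 bytes of blockB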
-
-
-class StreamFileReader(object):
- def __init__(self, stream, segments, name):
- self._stream = stream
- self.segments = segments
- self._name = name
- self._filepos = 0L
-
- def name(self):
- return self._name
-
- def decompressed_name(self):
- return re.sub('\.(bz2|gz)$', '', self._name)
-
- def stream_name(self):
- return self._stream.name()
-
- def seek(self, pos):
- self._filepos = min(max(pos, 0L), self.size())
-
- def tell(self):
- return self._filepos
-
- def size(self):
- n = self.segments[-1]
- return n[OFFSET] + n[BLOCKSIZE]
-
- def read(self, size):
- """Read up to 'size' bytes from the stream, starting at the current file position"""
- if size == 0:
- return ''
-
- data = ''
- for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, self._filepos, size):
- data += self._stream.readfrom(locator+segmentoffset, segmentsize)
- self._filepos += len(data)
- return data
-
- def readfrom(self, start, size):
- """Read up to 'size' bytes from the stream, starting at 'start'"""
- if size == 0:
- return ''
-
- data = ''
- for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
- data += self._stream.readfrom(locator+segmentoffset, segmentsize)
- return data
-
- def readall(self, size=2**20):
- while True:
- data = self.read(size)
- if data == '':
- break
- yield data
-
- def decompress(self, decompress, size):
- for segment in self.readall(size):
- data = decompress(segment)
- if data and data != '':
- yield data
-
- def readall_decompressed(self, size=2**20):
- self.seek(0)
- if re.search('\.bz2$', self._name):
- dc = bz2.BZ2Decompressor()
- return self.decompress(lambda segment: dc.decompress(segment), size)
- elif re.search('\.gz$', self._name):
- dc = zlib.decompressobj(16+zlib.MAX_WBITS)
- return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), size)
- else:
- return self.readall(size)
-
- def readlines(self, decompress=True):
- if decompress:
- datasource = self.readall_decompressed()
- else:
- datasource = self.readall()
- data = ''
- for newdata in datasource:
- data += newdata
- sol = 0
- while True:
- eol = string.find(data, "\n", sol)
- if eol < 0:
- break
- yield data[sol:eol+1]
- sol = eol+1
- data = data[sol:]
- if data != '':
- yield data
-
- def as_manifest(self):
- manifest_text = ['.']
- manifest_text.extend([d[LOCATOR] for d in self._stream._data_locators])
- manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments])
- return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text()
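# Sketch (not part of the patch): how the StreamFileReader above maps file
# offsets to stream offsets through its segment list.  FakeStream is a
# hypothetical stand-in for StreamReader that serves readfrom() out of an
# in-memory string:
class FakeStream(object):
    def __init__(self, buf):
        self.buf = buf
    def name(self):
        return '.'
    def readfrom(self, start, size):
        return self.buf[start:start+size]

# A 6-byte file whose first half lives at stream offset 3 and second half at
# offset 0, i.e. the stream stores the two halves in swapped order:
f = StreamFileReader(FakeStream('barfoo'), [[3, 3, 0], [0, 3, 3]], 'swapped.txt')
assert f.read(6) == 'foobar'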
+from ._normalize_stream import normalize_stream


class StreamReader(object):
- def __init__(self, tokens, keep=None, debug=False, _empty=False):
+ def __init__(self, tokens, keep=None, debug=False, _empty=False,
+ num_retries=0):
self._stream_name = None
self._data_locators = []
self._files = collections.OrderedDict()
-
- if keep is None:
- keep = KeepClient()
self._keep = keep
+ self.num_retries = num_retries
streamoffset = 0L
# parse stream
for tok in tokens:
if debug: print 'tok', tok
- if self._stream_name == None:
+ if self._stream_name is None:
self._stream_name = tok.replace('\\040', ' ')
continue
s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
if s:
blocksize = long(s.group(1))
- self._data_locators.append([tok, blocksize, streamoffset])
+ self._data_locators.append(Range(tok, streamoffset, blocksize, 0))
streamoffset += blocksize
continue
s = re.search(r'^(\d+):(\d+):(\S+)', tok)
if s:
pos = long(s.group(1))
size = long(s.group(2))
name = s.group(3).replace('\\040', ' ')
if name not in self._files:
- self._files[name] = StreamFileReader(self, [[pos, size, 0]], name)
+ self._files[name] = StreamFileReader(self, [Range(pos, 0, size, 0)], name)
else:
- n = self._files[name]
- n.segments.append([pos, size, n.size()])
+ filereader = self._files[name]
+ filereader.segments.append(Range(pos, filereader.size(), size))
continue
raise errors.SyntaxError("Invalid manifest format")
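# Illustration (not part of the patch): one stream parsed by the loop above.
# The first token names the stream, md5+size tokens append data blocks, and
# pos:size:name tokens carve file segments out of the concatenated blocks:
#
#   tokens = ['.',
#             'acbd18db4cc2f85cedef654fccc4a4d8+3',   # md5("foo"), 3 bytes
#             '37b51d194a7513e45b56f6524f2d51f2+3',   # md5("bar"), 3 bytes
#             '0:3:foo.txt', '3:3:bar.txt']
#   s = StreamReader(tokens)
#   s.size()          # => 6 (two 3-byte blocks)
#   s._files.keys()   # => ['foo.txt', 'bar.txt']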
def size(self):
n = self._data_locators[-1]
- return n[OFFSET] + n[BLOCKSIZE]
+ return n.range_start + n.range_size
def locators_and_ranges(self, range_start, range_size):
return locators_and_ranges(self._data_locators, range_start, range_size)
- def readfrom(self, start, size):
+ @retry_method
+ def _keepget(self, locator, num_retries=None):
+ return self._keep.get(locator, num_retries=num_retries)
+
+ @retry_method
+ def readfrom(self, start, size, num_retries=None):
"""Read up to 'size' bytes from the stream, starting at 'start'"""
if size == 0:
return ''
- data = ''
- for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
- data += self._keep.get(locator)[segmentoffset:segmentoffset+segmentsize]
- return data
+ if self._keep is None:
+ self._keep = KeepClient(num_retries=self.num_retries)
+ data = []
+ for lr in locators_and_ranges(self._data_locators, start, size):
+ data.append(self._keepget(lr.locator, num_retries=num_retries)[lr.segment_offset:lr.segment_offset+lr.segment_size])
+ return ''.join(data)
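# Usage sketch (not part of the patch): num_retries can be set per instance
# and overridden per call; the retry_method decorator is expected to fall
# back to self.num_retries when the argument is left as None:
#
#   reader = StreamReader(tokens, keep=keep_client, num_retries=3)
#   reader.readfrom(0, 2**20)                 # up to 3 retries per Keep get
#   reader.readfrom(0, 2**20, num_retries=0)  # fail fast for this call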
def manifest_text(self, strip=False):
manifest_text = [self.name().replace(' ', '\\040')]
if strip:
for d in self._data_locators:
- m = re.match(r'^[0-9a-f]{32}\+\d+', d[LOCATOR])
+ m = re.match(r'^[0-9a-f]{32}\+\d+', d.locator)
manifest_text.append(m.group(0))
else:
- manifest_text.extend([d[LOCATOR] for d in self._data_locators])
- manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name().replace(' ', '\\040'))
+ manifest_text.extend([d.locator for d in self._data_locators])
+ manifest_text.extend([' '.join(["{}:{}:{}".format(seg.locator, seg.range_size, f.name.replace(' ', '\\040'))
for seg in f.segments])
for f in self._files.values()])
return ' '.join(manifest_text) + '\n'
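# Illustration (not part of the patch): with strip=True, each locator is cut
# down to its bare md5+size prefix by the regex above, dropping permission
# hints and any other +suffixes:
#
#   'acbd18db4cc2f85cedef654fccc4a4d8+3+A...@...'   # stored locator (hinted)
#   'acbd18db4cc2f85cedef654fccc4a4d8+3'            # stripped form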