9 from ._ranges import locators_and_ranges, Range
10 from .arvfile import StreamFileReader
11 from arvados.retry import retry_method
15 from _normalize_stream import normalize_stream
17 class StreamReader(object):
18 def __init__(self, tokens, keep=None, debug=False, _empty=False,
20 self._stream_name = None
21 self._data_locators = []
22 self._files = collections.OrderedDict()
24 self.num_retries = num_retries
30 if debug: print 'tok', tok
31 if self._stream_name is None:
32 self._stream_name = tok.replace('\\040', ' ')
35 s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
37 blocksize = long(s.group(1))
38 self._data_locators.append(Range(tok, streamoffset, blocksize))
39 streamoffset += blocksize
42 s = re.search(r'^(\d+):(\d+):(\S+)', tok)
44 pos = long(s.group(1))
45 size = long(s.group(2))
46 name = s.group(3).replace('\\040', ' ')
47 if name not in self._files:
48 self._files[name] = StreamFileReader(self, [Range(pos, 0, size)], name)
50 filereader = self._files[name]
51 filereader.segments.append(Range(pos, filereader.size(), size))
54 raise errors.SyntaxError("Invalid manifest format")
57 return self._stream_name
63 return self._files.values()
66 n = self._data_locators[-1]
67 return n.range_start + n.range_size
def locators_and_ranges(self, range_start, range_size):
    """Resolve a byte range of this stream against its data blocks.

    Thin wrapper: forwards this stream's block list (self._data_locators)
    together with `range_start` and `range_size` to the module-level
    locators_and_ranges() helper and returns that helper's result as-is.
    """
    # Inside the method body the bare name resolves to the module-level
    # helper imported from ._ranges, not to this method.
    stream_blocks = self._data_locators
    return locators_and_ranges(stream_blocks, range_start, range_size)
73 def _keepget(self, locator, num_retries=None):
74 return self._keep.get(locator, num_retries=num_retries)
77 def readfrom(self, start, size, num_retries=None):
78 """Read up to 'size' bytes from the stream, starting at 'start'"""
81 if self._keep is None:
82 self._keep = KeepClient(num_retries=self.num_retries)
84 for lr in locators_and_ranges(self._data_locators, start, size):
85 data.append(self._keepget(lr.locator, num_retries=num_retries)[lr.segment_offset:lr.segment_offset+lr.segment_size])
88 def manifest_text(self, strip=False):
89 manifest_text = [self.name().replace(' ', '\\040')]
91 for d in self._data_locators:
92 m = re.match(r'^[0-9a-f]{32}\+\d+', d.locator)
93 manifest_text.append(m.group(0))
95 manifest_text.extend([d.locator for d in self._data_locators])
96 manifest_text.extend([' '.join(["{}:{}:{}".format(seg.locator, seg.range_size, f.name.replace(' ', '\\040'))
97 for seg in f.segments])
98 for f in self._files.values()])
99 return ' '.join(manifest_text) + '\n'