8784: Fix test for latest firefox.
[arvados.git] / sdk / python / arvados / stream.py
1 from __future__ import print_function
2 from __future__ import absolute_import
3 from future.utils import listvalues
4 from builtins import object
5 import collections
6 import hashlib
7 import os
8 import re
9 import threading
10 import functools
11 import copy
12
13 from ._ranges import locators_and_ranges, Range
14 from .arvfile import StreamFileReader
15 from arvados.retry import retry_method
16 from arvados.keep import *
17 from . import config
18 from . import errors
19 from ._normalize_stream import normalize_stream
20
class StreamReader(object):
    """Read-only view of one stream line of a Keep manifest.

    The constructor takes the line already split into tokens: the stream
    name, then block locator tokens, then ``<pos>:<size>:<name>`` file
    segment tokens.  It builds an ordered list of block ``Range`` objects
    covering the concatenated stream, and an ordered mapping of file name to
    ``StreamFileReader``.  Spaces in names appear as ``\\040`` in manifest
    text and are decoded on parse and re-encoded by ``manifest_text``.
    """

    # Block locator token: 32 hex digits, "+<size>", then optional "+..." parts.
    _LOCATOR_RE = re.compile(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$')
    # Leading "<hash>+<size>" of a locator; used by manifest_text(strip=True).
    _STRIPPED_LOCATOR_RE = re.compile(r'^[0-9a-f]{32}\+\d+')
    # File segment token: "<position in stream>:<size>:<file name>".
    _FILE_TOKEN_RE = re.compile(r'^(\d+):(\d+):(\S+)')

    def __init__(self, tokens, keep=None, debug=False, _empty=False,
                 num_retries=0):
        """Parse a tokenized manifest stream line.

        :tokens: iterable of tokens; the first is the stream name, the rest
          are block locators or file segment tokens.
        :keep: client used to fetch blocks.  If None, a ``KeepClient`` is
          created on the first ``readfrom`` call.
        :debug: if true, print each token as it is parsed.
        :_empty: accepted for backward compatibility; not used here.
        :num_retries: default retry count for block reads.

        Raises ``errors.SyntaxError`` for a token that is neither a block
        locator nor a file segment token.
        """
        self._stream_name = None
        self._data_locators = []
        self._files = collections.OrderedDict()
        self._keep = keep
        self.num_retries = num_retries

        # Offset of the next block within the concatenated stream.
        streamoffset = 0

        for tok in tokens:
            if debug:
                print('tok', tok)
            if self._stream_name is None:
                # The first token is always the (escaped) stream name.
                self._stream_name = tok.replace('\\040', ' ')
                continue

            loc_match = self._LOCATOR_RE.match(tok)
            if loc_match:
                blocksize = int(loc_match.group(1))
                self._data_locators.append(Range(tok, streamoffset, blocksize, 0))
                streamoffset += blocksize
                continue

            file_match = self._FILE_TOKEN_RE.match(tok)
            if file_match:
                pos = int(file_match.group(1))
                size = int(file_match.group(2))
                name = file_match.group(3).replace('\\040', ' ')
                if name not in self._files:
                    self._files[name] = StreamFileReader(self, [Range(pos, 0, size, 0)], name)
                else:
                    # A file split over several segments: the new segment
                    # starts where the file's existing segments end.
                    filereader = self._files[name]
                    filereader.segments.append(Range(pos, filereader.size(), size))
                continue

            raise errors.SyntaxError("Invalid manifest format")

    def name(self):
        """Return the decoded stream name (None if no tokens were parsed)."""
        return self._stream_name

    def files(self):
        """Return the ordered mapping of file name to StreamFileReader."""
        return self._files

    def all_files(self):
        """Return a list of this stream's StreamFileReader objects, in order."""
        return listvalues(self._files)

    def size(self):
        """Return the total number of bytes in the stream.

        A stream with no data blocks has size 0.  Previously this raised
        IndexError.
        """
        if not self._data_locators:
            return 0
        last = self._data_locators[-1]
        return last.range_start + last.range_size

    def locators_and_ranges(self, range_start, range_size):
        """Map a byte range of the stream onto the blocks that cover it."""
        return locators_and_ranges(self._data_locators, range_start, range_size)

    @retry_method
    def _keepget(self, locator, num_retries=None):
        """Fetch one block's contents from Keep."""
        return self._keep.get(locator, num_retries=num_retries)

    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return b''
        if self._keep is None:
            # Lazily create a client so readers built without one still work.
            self._keep = KeepClient(num_retries=self.num_retries)
        data = []
        for lr in locators_and_ranges(self._data_locators, start, size):
            block = self._keepget(lr.locator, num_retries=num_retries)
            data.append(block[lr.segment_offset:lr.segment_offset + lr.segment_size])
        return b''.join(data)

    def manifest_text(self, strip=False):
        """Return this stream as one manifest line, terminated by a newline.

        :strip: if true, emit only the leading "<hash>+<size>" of each block
          locator, dropping any further "+..." parts.
        """
        manifest_text = [self.name().replace(' ', '\\040')]
        if strip:
            for d in self._data_locators:
                m = self._STRIPPED_LOCATOR_RE.match(d.locator)
                manifest_text.append(m.group(0))
        else:
            manifest_text.extend([d.locator for d in self._data_locators])
        # For file segments, Range.locator holds the position in the stream.
        manifest_text.extend([' '.join(["{}:{}:{}".format(seg.locator, seg.range_size, f.name.replace(' ', '\\040'))
                                        for seg in f.segments])
                              for f in listvalues(self._files)])
        return ' '.join(manifest_text) + '\n'