Merge branch '15397-remove-obsolete-apis'
[arvados.git] / sdk / python / arvados / stream.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import collections
6 import hashlib
7 import os
8 import re
9 import threading
10 import functools
11 import copy
12
13 from ._ranges import locators_and_ranges, Range
14 from .arvfile import StreamFileReader
15 from arvados.retry import retry_method
16 from arvados.keep import *
17 from . import config
18 from . import errors
19 from . import util
20 from ._normalize_stream import normalize_stream
21
22 class StreamReader(object):
23     @util._deprecated('3.0', 'arvados.collection.Collecttion')
24     def __init__(self, tokens, keep=None, debug=False, _empty=False,
25                  num_retries=10):
26         self._stream_name = None
27         self._data_locators = []
28         self._files = collections.OrderedDict()
29         self._keep = keep
30         self.num_retries = num_retries
31
32         streamoffset = 0
33
34         # parse stream
35         for tok in tokens:
36             if debug: print('tok', tok)
37             if self._stream_name is None:
38                 self._stream_name = tok.replace('\\040', ' ')
39                 continue
40
41             s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
42             if s:
43                 blocksize = int(s.group(1))
44                 self._data_locators.append(Range(tok, streamoffset, blocksize, 0))
45                 streamoffset += blocksize
46                 continue
47
48             s = re.search(r'^(\d+):(\d+):(\S+)', tok)
49             if s:
50                 pos = int(s.group(1))
51                 size = int(s.group(2))
52                 name = s.group(3).replace('\\040', ' ')
53                 if name not in self._files:
54                     self._files[name] = StreamFileReader(self, [Range(pos, 0, size, 0)], name)
55                 else:
56                     filereader = self._files[name]
57                     filereader.segments.append(Range(pos, filereader.size(), size))
58                 continue
59
60             raise errors.SyntaxError("Invalid manifest format")
61
62     def name(self):
63         return self._stream_name
64
65     def files(self):
66         return self._files
67
68     def all_files(self):
69         return list(self._files.values())
70
71     def size(self):
72         n = self._data_locators[-1]
73         return n.range_start + n.range_size
74
75     def locators_and_ranges(self, range_start, range_size):
76         return locators_and_ranges(self._data_locators, range_start, range_size)
77
78     @retry_method
79     def _keepget(self, locator, num_retries=None):
80         return self._keep.get(locator, num_retries=num_retries)
81
82     @retry_method
83     def readfrom(self, start, size, num_retries=None):
84         """Read up to 'size' bytes from the stream, starting at 'start'"""
85         if size == 0:
86             return b''
87         if self._keep is None:
88             self._keep = KeepClient(num_retries=self.num_retries)
89         data = []
90         for lr in locators_and_ranges(self._data_locators, start, size):
91             data.append(self._keepget(lr.locator, num_retries=num_retries)[lr.segment_offset:lr.segment_offset+lr.segment_size])
92         return b''.join(data)
93
94     def manifest_text(self, strip=False):
95         manifest_text = [self.name().replace(' ', '\\040')]
96         if strip:
97             for d in self._data_locators:
98                 m = re.match(r'^[0-9a-f]{32}\+\d+', d.locator)
99                 manifest_text.append(m.group(0))
100         else:
101             manifest_text.extend([d.locator for d in self._data_locators])
102         manifest_text.extend([' '.join(["{}:{}:{}".format(seg.locator, seg.range_size, f.name.replace(' ', '\\040'))
103                                         for seg in f.segments])
104                               for f in self._files.values()])
105         return ' '.join(manifest_text) + '\n'