3198: New Collection/AravosFile design work in progress
[arvados.git] / sdk / python / arvados / arvfile.py
1 import functools
2 import os
3 import zlib
4 import bz2
5 from .ranges import *
6 from arvados.retry import retry_method
7
8 def split(path):
9     """split(path) -> streamname, filename
10
11     Separate the stream name and file name in a /-separated stream path.
12     If no stream name is available, assume '.'.
13     """
14     try:
15         stream_name, file_name = path.rsplit('/', 1)
16     except ValueError:  # No / in string
17         stream_name, file_name = '.', path
18     return stream_name, file_name
19
20 class ArvadosFileBase(object):
21     def __init__(self, name, mode):
22         self.name = name
23         self.mode = mode
24         self.closed = False
25
26     @staticmethod
27     def _before_close(orig_func):
28         @functools.wraps(orig_func)
29         def wrapper(self, *args, **kwargs):
30             if self.closed:
31                 raise ValueError("I/O operation on closed stream file")
32             return orig_func(self, *args, **kwargs)
33         return wrapper
34
35     def __enter__(self):
36         return self
37
38     def __exit__(self, exc_type, exc_value, traceback):
39         try:
40             self.close()
41         except Exception:
42             if exc_type is None:
43                 raise
44
45     def close(self):
46         self.closed = True
47
48
49 class ArvadosFileReaderBase(ArvadosFileBase):
50     class _NameAttribute(str):
51         # The Python file API provides a plain .name attribute.
52         # Older SDK provided a name() method.
53         # This class provides both, for maximum compatibility.
54         def __call__(self):
55             return self
56
57     def __init__(self, name, mode, num_retries=None):
58         super(ArvadosFileReaderBase, self).__init__(self._NameAttribute(name), mode)
59         self._filepos = 0L
60         self.num_retries = num_retries
61         self.need_lock = False
62         self._readline_cache = (None, None)
63
64     def __iter__(self):
65         while True:
66             data = self.readline()
67             if not data:
68                 break
69             yield data
70
71     def decompressed_name(self):
72         return re.sub('\.(bz2|gz)$', '', self.name)
73
74     @ArvadosFileBase._before_close
75     def seek(self, pos, whence=os.SEEK_CUR):
76         if whence == os.SEEK_CUR:
77             pos += self._filepos
78         elif whence == os.SEEK_END:
79             pos += self.size()
80         self._filepos = min(max(pos, 0L), self._size())
81
82     def tell(self):
83         return self._filepos
84
85     def size(self):
86         return self._size()
87
88     @ArvadosFileBase._before_close
89     @retry_method
90     def readall(self, size=2**20, num_retries=None):
91         while True:
92             data = self.read(size, num_retries=num_retries)
93             if data == '':
94                 break
95             yield data
96
97     @ArvadosFileBase._before_close
98     @retry_method
99     def readline(self, size=float('inf'), num_retries=None):
100         cache_pos, cache_data = self._readline_cache
101         if self.tell() == cache_pos:
102             data = [cache_data]
103         else:
104             data = ['']
105         data_size = len(data[-1])
106         while (data_size < size) and ('\n' not in data[-1]):
107             next_read = self.read(2 ** 20, num_retries=num_retries)
108             if not next_read:
109                 break
110             data.append(next_read)
111             data_size += len(next_read)
112         data = ''.join(data)
113         try:
114             nextline_index = data.index('\n') + 1
115         except ValueError:
116             nextline_index = len(data)
117         nextline_index = min(nextline_index, size)
118         self._readline_cache = (self.tell(), data[nextline_index:])
119         return data[:nextline_index]
120
121     @ArvadosFileBase._before_close
122     @retry_method
123     def decompress(self, decompress, size, num_retries=None):
124         for segment in self.readall(size, num_retries):
125             data = decompress(segment)
126             if data:
127                 yield data
128
129     @ArvadosFileBase._before_close
130     @retry_method
131     def readall_decompressed(self, size=2**20, num_retries=None):
132         self.seek(0)
133         if self.name.endswith('.bz2'):
134             dc = bz2.BZ2Decompressor()
135             return self.decompress(dc.decompress, size,
136                                    num_retries=num_retries)
137         elif self.name.endswith('.gz'):
138             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
139             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
140                                    size, num_retries=num_retries)
141         else:
142             return self.readall(size, num_retries=num_retries)
143
144     @ArvadosFileBase._before_close
145     @retry_method
146     def readlines(self, sizehint=float('inf'), num_retries=None):
147         data = []
148         data_size = 0
149         for s in self.readall(num_retries=num_retries):
150             data.append(s)
151             data_size += len(s)
152             if data_size >= sizehint:
153                 break
154         return ''.join(data).splitlines(True)
155
156
157 class StreamFileReader(ArvadosFileReaderBase):
158     def __init__(self, stream, segments, name):
159         super(StreamFileReader, self).__init__(name, 'rb')
160         self._stream = stream
161         self.segments = segments
162         self.num_retries = stream.num_retries
163         self._filepos = 0L
164         self.num_retries = stream.num_retries
165         self._readline_cache = (None, None)
166
167     def stream_name(self):
168         return self._stream.name()
169
170     def _size(self):
171         n = self.segments[-1]
172         return n[OFFSET] + n[BLOCKSIZE]
173
174     @ArvadosFileBase._before_close
175     @retry_method
176     def read(self, size, num_retries=None):
177         """Read up to 'size' bytes from the stream, starting at the current file position"""
178         if size == 0:
179             return ''
180
181         data = ''
182         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
183         if available_chunks:
184             locator, blocksize, segmentoffset, segmentsize = available_chunks[0]
185             data = self._stream._readfrom(locator+segmentoffset, segmentsize,
186                                          num_retries=num_retries)
187
188         self._filepos += len(data)
189         return data
190
191     @ArvadosFileBase._before_close
192     @retry_method
193     def readfrom(self, start, size, num_retries=None):
194         """Read up to 'size' bytes from the stream, starting at 'start'"""
195         if size == 0:
196             return ''
197
198         data = []
199         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
200             data.append(self._stream._readfrom(locator+segmentoffset, segmentsize,
201                                               num_retries=num_retries))
202         return ''.join(data)
203
204     def as_manifest(self):
205         manifest_text = ['.']
206         manifest_text.extend([d[LOCATOR] for d in self._stream._data_locators])
207         manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments])
208         return CollectionReader(' '.join(manifest_text) + '\n').manifest_text(normalize=True)
209
210
211 class ArvadosFile(ArvadosFileReaderBase):
212     def __init__(self, name, mode, stream, segments):
213         super(ArvadosFile, self).__init__(name, mode)
214         self.segments = []
215
216     def truncate(self, size=None):
217         if size is None:
218             size = self._filepos
219
220         segs = locators_and_ranges(self.segments, 0, size)
221
222         newstream = []
223         self.segments = []
224         streamoffset = 0L
225         fileoffset = 0L
226
227         for seg in segs:
228             for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._stream._data_locators, seg[LOCATOR]+seg[OFFSET], seg[SEGMENTSIZE]):
229                 newstream.append([locator, blocksize, streamoffset])
230                 self.segments.append([streamoffset+segmentoffset, segmentsize, fileoffset])
231                 streamoffset += blocksize
232                 fileoffset += segmentsize
233         if len(newstream) == 0:
234             newstream.append(config.EMPTY_BLOCK_LOCATOR)
235             self.segments.append([0, 0, 0])
236         self._stream._data_locators = newstream
237         if self._filepos > fileoffset:
238             self._filepos = fileoffset
239
240     def _writeto(self, offset, data):
241         if offset > self._size():
242             raise ArgumentError("Offset is past the end of the file")
243         self._stream._append(data)
244         replace_range(self.segments, self._filepos, len(data), self._stream._size()-len(data))
245
246     def writeto(self, offset, data):
247         self._writeto(offset, data)
248
249     def write(self, data):
250         self._writeto(self._filepos, data)
251         self._filepos += len(data)
252
253     def writelines(self, seq):
254         for s in seq:
255             self._writeto(self._filepos, s)
256             self._filepos += len(s)
257
258     def flush(self):
259         pass
260
261     def add_segment(self, blocks, pos, size):
262         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(blocks, pos, size):
263             last = self.segments[-1] if self.segments else [0, 0, 0]
264             self.segments.append([locator, segmentsize, last[OFFSET]+last[BLOCKSIZE], segmentoffset])