6 from arvados.retry import retry_method
# Remainder of the module-level split() helper. NOTE(review): this excerpt is
# a numbered listing with elided lines — the `def split(path):` header and the
# `try:` line are not visible here.
9 """split(path) -> streamname, filename
11 Separate the stream name and file name in a /-separated stream path.
12 If no stream name is available, assume '.'.
# rsplit from the right: only the last '/' separates stream name from file name.
15 stream_name, file_name = path.rsplit('/', 1)
16 except ValueError: # No / in string
17 stream_name, file_name = '.', path
18 return stream_name, file_name
# Base class carrying name/mode and open/closed state for Arvados file objects.
# Several body lines are elided from this excerpt.
20 class ArvadosFileBase(object):
21 def __init__(self, name, mode):
# Decorator: guard a method so it fails fast once the file has been closed.
27 def _before_close(orig_func):
28 @functools.wraps(orig_func)
29 def wrapper(self, *args, **kwargs):
# Mirrors the standard file-object error message for use-after-close.
31 raise ValueError("I/O operation on closed stream file")
32 return orig_func(self, *args, **kwargs)
# Context-manager exit; body elided here (presumably closes the file).
38 def __exit__(self, exc_type, exc_value, traceback):
49 class ArvadosFileReaderBase(ArvadosFileBase):
50 class _NameAttribute(str):
51 # The Python file API provides a plain .name attribute.
52 # Older SDK provided a name() method.
53 # This class provides both, for maximum compatibility.
57 def __init__(self, name, mode, num_retries=None):
# Wrap the name so both `f.name` and `f.name()` work (see _NameAttribute).
58 super(ArvadosFileReaderBase, self).__init__(self._NameAttribute(name), mode)
60 self.num_retries = num_retries
61 self.need_lock = False
# (position, leftover-data) cache used by readline() to avoid re-reading.
62 self._readline_cache = (None, None)
# Part of an elided iteration helper (presumably __iter__/next) that walks
# the file line by line.
66 data = self.readline()
def decompressed_name(self):
    """Return self.name with a trailing .bz2 or .gz extension stripped."""
    # Use a raw string: '\.' in a plain literal is an invalid escape
    # sequence (DeprecationWarning today, SyntaxError in future Python).
    return re.sub(r'\.(bz2|gz)$', '', self.name)
74 @ArvadosFileBase._before_close
# NOTE(review): default whence is SEEK_CUR, unlike the stdlib's SEEK_SET —
# confirm callers depend on this. The position-adjustment lines for the
# SEEK_CUR/SEEK_END branches (original 77 and 79) are elided from this excerpt.
75 def seek(self, pos, whence=os.SEEK_CUR):
76 if whence == os.SEEK_CUR:
78 elif whence == os.SEEK_END:
# Clamp the final position to [0, file size]; 0L is a Python 2 long literal.
80 self._filepos = min(max(pos, 0L), self._size())
88 @ArvadosFileBase._before_close
# Generator yielding successive chunks of up to `size` bytes until EOF; the
# surrounding loop/yield lines (original 91, 93-96) are elided here.
90 def readall(self, size=2**20, num_retries=None):
92 data = self.read(size, num_retries=num_retries)
97 @ArvadosFileBase._before_close
# Read one line (up to `size` bytes), caching any bytes read past the newline
# in self._readline_cache for the next call. Several lines are elided.
99 def readline(self, size=float('inf'), num_retries=None):
100 cache_pos, cache_data = self._readline_cache
# The cache is only valid if the file position hasn't moved since last call.
101 if self.tell() == cache_pos:
105 data_size = len(data[-1])
# Keep reading 1 MiB chunks until a newline appears or `size` is reached.
106 while (data_size < size) and ('\n' not in data[-1]):
107 next_read = self.read(2 ** 20, num_retries=num_retries)
110 data.append(next_read)
111 data_size += len(next_read)
# `data` appears to have been joined into a single string in an elided line
# above (it is indexed as a string from here on) — confirm against the
# full file.
114 nextline_index = data.index('\n') + 1
116 nextline_index = len(data)
117 nextline_index = min(nextline_index, size)
# Stash leftover bytes beyond the newline for the next readline() call.
118 self._readline_cache = (self.tell(), data[nextline_index:])
119 return data[:nextline_index]
121 @ArvadosFileBase._before_close
# Generator: run each raw chunk from readall() through the supplied
# `decompress` callable; the tail of the loop body (original 126-127,
# presumably the yield) is elided from this excerpt.
123 def decompress(self, decompress, size, num_retries=None):
124 for segment in self.readall(size, num_retries):
125 data = decompress(segment)
129 @ArvadosFileBase._before_close
# Yield decompressed data, choosing bz2/gzip by file-name suffix, falling
# back to raw readall() for other names. Some lines are elided.
131 def readall_decompressed(self, size=2**20, num_retries=None):
133 if self.name.endswith('.bz2'):
134 dc = bz2.BZ2Decompressor()
135 return self.decompress(dc.decompress, size,
136 num_retries=num_retries)
137 elif self.name.endswith('.gz'):
# wbits=16+MAX_WBITS tells zlib to expect a gzip header and trailer.
138 dc = zlib.decompressobj(16+zlib.MAX_WBITS)
# gzip decompression must re-feed unconsumed_tail; bz2 buffers internally.
139 return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
140 size, num_retries=num_retries)
142 return self.readall(size, num_retries=num_retries)
144 @ArvadosFileBase._before_close
# Return a list of lines; stops reading once `sizehint` bytes have been
# buffered. Accumulator setup lines are elided from this excerpt.
146 def readlines(self, sizehint=float('inf'), num_retries=None):
149 for s in self.readall(num_retries=num_retries):
152 if data_size >= sizehint:
# splitlines(True) keeps line terminators, matching file.readlines().
154 return ''.join(data).splitlines(True)
# Reader for a single file within a Keep stream (read-only, binary mode).
157 class StreamFileReader(ArvadosFileReaderBase):
158 def __init__(self, stream, segments, name):
159 super(StreamFileReader, self).__init__(name, 'rb')
160 self._stream = stream
161 self.segments = segments
162 self.num_retries = stream.num_retries
# NOTE(review): duplicate of the assignment on original line 162 — one of
# the two is redundant and should be removed. (Original line 163 is elided
# from this excerpt.)
164 self.num_retries = stream.num_retries
165 self._readline_cache = (None, None)
def stream_name(self):
    """Return the name of the Keep stream this file belongs to."""
    # Simple delegation to the backing stream object.
    owning_stream = self._stream
    return owning_stream.name()
# File size computed as the end offset of the final segment. The enclosing
# def header (original line 170) is elided — presumably this is the _size()
# helper used by seek(); confirm against the full file.
171 n = self.segments[-1]
# OFFSET/BLOCKSIZE are tuple-index constants defined elsewhere in the SDK.
172 return n[OFFSET] + n[BLOCKSIZE]
174 @ArvadosFileBase._before_close
176 def read(self, size, num_retries=None):
177 """Read up to 'size' bytes from the stream, starting at the current file position"""
# Map the (position, size) window onto locator ranges within this file.
182 available_chunks = locators_and_ranges(self.segments, self._filepos, size)
# Only the first chunk is consumed per call, so read() may legitimately
# return fewer than `size` bytes. Empty/EOF handling lines are elided.
184 locator, blocksize, segmentoffset, segmentsize = available_chunks[0]
185 data = self._stream._readfrom(locator+segmentoffset, segmentsize,
186 num_retries=num_retries)
# Advance the file position by the number of bytes actually returned.
188 self._filepos += len(data)
191 @ArvadosFileBase._before_close
193 def readfrom(self, start, size, num_retries=None):
194 """Read up to 'size' bytes from the stream, starting at 'start'"""
# Unlike read(), this gathers every chunk covering the requested range and
# does not touch the current file position. Setup/return lines are elided.
199 for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
200 data.append(self._stream._readfrom(locator+segmentoffset, segmentsize,
201 num_retries=num_retries))
def as_manifest(self):
    """Render this single file as a normalized one-stream manifest text."""
    # Manifest tokens escape spaces in file names as \040.
    escaped_name = self.name().replace(' ', '\\040')
    tokens = ['.']
    for locator_entry in self._stream._data_locators:
        tokens.append(locator_entry[LOCATOR])
    for seg in self.segments:
        tokens.append("{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], escaped_name))
    # Round-trip through CollectionReader to produce a normalized manifest.
    manifest = ' '.join(tokens) + '\n'
    return CollectionReader(manifest).manifest_text(normalize=True)
# Writable file object extending the reader base with truncate/write support.
211 class ArvadosFile(ArvadosFileReaderBase):
212 def __init__(self, name, mode, stream, segments):
# Remaining attribute assignments (original lines 214-215) are elided from
# this excerpt.
213 super(ArvadosFile, self).__init__(name, mode)
# Truncate the file to `size` bytes, rebuilding the backing stream's locator
# list so it covers only the retained segments. Several setup lines
# (original 217-219, 221-227) are elided from this excerpt.
216 def truncate(self, size=None):
220 segs = locators_and_ranges(self.segments, 0, size)
228 for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._stream._data_locators, seg[LOCATOR]+seg[OFFSET], seg[SEGMENTSIZE]):
229 newstream.append([locator, blocksize, streamoffset])
230 self.segments.append([streamoffset+segmentoffset, segmentsize, fileoffset])
231 streamoffset += blocksize
232 fileoffset += segmentsize
# An empty file still needs a single empty-block locator in its stream.
233 if len(newstream) == 0:
234 newstream.append(config.EMPTY_BLOCK_LOCATOR)
235 self.segments.append([0, 0, 0])
236 self._stream._data_locators = newstream
# Pull the file position back if it now points past the new EOF.
237 if self._filepos > fileoffset:
238 self._filepos = fileoffset
def _writeto(self, offset, data):
    """Write `data` into the file at byte `offset`.

    The bytes are appended to the backing stream, then mapped into this
    file's segment list at `offset`. Sparse writes are not supported.

    Raises ValueError if `offset` is past the current end of the file.
    """
    if offset > self._size():
        # Fix: `ArgumentError` is not a Python builtin — raising it would
        # itself fail with NameError. ValueError is the conventional choice.
        raise ValueError("Offset is past the end of the file")
    self._stream._append(data)
    # Fix: map the newly appended stream bytes in at the requested `offset`.
    # The original passed self._filepos here, silently ignoring the offset
    # parameter that writeto() forwards.
    replace_range(self.segments, offset, len(data), self._stream._size()-len(data))
def writeto(self, offset, data):
    """Public entry point: write `data` via the internal _writeto primitive."""
    return self._writeto(offset, data)
def write(self, data):
    """Write `data` at the current file position, then advance the position."""
    start = self._filepos
    self._writeto(start, data)
    self._filepos = start + len(data)
# Write each string in `seq` sequentially at the current file position.
# The loop header (original line 254, presumably `for s in seq:`) is elided
# from this excerpt.
253 def writelines(self, seq):
255 self._writeto(self._filepos, s)
256 self._filepos += len(s)
# Append segments covering bytes [pos, pos+size) of `blocks` to this file.
261 def add_segment(self, blocks, pos, size):
262 for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(blocks, pos, size):
# Start from a zero segment when the file is empty.
263 last = self.segments[-1] if self.segments else [0, 0, 0]
# NOTE(review): appends a 4-element segment, whereas truncate() builds
# 3-element segments — confirm the intended segment tuple layout before
# relying on either shape.
264 self.segments.append([locator, segmentsize, last[OFFSET]+last[BLOCKSIZE], segmentoffset])