import bz2
import functools
import hashlib
import os
import Queue
import re
import threading
import zlib

from arvados.retry import retry_method
14 """split(path) -> streamname, filename
16 Separate the stream name and file name in a /-separated stream path.
17 If no stream name is available, assume '.'.
20 stream_name, file_name = path.rsplit('/', 1)
21 except ValueError: # No / in string
22 stream_name, file_name = '.', path
23 return stream_name, file_name
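
# Examples of the behaviour above:
#
#   split("path/to/file.txt")  ==> ("path/to", "file.txt")
#   split("file.txt")          ==> (".", "file.txt")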


class ArvadosFileBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        self.closed = True


class ArvadosFileReaderBase(ArvadosFileBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(self._NameAttribute(name), mode)
        self._filepos = 0L
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)
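
    # For example, decompressed_name() of a file named "data.tsv.gz" is
    # "data.tsv"; names without a .gz/.bz2 suffix are returned unchanged.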

    # Note: unlike the standard file API, whence defaults to os.SEEK_CUR.
    @ArvadosFileBase._before_close
    def seek(self, pos, whence=os.SEEK_CUR):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        self._filepos = min(max(pos, 0L), self.size())

    def tell(self):
        return self._filepos

    @ArvadosFileBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if data == '':
                break
            yield data

    @ArvadosFileBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
        else:
            data = ['']
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if next_read == '':
                break
            data.append(next_read)
            data_size += len(next_read)
        data = ''.join(data)
        try:
            nextline_index = data.index('\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]

    @ArvadosFileBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries):
            data = decompress(segment)
            if data:
                yield data

    @ArvadosFileBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)

    @ArvadosFileBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return ''.join(data).splitlines(True)
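

# Sketch of how a caller might consume a gzip-compressed file through the
# reader API above (assumes `reader` is an ArvadosFileReaderBase subclass
# instance, such as a StreamFileReader, open on a "*.gz" file):
#
#   out = open(reader.decompressed_name(), 'wb')
#   for chunk in reader.readall_decompressed(num_retries=2):
#       out.write(chunk)
#   out.close()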


class StreamFileReader(ArvadosFileReaderBase):
    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(name, 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size

    @ArvadosFileBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return ''

        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream._readfrom(lr.locator+lr.segment_offset,
                                          lr.segment_size,
                                          num_retries=num_retries)

        self._filepos += len(data)
        return data

    @ArvadosFileBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return ''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream._readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                               num_retries=num_retries))
        return ''.join(data)

    def as_manifest(self):
        from stream import normalize_stream
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
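
    # The returned text follows the Keep manifest format: a stream name,
    # one or more block locators, then file tokens of the form
    # "position:size:filename", e.g. (hypothetical locator shown):
    #
    #   . 930625b054ce894ac40596c3f5a0d947+33 0:33:foo.txt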


class BufferBlock(object):
    """
    A BufferBlock is a stand-in for a Keep block that is in the process of being
    written.  Writers can append to it, get the size, and compute the Keep locator.

    There are three valid states:

    WRITABLE - can append

    PENDING - is in the process of being uploaded to Keep; append is an error

    COMMITTED - the block has been written to Keep, its internal buffer has been
    released, and the BufferBlock should be discarded in favor of fetching the
    block through normal Keep means.
    """
    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2

    def __init__(self, blockid, starting_capacity):
        """
        blockid: the identifier for this block
        starting_capacity: the initial buffer capacity
        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self.state = BufferBlock.WRITABLE
        self._locator = None

    def append(self, data):
        """
        Append some data to the buffer.  Only valid if the block is in WRITABLE
        state.  Implements an expanding buffer, doubling capacity as needed to
        accommodate all the data.
        """
        if self.state == BufferBlock.WRITABLE:
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
            self._locator = None
        else:
            raise AssertionError("Buffer block is not writable")

    def size(self):
        '''Amount of data written to the buffer'''
        return self.write_pointer

    def locator(self):
        '''The Keep locator for this buffer's contents.'''
        if self._locator is None:
            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
        return self._locator
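
# A rough sketch of the BufferBlock lifecycle described above (blockid values
# of this form are normally assigned by BlockManager.alloc_bufferblock()):
#
#   bb = BufferBlock("bufferblock0", starting_capacity=2**14)
#   bb.append("some data")          # buffer doubles as needed
#   bb.size()                       # => 9, bytes written so far
#   bb.locator()                    # => "<md5 of contents>+9"
#   # BlockManager.commit_bufferblock() then moves it WRITABLE -> PENDING
#   # -> COMMITTED as the block is uploaded to Keep.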


class AsyncKeepWriteErrors(Exception):
    """
    Roll up one or more Keep write exceptions (generated by background
    threads) into a single one.
    """
    def __init__(self, errors):
        self.errors = errors

    def __repr__(self):
        return "\n".join(self.errors)


class BlockManager(object):
    """
    BlockManager handles buffer blocks, background block uploads, and
    background block prefetch for a Collection of ArvadosFiles.
    """
    def __init__(self, keep):
        '''keep: KeepClient object to use'''
        self._keep = keep
        self._bufferblocks = {}
        self._put_queue = None
        self._put_errors = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None

    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14):
        """
        Allocate a new, empty bufferblock in WRITABLE state and return it.
        blockid: optional block identifier, otherwise one will be automatically assigned
        starting_capacity: optional capacity, otherwise the default capacity is used
        """
        if blockid is None:
            blockid = "bufferblock%i" % len(self._bufferblocks)
        bb = BufferBlock(blockid, starting_capacity=starting_capacity)
        self._bufferblocks[bb.blockid] = bb
        return bb

    def stop_threads(self):
        """
        Shut down and wait for background upload and download threads to finish.
        """
        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None
        self._put_errors = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None

    def commit_bufferblock(self, block):
        """
        Initiate a background upload of a bufferblock.  This will block if the
        upload queue is at capacity, otherwise it will return immediately.
        """

        def worker(self):
            """
            Background uploader thread.
            """
            while True:
                try:
                    b = self._put_queue.get()
                    if b is None:
                        return
                    b._locator = self._keep.put(b.buffer_view[0:b.write_pointer].tobytes())
                    b.state = BufferBlock.COMMITTED
                    b.buffer_view = None
                    b.buffer_block = None
                except Exception as e:
                    self._put_errors.put(e)
                finally:
                    if self._put_queue is not None:
                        self._put_queue.task_done()

        if self._put_threads is None:
            # Start uploader threads.

            # If we don't limit the Queue size, the upload queue can quickly
            # grow to take up gigabytes of RAM if the writing process is
            # generating data more quickly than it can be sent to the Keep
            # servers.
            #
            # With two upload threads and a queue size of 2, this means up to 4
            # blocks pending.  If they are full 64 MiB blocks, that means up to
            # 256 MiB of internal buffering, which is the same size as the
            # default download block cache in KeepClient.
            self._put_queue = Queue.Queue(maxsize=2)
            self._put_errors = Queue.Queue()
            self._put_threads = [threading.Thread(target=worker, args=(self,)),
                                 threading.Thread(target=worker, args=(self,))]
            for t in self._put_threads:
                t.daemon = True
                t.start()

        # Mark the block as PENDING to disallow any more appends.
        block.state = BufferBlock.PENDING
        self._put_queue.put(block)

    def get_block(self, locator, num_retries, cache_only=False):
        """
        Fetch a block.  First checks to see if the locator is a BufferBlock and
        returns that; if not, passes the request through to KeepClient.get().
        """
        if locator in self._bufferblocks:
            bb = self._bufferblocks[locator]
            if bb.state != BufferBlock.COMMITTED:
                return bb.buffer_view[0:bb.write_pointer].tobytes()
            else:
                locator = bb._locator
        return self._keep.get(locator, num_retries=num_retries, cache_only=cache_only)

    def commit_all(self):
        """
        Commit all outstanding buffer blocks.  Unlike commit_bufferblock(), this
        is a synchronous call, and will not return until all buffer blocks are
        uploaded.  Raises AsyncKeepWriteErrors() if any blocks failed to
        upload.
        """
        for k, v in self._bufferblocks.items():
            if v.state == BufferBlock.WRITABLE:
                self.commit_bufferblock(v)
        if self._put_queue is not None:
            self._put_queue.join()
            if not self._put_errors.empty():
                e = []
                try:
                    while True:
                        e.append(self._put_errors.get(False))
                except Queue.Empty:
                    pass
                raise AsyncKeepWriteErrors(e)

    def block_prefetch(self, locator):
        """
        Initiate a background download of a block.  This assumes that the
        underlying KeepClient implements a block cache, so repeated requests
        for the same block will not result in repeated downloads (unless the
        block is evicted from the cache.)  This method does not block.
        """

        def worker(self):
            '''Background downloader thread.'''
            while True:
                try:
                    b = self._prefetch_queue.get()
                    if b is None:
                        return
                    self._keep.get(b)
                except Exception:
                    pass

        if locator in self._bufferblocks:
            return
        if self._prefetch_threads is None:
            self._prefetch_queue = Queue.Queue()
            self._prefetch_threads = [threading.Thread(target=worker, args=(self,)),
                                      threading.Thread(target=worker, args=(self,))]
            for t in self._prefetch_threads:
                t.daemon = True
                t.start()
        self._prefetch_queue.put(locator)
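

# A rough usage sketch of BlockManager (assumes `keep_client` is a configured
# arvados.keep.KeepClient instance):
#
#   bm = BlockManager(keep_client)
#   bb = bm.alloc_bufferblock()
#   bb.append("hello world")
#   bm.commit_bufferblock(bb)        # queue a background upload
#   bm.commit_all()                  # wait; raises AsyncKeepWriteErrors on failure
#   data = bm.get_block(bb.blockid, num_retries=2)
#   bm.stop_threads()                # shut down worker threads when done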


class ArvadosFile(object):
    """
    Manages the underlying representation of a file in Keep as a sequence of
    segments over a set of blocks, supporting random read/write access.
    """

    def __init__(self, parent, stream=[], segments=[]):
        """
        parent: the owning object (for example a Collection); must provide a
        _my_block_manager() method returning a BlockManager
        stream: a list of Range objects representing a block stream
        segments: a list of Range objects representing segments
        """
        self.parent = parent
        self._modified = True
        self.segments = []
        for s in segments:
            self.add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None
        self.lock = threading.Lock()

    def clone(self):
        """
        Make a copy of this file.
        """
        # TODO: copy bufferblocks.
        cp = ArvadosFile(self.parent)
        cp.parent = self.parent
        cp.segments = [Range(r.locator, r.range_start, r.range_size, r.segment_offset) for r in self.segments]
        return cp

    def set_unmodified(self):
        self._modified = False

    def modified(self):
        return self._modified

    def truncate(self, size):
        new_segs = []
        for r in self.segments:
            range_end = r.range_start+r.range_size
            if r.range_start >= size:
                # segment is past the truncate size, all done
                break
            elif size < range_end:
                nr = Range(r.locator, r.range_start, size - r.range_start)
                nr.segment_offset = r.segment_offset
                new_segs.append(nr)
                break
            else:
                new_segs.append(r)

        self.segments = new_segs
        self._modified = True

    def readfrom(self, offset, size, num_retries):
        if size == 0 or offset >= self.size():
            return ''
        data = []

        for lr in locators_and_ranges(self.segments, offset, size + config.KEEP_BLOCK_SIZE):
            self.parent._my_block_manager().block_prefetch(lr.locator)

        for lr in locators_and_ranges(self.segments, offset, size):
            d = self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries, cache_only=bool(data))
            if d:
                data.append(d[lr.segment_offset:lr.segment_offset+lr.segment_size])
            else:
                break
        return ''.join(data)

    def _repack_writes(self):
        '''Test if the buffer block has more data than is referenced by actual segments
        (this happens when a buffered write over-writes a file range written in
        a previous buffered write).  Re-pack the buffer block for efficiency
        and to avoid leaking information.
        '''
        segs = self.segments

        # Sum up the segments to get the total bytes of the file referencing
        # into the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size():
            # There is more data in the buffer block than is actually accounted for by segments, so
            # re-pack into a new buffer by copying over to a new buffer block.
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total)
            for t in bufferblock_segs:
                new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb
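
    # Worked example of the repack above: if a caller writes 10 bytes at
    # offset 0 and then writes 10 different bytes at offset 0 again, the
    # buffer block holds 20 bytes but the file's segments reference only the
    # last 10, so write_total (10) < bufferblock size (20) and the block is
    # repacked down to the 10 referenced bytes.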

    def writeto(self, offset, data, num_retries):
        if len(data) == 0:
            return

        if offset > self.size():
            raise ArgumentError("Offset is past the end of the file")

        if len(data) > config.KEEP_BLOCK_SIZE:
            raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))

        self._modified = True

        if self._current_bblock is None or self._current_bblock.state != BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock()

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes()
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock()

        self._current_bblock.append(data)
        replace_range(self.segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

    def add_segment(self, blocks, pos, size):
        self._modified = True
        for lr in locators_and_ranges(blocks, pos, size):
            last = self.segments[-1] if self.segments else Range(0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self.segments.append(r)

    def size(self):
        if self.segments:
            n = self.segments[-1]
            return n.range_start + n.range_size
        else:
            return 0
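

# A rough sketch of ArvadosFile's read/write path (hypothetical `collection`
# object standing in for the parent, which must provide _my_block_manager()):
#
#   f = ArvadosFile(collection)
#   f.writeto(0, "hello world", num_retries=2)   # buffered in a BufferBlock
#   f.readfrom(0, 5, num_retries=2)              # => "hello"
#   f.size()                                     # => 11
#   f.modified()                                 # => True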


class ArvadosFileReader(ArvadosFileReaderBase):
    def __init__(self, arvadosfile, name, mode="r", num_retries=None):
        super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
        self.arvadosfile = arvadosfile.clone()

    def size(self):
        return self.arvadosfile.size()

    @ArvadosFileBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        data = self.arvadosfile.readfrom(self._filepos, size, num_retries=num_retries)
        self._filepos += len(data)
        return data

    @ArvadosFileBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'offset'"""
        return self.arvadosfile.readfrom(offset, size, num_retries)


class SynchronizedArvadosFile(object):
    def __init__(self, arvadosfile):
        self.arvadosfile = arvadosfile

    def clone(self):
        return self

    def __getattr__(self, name):
        with self.arvadosfile.lock:
            return getattr(self.arvadosfile, name)


class ArvadosFileWriter(ArvadosFileReader):
    def __init__(self, arvadosfile, name, mode, num_retries=None):
        self.arvadosfile = SynchronizedArvadosFile(arvadosfile)
        super(ArvadosFileWriter, self).__init__(self.arvadosfile, name, mode, num_retries=num_retries)

    @ArvadosFileBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
        else:
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)

    @ArvadosFileBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s)

    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)
        if self._filepos > self.size():
            self._filepos = self.size()
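

# A rough usage sketch of the writer class above (hypothetical `arv_file` is
# an ArvadosFile owned by a collection-like parent):
#
#   writer = ArvadosFileWriter(arv_file, "output.txt", "w", num_retries=2)
#   writer.write("first line\n")
#   writer.writelines(["second line\n", "third line\n"])
#   writer.truncate(11)        # keep only "first line\n"
#   writer.close()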