6 from arvados.retry import retry_method
16 """split(path) -> streamname, filename
18 Separate the stream name and file name in a /-separated stream path.
19 If no stream name is available, assume '.'.
22 stream_name, file_name = path.rsplit('/', 1)
23 except ValueError: # No / in string
24 stream_name, file_name = '.', path
25 return stream_name, file_name
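# A minimal doctest-style illustration of split() (paths are made up):
#
#   >>> split('md5sum/md5sum.txt')
#   ('md5sum', 'md5sum.txt')
#   >>> split('md5sum.txt')
#   ('.', 'md5sum.txt')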
27 class ArvadosFileBase(object):
28 def __init__(self, name, mode):
34 def _before_close(orig_func):
35 @functools.wraps(orig_func)
36 def wrapper(self, *args, **kwargs):
38 raise ValueError("I/O operation on closed stream file")
39 return orig_func(self, *args, **kwargs)
45 def __exit__(self, exc_type, exc_value, traceback):
56 class ArvadosFileReaderBase(ArvadosFileBase):
57 class _NameAttribute(str):
58 # The Python file API provides a plain .name attribute.
59 # Older SDK provided a name() method.
60 # This class provides both, for maximum compatibility.
64 def __init__(self, name, mode, num_retries=None):
65 super(ArvadosFileReaderBase, self).__init__(self._NameAttribute(name), mode)
67 self.num_retries = num_retries
68 self._readline_cache = (None, None)
72 data = self.readline()
77 def decompressed_name(self):
78 return re.sub(r'\.(bz2|gz)$', '', self.name)
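# Illustrative sketch of the suffix stripping above; the same regex applied to a
# made-up name:
#
#   >>> re.sub(r'\.(bz2|gz)$', '', 'reads.fastq.gz')
#   'reads.fastq'
#   >>> re.sub(r'\.(bz2|gz)$', '', 'reads.fastq')
#   'reads.fastq'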
80 @ArvadosFileBase._before_close
81 def seek(self, pos, whence=os.SEEK_CUR):
82 if whence == os.SEEK_CUR:
84 elif whence == os.SEEK_END:
86 self._filepos = min(max(pos, 0L), self.size())
91 @ArvadosFileBase._before_close
93 def readall(self, size=2**20, num_retries=None):
95 data = self.read(size, num_retries=num_retries)
100 @ArvadosFileBase._before_close
102 def readline(self, size=float('inf'), num_retries=None):
103 cache_pos, cache_data = self._readline_cache
104 if self.tell() == cache_pos:
108 data_size = len(data[-1])
109 while (data_size < size) and ('\n' not in data[-1]):
110 next_read = self.read(2 ** 20, num_retries=num_retries)
113 data.append(next_read)
114 data_size += len(next_read)
117 nextline_index = data.index('\n') + 1
119 nextline_index = len(data)
120 nextline_index = min(nextline_index, size)
121 self._readline_cache = (self.tell(), data[nextline_index:])
122 return data[:nextline_index]
124 @ArvadosFileBase._before_close
126 def decompress(self, decompress, size, num_retries=None):
127 for segment in self.readall(size, num_retries):
128 data = decompress(segment)
132 @ArvadosFileBase._before_close
134 def readall_decompressed(self, size=2**20, num_retries=None):
136 if self.name.endswith('.bz2'):
137 dc = bz2.BZ2Decompressor()
138 return self.decompress(dc.decompress, size,
139 num_retries=num_retries)
140 elif self.name.endswith('.gz'):
141 dc = zlib.decompressobj(16+zlib.MAX_WBITS)
142 return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
143 size, num_retries=num_retries)
145 return self.readall(size, num_retries=num_retries)
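# Note on the gzip branch above: a wbits value of 16+zlib.MAX_WBITS tells zlib to
# expect a gzip header and trailer rather than a raw zlib stream. A standalone
# equivalent, shown only for illustration with a hypothetical local file:
#
#   dc = zlib.decompressobj(16 + zlib.MAX_WBITS)
#   with open('example.gz', 'rb') as f:
#       plaintext = dc.decompress(f.read())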
147 @ArvadosFileBase._before_close
149 def readlines(self, sizehint=float('inf'), num_retries=None):
152 for s in self.readall(num_retries=num_retries):
155 if data_size >= sizehint:
157 return ''.join(data).splitlines(True)
160 raise NotImplementedError()
162 def read(self, size, num_retries=None):
163 raise NotImplementedError()
165 def readfrom(self, start, size, num_retries=None):
166 raise NotImplementedError()
169 class StreamFileReader(ArvadosFileReaderBase):
170 def __init__(self, stream, segments, name):
171 super(StreamFileReader, self).__init__(name, 'rb', num_retries=stream.num_retries)
172 self._stream = stream
173 self.segments = segments
175 def stream_name(self):
176 return self._stream.name()
179 n = self.segments[-1]
180 return n.range_start + n.range_size
182 @ArvadosFileBase._before_close
184 def read(self, size, num_retries=None):
185 """Read up to 'size' bytes from the stream, starting at the current file position"""
190 available_chunks = locators_and_ranges(self.segments, self._filepos, size)
192 lr = available_chunks[0]
193 data = self._stream._readfrom(lr.locator+lr.segment_offset,
195 num_retries=num_retries)
197 self._filepos += len(data)
200 @ArvadosFileBase._before_close
202 def readfrom(self, start, size, num_retries=None):
203 """Read up to 'size' bytes from the stream, starting at 'start'"""
208 for lr in locators_and_ranges(self.segments, start, size):
209 data.append(self._stream._readfrom(lr.locator+lr.segment_offset, lr.segment_size,
210 num_retries=num_retries))
213 def as_manifest(self):
214 from stream import normalize_stream
216 for r in self.segments:
217 segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
218 return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
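# For orientation, the string returned by as_manifest() is a single manifest line
# in the usual Keep format: a stream name, one or more block locators of the form
# "<md5>+<size>", then "pos:size:filename" file tokens. A hypothetical example:
#
#   ". 930625b054ce894ac40596c3f5a0d947+33 0:33:md5sum.txt\n"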
221 class BufferBlock(object):
223 A BufferBlock is a stand-in for a Keep block that is in the process of being
224 written. Writers can append to it, get the size, and compute the Keep locator.
226 There are three valid states:
232 Block is in the process of being uploaded to Keep; calling append() is an error.
235 The block has been written to Keep, its internal buffer has been
236 released, fetching the block will fetch it via keep client (since we
237 discarded the internal copy), and identifiers referring to the BufferBlock
238 can be replaced with the block locator.
244 def __init__(self, blockid, starting_capacity, owner):
247 the identifier for this block
250 the initial buffer capacity
253 ArvadosFile that owns this block
255 self.blockid = blockid
256 self.buffer_block = bytearray(starting_capacity)
257 self.buffer_view = memoryview(self.buffer_block)
258 self.write_pointer = 0
259 self.state = BufferBlock.WRITABLE
263 def append(self, data):
265 Append some data to the buffer. Only valid if the block is in WRITABLE
266 state. Implements an expanding buffer, doubling capacity as needed to
267 accommodate all the data.
269 if self.state == BufferBlock.WRITABLE:
270 while (self.write_pointer+len(data)) > len(self.buffer_block):
271 new_buffer_block = bytearray(len(self.buffer_block) * 2)
272 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
273 self.buffer_block = new_buffer_block
274 self.buffer_view = memoryview(self.buffer_block)
275 self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
276 self.write_pointer += len(data)
279 raise AssertionError("Buffer block is not writable")
282 """Amount of data written to the buffer"""
283 return self.write_pointer
286 """The Keep locator for this buffer's contents."""
287 if self._locator is None:
288 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
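# A hedged lifecycle sketch for BufferBlock (bm and bb are hypothetical names; the
# BlockManager defined below is the usual way blocks are allocated and committed):
#
#   bb = bm.alloc_bufferblock()      # starts in the WRITABLE state
#   bb.append("foo")
#   bb.append("bar")                 # buffer capacity doubles as needed
#   bb.size()                        # -> 6 bytes written so far
#   bm.commit_bufferblock(bb)        # WRITABLE -> PENDING; a background thread
#                                    # uploads the data and marks it COMMITTED
#
# The computed locator has the usual Keep form "<md5 hexdigest>+<size>".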
292 class AsyncKeepWriteErrors(Exception):
294 Roll up one or more Keep write exceptions (generated by background
295 threads) into a single one.
297 def __init__(self, errors):
301 return "\n".join(str(err) for err in self.errors)
303 def synchronized(orig_func):
304 @functools.wraps(orig_func)
305 def wrapper(self, *args, **kwargs):
307 return orig_func(self, *args, **kwargs)
310 class NoopLock(object):
314 def __exit__(self, exc_type, exc_value, traceback):
317 def acquire(self, blocking=False):
327 def must_be_writable(orig_func):
328 @functools.wraps(orig_func)
329 def wrapper(self, *args, **kwargs):
330 if self.sync_mode() == SYNC_READONLY:
331 raise IOError(errno.EROFS, "Collection is read only")
332 return orig_func(self, *args, **kwargs)
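# Hedged usage sketch for the decorators above (the class is hypothetical, and
# synchronized is assumed to acquire self.lock around the wrapped call):
#
#   class Example(object):
#       def __init__(self, lock, mode):
#           self.lock = lock
#           self.mode = mode
#       def sync_mode(self):
#           return self.mode
#       @must_be_writable
#       @synchronized
#       def mutate(self):
#           pass  # raises IOError(EROFS, ...) first when sync_mode() == SYNC_READONLY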
336 class BlockManager(object):
338 BlockManager handles buffer blocks, background block uploads, and
339 background block prefetch for a Collection of ArvadosFiles.
341 def __init__(self, keep):
342 """keep: KeepClient object to use"""
344 self._bufferblocks = {}
345 self._put_queue = None
346 self._put_errors = None
347 self._put_threads = None
348 self._prefetch_queue = None
349 self._prefetch_threads = None
350 self.lock = threading.Lock()
351 self.prefetch_enabled = True
352 self.num_put_threads = 2
353 self.num_get_threads = 2
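# A minimal setup sketch (assumes an already-configured API client; the variable
# names are illustrative only):
#
#   keep = arvados.keep.KeepClient(api_client=api)
#   block_manager = BlockManager(keep)
#   try:
#       bb = block_manager.alloc_bufferblock(owner=some_arvados_file)
#       # ... write into bb, commit, etc.
#   finally:
#       block_manager.stop_threads()   # shut down uploader/prefetch threads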
356 def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
358 Allocate a new, empty bufferblock in WRITABLE state and return it.
361 optional block identifier, otherwise one will be automatically assigned
364 optional capacity, otherwise will use default capacity
367 ArvadosFile that owns this block
370 blockid = "bufferblock%i" % len(self._bufferblocks)
371 bb = BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
372 self._bufferblocks[bb.blockid] = bb
376 def dup_block(self, blockid, owner):
378 Create a new bufferblock in WRITABLE state, initialized with the content of an existing bufferblock.
381 the block to copy. May be an existing buffer block id.
384 ArvadosFile that owns the new block
386 new_blockid = "bufferblock%i" % len(self._bufferblocks)
387 block = self._bufferblocks[blockid]
388 bb = BufferBlock(new_blockid, block.size(), owner)
390 self._bufferblocks[bb.blockid] = bb
394 def is_bufferblock(self, id):
395 return id in self._bufferblocks
398 def stop_threads(self):
400 Shut down and wait for background upload and download threads to finish.
402 if self._put_threads is not None:
403 for t in self._put_threads:
404 self._put_queue.put(None)
405 for t in self._put_threads:
407 self._put_threads = None
408 self._put_queue = None
409 self._put_errors = None
411 if self._prefetch_threads is not None:
412 for t in self._prefetch_threads:
413 self._prefetch_queue.put(None)
414 for t in self._prefetch_threads:
416 self._prefetch_threads = None
417 self._prefetch_queue = None
419 def commit_bufferblock(self, block):
421 Initiate a background upload of a bufferblock. This will block if the
422 upload queue is at capacity; otherwise it will return immediately.
427 Background uploader thread.
431 b = self._put_queue.get()
434 b._locator = self._keep.put(b.buffer_view[0:b.write_pointer].tobytes())
435 b.state = BufferBlock.COMMITTED
437 b.buffer_block = None
438 except Exception as e:
440 self._put_errors.put(e)
442 if self._put_queue is not None:
443 self._put_queue.task_done()
446 if self._put_threads is None:
447 # Start uploader threads.
449 # If we don't limit the Queue size, the upload queue can quickly
450 # grow to take up gigabytes of RAM if the writing process is
451 generating data more quickly than it can be sent to the Keep
454 # With two upload threads and a queue size of 2, this means up to 4
455 # blocks pending. If they are full 64 MiB blocks, that means up to
456 # 256 MiB of internal buffering, which is the same size as the
457 # default download block cache in KeepClient.
458 self._put_queue = Queue.Queue(maxsize=2)
459 self._put_errors = Queue.Queue()
461 self._put_threads = []
462 for i in xrange(0, self.num_put_threads):
463 t = threading.Thread(target=worker, args=(self,))
464 self._put_threads.append(t)
468 # Mark the block as PENDING to disallow any more appends.
469 block.state = BufferBlock.PENDING
470 self._put_queue.put(block)
472 def get_block(self, locator, num_retries, cache_only=False):
474 Fetch a block. First checks to see if the locator is a BufferBlock and
475 returns that; if not, passes the request through to KeepClient.get().
478 if locator in self._bufferblocks:
479 bb = self._bufferblocks[locator]
480 if bb.state != BufferBlock.COMMITTED:
481 return bb.buffer_view[0:bb.write_pointer].tobytes()
483 locator = bb._locator
484 return self._keep.get(locator, num_retries=num_retries, cache_only=cache_only)
486 def commit_all(self):
488 Commit all outstanding buffer blocks. Unlike commit_bufferblock(), this
489 is a synchronous call, and will not return until all buffer blocks are
490 uploaded. Raises AsyncKeepWriteErrors() if any blocks failed to upload.
494 items = self._bufferblocks.items()
497 if v.state == BufferBlock.WRITABLE:
498 self.commit_bufferblock(v)
501 if self._put_queue is not None:
502 self._put_queue.join()
503 if not self._put_errors.empty():
507 e.append(self._put_errors.get(False))
510 raise AsyncKeepWriteErrors(e)
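# Hedged sketch of the commit flow: writers fill buffer blocks, commit_bufferblock()
# queues each finished block for the background uploader threads, and commit_all()
# is called once at the end to wait for everything still in flight:
#
#   block_manager.commit_bufferblock(full_block)   # returns quickly unless the queue is full
#   # ... keep writing other blocks ...
#   block_manager.commit_all()                     # blocks; raises AsyncKeepWriteErrors
#                                                  # if any upload failed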
512 def block_prefetch(self, locator):
514 Initiate a background download of a block. This assumes that the
515 underlying KeepClient implements a block cache, so repeated requests
516 for the same block will not result in repeated downloads (unless the
517 block is evicted from the cache). This method does not block.
520 if not self.prefetch_enabled:
524 """Background downloader thread."""
527 b = self._prefetch_queue.get()
535 if locator in self._bufferblocks:
537 if self._prefetch_threads is None:
538 self._prefetch_queue = Queue.Queue()
539 self._prefetch_threads = []
540 for i in xrange(0, self.num_get_threads):
541 t = threading.Thread(target=worker, args=(self,))
542 self._prefetch_threads.append(t)
545 self._prefetch_queue.put(locator)
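# Illustrative read-ahead sketch: callers that will read a range of blocks
# sequentially can queue the upcoming locators first, then read; repeated requests
# stay cheap because KeepClient caches downloaded blocks. `upcoming_locators` is a
# hypothetical list:
#
#   for locator in upcoming_locators:
#       block_manager.block_prefetch(locator)
#   data = block_manager.get_block(upcoming_locators[0], num_retries=2)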
548 class ArvadosFile(object):
549 """ArvadosFile manages the underlying representation of a file in Keep as a
550 sequence of segments spanning a set of blocks, and implements random
551 read/write access. This object may be accessed from multiple threads.
555 def __init__(self, parent, stream=[], segments=[]):
558 a list of Range objects representing a block stream
561 a list of Range objects representing segments
564 self._modified = True
566 self.lock = parent.root_collection().lock
568 self._add_segment(stream, s.locator, s.range_size)
569 self._current_bblock = None
572 return self.parent.sync_mode()
576 return copy.copy(self._segments)
579 def clone(self, new_parent):
580 """Make a copy of this file."""
581 cp = ArvadosFile(new_parent)
584 for r in self._segments:
586 if self.parent._my_block_manager().is_bufferblock(r.locator):
587 if r.locator not in map_loc:
588 map_loc[r.locator] = self.parent._my_block_manager().dup_block(r.locator, cp).blockid
589 new_loc = map_loc[r.locator]
591 cp._segments.append(Range(new_loc, r.range_start, r.range_size, r.segment_offset))
597 def replace_contents(self, other):
598 """Replace segments of this file with segments from another `ArvadosFile` object."""
599 self._segments = other.segments()
600 self._modified = True
602 def __eq__(self, other):
605 if not isinstance(other, ArvadosFile):
610 return self._segments == s
612 def __ne__(self, other):
613 return not self.__eq__(other)
616 def set_unmodified(self):
617 """Clear the modified flag"""
618 self._modified = False
622 """Test the modified flag"""
623 return self._modified
627 def truncate(self, size):
629 Adjust the size of the file. If `size` is less than the size of the file,
630 the file contents after `size` will be discarded. If `size` is greater
631 than the current size of the file, an IOError will be raised.
633 if size < self.size():
635 for r in self._segments:
636 range_end = r.range_start+r.range_size
637 if r.range_start >= size:
638 # Segment is past the truncate size, all done.
640 elif size < range_end:
641 nr = Range(r.locator, r.range_start, size - r.range_start)
642 nr.segment_offset = r.segment_offset
648 self._segments = new_segs
649 self._modified = True
650 elif size > self.size():
651 raise IOError("truncate() does not support extending the file size")
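# Worked example of the rule above, for a hypothetical 100-byte ArvadosFile `af`:
#
#   af.truncate(40)    # keeps bytes 0..39 (trimming any segment spanning offset 40)
#   af.truncate(150)   # raises IOError; extending the file this way is unsupported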
653 def readfrom(self, offset, size, num_retries):
655 Read up to `size` bytes from the file starting at `offset`.
658 if size == 0 or offset >= self.size():
660 prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
661 readsegs = locators_and_ranges(self._segments, offset, size)
664 self.parent._my_block_manager().block_prefetch(lr.locator)
668 d = self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries, cache_only=bool(data))
670 data.append(d[lr.segment_offset:lr.segment_offset+lr.segment_size])
675 def _repack_writes(self):
677 Test if the buffer block has more data than is referenced by actual segments
678 (this happens when a buffered write overwrites a file range written in
679 a previous buffered write). Re-pack the buffer block for efficiency
680 and to avoid leaking information.
682 segs = self._segments
684 # Sum up the segments to get the total bytes of the file referencing
685 # into the buffer block.
686 bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
687 write_total = sum([s.range_size for s in bufferblock_segs])
689 if write_total < self._current_bblock.size():
690 # There is more data in the buffer block than is actually accounted for by segments, so
691 # re-pack into a new buffer by copying over to a new buffer block.
692 new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
693 for t in bufferblock_segs:
694 new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
695 t.segment_offset = new_bb.size() - t.range_size
697 self._current_bblock = new_bb
701 def writeto(self, offset, data, num_retries):
703 Write `data` to the file starting at `offset`. This will update
704 existing bytes and/or extend the size of the file as necessary.
709 if offset > self.size():
710 raise ArgumentError("Offset is past the end of the file")
712 if len(data) > config.KEEP_BLOCK_SIZE:
713 raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))
715 self._modified = True
717 if self._current_bblock is None or self._current_bblock.state != BufferBlock.WRITABLE:
718 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
720 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
721 self._repack_writes()
722 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
723 self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
724 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
726 self._current_bblock.append(data)
728 replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
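# Hedged sketch of appending through writeto(): callers must feed chunks no larger
# than config.KEEP_BLOCK_SIZE; full buffer blocks are handed to the uploader
# automatically. `af` and `source` are hypothetical:
#
#   offset = af.size()
#   for chunk in iter(lambda: source.read(1024 * 1024), ''):
#       af.writeto(offset, chunk, num_retries=2)
#       offset += len(chunk)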
732 def add_segment(self, blocks, pos, size):
734 Add a segment to the end of the file, with `pos` and `size` referencing a
735 section of the stream described by `blocks` (a list of Range objects).
737 self._add_segment(blocks, pos, size)
739 def _add_segment(self, blocks, pos, size):
743 self._modified = True
744 for lr in locators_and_ranges(blocks, pos, size):
745 last = self._segments[-1] if self._segments else Range(0, 0, 0)
746 r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
747 self._segments.append(r)
751 """Get the file size"""
753 n = self._segments[-1]
754 return n.range_start + n.range_size
758 class ArvadosFileReader(ArvadosFileReaderBase):
759 """Wraps ArvadosFile in a file-like object supporting reading only. Be aware
760 that this class is NOT thread safe as there is no locking around updating the file pointer.
764 def __init__(self, arvadosfile, name, mode="r", num_retries=None):
765 super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
766 self.arvadosfile = arvadosfile
769 return self.arvadosfile.size()
771 @ArvadosFileBase._before_close
773 def read(self, size, num_retries=None):
774 """Read up to `size` bytes from the stream, starting at the current file position"""
775 data = self.arvadosfile.readfrom(self._filepos, size, num_retries)
776 self._filepos += len(data)
779 @ArvadosFileBase._before_close
781 def readfrom(self, offset, size, num_retries=None):
782 """Read up to `size` bytes from the stream, starting at `offset`"""
783 return self.arvadosfile.readfrom(offset, size, num_retries)
789 class ArvadosFileWriter(ArvadosFileReader):
790 """Wraps ArvadosFile in a file-like object supporting both reading and writing.
791 Be aware that this class is NOT thread safe as there is no locking around
792 updating file pointer.
796 def __init__(self, arvadosfile, name, mode, num_retries=None):
797 super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)
799 @ArvadosFileBase._before_close
801 def write(self, data, num_retries=None):
802 if self.mode[0] == "a":
803 self.arvadosfile.writeto(self.size(), data, num_retries)
805 self.arvadosfile.writeto(self._filepos, data, num_retries)
806 self._filepos += len(data)
808 @ArvadosFileBase._before_close
810 def writelines(self, seq, num_retries=None):
812 self.write(s, num_retries)
814 def truncate(self, size=None):
817 self.arvadosfile.truncate(size)
818 if self._filepos > self.size():
819 self._filepos = self.size()
822 if self.arvadosfile.parent.sync_mode() == SYNC_LIVE:
823 self.arvadosfile.parent.root_collection().save()
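# End-to-end usage sketch for ArvadosFileWriter (hedged; assumes a writable
# collection object that hands out writers, e.g. through an open() helper):
#
#   with collection.open('output.txt', 'w') as writer:
#       writer.write('hello\n')
#       writer.writelines(['line two\n', 'line three\n'])
#   # leaving the block closes the writer; with SYNC_LIVE the parent collection
#   # is saved as shown above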