6 from arvados.retry import retry_method
14 from .errors import KeepWriteError, AssertionError, ArgumentError
17 """Separate the stream name and file name in a /-separated stream path and
18 return a tuple (stream_name, file_name).
20 If no stream name is available, assume '.'.
24 stream_name, file_name = path.rsplit('/', 1)
25 except ValueError: # No / in string
26 stream_name, file_name = '.', path
27 return stream_name, file_name
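# For example (illustrative only): split("foo/bar/baz.txt") returns
# ("foo/bar", "baz.txt"), while split("baz.txt") returns (".", "baz.txt").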
29 class FileLikeObjectBase(object):
30 def __init__(self, name, mode):
36 def _before_close(orig_func):
37 @functools.wraps(orig_func)
38 def before_close_wrapper(self, *args, **kwargs):
40 raise ValueError("I/O operation on closed stream file")
41 return orig_func(self, *args, **kwargs)
42 return before_close_wrapper
47 def __exit__(self, exc_type, exc_value, traceback):
58 class ArvadosFileReaderBase(FileLikeObjectBase):
59 class _NameAttribute(str):
60 # The Python file API provides a plain .name attribute.
61 # Older SDK provided a name() method.
62 # This class provides both, for maximum compatibility.
66 def __init__(self, name, mode, num_retries=None):
67 super(ArvadosFileReaderBase, self).__init__(self._NameAttribute(name), mode)
69 self.num_retries = num_retries
70 self._readline_cache = (None, None)
74 data = self.readline()
79 def decompressed_name(self):
80 return re.sub(r'\.(bz2|gz)$', '', self.name)
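# For example (illustrative only): a file named "input.fastq.gz" reports a
# decompressed_name of "input.fastq".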
82 @FileLikeObjectBase._before_close
83 def seek(self, pos, whence=os.SEEK_CUR):
84 if whence == os.SEEK_CUR:
86 elif whence == os.SEEK_END:
88 self._filepos = min(max(pos, 0L), self.size())
93 @FileLikeObjectBase._before_close
95 def readall(self, size=2**20, num_retries=None):
97 data = self.read(size, num_retries=num_retries)
102 @FileLikeObjectBase._before_close
104 def readline(self, size=float('inf'), num_retries=None):
105 cache_pos, cache_data = self._readline_cache
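# The readline cache holds data already read past the previous newline; if the
# file position has not moved since the last readline() call, start from that
# leftover data instead of issuing another read.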
106 if self.tell() == cache_pos:
110 data_size = len(data[-1])
111 while (data_size < size) and ('\n' not in data[-1]):
112 next_read = self.read(2 ** 20, num_retries=num_retries)
115 data.append(next_read)
116 data_size += len(next_read)
119 nextline_index = data.index('\n') + 1
121 nextline_index = len(data)
122 nextline_index = min(nextline_index, size)
123 self._readline_cache = (self.tell(), data[nextline_index:])
124 return data[:nextline_index]
126 @FileLikeObjectBase._before_close
128 def decompress(self, decompress, size, num_retries=None):
129 for segment in self.readall(size, num_retries):
130 data = decompress(segment)
134 @FileLikeObjectBase._before_close
136 def readall_decompressed(self, size=2**20, num_retries=None):
138 if self.name.endswith('.bz2'):
139 dc = bz2.BZ2Decompressor()
140 return self.decompress(dc.decompress, size,
141 num_retries=num_retries)
142 elif self.name.endswith('.gz'):
143 dc = zlib.decompressobj(16+zlib.MAX_WBITS)
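# A wbits value of 16+MAX_WBITS tells zlib to expect a gzip header and
# trailer rather than a raw zlib stream.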
144 return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
145 size, num_retries=num_retries)
147 return self.readall(size, num_retries=num_retries)
149 @FileLikeObjectBase._before_close
151 def readlines(self, sizehint=float('inf'), num_retries=None):
154 for s in self.readall(num_retries=num_retries):
157 if data_size >= sizehint:
159 return ''.join(data).splitlines(True)
162 raise NotImplementedError()
164 def read(self, size, num_retries=None):
165 raise NotImplementedError()
167 def readfrom(self, start, size, num_retries=None):
168 raise NotImplementedError()
171 class StreamFileReader(ArvadosFileReaderBase):
172 def __init__(self, stream, segments, name):
173 super(StreamFileReader, self).__init__(name, 'rb', num_retries=stream.num_retries)
174 self._stream = stream
175 self.segments = segments
177 def stream_name(self):
178 return self._stream.name()
181 n = self.segments[-1]
182 return n.range_start + n.range_size
184 @FileLikeObjectBase._before_close
186 def read(self, size, num_retries=None):
187 """Read up to 'size' bytes from the stream, starting at the current file position"""
192 available_chunks = locators_and_ranges(self.segments, self._filepos, size)
194 lr = available_chunks[0]
195 data = self._stream._readfrom(lr.locator+lr.segment_offset,
197 num_retries=num_retries)
199 self._filepos += len(data)
202 @FileLikeObjectBase._before_close
204 def readfrom(self, start, size, num_retries=None):
205 """Read up to 'size' bytes from the stream, starting at 'start'"""
210 for lr in locators_and_ranges(self.segments, start, size):
211 data.append(self._stream._readfrom(lr.locator+lr.segment_offset, lr.segment_size,
212 num_retries=num_retries))
215 def as_manifest(self):
216 from stream import normalize_stream
218 for r in self.segments:
219 segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
220 return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
223 class BufferBlock(object):
224 """A BufferBlock is a stand-in for a Keep block that is in the process of being
227 Writers can append to it, get the size, and compute the Keep locator.
228 There are three valid states:
234 Block is in the process of being uploaded to Keep; appending is an error.
237 The block has been written to Keep, its internal buffer has been
238 released, fetching the block will fetch it via keep client (since we
239 discarded the internal copy), and identifiers referring to the BufferBlock
240 can be replaced with the block locator.
248 def __init__(self, blockid, starting_capacity, owner):
251 the identifier for this block
254 the initial buffer capacity
257 ArvadosFile that owns this block
260 self.blockid = blockid
261 self.buffer_block = bytearray(starting_capacity)
262 self.buffer_view = memoryview(self.buffer_block)
263 self.write_pointer = 0
264 self.state = BufferBlock.WRITABLE
268 def append(self, data):
269 """Append some data to the buffer.
271 Only valid if the block is in WRITABLE state. Implements an expanding
272 buffer, doubling capacity as needed to accommodate all the data.
275 if self.state == BufferBlock.WRITABLE:
276 while (self.write_pointer+len(data)) > len(self.buffer_block):
277 new_buffer_block = bytearray(len(self.buffer_block) * 2)
278 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
279 self.buffer_block = new_buffer_block
280 self.buffer_view = memoryview(self.buffer_block)
281 self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
282 self.write_pointer += len(data)
285 raise AssertionError("Buffer block is not writable")
288 """The amount of data written to the buffer."""
289 return self.write_pointer
292 """The Keep locator for this buffer's contents."""
293 if self._locator is None:
294 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
298 def synchronized(orig_func):
299 @functools.wraps(orig_func)
300 def synchronized_wrapper(self, *args, **kwargs):
302 return orig_func(self, *args, **kwargs)
303 return synchronized_wrapper
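# `synchronized` is meant to run the wrapped method while holding the object's
# self.lock; NoopLock below provides the same acquire/release/context-manager
# interface for objects that do not need real locking.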
305 class NoopLock(object):
309 def __exit__(self, exc_type, exc_value, traceback):
312 def acquire(self, blocking=False):
322 def must_be_writable(orig_func):
323 @functools.wraps(orig_func)
324 def must_be_writable_wrapper(self, *args, **kwargs):
325 if self.sync_mode() == SYNC_READONLY:
326 raise IOError(errno.EROFS, "Collection is read only")
327 return orig_func(self, *args, **kwargs)
328 return must_be_writable_wrapper
331 class BlockManager(object):
332 """BlockManager handles buffer blocks, background block uploads, and background
333 block prefetch for a Collection of ArvadosFiles.
336 def __init__(self, keep):
337 """keep: KeepClient object to use"""
339 self._bufferblocks = {}
340 self._put_queue = None
341 self._put_errors = None
342 self._put_threads = None
343 self._prefetch_queue = None
344 self._prefetch_threads = None
345 self.lock = threading.Lock()
346 self.prefetch_enabled = True
347 self.num_put_threads = 2
348 self.num_get_threads = 2
351 def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
352 """Allocate a new, empty bufferblock in WRITABLE state and return it.
355 optional block identifier, otherwise one will be automatically assigned
358 optional capacity, otherwise will use default capacity
361 ArvadosFile that owns this block
365 blockid = "bufferblock%i" % len(self._bufferblocks)
366 bufferblock = BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
367 self._bufferblocks[bufferblock.blockid] = bufferblock
371 def dup_block(self, blockid, owner):
372 """Create a new bufferblock in WRITABLE state, initialized with the content of
373 an existing bufferblock.
376 the block to copy. May be an existing buffer block id.
379 ArvadosFile that owns the new block
382 new_blockid = "bufferblock%i" % len(self._bufferblocks)
383 block = self._bufferblocks[blockid]
384 if block.state != BufferBlock.WRITABLE:
385 raise AssertionError("Can only duplicate a writable buffer block")
387 bufferblock = BufferBlock(new_blockid, block.size(), owner)
388 bufferblock.append(block.buffer_view[0:block.size()])
389 self._bufferblocks[bufferblock.blockid] = bufferblock
393 def is_bufferblock(self, locator):
394 return locator in self._bufferblocks
397 def stop_threads(self):
398 """Shut down and wait for background upload and download threads to finish."""
400 if self._put_threads is not None:
401 for t in self._put_threads:
402 self._put_queue.put(None)
403 for t in self._put_threads:
405 self._put_threads = None
406 self._put_queue = None
407 self._put_errors = None
409 if self._prefetch_threads is not None:
410 for t in self._prefetch_threads:
411 self._prefetch_queue.put(None)
412 for t in self._prefetch_threads:
414 self._prefetch_threads = None
415 self._prefetch_queue = None
417 def commit_bufferblock(self, block):
418 """Initiate a background upload of a bufferblock.
420 This will block if the upload queue is at capacity, otherwise it will
425 def commit_bufferblock_worker(self):
426 """Background uploader thread."""
430 bufferblock = self._put_queue.get()
431 if bufferblock is None:
433 bufferblock._locator = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
434 bufferblock.state = BufferBlock.COMMITTED
435 bufferblock.buffer_view = None
436 bufferblock.buffer_block = None
437 except Exception as e:
439 self._put_errors.put((bufferblock.locator(), e))
441 if self._put_queue is not None:
442 self._put_queue.task_done()
445 if self._put_threads is None:
446 # Start uploader threads.
448 # If we don't limit the Queue size, the upload queue can quickly
449 # grow to take up gigabytes of RAM if the writing process is
450 # generating data more quickly than it can be sent to the Keep
453 # With two upload threads and a queue size of 2, this means up to 4
454 # blocks pending. If they are full 64 MiB blocks, that means up to
455 # 256 MiB of internal buffering, which is the same size as the
456 # default download block cache in KeepClient.
457 self._put_queue = Queue.Queue(maxsize=2)
458 self._put_errors = Queue.Queue()
460 self._put_threads = []
461 for i in xrange(0, self.num_put_threads):
462 thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
463 self._put_threads.append(thread)
467 # Mark the block as PENDING to disallow any further appends.
468 block.state = BufferBlock.PENDING
469 self._put_queue.put(block)
471 def get_bufferblock(self, locator):
473 return self._bufferblocks.get(locator)
475 def get_block_contents(self, locator, num_retries, cache_only=False):
478 First checks to see if the locator is a BufferBlock and, if so, returns its
479 contents; otherwise passes the request through to KeepClient.get().
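# When cache_only is True, the lookup goes through KeepClient.get_from_cache(),
# which does not block on a network fetch and may return None if the block is
# not already cached locally (an interpretation of the calls below).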
483 if locator in self._bufferblocks:
484 bufferblock = self._bufferblocks[locator]
485 if bufferblock.state != BufferBlock.COMMITTED:
486 return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
488 locator = bufferblock._locator
490 return self._keep.get_from_cache(locator)
492 return self._keep.get(locator, num_retries=num_retries)
494 def commit_all(self):
495 """Commit all outstanding buffer blocks.
497 Unlike commit_bufferblock(), this is a synchronous call, and will not
498 return until all buffer blocks are uploaded. Raises
499 KeepWriteError() if any blocks failed to upload.
503 items = self._bufferblocks.items()
506 if v.state == BufferBlock.WRITABLE:
507 self.commit_bufferblock(v)
510 if self._put_queue is not None:
511 self._put_queue.join()
512 if not self._put_errors.empty():
516 err.append(self._put_errors.get(False))
519 raise KeepWriteError("Error writing some blocks", err)
521 def block_prefetch(self, locator):
522 """Initiate a background download of a block.
524 This assumes that the underlying KeepClient implements a block cache,
525 so repeated requests for the same block will not result in repeated
526 downloads (unless the block is evicted from the cache). This method
531 if not self.prefetch_enabled:
534 def block_prefetch_worker(self):
535 """The background downloader thread."""
538 b = self._prefetch_queue.get()
546 if locator in self._bufferblocks:
548 if self._prefetch_threads is None:
549 self._prefetch_queue = Queue.Queue()
550 self._prefetch_threads = []
551 for i in xrange(0, self.num_get_threads):
552 thread = threading.Thread(target=block_prefetch_worker, args=(self,))
553 self._prefetch_threads.append(thread)
556 self._prefetch_queue.put(locator)
559 class ArvadosFile(object):
560 """ArvadosFile manages the underlying representation of a file in Keep as a
561 sequence of segments spanning a set of blocks, and implements random
564 This object may be accessed from multiple threads.
568 def __init__(self, parent, stream=[], segments=[]):
570 ArvadosFile constructor.
573 a list of Range objects representing a block stream
576 a list of Range objects representing segments
579 self._modified = True
581 self.lock = parent.root_collection().lock
583 self._add_segment(stream, s.locator, s.range_size)
584 self._current_bblock = None
587 return self.parent.sync_mode()
591 return copy.copy(self._segments)
594 def clone(self, new_parent):
595 """Make a copy of this file."""
596 cp = ArvadosFile(new_parent)
599 for r in self._segments:
601 if self.parent._my_block_manager().is_bufferblock(r.locator):
602 if r.locator not in map_loc:
603 bufferblock = self.parent._my_block_manager().get_bufferblock(r.locator)
604 if bufferblock.state == BufferBlock.COMMITTED:
605 map_loc[r.locator] = bufferblock.locator()
607 map_loc[r.locator] = self.parent._my_block_manager().dup_block(r.locator, cp)
608 new_loc = map_loc[r.locator]
610 cp._segments.append(Range(new_loc, r.range_start, r.range_size, r.segment_offset))
616 def replace_contents(self, other):
617 """Replace segments of this file with segments from another `ArvadosFile` object."""
618 self._segments = other.segments()
619 self._modified = True
621 def __eq__(self, other):
624 if not isinstance(other, ArvadosFile):
629 return self._segments == s
631 def __ne__(self, other):
632 return not self.__eq__(other)
635 def set_unmodified(self):
636 """Clear the modified flag"""
637 self._modified = False
641 """Test the modified flag"""
642 return self._modified
646 def truncate(self, size):
647 """Shrink the size of the file.
649 If `size` is less than the size of the file, the file contents after
650 `size` will be discarded. If `size` is greater than the current size
651 of the file, an IOError will be raised.
654 if size < self.size():
656 for r in self._segments:
657 range_end = r.range_start+r.range_size
658 if r.range_start >= size:
659 # segment is past the truncate size, all done
661 elif size < range_end:
662 nr = Range(r.locator, r.range_start, size - r.range_start)
663 nr.segment_offset = r.segment_offset
669 self._segments = new_segs
670 self._modified = True
671 elif size > self.size():
672 raise IOError("truncate() does not support extending the file size")
674 def readfrom(self, offset, size, num_retries):
675 """Read upto `size` bytes from the file starting at `offset`."""
678 if size == 0 or offset >= self.size():
680 prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
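# The prefetch ranges deliberately extend one full Keep block past the
# requested read so the background prefetch threads can warm the cache for
# sequential readers.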
681 readsegs = locators_and_ranges(self._segments, offset, size)
684 self.parent._my_block_manager().block_prefetch(lr.locator)
688 block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=bool(data))
690 data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
695 def _repack_writes(self):
696 """Test if the buffer block has more data than is referenced by actual
699 This happens when a buffered write overwrites a file range written in
700 a previous buffered write. Re-pack the buffer block for efficiency
701 and to avoid leaking information.
704 segs = self._segments
706 # Sum up the segments to get the total number of bytes in the file
707 # that reference the buffer block.
708 bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
709 write_total = sum([s.range_size for s in bufferblock_segs])
711 if write_total < self._current_bblock.size():
712 # There is more data in the buffer block than is actually referenced by the
713 # segments, so copy just the referenced ranges into a new buffer block.
714 new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
715 for t in bufferblock_segs:
716 new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
717 t.segment_offset = new_bb.size() - t.range_size
719 self._current_bblock = new_bb
723 def writeto(self, offset, data, num_retries):
724 """Write `data` to the file starting at `offset`.
726 This will update existing bytes and/or extend the size of the file as
733 if offset > self.size():
734 raise ArgumentError("Offset is past the end of the file")
736 if len(data) > config.KEEP_BLOCK_SIZE:
737 raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))
739 self._modified = True
741 if self._current_bblock is None or self._current_bblock.state != BufferBlock.WRITABLE:
742 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
744 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
745 self._repack_writes()
746 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
747 self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
748 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
750 self._current_bblock.append(data)
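# The bytes just appended occupy the tail of the buffer block, so the new
# segment's offset within the block is write_pointer - len(data).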
752 replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
756 def add_segment(self, blocks, pos, size):
757 """Add a segment to the end of the file, with `pos` and `offset` referencing a
758 section of the stream described by `blocks` (a list of Range objects)
761 self._add_segment(blocks, pos, size)
763 def _add_segment(self, blocks, pos, size):
764 """Internal implementation of add_segment."""
765 self._modified = True
766 for lr in locators_and_ranges(blocks, pos, size):
767 last = self._segments[-1] if self._segments else Range(0, 0, 0)
768 r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
769 self._segments.append(r)
773 """Get the file size."""
775 n = self._segments[-1]
776 return n.range_start + n.range_size
780 class ArvadosFileReader(ArvadosFileReaderBase):
781 """Wraps ArvadosFile in a file-like object supporting reading only.
783 Be aware that this class is NOT thread safe as there is no locking around
784 updates to the file pointer.
788 def __init__(self, arvadosfile, name, mode="r", num_retries=None):
789 super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
790 self.arvadosfile = arvadosfile
793 return self.arvadosfile.size()
795 @FileLikeObjectBase._before_close
797 def read(self, size, num_retries=None):
798 """Read up to `size` bytes from the stream, starting at the current file position."""
799 data = self.arvadosfile.readfrom(self._filepos, size, num_retries)
800 self._filepos += len(data)
803 @FileLikeObjectBase._before_close
805 def readfrom(self, offset, size, num_retries=None):
806 """Read up to `size` bytes from the stream, starting at the current file position."""
807 return self.arvadosfile.readfrom(offset, size, num_retries)
813 class ArvadosFileWriter(ArvadosFileReader):
814 """Wraps ArvadosFile in a file-like object supporting both reading and writing.
816 Be aware that this class is NOT thread safe as there is no locking around
817 updates to the file pointer.
821 def __init__(self, arvadosfile, name, mode, num_retries=None):
822 super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)
824 @FileLikeObjectBase._before_close
826 def write(self, data, num_retries=None):
827 if self.mode[0] == "a":
828 self.arvadosfile.writeto(self.size(), data, num_retries)
830 self.arvadosfile.writeto(self._filepos, data, num_retries)
831 self._filepos += len(data)
833 @FileLikeObjectBase._before_close
835 def writelines(self, seq, num_retries=None):
837 self.write(s, num_retries)
839 def truncate(self, size=None):
842 self.arvadosfile.truncate(size)
843 if self._filepos > self.size():
844 self._filepos = self.size()
847 if self.arvadosfile.parent.sync_mode() == SYNC_LIVE:
848 self.arvadosfile.parent.root_collection().save()
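# A minimal usage sketch (assumes a writable arvados.collection.Collection
# object `c` whose open() method returns the file-like wrappers above; the
# names outside this module are illustrative, not guaranteed API):
#
#     with c.open("output.txt", "w") as f:      # ArvadosFileWriter
#         f.write("hello\n")
#     with c.open("output.txt", "r") as f:      # ArvadosFileReader
#         print f.read(2 ** 16)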