import bz2
import copy
import errno
import functools
import hashlib
import os
import re
import threading
import zlib
import Queue

import config

from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange, locator_block_size
from arvados.retry import retry_method
from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
19 """Separate the stream name and file name in a /-separated stream path and
20 return a tuple (stream_name, file_name).
22 If no stream name is available, assume '.'.
26 stream_name, file_name = path.rsplit('/', 1)
27 except ValueError: # No / in string
28 stream_name, file_name = '.', path
29 return stream_name, file_name
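
# For example:
#   split("foo/bar/baz.txt")  returns ("foo/bar", "baz.txt")
#   split("baz.txt")          returns (".", "baz.txt")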


class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        self.closed = True


class ArvadosFileReaderBase(_FileLikeObjectBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(self._NameAttribute(name), mode)
        self._filepos = 0L
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_CUR):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        self._filepos = min(max(pos, 0L), self.size())

    def tell(self):
        return self._filepos

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if data == '':
                break
            yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
        else:
            data = ['']
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
        data = ''.join(data)
        try:
            nextline_index = data.index('\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            # 16+MAX_WBITS tells zlib to expect gzip framing (header and
            # trailer) rather than a raw zlib stream.
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return ''.join(data).splitlines(True)

    def size(self):
        raise NotImplementedError()

    def read(self, size, num_retries=None):
        raise NotImplementedError()

    def readfrom(self, start, size, num_retries=None):
        raise NotImplementedError()


class StreamFileReader(ArvadosFileReaderBase):
    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(name, 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position."""
        if size == 0:
            return ''

        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream._readfrom(lr.locator+lr.segment_offset,
                                          lr.segment_size,
                                          num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'."""
        if size == 0:
            return ''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream._readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                               num_retries=num_retries))
        return ''.join(data)

    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"


def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper
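
# synchronized assumes the decorated method's instance has a `lock` attribute
# supporting the context manager protocol (a threading.Lock here, or the
# NoopLock defined below when locking is unnecessary).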


class _BufferBlock(object):
    """A BufferBlock is a stand-in for a Keep block that is in the process of being
    written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are three valid states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()

    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
        else:
            raise AssertionError("Buffer block is not writable")

    @synchronized
    def set_state(self, nextstate, loc=None):
        if ((self._state == _BufferBlock.WRITABLE and nextstate == _BufferBlock.PENDING) or
            (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.COMMITTED)):
            self._state = nextstate
            if self._state == _BufferBlock.COMMITTED:
                self._locator = loc
                self.buffer_view = None
                self.buffer_block = None
        else:
            raise AssertionError("Invalid state change from %s to %s" % (self._state, nextstate))
314 """The amount of data written to the buffer."""
315 return self.write_pointer
319 """The Keep locator for this buffer's contents."""
320 if self._locator is None:
321 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
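
# Typical _BufferBlock lifecycle (illustrative sketch; `loc` would be the
# locator returned by KeepClient.put()):
#
#   bb = _BufferBlock("bufferblock0", starting_capacity=2**14, owner=None)
#   bb.append("hello")                         # WRITABLE: buffer grows as needed
#   bb.set_state(_BufferBlock.PENDING)         # upload in flight; append() now raises
#   bb.set_state(_BufferBlock.COMMITTED, loc)  # buffer released; use `loc` from now on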


class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass


def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if self.sync_mode() == SYNC_READONLY:
            raise IOError(errno.EROFS, "Collection is read only")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper


class _BlockManager(object):
    """BlockManager handles buffer blocks, background block uploads, and background
    block prefetch for a Collection of ArvadosFiles.

    """
    def __init__(self, keep):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = {}
        self._put_queue = None
        self._put_errors = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        self.num_put_threads = 2
        self.num_get_threads = 2

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        if blockid is None:
            blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock in WRITABLE state, initialized with the content of
        an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = "bufferblock%i" % len(self._bufferblocks)
        if block._state == _BufferBlock.COMMITTED:
            raise AssertionError("Can only duplicate a writable or pending buffer block")
        bufferblock = _BufferBlock(new_blockid, block.size(), owner)
        bufferblock.append(block.buffer_view[0:block.size()])
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        return locator in self._bufferblocks

    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""
        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None
        self._put_errors = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None

    def commit_bufferblock(self, block):
        """Initiate a background upload of a bufferblock.

        This will block if the upload queue is at capacity, otherwise it will
        return immediately.

        """

        def commit_bufferblock_worker(self):
            """Background uploader thread."""
            while True:
                try:
                    bufferblock = self._put_queue.get()
                    if bufferblock is None:
                        return
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                    bufferblock.set_state(_BufferBlock.COMMITTED, loc)
                except Exception as e:
                    self._put_errors.put((bufferblock.locator(), e))
                finally:
                    if self._put_queue is not None:
                        self._put_queue.task_done()

        with self.lock:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep
                # servers.
                #
                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending.  If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = Queue.Queue(maxsize=2)
                self._put_errors = Queue.Queue()

                self._put_threads = []
                for i in xrange(0, self.num_put_threads):
                    thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
                    self._put_threads.append(thread)
                    thread.daemon = True
                    thread.start()

        # Mark the block as PENDING so as to disallow any more appends.
        block.set_state(_BufferBlock.PENDING)
        self._put_queue.put(block)

    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and returns it if
        so; otherwise passes the request through to KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        Unlike commit_bufferblock(), this is a synchronous call, and will not
        return until all buffer blocks are uploaded.  Raises
        KeepWriteError() if any blocks failed to upload.

        """
        with self.lock:
            items = self._bufferblocks.items()

        for k, v in items:
            if v.state() == _BufferBlock.WRITABLE:
                self.commit_bufferblock(v)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

                if not self._put_errors.empty():
                    err = []
                    try:
                        while True:
                            err.append(self._put_errors.get(False))
                    except Queue.Empty:
                        pass
                    raise KeepWriteError("Error writing some blocks", err)

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.)  This method
        does not block.

        """
        if not self.prefetch_enabled:
            return

        def block_prefetch_worker(self):
            """The background downloader thread."""
            while True:
                try:
                    b = self._prefetch_queue.get()
                    if b is None:
                        return
                    self._keep.get(b)
                except Exception:
                    pass

        with self.lock:
            if locator in self._bufferblocks:
                return

            if self._prefetch_threads is None:
                self._prefetch_queue = Queue.Queue()
                self._prefetch_threads = []
                for i in xrange(0, self.num_get_threads):
                    thread = threading.Thread(target=block_prefetch_worker, args=(self,))
                    self._prefetch_threads.append(thread)
                    thread.daemon = True
                    thread.start()

        self._prefetch_queue.put(locator)
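
# Sketch of the write path through a _BlockManager (illustrative; assumes
# `keep` is a KeepClient and `f` is the owning ArvadosFile):
#
#   bm = _BlockManager(keep)
#   bb = bm.alloc_bufferblock(owner=f)
#   bb.append(some_data)        # buffered in RAM, still WRITABLE
#   bm.commit_bufferblock(bb)   # asynchronous upload; block becomes PENDING
#   bm.commit_all()             # wait for uploads; KeepWriteError on failure
#   bm.stop_threads()           # shut down the background worker threads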


class ArvadosFile(object):
    """ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    def __init__(self, parent, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments

        """
        self.parent = parent
        self._modified = True
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def sync_mode(self):
        return self.parent.sync_mode()

    @synchronized
    def segments(self):
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""
        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self._modified = True

    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in xrange(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_unmodified(self):
        """Clear the modified flag"""
        self._modified = False

    @synchronized
    def modified(self):
        """Test the modified flag"""
        return self._modified

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, an IOError will be raised.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # Segment is past the truncate size, all done.
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self._modified = True
        elif size > self.size():
            raise IOError("truncate() does not support extending the file size")

    @synchronized
    def readfrom(self, offset, size, num_retries):
        """Read up to `size` bytes from the file starting at `offset`."""
        if size == 0 or offset >= self.size():
            return ''
        prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
        readsegs = locators_and_ranges(self._segments, offset, size)

        for lr in prefetch:
            self.parent._my_block_manager().block_prefetch(lr.locator)

        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=bool(data))
            if block:
                data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
            else:
                break
        return ''.join(data)

    def _repack_writes(self):
        """Test if the buffer block has more data than is referenced by actual
        segments.

        This happens when a buffered write over-writes a file range written in
        a previous buffered write.  Re-pack the buffer block for efficiency
        and to avoid leaking information.

        """
        segs = self._segments

        # Sum up the segments to get the total bytes of the file referencing
        # into the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size():
            # There is more data in the buffer block than is actually accounted
            # for by segments, so re-pack by copying the referenced bytes into
            # a new buffer block.
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb
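
    # Example of the situation _repack_writes() cleans up (illustrative):
    # writing 10 bytes at offset 0 and then 10 more bytes at offset 0 leaves
    # 20 bytes in the buffer block while the segments reference only the last
    # 10; repacking copies just those 10 bytes into a fresh buffer block.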

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if len(data) == 0:
            return

        if offset > self.size():
            raise ArgumentError("Offset is past the end of the file")

        if len(data) > config.KEEP_BLOCK_SIZE:
            raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))

        self._modified = True

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes()
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file, with `pos` and `size` referencing a
        section of the stream described by `blocks` (a list of Range objects).

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self._modified = True
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)
814 """Get the file size."""
816 n = self._segments[-1]
817 return n.range_start + n.range_size

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
        buf = ""
        stream = {}
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if loc.startswith("bufferblock"):
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, locator_block_size(loc),
                                              segment.segment_offset, segment.range_size))
        stream[stream_name] = filestream
        buf += ' '.join(normalize_stream(stream_name, stream))
        buf += "\n"
        return buf
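
    # With portable_locators=True the locator permission hints (the "+A..."
    # suffix) are stripped via KeepLocator.stripped(), yielding a manifest
    # that can be moved between clusters.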


class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating the file pointer.

    """

    def __init__(self, arvadosfile, name, mode="r", num_retries=None):
        super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the current file position."""
        data = self.arvadosfile.readfrom(self._filepos, size, num_retries)
        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at `offset`."""
        return self.arvadosfile.readfrom(offset, size, num_retries)


class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating the file pointer.

    """

    def __init__(self, arvadosfile, name, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
        else:
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries)

    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)
        if self._filepos > self.size():
            self._filepos = self.size()

    def close(self):
        if self.arvadosfile.parent.sync_mode() == SYNC_LIVE:
            self.arvadosfile.parent.root_collection().save()
        super(ArvadosFileWriter, self).close()
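
# End-to-end usage sketch (illustrative; assumes `af` is an ArvadosFile whose
# parent collection is writable and in SYNC_LIVE mode):
#
#   writer = ArvadosFileWriter(af, "foo.txt", "r+")
#   writer.write("hello world")
#   writer.truncate(5)       # keep just "hello"
#   writer.close()           # saves the parent collection in SYNC_LIVE mode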