from ._ranges import locators_and_ranges, replace_range, Range
from arvados.retry import retry_method
from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
18 """split(path) -> streamname, filename
20 Separate the stream name and file name in a /-separated stream path and
21 return a tuple (stream_name, file_name). If no stream name is available,
26 stream_name, file_name = path.rsplit('/', 1)
27 except ValueError: # No / in string
28 stream_name, file_name = '.', path
29 return stream_name, file_name
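
# Illustrative sketch (not part of the original module): how split() behaves,
# based on the docstring and code above.
#
#   >>> split("my_stream/subdir/file.txt")
#   ('my_stream/subdir', 'file.txt')
#   >>> split("file.txt")      # no '/' in the path, so the stream defaults to '.'
#   ('.', 'file.txt')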

class _FileLikeObjectBase(object):
    def __init__(self, name, mode):

    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper

    def __exit__(self, exc_type, exc_value, traceback):

class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self.num_retries = num_retries
        self._readline_cache = (None, None)

            data = self.readline()

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_CUR):
        if whence == os.SEEK_CUR:
        elif whence == os.SEEK_END:
        self._filepos = min(max(pos, 0L), self.size())

    @_FileLikeObjectBase._before_close
    def readall(self, size=2**20, num_retries=None):
            data = self.read(size, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            data.append(next_read)
            data_size += len(next_read)
            nextline_index = data.index('\n') + 1
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]

    @_FileLikeObjectBase._before_close
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries):
            data = decompress(segment)

    @_FileLikeObjectBase._before_close
    def readall_decompressed(self, size=2**20, num_retries=None):
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
            return self.readall(size, num_retries=num_retries)
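
    # Illustrative sketch (not part of the original module): readall_decompressed()
    # picks a decompressor from the file name suffix, so callers can iterate over
    # decompressed chunks without caring whether the file was stored as '.gz',
    # '.bz2', or uncompressed. `reader` stands for any concrete subclass instance
    # (hypothetical), and process() is a placeholder for caller code.
    #
    #   for chunk in reader.readall_decompressed(size=2**20):
    #       process(chunk)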

    @_FileLikeObjectBase._before_close
    def readlines(self, sizehint=float('inf'), num_retries=None):
        for s in self.readall(num_retries=num_retries):
            if data_size >= sizehint:
        return ''.join(data).splitlines(True)

        raise NotImplementedError()

    def read(self, size, num_retries=None):
        raise NotImplementedError()

    def readfrom(self, start, size, num_retries=None):
        raise NotImplementedError()

class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""

        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)

    @_FileLikeObjectBase._before_close
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""

        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))

    def as_manifest(self):
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"

def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper
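
# Illustrative sketch (not part of the original module): @synchronized assumes the
# instance exposes a `self.lock` context manager, as _BufferBlock, _BlockManager and
# ArvadosFile below do. A hypothetical example:
#
#   class Counter(object):
#       def __init__(self):
#           self.lock = threading.Lock()
#           self.n = 0
#
#       @synchronized
#       def increment(self):
#           self.n += 1     # runs while holding self.lock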

class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are three valid states:

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    def __init__(self, blockid, starting_capacity, owner):
          the identifier for this block

          the initial buffer capacity

          ArvadosFile that owns this block

        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE

        self.lock = threading.Lock()

    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state. Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        if self._state == _BufferBlock.WRITABLE:
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)

            raise AssertionError("Buffer block is not writable")

    def set_state(self, nextstate, loc=None):
        if ((self._state == _BufferBlock.WRITABLE and nextstate == _BufferBlock.PENDING) or
            (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.COMMITTED)):
            self._state = nextstate
            if self._state == _BufferBlock.COMMITTED:
                self.buffer_view = None
                self.buffer_block = None

            raise AssertionError("Invalid state change from %s to %s" % (self._state, nextstate))
313 """The amount of data written to the buffer."""
314 return self.write_pointer
318 """The Keep locator for this buffer's contents."""
319 if self._locator is None:
320 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
324 def clone(self, new_blockid, owner):
325 if self._state == _BufferBlock.COMMITTED:
326 raise AssertionError("Can only duplicate a writable or pending buffer block")
327 bufferblock = _BufferBlock(new_blockid, self.size(), owner)
328 bufferblock.append(self.buffer_view[0:self.size()])
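
# Illustrative sketch (not part of the original module): the _BufferBlock lifecycle
# described in the class docstring. The owner is left as None for brevity.
#
#   bb = _BufferBlock("bufferblock0", starting_capacity=2**14, owner=None)
#   bb.append("hello")                      # WRITABLE: appends are allowed
#   bb.size()                               # -> 5
#   bb.locator()                            # -> "5d41402abc4b2a76b9719d911017c592+5"
#   bb.set_state(_BufferBlock.PENDING)      # upload in progress; further appends raise
#   bb.set_state(_BufferBlock.COMMITTED, "5d41402abc4b2a76b9719d911017c592+5")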

class NoopLock(object):
    def __exit__(self, exc_type, exc_value, traceback):

    def acquire(self, blocking=False):

def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection must be writable.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper
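
# Illustrative sketch (not part of the original module): @must_be_writable assumes the
# instance provides a writable() method, as ArvadosFile below does. A read-only object
# gets an IOError(EROFS) instead of a partial update. Hypothetical example:
#
#   class ReadOnlyThing(object):
#       def writable(self):
#           return False
#
#       @must_be_writable
#       def mutate(self):
#           pass            # never reached; the wrapper raises first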

class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    def __init__(self, keep):
        """keep: KeepClient object to use"""
        self._bufferblocks = {}
        self._put_queue = None
        self._put_errors = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        self.num_put_threads = 2
        self.num_get_threads = 2

    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

          optional block identifier, otherwise one will be automatically assigned

          optional capacity, otherwise will use default capacity

          ArvadosFile that owns this block

            blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock

    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

          the buffer block to copy.

          ArvadosFile that owns the new block

        new_blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock

    def is_bufferblock(self, locator):
        return locator in self._bufferblocks

    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""
        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
        self._put_threads = None
        self._put_queue = None
        self._put_errors = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
        self._prefetch_threads = None
        self._prefetch_queue = None

    def commit_bufferblock(self, block):
        """Initiate a background upload of a bufferblock.

        This will block if the upload queue is at capacity, otherwise it will

        def commit_bufferblock_worker(self):
            """Background uploader thread."""
                    bufferblock = self._put_queue.get()
                    if bufferblock is None:

                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                    bufferblock.set_state(_BufferBlock.COMMITTED, loc)

                except Exception as e:
                    self._put_errors.put((bufferblock.locator(), e))
                    if self._put_queue is not None:
                        self._put_queue.task_done()

            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep

                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending. If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = Queue.Queue(maxsize=2)
                self._put_errors = Queue.Queue()

                self._put_threads = []
                for i in xrange(0, self.num_put_threads):
                    thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
                    self._put_threads.append(thread)

        # Mark the block as PENDING to disallow any more appends.
        block.set_state(_BufferBlock.PENDING)
        self._put_queue.put(block)

    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    def get_block_contents(self, locator, num_retries, cache_only=False):

        First checks to see if the locator is a BufferBlock and returns its
        contents if so; otherwise passes the request through to KeepClient.get().

            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                    locator = bufferblock._locator
            return self._keep.get_from_cache(locator)
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        Unlike commit_bufferblock(), this is a synchronous call, and will not
        return until all buffer blocks are uploaded. Raises
        KeepWriteError() if any blocks failed to upload.

            items = self._bufferblocks.items()

            if v.state() == _BufferBlock.WRITABLE:
                self.commit_bufferblock(v)

            if self._put_queue is not None:
                self._put_queue.join()

                if not self._put_errors.empty():
                            err.append(self._put_errors.get(False))
                    raise KeepWriteError("Error writing some blocks", err, label="block")

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.) This method

        if not self.prefetch_enabled:

        def block_prefetch_worker(self):
            """The background downloader thread."""
                    b = self._prefetch_queue.get()

            if locator in self._bufferblocks:
            if self._prefetch_threads is None:
                self._prefetch_queue = Queue.Queue()
                self._prefetch_threads = []
                for i in xrange(0, self.num_get_threads):
                    thread = threading.Thread(target=block_prefetch_worker, args=(self,))
                    self._prefetch_threads.append(thread)

        self._prefetch_queue.put(locator)
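
# Illustrative sketch (not part of the original module): a typical _BlockManager
# round trip, assuming `keep_client` is an arvados.keep.KeepClient instance.
#
#   bm = _BlockManager(keep_client)
#   bb = bm.alloc_bufferblock(owner=None)   # new WRITABLE buffer block
#   bb.append("some data")
#   bm.commit_all()                         # synchronously upload outstanding blocks;
#                                           # raises KeepWriteError on failure
#   bm.stop_threads()                       # shut down the background workers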

class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random

    This object may be accessed from multiple threads.

    def __init__(self, parent, stream=[], segments=[]):
        ArvadosFile constructor.

          a list of Range objects representing a block stream

          a list of Range objects representing segments

        self._modified = True
        self.lock = parent.root_collection().lock
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

        return self.parent.writable()

        return copy.copy(self._segments)

    def clone(self, new_parent):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent)
        cp.replace_contents(self)

    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self._modified = True

    def __eq__(self, other):
        if not isinstance(other, ArvadosFile):

        othersegs = other.segments()
            if len(self._segments) != len(othersegs):
            for i in xrange(0, len(othersegs)):
                seg1 = self._segments[i]

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):

    def __ne__(self, other):
        return not self.__eq__(other)

    def set_unmodified(self):
        """Clear the modified flag"""
        self._modified = False

        """Test the modified flag"""
        return self._modified

    def truncate(self, size):
        """Shrink the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded. If `size` is greater than the current size
        of the file, an IOError will be raised.

        if size < self.size():
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start)
                    nr.segment_offset = r.segment_offset

            self._segments = new_segs
            self._modified = True
        elif size > self.size():
            raise IOError("truncate() does not support extending the file size")
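
    # Illustrative sketch (not part of the original module): truncate() only shrinks.
    # Assuming `af` is a writable ArvadosFile currently holding 10 bytes:
    #
    #   af.truncate(4)     # keeps just the first 4 bytes
    #   af.truncate(20)    # raises IOError; extending the file is not supported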

    def readfrom(self, offset, size, num_retries):
        """Read up to `size` bytes from the file starting at `offset`."""

            if size == 0 or offset >= self.size():
            prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
            readsegs = locators_and_ranges(self._segments, offset, size)

            self.parent._my_block_manager().block_prefetch(lr.locator)

            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=bool(data))

                data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])

    def _repack_writes(self):
        """Test if the buffer block has more data than actual segments.

        This happens when a buffered write over-writes a file range written in
        a previous buffered write. Re-pack the buffer block for efficiency
        and to avoid leaking information.

        segs = self._segments

        # Sum up the segments to get the total bytes of the file referencing
        # into the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size():
            # There is more data in the buffer block than is actually accounted for by segments, so
            # re-pack into a new buffer by copying over to a new buffer block.
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb

    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as

        if offset > self.size():
            raise ArgumentError("Offset is past the end of the file")

        if len(data) > config.KEEP_BLOCK_SIZE:
            raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))

        self._modified = True

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes()
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        if self._current_bblock:
            self._repack_writes()
            self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
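
    # Illustrative sketch (not part of the original module): the buffered write path.
    # Assuming `af` is a writable ArvadosFile:
    #
    #   af.writeto(0, "hello world", num_retries=0)   # data lands in the current WRITABLE buffer block
    #   af.writeto(0, "HELLO", num_retries=0)         # overwrites; stale bytes are trimmed by _repack_writes()
    #   af.flush()                                    # repack, then hand the block to the block manager for upload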

    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self._modified = True
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

        """Get the file size."""
            n = self._segments[-1]
            return n.range_start + n.range_size

    def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
        for segment in self._segments:
            loc = segment.locator
            if loc.startswith("bufferblock"):
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, locator_block_size(loc),
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))

class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updates to the file pointer.

    def __init__(self, arvadosfile, name, mode="r", num_retries=None):
        super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
        self.arvadosfile = arvadosfile

        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    @_FileLikeObjectBase._before_close
    def read(self, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the current file position."""
        data = self.arvadosfile.readfrom(self._filepos, size, num_retries)
        self._filepos += len(data)

    @_FileLikeObjectBase._before_close
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at `offset`."""
        return self.arvadosfile.readfrom(offset, size, num_retries)

class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updates to the file pointer.

    def __init__(self, arvadosfile, name, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)

    @_FileLikeObjectBase._before_close
    def writelines(self, seq, num_retries=None):
            self.write(s, num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        self.arvadosfile.truncate(size)
        if self._filepos > self.size():
            self._filepos = self.size()

    @_FileLikeObjectBase._before_close
        self.arvadosfile.flush()

        super(ArvadosFileWriter, self).close()
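
# Illustrative sketch (not part of the original module): reading and writing through
# the file-like wrappers. `af` stands for an ArvadosFile that belongs to a writable
# collection (hypothetical setup code omitted).
#
#   writer = ArvadosFileWriter(af, "data.txt", mode="r+", num_retries=2)
#   writer.write("hello\n")     # writes at the current file position (or at EOF in "a" mode)
#   writer.close()              # flushes buffered blocks
#
#   reader = ArvadosFileReader(af, "data.txt")
#   reader.read(6)              # -> "hello\n"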