import bz2
import copy
import errno
import functools
import hashlib
import logging
import os
import Queue
import re
import threading
import zlib

from . import config
from .errors import ArgumentError, AssertionError, KeepWriteError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
from .retry import retry_method

MOD = "mod"

_logger = logging.getLogger('arvados.arvfile')


def split(path):
    """split(path) -> streamname, filename

    Separate the stream name and file name in a /-separated stream path and
    return a tuple (stream_name, file_name).  If no stream name is available,
    assume '.'.

    """
    try:
        stream_name, file_name = path.rsplit('/', 1)
    except ValueError:  # No / in string
        stream_name, file_name = '.', path
    return stream_name, file_name
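
# Illustrative behavior of split() (examples, not from the original source):
#   split('path/to/file.txt') -> ('path/to', 'file.txt')
#   split('file.txt')         -> ('.', 'file.txt')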


class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            self.close()
        except Exception:
            if exc_type is None:
                raise

    def close(self):
        self.closed = True
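
# Sketch of the guard above (assumed usage, matching the decorated methods
# below): once close() runs, any @_before_close-wrapped method raises.
#
#   f = ArvadosFileReader(av_file)   # av_file: an existing ArvadosFile
#   f.close()
#   f.read(10)   # -> ValueError: I/O operation on closed stream file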


class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0L
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        self._filepos = min(max(pos, 0L), self.size())

    def tell(self):
        return self._filepos

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if data == '':
                break
            yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
        else:
            data = ['']
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)

        data = ''.join(data)
        try:
            nextline_index = data.index('\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            # 16+MAX_WBITS tells zlib to expect a gzip header and trailer.
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)
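
    # Example (illustrative): streaming a gzip-compressed collection file
    # without holding the whole file in memory. `reader` is assumed to be any
    # ArvadosFileReaderBase subclass whose name ends in '.gz'; `process` is a
    # hypothetical callback.
    #
    #   for chunk in reader.readall_decompressed(size=2**20):
    #       process(chunk)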

    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return ''.join(data).splitlines(True)

    def size(self):
        raise NotImplementedError()

    def read(self, size, num_retries=None):
        raise NotImplementedError()

    def readfrom(self, start, size, num_retries=None):
        raise NotImplementedError()


class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return ''

        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return ''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return ''.join(data)

    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
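
    # Illustrative use (assumes `sr` is an existing StreamReader):
    #
    #   reader = sr.files()['myfile.txt']
    #   line = reader.as_manifest()
    #   # -> a single manifest stream like ". <locator(s)> <pos>:<len>:myfile.txt\n"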


def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper
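
# Minimal sketch of the pattern (assumed example, not part of this module):
# any object that carries a `lock` attribute can guard methods this way.
#
#   class Counter(object):
#       def __init__(self):
#           self.lock = threading.Lock()
#           self.n = 0
#
#       @synchronized
#       def bump(self):
#           self.n += 1   # runs with self.lock held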


class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are three valid states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()

    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
            self._locator = None
        else:
            raise AssertionError("Buffer block is not writable")
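
    # Worked example (illustrative): with the default starting capacity of
    # 2**14 bytes (16 KiB, see _BlockManager.alloc_bufferblock), appending
    # 100 KiB grows the backing bytearray 16K -> 32K -> 64K -> 128K before
    # the bytes are copied in at write_pointer.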

    @synchronized
    def set_state(self, nextstate, loc=None):
        if ((self._state == _BufferBlock.WRITABLE and nextstate == _BufferBlock.PENDING) or
            (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.COMMITTED)):
            self._state = nextstate
            if self._state == _BufferBlock.COMMITTED:
                self._locator = loc
                self.buffer_view = None
                self.buffer_block = None
        else:
            raise AssertionError("Invalid state change from %s to %s" % (self._state, nextstate))

    @synchronized
    def state(self):
        return self._state

    def size(self):
        """The amount of data written to the buffer."""
        return self.write_pointer

    @synchronized
    def locator(self):
        """The Keep locator for this buffer's contents."""
        if self._locator is None:
            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
        return self._locator

    @synchronized
    def clone(self, new_blockid, owner):
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Can only duplicate a writable or pending buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock


class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass


def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection must be writable.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper
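
# Decorator stacking as used on ArvadosFile below (order matters):
#
#   @must_be_writable
#   @synchronized
#   def truncate(self, size):
#       ...
#
# must_be_writable runs first, so calls against a read-only collection fail
# fast with EROFS before the lock is ever taken.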


class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """
    def __init__(self, keep):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = {}
        self._put_queue = None
        self._put_errors = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        self.num_put_threads = 2
        self.num_get_threads = 2

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        if blockid is None:
            blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        return locator in self._bufferblocks

    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""
        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None
        self._put_errors = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None

    def commit_bufferblock(self, block):
        """Initiate a background upload of a bufferblock.

        This will block if the upload queue is at capacity, otherwise it will
        return immediately.

        """

        def commit_bufferblock_worker(self):
            """Background uploader thread."""
            while True:
                try:
                    bufferblock = self._put_queue.get()
                    if bufferblock is None:
                        return
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                    bufferblock.set_state(_BufferBlock.COMMITTED, loc)
                except Exception as e:
                    self._put_errors.put((bufferblock.locator(), e))
                finally:
                    if self._put_queue is not None:
                        self._put_queue.task_done()

        with self.lock:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep
                # servers.
                #
                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending.  If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = Queue.Queue(maxsize=2)
                self._put_errors = Queue.Queue()

                self._put_threads = []
                for i in xrange(0, self.num_put_threads):
                    thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
                    self._put_threads.append(thread)
                    thread.daemon = True
                    thread.start()

        # Mark the block as PENDING to disallow any more appends.
        block.set_state(_BufferBlock.PENDING)
        self._put_queue.put(block)

    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and returns that, if
        not, passes the request through to KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        Unlike commit_bufferblock(), this is a synchronous call, and will not
        return until all buffer blocks are uploaded.  Raises
        KeepWriteError() if any blocks failed to upload.

        """
        with self.lock:
            items = self._bufferblocks.items()

        for k, v in items:
            if v.state() == _BufferBlock.WRITABLE:
                self.commit_bufferblock(v)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

                if not self._put_errors.empty():
                    err = []
                    try:
                        while True:
                            err.append(self._put_errors.get(False))
                    except Queue.Empty:
                        pass
                    raise KeepWriteError("Error writing some blocks", err, label="block")

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.)  This method
        does not block the caller.

        """
        if not self.prefetch_enabled:
            return

        def block_prefetch_worker(self):
            """The background downloader thread."""
            while True:
                try:
                    b = self._prefetch_queue.get()
                    if b is None:
                        return
                    self._keep.get(b)
                except Exception:
                    pass

        with self.lock:
            if locator in self._bufferblocks:
                return
            if self._prefetch_threads is None:
                self._prefetch_queue = Queue.Queue()
                self._prefetch_threads = []
                for i in xrange(0, self.num_get_threads):
                    thread = threading.Thread(target=block_prefetch_worker, args=(self,))
                    self._prefetch_threads.append(thread)
                    thread.daemon = True
                    thread.start()
        self._prefetch_queue.put(locator)
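
# Putting the pieces together (illustrative sketch; `keep` is assumed to be a
# KeepClient instance):
#
#   bm = _BlockManager(keep)
#   bb = bm.alloc_bufferblock()
#   bb.append("hello")
#   bm.commit_all()      # uploads synchronously; KeepWriteError on failure
#   bm.stop_threads()    # shut down background upload/prefetch workers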


class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments

        """
        self.parent = parent
        self.name = name
        self._modified = True
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        return self.parent.writable()

    @synchronized
    def segments(self):
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    # Committed blocks can be referenced by their permanent
                    # locator; still-writable buffer blocks must be duplicated
                    # so the two files don't share a mutable buffer.
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self._modified = True

    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in xrange(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_unmodified(self):
        """Clear the modified flag"""
        self._modified = False

    @synchronized
    def modified(self):
        """Test the modified flag"""
        return self._modified

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, an IOError will be raised.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self._modified = True
        elif size > self.size():
            raise IOError(errno.EINVAL, "truncate() does not support extending the file size")

    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
          If False (default), return less data than requested if the read
          crosses a block boundary and the next block isn't cached.  If True,
          only return less data than requested when hitting EOF.

        """
        with self.lock:
            if size == 0 or offset >= self.size():
                return ''
            prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
            readsegs = locators_and_ranges(self._segments, offset, size)

        for lr in prefetch:
            self.parent._my_block_manager().block_prefetch(lr.locator)

        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
            else:
                break
        return ''.join(data)
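
    # Read-ahead note (derived from the code above): the prefetch range covers
    # [offset, offset + size + KEEP_BLOCK_SIZE), one full block beyond the
    # requested range, so the next block is usually already in the KeepClient
    # cache by the time a sequential reader asks for it.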

    def _repack_writes(self):
        """Test if the buffer block has more data than actual segments.

        This happens when a buffered write over-writes a file range written in
        a previous buffered write.  Re-pack the buffer block for efficiency
        and to avoid leaking information.

        """
        segs = self._segments

        # Sum up the segments to get the total bytes of the file referencing
        # into the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size():
            # There is more data in the buffer block than is actually accounted
            # for by segments, so re-pack into a new buffer by copying over to a
            # new buffer block.
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if len(data) == 0:
            return

        if offset > self.size():
            raise ArgumentError("Offset is past the end of the file")

        if len(data) > config.KEEP_BLOCK_SIZE:
            raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))

        self._modified = True

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes()
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(MOD, self.parent, self.name, (self, self))
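
    # Write path in brief (summary of the method above): data always lands in
    # the current WRITABLE bufferblock, then replace_range() splices a segment
    # pointing at the new bytes into self._segments, superseding any segments
    # it overlaps.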

    @synchronized
    def flush(self):
        if self._current_bblock and self._current_bblock.state() == _BufferBlock.WRITABLE:
            self._repack_writes()
            self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
            self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self._modified = True
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

    @synchronized
    def size(self):
        """Get the file size."""
        if self._segments:
            n = self._segments[-1]
            return n.range_start + n.range_size
        else:
            return 0

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if loc.startswith("bufferblock"):
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
        buf += "\n"
        return buf
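
    # Illustrative output (hypothetical 10-byte file; the locator is made up
    # for the example):
    #
    #   av_file.manifest_text()
    #   # -> ". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n"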


class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating the file pointer.

    """

    def __init__(self, arvadosfile, mode="r", num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode, num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None, read the
        entire remainder of the file.

        """
        if size is None:
            data = []
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return ''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries)
            self._filepos += len(data)
            return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.
        """
        return self.arvadosfile.readfrom(offset, size, num_retries)
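
# Illustrative use (assumes `c` is a CollectionReader containing 'count.txt';
# Collection.open() returns an ArvadosFileReader for read modes):
#
#   with c.open('count.txt') as f:
#       first = f.read(5)
#       f.seek(0)
#       lines = f.readlines()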


class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating the file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, mode, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
        else:
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)
        if self._filepos > self.size():
            self._filepos = self.size()

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()

    def close(self):
        if not self.closed:
            self.flush()
            super(ArvadosFileWriter, self).close()
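
# Illustrative use (assumes `c` is a writable Collection; Collection.open()
# with a write mode returns an ArvadosFileWriter):
#
#   with c.open('data.txt', 'w') as f:
#       f.write('0123456789')
#       f.truncate(5)        # file is now '01234'
#   # close() flushes the buffer block and queues it for upload to Keep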