13 from .errors import KeepWriteError, AssertionError, ArgumentError
14 from .keep import KeepLocator
15 from ._normalize_stream import normalize_stream
16 from ._ranges import locators_and_ranges, LocatorAndRange, replace_range, Range
17 from .retry import retry_method
20 """split(path) -> streamname, filename
22 Separate the stream name and file name in a /-separated stream path and
23 return a tuple (stream_name, file_name). If no stream name is available,
28 stream_name, file_name = path.rsplit('/', 1)
29 except ValueError: # No / in string
30 stream_name, file_name = '.', path
31 return stream_name, file_name
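# Illustrative examples of the rsplit('/', 1) behavior above:
#   split("path/to/file.txt")  -> ("path/to", "file.txt")
#   split("file.txt")          -> (".", "file.txt")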
33 class _FileLikeObjectBase(object):
34 def __init__(self, name, mode):
40 def _before_close(orig_func):
41 @functools.wraps(orig_func)
42 def before_close_wrapper(self, *args, **kwargs):
44 raise ValueError("I/O operation on closed stream file")
45 return orig_func(self, *args, **kwargs)
46 return before_close_wrapper
51 def __exit__(self, exc_type, exc_value, traceback):
62 class ArvadosFileReaderBase(_FileLikeObjectBase):
63 def __init__(self, name, mode, num_retries=None):
64 super(ArvadosFileReaderBase, self).__init__(name, mode)
66 self.num_retries = num_retries
67 self._readline_cache = (None, None)
71 data = self.readline()
76 def decompressed_name(self):
77         return re.sub(r'\.(bz2|gz)$', '', self.name)
79 @_FileLikeObjectBase._before_close
80 def seek(self, pos, whence=os.SEEK_SET):
81 if whence == os.SEEK_CUR:
83 elif whence == os.SEEK_END:
85 self._filepos = min(max(pos, 0L), self.size())
90 @_FileLikeObjectBase._before_close
92 def readall(self, size=2**20, num_retries=None):
94 data = self.read(size, num_retries=num_retries)
99 @_FileLikeObjectBase._before_close
101 def readline(self, size=float('inf'), num_retries=None):
102 cache_pos, cache_data = self._readline_cache
103 if self.tell() == cache_pos:
107 data_size = len(data[-1])
108 while (data_size < size) and ('\n' not in data[-1]):
109 next_read = self.read(2 ** 20, num_retries=num_retries)
112 data.append(next_read)
113 data_size += len(next_read)
116 nextline_index = data.index('\n') + 1
118 nextline_index = len(data)
119 nextline_index = min(nextline_index, size)
120 self._readline_cache = (self.tell(), data[nextline_index:])
121 return data[:nextline_index]
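# Note on the cache above: any bytes read past the returned line are stashed in
# self._readline_cache keyed by the current file position, so a subsequent
# readline() at that position continues from the cached tail instead of
# re-reading from the underlying stream.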
123 @_FileLikeObjectBase._before_close
125 def decompress(self, decompress, size, num_retries=None):
126 for segment in self.readall(size, num_retries):
127 data = decompress(segment)
131 @_FileLikeObjectBase._before_close
133 def readall_decompressed(self, size=2**20, num_retries=None):
135 if self.name.endswith('.bz2'):
136 dc = bz2.BZ2Decompressor()
137 return self.decompress(dc.decompress, size,
138 num_retries=num_retries)
139 elif self.name.endswith('.gz'):
140 dc = zlib.decompressobj(16+zlib.MAX_WBITS)
141 return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
142 size, num_retries=num_retries)
144 return self.readall(size, num_retries=num_retries)
146 @_FileLikeObjectBase._before_close
148 def readlines(self, sizehint=float('inf'), num_retries=None):
151 for s in self.readall(num_retries=num_retries):
154 if data_size >= sizehint:
156 return ''.join(data).splitlines(True)
159 raise NotImplementedError()
161 def read(self, size, num_retries=None):
162 raise NotImplementedError()
164 def readfrom(self, start, size, num_retries=None):
165 raise NotImplementedError()
168 class StreamFileReader(ArvadosFileReaderBase):
169 class _NameAttribute(str):
170 # The Python file API provides a plain .name attribute.
171 # Older SDK provided a name() method.
172 # This class provides both, for maximum compatibility.
176 def __init__(self, stream, segments, name):
177 super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
178 self._stream = stream
179 self.segments = segments
181 def stream_name(self):
182 return self._stream.name()
185 n = self.segments[-1]
186 return n.range_start + n.range_size
188 @_FileLikeObjectBase._before_close
190 def read(self, size, num_retries=None):
191 """Read up to 'size' bytes from the stream, starting at the current file position"""
196 available_chunks = locators_and_ranges(self.segments, self._filepos, size)
198 lr = available_chunks[0]
199 data = self._stream.readfrom(lr.locator+lr.segment_offset,
201 num_retries=num_retries)
203 self._filepos += len(data)
206 @_FileLikeObjectBase._before_close
208 def readfrom(self, start, size, num_retries=None):
209 """Read up to 'size' bytes from the stream, starting at 'start'"""
214 for lr in locators_and_ranges(self.segments, start, size):
215 data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
216 num_retries=num_retries))
219 def as_manifest(self):
221 for r in self.segments:
222 segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
223 return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
226 def synchronized(orig_func):
227 @functools.wraps(orig_func)
228 def synchronized_wrapper(self, *args, **kwargs):
230 return orig_func(self, *args, **kwargs)
231 return synchronized_wrapper
233 class _BufferBlock(object):
234 """A stand-in for a Keep block that is in the process of being written.
236 Writers can append to it, get the size, and compute the Keep locator.
237 There are three valid states:
243     Block is in the process of being uploaded to Keep; calling append() is an error.
246 The block has been written to Keep, its internal buffer has been
247 released, fetching the block will fetch it via keep client (since we
248 discarded the internal copy), and identifiers referring to the BufferBlock
249 can be replaced with the block locator.
257 def __init__(self, blockid, starting_capacity, owner):
260 the identifier for this block
263 the initial buffer capacity
266 ArvadosFile that owns this block
269 self.blockid = blockid
270 self.buffer_block = bytearray(starting_capacity)
271 self.buffer_view = memoryview(self.buffer_block)
272 self.write_pointer = 0
273 self._state = _BufferBlock.WRITABLE
276 self.lock = threading.Lock()
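    # Sketch of the intended state transitions (WRITABLE, PENDING and COMMITTED
    # are class-level constants; this is illustrative, not part of the API):
    #
    #   bb = _BufferBlock("bufferblock0", 2**14, owner=None)
    #   bb.append("some data")                     # only legal while WRITABLE
    #   bb.set_state(_BufferBlock.PENDING)         # upload in progress
    #   bb.set_state(_BufferBlock.COMMITTED, loc)  # buffer released, use loc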
279 def append(self, data):
280 """Append some data to the buffer.
282 Only valid if the block is in WRITABLE state. Implements an expanding
283         buffer, doubling capacity as needed to accommodate all the data.
286 if self._state == _BufferBlock.WRITABLE:
287 while (self.write_pointer+len(data)) > len(self.buffer_block):
288 new_buffer_block = bytearray(len(self.buffer_block) * 2)
289 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
290 self.buffer_block = new_buffer_block
291 self.buffer_view = memoryview(self.buffer_block)
292 self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
293 self.write_pointer += len(data)
296 raise AssertionError("Buffer block is not writable")
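    # For example: with the 2**14-byte starting capacity used by
    # _BlockManager.alloc_bufferblock, appending 40 KiB in one call doubles the
    # buffer twice (16 KiB -> 32 KiB -> 64 KiB) before the data is copied in;
    # write_pointer then records the 40960 bytes actually written.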
299 def set_state(self, nextstate, loc=None):
300 if ((self._state == _BufferBlock.WRITABLE and nextstate == _BufferBlock.PENDING) or
301 (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.COMMITTED)):
302 self._state = nextstate
303 if self._state == _BufferBlock.COMMITTED:
305 self.buffer_view = None
306 self.buffer_block = None
308             raise AssertionError("Invalid state change from %s to %s" % (self.state(), nextstate))
315 """The amount of data written to the buffer."""
316 return self.write_pointer
320 """The Keep locator for this buffer's contents."""
321 if self._locator is None:
322 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
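            # The resulting string is a bare Keep locator of the form
            # "<md5 hexdigest>+<size in bytes>", e.g.
            # "d41d8cd98f00b204e9800998ecf8427e+0" for an empty buffer.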
326 def clone(self, new_blockid, owner):
327 if self._state == _BufferBlock.COMMITTED:
328 raise AssertionError("Can only duplicate a writable or pending buffer block")
329 bufferblock = _BufferBlock(new_blockid, self.size(), owner)
330 bufferblock.append(self.buffer_view[0:self.size()])
334 class NoopLock(object):
338 def __exit__(self, exc_type, exc_value, traceback):
341 def acquire(self, blocking=False):
348 def must_be_writable(orig_func):
349 @functools.wraps(orig_func)
350 def must_be_writable_wrapper(self, *args, **kwargs):
351 if not self.writable():
352             raise IOError(errno.EROFS, "Collection must be writable.")
353 return orig_func(self, *args, **kwargs)
354 return must_be_writable_wrapper
357 class _BlockManager(object):
358 """BlockManager handles buffer blocks.
360 Also handles background block uploads, and background block prefetch for a
361 Collection of ArvadosFiles.
364 def __init__(self, keep):
365 """keep: KeepClient object to use"""
367 self._bufferblocks = {}
368 self._put_queue = None
369 self._put_errors = None
370 self._put_threads = None
371 self._prefetch_queue = None
372 self._prefetch_threads = None
373 self.lock = threading.Lock()
374 self.prefetch_enabled = True
375 self.num_put_threads = 2
376 self.num_get_threads = 2
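    # Typical flow, as a sketch of how the methods below fit together
    # (illustrative only; the real callers are ArvadosFile and Collection):
    #
    #   manager = _BlockManager(keep_client)
    #   bb = manager.alloc_bufferblock(owner=some_arvados_file)
    #   bb.append(data)                    # while the block is WRITABLE
    #   manager.commit_bufferblock(bb)     # queue for background upload
    #   manager.commit_all()               # wait for all pending uploads
    #   manager.stop_threads()             # shut down worker threads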
379 def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
380 """Allocate a new, empty bufferblock in WRITABLE state and return it.
383 optional block identifier, otherwise one will be automatically assigned
386 optional capacity, otherwise will use default capacity
389 ArvadosFile that owns this block
393 blockid = "bufferblock%i" % len(self._bufferblocks)
394 bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
395 self._bufferblocks[bufferblock.blockid] = bufferblock
399 def dup_block(self, block, owner):
400 """Create a new bufferblock initialized with the content of an existing bufferblock.
403 the buffer block to copy.
406 ArvadosFile that owns the new block
409 new_blockid = "bufferblock%i" % len(self._bufferblocks)
410 bufferblock = block.clone(new_blockid, owner)
411 self._bufferblocks[bufferblock.blockid] = bufferblock
415 def is_bufferblock(self, locator):
416 return locator in self._bufferblocks
419 def stop_threads(self):
420 """Shut down and wait for background upload and download threads to finish."""
422 if self._put_threads is not None:
423 for t in self._put_threads:
424 self._put_queue.put(None)
425 for t in self._put_threads:
427 self._put_threads = None
428 self._put_queue = None
429 self._put_errors = None
431 if self._prefetch_threads is not None:
432 for t in self._prefetch_threads:
433 self._prefetch_queue.put(None)
434 for t in self._prefetch_threads:
436 self._prefetch_threads = None
437 self._prefetch_queue = None
439 def commit_bufferblock(self, block):
440 """Initiate a background upload of a bufferblock.
442 This will block if the upload queue is at capacity, otherwise it will
447 def commit_bufferblock_worker(self):
448 """Background uploader thread."""
452 bufferblock = self._put_queue.get()
453 if bufferblock is None:
455 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
456 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
458 except Exception as e:
459 self._put_errors.put((bufferblock.locator(), e))
461 if self._put_queue is not None:
462 self._put_queue.task_done()
465 if self._put_threads is None:
466 # Start uploader threads.
468 # If we don't limit the Queue size, the upload queue can quickly
469 # grow to take up gigabytes of RAM if the writing process is
470             # generating data more quickly than it can be sent to the Keep
473 # With two upload threads and a queue size of 2, this means up to 4
474 # blocks pending. If they are full 64 MiB blocks, that means up to
475 # 256 MiB of internal buffering, which is the same size as the
476 # default download block cache in KeepClient.
477 self._put_queue = Queue.Queue(maxsize=2)
478 self._put_errors = Queue.Queue()
480 self._put_threads = []
481 for i in xrange(0, self.num_put_threads):
482 thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
483 self._put_threads.append(thread)
487 if block.state() == _BufferBlock.WRITABLE:
488             # Mark the block as PENDING to disallow any more appends.
489 block.set_state(_BufferBlock.PENDING)
490 self._put_queue.put(block)
493 def get_bufferblock(self, locator):
494 return self._bufferblocks.get(locator)
496 def get_block_contents(self, locator, num_retries, cache_only=False):
499         First checks to see if the locator is a BufferBlock and returns that;
500         if not, passes the request through to KeepClient.get().
504 if locator in self._bufferblocks:
505 bufferblock = self._bufferblocks[locator]
506 if bufferblock.state() != _BufferBlock.COMMITTED:
507 return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
509 locator = bufferblock._locator
511 return self._keep.get_from_cache(locator)
513 return self._keep.get(locator, num_retries=num_retries)
515 def commit_all(self):
516 """Commit all outstanding buffer blocks.
518 Unlike commit_bufferblock(), this is a synchronous call, and will not
519 return until all buffer blocks are uploaded. Raises
520 KeepWriteError() if any blocks failed to upload.
524 items = self._bufferblocks.items()
530 if self._put_queue is not None:
531 self._put_queue.join()
533 if not self._put_errors.empty():
537 err.append(self._put_errors.get(False))
540 raise KeepWriteError("Error writing some blocks", err, label="block")
542 def block_prefetch(self, locator):
543 """Initiate a background download of a block.
545 This assumes that the underlying KeepClient implements a block cache,
546 so repeated requests for the same block will not result in repeated
547         downloads (unless the block is evicted from the cache).  This method
552 if not self.prefetch_enabled:
555 def block_prefetch_worker(self):
556 """The background downloader thread."""
559 b = self._prefetch_queue.get()
567 if locator in self._bufferblocks:
569 if self._prefetch_threads is None:
570 self._prefetch_queue = Queue.Queue()
571 self._prefetch_threads = []
572 for i in xrange(0, self.num_get_threads):
573 thread = threading.Thread(target=block_prefetch_worker, args=(self,))
574 self._prefetch_threads.append(thread)
577 self._prefetch_queue.put(locator)
580 class ArvadosFile(object):
581 """Represent a file in a Collection.
583 ArvadosFile manages the underlying representation of a file in Keep as a
584 sequence of segments spanning a set of blocks, and implements random
587 This object may be accessed from multiple threads.
591 def __init__(self, parent, stream=[], segments=[]):
593 ArvadosFile constructor.
596 a list of Range objects representing a block stream
599 a list of Range objects representing segments
602 self._modified = True
604 self.lock = parent.root_collection().lock
606 self._add_segment(stream, s.locator, s.range_size)
607 self._current_bblock = None
610 return self.parent.writable()
614 return copy.copy(self._segments)
617 def clone(self, new_parent):
618 """Make a copy of this file."""
619 cp = ArvadosFile(new_parent)
620 cp.replace_contents(self)
625 def replace_contents(self, other):
626 """Replace segments of this file with segments from another `ArvadosFile` object."""
630 for other_segment in other.segments():
631 new_loc = other_segment.locator
632 if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
633 if other_segment.locator not in map_loc:
634 bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
635 if bufferblock.state() != _BufferBlock.WRITABLE:
636 map_loc[other_segment.locator] = bufferblock.locator()
638 map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
639 new_loc = map_loc[other_segment.locator]
641 self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
643 self._modified = True
645 def __eq__(self, other):
648 if not isinstance(other, ArvadosFile):
651 othersegs = other.segments()
653 if len(self._segments) != len(othersegs):
655 for i in xrange(0, len(othersegs)):
656 seg1 = self._segments[i]
661 if self.parent._my_block_manager().is_bufferblock(loc1):
662 loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
664 if other.parent._my_block_manager().is_bufferblock(loc2):
665 loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
667 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
668 seg1.range_start != seg2.range_start or
669 seg1.range_size != seg2.range_size or
670 seg1.segment_offset != seg2.segment_offset):
675 def __ne__(self, other):
676 return not self.__eq__(other)
679 def set_unmodified(self):
680 """Clear the modified flag"""
681 self._modified = False
685 """Test the modified flag"""
686 return self._modified
690 def truncate(self, size):
691 """Shrink the size of the file.
693 If `size` is less than the size of the file, the file contents after
694 `size` will be discarded. If `size` is greater than the current size
695 of the file, an IOError will be raised.
698 if size < self.size():
700 for r in self._segments:
701                 range_end = r.range_start + r.range_size
702 if r.range_start >= size:
703                 if r.range_start >= size:
704                     # segment is past the truncate size, all done
705 elif size < range_end:
706 nr = Range(r.locator, r.range_start, size - r.range_start, 0)
707 nr.segment_offset = r.segment_offset
713 self._segments = new_segs
714 self._modified = True
715 elif size > self.size():
716 raise IOError("truncate() does not support extending the file size")
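    # Example: on a 100-byte file, truncate(10) keeps only the first 10 bytes
    # (clipping the segment that spans offset 10) and marks the file modified,
    # while truncate(200) raises IOError because extending is not supported.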
719 def readfrom(self, offset, size, num_retries, exact=False):
720 """Read up to `size` bytes from the file starting at `offset`.
723 If False (default), return less data than requested if the read
724 crosses a block boundary and the next block isn't cached. If True,
725 only return less data than requested when hitting EOF.
729 if size == 0 or offset >= self.size():
731 prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
732 readsegs = locators_and_ranges(self._segments, offset, size)
735 self.parent._my_block_manager().block_prefetch(lr.locator)
739 block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
741 data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
746 def _repack_writes(self, num_retries):
747 """Test if the buffer block has more data than actual segments.
749 This happens when a buffered write over-writes a file range written in
750 a previous buffered write. Re-pack the buffer block for efficiency
751 and to avoid leaking information.
754 segs = self._segments
756 # Sum up the segments to get the total bytes of the file referencing
757 # into the buffer block.
758 bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
759 write_total = sum([s.range_size for s in bufferblock_segs])
761 if write_total < self._current_bblock.size():
762 # There is more data in the buffer block than is actually accounted for by segments, so
763 # re-pack into a new buffer by copying over to a new buffer block.
764 contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
765 new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
766 for t in bufferblock_segs:
767 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
768 t.segment_offset = new_bb.size() - t.range_size
770 self._current_bblock = new_bb
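    # Worked example: if a caller writes 10 bytes at offset 0 and then writes
    # 10 new bytes at the same offset, the buffer block holds 20 bytes but the
    # file's segments only reference 10 of them; the repack above copies just
    # the referenced 10 bytes into a fresh buffer block.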
774 def writeto(self, offset, data, num_retries):
775 """Write `data` to the file starting at `offset`.
777 This will update existing bytes and/or extend the size of the file as
784 if offset > self.size():
785 raise ArgumentError("Offset is past the end of the file")
787 if len(data) > config.KEEP_BLOCK_SIZE:
788 # Chunk it up into smaller writes
790 dataview = memoryview(data)
792 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
793 n += config.KEEP_BLOCK_SIZE
796 self._modified = True
798 if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
799 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
801 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
802 self._repack_writes(num_retries)
803 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
804 self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
805 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
807 self._current_bblock.append(data)
809 replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
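    # Example of the chunking above: with the default 64 MiB KEEP_BLOCK_SIZE,
    # writing a 100 MiB string is split into a 64 MiB write followed by a
    # 36 MiB write; each chunk may additionally trigger a repack and a commit
    # of the current buffer block when appending it would exceed KEEP_BLOCK_SIZE.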
812 def flush(self, num_retries=0):
813 if self._current_bblock:
814 self._repack_writes(num_retries)
815 self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
819 def add_segment(self, blocks, pos, size):
820 """Add a segment to the end of the file.
822         `pos` and `size` reference a section of the stream described by
823 `blocks` (a list of Range objects)
826 self._add_segment(blocks, pos, size)
828 def _add_segment(self, blocks, pos, size):
829 """Internal implementation of add_segment."""
830 self._modified = True
831 for lr in locators_and_ranges(blocks, pos, size):
832 last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
833 r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
834 self._segments.append(r)
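    # Illustration of the resulting layout: adding two stream ranges of 10 and
    # 20 bytes appends segments with range_start 0 and 10 respectively, so the
    # file reads as a contiguous 30-byte sequence regardless of which Keep
    # blocks back each piece.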
838 """Get the file size."""
840 n = self._segments[-1]
841 return n.range_start + n.range_size
846 def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
849         for segment in self._segments:
850 loc = segment.locator
851 if loc.startswith("bufferblock"):
852                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
853 if portable_locators:
854 loc = KeepLocator(loc).stripped()
855             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
856 segment.segment_offset, segment.range_size))
857 buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
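        # The result is a single normalized manifest stream line, roughly of
        # the form ". <locator> [<locator> ...] <start>:<length>:<filename>\n"
        # (see normalize_stream for the exact token layout).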
862 class ArvadosFileReader(ArvadosFileReaderBase):
863 """Wraps ArvadosFile in a file-like object supporting reading only.
865     Be aware that this class is NOT thread safe: there is no locking around
866     updates to the file pointer.
870 def __init__(self, arvadosfile, name, mode="r", num_retries=None):
871 super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
872 self.arvadosfile = arvadosfile
875 return self.arvadosfile.size()
877 def stream_name(self):
878 return self.arvadosfile.parent.stream_name()
880 @_FileLikeObjectBase._before_close
882 def read(self, size, num_retries=None):
883 """Read up to `size` bytes from the stream, starting at the current file position."""
884 data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
885 self._filepos += len(data)
888 @_FileLikeObjectBase._before_close
890 def readfrom(self, offset, size, num_retries=None):
891 """Read up to `size` bytes from the stream, starting at the current file position."""
892 return self.arvadosfile.readfrom(offset, size, num_retries)
898 class ArvadosFileWriter(ArvadosFileReader):
899 """Wraps ArvadosFile in a file-like object supporting both reading and writing.
901     Be aware that this class is NOT thread safe: there is no locking around
902     updates to the file pointer.
906 def __init__(self, arvadosfile, name, mode, num_retries=None):
907 super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)
909 @_FileLikeObjectBase._before_close
911 def write(self, data, num_retries=None):
912 if self.mode[0] == "a":
913 self.arvadosfile.writeto(self.size(), data, num_retries)
915 self.arvadosfile.writeto(self._filepos, data, num_retries)
916 self._filepos += len(data)
918 @_FileLikeObjectBase._before_close
920 def writelines(self, seq, num_retries=None):
922 self.write(s, num_retries)
924 @_FileLikeObjectBase._before_close
925 def truncate(self, size=None):
928 self.arvadosfile.truncate(size)
929 if self._filepos > self.size():
930 self._filepos = self.size()
932 @_FileLikeObjectBase._before_close
934 self.arvadosfile.flush()
939 super(ArvadosFileWriter, self).close()
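# A minimal usage sketch (assuming `av_file` is an ArvadosFile obtained from a
# writable Collection; the names here are illustrative):
#
#   writer = ArvadosFileWriter(av_file, "example.txt", mode="w")
#   writer.write("hello\n")      # writes at the current file position
#   writer.flush()               # repack and commit the current buffer block
#   writer.close()
#
#   reader = ArvadosFileReader(av_file, "example.txt")
#   print reader.read(5)         # -> "hello"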