from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
from .retry import retry_method
22 """split(path) -> streamname, filename
24 Separate the stream name and file name in a /-separated stream path and
25 return a tuple (stream_name, file_name). If no stream name is available,
30 stream_name, file_name = path.rsplit('/', 1)
31 except ValueError: # No / in string
32 stream_name, file_name = '.', path
33 return stream_name, file_name
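
# Illustrative examples of split() (comments only, nothing runs at import time):
#
#   split("./foo/bar.txt")  -> ("./foo", "bar.txt")
#   split("bar.txt")        -> (".", "bar.txt")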


class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()


class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0L
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        self._filepos = min(max(pos, 0L), self.size())

    def tell(self):
        return self._filepos

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if data == '':
                break
            yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
        else:
            data = ['']
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
        data = ''.join(data)
        try:
            nextline_index = data.index('\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)
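
    # Illustrative sketch: the decompressor is chosen from the file name
    # suffix, so a reader over "log.gz" yields plain text.  `reader` and
    # `process` here are hypothetical:
    #
    #   for chunk in reader.readall_decompressed():
    #       process(chunk)  # chunk is decompressed data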

    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return ''.join(data).splitlines(True)

    def size(self):
        raise NotImplementedError()

    def read(self, size, num_retries=None):
        raise NotImplementedError()

    def readfrom(self, start, size, num_retries=None):
        raise NotImplementedError()


class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position."""
        if size == 0:
            return ''

        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'."""
        if size == 0:
            return ''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return ''.join(data)

    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
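
    # The result is a single manifest stream line, e.g. (locator invented for
    # illustration):
    #
    #   ". 781e5e245d69b566979b86e28d23f2c7+10 0:10:file.txt\n"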


def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper
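
# Illustrative sketch: @synchronized works on any class whose instances carry
# a `self.lock` (a threading lock, or the NoopLock defined below).  The class
# here is hypothetical:
#
#   class Counter(object):
#       def __init__(self):
#           self.lock = threading.Lock()
#           self.n = 0
#
#       @synchronized
#       def bump(self):
#           self.n += 1  # executes while holding self.lock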


class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are three valid states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block
        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()

    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.
        """
        if self._state == _BufferBlock.WRITABLE:
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
            self._locator = None
        else:
            raise AssertionError("Buffer block is not writable")

    @synchronized
    def set_state(self, nextstate, loc=None):
        if ((self._state == _BufferBlock.WRITABLE and nextstate == _BufferBlock.PENDING) or
            (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.COMMITTED)):
            self._state = nextstate
            if self._state == _BufferBlock.COMMITTED:
                self._locator = loc
                self.buffer_view = None
                self.buffer_block = None
        else:
            raise AssertionError("Invalid state change from %s to %s" % (self._state, nextstate))
317 """The amount of data written to the buffer."""
318 return self.write_pointer
322 """The Keep locator for this buffer's contents."""
323 if self._locator is None:
324 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())

    @synchronized
    def clone(self, new_blockid, owner):
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Can only duplicate a writable or pending buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock
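
# Illustrative lifecycle sketch (hypothetical: user code normally goes through
# _BlockManager rather than constructing these directly):
#
#   bb = _BufferBlock("bufferblock0", starting_capacity=2**14, owner=None)
#   bb.append("hello ")
#   bb.append("world")                  # capacity doubles as needed
#   bb.size()                           # -> 11
#   bb.locator()                        # -> md5 of the contents, plus "+11"
#   bb.set_state(_BufferBlock.PENDING)  # further append() raises AssertionError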


class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass


def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection must be writable.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper


class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """
    def __init__(self, keep):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = {}
        self._put_queue = None
        self._put_errors = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        self.num_put_threads = 2
        self.num_get_threads = 2

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block
        """
        if blockid is None:
            blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block
        """
        new_blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        return locator in self._bufferblocks

    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""
        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None
        self._put_errors = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None

    def commit_bufferblock(self, block):
        """Initiate a background upload of a bufferblock.

        This will block if the upload queue is at capacity, otherwise it will
        return immediately.
        """

        def commit_bufferblock_worker(self):
            """Background uploader thread."""
            while True:
                try:
                    bufferblock = self._put_queue.get()
                    if bufferblock is None:
                        return
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                    bufferblock.set_state(_BufferBlock.COMMITTED, loc)
                except Exception as e:
                    self._put_errors.put((bufferblock.locator(), e))
                finally:
                    if self._put_queue is not None:
                        self._put_queue.task_done()

        with self.lock:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep
                # servers.
                #
                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending.  If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = Queue.Queue(maxsize=2)
                self._put_errors = Queue.Queue()

                self._put_threads = []
                for i in xrange(0, self.num_put_threads):
                    thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
                    self._put_threads.append(thread)
                    thread.daemon = True
                    thread.start()

        # Mark the block as PENDING to disallow any more appends.
        block.set_state(_BufferBlock.PENDING)
        self._put_queue.put(block)

    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and returns that, if
        not, passes the request through to KeepClient.get().
        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        Unlike commit_bufferblock(), this is a synchronous call, and will not
        return until all buffer blocks are uploaded.  Raises
        KeepWriteError() if any blocks failed to upload.
        """
        with self.lock:
            items = self._bufferblocks.items()

        for k, v in items:
            if v.state() == _BufferBlock.WRITABLE:
                self.commit_bufferblock(v)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

                if not self._put_errors.empty():
                    err = []
                    try:
                        while True:
                            err.append(self._put_errors.get(False))
                    except Queue.Empty:
                        pass
                    raise KeepWriteError("Error writing some blocks", err, label="block")
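
    # Illustrative sketch of the write path (hypothetical: `keep_client` stands
    # in for an arvados KeepClient instance):
    #
    #   bm = _BlockManager(keep_client)
    #   bb = bm.alloc_bufferblock()
    #   bb.append("some data")
    #   bm.commit_bufferblock(bb)  # asynchronous: queues bb for upload
    #   bm.commit_all()            # synchronous: raises KeepWriteError on failure
    #   bm.stop_threads()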

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache).  This method
        does not block the caller.
        """
        if not self.prefetch_enabled:
            return

        def block_prefetch_worker(self):
            """The background downloader thread."""
            while True:
                try:
                    b = self._prefetch_queue.get()
                    if b is None:
                        return
                    self._keep.get(b)
                except Exception:
                    pass

        with self.lock:
            if locator in self._bufferblocks:
                return
            if self._prefetch_threads is None:
                self._prefetch_queue = Queue.Queue()
                self._prefetch_threads = []
                for i in xrange(0, self.num_get_threads):
                    thread = threading.Thread(target=block_prefetch_worker, args=(self,))
                    self._prefetch_threads.append(thread)
                    thread.daemon = True
                    thread.start()
        self._prefetch_queue.put(locator)
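
    # Illustrative sketch: warming the cache ahead of sequential reads (the
    # locator below is invented):
    #
    #   bm.block_prefetch("781e5e245d69b566979b86e28d23f2c7+10")
    #   ...
    #   data = bm.get_block_contents("781e5e245d69b566979b86e28d23f2c7+10",
    #                                num_retries=2)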


class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments
        """
        self.parent = parent
        self.name = name
        self._modified = True
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        return self.parent.writable()

    @synchronized
    def segments(self):
        return copy.copy(self._segments)

    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""
        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self._modified = True

    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in xrange(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_unmodified(self):
        """Clear the modified flag"""
        self._modified = False

    @synchronized
    def modified(self):
        """Test the modified flag"""
        return self._modified

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, an IOError will be raised.
        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self._modified = True
        elif size > self.size():
            raise IOError("truncate() does not support extending the file size")
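
    # Illustrative sketch (hypothetical `af` is a writable ArvadosFile holding
    # 100 bytes):
    #
    #   af.truncate(10)   # keeps only the first 10 bytes
    #   af.truncate(200)  # raises IOError: extending is not supported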

    def readfrom(self, offset, size, num_retries):
        """Read up to `size` bytes from the file starting at `offset`."""
        with self.lock:
            if size == 0 or offset >= self.size():
                return ''
            prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
            readsegs = locators_and_ranges(self._segments, offset, size)

        for lr in prefetch:
            self.parent._my_block_manager().block_prefetch(lr.locator)

        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=bool(data))
            if block:
                data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
            else:
                break
        return ''.join(data)

    def _repack_writes(self):
        """Test if the buffer block has more data than actual segments.

        This happens when a buffered write over-writes a file range written in
        a previous buffered write.  Re-pack the buffer block for efficiency
        and to avoid leaking information.
        """
        segs = self._segments

        # Sum up the segments to get the total bytes of the file referencing
        # into the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size():
            # There is more data in the buffer block than is actually accounted for by segments, so
            # re-pack into a new buffer by copying over to a new buffer block.
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.
        """
        if len(data) == 0:
            return

        if offset > self.size():
            raise ArgumentError("Offset is past the end of the file")

        if len(data) > config.KEEP_BLOCK_SIZE:
            raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))

        self._modified = True

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes()
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
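
    # Illustrative sketch (hypothetical `af` is a writable ArvadosFile):
    #
    #   af.writeto(0, "hello", num_retries=2)            # overwrite at start
    #   af.writeto(af.size(), " world", num_retries=2)   # append at the end
    #   af.writeto(af.size() + 10, "x", num_retries=2)   # raises ArgumentError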

    @synchronized
    def flush(self):
        if self._current_bblock:
            self._repack_writes()
            self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
            self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects).
        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self._modified = True
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)
828 """Get the file size."""
830 n = self._segments[-1]
831 return n.range_start + n.range_size

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if loc.startswith("bufferblock"):
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
        buf += "\n"
        return buf


class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updates to the file pointer.

    """

    def __init__(self, arvadosfile, mode="r", num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode, num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the current file position."""
        data = self.arvadosfile.readfrom(self._filepos, size, num_retries)
        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at `offset`."""
        return self.arvadosfile.readfrom(offset, size, num_retries)


class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updates to the file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, mode, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
        else:
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)
        if self._filepos > self.size():
            self._filepos = self.size()

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()

    def close(self):
        if not self.closed:
            self.flush()
            super(ArvadosFileWriter, self).close()
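
# Illustrative end-to-end sketch (hypothetical: in practice these objects are
# usually obtained from arvados.collection.Collection.open() rather than
# constructed by hand):
#
#   with collection.open("data.txt", "w") as writer:   # ArvadosFileWriter
#       writer.write("hello\n")
#       writer.writelines(["line two\n", "line three\n"])
#
#   with collection.open("data.txt", "r") as reader:   # ArvadosFileReader
#       for line in reader:
#           print line,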