13 from .errors import KeepWriteError, AssertionError
14 from .keep import KeepLocator
15 from ._normalize_stream import normalize_stream
16 from ._ranges import locators_and_ranges, replace_range, Range
17 from .retry import retry_method
20 """split(path) -> streamname, filename
22 Separate the stream name and file name in a /-separated stream path and
23 return a tuple (stream_name, file_name). If no stream name is available,
28 stream_name, file_name = path.rsplit('/', 1)
29 except ValueError: # No / in string
30 stream_name, file_name = '.', path
31 return stream_name, file_name
33 class _FileLikeObjectBase(object):
34 def __init__(self, name, mode):
40 def _before_close(orig_func):
41 @functools.wraps(orig_func)
42 def before_close_wrapper(self, *args, **kwargs):
44 raise ValueError("I/O operation on closed stream file")
45 return orig_func(self, *args, **kwargs)
46 return before_close_wrapper
51 def __exit__(self, exc_type, exc_value, traceback):
62 class ArvadosFileReaderBase(_FileLikeObjectBase):
63 def __init__(self, name, mode, num_retries=None):
64 super(ArvadosFileReaderBase, self).__init__(name, mode)
66 self.num_retries = num_retries
67 self._readline_cache = (None, None)
71 data = self.readline()
def decompressed_name(self):
    """Return this file's name with a trailing compression suffix removed.

    A final ``.bz2`` or ``.gz`` extension is stripped from ``self.name``;
    a name without such a suffix is returned unchanged.
    """
    # Raw string: in a plain literal '\.' is an invalid escape sequence that
    # newer Python versions warn about.  The compiled pattern is unchanged.
    return re.sub(r'\.(bz2|gz)$', '', self.name)
79 @_FileLikeObjectBase._before_close
80 def seek(self, pos, whence=os.SEEK_CUR):
81 if whence == os.SEEK_CUR:
83 elif whence == os.SEEK_END:
85 self._filepos = min(max(pos, 0L), self.size())
90 @_FileLikeObjectBase._before_close
92 def readall(self, size=2**20, num_retries=None):
94 data = self.read(size, num_retries=num_retries)
99 @_FileLikeObjectBase._before_close
101 def readline(self, size=float('inf'), num_retries=None):
102 cache_pos, cache_data = self._readline_cache
103 if self.tell() == cache_pos:
107 data_size = len(data[-1])
108 while (data_size < size) and ('\n' not in data[-1]):
109 next_read = self.read(2 ** 20, num_retries=num_retries)
112 data.append(next_read)
113 data_size += len(next_read)
116 nextline_index = data.index('\n') + 1
118 nextline_index = len(data)
119 nextline_index = min(nextline_index, size)
120 self._readline_cache = (self.tell(), data[nextline_index:])
121 return data[:nextline_index]
123 @_FileLikeObjectBase._before_close
125 def decompress(self, decompress, size, num_retries=None):
126 for segment in self.readall(size, num_retries):
127 data = decompress(segment)
131 @_FileLikeObjectBase._before_close
133 def readall_decompressed(self, size=2**20, num_retries=None):
135 if self.name.endswith('.bz2'):
136 dc = bz2.BZ2Decompressor()
137 return self.decompress(dc.decompress, size,
138 num_retries=num_retries)
139 elif self.name.endswith('.gz'):
140 dc = zlib.decompressobj(16+zlib.MAX_WBITS)
141 return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
142 size, num_retries=num_retries)
144 return self.readall(size, num_retries=num_retries)
146 @_FileLikeObjectBase._before_close
148 def readlines(self, sizehint=float('inf'), num_retries=None):
151 for s in self.readall(num_retries=num_retries):
154 if data_size >= sizehint:
156 return ''.join(data).splitlines(True)
159 raise NotImplementedError()
def read(self, size, num_retries=None):
    """Placeholder for reading up to `size` bytes.

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError
def readfrom(self, start, size, num_retries=None):
    """Placeholder for reading up to `size` bytes beginning at `start`.

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError
168 class StreamFileReader(ArvadosFileReaderBase):
169 class _NameAttribute(str):
170 # The Python file API provides a plain .name attribute.
171 # Older SDK provided a name() method.
172 # This class provides both, for maximum compatibility.
176 def __init__(self, stream, segments, name):
177 super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
178 self._stream = stream
179 self.segments = segments
def stream_name(self):
    """Return the name reported by the stream this reader was built from."""
    owning_stream = self._stream
    return owning_stream.name()
185 n = self.segments[-1]
186 return n.range_start + n.range_size
188 @_FileLikeObjectBase._before_close
190 def read(self, size, num_retries=None):
191 """Read up to 'size' bytes from the stream, starting at the current file position"""
196 available_chunks = locators_and_ranges(self.segments, self._filepos, size)
198 lr = available_chunks[0]
199 data = self._stream.readfrom(lr.locator+lr.segment_offset,
201 num_retries=num_retries)
203 self._filepos += len(data)
206 @_FileLikeObjectBase._before_close
208 def readfrom(self, start, size, num_retries=None):
209 """Read up to 'size' bytes from the stream, starting at 'start'"""
214 for lr in locators_and_ranges(self.segments, start, size):
215 data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
216 num_retries=num_retries))
219 def as_manifest(self):
221 for r in self.segments:
222 segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
223 return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
226 def synchronized(orig_func):
227 @functools.wraps(orig_func)
228 def synchronized_wrapper(self, *args, **kwargs):
230 return orig_func(self, *args, **kwargs)
231 return synchronized_wrapper
233 class _BufferBlock(object):
234 """A stand-in for a Keep block that is in the process of being written.
236 Writers can append to it, get the size, and compute the Keep locator.
237 There are three valid states:
243 Block is in the process of being uploaded to Keep, append is an error.
246 The block has been written to Keep, its internal buffer has been
247 released, fetching the block will fetch it via keep client (since we
248 discarded the internal copy), and identifiers referring to the BufferBlock
249 can be replaced with the block locator.
257 def __init__(self, blockid, starting_capacity, owner):
260 the identifier for this block
263 the initial buffer capacity
266 ArvadosFile that owns this block
269 self.blockid = blockid
270 self.buffer_block = bytearray(starting_capacity)
271 self.buffer_view = memoryview(self.buffer_block)
272 self.write_pointer = 0
273 self._state = _BufferBlock.WRITABLE
276 self.lock = threading.Lock()
279 def append(self, data):
280 """Append some data to the buffer.
282 Only valid if the block is in WRITABLE state. Implements an expanding
283 buffer, doubling capacity as needed to accommodate all the data.
286 if self._state == _BufferBlock.WRITABLE:
287 while (self.write_pointer+len(data)) > len(self.buffer_block):
288 new_buffer_block = bytearray(len(self.buffer_block) * 2)
289 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
290 self.buffer_block = new_buffer_block
291 self.buffer_view = memoryview(self.buffer_block)
292 self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
293 self.write_pointer += len(data)
296 raise AssertionError("Buffer block is not writable")
299 def set_state(self, nextstate, loc=None):
300 if ((self._state == _BufferBlock.WRITABLE and nextstate == _BufferBlock.PENDING) or
301 (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.COMMITTED)):
302 self._state = nextstate
303 if self._state == _BufferBlock.COMMITTED:
305 self.buffer_view = None
306 self.buffer_block = None
308 raise AssertionError("Invalid state change from %s to %s" % (self.state, state))
315 """The amount of data written to the buffer."""
316 return self.write_pointer
320 """The Keep locator for this buffer's contents."""
321 if self._locator is None:
322 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
326 def clone(self, new_blockid, owner):
327 if self._state == _BufferBlock.COMMITTED:
328 raise AssertionError("Can only duplicate a writable or pending buffer block")
329 bufferblock = _BufferBlock(new_blockid, self.size(), owner)
330 bufferblock.append(self.buffer_view[0:self.size()])
334 class NoopLock(object):
338 def __exit__(self, exc_type, exc_value, traceback):
341 def acquire(self, blocking=False):
def must_be_writable(orig_func):
    """Decorator: refuse to run a method unless ``self.writable()`` is true.

    When the object reports itself writable, the wrapped method is called
    with the original arguments and its result is returned.  Otherwise
    IOError is raised with ``errno`` set to ``errno.EROFS`` and the wrapped
    method is not called.
    """
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            # Pass errno and message as two separate arguments.  Wrapping
            # them in a single tuple leaves IOError.errno and .strerror
            # unset, so callers cannot test for EROFS.
            raise IOError(errno.EROFS, "Collection must be writable.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper
357 class _BlockManager(object):
358 """BlockManager handles buffer blocks.
360 Also handles background block uploads, and background block prefetch for a
361 Collection of ArvadosFiles.
364 def __init__(self, keep):
365 """keep: KeepClient object to use"""
367 self._bufferblocks = {}
368 self._put_queue = None
369 self._put_errors = None
370 self._put_threads = None
371 self._prefetch_queue = None
372 self._prefetch_threads = None
373 self.lock = threading.Lock()
374 self.prefetch_enabled = True
375 self.num_put_threads = 2
376 self.num_get_threads = 2
379 def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
380 """Allocate a new, empty bufferblock in WRITABLE state and return it.
383 optional block identifier, otherwise one will be automatically assigned
386 optional capacity, otherwise will use default capacity
389 ArvadosFile that owns this block
393 blockid = "bufferblock%i" % len(self._bufferblocks)
394 bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
395 self._bufferblocks[bufferblock.blockid] = bufferblock
399 def dup_block(self, block, owner):
400 """Create a new bufferblock initialized with the content of an existing bufferblock.
403 the buffer block to copy.
406 ArvadosFile that owns the new block
409 new_blockid = "bufferblock%i" % len(self._bufferblocks)
410 bufferblock = block.clone(new_blockid, owner)
411 self._bufferblocks[bufferblock.blockid] = bufferblock
def is_bufferblock(self, locator):
    """Return True if `locator` is a key in this manager's buffer block table."""
    known_blocks = self._bufferblocks
    return locator in known_blocks
419 def stop_threads(self):
420 """Shut down and wait for background upload and download threads to finish."""
422 if self._put_threads is not None:
423 for t in self._put_threads:
424 self._put_queue.put(None)
425 for t in self._put_threads:
427 self._put_threads = None
428 self._put_queue = None
429 self._put_errors = None
431 if self._prefetch_threads is not None:
432 for t in self._prefetch_threads:
433 self._prefetch_queue.put(None)
434 for t in self._prefetch_threads:
436 self._prefetch_threads = None
437 self._prefetch_queue = None
439 def commit_bufferblock(self, block):
440 """Initiate a background upload of a bufferblock.
442 This will block if the upload queue is at capacity, otherwise it will
447 def commit_bufferblock_worker(self):
448 """Background uploader thread."""
452 bufferblock = self._put_queue.get()
453 if bufferblock is None:
455 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
456 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
458 except Exception as e:
459 self._put_errors.put((bufferblock.locator(), e))
461 if self._put_queue is not None:
462 self._put_queue.task_done()
465 if self._put_threads is None:
466 # Start uploader threads.
468 # If we don't limit the Queue size, the upload queue can quickly
469 # grow to take up gigabytes of RAM if the writing process is
470 # generating data more quickly than it can be sent to the Keep
473 # With two upload threads and a queue size of 2, this means up to 4
474 # blocks pending. If they are full 64 MiB blocks, that means up to
475 # 256 MiB of internal buffering, which is the same size as the
476 # default download block cache in KeepClient.
477 self._put_queue = Queue.Queue(maxsize=2)
478 self._put_errors = Queue.Queue()
480 self._put_threads = []
481 for i in xrange(0, self.num_put_threads):
482 thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
483 self._put_threads.append(thread)
487 # Mark the block as PENDING so as to disallow any more appends.
488 block.set_state(_BufferBlock.PENDING)
489 self._put_queue.put(block)
def get_bufferblock(self, locator):
    """Return the buffer block stored under `locator`, or None if there is none."""
    registry = self._bufferblocks
    return registry.get(locator)
495 def get_block_contents(self, locator, num_retries, cache_only=False):
498 First checks to see if the locator is a BufferBlock and return that, if
499 not, passes the request through to KeepClient.get().
503 if locator in self._bufferblocks:
504 bufferblock = self._bufferblocks[locator]
505 if bufferblock.state() != _BufferBlock.COMMITTED:
506 return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
508 locator = bufferblock._locator
510 return self._keep.get_from_cache(locator)
512 return self._keep.get(locator, num_retries=num_retries)
514 def commit_all(self):
515 """Commit all outstanding buffer blocks.
517 Unlike commit_bufferblock(), this is a synchronous call, and will not
518 return until all buffer blocks are uploaded. Raises
519 KeepWriteError() if any blocks failed to upload.
523 items = self._bufferblocks.items()
526 if v.state() == _BufferBlock.WRITABLE:
527 self.commit_bufferblock(v)
530 if self._put_queue is not None:
531 self._put_queue.join()
533 if not self._put_errors.empty():
537 err.append(self._put_errors.get(False))
540 raise KeepWriteError("Error writing some blocks", err, label="block")
542 def block_prefetch(self, locator):
543 """Initiate a background download of a block.
545 This assumes that the underlying KeepClient implements a block cache,
546 so repeated requests for the same block will not result in repeated
547 downloads (unless the block is evicted from the cache.) This method
552 if not self.prefetch_enabled:
555 def block_prefetch_worker(self):
556 """The background downloader thread."""
559 b = self._prefetch_queue.get()
567 if locator in self._bufferblocks:
569 if self._prefetch_threads is None:
570 self._prefetch_queue = Queue.Queue()
571 self._prefetch_threads = []
572 for i in xrange(0, self.num_get_threads):
573 thread = threading.Thread(target=block_prefetch_worker, args=(self,))
574 self._prefetch_threads.append(thread)
577 self._prefetch_queue.put(locator)
580 class ArvadosFile(object):
581 """Represent a file in a Collection.
583 ArvadosFile manages the underlying representation of a file in Keep as a
584 sequence of segments spanning a set of blocks, and implements random
587 This object may be accessed from multiple threads.
591 def __init__(self, parent, stream=[], segments=[]):
593 ArvadosFile constructor.
596 a list of Range objects representing a block stream
599 a list of Range objects representing segments
602 self._modified = True
604 self.lock = parent.root_collection().lock
606 self._add_segment(stream, s.locator, s.range_size)
607 self._current_bblock = None
610 return self.parent.writable()
614 return copy.copy(self._segments)
617 def clone(self, new_parent):
618 """Make a copy of this file."""
619 cp = ArvadosFile(new_parent)
620 cp.replace_contents(self)
625 def replace_contents(self, other):
626 """Replace segments of this file with segments from another `ArvadosFile` object."""
630 for other_segment in other.segments():
631 new_loc = other_segment.locator
632 if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
633 if other_segment.locator not in map_loc:
634 bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
635 if bufferblock.state() != _BufferBlock.WRITABLE:
636 map_loc[other_segment.locator] = bufferblock.locator()
638 map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
639 new_loc = map_loc[other_segment.locator]
641 self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
643 self._modified = True
645 def __eq__(self, other):
648 if not isinstance(other, ArvadosFile):
651 othersegs = other.segments()
653 if len(self._segments) != len(othersegs):
655 for i in xrange(0, len(othersegs)):
656 seg1 = self._segments[i]
661 if self.parent._my_block_manager().is_bufferblock(loc1):
662 loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
664 if other.parent._my_block_manager().is_bufferblock(loc2):
665 loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
667 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
668 seg1.range_start != seg2.range_start or
669 seg1.range_size != seg2.range_size or
670 seg1.segment_offset != seg2.segment_offset):
def __ne__(self, other):
    """Return the logical negation of ``self.__eq__(other)``."""
    is_equal = self.__eq__(other)
    return not is_equal
def set_unmodified(self):
    """Reset the modified flag to False."""
    self._modified = False
685 """Test the modified flag"""
686 return self._modified
690 def truncate(self, size):
691 """Shrink the size of the file.
693 If `size` is less than the size of the file, the file contents after
694 `size` will be discarded. If `size` is greater than the current size
695 of the file, an IOError will be raised.
698 if size < self.size():
700 for r in self._segments:
701 range_end = r.range_start+r.range_size
702 if r.range_start >= size:
703 # segment is past the truncate size, all done
705 elif size < range_end:
706 nr = Range(r.locator, r.range_start, size - r.range_start)
707 nr.segment_offset = r.segment_offset
713 self._segments = new_segs
714 self._modified = True
715 elif size > self.size():
716 raise IOError("truncate() does not support extending the file size")
718 def readfrom(self, offset, size, num_retries):
719 """Read up to `size` bytes from the file starting at `offset`."""
722 if size == 0 or offset >= self.size():
724 prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
725 readsegs = locators_and_ranges(self._segments, offset, size)
728 self.parent._my_block_manager().block_prefetch(lr.locator)
732 block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=bool(data))
734 data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
739 def _repack_writes(self):
740 """Test if the buffer block has more data than actual segments.
742 This happens when a buffered write over-writes a file range written in
743 a previous buffered write. Re-pack the buffer block for efficiency
744 and to avoid leaking information.
747 segs = self._segments
749 # Sum up the segments to get the total bytes of the file referencing
750 # into the buffer block.
751 bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
752 write_total = sum([s.range_size for s in bufferblock_segs])
754 if write_total < self._current_bblock.size():
755 # There is more data in the buffer block than is actually accounted for by segments, so
756 # re-pack into a new buffer by copying over to a new buffer block.
757 new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
758 for t in bufferblock_segs:
759 new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
760 t.segment_offset = new_bb.size() - t.range_size
762 self._current_bblock = new_bb
766 def writeto(self, offset, data, num_retries):
767 """Write `data` to the file starting at `offset`.
769 This will update existing bytes and/or extend the size of the file as
776 if offset > self.size():
777 raise ArgumentError("Offset is past the end of the file")
779 if len(data) > config.KEEP_BLOCK_SIZE:
780 raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))
782 self._modified = True
784 if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
785 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
787 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
788 self._repack_writes()
789 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
790 self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
791 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
793 self._current_bblock.append(data)
795 replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
799 if self._current_bblock:
800 self._repack_writes()
801 self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
805 def add_segment(self, blocks, pos, size):
806 """Add a segment to the end of the file.
808 `pos` and `size` reference a section of the stream described by
809 `blocks` (a list of Range objects)
812 self._add_segment(blocks, pos, size)
def _add_segment(self, blocks, pos, size):
    """Internal implementation of add_segment.

    Marks the file modified, then appends one segment for every chunk that
    locators_and_ranges() yields for (`blocks`, `pos`, `size`).  Each new
    segment starts where the current last segment ends, or at 0 when the
    file has no segments yet.
    """
    self._modified = True
    for chunk in locators_and_ranges(blocks, pos, size):
        if self._segments:
            tail = self._segments[-1]
        else:
            # Empty file: zero-length placeholder so the first segment
            # starts at offset 0.
            tail = Range(0, 0, 0)
        next_start = tail.range_start + tail.range_size
        self._segments.append(
            Range(chunk.locator, next_start, chunk.segment_size, chunk.segment_offset))
824 """Get the file size."""
826 n = self._segments[-1]
827 return n.range_start + n.range_size
832 def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
835 for segment in self.segments:
836 loc = segment.locator
837 if loc.startswith("bufferblock"):
838 loc = self._bufferblocks[loc].calculate_locator()
839 if portable_locators:
840 loc = KeepLocator(loc).stripped()
841 filestream.append(LocatorAndRange(loc, locator_block_size(loc),
842 segment.segment_offset, segment.range_size))
843 buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
848 class ArvadosFileReader(ArvadosFileReaderBase):
849 """Wraps ArvadosFile in a file-like object supporting reading only.
851 Be aware that this class is NOT thread safe as there is no locking around
852 updating file pointer.
def __init__(self, arvadosfile, name, mode="r", num_retries=None):
    """Create a reader over `arvadosfile`.

    `name`, `mode` and `num_retries` are forwarded to the base reader
    initializer; `arvadosfile` is stored as the object that reads and
    size queries are delegated to.
    """
    super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
    # Backing file object used by size(), read() and readfrom().
    self.arvadosfile = arvadosfile
861 return self.arvadosfile.size()
def stream_name(self):
    """Return the stream name reported by the parent of the wrapped file."""
    owner = self.arvadosfile.parent
    return owner.stream_name()
866 @_FileLikeObjectBase._before_close
868 def read(self, size, num_retries=None):
869 """Read up to `size` bytes from the stream, starting at the current file position."""
870 data = self.arvadosfile.readfrom(self._filepos, size, num_retries)
871 self._filepos += len(data)
874 @_FileLikeObjectBase._before_close
def readfrom(self, offset, size, num_retries=None):
    """Read up to `size` bytes from the file, starting at `offset`.

    Delegates to the wrapped file's readfrom() and returns its result.
    This method itself neither reads nor updates the reader's current
    file position.
    """
    return self.arvadosfile.readfrom(offset, size, num_retries)
884 class ArvadosFileWriter(ArvadosFileReader):
885 """Wraps ArvadosFile in a file-like object supporting both reading and writing.
887 Be aware that this class is NOT thread safe as there is no locking around
888 updating file pointer.
def __init__(self, arvadosfile, name, mode, num_retries=None):
    """Create a writer over `arvadosfile`.

    All arguments are passed straight through to the ArvadosFileReader
    initializer; unlike the reader, `mode` has no default here.
    """
    super(ArvadosFileWriter, self).__init__(
        arvadosfile, name, mode, num_retries=num_retries)
895 @_FileLikeObjectBase._before_close
897 def write(self, data, num_retries=None):
898 if self.mode[0] == "a":
899 self.arvadosfile.writeto(self.size(), data, num_retries)
901 self.arvadosfile.writeto(self._filepos, data, num_retries)
902 self._filepos += len(data)
904 @_FileLikeObjectBase._before_close
906 def writelines(self, seq, num_retries=None):
908 self.write(s, num_retries)
910 @_FileLikeObjectBase._before_close
911 def truncate(self, size=None):
914 self.arvadosfile.truncate(size)
915 if self._filepos > self.size():
916 self._filepos = self.size()
918 @_FileLikeObjectBase._before_close
920 self.arvadosfile.flush()
925 super(ArvadosFileWriter, self).close()