from __future__ import absolute_import

import bz2
import collections
import copy
import errno
import functools
import hashlib
import logging
import os
import re
import threading
import uuid
import zlib

import Queue  # Python 2 stdlib; renamed 'queue' in Python 3

from . import config
from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
from .retry import retry_method

# Event tokens passed to parent.notify() by ArvadosFile.
MOD = "mod"
WRITE = "write"

_logger = logging.getLogger('arvados.arvfile')
29 """split(path) -> streamname, filename
31 Separate the stream name and file name in a /-separated stream path and
32 return a tuple (stream_name, file_name). If no stream name is available,
37 stream_name, file_name = path.rsplit('/', 1)
38 except ValueError: # No / in string
39 stream_name, file_name = '.', path
40 return stream_name, file_name
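
# Illustrative behavior of split() (doctest-style sketch; the file names are
# made-up examples, not part of the API):
#
#   >>> split('fastq/sample1_R1.fastq')
#   ('fastq', 'sample1_R1.fastq')
#   >>> split('sample1_R1.fastq')
#   ('.', 'sample1_R1.fastq')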


class UnownedBlockError(Exception):
    """Raised when there's a writable block without an owner on the BlockManager."""


class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()


class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        self._filepos = min(max(pos, 0), self.size())

    def tell(self):
        return self._filepos

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if not data:
                break
            yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
            self._filepos += len(cache_data)
        else:
            data = ['']
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
        data = ''.join(data)
        try:
            nextline_index = data.index('\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            # 16+MAX_WBITS tells zlib to expect a gzip header and trailer.
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return ''.join(data).splitlines(True)

    def size(self):
        raise NotImplementedError()

    def read(self, size, num_retries=None):
        raise NotImplementedError()

    def readfrom(self, start, size, num_retries=None):
        raise NotImplementedError()


class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return ''

        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return ''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return ''.join(data)

    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"


def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper


class StateChangeError(Exception):
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.state = state
        self.nextstate = nextstate


class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are three valid states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2
    ERROR = 3

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()
        self.wait_for_commit = threading.Event()
        self.error = None

    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
        else:
            raise AssertionError("Buffer block is not writable")
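
    # Sketch of the doubling behavior above (illustrative only; the sizes are
    # assumptions, not API guarantees):
    #
    #   bb = _BufferBlock('x', starting_capacity=4, owner=None)
    #   bb.append(b'hello')   # 5 bytes > capacity 4, so capacity doubles to 8
    #   bb.size()             # -> 5; len(bb.buffer_block) -> 8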

    STATE_TRANSITIONS = frozenset([
            (WRITABLE, PENDING),
            (PENDING, COMMITTED),
            (PENDING, ERROR),
            (ERROR, PENDING)])

    @synchronized
    def set_state(self, nextstate, val=None):
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
        self._state = nextstate

        if self._state == _BufferBlock.PENDING:
            self.wait_for_commit.clear()

        if self._state == _BufferBlock.COMMITTED:
            self._locator = val
            self.buffer_view = None
            self.buffer_block = None
            self.wait_for_commit.set()

        if self._state == _BufferBlock.ERROR:
            self.error = val
            self.wait_for_commit.set()

    @synchronized
    def state(self):
        return self._state
355 """The amount of data written to the buffer."""
356 return self.write_pointer
360 """The Keep locator for this buffer's contents."""
361 if self._locator is None:
362 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
366 def clone(self, new_blockid, owner):
367 if self._state == _BufferBlock.COMMITTED:
368 raise AssertionError("Cannot duplicate committed buffer block")
369 bufferblock = _BufferBlock(new_blockid, self.size(), owner)
370 bufferblock.append(self.buffer_view[0:self.size()])
376 self.buffer_block = None
377 self.buffer_view = None
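
    # Typical lifecycle, as enforced by STATE_TRANSITIONS (illustrative
    # sketch; 'loc' stands for a real Keep locator returned by the Keep
    # client, and 'exc' for a caught exception):
    #
    #   bb.set_state(_BufferBlock.PENDING)         # freeze appends, start upload
    #   bb.set_state(_BufferBlock.COMMITTED, loc)  # upload done, buffer released
    #
    # and on failure:
    #
    #   bb.set_state(_BufferBlock.ERROR, exc)      # wakes waiters, records error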


class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass


def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection is read-only.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper
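
# The two decorators above assume the instance exposes, respectively, a
# `writable()` method and a `lock` attribute usable as a context manager.
# Hypothetical usage sketch (Example is not part of this module):
#
#   class Example(object):
#       def __init__(self):
#           self.lock = threading.Lock()
#       def writable(self):
#           return True
#       @must_be_writable
#       @synchronized
#       def mutate(self):
#           pass  # runs with self.lock held, and only if writable() is True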


class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """

    DEFAULT_PUT_THREADS = 2
    DEFAULT_GET_THREADS = 2

    def __init__(self, keep, copies=None, put_threads=None):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = collections.OrderedDict()
        self._put_queue = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        if put_threads:
            self.num_put_threads = put_threads
        else:
            self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
        self.copies = copies
        self._pending_write_size = 0
        self.threads_lock = threading.Lock()

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        return self._alloc_bufferblock(blockid, starting_capacity, owner)

    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        if blockid is None:
            blockid = "%s" % uuid.uuid4()
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        return locator in self._bufferblocks

    def _commit_bufferblock_worker(self):
        """Background uploader thread."""
        while True:
            try:
                bufferblock = self._put_queue.get()
                if bufferblock is None:
                    return

                if self.copies is None:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                else:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                bufferblock.set_state(_BufferBlock.ERROR, e)
            finally:
                if self._put_queue is not None:
                    self._put_queue.task_done()

    def start_put_threads(self):
        with self.threads_lock:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep
                # servers.
                #
                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending.  If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = Queue.Queue(maxsize=2)

                self._put_threads = []
                for i in xrange(0, self.num_put_threads):
                    thread = threading.Thread(target=self._commit_bufferblock_worker)
                    self._put_threads.append(thread)
                    thread.daemon = True
                    thread.start()

    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                if b is None:
                    return
                self._keep.get(b)
            except Exception:
                _logger.exception("Exception doing block prefetch")

    @synchronized
    def start_get_threads(self):
        if self._prefetch_threads is None:
            self._prefetch_queue = Queue.Queue()
            self._prefetch_threads = []
            for i in xrange(0, self.num_get_threads):
                thread = threading.Thread(target=self._block_prefetch_worker)
                self._prefetch_threads.append(thread)
                thread.daemon = True
                thread.start()

    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""
        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_threads()

    @synchronized
    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
        """Packs small blocks together before uploading"""
        self._pending_write_size += closed_file_size

        # Check if there are enough small blocks for filling up one in full
        if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
            # Search blocks ready for getting packed together before being committed to Keep.
            # A WRITABLE block always has an owner.
            # A WRITABLE block with its owner.closed() implies that its
            # size is <= KEEP_BLOCK_SIZE/2.
            try:
                small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
            except AttributeError:
                # Writable blocks without owner shouldn't exist.
                raise UnownedBlockError()

            if len(small_blocks) <= 1:
                # Not enough small blocks for repacking
                return

            # Update the pending write size count with its true value, just in case
            # some small file was opened, written and closed several times.
            self._pending_write_size = sum([b.size() for b in small_blocks])
            if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
                return

            new_bb = self._alloc_bufferblock()
            while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
                bb = small_blocks.pop(0)
                arvfile = bb.owner
                self._pending_write_size -= bb.size()
                new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
                arvfile.set_segments([Range(new_bb.blockid,
                                            0,
                                            bb.size(),
                                            new_bb.write_pointer - bb.size())])
                self._delete_bufferblock(bb.blockid)
            self.commit_bufferblock(new_bb, sync=sync)
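
    # Worked example of the repacking above (illustrative; the 1 MiB sizes
    # are assumptions): two closed small files each own a WRITABLE
    # bufferblock.  Both buffers are appended into one new block, and each
    # file's segment list is rewritten to point at its slice of that block:
    #
    #   file A -> Range(new_bb.blockid, 0, 1048576, segment_offset=0)
    #   file B -> Range(new_bb.blockid, 0, 1048576, segment_offset=1048576)
    #
    # so a single Keep block ends up storing both files' data.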

    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :sync:
          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        """
        try:
            # Mark the block as PENDING to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            if e.state == _BufferBlock.PENDING:
                if sync:
                    block.wait_for_commit.wait()
                else:
                    return
            if block.state() == _BufferBlock.COMMITTED:
                return
            elif block.state() == _BufferBlock.ERROR:
                raise block.error
            else:
                raise

        if sync:
            try:
                if self.copies is None:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
                else:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
                block.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                block.set_state(_BufferBlock.ERROR, e)
                raise
        else:
            self.start_put_threads()
            self._put_queue.put(block)
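
    # Usage sketch (hypothetical caller; 'bm' is a _BlockManager and 'bb' a
    # WRITABLE _BufferBlock):
    #
    #   bm.commit_bufferblock(bb, sync=True)    # returns after the Keep PUT succeeds
    #   bm.commit_bufferblock(bb, sync=False)   # queues the upload; blocks only
    #                                           # while the 2-slot queue is full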

    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    @synchronized
    def delete_bufferblock(self, locator):
        self._delete_bufferblock(locator)

    def _delete_bufferblock(self, locator):
        bb = self._bufferblocks[locator]
        bb.clear()
        del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and returns that, if
        not, passes the request through to KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.

        """
        self.repack_small_blocks(force=True, sync=True)

        with self.lock:
            items = self._bufferblocks.items()

        for k, v in items:
            if v.state() != _BufferBlock.COMMITTED and v.owner:
                v.owner.flush(sync=False)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

                err = []
                for k, v in items:
                    if v.state() == _BufferBlock.ERROR:
                        err.append((v.locator(), v.error))
                if err:
                    raise KeepWriteError("Error writing some blocks", err, label="block")

        for k, v in items:
            # flush again with sync=True to remove committed bufferblocks from
            # the segments.
            if v.owner:
                v.owner.flush(sync=True)

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.)  This method
        does not block.

        """
        if not self.prefetch_enabled:
            return

        if self._keep.get_from_cache(locator) is not None:
            return

        with self.lock:
            if locator in self._bufferblocks:
                return

        self.start_get_threads()
        self._prefetch_queue.put(locator)
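
    # Prefetch sketch (hypothetical caller): a reader queues upcoming block
    # locators so the download threads can warm KeepClient's cache ahead of
    # the sequential read position ('upcoming_locators' is assumed):
    #
    #   for locator in upcoming_locators:
    #       bm.block_prefetch(locator)   # returns immediately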


class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments

        """
        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        return self.parent.writable()

    @synchronized
    def permission_expired(self, as_of_dt=None):
        """Returns True if any of the segments' locators is expired"""
        for r in self._segments:
            if KeepLocator(r.locator).permission_expired(as_of_dt):
                return True
        return False

    @synchronized
    def segments(self):
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""
        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self.set_committed(False)

    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in xrange(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_segments(self, segs):
        self._segments = segs

    @synchronized
    def set_committed(self, value=True):
        """Set committed flag.

        If value is True, set committed to be True.

        If value is False, set committed to be False for this and all parents.

        """
        if value == self._committed:
            return
        self._committed = value
        if self._committed is False and self.parent is not None:
            self.parent.set_committed(False)

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed

    @synchronized
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers"""
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    @synchronized
    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
        and do some block maintenance tasks.
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
            # File writer closed, not small enough for repacking
            self.flush()
        elif self.closed():
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

    def closed(self):
        """
        Get whether this is closed or not. When the writers list is empty, the file
        is supposed to be closed.
        """
        return len(self._writers) == 0

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, an IOError will be raised.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self.set_committed(False)
        elif size > self.size():
            raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
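
    # Segment-trimming sketch for truncate() (illustrative sizes): a file of
    # two 10-byte segments truncated to 15 bytes keeps the first segment
    # whole and shortens the second:
    #
    #   before: [Range(locA, 0, 10, 0), Range(locB, 10, 10, 0)]
    #   after:  [Range(locA, 0, 10, 0), Range(locB, 10, 5, 0)]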

    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
          If False (default), return less data than requested if the read
          crosses a block boundary and the next block isn't cached.  If True,
          only return less data than requested when hitting EOF.

        """
        with self.lock:
            if size == 0 or offset >= self.size():
                return ''
            readsegs = locators_and_ranges(self._segments, offset, size)
            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)

        locs = set()
        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
                locs.add(lr.locator)
            else:
                break

        for lr in prefetch:
            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)
                locs.add(lr.locator)

        return ''.join(data)
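
    # Readahead sketch: the first segment is always fetched (possibly
    # blocking); later segments use cache_only=True unless exact=True, so a
    # cache miss simply ends the short read while the prefetch queue warms
    # the cache for the next call.  Hypothetical outcome for a two-block read:
    #
    #   f.readfrom(0, 2*config.KEEP_BLOCK_SIZE, 0)
    #   # -> may return only the first block if the second isn't cached yet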

    def _repack_writes(self, num_retries):
        """Test if the buffer block has more data than actual segments.

        This happens when a buffered write over-writes a file range written in
        a previous buffered write.  Re-pack the buffer block for efficiency
        and to avoid leaking information.

        """
        segs = self._segments

        # Sum up the segments to get the total bytes of the file referencing
        # into the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size():
            # There is more data in the buffer block than is actually accounted for by segments, so
            # re-pack into a new buffer by copying over to a new buffer block.
            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb
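
    # Overwrite example for _repack_writes (illustrative): writing 10 bytes
    # at offset 0 twice leaves 20 bytes in the buffer block but only 10 bytes
    # of live segments, so the block is repacked down to the referenced bytes:
    #
    #   f.writeto(0, b'0123456789', 0)   # buffer: 10 bytes, live: 10
    #   f.writeto(0, b'abcdefghij', 0)   # buffer: 20 bytes, live: 10
    #   f._repack_writes(0)              # buffer: 10 bytes ('abcdefghij')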

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if len(data) == 0:
            return

        if offset > self.size():
            raise ArgumentError("Offset is past the end of the file")

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self.set_committed(False)

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes(num_retries)
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))
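
    # Chunking sketch (illustrative): a single oversized write is split into
    # KEEP_BLOCK_SIZE pieces, so no bufferblock ever has to grow past one
    # Keep block:
    #
    #   f.writeto(0, b'x' * (config.KEEP_BLOCK_SIZE + 1), 0)
    #   # -> writeto(0, first KEEP_BLOCK_SIZE bytes, 0)
    #   # -> writeto(config.KEEP_BLOCK_SIZE, remaining 1 byte, 0)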

    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block
          into the keep put queue.

        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._repack_writes(num_retries)
            self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
            for s in to_delete:
                self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self.set_committed(False)
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)
1085 """Get the file size."""
1087 n = self._segments[-1]
1088 return n.range_start + n.range_size

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False,
                      normalize=False, only_committed=False):
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                if only_committed:
                    continue
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
        buf += "\n"
        return buf

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        self.set_committed(False)
        self.flush(sync=True)
        self.parent.remove(self.name)

        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock


class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None, read the
        entire remainder of the file.

        """
        if size is None:
            data = []
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return ''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.

        """
        return self.arvadosfile.readfrom(offset, size, num_retries)


class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
        self.mode = mode
        self.arvadosfile.add_writer(self)

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
        else:
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)
        if self._filepos > self.size():
            self._filepos = self.size()

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()

    def close(self, flush=True):
        if not self.closed:
            self.arvadosfile.remove_writer(self, flush)
            super(ArvadosFileWriter, self).close()
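

# End-to-end usage sketch (hypothetical; assumes an arvados.collection
# Collection, whose open() returns the reader/writer classes above, and an
# already-configured API client 'api'):
#
#   import arvados.collection
#   c = arvados.collection.Collection(api_client=api)
#   with c.open('output.txt', 'w') as f:   # ArvadosFileWriter
#       f.write('hello\n')                 # buffered in a _BufferBlock
#   c.save_new()                           # commits buffered blocks to Keep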