from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
from .retry import retry_method

MOD = "mod"
WRITE = "write"

_logger = logging.getLogger('arvados.arvfile')
26 """split(path) -> streamname, filename
28 Separate the stream name and file name in a /-separated stream path and
29 return a tuple (stream_name, file_name). If no stream name is available,
34 stream_name, file_name = path.rsplit('/', 1)
35 except ValueError: # No / in string
36 stream_name, file_name = '.', path
37 return stream_name, file_name
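
# Example (doctest-style sketch):
#
#     >>> split('data/reads/sample.fastq')
#     ('data/reads', 'sample.fastq')
#     >>> split('sample.fastq')
#     ('.', 'sample.fastq')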

class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0L
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        self._filepos = min(max(pos, 0L), self.size())

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if data == '':
                break
            yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            # Resume from the unconsumed tail of the previous readline().
            data = [cache_data]
            self._filepos += len(cache_data)
        else:
            data = ['']
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)

        data = ''.join(data)
        try:
            nextline_index = data.index('\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            # wbits=16+MAX_WBITS tells zlib to expect a gzip header.
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return ''.join(data).splitlines(True)

    def size(self):
        raise NotImplementedError()

    def read(self, size, num_retries=None):
        raise NotImplementedError()

    def readfrom(self, start, size, num_retries=None):
        raise NotImplementedError()

class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return ''

        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return ''.join(data)

    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"

def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper
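
# Example (sketch): @synchronized works on any object exposing a `lock`
# attribute that supports the context manager protocol:
#
#     class Counter(object):
#         def __init__(self):
#             self.lock = threading.Lock()
#             self.count = 0
#
#         @synchronized
#         def increment(self):
#             self.count += 1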

class StateChangeError(Exception):
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.state = state
        self.nextstate = nextstate

class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are three valid states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    A block whose upload fails is put in the ERROR state until retried.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2
    ERROR = 3

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()
        self.wait_for_commit = threading.Event()
        self.error = None

    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
            self._locator = None
        else:
            raise AssertionError("Buffer block is not writable")

    STATE_TRANSITIONS = frozenset([
            (WRITABLE, PENDING),
            (PENDING, COMMITTED),
            (PENDING, ERROR),
            (ERROR, PENDING)])
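
    # Allowed lifecycle, as encoded above:
    #
    #     WRITABLE -> PENDING -> COMMITTED
    #                 PENDING -> ERROR -> PENDING (retry)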

    @synchronized
    def set_state(self, nextstate, val=None):
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
        self._state = nextstate

        if self._state == _BufferBlock.PENDING:
            self.wait_for_commit.clear()

        if self._state == _BufferBlock.COMMITTED:
            self._locator = val
            self.buffer_view = None
            self.buffer_block = None
            self.wait_for_commit.set()

        if self._state == _BufferBlock.ERROR:
            self.error = val
            self.wait_for_commit.set()
346 """The amount of data written to the buffer."""
347 return self.write_pointer
351 """The Keep locator for this buffer's contents."""
352 if self._locator is None:
353 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())

    @synchronized
    def clone(self, new_blockid, owner):
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Cannot duplicate committed buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock

    @synchronized
    def clear(self):
        self.owner = None
        self.buffer_block = None
        self.buffer_view = None

class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection is read-only.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper
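
# Example (sketch): mutators in this module stack the decorators as
#
#     @must_be_writable
#     @synchronized
#     def truncate(self, size):
#         ...
#
# so the read-only check runs before the lock is acquired.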

class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """

    DEFAULT_PUT_THREADS = 2
    DEFAULT_GET_THREADS = 2

    def __init__(self, keep, copies=None):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = {}
        self._put_queue = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
        self.copies = copies

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        if blockid is None:
            blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    def is_bufferblock(self, locator):
        return locator in self._bufferblocks

    def _commit_bufferblock_worker(self):
        """Background uploader thread."""
        while True:
            try:
                # A None sentinel (pushed by stop_threads) shuts the worker down.
                bufferblock = self._put_queue.get()
                if bufferblock is None:
                    return

                if self.copies is None:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                else:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)

            except Exception as e:
                bufferblock.set_state(_BufferBlock.ERROR, e)
            finally:
                if self._put_queue is not None:
                    self._put_queue.task_done()

    @synchronized
    def start_put_threads(self):
        if self._put_threads is None:
            # Start uploader threads.

            # If we don't limit the Queue size, the upload queue can quickly
            # grow to take up gigabytes of RAM if the writing process is
            # generating data more quickly than it can be sent to the Keep
            # servers.
            #
            # With two upload threads and a queue size of 2, this means up to 4
            # blocks pending.  If they are full 64 MiB blocks, that means up to
            # 256 MiB of internal buffering, which is the same size as the
            # default download block cache in KeepClient.
            self._put_queue = Queue.Queue(maxsize=2)

            self._put_threads = []
            for i in xrange(0, self.num_put_threads):
                thread = threading.Thread(target=self._commit_bufferblock_worker)
                self._put_threads.append(thread)
                thread.daemon = True
                thread.start()

    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                if b is None:
                    return
                self._keep.get(b)
            except Exception:
                pass

    @synchronized
    def start_get_threads(self):
        if self._prefetch_threads is None:
            self._prefetch_queue = Queue.Queue()
            self._prefetch_threads = []
            for i in xrange(0, self.num_get_threads):
                thread = threading.Thread(target=self._block_prefetch_worker)
                self._prefetch_threads.append(thread)
                thread.daemon = True
                thread.start()

    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""

        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_threads()

    @synchronized
    def repack_small_blocks(self, force=False, sync=False):
        """Packs small blocks together before uploading"""
        # Search blocks ready for getting packed together before being committed to Keep.
        small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
        if len(small_blocks) <= 1:
            # Not enough small blocks for repacking
            return

        # Check if there are enough small blocks for filling up one in full
        pending_write_size = sum([b.size() for b in small_blocks])
        if force or (pending_write_size >= config.KEEP_BLOCK_SIZE):
            new_bb = _BufferBlock("bufferblock%i" % len(self._bufferblocks), 2**14, None)
            self._bufferblocks[new_bb.blockid] = new_bb
            size = 0
            while len(small_blocks) > 0 and (size + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
                bb = small_blocks.pop(0)
                arvfile = bb.owner
                size += bb.size()
                new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
                arvfile.set_segments([Range(new_bb.blockid, 0, bb.size(), size-bb.size())])
                bb.clear()
                del self._bufferblocks[bb.blockid]
            self.commit_bufferblock(new_bb, sync=sync)

    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :sync:
          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        """
        try:
            # Mark the block as PENDING so as to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            if e.state == _BufferBlock.PENDING:
                if sync:
                    block.wait_for_commit.wait()
                else:
                    return
            if block.state() == _BufferBlock.COMMITTED:
                return
            elif block.state() == _BufferBlock.ERROR:
                raise block.error
            else:
                raise

        if sync:
            try:
                if self.copies is None:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
                else:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
                block.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                block.set_state(_BufferBlock.ERROR, e)
                raise
        else:
            self.start_put_threads()
            self._put_queue.put(block)
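
    # Example (sketch): asynchronous commit followed by a barrier:
    #
    #     manager.commit_bufferblock(bb, sync=False)  # returns quickly
    #     manager.commit_all()  # blocks; raises KeepWriteError on failure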

    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    @synchronized
    def delete_bufferblock(self, locator):
        bb = self._bufferblocks[locator]
        bb.clear()
        del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and returns that,
        if not, passes the request through to KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.

        """
        self.repack_small_blocks(force=True, sync=True)

        with self.lock:
            items = self._bufferblocks.items()

        for k,v in items:
            if v.state() != _BufferBlock.COMMITTED and v.owner:
                v.owner.flush(sync=False)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

                err = []
                for k,v in items:
                    if v.state() == _BufferBlock.ERROR:
                        err.append((v.locator(), v.error))
                if err:
                    raise KeepWriteError("Error writing some blocks", err, label="block")

        for k,v in items:
            # flush again with sync=True to remove committed bufferblocks from
            # the segments.
            if v.owner:
                v.owner.flush(sync=True)

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.)  This method
        does not block.

        """
        if not self.prefetch_enabled:
            return

        if self._keep.get_from_cache(locator) is not None:
            return

        with self.lock:
            if locator in self._bufferblocks:
                return

        self.start_get_threads()
        self._prefetch_queue.put(locator)

class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments

        """
        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        return self.parent.writable()

    @synchronized
    def segments(self):
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self._committed = False

    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in xrange(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_segments(self, segs):
        self._segments = segs

    @synchronized
    def set_committed(self):
        """Set committed flag to True"""
        self._committed = True

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed

    @synchronized
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers"""
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    @synchronized
    def remove_writer(self, writer):
        """
        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
        and do some block maintenance tasks.
        """
        self._writers.remove(writer)

        if self.size() > config.KEEP_BLOCK_SIZE / 2:
            # File writer closed, not small enough for repacking
            self.flush()
        elif self.closed():
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks()

    def closed(self):
        """
        Get whether this is closed or not.  When the writers list is empty, the
        file is considered closed.
        """
        return len(self._writers) == 0

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, an IOError will be raised.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                    break
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self._committed = False
        elif size > self.size():
            raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
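
    # Example (sketch): for a 10-byte file, truncate(4) keeps bytes 0-3,
    # while truncate(20) raises IOError(EINVAL) instead of zero-extending.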

    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
          If False (default), return less data than requested if the read
          crosses a block boundary and the next block isn't cached.  If True,
          only return less data than requested when hitting EOF.

        """
        with self.lock:
            if size == 0 or offset >= self.size():
                return ''
            readsegs = locators_and_ranges(self._segments, offset, size)
            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)

        locs = set()
        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
                locs.add(lr.locator)
            else:
                break

        for lr in prefetch:
            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)
                locs.add(lr.locator)

        return ''.join(data)

    @must_be_writable
    @synchronized
    def _repack_writes(self, num_retries):
        """Test if the buffer block has more data than actual segments.

        This happens when a buffered write over-writes a file range written in
        a previous buffered write.  Re-pack the buffer block for efficiency
        and to avoid leaking information.

        """
        segs = self._segments

        # Sum up the segments to get the total bytes of the file referencing
        # into the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size():
            # There is more data in the buffer block than is actually accounted
            # for by segments, so re-pack by copying the live ranges over to a
            # new buffer block.
            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if len(data) == 0:
            return

        if offset > self.size():
            raise ArgumentError("Offset is past the end of the file")

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self._committed = False

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes(num_retries)
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))
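
    # Example (sketch): one writeto() call larger than KEEP_BLOCK_SIZE
    # (64 MiB) recurses in KEEP_BLOCK_SIZE chunks above, so a single
    # bufferblock never accumulates more than one full Keep block per call.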

    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the keep put queue.

        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._repack_writes(num_retries)
            self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
            for s in to_delete:
                self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self._committed = False
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)
1032 """Get the file size."""
1034 n = self._segments[-1]
1035 return n.range_start + n.range_size

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if loc.startswith("bufferblock"):
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                 segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
        buf += "\n"
        return buf

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        self._committed = False
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock

class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None, read the
        entire remainder of the file.

        """
        if size is None:
            data = []
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return ''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.

        """
        return self.arvadosfile.readfrom(offset, size, num_retries)

class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
        self.mode = mode
        self.arvadosfile.add_writer(self)

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
        else:
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)
        if self._filepos > self.size():
            self._filepos = self.size()

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()

    def close(self):
        if not self.closed:
            self.arvadosfile.remove_writer(self)
            super(ArvadosFileWriter, self).close()
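
# End-to-end usage (sketch; assumes an arvados.collection.Collection,
# whose open() method returns the reader/writer classes above):
#
#     import arvados
#     import arvados.collection
#
#     c = arvados.collection.Collection(api_client=arvados.api('v1'))
#     with c.open('data.txt', 'w') as f:   # ArvadosFileWriter
#         f.write('hello\n')
#     with c.open('data.txt', 'r') as f:   # ArvadosFileReader
#         assert f.read(5) == 'hello'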