from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
from .retry import retry_method

MOD = "mod"
WRITE = "write"

_logger = logging.getLogger('arvados.arvfile')


def split(path):
    """split(path) -> streamname, filename

    Separate the stream name and file name in a /-separated stream path and
    return a tuple (stream_name, file_name).  If no stream name is available,
    assume '.'.

    """
    try:
        stream_name, file_name = path.rsplit('/', 1)
    except ValueError:  # No / in string
        stream_name, file_name = '.', path
    return stream_name, file_name
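
# Illustrative behavior of split() (doctest-style; paths are hypothetical):
#
#     >>> split('dir1/dir2/file.txt')
#     ('dir1/dir2', 'file.txt')
#     >>> split('file.txt')
#     ('.', 'file.txt')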

class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            self.close()
        except Exception:
            if exc_type is None:
                raise

    def close(self):
        self.closed = True

class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0L
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        self._filepos = min(max(pos, 0L), self.size())
    def tell(self):
        return self._filepos

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if data == '':
                break
            yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
            self._filepos += len(cache_data)
        else:
            data = ['']
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
        data = ''.join(data)
        try:
            nextline_index = data.index('\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]
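
    # Note on the cache above: any bytes read past the returned newline are
    # kept in _readline_cache keyed by the resulting file position, so the
    # next sequential readline() can reuse them instead of re-reading from
    # Keep.  A seek() invalidates the cache implicitly, because tell() will
    # no longer match the cached position.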

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)
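
    # Illustrative usage (hypothetical file name): for a file stored as
    # "logs.gz", readall_decompressed() yields plain-text chunks, while
    # readall() would yield the raw gzip bytes:
    #
    #     for chunk in reader.readall_decompressed():
    #         sys.stdout.write(chunk)
    #
    # The 16+zlib.MAX_WBITS window setting tells zlib to expect a gzip
    # header rather than a raw deflate stream.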

    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return ''.join(data).splitlines(True)

    def size(self):
        raise NotImplementedError()

    def read(self, size, num_retries=None):
        raise NotImplementedError()

    def readfrom(self, start, size, num_retries=None):
        raise NotImplementedError()

class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return ''

        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return ''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return ''.join(data)

    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"

def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper
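
# synchronized assumes the decorated method's instance exposes a `self.lock`
# attribute (a threading.Lock, threading.RLock, or the NoopLock below).
# A minimal sketch, illustrative only:
#
#     class Counter(object):
#         def __init__(self):
#             self.lock = threading.RLock()
#             self.n = 0
#
#         @synchronized
#         def incr(self):
#             self.n += 1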

class StateChangeError(Exception):
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.state = state
        self.nextstate = nextstate

class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are four valid states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    ERROR
      The block upload failed; the exception is recorded in `error`.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2
    ERROR = 3

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block
        :starting_capacity:
          the initial buffer capacity
        :owner:
          ArvadosFile that owns this block
        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()
        self.wait_for_commit = threading.Event()
        self.error = None

    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
            self._locator = None
        else:
            raise AssertionError("Buffer block is not writable")
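
    # Illustrative capacity growth (hypothetical numbers): a block created
    # with starting_capacity=2**14 that receives 100 KiB of appends doubles
    # 16 KiB -> 32 KiB -> 64 KiB -> 128 KiB, so appends are amortized O(1)
    # and at most half the final buffer is slack.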

    STATE_TRANSITIONS = frozenset([
            (WRITABLE, PENDING),
            (PENDING, COMMITTED),
            (PENDING, ERROR),
            (ERROR, PENDING)])

    @synchronized
    def set_state(self, nextstate, val=None):
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
        self._state = nextstate

        if self._state == _BufferBlock.PENDING:
            self.wait_for_commit.clear()

        if self._state == _BufferBlock.COMMITTED:
            self._locator = val
            self.buffer_view = None
            self.buffer_block = None
            self.wait_for_commit.set()

        if self._state == _BufferBlock.ERROR:
            self.error = val
            self.wait_for_commit.set()
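
    # State machine summary (derived from STATE_TRANSITIONS above):
    #
    #     WRITABLE -> PENDING -> COMMITTED
    #                 PENDING -> ERROR
    #                 ERROR   -> PENDING    (a failed upload may be retried)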
346 """The amount of data written to the buffer."""
347 return self.write_pointer
351 """The Keep locator for this buffer's contents."""
352 if self._locator is None:
353 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
357 def clone(self, new_blockid, owner):
358 if self._state == _BufferBlock.COMMITTED:
359 raise AssertionError("Cannot duplicate committed buffer block")
360 bufferblock = _BufferBlock(new_blockid, self.size(), owner)
361 bufferblock.append(self.buffer_view[0:self.size()])
367 self.buffer_block = None
368 self.buffer_view = None

class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass


def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection is read-only.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper
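
# must_be_writable assumes the instance has a writable() predicate, e.g.
# (illustrative only):
#
#     class Doc(object):
#         def writable(self):
#             return False
#
#         @must_be_writable
#         def save(self):
#             pass   # never reached; IOError(EROFS) is raised first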

class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """

    DEFAULT_PUT_THREADS = 2
    DEFAULT_GET_THREADS = 2

    def __init__(self, keep, copies=None):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = {}
        self._put_queue = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
        self.copies = copies

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned
        :starting_capacity:
          optional capacity, otherwise will use default capacity
        :owner:
          ArvadosFile that owns this block
        """
        if blockid is None:
            blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.
        :owner:
          ArvadosFile that owns the new block
        """
        new_blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        return locator in self._bufferblocks

    def _commit_bufferblock_worker(self):
        """Background uploader thread."""
        while True:
            try:
                bufferblock = self._put_queue.get()
                if bufferblock is None:
                    return

                if self.copies is None:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                else:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)

            except Exception as e:
                bufferblock.set_state(_BufferBlock.ERROR, e)
            finally:
                if self._put_queue is not None:
                    self._put_queue.task_done()

    @synchronized
    def start_put_threads(self):
        if self._put_threads is None:
            # Start uploader threads.

            # If we don't limit the Queue size, the upload queue can quickly
            # grow to take up gigabytes of RAM if the writing process is
            # generating data more quickly than it can be sent to the Keep
            # service.
            #
            # With two upload threads and a queue size of 2, this means up to 4
            # blocks pending.  If they are full 64 MiB blocks, that means up to
            # 256 MiB of internal buffering, which is the same size as the
            # default download block cache in KeepClient.
            self._put_queue = Queue.Queue(maxsize=2)

            self._put_threads = []
            for i in xrange(0, self.num_put_threads):
                thread = threading.Thread(target=self._commit_bufferblock_worker)
                self._put_threads.append(thread)
                thread.daemon = True
                thread.start()

    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                if b is None:
                    return
                self._keep.get(b)
            except Exception:
                _logger.exception("Exception doing block prefetch")

    @synchronized
    def start_get_threads(self):
        if self._prefetch_threads is None:
            self._prefetch_queue = Queue.Queue()
            self._prefetch_threads = []
            for i in xrange(0, self.num_get_threads):
                thread = threading.Thread(target=self._block_prefetch_worker)
                self._prefetch_threads.append(thread)
                thread.daemon = True
                thread.start()

    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""

        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_threads()

    def repack_small_blocks(self, force=False):
        """Packs small blocks together before uploading"""
        # Candidate bblocks -- This could be sorted in some way to prioritize some
        # kind of bblocks
        small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner and b.owner.closed() and b.owner.size() <= (config.KEEP_BLOCK_SIZE / 2)]
        if len(small_blocks) == 0:
            return

        # Check if there's enough small blocks for combining and uploading
        pending_write_size = sum([b.size() for b in small_blocks])
        if force or (pending_write_size > (config.KEEP_BLOCK_SIZE / 2)):
            if len(small_blocks) == 1:
                # No small blocks for repacking, leave this one alone
                # so it's committed before exiting.
                return
            new_bb = _BufferBlock("bufferblock%i" % len(self._bufferblocks), 2**14, None)
            self._bufferblocks[new_bb.blockid] = new_bb
            size = 0
            while len(small_blocks) > 0 and size <= (config.KEEP_BLOCK_SIZE / 2):
                bb = small_blocks.pop(0)
                size += bb.size()
                new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
                # FIXME: We shouldn't be accessing _segments directly
                bb.owner._segments = [Range(new_bb.blockid, 0, bb.size(), size-bb.size())]
                bb.clear()
                del self._bufferblocks[bb.blockid]
            # new_bb's size is greater than half a keep block, so commit it
            self.commit_bufferblock(new_bb, sync=True)
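
    # Example scenario (hypothetical sizes, KEEP_BLOCK_SIZE = 64 MiB): three
    # closed files buffering 15 MiB each total 45 MiB of pending small
    # writes, which exceeds half a Keep block, so all three WRITABLE blocks
    # are copied into one new bufferblock and committed as a single Keep
    # block instead of three small ones.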

    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :sync:
          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        """
        try:
            # Mark the block as PENDING so to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            if e.state == _BufferBlock.PENDING:
                if sync:
                    block.wait_for_commit.wait()
                else:
                    return
                if block.state() == _BufferBlock.COMMITTED:
                    return
                elif block.state() == _BufferBlock.ERROR:
                    raise block.error
            else:
                raise

        if sync:
            try:
                if self.copies is None:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
                else:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
                block.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                block.set_state(_BufferBlock.ERROR, e)
                raise
        else:
            self.start_put_threads()
            self._put_queue.put(block)
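
    # Illustrative call patterns: commit_bufferblock(bb, sync=True) performs
    # the Keep PUT on the calling thread and raises on failure, while
    # commit_bufferblock(bb, sync=False) queues the block and returns as soon
    # as there is a free slot in the bounded upload queue.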

    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    @synchronized
    def delete_bufferblock(self, locator):
        bb = self._bufferblocks[locator]
        bb.clear()
        del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and returns that
        if so; if not, passes the request through to KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.

        """
        self.repack_small_blocks(force=True)
        with self.lock:
            items = self._bufferblocks.items()

        for k, v in items:
            if v.state() != _BufferBlock.COMMITTED and v.owner:
                v.owner.flush(sync=False)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

                err = []
                for k, v in items:
                    if v.state() == _BufferBlock.ERROR:
                        err.append((v.locator(), v.error))
                if err:
                    raise KeepWriteError("Error writing some blocks", err, label="block")

        for k, v in items:
            # flush again with sync=True to remove committed bufferblocks from
            # the segments.
            if v.owner:
                v.owner.flush(sync=True)

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.)  This method
        does not block.

        """
        if not self.prefetch_enabled:
            return

        if self._keep.get_from_cache(locator) is not None:
            return

        with self.lock:
            if locator in self._bufferblocks:
                return

        self.start_get_threads()
        self._prefetch_queue.put(locator)
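
    # Typical use: a sequential reader calls block_prefetch() for locators it
    # expects to need soon (see ArvadosFile.readfrom below); the prefetch
    # threads warm the KeepClient cache so the subsequent get() is a cache hit.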


class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream
        :segments:
          a list of Range objects representing segments
        """
        self.parent = parent
        self.name = name
        self._committed = False
        self._closed = False
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        return self.parent.writable()

    @synchronized
    def segments(self):
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self._committed = False

    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in xrange(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_committed(self):
        """Set committed flag to True"""
        self._committed = True

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed

    @synchronized
    def set_closed(self):
        """Set current block as pending and closed flag to True"""
        self._closed = True
        self.parent._my_block_manager().repack_small_blocks()

    @synchronized
    def closed(self):
        """Get whether this is closed or not."""
        return self._closed

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, an IOError will be raised.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self._committed = False
        elif size > self.size():
            raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
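
    # Example (hypothetical segments): a file built from two 10-byte segments
    # truncated to size 15 keeps the first segment intact and trims the
    # second to Range(locator, range_start=10, range_size=5), preserving its
    # segment_offset; truncate(30) on the same 20-byte file raises
    # IOError(EINVAL).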

    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
          If False (default), return less data than requested if the read
          crosses a block boundary and the next block isn't cached.  If True,
          only return less data than requested when hitting EOF.

        """
        with self.lock:
            if size == 0 or offset >= self.size():
                return ''
            readsegs = locators_and_ranges(self._segments, offset, size)
            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)

        locs = set()
        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
                locs.add(lr.locator)
            else:
                break

        for lr in prefetch:
            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)
                locs.add(lr.locator)

        return ''.join(data)

    def _repack_writes(self, num_retries):
        """Test if the buffer block has more data than actual segments.

        This happens when a buffered write over-writes a file range written in
        a previous buffered write.  Re-pack the buffer block for efficiency
        and to avoid leaking information.

        """
        segs = self._segments

        # Sum up the segments to get the total bytes of the file referencing
        # into the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size():
            # There is more data in the buffer block than is actually accounted for by segments, so
            # re-pack into a new buffer by copying over to a new buffer block.
            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb
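
    # Example: write "aaaa" at offset 0 and then "bbbb" at offset 0 again.
    # The bufferblock now holds 8 bytes, but the file's segments reference
    # only the last 4, so write_total (4) < bufferblock size (8) and the
    # block is repacked down to the 4 live bytes before being committed.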

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if len(data) == 0:
            return

        if offset > self.size():
            raise ArgumentError("Offset is past the end of the file")

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self._committed = False

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes(num_retries)
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))

        return len(data)
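
    # Example (KEEP_BLOCK_SIZE = 64 MiB): writeto(0, data, ...) with 150 MiB
    # of data recurses into three writes of 64, 64 and 22 MiB; each chunk
    # lands in the current bufferblock, and a full bufferblock is committed
    # asynchronously before a fresh one is allocated.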

    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block
          into the Keep put queue.

        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._repack_writes(num_retries)
            self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
            for s in to_delete:
                self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self._committed = False
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)
1015 """Get the file size."""
1017 n = self._segments[-1]
1018 return n.range_start + n.range_size
1023 def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
1026 for segment in self.segments:
1027 loc = segment.locator
1028 if loc.startswith("bufferblock"):
1029 loc = self._bufferblocks[loc].calculate_locator()
1030 if portable_locators:
1031 loc = KeepLocator(loc).stripped()
1032 filestream.append(LocatorAndRange(loc, locator_block_size(loc),
1033 segment.segment_offset, segment.range_size))
1034 buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        self._committed = False
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock


class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None, read the
        entire remainder of the file.

        """
        if size is None:
            data = []
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return ''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.

        """
        return self.arvadosfile.readfrom(offset, size, num_retries)

    def flush(self):
        pass


class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
        self.mode = mode

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
        else:
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)
        if self._filepos > self.size():
            self._filepos = self.size()

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()

    def close(self, flush=True):
        if not self.closed:
            if flush:
                self.flush()
            self.arvadosfile.set_closed()
            super(ArvadosFileWriter, self).close()
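

# A minimal usage sketch, assuming a writable Collection object `coll` from
# arvados.collection (names and paths here are illustrative, not part of
# this module):
#
#     with coll.open("subdir/hello.txt", "w") as f:   # ArvadosFileWriter
#         f.write("hello\n")
#     with coll.open("subdir/hello.txt", "r") as f:   # ArvadosFileReader
#         assert f.read(5) == "hello"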