import bz2
import collections
import copy
import errno
import functools
import hashlib
import logging
import os
import Queue
import re
import threading
import uuid
import zlib

from . import config
from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
from .retry import retry_method

# Event types passed to Collection.notify() by this module.
MOD = "mod"
WRITE = "write"

_logger = logging.getLogger('arvados.arvfile')
28 """split(path) -> streamname, filename
30 Separate the stream name and file name in a /-separated stream path and
31 return a tuple (stream_name, file_name). If no stream name is available,
36 stream_name, file_name = path.rsplit('/', 1)
37 except ValueError: # No / in string
38 stream_name, file_name = '.', path
39 return stream_name, file_name
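
# Illustrative behavior (sketch, not a doctest run by the SDK):
#
#     >>> split('fastq/sample_R1.fastq')
#     ('fastq', 'sample_R1.fastq')
#     >>> split('sample_R1.fastq')
#     ('.', 'sample_R1.fastq')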


class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper
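
    # Sketch of the guard in action (hypothetical reader `f`):
    #
    #     f.close()
    #     f.read(10)   # raises ValueError: I/O operation on closed stream file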
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        self.closed = True


class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0L
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data
    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)
    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        self._filepos = min(max(pos, 0L), self.size())

    def tell(self):
        return self._filepos
    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if data == '':
                break
            yield data
    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
            self._filepos += len(cache_data)
        else:
            data = ['']
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)

        data = ''.join(data)
        try:
            nextline_index = data.index('\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]
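
    # Iteration sketch (illustrative; hypothetical reader `f`): each
    # readline() call reads 1 MiB chunks until a newline appears, returns one
    # line, and caches the leftover bytes for the next call.
    #
    #     for line in f:      # uses __iter__/readline above
    #         process(line)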
    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)
            if data:
                yield data
    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)
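
    # Note: wbits=16+zlib.MAX_WBITS tells zlib to expect a gzip container.
    # A rough equivalent outside this class (illustrative only):
    #
    #     dc = zlib.decompressobj(16 + zlib.MAX_WBITS)
    #     plain = dc.decompress(open('x.gz', 'rb').read())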
    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return ''.join(data).splitlines(True)
    def size(self):
        raise NotImplementedError()

    def read(self, size, num_retries=None):
        raise NotImplementedError()

    def readfrom(self, start, size, num_retries=None):
        raise NotImplementedError()


class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size
    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return ''

        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data
    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return ''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return ''.join(data)
    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"


def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper
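
# Usage sketch (hypothetical class; any object with a `.lock` attribute that
# supports the context-manager protocol works):
#
#     class Counter(object):
#         def __init__(self):
#             self.lock = threading.RLock()
#             self.n = 0
#
#         @synchronized
#         def incr(self):
#             self.n += 1   # runs with self.lock held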


class StateChangeError(Exception):
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.state = state
        self.nextstate = nextstate


class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are three valid states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2
    ERROR = 3

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()
        self.wait_for_commit = threading.Event()
        self.error = None
    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
        else:
            raise AssertionError("Buffer block is not writable")
    STATE_TRANSITIONS = frozenset([
            (WRITABLE, PENDING),
            (PENDING, COMMITTED),
            (PENDING, ERROR),
            (ERROR, PENDING)])
    @synchronized
    def set_state(self, nextstate, val=None):
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
        self._state = nextstate

        if self._state == _BufferBlock.PENDING:
            self.wait_for_commit.clear()

        if self._state == _BufferBlock.COMMITTED:
            self._locator = val
            self.buffer_view = None
            self.buffer_block = None
            self.wait_for_commit.set()

        if self._state == _BufferBlock.ERROR:
            self.error = val
            self.wait_for_commit.set()
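
    # Lifecycle sketch (illustrative; `bb` is a WRITABLE block and `loc` a
    # Keep locator returned by KeepClient.put):
    #
    #     bb.append('some data')                     # WRITABLE: appends allowed
    #     bb.set_state(_BufferBlock.PENDING)         # upload in progress
    #     bb.set_state(_BufferBlock.COMMITTED, loc)  # buffer released
    #     bb.set_state(_BufferBlock.WRITABLE)        # raises StateChangeError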
348 """The amount of data written to the buffer."""
349 return self.write_pointer
353 """The Keep locator for this buffer's contents."""
354 if self._locator is None:
355 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
    @synchronized
    def clone(self, new_blockid, owner):
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Cannot duplicate committed buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock

    @synchronized
    def clear(self):
        self.owner = None
        self.buffer_block = None
        self.buffer_view = None


class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass


def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection is read-only.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper


class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """

    DEFAULT_PUT_THREADS = 2
    DEFAULT_GET_THREADS = 2

    def __init__(self, keep, copies=None):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = collections.OrderedDict()
        self._put_queue = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
        self.copies = copies
    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        return self._alloc_bufferblock(blockid, starting_capacity, owner)

    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        if blockid is None:
            blockid = "%s" % uuid.uuid4()
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock
    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        return locator in self._bufferblocks
    def _commit_bufferblock_worker(self):
        """Background uploader thread."""
        while True:
            try:
                bufferblock = self._put_queue.get()
                if bufferblock is None:
                    return

                if self.copies is None:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                else:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)

            except Exception as e:
                bufferblock.set_state(_BufferBlock.ERROR, e)
            finally:
                if self._put_queue is not None:
                    self._put_queue.task_done()
    @synchronized
    def start_put_threads(self):
        if self._put_threads is None:
            # Start uploader threads.

            # If we don't limit the Queue size, the upload queue can quickly
            # grow to take up gigabytes of RAM if the writing process is
            # generating data more quickly than it can be sent to the Keep
            # servers.
            #
            # With two upload threads and a queue size of 2, this means up to 4
            # blocks pending.  If they are full 64 MiB blocks, that means up to
            # 256 MiB of internal buffering, which is the same size as the
            # default download block cache in KeepClient.
            self._put_queue = Queue.Queue(maxsize=2)

            self._put_threads = []
            for i in xrange(0, self.num_put_threads):
                thread = threading.Thread(target=self._commit_bufferblock_worker)
                self._put_threads.append(thread)
                thread.daemon = True
                thread.start()
    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                if b is None:
                    return
                self._keep.get(b)
            except Exception:
                _logger.exception("Exception doing block prefetch")
    @synchronized
    def start_get_threads(self):
        if self._prefetch_threads is None:
            self._prefetch_queue = Queue.Queue()
            self._prefetch_threads = []
            for i in xrange(0, self.num_get_threads):
                thread = threading.Thread(target=self._block_prefetch_worker)
                self._prefetch_threads.append(thread)
                thread.daemon = True
                thread.start()
    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""

        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_threads()
    def repack_small_blocks(self, force=False, sync=False):
        """Packs small blocks together before uploading."""
        # Search for blocks ready to be packed together before being committed to Keep.
        # A WRITABLE block always has an owner.
        # A WRITABLE block with its owner.closed() implies that its
        # size is <= KEEP_BLOCK_SIZE/2.
        small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
        if len(small_blocks) <= 1:
            # Not enough small blocks for repacking
            return

        # Check if there are enough small blocks for filling up one in full
        pending_write_size = sum([b.size() for b in small_blocks])
        if force or (pending_write_size >= config.KEEP_BLOCK_SIZE):
            new_bb = self._alloc_bufferblock()
            while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
                bb = small_blocks.pop(0)
                arvfile = bb.owner
                new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
                arvfile.set_segments([Range(new_bb.blockid,
                                            0,
                                            bb.size(),
                                            new_bb.write_pointer - bb.size())])
                self._delete_bufferblock(bb.blockid)
            self.commit_bufferblock(new_bb, sync=sync)
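
    # Worked example (illustrative): with the default 64 MiB KEEP_BLOCK_SIZE,
    # three closed files of 30 MiB, 30 MiB and 10 MiB hold 70 MiB of pending
    # writes, so the first two are packed into one ~60 MiB block (the third
    # would overflow it) and uploaded as a single Keep block.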
    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :sync:
          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        """
        try:
            # Mark the block as PENDING to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            if e.state == _BufferBlock.PENDING:
                if sync:
                    block.wait_for_commit.wait()
                else:
                    return
                if block.state() == _BufferBlock.COMMITTED:
                    return
                elif block.state() == _BufferBlock.ERROR:
                    raise block.error
            else:
                raise

        if sync:
            try:
                if self.copies is None:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
                else:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
                block.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                block.set_state(_BufferBlock.ERROR, e)
                raise
        else:
            self.start_put_threads()
            self._put_queue.put(block)
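
    # Usage sketch (illustrative): a caller that wants back-pressure uses the
    # async path and lets the bounded queue throttle it; commit_all() later
    # waits for the uploads to land.
    #
    #     manager.commit_bufferblock(block, sync=False)  # enqueue, maybe wait for a slot
    #     manager.commit_all()                           # block until everything is in Keep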
    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    @synchronized
    def delete_bufferblock(self, locator):
        self._delete_bufferblock(locator)

    def _delete_bufferblock(self, locator):
        bb = self._bufferblocks[locator]
        bb.clear()
        del self._bufferblocks[locator]
    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and returns that,
        if not, passes the request through to KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)
    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.

        """
        self.repack_small_blocks(force=True, sync=True)

        with self.lock:
            items = self._bufferblocks.items()

        for k, v in items:
            if v.state() != _BufferBlock.COMMITTED and v.owner:
                v.owner.flush(sync=False)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

                err = []
                for k, v in items:
                    if v.state() == _BufferBlock.ERROR:
                        err.append((v.locator(), v.error))
                if err:
                    raise KeepWriteError("Error writing some blocks", err, label="block")

        for k, v in items:
            # flush again with sync=True to remove committed bufferblocks from
            # the segments.
            if v.owner:
                v.owner.flush(sync=True)
    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache).  This method
        does not block.

        """
        if not self.prefetch_enabled:
            return

        if self._keep.get_from_cache(locator) is not None:
            return

        with self.lock:
            if locator in self._bufferblocks:
                return

        self.start_get_threads()
        self._prefetch_queue.put(locator)


class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments

        """
        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None
    def writable(self):
        return self.parent.writable()

    @synchronized
    def segments(self):
        return copy.copy(self._segments)
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp
    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self._committed = False
    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in xrange(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)
    @synchronized
    def set_segments(self, segs):
        self._segments = segs

    @synchronized
    def set_committed(self):
        """Set committed flag to True"""
        self._committed = True

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed
    @synchronized
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers"""
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)
    @synchronized
    def remove_writer(self, writer):
        """
        Called from ArvadosFileWriter.close().  Remove a writer reference from the list
        and do some block maintenance tasks.
        """
        self._writers.remove(writer)

        if self.size() > config.KEEP_BLOCK_SIZE / 2:
            # File writer closed, not small enough for repacking
            return

        # All writers closed and size is adequate for repacking
        self.parent._my_block_manager().repack_small_blocks()
    def closed(self):
        """
        Get whether this is closed or not.  When the writers list is empty, the file
        is supposed to be closed.
        """
        return len(self._writers) == 0
    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, an IOError will be raised.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self._committed = False
        elif size > self.size():
            raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
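
    # Example (illustrative): a file built from one 10-byte segment, truncated
    # to 4 bytes, keeps a single segment covering bytes [0, 4) of the original
    # block; truncate(20) on the same file raises IOError(EINVAL).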
    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
          If False (default), return less data than requested if the read
          crosses a block boundary and the next block isn't cached.  If True,
          only return less data than requested when hitting EOF.

        """
        with self.lock:
            if size == 0 or offset >= self.size():
                return ''
            readsegs = locators_and_ranges(self._segments, offset, size)
            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)

        locs = set()
        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
                locs.add(lr.locator)
            else:
                break

        for lr in prefetch:
            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)
                locs.add(lr.locator)

        return ''.join(data)
    def _repack_writes(self, num_retries):
        """Test if the buffer block has more data than actual segments.

        This happens when a buffered write overwrites a file range written in
        a previous buffered write.  Re-pack the buffer block for efficiency
        and to avoid leaking information.

        """
        segs = self._segments

        # Sum up the segments to get the total bytes of the file referencing
        # into the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size():
            # There is more data in the buffer block than is actually accounted
            # for by segments, so re-pack the referenced data into a new buffer
            # block and point the segments at it.
            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb
    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if len(data) == 0:
            return

        if offset > self.size():
            raise ArgumentError("Offset is past the end of the file")

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self._committed = False

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes(num_retries)
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))
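
    # Chunking sketch (illustrative): with a 64 MiB KEEP_BLOCK_SIZE, a single
    # 150 MiB writeto() call is split into three recursive writes of 64, 64
    # and 22 MiB, so no individual buffer block ever exceeds one Keep block.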
    @must_be_writable
    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the keep put queue.

        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._repack_writes(num_retries)
            self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
            for s in to_delete:
                self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))
    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self._committed = False
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)
1042 """Get the file size."""
1044 n = self._segments[-1]
1045 return n.range_start + n.range_size
    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
        buf += "\n"
        return buf
    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        self._committed = False
        self.flush(sync=True)
        self.parent.remove(self.name)

        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock


class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()
    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None, read the
        entire remainder of the file.

        """
        if size is None:
            data = []
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return ''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data
    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.

        """
        return self.arvadosfile.readfrom(offset, size, num_retries)
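
    # Usage sketch (illustrative; assumes an existing ArvadosFile `af`):
    #
    #     reader = ArvadosFileReader(af)
    #     header = reader.readfrom(0, 1024)   # file position unchanged
    #     body = reader.read(4096)            # advances the file position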


class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
        self.mode = mode
        self.arvadosfile.add_writer(self)
    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
        else:
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)
        return len(data)
    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries=num_retries)
    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)
        if self._filepos > self.size():
            self._filepos = self.size()
    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()

    def close(self):
        if not self.closed:
            self.arvadosfile.remove_writer(self)
            super(ArvadosFileWriter, self).close()
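
# Usage sketch (illustrative; assumes a writable ArvadosFile `af`):
#
#     with ArvadosFileWriter(af, "w") as writer:
#         writer.write("hello\n")
#         writer.flush()      # commit the current buffer block to Keep
#     # leaving the block calls close(), which deregisters the writer and may
#     # trigger small-block repacking via remove_writer()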