from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, LocatorAndRange, Range
from .retry import retry_method

MOD = "mod"
WRITE = "write"

_logger = logging.getLogger('arvados.arvfile')
28 """split(path) -> streamname, filename
30 Separate the stream name and file name in a /-separated stream path and
31 return a tuple (stream_name, file_name). If no stream name is available,
36 stream_name, file_name = path.rsplit('/', 1)
37 except ValueError: # No / in string
38 stream_name, file_name = '.', path
39 return stream_name, file_name
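
# For illustration, with hypothetical inputs:
#   split('path/to/file.txt')  -> ('path/to', 'file.txt')
#   split('file.txt')          -> ('.', 'file.txt')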


class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper
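
    # Illustrative use of the decorator (hypothetical method):
    #
    #   @_FileLikeObjectBase._before_close
    #   def read(self, size): ...
    #
    # Once close() has set self.closed, any decorated method raises
    # ValueError("I/O operation on closed stream file") instead of running.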

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        self.closed = True


class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0L
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        self._filepos = min(max(pos, 0L), self.size())

    def tell(self):
        return self._filepos

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if data == '':
                break
            yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
            self._filepos += len(cache_data)
        else:
            data = ['']
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
        data = ''.join(data)
        try:
            nextline_index = data.index('\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)
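
    # Behavior sketch (hypothetical file names): a reader named 'log.gz'
    # yields inflated data via zlib using a 16+MAX_WBITS window (gzip
    # framing), 'log.bz2' goes through BZ2Decompressor, and any other name
    # is streamed through readall() unchanged.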

    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return ''.join(data).splitlines(True)

    def size(self):
        raise NotImplementedError()

    def read(self, size, num_retries=None):
        raise NotImplementedError()

    def readfrom(self, start, size, num_retries=None):
        raise NotImplementedError()


class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return ''

        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return ''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return ''.join(data)

    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
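
    # A normalized stream line, for a hypothetical 10-byte file, looks like:
    #   . 781e5e245d69b566979b86e28d23f2c7+10 0:10:file.txt
    # i.e. the stream name, one or more block locators, then one or more
    # "position:size:filename" segment descriptors.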


def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper


class StateChangeError(Exception):
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.state = state
        self.nextstate = nextstate


class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are four valid states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    ERROR
      The block upload failed; the error is available in the `error` attribute.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2
    ERROR = 3

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()
        self.wait_for_commit = threading.Event()
        self.error = None

    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
        else:
            raise AssertionError("Buffer block is not writable")
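
    # Capacity growth sketch (hypothetical numbers): starting from the
    # 16 KiB default, appending 100 KiB doubles the buffer 16 -> 32 -> 64
    # -> 128 KiB, so repeated appends stay amortized O(n) instead of
    # reallocating on every write.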

    STATE_TRANSITIONS = frozenset([
            (WRITABLE, PENDING),
            (PENDING, COMMITTED),
            (PENDING, ERROR),
            (ERROR, PENDING)])

    @synchronized
    def set_state(self, nextstate, val=None):
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
        self._state = nextstate

        if self._state == _BufferBlock.PENDING:
            self.wait_for_commit.clear()

        if self._state == _BufferBlock.COMMITTED:
            self._locator = val
            self.buffer_view = None
            self.buffer_block = None
            self.wait_for_commit.set()

        if self._state == _BufferBlock.ERROR:
            self.error = val
            self.wait_for_commit.set()
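
    # State machine sketch, reconstructed from STATE_TRANSITIONS and the
    # set_state() calls in this file:
    #
    #   WRITABLE --(commit_bufferblock)--> PENDING --(put ok)--> COMMITTED
    #                                         |
    #                                         +---(put failed)--> ERROR
    #
    # COMMITTED releases the internal buffer; readers then fetch the block
    # from Keep by its locator.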
348 """The amount of data written to the buffer."""
349 return self.write_pointer
353 """The Keep locator for this buffer's contents."""
354 if self._locator is None:
355 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
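
    # The computed locator has the standard Keep form "<md5-hex>+<size>",
    # e.g. (hypothetically) "d41d8cd98f00b204e9800998ecf8427e+0" for an
    # empty buffer.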

    @synchronized
    def clone(self, new_blockid, owner):
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Cannot duplicate committed buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock

    @synchronized
    def clear(self):
        self.owner = None
        self.buffer_block = None
        self.buffer_view = None


class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass


def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection is read-only.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper


class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """

    DEFAULT_PUT_THREADS = 2
    DEFAULT_GET_THREADS = 2

    def __init__(self, keep, copies=None, put_threads=None):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = collections.OrderedDict()
        self._put_queue = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        if put_threads:
            self.num_put_threads = put_threads
        else:
            self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
        self.copies = copies
        self._pending_write_size = 0
        self.threads_lock = threading.Lock()

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        return self._alloc_bufferblock(blockid, starting_capacity, owner)

    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        if blockid is None:
            blockid = "%s" % uuid.uuid4()
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        return locator in self._bufferblocks

    def _commit_bufferblock_worker(self):
        """Background uploader thread."""
        while True:
            try:
                bufferblock = self._put_queue.get()
                if bufferblock is None:
                    return

                if self.copies is None:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                else:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                bufferblock.set_state(_BufferBlock.ERROR, e)
            finally:
                if self._put_queue is not None:
                    self._put_queue.task_done()

    def start_put_threads(self):
        with self.threads_lock:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep
                # service.

                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending. If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = Queue.Queue(maxsize=2)

                self._put_threads = []
                for i in xrange(0, self.num_put_threads):
                    thread = threading.Thread(target=self._commit_bufferblock_worker)
                    self._put_threads.append(thread)
                    thread.daemon = True
                    thread.start()

    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                if b is None:
                    return
                self._keep.get(b)
            except Exception:
                _logger.exception("Exception doing block prefetch")

    @synchronized
    def start_get_threads(self):
        if self._prefetch_threads is None:
            self._prefetch_queue = Queue.Queue()
            self._prefetch_threads = []
            for i in xrange(0, self.num_get_threads):
                thread = threading.Thread(target=self._block_prefetch_worker)
                self._prefetch_threads.append(thread)
                thread.daemon = True
                thread.start()

    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""
        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_threads()

    @synchronized
    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
        """Packs small blocks together before uploading"""
        self._pending_write_size += closed_file_size

        # Check if there are enough small blocks for filling up one in full
        if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):

            # Search blocks ready for getting packed together before being committed to Keep.
            # A WRITABLE block always has an owner.
            # A WRITABLE block with its owner.closed() implies that its
            # size is <= KEEP_BLOCK_SIZE/2.
            small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]

            if len(small_blocks) <= 1:
                # Not enough small blocks for repacking
                return

            # Update the pending write size count with its true value, just in case
            # some small file was opened, written and closed several times.
            self._pending_write_size = sum([b.size() for b in small_blocks])
            if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
                return

            new_bb = self._alloc_bufferblock()
            while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
                bb = small_blocks.pop(0)
                arvfile = bb.owner
                self._pending_write_size -= bb.size()
                new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
                arvfile.set_segments([Range(new_bb.blockid,
                                            0,
                                            bb.size(),
                                            new_bb.write_pointer - bb.size())])
                self._delete_bufferblock(bb.blockid)
            self.commit_bufferblock(new_bb, sync=sync)
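
    # Repacking sketch (hypothetical numbers): 100 closed 1 MiB files would
    # otherwise produce 100 tiny Keep blocks. Once the pending total crosses
    # KEEP_BLOCK_SIZE (64 MiB), their WRITABLE buffers are concatenated into
    # one new bufferblock and each file's segments are re-pointed at its
    # offset range within that block.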

    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :sync:
          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        """
        try:
            # Mark the block as PENDING so as to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            if e.state == _BufferBlock.PENDING:
                if sync:
                    block.wait_for_commit.wait()
                else:
                    return
                if block.state() == _BufferBlock.COMMITTED:
                    return
                elif block.state() == _BufferBlock.ERROR:
                    raise block.error
                else:
                    raise
            else:
                raise

        if sync:
            try:
                if self.copies is None:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
                else:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
                block.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                block.set_state(_BufferBlock.ERROR, e)
                raise
        else:
            self.start_put_threads()
            self._put_queue.put(block)

    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    @synchronized
    def delete_bufferblock(self, locator):
        self._delete_bufferblock(locator)

    def _delete_bufferblock(self, locator):
        bb = self._bufferblocks[locator]
        bb.clear()
        del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and, if so, returns
        its contents; otherwise passes the request through to KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.

        """
        self.repack_small_blocks(force=True, sync=True)

        with self.lock:
            items = self._bufferblocks.items()

        for k, v in items:
            if v.state() != _BufferBlock.COMMITTED and v.owner:
                v.owner.flush(sync=False)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

                err = []
                for k, v in items:
                    if v.state() == _BufferBlock.ERROR:
                        err.append((v.locator(), v.error))
                if err:
                    raise KeepWriteError("Error writing some blocks", err, label="block")

        for k, v in items:
            # flush again with sync=True to remove committed bufferblocks from
            # the segments.
            if v.owner:
                v.owner.flush(sync=True)

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.)  This method
        does not block.

        """
        if not self.prefetch_enabled:
            return

        if self._keep.get_from_cache(locator) is not None:
            return

        with self.lock:
            if locator in self._bufferblocks:
                return

        self.start_get_threads()
        self._prefetch_queue.put(locator)


class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments

        """
        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        return self.parent.writable()

    @synchronized
    def permission_expired(self, as_of_dt=None):
        """Returns True if any of the segment's locators is expired"""
        for r in self._segments:
            if KeepLocator(r.locator).permission_expired(as_of_dt):
                return True
        return False

    @synchronized
    def segments(self):
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self.set_committed(False)

    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in xrange(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_segments(self, segs):
        self._segments = segs

    @synchronized
    def set_committed(self, value=True):
        """Set committed flag.

        If value is True, set committed to be True.

        If value is False, set committed to be False for this and all parents.
        """
        if value == self._committed:
            return
        self._committed = value
        if self._committed is False and self.parent is not None:
            self.parent.set_committed(False)

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed

    @synchronized
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers"""
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    @synchronized
    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
        and do some block maintenance tasks.
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
            # File writer closed, not small enough for repacking
            self.flush()
        elif self.closed():
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

    def closed(self):
        """
        Get whether this is closed or not. When the writers list is empty, the file
        is supposed to be closed.
        """
        return len(self._writers) == 0

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, an IOError will be raised.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self.set_committed(False)
        elif size > self.size():
            raise IOError(errno.EINVAL, "truncate() does not support extending the file size")

    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
          If False (default), return less data than requested if the read
          crosses a block boundary and the next block isn't cached.  If True,
          only return less data than requested when hitting EOF.

        """
        with self.lock:
            if size == 0 or offset >= self.size():
                return ''
            readsegs = locators_and_ranges(self._segments, offset, size)
            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)

        locs = set()
        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
                locs.add(lr.locator)
            else:
                break

        for lr in prefetch:
            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)

        return ''.join(data)
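
    # Read semantics sketch: with exact=False, a read that spans two blocks
    # fetches the first block (blocking if necessary) but returns the second
    # only if it is already in the local cache, so callers are expected to
    # loop on short reads; with exact=True every block is fetched and the
    # result is only short at end-of-file.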

    def _repack_writes(self, num_retries):
        """Test if the buffer block has more data than actual segments.

        This happens when a buffered write over-writes a file range written in
        a previous buffered write.  Re-pack the buffer block for efficiency
        and to avoid leaking information.

        """
        segs = self._segments

        # Sum up the segments to get the total bytes of the file referencing
        # into the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size():
            # There is more data in the buffer block than is actually accounted for by segments, so
            # re-pack into a new buffer by copying over to a new buffer block.
            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb
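
    # Overwrite example (hypothetical): writeto(0, 'aaaa') followed by
    # writeto(0, 'bbbb') leaves 8 bytes in the bufferblock but only 4
    # referenced by segments; write_total (4) < bufferblock size (8)
    # triggers the copy into a compact replacement block.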

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if len(data) == 0:
            return

        if offset > self.size():
            raise ArgumentError("Offset is past the end of the file")

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self.set_committed(False)

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes(num_retries)
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))

        return len(data)
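
    # Chunking note: a single writeto() larger than KEEP_BLOCK_SIZE is split
    # into KEEP_BLOCK_SIZE-sized recursive writeto() calls, so no bufferblock
    # ever has to hold more than one full Keep block of new data.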

    @must_be_writable
    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the keep put queue.
        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._repack_writes(num_retries)
            self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
            for s in to_delete:
                self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self.set_committed(False)
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)
1074 """Get the file size."""
1076 n = self._segments[-1]
1077 return n.range_start + n.range_size

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False,
                      normalize=False, only_committed=False):
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                if only_committed:
                    continue
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
        buf += "\n"
        return buf

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        self.set_committed(False)
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock


class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None, read the
        entire remainder of the file.
        """
        if size is None:
            data = []
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return ''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.
        """
        return self.arvadosfile.readfrom(offset, size, num_retries)


class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
        self.mode = mode
        self.arvadosfile.add_writer(self)

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
        else:
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)
        if self._filepos > self.size():
            self._filepos = self.size()

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()

    def close(self, flush=True):
        if not self.closed:
            self.arvadosfile.remove_writer(self, flush)
            super(ArvadosFileWriter, self).close()
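
# Typical usage sketch (hypothetical collection object; the open() helper
# lives in the Collection API, not in this module), writing then reading a
# file through these wrappers:
#
#   writer = collection.open('data.txt', 'w')   # returns ArvadosFileWriter
#   writer.write('hello world')
#   writer.close()                              # may trigger small-block repack
#
#   reader = collection.open('data.txt', 'r')   # returns ArvadosFileReader
#   assert reader.read(5) == 'hello'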