from __future__ import absolute_import
from __future__ import division
from future import standard_library
standard_library.install_aliases()
from builtins import range
from builtins import object

import bz2
import collections
import copy
import errno
import functools
import hashlib
import logging
import os
import queue
import re
import threading
import uuid
import zlib

from . import config
from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
from .retry import retry_method

MOD = "mod"
WRITE = "write"

_logger = logging.getLogger('arvados.arvfile')
34 """split(path) -> streamname, filename
36 Separate the stream name and file name in a /-separated stream path and
37 return a tuple (stream_name, file_name). If no stream name is available,
42 stream_name, file_name = path.rsplit('/', 1)
43 except ValueError: # No / in string
44 stream_name, file_name = '.', path
45 return stream_name, file_name
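
# Example (illustrative only): split() divides on the rightmost '/', so the
# stream name may itself contain slashes:
#
#     >>> split('path/to/file.txt')
#     ('path/to', 'file.txt')
#     >>> split('file.txt')
#     ('.', 'file.txt')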


class UnownedBlockError(Exception):
    """Raised when there's a writable block without an owner on the BlockManager."""


class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper
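
    # Sketch of the decorator's effect (`f` is a hypothetical reader): once
    # close() has set self.closed, every decorated method raises:
    #
    #     f.close()
    #     f.read(10)   # ValueError: I/O operation on closed stream file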

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            self.close()
        except Exception:
            if exc_type is None:
                raise

    def close(self):
        self.closed = True


class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        self._filepos = min(max(pos, 0), self.size())
        return self._filepos

    def tell(self):
        return self._filepos

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if data == '':
                break
            yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
            self._filepos += len(cache_data)
        else:
            data = ['']
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)

        data = ''.join(data)
        try:
            nextline_index = data.index('\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]
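
    # Illustrative behavior (a sketch, not a doctest): each call returns at
    # most one '\n'-terminated line; bytes read past the newline are parked
    # in self._readline_cache and served first on the next call:
    #
    #     reader.readline()   # -> 'first line\n'
    #     reader.readline()   # -> 'second line\n'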

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)
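
    # Example (sketch): the decompressor is chosen purely by file suffix, so
    # a hypothetical 'logs.gz' is inflated with zlib while 'logs.txt' streams
    # through readall() unchanged:
    #
    #     for chunk in reader.readall_decompressed():
    #         handle(chunk)   # 'handle' is a placeholder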

    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return ''.join(data).splitlines(True)

    def size(self):
        raise NotImplementedError()

    def read(self, size, num_retries=None):
        raise NotImplementedError()

    def readfrom(self, start, size, num_retries=None):
        raise NotImplementedError()


class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return ''

        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return ''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return ''.join(data)

    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"


def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper
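
# Usage sketch: @synchronized assumes the instance has a `self.lock`
# attribute (a threading.Lock, threading.RLock, or NoopLock); `Counter` is
# a hypothetical example class:
#
#     class Counter(object):
#         def __init__(self):
#             self.lock = threading.RLock()
#             self.n = 0
#
#         @synchronized
#         def increment(self):
#             self.n += 1   # runs while holding self.lock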


class StateChangeError(Exception):
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.state = state
        self.nextstate = nextstate


class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are three valid states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2
    ERROR = 3

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()
        self.wait_for_commit = threading.Event()
        self.error = None

    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
        else:
            raise AssertionError("Buffer block is not writable")
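
    # Worked example (sketch): starting from the default 2**14-byte buffer,
    # appending 40000 bytes doubles capacity twice (16384 -> 32768 -> 65536)
    # before copying, so append() stays amortized O(n) over many small writes.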

    STATE_TRANSITIONS = frozenset([
            (WRITABLE, PENDING),
            (PENDING, COMMITTED),
            (PENDING, ERROR),
            (ERROR, PENDING)])

    @synchronized
    def set_state(self, nextstate, val=None):
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
        self._state = nextstate

        if self._state == _BufferBlock.PENDING:
            self.wait_for_commit.clear()

        if self._state == _BufferBlock.COMMITTED:
            self._locator = val
            self.buffer_view = None
            self.buffer_block = None
            self.wait_for_commit.set()

        if self._state == _BufferBlock.ERROR:
            self.error = val
            self.wait_for_commit.set()
360 """The amount of data written to the buffer."""
361 return self.write_pointer
365 """The Keep locator for this buffer's contents."""
366 if self._locator is None:
367 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())

    @synchronized
    def clone(self, new_blockid, owner):
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Cannot duplicate committed buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock

    @synchronized
    def clear(self):
        self.owner = None
        self.buffer_block = None
        self.buffer_view = None


class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass


def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection is read-only.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper
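
# Sketch: on a read-only collection, any @must_be_writable method fails
# before touching state (`f` is a hypothetical ArvadosFile whose collection
# is read-only):
#
#     f.truncate(0)   # raises IOError(errno.EROFS, "Collection is read-only.")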


class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """

    DEFAULT_PUT_THREADS = 2
    DEFAULT_GET_THREADS = 2

    def __init__(self, keep, copies=None, put_threads=None):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = collections.OrderedDict()
        self._put_queue = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        if put_threads:
            self.num_put_threads = put_threads
        else:
            self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
        self.copies = copies
        self._pending_write_size = 0
        self.threads_lock = threading.Lock()

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        return self._alloc_bufferblock(blockid, starting_capacity, owner)

    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        if blockid is None:
            blockid = "%s" % uuid.uuid4()
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    def is_bufferblock(self, locator):
        return locator in self._bufferblocks

    def _commit_bufferblock_worker(self):
        """Background uploader thread."""
        while True:
            try:
                bufferblock = self._put_queue.get()
                if bufferblock is None:
                    return

                if self.copies is None:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                else:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                bufferblock.set_state(_BufferBlock.ERROR, e)
            finally:
                if self._put_queue is not None:
                    self._put_queue.task_done()
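
    # Note: a None entry on self._put_queue is the shutdown sentinel;
    # stop_threads() below feeds one None per uploader thread.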

    def start_put_threads(self):
        with self.threads_lock:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep
                # servers.
                #
                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending.  If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = queue.Queue(maxsize=2)

                self._put_threads = []
                for i in range(0, self.num_put_threads):
                    thread = threading.Thread(target=self._commit_bufferblock_worker)
                    self._put_threads.append(thread)
                    thread.daemon = True
                    thread.start()

    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                if b is None:
                    return
                self._keep.get(b)
            except Exception:
                _logger.exception("Exception doing block prefetch")

    @synchronized
    def start_get_threads(self):
        if self._prefetch_threads is None:
            self._prefetch_queue = queue.Queue()
            self._prefetch_threads = []
            for i in range(0, self.num_get_threads):
                thread = threading.Thread(target=self._block_prefetch_worker)
                self._prefetch_threads.append(thread)
                thread.daemon = True
                thread.start()

    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""

        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None
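
    # Usage sketch: _BlockManager is a context manager, so callers can
    # guarantee thread shutdown (`keep` is an assumed KeepClient instance):
    #
    #     with _BlockManager(keep) as bm:
    #         ...   # allocate and commit bufferblocks
    #     # __exit__ calls stop_threads()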

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_threads()

    @synchronized
    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
        """Packs small blocks together before uploading"""
        self._pending_write_size += closed_file_size

        # Check if there are enough small blocks for filling up one in full
        if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):

            # Search blocks ready for getting packed together before being committed to Keep.
            # A WRITABLE block always has an owner.
            # A WRITABLE block with its owner.closed() implies that its
            # size is <= KEEP_BLOCK_SIZE/2.
            try:
                small_blocks = [b for b in list(self._bufferblocks.values()) if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
            except AttributeError:
                # Writable blocks without owner shouldn't exist.
                raise UnownedBlockError()

            if len(small_blocks) <= 1:
                # Not enough small blocks for repacking
                return

            # Update the pending write size count with its true value, just in case
            # some small file was opened, written and closed several times.
            self._pending_write_size = sum([b.size() for b in small_blocks])
            if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
                return

            new_bb = self._alloc_bufferblock()
            while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
                bb = small_blocks.pop(0)
                arvfile = bb.owner
                self._pending_write_size -= bb.size()
                new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
                arvfile.set_segments([Range(new_bb.blockid,
                                            0,
                                            bb.size(),
                                            new_bb.write_pointer - bb.size())])
                self._delete_bufferblock(bb.blockid)
            self.commit_bufferblock(new_bb, sync=sync)
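
    # Worked example (sketch): with KEEP_BLOCK_SIZE = 64 MiB, four closed
    # 20 MiB files leave _pending_write_size at 80 MiB, which triggers a
    # repack.  The loop copies the first three blocks (60 MiB) into one new
    # bufferblock (adding the fourth would exceed 64 MiB), rewrites each
    # packed file's single segment to point into the new block at offsets
    # 0, 20 MiB and 40 MiB, and commits it once; the fourth block waits for
    # a later pass.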

    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :sync:
          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        """
        try:
            # Mark the block as PENDING to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            if e.state == _BufferBlock.PENDING:
                if sync:
                    block.wait_for_commit.wait()
                else:
                    return
                if block.state() == _BufferBlock.COMMITTED:
                    return
                elif block.state() == _BufferBlock.ERROR:
                    raise block.error
            else:
                raise

        if sync:
            try:
                if self.copies is None:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
                else:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
                block.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                block.set_state(_BufferBlock.ERROR, e)
                raise
        else:
            self.start_put_threads()
            self._put_queue.put(block)

    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    @synchronized
    def delete_bufferblock(self, locator):
        self._delete_bufferblock(locator)

    def _delete_bufferblock(self, locator):
        bb = self._bufferblocks[locator]
        bb.clear()
        del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and, if so, returns
        its contents; if not, passes the request through to KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)
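
    # Note: with cache_only=True this never blocks on the network; it returns
    # None when the block is neither a local bufferblock nor already present
    # in the KeepClient cache.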

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.

        """
        self.repack_small_blocks(force=True, sync=True)

        with self.lock:
            items = list(self._bufferblocks.items())

        for k, v in items:
            if v.state() != _BufferBlock.COMMITTED and v.owner:
                v.owner.flush(sync=False)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

                err = []
                for k, v in items:
                    if v.state() == _BufferBlock.ERROR:
                        err.append((v.locator(), v.error))
                if err:
                    raise KeepWriteError("Error writing some blocks", err, label="block")

        for k, v in items:
            # flush again with sync=True to remove committed bufferblocks from
            # the segments.
            if v.owner:
                v.owner.flush(sync=True)

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.)  This method
        does not block.

        """
        if not self.prefetch_enabled:
            return

        if self._keep.get_from_cache(locator) is not None:
            return

        with self.lock:
            if locator in self._bufferblocks:
                return

        self.start_get_threads()
        self._prefetch_queue.put(locator)


class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments

        """
        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        return self.parent.writable()

    @synchronized
    def permission_expired(self, as_of_dt=None):
        """Returns True if any of the segment's locators is expired"""
        for r in self._segments:
            if KeepLocator(r.locator).permission_expired(as_of_dt):
                return True
        return False

    @synchronized
    def segments(self):
        return copy.copy(self._segments)

    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self.set_committed(False)

    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in range(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_segments(self, segs):
        self._segments = segs

    @synchronized
    def set_committed(self, value=True):
        """Set committed flag.

        If value is True, set committed to be True.

        If value is False, set committed to be False for this and all parents.

        """
        if value == self._committed:
            return
        self._committed = value
        if self._committed is False and self.parent is not None:
            self.parent.set_committed(False)

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed

    @synchronized
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers"""
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    @synchronized
    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
        and do some block maintenance tasks.
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
            # File writer closed, not small enough for repacking
            self.flush()
        elif self.closed():
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

    def closed(self):
        """
        Get whether this is closed or not. When the writers list is empty, the file
        is supposed to be closed.
        """
        return len(self._writers) == 0

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, an IOError will be raised.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                    break
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self.set_committed(False)
        elif size > self.size():
            raise IOError(errno.EINVAL, "truncate() does not support extending the file size")

    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
          If False (default), return less data than requested if the read
          crosses a block boundary and the next block isn't cached.  If True,
          only return less data than requested when hitting EOF.

        """
        with self.lock:
            if size == 0 or offset >= self.size():
                return b''
            readsegs = locators_and_ranges(self._segments, offset, size)
            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)

        locs = set()
        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
                locs.add(lr.locator)
            else:
                break

        for lr in prefetch:
            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)

        return b''.join(data)

    def _repack_writes(self, num_retries):
        """Test if the buffer block has more data than actual segments.

        This happens when a buffered write over-writes a file range written in
        a previous buffered write.  Re-pack the buffer block for efficiency
        and to avoid leaking information.

        """
        segs = self._segments

        # Sum up the segments to get the total bytes of the file referencing
        # into the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size():
            # There is more data in the buffer block than is actually accounted for by segments, so
            # re-pack into a new buffer by copying over to a new buffer block.
            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if len(data) == 0:
            return

        if offset > self.size():
            raise ArgumentError("Offset is past the end of the file")

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self.set_committed(False)

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes(num_retries)
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))
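
    # Worked example (sketch): with KEEP_BLOCK_SIZE = 64 MiB, a single
    # 150 MiB write re-enters writeto() as three chunks (64 + 64 + 22 MiB),
    # each small enough to fit in a bufferblock before it is committed.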

    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the keep put queue.

        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._repack_writes(num_retries)
            self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
            for s in to_delete:
                self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self.set_committed(False)
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)
1090 """Get the file size."""
1092 n = self._segments[-1]
1093 return n.range_start + n.range_size

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False,
                      normalize=False, only_committed=False):
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                if only_committed:
                    continue
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
        buf += "\n"
        return buf

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        self.set_committed(False)
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock


class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None, read the
        entire remainder of the file.

        """
        if size is None:
            data = []
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return ''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.

        """
        return self.arvadosfile.readfrom(offset, size, num_retries)
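
# Usage sketch (`af` is an assumed ArvadosFile): the reader layers the
# familiar file API on top of ArvadosFile.readfrom():
#
#     reader = ArvadosFileReader(af)
#     header = reader.read(1024)       # advances the file position
#     chunk = reader.readfrom(0, 16)   # does not move the file position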


class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
        self.mode = mode
        self.arvadosfile.add_writer(self)

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
        else:
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)
        if self._filepos > self.size():
            self._filepos = self.size()

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()

    def close(self, flush=True):
        if not self.closed:
            self.arvadosfile.remove_writer(self, flush)
            super(ArvadosFileWriter, self).close()
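
# Usage sketch (`af` is an assumed ArvadosFile): in append mode every write
# lands at end-of-file regardless of the current file position:
#
#     writer = ArvadosFileWriter(af, "ab")
#     writer.write(b"more data")
#     writer.close()   # removes the writer reference; may flush or repack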