from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
from .retry import retry_method

_logger = logging.getLogger('arvados.arvfile')
    """split(path) -> streamname, filename

    Separate the stream name and file name in a /-separated stream path and
    return a tuple (stream_name, file_name). If no stream name is available,
    use '.'.

    """
    try:
        stream_name, file_name = path.rsplit('/', 1)
    except ValueError:  # No / in string
        stream_name, file_name = '.', path
    return stream_name, file_name
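
# For example (illustrative values):
#   split("path/to/file.txt") returns ("path/to", "file.txt")
#   split("file.txt") returns (".", "file.txt")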
class UnownedBlockError(Exception):
    """Raised when there's a writable block without an owner on the BlockManager."""
class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper
    def __exit__(self, exc_type, exc_value, traceback):
        self.close()


class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0L
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if data == '':
                break
            yield data

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)
    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        self._filepos = min(max(pos, 0L), self.size())

    @_FileLikeObjectBase._before_close
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if data == '':
                break
            yield data

    @_FileLikeObjectBase._before_close
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
            self._filepos += len(cache_data)
        else:
            data = ['']
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)

        data = ''.join(data)
        if '\n' in data:
            nextline_index = data.index('\n') + 1
        else:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]
    @_FileLikeObjectBase._before_close
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return ''.join(data).splitlines(True)
    def size(self):
        raise NotImplementedError()

    def read(self, size, num_retries=None):
        raise NotImplementedError()

    def readfrom(self, start, size, num_retries=None):
        raise NotImplementedError()
class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size
    @_FileLikeObjectBase._before_close
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return ''

        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return ''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return ''.join(data)

    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
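
    # Illustrative sketch of the return value (example locator and file name,
    # not taken from real data): a single normalized manifest stream line like
    #   ". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n"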
def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper


class StateChangeError(Exception):
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.state = state
        self.nextstate = nextstate
class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are three valid states:

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    """
    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()
        self.wait_for_commit = threading.Event()
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state. Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
        else:
            raise AssertionError("Buffer block is not writable")
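
    # Worked example of the doubling strategy in append(): with the 2**14-byte
    # (16 KiB) starting capacity used by the block manager, appending 40 KiB of
    # data grows the buffer to 32 KiB and then 64 KiB before the bytes are
    # copied in.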
    STATE_TRANSITIONS = frozenset([
        (WRITABLE, PENDING),
        (PENDING, COMMITTED),
        (PENDING, ERROR)])

    def set_state(self, nextstate, val=None):
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
        self._state = nextstate

        if self._state == _BufferBlock.PENDING:
            self.wait_for_commit.clear()

        if self._state == _BufferBlock.COMMITTED:
            self._locator = val
            self.buffer_view = None
            self.buffer_block = None
            self.wait_for_commit.set()

        if self._state == _BufferBlock.ERROR:
            self.error = val
            self.wait_for_commit.set()
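
    # Summary of the transitions handled above: WRITABLE -> PENDING when an
    # upload starts (appends are then rejected), PENDING -> COMMITTED when the
    # upload succeeds (the internal buffer is released), and PENDING -> ERROR
    # when the upload fails (waiters are released and the error is kept).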
    def size(self):
        """The amount of data written to the buffer."""
        return self.write_pointer

    def locator(self):
        """The Keep locator for this buffer's contents."""
        if self._locator is None:
            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
        return self._locator

    def clone(self, new_blockid, owner):
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Cannot duplicate committed buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock
    def clear(self):
        self.owner = None
        self.buffer_block = None
        self.buffer_view = None
class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass


def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection is read-only.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper
class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """

    DEFAULT_PUT_THREADS = 2
    DEFAULT_GET_THREADS = 2

    def __init__(self, keep, copies=None, put_threads=None):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = collections.OrderedDict()
        self._put_queue = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        if put_threads:
            self.num_put_threads = put_threads
        else:
            self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
        self.copies = copies
        self._pending_write_size = 0
        self.threads_lock = threading.Lock()
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        return self._alloc_bufferblock(blockid, starting_capacity, owner)

    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        if blockid is None:
            blockid = "%s" % uuid.uuid4()
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    def is_bufferblock(self, locator):
        return locator in self._bufferblocks
    def _commit_bufferblock_worker(self):
        """Background uploader thread."""
        while True:
            try:
                bufferblock = self._put_queue.get()
                if bufferblock is None:
                    return

                if self.copies is None:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                else:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                bufferblock.set_state(_BufferBlock.ERROR, e)
            finally:
                if self._put_queue is not None:
                    self._put_queue.task_done()
    def start_put_threads(self):
        with self.threads_lock:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep
                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending. If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = Queue.Queue(maxsize=2)

                self._put_threads = []
                for i in xrange(0, self.num_put_threads):
                    thread = threading.Thread(target=self._commit_bufferblock_worker)
                    self._put_threads.append(thread)
    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                if b is None:
                    return
                self._keep.get(b)
            except Exception:
                _logger.exception("Exception doing block prefetch")

    def start_get_threads(self):
        if self._prefetch_threads is None:
            self._prefetch_queue = Queue.Queue()
            self._prefetch_threads = []
            for i in xrange(0, self.num_get_threads):
                thread = threading.Thread(target=self._block_prefetch_worker)
                self._prefetch_threads.append(thread)
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""
        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_threads()
    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
        """Packs small blocks together before uploading"""
        self._pending_write_size += closed_file_size

        # Check if there are enough small blocks for filling up one in full
        if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):

            # Search for blocks ready to be packed together before being committed to Keep.
            # A WRITABLE block always has an owner.
            # A WRITABLE block with its owner.closed() implies that its
            # size is <= KEEP_BLOCK_SIZE/2.
            try:
                small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
            except AttributeError:
                # Writable blocks without owner shouldn't exist.
                raise UnownedBlockError()

            if len(small_blocks) <= 1:
                # Not enough small blocks for repacking
                return

            # Update the pending write size count with its true value, just in case
            # some small file was opened, written and closed several times.
            self._pending_write_size = sum([b.size() for b in small_blocks])
            if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
                return

            new_bb = self._alloc_bufferblock()
            while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
                bb = small_blocks.pop(0)
                arvfile = bb.owner
                self._pending_write_size -= bb.size()
                new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
                arvfile.set_segments([Range(new_bb.blockid,
                                            0,
                                            bb.size(),
                                            new_bb.write_pointer - bb.size())])
                self._delete_bufferblock(bb.blockid)
            self.commit_bufferblock(new_bb, sync=sync)
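
    # Sizing note (derived from the logic above; Keep blocks are 64 MiB): only
    # WRITABLE blocks whose owning file is closed are candidates, each is at
    # most half a block, and they are packed into one new buffer block until
    # the next candidate would push it past KEEP_BLOCK_SIZE, at which point the
    # packed block is committed as a single Keep block.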
    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :sync:
          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously. This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        """
        try:
            # Mark the block as PENDING to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            if e.state == _BufferBlock.PENDING:
                if sync:
                    block.wait_for_commit.wait()
                if block.state() == _BufferBlock.COMMITTED:
                    return
                elif block.state() == _BufferBlock.ERROR:
                    raise block.error

        if sync:
            try:
                if self.copies is None:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
                else:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
                block.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                block.set_state(_BufferBlock.ERROR, e)
                raise
        else:
            self.start_put_threads()
            self._put_queue.put(block)
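
    # Behavior summary: with sync=True the put happens on the calling thread
    # and failures raise immediately; with sync=False the block is handed to
    # the background uploader threads and this call returns as soon as a queue
    # slot is available.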
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    def delete_bufferblock(self, locator):
        self._delete_bufferblock(locator)

    def _delete_bufferblock(self, locator):
        bb = self._bufferblocks[locator]
        bb.clear()
        del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """First checks to see if the locator is a BufferBlock and returns that;
        if not, passes the request through to KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)
    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded. Raises KeepWriteError() if any blocks failed to upload.

        """
        self.repack_small_blocks(force=True, sync=True)
        with self.lock:
            items = self._bufferblocks.items()

        for k, v in items:
            if v.state() != _BufferBlock.COMMITTED and v.owner:
                v.owner.flush(sync=False)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()
                err = []
                for k, v in items:
                    if v.state() == _BufferBlock.ERROR:
                        err.append((v.locator(), v.error))
                if err:
                    raise KeepWriteError("Error writing some blocks", err, label="block")

        for k, v in items:
            # flush again with sync=True to remove committed bufferblocks from
            # the segments.
            if v.owner:
                v.owner.flush(sync=True)
    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache). This method
        does not block.

        """
        if not self.prefetch_enabled:
            return

        if self._keep.get_from_cache(locator) is not None:
            return

        with self.lock:
            if locator in self._bufferblocks:
                return

        self.start_get_threads()
        self._prefetch_queue.put(locator)
class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments

        """
        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        return self.parent.writable()
    def permission_expired(self, as_of_dt=None):
        """Returns True if any of the segments' locators is expired"""
        for r in self._segments:
            if KeepLocator(r.locator).permission_expired(as_of_dt):
                return True
        return False

    def segments(self):
        return copy.copy(self._segments)

    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self.set_committed(False)
    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in xrange(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)
    def set_segments(self, segs):
        self._segments = segs

    def set_committed(self, value=True):
        """Set committed flag.

        If value is True, set committed to be True.

        If value is False, set committed to be False for this and all parents.

        """
        if value == self._committed:
            return
        self._committed = value
        if self._committed is False and self.parent is not None:
            self.parent.set_committed(False)

    def committed(self):
        """Get whether this is committed or not."""
        return self._committed
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers"""
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
        and do some block maintenance tasks.
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
            # File writer closed, not small enough for repacking
            self.flush()
        elif self.closed():
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

    def closed(self):
        """
        Get whether this is closed or not. When the writers list is empty, the file
        is supposed to be closed.
        """
        return len(self._writers) == 0
    def truncate(self, size):
        """Shrink the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded. If `size` is greater than the current size
        of the file, an IOError will be raised.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                else:
                    nr = r
                new_segs.append(nr)

            self._segments = new_segs
            self.set_committed(False)
        elif size > self.size():
            raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
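
    # For example, truncate(0) drops every segment and empties the file, while
    # truncating to a size larger than size() raises IOError(EINVAL); growing a
    # file is done by writing past its current end instead.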
    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
          If False (default), return less data than requested if the read
          crosses a block boundary and the next block isn't cached. If True,
          only return less data than requested when hitting EOF.

        """
        with self.lock:
            if size == 0 or offset >= self.size():
                return ''
            readsegs = locators_and_ranges(self._segments, offset, size)
            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)

        locs = set()
        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())

        for lr in prefetch:
            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)
    def _repack_writes(self, num_retries):
        """Test if the buffer block has more data than actual segments.

        This happens when a buffered write over-writes a file range written in
        a previous buffered write. Re-pack the buffer block for efficiency
        and to avoid leaking information.

        """
        segs = self._segments

        # Sum up the segments to get the total bytes of the file referencing
        # into the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size():
            # There is more data in the buffer block than is actually accounted for by segments, so
            # re-pack into a new buffer by copying over to a new buffer block.
            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if len(data) == 0:
            return

        if offset > self.size():
            raise ArgumentError("Offset is past the end of the file")

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self.set_committed(False)

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes(num_retries)
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))
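
    # Hypothetical usage of writeto() (af is an ArvadosFile opened for writing;
    # values are illustrative only):
    #   af.writeto(0, "hello", num_retries=0)    # file contents: "hello"
    #   af.writeto(5, " world", num_retries=0)   # file contents: "hello world"
    #   af.writeto(20, "!", num_retries=0)       # raises ArgumentError (offset past EOF)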
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the Keep put queue.

        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._repack_writes(num_retries)
            self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
            for s in to_delete:
                self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self.set_committed(False)
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

    def size(self):
        """Get the file size."""
        if self._segments:
            n = self._segments[-1]
            return n.range_start + n.range_size
        else:
            return 0
    def manifest_text(self, stream_name=".", portable_locators=False,
                      normalize=False, only_committed=False):
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                if only_committed:
                    continue
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, locator_block_size(loc),
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
    def _reparent(self, newparent, newname):
        self.set_committed(False)
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock
class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating the file pointer.

    """

    def __init__(self, arvadosfile, num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()
    @_FileLikeObjectBase._before_close
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position. If `size` is None, read the
        entire remainder of the file.

        """
        if size is None:
            data = []
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return ''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data

    @_FileLikeObjectBase._before_close
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.
        """
        return self.arvadosfile.readfrom(offset, size, num_retries)
class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating the file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
        self.mode = mode
        self.arvadosfile.add_writer(self)

    @_FileLikeObjectBase._before_close
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
        else:
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)
    @_FileLikeObjectBase._before_close
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)
        if self._filepos > self.size():
            self._filepos = self.size()

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()

    def close(self, flush=True):
        if not self.closed:
            self.arvadosfile.remove_writer(self, flush)
            super(ArvadosFileWriter, self).close()
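

# Minimal usage sketch (assumes the arvados.collection.Collection API defined
# outside this module; names and values are illustrative, not authoritative):
#
#   import arvados.collection
#   coll = arvados.collection.Collection()
#   with coll.open("greeting.txt", "w") as writer:    # ArvadosFileWriter
#       writer.write("hello world\n")
#   with coll.open("greeting.txt", "r") as reader:    # ArvadosFileReader
#       first = reader.read(5)                        # "hello"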