16 from .errors import KeepWriteError, AssertionError, ArgumentError
17 from .keep import KeepLocator
18 from ._normalize_stream import normalize_stream
19 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
20 from .retry import retry_method
25 _logger = logging.getLogger('arvados.arvfile')
28 """split(path) -> streamname, filename
30 Separate the stream name and file name in a /-separated stream path and
31 return a tuple (stream_name, file_name). If no stream name is available,
36 stream_name, file_name = path.rsplit('/', 1)
37 except ValueError: # No / in string
38 stream_name, file_name = '.', path
39 return stream_name, file_name
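# Example (illustrative): how split() treats /-separated stream paths:
#   split('subdir/data/file.txt')  # -> ('subdir/data', 'file.txt')
#   split('file.txt')              # -> ('.', 'file.txt')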
41 class _FileLikeObjectBase(object):
42 def __init__(self, name, mode):
48 def _before_close(orig_func):
49 @functools.wraps(orig_func)
50 def before_close_wrapper(self, *args, **kwargs):
52 raise ValueError("I/O operation on closed stream file")
53 return orig_func(self, *args, **kwargs)
54 return before_close_wrapper
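# Example (sketch): any method wrapped with _before_close raises once the
# file is closed; the subclass name here is illustrative.
#   f = SomeFileLikeSubclass('example.txt', 'rb')
#   f.close()
#   f.read(10)  # raises ValueError: I/O operation on closed stream file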
59 def __exit__(self, exc_type, exc_value, traceback):
70 class ArvadosFileReaderBase(_FileLikeObjectBase):
71 def __init__(self, name, mode, num_retries=None):
72 super(ArvadosFileReaderBase, self).__init__(name, mode)
74 self.num_retries = num_retries
75 self._readline_cache = (None, None)
79 data = self.readline()
84 def decompressed_name(self):
85 return re.sub(r'\.(bz2|gz)$', '', self.name)
87 @_FileLikeObjectBase._before_close
88 def seek(self, pos, whence=os.SEEK_SET):
89 if whence == os.SEEK_CUR:
91 elif whence == os.SEEK_END:
93 self._filepos = min(max(pos, 0L), self.size())
98 @_FileLikeObjectBase._before_close
100 def readall(self, size=2**20, num_retries=None):
102 data = self.read(size, num_retries=num_retries)
107 @_FileLikeObjectBase._before_close
109 def readline(self, size=float('inf'), num_retries=None):
110 cache_pos, cache_data = self._readline_cache
111 if self.tell() == cache_pos:
113 self._filepos += len(cache_data)
116 data_size = len(data[-1])
117 while (data_size < size) and ('\n' not in data[-1]):
118 next_read = self.read(2 ** 20, num_retries=num_retries)
121 data.append(next_read)
122 data_size += len(next_read)
125 nextline_index = data.index('\n') + 1
127 nextline_index = len(data)
128 nextline_index = min(nextline_index, size)
129 self._filepos -= len(data) - nextline_index
130 self._readline_cache = (self.tell(), data[nextline_index:])
131 return data[:nextline_index]
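# Note: the (position, leftover bytes) cache above lets consecutive
# readline() calls reuse data already fetched past the previous newline,
# avoiding a fresh Keep read for every line.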
133 @_FileLikeObjectBase._before_close
135 def decompress(self, decompress, size, num_retries=None):
136 for segment in self.readall(size, num_retries=num_retries):
137 data = decompress(segment)
141 @_FileLikeObjectBase._before_close
143 def readall_decompressed(self, size=2**20, num_retries=None):
145 if self.name.endswith('.bz2'):
146 dc = bz2.BZ2Decompressor()
147 return self.decompress(dc.decompress, size,
148 num_retries=num_retries)
149 elif self.name.endswith('.gz'):
150 dc = zlib.decompressobj(16+zlib.MAX_WBITS)
151 return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
152 size, num_retries=num_retries)
154 return self.readall(size, num_retries=num_retries)
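# Example (sketch): transparently iterating a gzip-compressed stream file;
# the reader construction and consumer are illustrative.
#   reader = StreamFileReader(stream, segments, 'log.gz')
#   for chunk in reader.readall_decompressed():
#       handle(chunk)  # hypothetical consumer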
156 @_FileLikeObjectBase._before_close
158 def readlines(self, sizehint=float('inf'), num_retries=None):
161 for s in self.readall(num_retries=num_retries):
164 if data_size >= sizehint:
166 return ''.join(data).splitlines(True)
169 raise NotImplementedError()
171 def read(self, size, num_retries=None):
172 raise NotImplementedError()
174 def readfrom(self, start, size, num_retries=None):
175 raise NotImplementedError()
178 class StreamFileReader(ArvadosFileReaderBase):
179 class _NameAttribute(str):
180 # The Python file API provides a plain .name attribute.
181 # Older SDK provided a name() method.
182 # This class provides both, for maximum compatibility.
186 def __init__(self, stream, segments, name):
187 super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
188 self._stream = stream
189 self.segments = segments
191 def stream_name(self):
192 return self._stream.name()
195 n = self.segments[-1]
196 return n.range_start + n.range_size
198 @_FileLikeObjectBase._before_close
200 def read(self, size, num_retries=None):
201 """Read up to 'size' bytes from the stream, starting at the current file position"""
206 available_chunks = locators_and_ranges(self.segments, self._filepos, size)
208 lr = available_chunks[0]
209 data = self._stream.readfrom(lr.locator+lr.segment_offset,
211 num_retries=num_retries)
213 self._filepos += len(data)
216 @_FileLikeObjectBase._before_close
218 def readfrom(self, start, size, num_retries=None):
219 """Read up to 'size' bytes from the stream, starting at 'start'"""
224 for lr in locators_and_ranges(self.segments, start, size):
225 data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
226 num_retries=num_retries))
229 def as_manifest(self):
231 for r in self.segments:
232 segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
233 return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
236 def synchronized(orig_func):
237 @functools.wraps(orig_func)
238 def synchronized_wrapper(self, *args, **kwargs):
240 return orig_func(self, *args, **kwargs)
241 return synchronized_wrapper
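# Example (sketch): synchronized assumes the instance exposes a `lock`
# attribute usable as a context manager, as ArvadosFile does below.
#   class Counter(object):
#       def __init__(self):
#           self.lock = threading.Lock()
#           self.value = 0
#       @synchronized
#       def increment(self):
#           self.value += 1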
244 class StateChangeError(Exception):
245 def __init__(self, message, state, nextstate):
246 super(StateChangeError, self).__init__(message)
248 self.nextstate = nextstate
250 class _BufferBlock(object):
251 """A stand-in for a Keep block that is in the process of being written.
253 Writers can append to it, get the size, and compute the Keep locator.
254 There are four valid states:
260 Block is in the process of being uploaded to Keep; appending is an error.
263 The block has been written to Keep, its internal buffer has been
264 released, fetching the block will fetch it via keep client (since we
265 discarded the internal copy), and identifiers referring to the BufferBlock
266 can be replaced with the block locator.
275 def __init__(self, blockid, starting_capacity, owner):
278 the identifier for this block
281 the initial buffer capacity
284 ArvadosFile that owns this block
287 self.blockid = blockid
288 self.buffer_block = bytearray(starting_capacity)
289 self.buffer_view = memoryview(self.buffer_block)
290 self.write_pointer = 0
291 self._state = _BufferBlock.WRITABLE
294 self.lock = threading.Lock()
295 self.wait_for_commit = threading.Event()
299 def append(self, data):
300 """Append some data to the buffer.
302 Only valid if the block is in WRITABLE state. Implements an expanding
303 buffer, doubling capacity as needed to accommodate all the data.
306 if self._state == _BufferBlock.WRITABLE:
307 while (self.write_pointer+len(data)) > len(self.buffer_block):
308 new_buffer_block = bytearray(len(self.buffer_block) * 2)
309 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
310 self.buffer_block = new_buffer_block
311 self.buffer_view = memoryview(self.buffer_block)
312 self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
313 self.write_pointer += len(data)
316 raise AssertionError("Buffer block is not writable")
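# Example (illustrative): with 16 KiB capacity and 12 KiB already written,
# appending 10 KiB doubles the buffer to 32 KiB before copying, so repeated
# appends cost amortized linear time in the total bytes written.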
318 STATE_TRANSITIONS = frozenset([
320 (PENDING, COMMITTED),
325 def set_state(self, nextstate, val=None):
326 if (self._state, nextstate) not in self.STATE_TRANSITIONS:
327 raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
328 self._state = nextstate
330 if self._state == _BufferBlock.PENDING:
331 self.wait_for_commit.clear()
333 if self._state == _BufferBlock.COMMITTED:
335 self.buffer_view = None
336 self.buffer_block = None
337 self.wait_for_commit.set()
339 if self._state == _BufferBlock.ERROR:
341 self.wait_for_commit.set()
348 """The amount of data written to the buffer."""
349 return self.write_pointer
353 """The Keep locator for this buffer's contents."""
354 if self._locator is None:
355 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
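# Example (illustrative): an empty buffer block yields the locator
#   d41d8cd98f00b204e9800998ecf8427e+0
# (the MD5 of zero bytes, '+', then the data size in bytes).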
359 def clone(self, new_blockid, owner):
360 if self._state == _BufferBlock.COMMITTED:
361 raise AssertionError("Cannot duplicate committed buffer block")
362 bufferblock = _BufferBlock(new_blockid, self.size(), owner)
363 bufferblock.append(self.buffer_view[0:self.size()])
369 self.buffer_block = None
370 self.buffer_view = None
373 class NoopLock(object):
377 def __exit__(self, exc_type, exc_value, traceback):
380 def acquire(self, blocking=False):
387 def must_be_writable(orig_func):
388 @functools.wraps(orig_func)
389 def must_be_writable_wrapper(self, *args, **kwargs):
390 if not self.writable():
391 raise IOError(errno.EROFS, "Collection is read-only.")
392 return orig_func(self, *args, **kwargs)
393 return must_be_writable_wrapper
396 class _BlockManager(object):
397 """BlockManager handles buffer blocks.
399 Also handles background block uploads and background block prefetch for a
400 Collection of ArvadosFiles.
404 DEFAULT_PUT_THREADS = 2
405 DEFAULT_GET_THREADS = 2
407 def __init__(self, keep, copies=None):
408 """keep: KeepClient object to use"""
410 self._bufferblocks = collections.OrderedDict()
411 self._put_queue = None
412 self._put_threads = None
413 self._prefetch_queue = None
414 self._prefetch_threads = None
415 self.lock = threading.Lock()
416 self.prefetch_enabled = True
417 self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
418 self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
422 def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
423 """Allocate a new, empty bufferblock in WRITABLE state and return it.
426 optional block identifier, otherwise one will be automatically assigned
429 optional capacity, otherwise will use default capacity
432 ArvadosFile that owns this block
436 blockid = "%s" % uuid.uuid4()
437 bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
438 self._bufferblocks[bufferblock.blockid] = bufferblock
442 def dup_block(self, block, owner):
443 """Create a new bufferblock initialized with the content of an existing bufferblock.
446 the buffer block to copy.
449 ArvadosFile that owns the new block
452 new_blockid = "bufferblock%i" % len(self._bufferblocks)
453 bufferblock = block.clone(new_blockid, owner)
454 self._bufferblocks[bufferblock.blockid] = bufferblock
458 def is_bufferblock(self, locator):
459 return locator in self._bufferblocks
461 def _commit_bufferblock_worker(self):
462 """Background uploader thread."""
466 bufferblock = self._put_queue.get()
467 if bufferblock is None:
470 if self.copies is None:
471 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
473 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
474 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
476 except Exception as e:
477 bufferblock.set_state(_BufferBlock.ERROR, e)
479 if self._put_queue is not None:
480 self._put_queue.task_done()
483 def start_put_threads(self):
484 if self._put_threads is None:
485 # Start uploader threads.
487 # If we don't limit the Queue size, the upload queue can quickly
488 # grow to take up gigabytes of RAM if the writing process is
489 # generating data more quickly than it can be sent to the Keep
492 # With two upload threads and a queue size of 2, this means up to 4
493 # blocks pending. If they are full 64 MiB blocks, that means up to
494 # 256 MiB of internal buffering, which is the same size as the
495 # default download block cache in KeepClient.
496 self._put_queue = Queue.Queue(maxsize=2)
498 self._put_threads = []
499 for i in xrange(0, self.num_put_threads):
500 thread = threading.Thread(target=self._commit_bufferblock_worker)
501 self._put_threads.append(thread)
505 def _block_prefetch_worker(self):
506 """The background downloader thread."""
509 b = self._prefetch_queue.get()
517 def start_get_threads(self):
518 if self._prefetch_threads is None:
519 self._prefetch_queue = Queue.Queue()
520 self._prefetch_threads = []
521 for i in xrange(0, self.num_get_threads):
522 thread = threading.Thread(target=self._block_prefetch_worker)
523 self._prefetch_threads.append(thread)
529 def stop_threads(self):
530 """Shut down and wait for background upload and download threads to finish."""
532 if self._put_threads is not None:
533 for t in self._put_threads:
534 self._put_queue.put(None)
535 for t in self._put_threads:
537 self._put_threads = None
538 self._put_queue = None
540 if self._prefetch_threads is not None:
541 for t in self._prefetch_threads:
542 self._prefetch_queue.put(None)
543 for t in self._prefetch_threads:
545 self._prefetch_threads = None
546 self._prefetch_queue = None
551 def __exit__(self, exc_type, exc_value, traceback):
555 def repack_small_blocks(self, force=False, sync=False):
556 """Packs small blocks together before uploading"""
557 # Find small blocks that are ready to be packed together before committing to Keep.
558 small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
559 if len(small_blocks) <= 1:
560 # Not enough small blocks for repacking
563 # Check if there is enough pending small-block data to fill a full block
564 pending_write_size = sum([b.size() for b in small_blocks])
565 if force or (pending_write_size >= config.KEEP_BLOCK_SIZE):
566 new_bb = _BufferBlock("%s" % uuid.uuid4(), 2**14, None)
567 self._bufferblocks[new_bb.blockid] = new_bb
568 while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
569 bb = small_blocks.pop(0)
571 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
572 arvfile.set_segments([Range(new_bb.blockid,
575 new_bb.write_pointer - bb.size())])
577 del self._bufferblocks[bb.blockid]
578 self.commit_bufferblock(new_bb, sync=sync)
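# Example (illustrative): with the default 64 MiB KEEP_BLOCK_SIZE, three
# closed 30 MiB buffer blocks give 90 MiB pending, so repacking proceeds;
# the first two (60 MiB) are packed into one new block and the third is
# left for a later pass.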
580 def commit_bufferblock(self, block, sync):
581 """Initiate a background upload of a bufferblock.
584 The block object to upload
587 If `sync` is True, upload the block synchronously.
588 If `sync` is False, upload the block asynchronously. This will
589 return immediately unless the upload queue is at capacity, in
590 which case it will wait on an upload queue slot.
594 # Mark the block as PENDING to disallow any more appends.
595 block.set_state(_BufferBlock.PENDING)
596 except StateChangeError as e:
597 if e.state == _BufferBlock.PENDING:
599 block.wait_for_commit.wait()
602 if block.state() == _BufferBlock.COMMITTED:
604 elif block.state() == _BufferBlock.ERROR:
611 if self.copies is None:
612 loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
614 loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
615 block.set_state(_BufferBlock.COMMITTED, loc)
616 except Exception as e:
617 block.set_state(_BufferBlock.ERROR, e)
620 self.start_put_threads()
621 self._put_queue.put(block)
624 def get_bufferblock(self, locator):
625 return self._bufferblocks.get(locator)
628 def delete_bufferblock(self, locator):
629 bb = self._bufferblocks[locator]
631 del self._bufferblocks[locator]
633 def get_block_contents(self, locator, num_retries, cache_only=False):
636 First checks whether the locator names a BufferBlock and returns its
637 contents if so; otherwise passes the request through to KeepClient.get().
641 if locator in self._bufferblocks:
642 bufferblock = self._bufferblocks[locator]
643 if bufferblock.state() != _BufferBlock.COMMITTED:
644 return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
646 locator = bufferblock._locator
648 return self._keep.get_from_cache(locator)
650 return self._keep.get(locator, num_retries=num_retries)
652 def commit_all(self):
653 """Commit all outstanding buffer blocks.
655 This is a synchronous call, and will not return until all buffer blocks
656 are uploaded. Raises KeepWriteError() if any blocks failed to upload.
659 self.repack_small_blocks(force=True, sync=True)
662 items = self._bufferblocks.items()
665 if v.state() != _BufferBlock.COMMITTED and v.owner:
666 v.owner.flush(sync=False)
669 if self._put_queue is not None:
670 self._put_queue.join()
674 if v.state() == _BufferBlock.ERROR:
675 err.append((v.locator(), v.error))
677 raise KeepWriteError("Error writing some blocks", err, label="block")
680 # flush again with sync=True to remove committed bufferblocks from
683 v.owner.flush(sync=True)
685 def block_prefetch(self, locator):
686 """Initiate a background download of a block.
688 This assumes that the underlying KeepClient implements a block cache,
689 so repeated requests for the same block will not result in repeated
690 downloads (unless the block is evicted from the cache). This method
695 if not self.prefetch_enabled:
698 if self._keep.get_from_cache(locator) is not None:
702 if locator in self._bufferblocks:
705 self.start_get_threads()
706 self._prefetch_queue.put(locator)
709 class ArvadosFile(object):
710 """Represent a file in a Collection.
712 ArvadosFile manages the underlying representation of a file in Keep as a
713 sequence of segments spanning a set of blocks, and implements random
716 This object may be accessed from multiple threads.
720 def __init__(self, parent, name, stream=[], segments=[]):
722 ArvadosFile constructor.
725 a list of Range objects representing a block stream
728 a list of Range objects representing segments
732 self._writers = set()
733 self._committed = False
735 self.lock = parent.root_collection().lock
737 self._add_segment(stream, s.locator, s.range_size)
738 self._current_bblock = None
741 return self.parent.writable()
745 return copy.copy(self._segments)
748 def clone(self, new_parent, new_name):
749 """Make a copy of this file."""
750 cp = ArvadosFile(new_parent, new_name)
751 cp.replace_contents(self)
756 def replace_contents(self, other):
757 """Replace segments of this file with segments from another `ArvadosFile` object."""
761 for other_segment in other.segments():
762 new_loc = other_segment.locator
763 if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
764 if other_segment.locator not in map_loc:
765 bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
766 if bufferblock.state() != _BufferBlock.WRITABLE:
767 map_loc[other_segment.locator] = bufferblock.locator()
769 map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
770 new_loc = map_loc[other_segment.locator]
772 self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
774 self._committed = False
776 def __eq__(self, other):
779 if not isinstance(other, ArvadosFile):
782 othersegs = other.segments()
784 if len(self._segments) != len(othersegs):
786 for i in xrange(0, len(othersegs)):
787 seg1 = self._segments[i]
792 if self.parent._my_block_manager().is_bufferblock(loc1):
793 loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
795 if other.parent._my_block_manager().is_bufferblock(loc2):
796 loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
798 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
799 seg1.range_start != seg2.range_start or
800 seg1.range_size != seg2.range_size or
801 seg1.segment_offset != seg2.segment_offset):
806 def __ne__(self, other):
807 return not self.__eq__(other)
810 def set_segments(self, segs):
811 self._segments = segs
814 def set_committed(self):
815 """Set committed flag to True"""
816 self._committed = True
820 """Get whether this is committed or not."""
821 return self._committed
824 def add_writer(self, writer):
825 """Add an ArvadosFileWriter reference to the list of writers"""
826 if isinstance(writer, ArvadosFileWriter):
827 self._writers.add(writer)
830 def remove_writer(self, writer):
832 Called from ArvadosFileWriter.close(). Remove a writer reference from the list
833 and do some block maintenance tasks.
835 self._writers.remove(writer)
837 if self.size() > config.KEEP_BLOCK_SIZE / 2:
838 # File writer closed, not small enough for repacking
841 # All writers closed and size is adequate for repacking
842 self.parent._my_block_manager().repack_small_blocks()
846 Get whether this file is closed. The file is considered closed when the
847 writers list is empty.
849 return len(self._writers) == 0
853 def truncate(self, size):
854 """Shrink the size of the file.
856 If `size` is less than the size of the file, the file contents after
857 `size` will be discarded. If `size` is greater than the current size
858 of the file, an IOError will be raised.
861 if size < self.size():
863 for r in self._segments:
864 range_end = r.range_start+r.range_size
865 if r.range_start >= size:
866 # segment is past the truncate size, all done
868 elif size < range_end:
869 nr = Range(r.locator, r.range_start, size - r.range_start, 0)
870 nr.segment_offset = r.segment_offset
876 self._segments = new_segs
877 self._committed = False
878 elif size > self.size():
879 raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
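# Example (illustrative): truncating a file whose segments cover [0, 10)
# and [10, 25) to size 15 keeps the first segment unchanged and shrinks
# the second to [10, 15), preserving its segment_offset.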
881 def readfrom(self, offset, size, num_retries, exact=False):
882 """Read up to `size` bytes from the file starting at `offset`.
885 If False (default), return less data than requested if the read
886 crosses a block boundary and the next block isn't cached. If True,
887 only return less data than requested when hitting EOF.
891 if size == 0 or offset >= self.size():
893 readsegs = locators_and_ranges(self._segments, offset, size)
894 prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
899 block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
901 blockview = memoryview(block)
902 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
908 if lr.locator not in locs:
909 self.parent._my_block_manager().block_prefetch(lr.locator)
914 def _repack_writes(self, num_retries):
915 """Test if the buffer block has more data than actual segments.
917 This happens when a buffered write over-writes a file range written in
918 a previous buffered write. Re-pack the buffer block for efficiency
919 and to avoid leaking information.
922 segs = self._segments
924 # Sum up the segments to get the total bytes of the file referencing
925 # into the buffer block.
926 bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
927 write_total = sum([s.range_size for s in bufferblock_segs])
929 if write_total < self._current_bblock.size():
930 # There is more data in the buffer block than is actually accounted for by segments, so
931 # re-pack into a new buffer by copying over to a new buffer block.
932 contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
933 new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
934 for t in bufferblock_segs:
935 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
936 t.segment_offset = new_bb.size() - t.range_size
938 self._current_bblock = new_bb
942 def writeto(self, offset, data, num_retries):
943 """Write `data` to the file starting at `offset`.
945 This will update existing bytes and/or extend the size of the file as
952 if offset > self.size():
953 raise ArgumentError("Offset is past the end of the file")
955 if len(data) > config.KEEP_BLOCK_SIZE:
956 # Chunk it up into smaller writes
958 dataview = memoryview(data)
960 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
961 n += config.KEEP_BLOCK_SIZE
964 self._committed = False
966 if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
967 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
969 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
970 self._repack_writes(num_retries)
971 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
972 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
973 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
975 self._current_bblock.append(data)
977 replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
979 self.parent.notify(WRITE, self.parent, self.name, (self, self))
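# Example (illustrative): a single 150 MiB writeto() call with the default
# 64 MiB KEEP_BLOCK_SIZE recurses in chunks of 64, 64, and 22 MiB, so no
# buffer block ever grows past one Keep block.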
984 def flush(self, sync=True, num_retries=0):
985 """Flush the current bufferblock to Keep.
988 If True, commit the block synchronously, waiting until the buffer block has been written.
989 If False, commit the block asynchronously, returning immediately after putting the block into
995 if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
996 if self._current_bblock.state() == _BufferBlock.WRITABLE:
997 self._repack_writes(num_retries)
998 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1002 for s in self._segments:
1003 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1005 if bb.state() != _BufferBlock.COMMITTED:
1006 self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1007 to_delete.add(s.locator)
1008 s.locator = bb.locator()
1010 self.parent._my_block_manager().delete_bufferblock(s)
1012 self.parent.notify(MOD, self.parent, self.name, (self, self))
1016 def add_segment(self, blocks, pos, size):
1017 """Add a segment to the end of the file.
1019 `pos` and `size` reference a section of the stream described by
1020 `blocks` (a list of Range objects)
1023 self._add_segment(blocks, pos, size)
1025 def _add_segment(self, blocks, pos, size):
1026 """Internal implementation of add_segment."""
1027 self._committed = False
1028 for lr in locators_and_ranges(blocks, pos, size):
1029 last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1030 r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1031 self._segments.append(r)
1035 """Get the file size."""
1037 n = self._segments[-1]
1038 return n.range_start + n.range_size
1043 def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
1046 for segment in self._segments:
1047 loc = segment.locator
1048 if self.parent._my_block_manager().is_bufferblock(loc):
1049 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1050 if portable_locators:
1051 loc = KeepLocator(loc).stripped()
1052 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1053 segment.segment_offset, segment.range_size))
1054 buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
1060 def _reparent(self, newparent, newname):
1061 self._committed = False
1062 self.flush(sync=True)
1063 self.parent.remove(self.name)
1064 self.parent = newparent
1066 self.lock = self.parent.root_collection().lock
1069 class ArvadosFileReader(ArvadosFileReaderBase):
1070 """Wraps ArvadosFile in a file-like object supporting reading only.
1072 Be aware that this class is NOT thread safe as there is no locking around
1073 updates to the file pointer.
1077 def __init__(self, arvadosfile, num_retries=None):
1078 super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1079 self.arvadosfile = arvadosfile
1082 return self.arvadosfile.size()
1084 def stream_name(self):
1085 return self.arvadosfile.parent.stream_name()
1087 @_FileLikeObjectBase._before_close
1089 def read(self, size=None, num_retries=None):
1090 """Read up to `size` bytes from the file and return the result.
1092 Starts at the current file position. If `size` is None, read the
1093 entire remainder of the file.
1097 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1100 self._filepos += len(rd)
1101 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1102 return ''.join(data)
1104 data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1105 self._filepos += len(data)
1108 @_FileLikeObjectBase._before_close
1110 def readfrom(self, offset, size, num_retries=None):
1111 """Read up to `size` bytes from the stream, starting at the specified file offset.
1113 This method does not change the file position.
1115 return self.arvadosfile.readfrom(offset, size, num_retries)
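# Example (sketch): readers are normally obtained from a Collection; the
# UUID below is a placeholder.
#   import arvados.collection
#   c = arvados.collection.CollectionReader("zzzzz-4zz18-zzzzzzzzzzzzzzz")
#   with c.open("foo.txt") as f:   # f is an ArvadosFileReader
#       first_kb = f.read(1024)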
1121 class ArvadosFileWriter(ArvadosFileReader):
1122 """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1124 Be aware that this class is NOT thread safe as there is no locking around
1125 updates to the file pointer.
1129 def __init__(self, arvadosfile, mode, num_retries=None):
1130 super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1132 self.arvadosfile.add_writer(self)
1134 @_FileLikeObjectBase._before_close
1136 def write(self, data, num_retries=None):
1137 if self.mode[0] == "a":
1138 self.arvadosfile.writeto(self.size(), data, num_retries)
1140 self.arvadosfile.writeto(self._filepos, data, num_retries)
1141 self._filepos += len(data)
1144 @_FileLikeObjectBase._before_close
1146 def writelines(self, seq, num_retries=None):
1148 self.write(s, num_retries=num_retries)
1150 @_FileLikeObjectBase._before_close
1151 def truncate(self, size=None):
1153 size = self._filepos
1154 self.arvadosfile.truncate(size)
1155 if self._filepos > self.size():
1156 self._filepos = self.size()
1158 @_FileLikeObjectBase._before_close
1160 self.arvadosfile.flush()
1164 self.arvadosfile.remove_writer(self)
1165 super(ArvadosFileWriter, self).close()
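# Example (sketch): writers are likewise obtained from a Collection; names
# are illustrative.
#   import arvados.collection
#   c = arvados.collection.Collection()
#   with c.open("out.txt", "w") as f:  # f is an ArvadosFileWriter
#       f.write("hello\n")
#   c.save_new("example collection")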