17 from .errors import KeepWriteError, AssertionError, ArgumentError
18 from .keep import KeepLocator
19 from ._normalize_stream import normalize_stream
20 from ._ranges import locators_and_ranges, replace_range, Range
21 from .retry import retry_method
26 _logger = logging.getLogger('arvados.arvfile')
def split(path):
    """split(path) -> streamname, filename

    Separate the stream name and file name in a /-separated stream path and
    return a tuple (stream_name, file_name).  If no stream name is available,
    assume '.'.

    """
    try:
        # rsplit keeps everything up to the last '/' as the stream name.
        stream_name, file_name = path.rsplit('/', 1)
    except ValueError:  # No / in string
        stream_name, file_name = '.', path
    return stream_name, file_name
class _FileLikeObjectBase(object):
    # Minimal file-object scaffolding shared by the reader/writer wrappers
    # below: name/mode storage, a closed-stream guard decorator, and
    # context-manager plumbing.
    # NOTE(review): several lines of this class are elided in this view;
    # comments describe only the visible code.
    def __init__(self, name, mode):

    def _before_close(orig_func):
        # Decorator for methods that must not run on a closed stream.
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            # NOTE(review): the guard condition (presumably `if self.closed:`)
            # is elided in this view; the raise below fires only when the
            # stream has already been closed.
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper

    def __exit__(self, exc_type, exc_value, traceback):
class ArvadosFileReaderBase(_FileLikeObjectBase):
    # Common read-side machinery (seek, readline with caching, decompression
    # helpers) shared by StreamFileReader and ArvadosFileReader.
    # NOTE(review): many lines of this class are elided in this view.
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self.num_retries = num_retries
        # (position, leftover-bytes) pair letting readline() resume a
        # partially consumed chunk without re-reading.
        self._readline_cache = (None, None)

            data = self.readline()

    def decompressed_name(self):
        # Strip a trailing .bz2/.gz suffix from the file name.
        # NOTE(review): pattern should be a raw string (r'\.(bz2|gz)$') to
        # avoid the '\.' escape being interpreted by the str literal.
        return re.sub('\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
        elif whence == os.SEEK_END:
        # Clamp so the file position can never leave [0, size()].
        self._filepos = min(max(pos, 0L), self.size())

    @_FileLikeObjectBase._before_close
    def readall(self, size=2**20, num_retries=None):
            data = self.read(size, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        # Reuse leftover bytes from the previous readline() if the file
        # position has not moved since then.
        if self.tell() == cache_pos:
            self._filepos += len(cache_data)
        data_size = len(data[-1])
        # Keep pulling 1 MiB chunks until a newline appears or `size` is hit.
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            data.append(next_read)
            data_size += len(next_read)
            nextline_index = data.index('\n') + 1
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        # Rewind past any bytes read beyond the returned line, and cache the
        # remainder for the next call.
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]

    @_FileLikeObjectBase._before_close
    def decompress(self, decompress, size, num_retries=None):
        # Generator: feed raw chunks through the supplied decompressor.
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)

    @_FileLikeObjectBase._before_close
    def readall_decompressed(self, size=2**20, num_retries=None):
        # Pick a decompressor from the file name suffix; fall through to a
        # plain readall() for uncompressed names.
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            # 16+MAX_WBITS tells zlib to expect and skip a gzip header.
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
            return self.readall(size, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def readlines(self, sizehint=float('inf'), num_retries=None):
        for s in self.readall(num_retries=num_retries):
            if data_size >= sizehint:
        return ''.join(data).splitlines(True)

        # NOTE(review): this raise belongs to an elided abstract method
        # (presumably size()); subclasses must override it.
        raise NotImplementedError()

    def read(self, size, num_retries=None):
        # Abstract: subclasses implement reading from the current position.
        raise NotImplementedError()

    def readfrom(self, start, size, num_retries=None):
        # Abstract: subclasses implement positional reads.
        raise NotImplementedError()
class StreamFileReader(ArvadosFileReaderBase):
    # Read-only view of one file inside a manifest stream, described by a
    # list of Range segments into the stream.
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

        # End of the last segment is the file size (segments are contiguous).
        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        # Only the first chunk is fetched per call; callers loop for more.
        lr = available_chunks[0]
        data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                     num_retries=num_retries)
        self._filepos += len(data)

    @_FileLikeObjectBase._before_close
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))

    def as_manifest(self):
        # Render this single file as a one-stream manifest rooted at ".".
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
def synchronized(orig_func):
    """Decorator: run the wrapped method while holding ``self.lock``.

    The decorated callable must be a method of an object exposing a
    context-manager ``lock`` attribute (e.g. threading.Lock or NoopLock).
    """
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        # Fix: acquire self.lock for the duration of the call; without the
        # `with` block the decorator provided no mutual exclusion at all.
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper
class StateChangeError(Exception):
    """Raised on an invalid _BufferBlock state transition.

    Carries the current state and the rejected target state so callers
    (e.g. _BlockManager.commit_bufferblock, which inspects ``e.state``)
    can react to the specific conflict.
    """
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        # Fix: the current state was never stored, but commit_bufferblock()
        # reads e.state; keep both endpoints of the failed transition.
        self.state = state
        self.nextstate = nextstate
class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are three valid states:

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    NOTE(review): several lines of this class (state constants, error
    handling, clear()) are elided in this view.
    """

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block
        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        # memoryview lets append() write into the bytearray without copying.
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self.lock = threading.Lock()
        # Signalled when the block reaches COMMITTED or ERROR state.
        self.wait_for_commit = threading.Event()

    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            # Grow geometrically until the buffer can hold the new data.
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
            raise AssertionError("Buffer block is not writable")

    # Legal (state, nextstate) pairs; anything else raises StateChangeError.
    STATE_TRANSITIONS = frozenset([
        (PENDING, COMMITTED),

    def set_state(self, nextstate, val=None):
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
        self._state = nextstate

        if self._state == _BufferBlock.PENDING:
            self.wait_for_commit.clear()

        if self._state == _BufferBlock.COMMITTED:
            # Upload done: release the buffer; readers now fetch via Keep.
            self.buffer_view = None
            self.buffer_block = None
            self.wait_for_commit.set()

        if self._state == _BufferBlock.ERROR:
            # Wake waiters even on failure so they can observe the error.
            self.wait_for_commit.set()

        """The amount of data written to the buffer."""
        return self.write_pointer

        """The Keep locator for this buffer's contents."""
        if self._locator is None:
            # Locator format: <md5 of content>+<size in bytes>.
            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())

    def clone(self, new_blockid, owner):
        # Duplicate an uncommitted block (used for copy-on-write of files).
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Cannot duplicate committed buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])

        # Drop buffer references (part of an elided cleanup method).
        self.buffer_block = None
        self.buffer_view = None
class NoopLock(object):
    # Lock-shaped object that does nothing; substituted for a real lock when
    # synchronization is unnecessary.  Matches the context-manager and
    # acquire/release surface of threading.Lock.
    def __exit__(self, exc_type, exc_value, traceback):

    def acquire(self, blocking=False):
def must_be_writable(orig_func):
    """Decorator: reject the call with EROFS unless self.writable()."""
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        # Guard clause: writable objects proceed straight to the wrapped call.
        if self.writable():
            return orig_func(self, *args, **kwargs)
        raise IOError(errno.EROFS, "Collection is read-only.")
    return must_be_writable_wrapper
class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    NOTE(review): many lines of this class are elided in this view; comments
    describe only the visible code.
    """

    DEFAULT_PUT_THREADS = 2
    DEFAULT_GET_THREADS = 2

    def __init__(self, keep, copies=None):
        """keep: KeepClient object to use"""
        self._bufferblocks = collections.OrderedDict()
        self._put_queue = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
        # Running total of bytes in closed-but-unflushed small files; drives
        # repack_small_blocks().
        self._pending_write_size = 0
        self.threads_lock = threading.Lock()

    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block
        """
        return self._alloc_bufferblock(blockid, starting_capacity, owner)

    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
            # No id supplied: generate a unique one.
            blockid = "%s" % uuid.uuid4()
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock

    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block
        """
        new_blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock

    def is_bufferblock(self, locator):
        # True when `locator` names an in-memory buffer block rather than a
        # real Keep locator.
        return locator in self._bufferblocks

    def _commit_bufferblock_worker(self):
        """Background uploader thread."""
                bufferblock = self._put_queue.get()
                # None is the sentinel pushed by stop_threads() to shut down.
                if bufferblock is None:
                if self.copies is None:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)

            except Exception as e:
                # Record the failure on the block so commit_all() can report it.
                bufferblock.set_state(_BufferBlock.ERROR, e)
                if self._put_queue is not None:
                    self._put_queue.task_done()

    def start_put_threads(self):
        with self.threads_lock:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to Keep.

                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending. If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = Queue.Queue(maxsize=2)

                self._put_threads = []
                for i in xrange(0, self.num_put_threads):
                    thread = threading.Thread(target=self._commit_bufferblock_worker)
                    self._put_threads.append(thread)

    def _block_prefetch_worker(self):
        """The background downloader thread."""
                b = self._prefetch_queue.get()
                # Prefetch failures are logged, never raised: this is
                # best-effort cache warming.
                _logger.error(traceback.format_exc())

    def start_get_threads(self):
        if self._prefetch_threads is None:
            self._prefetch_queue = Queue.Queue()
            self._prefetch_threads = []
            for i in xrange(0, self.num_get_threads):
                thread = threading.Thread(target=self._block_prefetch_worker)
                self._prefetch_threads.append(thread)

    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""
        if self._put_threads is not None:
            # One None sentinel per thread wakes each worker for shutdown.
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
            self._put_threads = None
            self._put_queue = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
            self._prefetch_threads = None
            self._prefetch_queue = None

    def __exit__(self, exc_type, exc_value, traceback):

    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
        """Packs small blocks together before uploading"""
        self._pending_write_size += closed_file_size

        # Check if there are enough small blocks for filling up one in full
        if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):

            # Search blocks ready for getting packed together before being committed to Keep.
            # A WRITABLE block always has an owner.
            # A WRITABLE block with its owner.closed() implies that its
            # size is <= KEEP_BLOCK_SIZE/2.
            small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]

            if len(small_blocks) <= 1:
                # Not enough small blocks for repacking

            # Update the pending write size count with its true value, just in case
            # some small file was opened, written and closed several times.
            self._pending_write_size = sum([b.size() for b in small_blocks])
            if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:

            new_bb = self._alloc_bufferblock()
            # Greedily pack whole small blocks until the next one won't fit.
            while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
                bb = small_blocks.pop(0)
                self._pending_write_size -= bb.size()
                new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
                # Re-point the owning file's segments at the packed block.
                arvfile.set_segments([Range(new_bb.blockid,
                                            new_bb.write_pointer - bb.size())])
                self._delete_bufferblock(bb.blockid)
            self.commit_bufferblock(new_bb, sync=sync)

    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :sync:
          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.
        """
            # Mark the block as PENDING so to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            # Another committer beat us to it: wait for that upload instead.
            if e.state == _BufferBlock.PENDING:
                    block.wait_for_commit.wait()
                if block.state() == _BufferBlock.COMMITTED:
                elif block.state() == _BufferBlock.ERROR:
                if self.copies is None:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
                block.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                block.set_state(_BufferBlock.ERROR, e)
            # Asynchronous path: hand the block to the uploader threads.
            self.start_put_threads()
            self._put_queue.put(block)

    def get_bufferblock(self, locator):
        # Returns the _BufferBlock for `locator`, or None if unknown.
        return self._bufferblocks.get(locator)

    def delete_bufferblock(self, locator):
        self._delete_bufferblock(locator)

    def _delete_bufferblock(self, locator):
        bb = self._bufferblocks[locator]
        del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block's contents as a byte string.

        First checks to see if the locator is a BufferBlock and return that, if
        not, passes the request through to KeepClient.get().
        """
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    # Still in memory: serve directly from the buffer.
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                    locator = bufferblock._locator
            return self._keep.get_from_cache(locator)
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
        """
        self.repack_small_blocks(force=True, sync=True)

            items = self._bufferblocks.items()
            if v.state() != _BufferBlock.COMMITTED and v.owner:
                v.owner.flush(sync=False)

        # Wait for the uploader threads to drain the queue.
        if self._put_queue is not None:
            self._put_queue.join()

                    if v.state() == _BufferBlock.ERROR:
                        err.append((v.locator(), v.error))
                    raise KeepWriteError("Error writing some blocks", err, label="block")

            # flush again with sync=True to remove committed bufferblocks from
                v.owner.flush(sync=True)

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.)  This method
        returns immediately; the fetch happens on a prefetch thread.
        """
        if not self.prefetch_enabled:

        # Already cached locally: nothing to do.
        if self._keep.get_from_cache(locator) is not None:

            # Buffer blocks live in memory; no prefetch needed.
            if locator in self._bufferblocks:

        self.start_get_threads()
        self._prefetch_queue.put(locator)
class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    NOTE(review): many lines of this class are elided in this view; comments
    describe only the visible code.
    """

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments

        NOTE(review): the mutable default arguments ([]) are shared across
        calls; harmless only if never mutated — verify.
        """
        self._writers = set()
        self._committed = False
        # Share the root collection's lock so file and collection updates
        # are mutually exclusive.
        self.lock = parent.root_collection().lock
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

        return self.parent.writable()

        return copy.copy(self._segments)

    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)

    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                # Uncommitted buffer blocks can't be shared by locator;
                # translate each one exactly once via map_loc.
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self._committed = False

    def __eq__(self, other):
        # Equality is segment-by-segment over stripped Keep locators, so
        # permission hints don't affect comparison.
        if not isinstance(other, ArvadosFile):
        othersegs = other.segments()
            if len(self._segments) != len(othersegs):
            for i in xrange(0, len(othersegs)):
                seg1 = self._segments[i]
                # Resolve buffer block ids to their computed Keep locators
                # before comparing.
                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):

    def __ne__(self, other):
        return not self.__eq__(other)

    def set_segments(self, segs):
        self._segments = segs

    def set_committed(self):
        """Set committed flag to True"""
        self._committed = True

        """Get whether this is committed or not."""
        return self._committed

    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers"""
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
        and do some block maintenance tasks.
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
            # File writer closed, not small enough for repacking
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

        """
        Get whether this is closed or not. When the writers list is empty, the file
        is supposed to be closed.
        """
        return len(self._writers) == 0

    def truncate(self, size):
        """Shrink the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, an IOError will be raised.
        """
        if size < self.size():
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                elif size < range_end:
                    # Segment straddles the cut point: keep only its head.
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
            self._segments = new_segs
            self._committed = False
        elif size > self.size():
            raise IOError(errno.EINVAL, "truncate() does not support extending the file size")

    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
          If False (default), return less data than requested if the read
          crosses a block boundary and the next block isn't cached.  If True,
          only return less data than requested when hitting EOF.
        """
            if size == 0 or offset >= self.size():
            readsegs = locators_and_ranges(self._segments, offset, size)
            # Also compute which blocks follow this read, to warm the cache.
            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)

    def _repack_writes(self, num_retries):
        """Test if the buffer block has more data than actual segments.

        This happens when a buffered write over-writes a file range written in
        a previous buffered write.  Re-pack the buffer block for efficiency
        and to avoid leaking information.
        """
        segs = self._segments

        # Sum up the segments to get the total bytes of the file referencing
        # into the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size():
            # There is more data in the buffer block than is actually accounted for by segments, so
            # re-pack into a new buffer by copying over to a new buffer block.
            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb

    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.
        """
        if offset > self.size():
            raise ArgumentError("Offset is past the end of the file")

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            dataview = memoryview(data)
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE

        self._committed = False

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        # If the write would overflow the block, try repacking first; if it
        # still doesn't fit, ship the current block and start a fresh one.
        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes(num_retries)
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))

    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the keep put queue.
        """
        if self.committed():

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._repack_writes(num_retries)
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

            # Synchronous path: rewrite segments to final Keep locators and
            # drop the now-redundant buffer blocks.
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
                self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `offset` reference a section of the stream described by
        `blocks` (a list of Range objects)
        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self._committed = False
        for lr in locators_and_ranges(blocks, pos, size):
            # New segment starts where the previous one ended.
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

        """Get the file size."""
            n = self._segments[-1]
            return n.range_start + n.range_size

    def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
        # NOTE(review): this body iterates `self.segments` (a method on this
        # class, not called) and reads `self._bufferblocks` (an attribute of
        # _BlockManager, not ArvadosFile) — looks inconsistent; verify
        # against the full file.
        for segment in self.segments:
            loc = segment.locator
            if loc.startswith("bufferblock"):
                loc = self._bufferblocks[loc].calculate_locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, locator_block_size(loc),
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))

    def _reparent(self, newparent, newname):
        # Move this file under a different collection parent.
        self._committed = False
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.lock = self.parent.root_collection().lock
class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
        self.arvadosfile = arvadosfile

        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    @_FileLikeObjectBase._before_close
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None, read the
        entire remainder of the file.
        """
            # size=None: drain the file one KEEP_BLOCK_SIZE chunk at a time.
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return ''.join(data)
            # exact=True: only return short on EOF, not on cache misses.
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)

    @_FileLikeObjectBase._before_close
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.
        """
        return self.arvadosfile.readfrom(offset, size, num_retries)
class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
        # Register with the file so it can track open writers (see
        # ArvadosFile.add_writer / remove_writer).
        self.arvadosfile.add_writer(self)

    @_FileLikeObjectBase._before_close
    def write(self, data, num_retries=None):
        # Append mode always writes at EOF; otherwise write at the current
        # position and advance it.
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)

    @_FileLikeObjectBase._before_close
    def writelines(self, seq, num_retries=None):
            self.write(s, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
            # Default: truncate at the current file position.
            size = self._filepos
        self.arvadosfile.truncate(size)
        # Keep the file pointer inside the (possibly smaller) file.
        if self._filepos > self.size():
            self._filepos = self.size()

    @_FileLikeObjectBase._before_close
        self.arvadosfile.flush()

    def close(self, flush=True):
        self.arvadosfile.remove_writer(self, flush)
        super(ArvadosFileWriter, self).close()