import bz2
import collections
import copy
import errno
import functools
import hashlib
import logging
import os
import Queue
import re
import threading
import uuid
import zlib

from . import config
from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, LocatorAndRange, replace_range, Range
from .retry import retry_method

MOD = "mod"
WRITE = "write"

_logger = logging.getLogger('arvados.arvfile')
28 """split(path) -> streamname, filename
30 Separate the stream name and file name in a /-separated stream path and
31 return a tuple (stream_name, file_name). If no stream name is available,
36 stream_name, file_name = path.rsplit('/', 1)
37 except ValueError: # No / in string
38 stream_name, file_name = '.', path
39 return stream_name, file_name
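
# Illustrative behavior of split() (doctest-style sketch; paths are
# hypothetical examples, not taken from this module):
#
#     >>> split('data/reads/sample.fastq')
#     ('data/reads', 'sample.fastq')
#     >>> split('sample.fastq')    # no '/' present, stream defaults to '.'
#     ('.', 'sample.fastq')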

class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            self.close()
        except Exception:
            if exc_type is None:
                raise

    def close(self):
        self.closed = True

class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0L
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        self._filepos = min(max(pos, 0L), self.size())

    def tell(self):
        return self._filepos

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if data == '':
                break
            yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
            self._filepos += len(cache_data)
        else:
            data = ['']
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)

        data = ''.join(data)
        try:
            nextline_index = data.index('\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]
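
    # Illustrative readline() behavior (sketch; assumes a reader positioned
    # at the start of a file containing "foo\nbar\n"):
    #
    #     >>> f.readline()    # returns up to and including the newline
    #     'foo\n'
    #     >>> f.readline()    # served partly from self._readline_cache
    #     'bar\n'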

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return ''.join(data).splitlines(True)

    def size(self):
        raise NotImplementedError()

    def read(self, size, num_retries=None):
        raise NotImplementedError()

    def readfrom(self, start, size, num_retries=None):
        raise NotImplementedError()

class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return ''

        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return ''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return ''.join(data)

    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"

def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper
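
# Minimal sketch of how @synchronized is used in this module: the decorated
# method runs while holding `self.lock`, so the class must assign a
# context-manager lock (threading.Lock or NoopLock) to self.lock.  The
# Counter class below is a hypothetical example, not part of this module.
#
#     class Counter(object):
#         def __init__(self):
#             self.lock = threading.Lock()
#             self.n = 0
#
#         @synchronized
#         def increment(self):
#             self.n += 1    # executes while holding self.lock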

class StateChangeError(Exception):
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.state = state
        self.nextstate = nextstate

class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are four states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    ERROR
      The attempt to upload the block failed; `error` holds the exception
      and anyone waiting on `wait_for_commit` is released.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2
    ERROR = 3

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()
        self.wait_for_commit = threading.Event()
        self.error = None

    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
            self._locator = None
        else:
            raise AssertionError("Buffer block is not writable")
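
    # Worked example of the doubling growth (hypothetical numbers): a block
    # allocated with starting_capacity=2**14 (16 KiB) that receives a 40 KiB
    # append grows 16 KiB -> 32 KiB -> 64 KiB before the data is copied in,
    # so repeated appends cost amortized O(1) per byte written.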

    STATE_TRANSITIONS = frozenset([
            (WRITABLE, PENDING),
            (PENDING, COMMITTED),
            (PENDING, ERROR),
            (ERROR, PENDING)])

    @synchronized
    def set_state(self, nextstate, val=None):
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
        self._state = nextstate

        if self._state == _BufferBlock.PENDING:
            self.wait_for_commit.clear()

        if self._state == _BufferBlock.COMMITTED:
            self._locator = val
            self.buffer_view = None
            self.buffer_block = None
            self.wait_for_commit.set()

        if self._state == _BufferBlock.ERROR:
            self.error = val
            self.wait_for_commit.set()

    @synchronized
    def state(self):
        return self._state

    def size(self):
        """The amount of data written to the buffer."""
        return self.write_pointer

    @synchronized
    def locator(self):
        """The Keep locator for this buffer's contents."""
        if self._locator is None:
            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
        return self._locator
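
    # Illustrative locator value (sketch): for a buffer holding the 3 bytes
    # 'foo', locator() returns "<md5 hex>+<size in bytes>", i.e.
    # "acbd18db4cc2f85cedef654fccc4a4d8+3".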

    @synchronized
    def clone(self, new_blockid, owner):
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Cannot duplicate committed buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock

    @synchronized
    def clear(self):
        self.owner = None
        self.buffer_block = None
        self.buffer_view = None

class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass

def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection is read-only.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper

class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """

    DEFAULT_PUT_THREADS = 2
    DEFAULT_GET_THREADS = 2

    def __init__(self, keep, copies=None):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = collections.OrderedDict()
        self._put_queue = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
        self.copies = copies
        self._pending_write_size = 0

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        return self._alloc_bufferblock(blockid, starting_capacity, owner)

    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        if blockid is None:
            blockid = "%s" % uuid.uuid4()
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        return locator in self._bufferblocks

    def _commit_bufferblock_worker(self):
        """Background uploader thread."""
        while True:
            try:
                bufferblock = self._put_queue.get()
                if bufferblock is None:
                    return

                if self.copies is None:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                else:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)

            except Exception as e:
                bufferblock.set_state(_BufferBlock.ERROR, e)
            finally:
                if self._put_queue is not None:
                    self._put_queue.task_done()

    @synchronized
    def start_put_threads(self):
        if self._put_threads is None:
            # Start uploader threads.

            # If we don't limit the Queue size, the upload queue can quickly
            # grow to take up gigabytes of RAM if the writing process is
            # generating data more quickly than it can be sent to the Keep
            # servers.
            #
            # With two upload threads and a queue size of 2, this means up to 4
            # blocks pending.  If they are full 64 MiB blocks, that means up to
            # 256 MiB of internal buffering, which is the same size as the
            # default download block cache in KeepClient.
            self._put_queue = Queue.Queue(maxsize=2)

            self._put_threads = []
            for i in xrange(0, self.num_put_threads):
                thread = threading.Thread(target=self._commit_bufferblock_worker)
                self._put_threads.append(thread)
                thread.daemon = True
                thread.start()

    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                if b is None:
                    return
                self._keep.get(b)
            except Exception:
                _logger.exception("Exception doing block prefetch")

    @synchronized
    def start_get_threads(self):
        if self._prefetch_threads is None:
            self._prefetch_queue = Queue.Queue()
            self._prefetch_threads = []
            for i in xrange(0, self.num_get_threads):
                thread = threading.Thread(target=self._block_prefetch_worker)
                self._prefetch_threads.append(thread)
                thread.daemon = True
                thread.start()

    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""

        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_threads()

    @synchronized
    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
        """Packs small blocks together before uploading"""
        self._pending_write_size += closed_file_size

        # Check if there are enough small blocks for filling up one in full
        if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):

            # Search blocks ready for getting packed together before being
            # committed to Keep.
            # A WRITABLE block always has an owner.
            # A WRITABLE block with its owner.closed() implies that its
            # size is <= KEEP_BLOCK_SIZE/2.
            small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]

            if len(small_blocks) <= 1:
                # Not enough small blocks for repacking
                return

            new_bb = self._alloc_bufferblock()
            while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
                bb = small_blocks.pop(0)
                arvfile = bb.owner
                self._pending_write_size -= bb.size()
                new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
                arvfile.set_segments([Range(new_bb.blockid,
                                            0,
                                            bb.size(),
                                            new_bb.write_pointer - bb.size())])
                self._delete_bufferblock(bb.blockid)
            self.commit_bufferblock(new_bb, sync=sync)
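
    # Worked example (hypothetical sizes, KEEP_BLOCK_SIZE = 64 MiB): three
    # closed files of 20 MiB each leave three WRITABLE blocks.  When the
    # repack is triggered (e.g. force=True from commit_all), all three fit
    # in one new bufferblock (20+20+20 = 60 MiB <= 64 MiB); each file's
    # single segment is repointed at segment offsets 0, 20 MiB and 40 MiB of
    # that block, the originals are deleted, and one block is committed
    # instead of three.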

    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :sync:
          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        """
        try:
            # Mark the block as PENDING to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            if e.state == _BufferBlock.PENDING:
                if sync:
                    block.wait_for_commit.wait()
                else:
                    return
            if block.state() == _BufferBlock.COMMITTED:
                return
            elif block.state() == _BufferBlock.ERROR:
                raise block.error
            else:
                raise

        if sync:
            try:
                if self.copies is None:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
                else:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
                block.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                block.set_state(_BufferBlock.ERROR, e)
                raise
        else:
            self.start_put_threads()
            self._put_queue.put(block)
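
    # Usage sketch (hypothetical `manager` and `block` objects): a writer
    # typically lets blocks upload in the background, then waits at the end.
    #
    #     manager.commit_bufferblock(block, sync=False)  # queue for upload
    #     ...
    #     manager.commit_all()    # block until everything is in Keep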

    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    @synchronized
    def delete_bufferblock(self, locator):
        self._delete_bufferblock(locator)

    def _delete_bufferblock(self, locator):
        bb = self._bufferblocks[locator]
        bb.clear()
        del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and returns it;
        if not, passes the request through to KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.

        """
        self.repack_small_blocks(force=True, sync=True)

        with self.lock:
            items = self._bufferblocks.items()

        for k, v in items:
            if v.state() != _BufferBlock.COMMITTED and v.owner:
                v.owner.flush(sync=False)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

                err = []
                for k, v in items:
                    if v.state() == _BufferBlock.ERROR:
                        err.append((v.locator(), v.error))
                if err:
                    raise KeepWriteError("Error writing some blocks", err, label="block")

        for k, v in items:
            # flush again with sync=True to remove committed bufferblocks from
            # the segments.
            if v.owner:
                v.owner.flush(sync=True)

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.)  This method
        does not block.

        """

        if not self.prefetch_enabled:
            return

        if self._keep.get_from_cache(locator) is not None:
            return

        with self.lock:
            if locator in self._bufferblocks:
                return

        self.start_get_threads()
        self._prefetch_queue.put(locator)
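
# Minimal lifecycle sketch for _BlockManager (hypothetical `keep_client`; in
# practice the manager is owned by a Collection rather than used directly):
#
#     with _BlockManager(keep_client) as bm:
#         bb = bm.alloc_bufferblock()
#         bb.append('some data')
#         bm.commit_bufferblock(bb, sync=True)
#         print bb.locator()    # "<md5>+9" once committed
#     # __exit__ calls stop_threads() to shut down the worker threads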

class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments

        """
        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        return self.parent.writable()

    @synchronized
    def segments(self):
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self._committed = False

    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in xrange(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_segments(self, segs):
        self._segments = segs

    @synchronized
    def set_committed(self):
        """Set committed flag to True"""
        self._committed = True

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed

    @synchronized
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers"""
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    @synchronized
    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
        and do some block maintenance tasks.
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
            # File writer closed, but file is not small enough for repacking
            self.flush()
        elif self.closed():
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

    def closed(self):
        """
        Get whether this is closed or not. When the writers list is empty, the file
        is supposed to be closed.
        """
        return len(self._writers) == 0

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, an IOError will be raised.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                    break
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self._committed = False
        elif size > self.size():
            raise IOError(errno.EINVAL, "truncate() does not support extending the file size")

    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
          If False (default), return less data than requested if the read
          crosses a block boundary and the next block isn't cached.  If True,
          only return less data than requested when hitting EOF.

        """
        with self.lock:
            if size == 0 or offset >= self.size():
                return ''
            readsegs = locators_and_ranges(self._segments, offset, size)
            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)

        locs = set()
        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
                locs.add(lr.locator)
            else:
                break

        for lr in prefetch:
            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)
                locs.add(lr.locator)

        return ''.join(data)

    def _repack_writes(self, num_retries):
        """Check whether the buffer block has more data than is referenced by segments.

        This happens when a buffered write over-writes a file range written in
        a previous buffered write.  Re-pack the buffer block for efficiency
        and to avoid leaking information.

        """
        segs = self._segments

        # Sum up the segments to get the total bytes of the file referencing
        # into the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size():
            # There is more data in the buffer block than is actually accounted
            # for by segments, so re-pack by copying over to a new buffer block.
            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if len(data) == 0:
            return

        if offset > self.size():
            raise ArgumentError("Offset is past the end of the file")

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self._committed = False

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes(num_retries)
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))

        return len(data)
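
    # Worked example of the chunking path (KEEP_BLOCK_SIZE = 64 MiB = 2**26
    # bytes, hypothetical write): writeto(0, data, ...) with len(data) ==
    # 150 MiB recurses into three writes of 64 MiB, 64 MiB and 22 MiB at
    # offsets 0, 64 MiB and 128 MiB, so no single bufferblock append ever
    # exceeds one Keep block.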

    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the keep put queue.

        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._repack_writes(num_retries)
            self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
            for s in to_delete:
                self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self._committed = False
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

    @synchronized
    def size(self):
        """Get the file size."""
        if self._segments:
            n = self._segments[-1]
            return n.range_start + n.range_size
        else:
            return 0

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
        buf = ""
        filestream = []
        # Iterate over the segment list directly; calling the @synchronized
        # segments() accessor here would deadlock on self.lock.  Bufferblock
        # locators are resolved through the block manager, which owns them.
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
        buf += "\n"
        return buf
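
    # Illustrative output (hypothetical locator): a one-segment file named
    # "foo.txt" in the top-level stream serializes to the manifest line
    #
    #     . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n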

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        self._committed = False
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock

class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating the file pointer.

    """

    def __init__(self, arvadosfile, num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None, read the
        entire remainder of the file.

        """
        if size is None:
            data = []
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return ''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.

        """
        return self.arvadosfile.readfrom(offset, size, num_retries)

    def flush(self):
        pass

class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating the file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
        self.mode = mode
        self.arvadosfile.add_writer(self)

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
        else:
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)
        if self._filepos > self.size():
            self._filepos = self.size()

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()

    def close(self, flush=True):
        if not self.closed:
            self.arvadosfile.remove_writer(self, flush)
            super(ArvadosFileWriter, self).close()
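
# End-to-end usage sketch (hypothetical; assumes an arvados Collection that
# wires an ArvadosFile to a _BlockManager via its open() helper):
#
#     import arvados.collection
#     c = arvados.collection.Collection()
#     with c.open('dir/file.txt', 'w') as f:    # an ArvadosFileWriter
#         f.write('hello\n')
#     c.save_new()    # commits buffer blocks and saves the manifest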