from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
from .retry import retry_method
MOD = "mod"
WRITE = "write"

_logger = logging.getLogger('arvados.arvfile')
28 """split(path) -> streamname, filename
30 Separate the stream name and file name in a /-separated stream path and
31 return a tuple (stream_name, file_name). If no stream name is available,
36 stream_name, file_name = path.rsplit('/', 1)
37 except ValueError: # No / in string
38 stream_name, file_name = '.', path
39 return stream_name, file_name
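
# A doctest-style sketch of split() (illustrative, not from the original
# source):
#
#     >>> split('fastq/sample1.fastq')
#     ('fastq', 'sample1.fastq')
#     >>> split('sample1.fastq')
#     ('.', 'sample1.fastq')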

class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        self.closed = True

class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0L
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)
    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        self._filepos = min(max(pos, 0L), self.size())

    def tell(self):
        return self._filepos
    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if data == '':
                break
            yield data
    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
            self._filepos += len(cache_data)
        else:
            data = ['']
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
        data = ''.join(data)
        try:
            nextline_index = data.index('\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]
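
    # Illustrative sketch of the _readline_cache behavior (not part of the
    # original source): reading a file containing "one\ntwo\n" with
    # readline() fetches a whole 1 MiB chunk, returns "one\n", and caches
    # the leftover "two\n" keyed by the resulting file position, so the
    # next readline() call at that position is served without another
    # read().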
    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)
            if data:
                yield data
    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)
    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return ''.join(data).splitlines(True)
    def size(self):
        raise NotImplementedError()

    def read(self, size, num_retries=None):
        raise NotImplementedError()

    def readfrom(self, start, size, num_retries=None):
        raise NotImplementedError()

class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size
    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return ''

        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data
    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return ''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return ''.join(data)
    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"

def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper
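
# Usage sketch for the synchronized decorator (illustrative; Counter is a
# hypothetical class, not part of this module): any object carrying a
# `self.lock` attribute can serialize access to a method with it.
#
#     class Counter(object):
#         def __init__(self):
#             self.lock = threading.Lock()
#             self.n = 0
#
#         @synchronized
#         def bump(self):
#             self.n += 1   # runs with self.lock held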

class StateChangeError(Exception):
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.state = state
        self.nextstate = nextstate

class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are three valid states:

    WRITABLE
      Can append to the block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    A block that fails to upload is placed in a fourth state, ERROR.
    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2
    ERROR = 3
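
    # Lifecycle sketch (derived from set_state() below; not part of the
    # original source):
    #
    #     WRITABLE --commit--> PENDING --upload ok-----> COMMITTED
    #                             |
    #                             +--upload failed--> ERROR
    #
    # An ERROR block may be resubmitted, returning it to PENDING.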
    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block
        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.error = None
        self.lock = threading.Lock()
        self.wait_for_commit = threading.Event()
    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.
        """
        if self._state == _BufferBlock.WRITABLE:
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
            self._locator = None
        else:
            raise AssertionError("Buffer block is not writable")
    STATE_TRANSITIONS = frozenset([
            (WRITABLE, PENDING),
            (PENDING, COMMITTED),
            (PENDING, ERROR),
            (ERROR, PENDING)])

    @synchronized
    def set_state(self, nextstate, val=None):
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
        self._state = nextstate
        if self._state == _BufferBlock.PENDING:
            self.wait_for_commit.clear()

        if self._state == _BufferBlock.COMMITTED:
            self._locator = val
            self.buffer_view = None
            self.buffer_block = None
            self.wait_for_commit.set()

        if self._state == _BufferBlock.ERROR:
            self.error = val
            self.wait_for_commit.set()
348 """The amount of data written to the buffer."""
349 return self.write_pointer
353 """The Keep locator for this buffer's contents."""
354 if self._locator is None:
355 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
    @synchronized
    def clone(self, new_blockid, owner):
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Cannot duplicate committed buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock

    @synchronized
    def clear(self):
        self.owner = None
        self.buffer_block = None
        self.buffer_view = None

class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass

def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection is read-only.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper
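
# Usage sketch (illustrative; Example is hypothetical): must_be_writable
# guards mutating methods of objects exposing a writable() predicate, as
# ArvadosFile does below.
#
#     class Example(object):
#         def writable(self):
#             return False
#
#         @must_be_writable
#         def mutate(self):
#             pass  # never reached; IOError(EROFS) is raised first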

class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.
    """

    DEFAULT_PUT_THREADS = 2
    DEFAULT_GET_THREADS = 2
    def __init__(self, keep, copies=None):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = collections.OrderedDict()
        self._put_queue = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
        self.copies = copies
        self._pending_write_size = 0
        self.threads_lock = threading.Lock()
    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block
        """
        return self._alloc_bufferblock(blockid, starting_capacity, owner)

    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        if blockid is None:
            blockid = "%s" % uuid.uuid4()
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock
    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block
        """
        new_blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        return locator in self._bufferblocks
    def _commit_bufferblock_worker(self):
        """Background uploader thread."""
        while True:
            try:
                bufferblock = self._put_queue.get()
                if bufferblock is None:
                    return

                if self.copies is None:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                else:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                bufferblock.set_state(_BufferBlock.ERROR, e)
            finally:
                if self._put_queue is not None:
                    self._put_queue.task_done()
    def start_put_threads(self):
        with self.threads_lock:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep
                # servers.
                #
                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending.  If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = Queue.Queue(maxsize=2)

                self._put_threads = []
                for i in xrange(0, self.num_put_threads):
                    thread = threading.Thread(target=self._commit_bufferblock_worker)
                    self._put_threads.append(thread)
                    thread.daemon = True
                    thread.start()
    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                if b is None:
                    return
                self._keep.get(b)
            except Exception:
                _logger.exception("Exception doing block prefetch")
    @synchronized
    def start_get_threads(self):
        if self._prefetch_threads is None:
            self._prefetch_queue = Queue.Queue()
            self._prefetch_threads = []
            for i in xrange(0, self.num_get_threads):
                thread = threading.Thread(target=self._block_prefetch_worker)
                self._prefetch_threads.append(thread)
                thread.daemon = True
                thread.start()
    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""
        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_threads()
    @synchronized
    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
        """Packs small blocks together before uploading"""
        self._pending_write_size += closed_file_size

        # Check if there are enough small blocks for filling up one in full
        if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
            # Search for blocks ready to get packed together before being
            # committed to Keep.
            # A WRITABLE block always has an owner.
            # A WRITABLE block with a closed owner is, by construction, no
            # larger than KEEP_BLOCK_SIZE/2.
            small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]

            if len(small_blocks) <= 1:
                # Not enough small blocks for repacking
                return

            # Update the pending write size count with its true value, just in case
            # some small file was opened, written and closed several times.
            self._pending_write_size = sum([b.size() for b in small_blocks])
            if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
                return

            new_bb = self._alloc_bufferblock()
            while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
                bb = small_blocks.pop(0)
                arvfile = bb.owner
                self._pending_write_size -= bb.size()
                new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
                arvfile.set_segments([Range(new_bb.blockid,
                                            0,
                                            bb.size(),
                                            new_bb.write_pointer - bb.size())])
                self._delete_bufferblock(bb.blockid)
            self.commit_bufferblock(new_bb, sync=sync)
    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :sync:
          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.
        """
        try:
            # Mark the block as PENDING so to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            if e.state == _BufferBlock.PENDING:
                if sync:
                    block.wait_for_commit.wait()
                else:
                    return
            if block.state() == _BufferBlock.COMMITTED:
                return
            elif block.state() == _BufferBlock.ERROR:
                raise block.error
            else:
                raise

        if sync:
            try:
                if self.copies is None:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
                else:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
                block.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                block.set_state(_BufferBlock.ERROR, e)
                raise
        else:
            self.start_put_threads()
            self._put_queue.put(block)
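
    # Illustrative call patterns (not from the original source):
    #
    #     manager.commit_bufferblock(bb, sync=True)   # block until bb is
    #                                                 # COMMITTED, or raise
    #     manager.commit_bufferblock(bb, sync=False)  # queue bb for the
    #                                                 # uploader threads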
    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    @synchronized
    def delete_bufferblock(self, locator):
        self._delete_bufferblock(locator)

    def _delete_bufferblock(self, locator):
        bb = self._bufferblocks[locator]
        bb.clear()
        del self._bufferblocks[locator]
    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and returns that,
        if not, passes the request through to KeepClient.get().
        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)
    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
        """
        self.repack_small_blocks(force=True, sync=True)

        with self.lock:
            items = self._bufferblocks.items()

        for k, v in items:
            if v.state() != _BufferBlock.COMMITTED and v.owner:
                v.owner.flush(sync=False)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

                err = []
                for k, v in items:
                    if v.state() == _BufferBlock.ERROR:
                        err.append((v.locator(), v.error))
                if err:
                    raise KeepWriteError("Error writing some blocks", err, label="block")

        for k, v in items:
            # flush again with sync=True to remove committed bufferblocks from
            # the segments.
            if v.owner:
                v.owner.flush(sync=True)
    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.)  This method
        does not block.
        """
        if not self.prefetch_enabled:
            return

        if self._keep.get_from_cache(locator) is not None:
            return

        with self.lock:
            if locator in self._bufferblocks:
                return

        self.start_get_threads()
        self._prefetch_queue.put(locator)

class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.
    """

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments
        """
        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None
    def writable(self):
        return self.parent.writable()
    @synchronized
    def permission_expired(self, as_of_dt=None):
        """Returns True if any of the segment's locators is expired"""
        for r in self._segments:
            if KeepLocator(r.locator).permission_expired(as_of_dt):
                return True
        return False

    @synchronized
    def segments(self):
        return copy.copy(self._segments)
    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp
    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""
        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self._committed = False
    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in xrange(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)
    @synchronized
    def set_segments(self, segs):
        self._segments = segs

    @synchronized
    def set_committed(self):
        """Set committed flag to True"""
        self._committed = True

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed
    @synchronized
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers"""
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)
    @synchronized
    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
        and do some block maintenance tasks.
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
            # File writer closed, not small enough for repacking
            self.flush()
        elif self.closed():
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

    def closed(self):
        """
        Get whether this is closed or not. When the writers list is empty, the file
        is supposed to be closed.
        """
        return len(self._writers) == 0
    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, an IOError will be raised.
        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self._committed = False
        elif size > self.size():
            raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
          If False (default), return less data than requested if the read
          crosses a block boundary and the next block isn't cached.  If True,
          only return less data than requested when hitting EOF.
        """
        with self.lock:
            if size == 0 or offset >= self.size():
                return ''
            readsegs = locators_and_ranges(self._segments, offset, size)
            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)

        locs = set()
        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
                locs.add(lr.locator)
            else:
                break

        for lr in prefetch:
            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)
                locs.add(lr.locator)

        return ''.join(data)
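
    # Illustrative contrast (not from the original source): for a 100-byte
    # read spanning two blocks where only the first is cached,
    # readfrom(off, 100, n, exact=False) may return just the cached prefix,
    # while exact=True waits to fetch the second block and only returns
    # short at EOF.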
    def _repack_writes(self, num_retries):
        """Test if the buffer block has more data than actual segments.

        This happens when a buffered write over-writes a file range written in
        a previous buffered write.  Re-pack the buffer block for efficiency
        and to avoid leaking information.
        """
        segs = self._segments

        # Sum up the segments to get the total bytes of the file referencing
        # into the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size():
            # There is more data in the buffer block than is actually accounted for by segments, so
            # re-pack into a new buffer by copying over to a new buffer block.
            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb
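
    # Worked example (illustrative): writing 10 bytes at offset 0 and then
    # overwriting the first 5 leaves 15 bytes in the buffer block while the
    # file's segments reference only 10 of them (5 are stale). write_total
    # (10) is then less than the bufferblock size (15), so the live 10
    # bytes are copied into a replacement block and the stale 5 dropped.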
    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.
        """
        if len(data) == 0:
            return

        if offset > self.size():
            raise ArgumentError("Offset is past the end of the file")

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self._committed = False

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes(num_retries)
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))
    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block
          into the Keep put queue.
        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._repack_writes(num_retries)
            self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
            for s in to_delete:
                self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))
    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects).
        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self._committed = False
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)
1062 """Get the file size."""
1064 n = self._segments[-1]
1065 return n.range_start + n.range_size
    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if loc.startswith("bufferblock"):
                loc = self._bufferblocks[loc].calculate_locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, locator_block_size(loc),
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
        buf += "\n"
        return buf
    def _reparent(self, newparent, newname):
        self._committed = False
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock

class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.
    """

    def __init__(self, arvadosfile, num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()
    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None, read the
        entire remainder of the file.
        """
        if size is None:
            data = []
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return ''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data
    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.
        """
        return self.arvadosfile.readfrom(offset, size, num_retries)

class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.
    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
        self.mode = mode
        self.arvadosfile.add_writer(self)
    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
        else:
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)
        return len(data)
    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries=num_retries)
    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)
        if self._filepos > self.size():
            self._filepos = self.size()
    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()
    def close(self, flush=True):
        if not self.closed:
            self.arvadosfile.remove_writer(self, flush)
            super(ArvadosFileWriter, self).close()
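
# A minimal end-to-end sketch (illustrative; assumes the companion
# arvados.collection.Collection class, which constructs these reader and
# writer wrappers via its open() method):
#
#     import arvados.collection
#     c = arvados.collection.Collection()
#     with c.open('data/notes.txt', 'w') as f:   # ArvadosFileWriter
#         f.write('hello\n')
#     with c.open('data/notes.txt', 'r') as f:   # ArvadosFileReader
#         print f.read(1024)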