# Note: AssertionError here is arvados.errors.AssertionError, which
# deliberately shadows the builtin of the same name within this module.
from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
from .retry import retry_method

_logger = logging.getLogger('arvados.arvfile')


def split(path):
    """split(path) -> streamname, filename

    Separate the stream name and file name in a /-separated stream path and
    return a tuple (stream_name, file_name).  If no stream name is available,
    assume '.'.

    """
    try:
        stream_name, file_name = path.rsplit('/', 1)
    except ValueError:  # No / in string
        stream_name, file_name = '.', path
    return stream_name, file_name
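
# Illustrative sketch of split() behavior (examples only, not part of the
# original module):
#
#     split('path/to/file.txt')  ->  ('path/to', 'file.txt')
#     split('file.txt')          ->  ('.', 'file.txt')
#
# Only the last '/' separates the stream name from the file name, so nested
# directories remain part of the stream name.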


class UnownedBlockError(Exception):
    """Raised when there's a writable block without an owner on the BlockManager."""


class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()


class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0L
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        # Use a raw string so the backslash in the regex survives intact.
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        if pos < 0L:
            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
        self._filepos = pos

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if not data:
                break
            yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
            self._filepos += len(cache_data)
        else:
            data = ['']
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
        data = ''.join(data)
        try:
            nextline_index = data.index('\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]
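
    # How the readline cache pays off (illustrative sketch): after
    # readline() returns 'line one\n' out of a 2 MiB read, the unreturned
    # remainder is cached along with the file position where it starts.
    # If the next readline() begins at exactly that position, the cached
    # bytes are reused instead of issuing another read(); any intervening
    # seek() invalidates the cache simply because the positions no longer
    # match.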

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)
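
    # Usage sketch (hypothetical reader object, illustrative only):
    #
    #     for chunk in reader.readall_decompressed(size=2**20):
    #         process(chunk)
    #
    # Dispatch is purely by file extension; a '.gz' name over non-gzip
    # content will raise from zlib during iteration.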

    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return ''.join(data).splitlines(True)

    def size(self):
        raise IOError(errno.ENOSYS, "Not implemented")

    def read(self, size, num_retries=None):
        raise IOError(errno.ENOSYS, "Not implemented")

    def readfrom(self, start, size, num_retries=None):
        raise IOError(errno.ENOSYS, "Not implemented")


class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return ''

        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return ''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return ''.join(data)

    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
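
    # For orientation, a normalized manifest line produced here looks like
    # (illustrative values):
    #
    #     . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt
    #
    # i.e. a stream name, one or more block locators of the form md5+size,
    # then file tokens of the form start:length:filename, newline-terminated.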


def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper


class StateChangeError(Exception):
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.state = state
        self.nextstate = nextstate


class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    The three main states are:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    There are also ERROR (the upload failed) and DELETED (the buffer has
    been released) states.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2
    ERROR = 3
    DELETED = 4

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()
        self.wait_for_commit = threading.Event()
        self.error = None

    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
        else:
            raise AssertionError("Buffer block is not writable")

    STATE_TRANSITIONS = frozenset([
            (WRITABLE, PENDING),
            (PENDING, COMMITTED),
            (PENDING, ERROR),
            (ERROR, ERROR)])

    @synchronized
    def set_state(self, nextstate, val=None):
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
        self._state = nextstate

        if self._state == _BufferBlock.PENDING:
            self.wait_for_commit.clear()

        if self._state == _BufferBlock.COMMITTED:
            self._locator = val
            self.buffer_view = None
            self.buffer_block = None
            self.wait_for_commit.set()

        if self._state == _BufferBlock.ERROR:
            self.error = val
            self.wait_for_commit.set()
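
    # Legal lifecycle, per STATE_TRANSITIONS above (annotated sketch):
    #
    #     WRITABLE --set_state(PENDING)-------------> PENDING
    #     PENDING  --set_state(COMMITTED, locator)--> COMMITTED
    #     PENDING  --set_state(ERROR, exception)----> ERROR
    #
    # ERROR -> ERROR is also allowed (a retry may record a new error);
    # anything else, e.g. COMMITTED -> PENDING, raises StateChangeError.
    # DELETED is reached only via clear(), which bypasses set_state().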

    @synchronized
    def state(self):
        return self._state

    def size(self):
        """The amount of data written to the buffer."""
        return self.write_pointer

    @synchronized
    def locator(self):
        """The Keep locator for this buffer's contents."""
        if self._locator is None:
            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
        return self._locator
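
    # The computed locator has the standard Keep form "<md5 hex>+<size>",
    # e.g. (illustrative) "acbd18db4cc2f85cedef654fccc4a4d8+3" for the three
    # bytes "foo".  It is cached on first use and reset by repack_writes(),
    # since repacking changes the buffer contents and hence the hash.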

    @synchronized
    def clone(self, new_blockid, owner):
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Cannot duplicate committed buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock

    @synchronized
    def clear(self):
        self._state = _BufferBlock.DELETED
        self.owner = None
        self.buffer_block = None
        self.buffer_view = None

    @synchronized
    def repack_writes(self):
        """Optimize buffer block by repacking segments in file sequence.

        When the client makes random writes, they appear in the buffer block in
        the sequence they were written rather than the sequence they appear in
        the file.  This makes for inefficient, fragmented manifests.  Attempt
        to optimize by repacking writes in file sequence.

        """
        if self._state != _BufferBlock.WRITABLE:
            raise AssertionError("Cannot repack non-writable block")

        segs = self.owner.segments()

        # Collect the segments that reference the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self.blockid]

        # Collect total data referenced by segments (could be smaller than
        # bufferblock size if a portion of the file was written and
        # then overwritten).
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self.size() or len(bufferblock_segs) > 1:
            # If there's more than one segment referencing this block, it is
            # due to out-of-order writes and will produce a fragmented
            # manifest, so try to optimize by re-packing into a new buffer.
            contents = self.buffer_view[0:self.write_pointer].tobytes()
            new_bb = _BufferBlock(None, write_total, None)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            self.buffer_block = new_bb.buffer_block
            self.buffer_view = new_bb.buffer_view
            self.write_pointer = new_bb.write_pointer
            self._locator = None
            new_bb.clear()
            self.owner.set_segments(segs)
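
    # Worked example (illustrative): a client writes bytes 100-199 first and
    # bytes 0-99 second.  The buffer then holds [100:200][0:100] while the
    # file needs two segments into this block.  repack_writes() copies each
    # referenced span into a fresh buffer in file order, so the buffer
    # becomes [0:100][100:200] and normalize_stream() can later merge the
    # now-adjacent segments into a single contiguous manifest range.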

    def __repr__(self):
        return "<BufferBlock %s>" % (self.blockid,)


class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass


def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection is read-only.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper


class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """

    DEFAULT_PUT_THREADS = 2
    DEFAULT_GET_THREADS = 2

    def __init__(self, keep, copies=None, put_threads=None):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = collections.OrderedDict()
        self._put_queue = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        if put_threads:
            self.num_put_threads = put_threads
        else:
            self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
        self.copies = copies
        self._pending_write_size = 0
        self.threads_lock = threading.Lock()
        self.padding_block = None

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        return self._alloc_bufferblock(blockid, starting_capacity, owner)

    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        if blockid is None:
            blockid = str(uuid.uuid4())
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = str(uuid.uuid4())
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        return locator in self._bufferblocks

    def _commit_bufferblock_worker(self):
        """Background uploader thread."""
        while True:
            try:
                bufferblock = self._put_queue.get()
                if bufferblock is None:
                    return

                if self.copies is None:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                else:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                bufferblock.set_state(_BufferBlock.ERROR, e)
            finally:
                if self._put_queue is not None:
                    self._put_queue.task_done()

    def start_put_threads(self):
        with self.threads_lock:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep
                # service.
                #
                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending.  If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = Queue.Queue(maxsize=2)

                self._put_threads = []
                for i in xrange(0, self.num_put_threads):
                    thread = threading.Thread(target=self._commit_bufferblock_worker)
                    self._put_threads.append(thread)
                    thread.daemon = True
                    thread.start()

    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                if b is None:
                    return
                self._keep.get(b)
            except Exception:
                _logger.exception("Exception doing block prefetch")

    @synchronized
    def start_get_threads(self):
        if self._prefetch_threads is None:
            self._prefetch_queue = Queue.Queue()
            self._prefetch_threads = []
            for i in xrange(0, self.num_get_threads):
                thread = threading.Thread(target=self._block_prefetch_worker)
                self._prefetch_threads.append(thread)
                thread.daemon = True
                thread.start()

    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""

        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_threads()

    @synchronized
    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
        """Packs small blocks together before uploading"""

        self._pending_write_size += closed_file_size

        # Check if there are enough small blocks for filling up one in full
        if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
            return

        # Search blocks ready for getting packed together before being committed to Keep.
        # A WRITABLE block always has an owner.
        # A WRITABLE block with its owner.closed() implies that its
        # size is <= KEEP_BLOCK_SIZE/2.
        try:
            small_blocks = [b for b in self._bufferblocks.values()
                            if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
        except AttributeError:
            # Writable blocks without owner shouldn't exist.
            raise UnownedBlockError()

        if len(small_blocks) <= 1:
            # Not enough small blocks for repacking
            return

        for bb in small_blocks:
            bb.repack_writes()

        # Update the pending write size count with its true value, just in case
        # some small file was opened, written and closed several times.
        self._pending_write_size = sum([b.size() for b in small_blocks])

        if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
            return

        new_bb = self._alloc_bufferblock()
        files = []
        while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
            bb = small_blocks.pop(0)
            self._pending_write_size -= bb.size()
            new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
            files.append((bb, new_bb.write_pointer - bb.size()))

        self.commit_bufferblock(new_bb, sync=sync)

        for bb, new_bb_segment_offset in files:
            newsegs = bb.owner.segments()
            for s in newsegs:
                if s.locator == bb.blockid:
                    s.locator = new_bb.locator()
                    s.segment_offset = new_bb_segment_offset+s.segment_offset
            bb.owner.set_segments(newsegs)
            self._delete_bufferblock(bb.blockid)
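
    # Scenario sketch (illustrative numbers): a workflow writes and closes
    # 100 files of 1 MiB each.  Each leaves a WRITABLE bufferblock well
    # under KEEP_BLOCK_SIZE/2, so once the pending total crosses
    # KEEP_BLOCK_SIZE one packing pass copies ~64 of them into a single new
    # block, rewrites each owner's segments to point at the new locator at
    # the proper offset, and deletes the small blocks: one Keep PUT instead
    # of ~64.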

    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :sync:
          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        """
        try:
            # Mark the block as PENDING to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            if e.state == _BufferBlock.PENDING:
                if sync:
                    block.wait_for_commit.wait()
                else:
                    return
                if block.state() == _BufferBlock.COMMITTED:
                    return
                elif block.state() == _BufferBlock.ERROR:
                    raise block.error
            else:
                raise

        if sync:
            try:
                if self.copies is None:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
                else:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
                block.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                block.set_state(_BufferBlock.ERROR, e)
                raise
        else:
            self.start_put_threads()
            self._put_queue.put(block)

    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    @synchronized
    def get_padding_block(self):
        """Get a bufferblock 64 MiB in size consisting of all zeros, used as padding
        when using truncate() to extend the size of a file.

        For reference (and possible future optimization), the md5sum of the
        padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864

        """

        if self.padding_block is None:
            self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
            self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
            self.commit_bufferblock(self.padding_block, False)
        return self.padding_block

    @synchronized
    def delete_bufferblock(self, locator):
        self._delete_bufferblock(locator)

    def _delete_bufferblock(self, locator):
        bb = self._bufferblocks[locator]
        bb.clear()
        del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and returns that,
        if not, passes the request through to KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.

        """
        self.repack_small_blocks(force=True, sync=True)

        with self.lock:
            items = self._bufferblocks.items()

        for k, v in items:
            if v.state() != _BufferBlock.COMMITTED and v.owner:
                v.owner.flush(sync=False)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

                err = []
                for k, v in items:
                    if v.state() == _BufferBlock.ERROR:
                        err.append((v.locator(), v.error))
                if err:
                    raise KeepWriteError("Error writing some blocks", err, label="block")

        for k, v in items:
            # flush again with sync=True to remove committed bufferblocks from
            # the segments.
            if v.owner:
                v.owner.flush(sync=True)

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.)  This method
        does not block.

        """

        if not self.prefetch_enabled:
            return

        if self._keep.get_from_cache(locator) is not None:
            return

        with self.lock:
            if locator in self._bufferblocks:
                return

        self.start_get_threads()
        self._prefetch_queue.put(locator)


class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments

        """
        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        return self.parent.writable()

    @synchronized
    def permission_expired(self, as_of_dt=None):
        """Returns True if any of the segment's locators is expired"""
        for r in self._segments:
            if KeepLocator(r.locator).permission_expired(as_of_dt):
                return True
        return False

    @synchronized
    def segments(self):
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self.set_committed(False)

    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in xrange(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_segments(self, segs):
        self._segments = segs

    @synchronized
    def set_committed(self, value=True):
        """Set committed flag.

        If value is True, set committed to be True.

        If value is False, set committed to be False for this and all parents.

        """
        if value == self._committed:
            return
        self._committed = value
        if self._committed is False and self.parent is not None:
            self.parent.set_committed(False)

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed

    @synchronized
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers"""
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    @synchronized
    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
        and do some block maintenance tasks.
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
            # File writer closed, not small enough for repacking
            self.flush()
        elif self.closed():
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

    def closed(self):
        """
        Get whether this is closed or not.  When the writers list is empty, the file
        is supposed to be closed.
        """
        return len(self._writers) == 0

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink or expand the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, it will be filled with zero bytes.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self.set_committed(False)
        elif size > self.size():
            padding = self.parent._my_block_manager().get_padding_block()
            diff = size - self.size()
            while diff > config.KEEP_BLOCK_SIZE:
                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
                diff -= config.KEEP_BLOCK_SIZE
            if diff > 0:
                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
            self.set_committed(False)
        else:
            # size == self.size()
            pass
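
    # Extension arithmetic (illustrative): truncating a 10-byte file up to
    # 130 MiB appends references into the shared zero-filled padding block:
    # two full KEEP_BLOCK_SIZE (64 MiB) ranges, then one partial range for
    # the remaining 2 MiB minus 10 bytes.  No zero bytes are stored per
    # file; every padded file points at the same padding block locator.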

    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
          If False (default), return less data than requested if the read
          crosses a block boundary and the next block isn't cached.  If True,
          only return less data than requested when hitting EOF.

        """
        with self.lock:
            if size == 0 or offset >= self.size():
                return ''
            readsegs = locators_and_ranges(self._segments, offset, size)
            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)

        locs = set()
        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
                locs.add(lr.locator)
            else:
                break

        for lr in prefetch:
            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)
                locs.add(lr.locator)

        return ''.join(data)
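
    # Read-ahead sketch: up to 32 ranges covering the KEEP_BLOCK_SIZE bytes
    # after the requested span are queued for background download, so a
    # sequential reader typically finds the next block already in the
    # KeepClient cache.  Because cache_only is set for every block after the
    # first, a read that crosses into an uncached block returns short rather
    # than stalling (unless `exact` was requested).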

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if len(data) == 0:
            return

        if offset > self.size():
            self.truncate(offset)

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self.set_committed(False)

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._current_bblock.repack_writes()
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))

        return len(data)
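
    # Chunking arithmetic (illustrative): a single 150 MiB writeto() call is
    # split into 64 MiB + 64 MiB + 22 MiB recursive writes, so no buffer
    # block ever needs to grow past KEEP_BLOCK_SIZE.  The small-write path
    # appends to the current buffer block and records where the new bytes
    # landed via replace_range(), keeping the segment list consistent with
    # the buffer contents.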

    @must_be_writable
    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the keep put queue.

        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._current_bblock.repack_writes()
            if self._current_bblock.state() != _BufferBlock.DELETED:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
            for s in to_delete:
                self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self.set_committed(False)
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)
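
    # Mapping sketch (illustrative): given a stream of two 1024-byte blocks
    # [A, B] and a request for pos=512, size=1024, locators_and_ranges()
    # yields (A, segment_offset 512, length 512) then (B, segment_offset 0,
    # length 512).  Each becomes a Range whose range_start continues from
    # the end of the previous segment, so file offsets stay contiguous
    # regardless of block boundaries.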

    @synchronized
    def size(self):
        """Get the file size."""
        if self._segments:
            n = self._segments[-1]
            return n.range_start + n.range_size
        else:
            return 0

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False,
                      normalize=False, only_committed=False):
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                if only_committed:
                    continue
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
        buf += "\n"
        return buf

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        self.set_committed(False)
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock


class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None, read the
        entire remainder of the file.

        """
        if size is None:
            data = []
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return ''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.

        """
        return self.arvadosfile.readfrom(offset, size, num_retries)


class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
        self.mode = mode
        self.arvadosfile.add_writer(self)

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
        else:
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()

    def close(self, flush=True):
        if not self.closed:
            self.arvadosfile.remove_writer(self, flush)
            super(ArvadosFileWriter, self).close()
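

# End-to-end usage sketch (illustrative; assumes the Collection API from
# this SDK, which hands out these wrapper objects from open()):
#
#     import arvados
#     c = arvados.collection.Collection()
#     with c.open('data/hello.txt', 'w') as f:   # an ArvadosFileWriter
#         f.write('hello\n')                     # buffered in a _BufferBlock
#     c.save_new()                               # commit_all() + save manifest
#     with c.open('data/hello.txt', 'r') as f:   # an ArvadosFileReader
#         print f.read(6)
#
# Writes stay in memory until a block fills, the file is flushed, or the
# collection is saved; close() may also trigger small-block repacking.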