+ def writable(self):
+ return self.parent.writable()
+
+ @synchronized
+ def segments(self):
+ return copy.copy(self._segments)
+
+ @synchronized
+ def clone(self, new_parent, new_name):
+ """Make a copy of this file."""
+ cp = ArvadosFile(new_parent, new_name)
+ cp.replace_contents(self)
+ return cp
+
+ @must_be_writable
+ @synchronized
+ def replace_contents(self, other):
+ """Replace segments of this file with segments from another `ArvadosFile` object."""
+
+ map_loc = {}
+ self._segments = []
+ for other_segment in other.segments():
+ new_loc = other_segment.locator
+ if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
+ if other_segment.locator not in map_loc:
+ bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
+ if bufferblock.state() != _BufferBlock.WRITABLE:
+ map_loc[other_segment.locator] = bufferblock.locator()
+ else:
+ map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
+ new_loc = map_loc[other_segment.locator]
+
+ self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
+
+ self._committed = False
+
+ def __eq__(self, other):
+ if other is self:
+ return True
+ if not isinstance(other, ArvadosFile):
+ return False
+
+ othersegs = other.segments()
+ with self.lock:
+ if len(self._segments) != len(othersegs):
+ return False
+ for i in xrange(0, len(othersegs)):
+ seg1 = self._segments[i]
+ seg2 = othersegs[i]
+ loc1 = seg1.locator
+ loc2 = seg2.locator
+
+ if self.parent._my_block_manager().is_bufferblock(loc1):
+ loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
+
+ if other.parent._my_block_manager().is_bufferblock(loc2):
+ loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
+
+ if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
+ seg1.range_start != seg2.range_start or
+ seg1.range_size != seg2.range_size or
+ seg1.segment_offset != seg2.segment_offset):
+ return False
+
+ return True
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ @synchronized
+ def set_committed(self):
+ """Set committed flag to False"""
+ self._committed = True
+
+ @synchronized
+ def committed(self):
+ """Get whether this is committed or not."""
+ return self._committed
+
+ @must_be_writable
+ @synchronized
+ def truncate(self, size):
+ """Shrink the size of the file.
+
+ If `size` is less than the size of the file, the file contents after
+ `size` will be discarded. If `size` is greater than the current size
+ of the file, an IOError will be raised.
+
+ """
+ if size < self.size():
+ new_segs = []
+ for r in self._segments:
+ range_end = r.range_start+r.range_size
+ if r.range_start >= size:
+ # segment is past the trucate size, all done
+ break
+ elif size < range_end:
+ nr = Range(r.locator, r.range_start, size - r.range_start, 0)
+ nr.segment_offset = r.segment_offset
+ new_segs.append(nr)
+ break
+ else:
+ new_segs.append(r)
+
+ self._segments = new_segs
+ self._committed = False
+ elif size > self.size():
+ raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
+
+ def readfrom(self, offset, size, num_retries, exact=False):
+ """Read up to `size` bytes from the file starting at `offset`.
+
+ :exact:
+ If False (default), return less data than requested if the read
+ crosses a block boundary and the next block isn't cached. If True,
+ only return less data than requested when hitting EOF.
+ """
+
+ with self.lock:
+ if size == 0 or offset >= self.size():
+ return ''
+ readsegs = locators_and_ranges(self._segments, offset, size)
+ prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
+
+ locs = set()
+ data = []
+ for lr in readsegs:
+ block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
+ if block:
+ blockview = memoryview(block)
+ data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
+ locs.add(lr.locator)