Merge branch '3198-inode-cache' refs #3198
author: Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 11 May 2015 15:57:52 +0000 (11:57 -0400)
committer: Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 11 May 2015 15:57:52 +0000 (11:57 -0400)
1  2 
sdk/python/arvados/arvfile.py
sdk/python/arvados/collection.py

index f43caf364b4f39e0da923c0b8497527f358e89ba,ce342b5a41a887ec90cf6a4a979af06cd669e5e7..2e0eaa8ce0f645ca305826851c5eb7ddccb434e2
@@@ -305,7 -305,7 +305,7 @@@ class _BufferBlock(object)
                  self.buffer_view = None
                  self.buffer_block = None
          else:
 -            raise AssertionError("Invalid state change from %s to %s" % (self.state, state))
 +            raise AssertionError("Invalid state change from %s to %s" % (self.state, nextstate))
  
      @synchronized
      def state(self):
@@@ -484,10 -484,9 +484,10 @@@ class _BlockManager(object)
                      thread.daemon = True
                      thread.start()
  
 -        # Mark the block as PENDING so to disallow any more appends.
 -        block.set_state(_BufferBlock.PENDING)
 -        self._put_queue.put(block)
 +        if block.state() == _BufferBlock.WRITABLE:
 +            # Mark the block as PENDING so to disallow any more appends.
 +            block.set_state(_BufferBlock.PENDING)
 +            self._put_queue.put(block)
  
      @synchronized
      def get_bufferblock(self, locator):
              items = self._bufferblocks.items()
  
          for k,v in items:
 -            if v.state() == _BufferBlock.WRITABLE:
 -                self.commit_bufferblock(v)
 +            v.owner.flush()
  
          with self.lock:
              if self._put_queue is not None:
@@@ -703,7 -703,7 +703,7 @@@ class ArvadosFile(object)
                      # segment is past the trucate size, all done
                      break
                  elif size < range_end:
-                     nr = Range(r.locator, r.range_start, size - r.range_start)
+                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                      nr.segment_offset = r.segment_offset
                      new_segs.append(nr)
                      break
          elif size > self.size():
              raise IOError("truncate() does not support extending the file size")
  
 -    def readfrom(self, offset, size, num_retries):
 -        """Read upto `size` bytes from the file starting at `offset`."""
 +
 +    def readfrom(self, offset, size, num_retries, exact=False):
 +        """Read up to `size` bytes from the file starting at `offset`.
 +
 +        :exact:
 +         If False (default), return less data than requested if the read
 +         crosses a block boundary and the next block isn't cached.  If True,
 +         only return less data than requested when hitting EOF.
 +        """
  
          with self.lock:
              if size == 0 or offset >= self.size():
  
          data = []
          for lr in readsegs:
 -            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=bool(data))
 +            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
              if block:
                  data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
              else:
                  break
          return ''.join(data)
  
 -    def _repack_writes(self):
 +    def _repack_writes(self, num_retries):
          """Test if the buffer block has more data than actual segments.
  
          This happens when a buffered write over-writes a file range written in
          if write_total < self._current_bblock.size():
              # There is more data in the buffer block than is actually accounted for by segments, so
              # re-pack into a new buffer by copying over to a new buffer block.
 +            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
              new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
              for t in bufferblock_segs:
 -                new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
 +                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                  t.segment_offset = new_bb.size() - t.range_size
  
              self._current_bblock = new_bb
              self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
  
          if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
 -            self._repack_writes()
 +            self._repack_writes(num_retries)
              if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                  self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
                  self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
          replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
  
      @synchronized
 -    def flush(self):
 +    def flush(self, num_retries=0):
          if self._current_bblock:
 -            self._repack_writes()
 +            self._repack_writes(num_retries)
              self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
  
      @must_be_writable
          """Internal implementation of add_segment."""
          self._modified = True
          for lr in locators_and_ranges(blocks, pos, size):
-             last = self._segments[-1] if self._segments else Range(0, 0, 0)
+             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
              r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
              self._segments.append(r)
  
@@@ -875,7 -867,7 +875,7 @@@ class ArvadosFileReader(ArvadosFileRead
      @retry_method
      def read(self, size, num_retries=None):
          """Read up to `size` bytes from the stream, starting at the current file position."""
 -        data = self.arvadosfile.readfrom(self._filepos, size, num_retries)
 +        data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
          self._filepos += len(data)
          return data
  
index 11eb666478525e853ac2b347ca424c01c9948ea7,f03deedb18aece57b374a2e94c0e29d26b42319c..30828732d8d4908ca0922bc780c11d2a6943578e
@@@ -822,48 -822,12 +822,48 @@@ class RichCollectionBase(CollectionBase
  
          target_dir.add(source_obj, target_name, overwrite)
  
 -    @synchronized
 +    def portable_manifest_text(self, stream_name="."):
 +        """Get the manifest text for this collection, sub collections and files.
 +
 +        This method does not flush outstanding blocks to Keep.  It will return
 +        a normalized manifest with access tokens stripped.
 +
 +        :stream_name:
 +          Name to use for this stream (directory)
 +
 +        """
 +        return self._get_manifest_text(stream_name, True, True)
 +
      def manifest_text(self, stream_name=".", strip=False, normalize=False):
          """Get the manifest text for this collection, sub collections and files.
  
 +        This method will flush outstanding blocks to Keep.  By default, it will
 +        not normalize an unmodified manifest or strip access tokens.
 +
          :stream_name:
 -          Name of the stream (directory)
 +          Name to use for this stream (directory)
 +
 +        :strip:
 +          If True, remove signing tokens from block locators if present.
 +          If False (default), block locators are left unchanged.
 +
 +        :normalize:
 +          If True, always export the manifest text in normalized form
 +          even if the Collection is not modified.  If False (default) and the collection
 +          is not modified, return the original manifest text even if it is not
 +          in normalized form.
 +
 +        """
 +
 +        self._my_block_manager().commit_all()
 +        return self._get_manifest_text(stream_name, strip, normalize)
 +
 +    @synchronized
 +    def _get_manifest_text(self, stream_name, strip, normalize):
 +        """Get the manifest text for this collection, sub collections and files.
 +
 +        :stream_name:
 +          Name to use for this stream (directory)
  
          :strip:
            If True, remove signing tokens from block locators if present.
              if stream:
                  buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
              for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
 -                buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip))
 +                buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True))
              return "".join(buf)
          else:
              if strip:
  
      def portable_data_hash(self):
          """Get the portable data hash for this collection's manifest."""
 -        stripped = self.manifest_text(strip=True)
 +        stripped = self.portable_manifest_text()
          return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
  
      @synchronized
@@@ -1285,8 -1249,8 +1285,8 @@@ class Collection(RichCollectionBase)
          """Save collection to an existing collection record.
  
          Commit pending buffer blocks to Keep, merge with remote record (if
 -        merge=True, the default), write the manifest to Keep, and update the
 -        collection record.
 +        merge=True, the default), and update the collection record.  Returns
 +        the current manifest text.
  
          Will raise AssertionError if not associated with a collection record on
          the API server.  If you want to save a manifest to Keep only, see
          if self.modified():
              if not self._has_collection_uuid():
                  raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_new() for new collections.")
 +
              self._my_block_manager().commit_all()
 +
              if merge:
                  self.update()
 -            self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
  
              text = self.manifest_text(strip=False)
              self._api_response = self._my_api().collections().update(
              self._manifest_text = self._api_response["manifest_text"]
              self.set_unmodified()
  
 +        return self._manifest_text
 +
  
      @must_be_writable
      @synchronized
      @retry_method
 -    def save_new(self, name=None, create_collection_record=True, owner_uuid=None, ensure_unique_name=False, num_retries=None):
 +    def save_new(self, name=None,
 +                 create_collection_record=True,
 +                 owner_uuid=None,
 +                 ensure_unique_name=False,
 +                 num_retries=None):
          """Save collection to a new collection record.
  
 -        Commit pending buffer blocks to Keep, write the manifest to Keep, and
 -        create a new collection record (if create_collection_record True).
 -        After creating a new collection record, this Collection object will be
 -        associated with the new record used by `save()`.
 +        Commit pending buffer blocks to Keep and, when create_collection_record
 +        is True (default), create a new collection record.  After creating a
 +        new collection record, this Collection object will be associated with
 +        the new record used by `save()`.  Returns the current manifest text.
  
          :name:
            The collection name.
  
          :create_collection_record:
 -          If True, create a collection record.  If False, only save the manifest to keep.
 +           If True, create a collection record on the API server.
 +           If False, only commit blocks to Keep and return the manifest text.
  
          :owner_uuid:
            the user, or project uuid that will own this collection.
  
          """
          self._my_block_manager().commit_all()
 -        self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
          text = self.manifest_text(strip=False)
  
          if create_collection_record:
  
              self._manifest_locator = self._api_response["uuid"]
  
 -        self._manifest_text = text
 -        self.set_unmodified()
 +            self._manifest_text = text
 +            self.set_unmodified()
 +
 +        return text
  
      @synchronized
      def subscribe(self, callback):
                  block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
                  if block_locator:
                      blocksize = long(block_locator.group(1))
-                     blocks.append(Range(tok, streamoffset, blocksize))
+                     blocks.append(Range(tok, streamoffset, blocksize, 0))
                      streamoffset += blocksize
                  else:
                      state = SEGMENTS