refs #13667 "Merge branch '13667-add-version-file'".
[arvados.git] / sdk / python / arvados / collection.py
index 30828732d8d4908ca0922bc780c11d2a6943578e..e38a6bd475c7b8a4aee7787a537a40656fd93b36 100644 (file)
@@ -1,9 +1,20 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import absolute_import
+from future.utils import listitems, listvalues, viewkeys
+from builtins import str
+from past.builtins import basestring
+from builtins import object
 import functools
 import logging
 import os
 import re
 import errno
 import hashlib
+import datetime
+import ciso8601
 import time
 import threading
 
@@ -11,20 +22,22 @@ from collections import deque
 from stat import *
 
 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
-from keep import KeepLocator, KeepClient
+from .keep import KeepLocator, KeepClient
 from .stream import StreamReader
 from ._normalize_stream import normalize_stream
 from ._ranges import Range, LocatorAndRange
 from .safeapi import ThreadSafeApiCache
-import config
-import errors
-import util
-import events
+import arvados.config as config
+import arvados.errors as errors
+import arvados.util
+import arvados.events as events
 from arvados.retry import retry_method
 
 _logger = logging.getLogger('arvados.collection')
 
 class CollectionBase(object):
+    """Abstract base class for Collection classes."""
+
     def __enter__(self):
         return self
 
@@ -51,7 +64,7 @@ class CollectionBase(object):
             if fields:
                 clean_fields = fields[:1] + [
                     (re.sub(r'\+[^\d][^\+]*', '', x)
-                     if re.match(util.keep_locator_pattern, x)
+                     if re.match(arvados.util.keep_locator_pattern, x)
                      else x)
                     for x in fields[1:]]
                 clean += [' '.join(clean_fields), "\n"]
@@ -82,6 +95,8 @@ class _WriterFile(_FileLikeObjectBase):
 
 
 class CollectionWriter(CollectionBase):
+    """Deprecated, use Collection instead."""
+
     def __init__(self, api_client=None, num_retries=0, replication=None):
         """Instantiate a CollectionWriter.
 
@@ -180,7 +195,7 @@ class CollectionWriter(CollectionBase):
 
     def _work_trees(self):
         path, stream_name, max_manifest_depth = self._queued_trees[0]
-        d = util.listdir_recursive(
+        d = arvados.util.listdir_recursive(
             path, max_depth = (None if max_manifest_depth == 0 else 0))
         if d:
             self._queue_dirents(stream_name, d)
@@ -216,7 +231,11 @@ class CollectionWriter(CollectionBase):
         self.do_queued_work()
 
     def write(self, newdata):
-        if hasattr(newdata, '__iter__'):
+        if isinstance(newdata, bytes):
+            pass
+        elif isinstance(newdata, str):
+            newdata = newdata.encode()
+        elif hasattr(newdata, '__iter__'):
             for s in newdata:
                 self.write(s)
             return
@@ -256,7 +275,7 @@ class CollectionWriter(CollectionBase):
         return self._last_open
 
     def flush_data(self):
-        data_buffer = ''.join(self._data_buffer)
+        data_buffer = b''.join(self._data_buffer)
         if data_buffer:
             self._current_stream_locators.append(
                 self._my_keep().put(
@@ -307,7 +326,8 @@ class CollectionWriter(CollectionBase):
     def set_current_stream_name(self, newstreamname):
         if re.search(r'[\t\n]', newstreamname):
             raise errors.AssertionError(
-                "Manifest stream names cannot contain whitespace")
+                "Manifest stream names cannot contain whitespace: '%s'" %
+                (newstreamname))
         self._current_stream_name = '.' if newstreamname=='' else newstreamname
 
     def current_stream_name(self):
@@ -345,11 +365,12 @@ class CollectionWriter(CollectionBase):
         sending manifest_text() to the API server's "create
         collection" endpoint.
         """
-        return self._my_keep().put(self.manifest_text(), copies=self.replication)
+        return self._my_keep().put(self.manifest_text().encode(),
+                                   copies=self.replication)
 
     def portable_data_hash(self):
-        stripped = self.stripped_manifest()
-        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
+        stripped = self.stripped_manifest().encode()
+        return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
 
     def manifest_text(self):
         self.finish_current_stream()
@@ -371,8 +392,18 @@ class CollectionWriter(CollectionBase):
             ret += locators
         return ret
 
+    def save_new(self, name=None):
+        return self._api_client.collections().create(
+            ensure_unique_name=True,
+            body={
+                'name': name,
+                'manifest_text': self.manifest_text(),
+            }).execute(num_retries=self.num_retries)
+
 
 class ResumableCollectionWriter(CollectionWriter):
+    """Deprecated, use Collection instead."""
+
     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                    '_current_stream_locators', '_current_stream_name',
                    '_current_file_name', '_current_file_pos', '_close_file',
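A minimal usage sketch of the two CollectionWriter changes above (str/bytes handling in write() and the new save_new() helper); not part of the patch. It assumes a configured ARVADOS_API_HOST/ARVADOS_API_TOKEN environment with a reachable API server and Keep; the collection name is arbitrary.

import arvados
import arvados.collection

cw = arvados.collection.CollectionWriter(api_client=arvados.api('v1'), num_retries=2)
cw.start_new_file('greeting.txt')
cw.write('hello ')        # text is encoded to bytes before buffering
cw.write(b'world\n')      # bytes are buffered as-is
record = cw.save_new(name='example output')   # creates a collection record
print(record['uuid'], record['portable_data_hash'])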
@@ -417,7 +448,7 @@ class ResumableCollectionWriter(CollectionWriter):
         return writer
 
     def check_dependencies(self):
-        for path, orig_stat in self._dependencies.items():
+        for path, orig_stat in listitems(self._dependencies):
             if not S_ISREG(orig_stat[ST_MODE]):
                 raise errors.StaleWriterStateError("{} not file".format(path))
             try:
@@ -474,6 +505,7 @@ class ResumableCollectionWriter(CollectionWriter):
 ADD = "add"
 DEL = "del"
 MOD = "mod"
+TOK = "tok"
 FILE = "file"
 COLLECTION = "collection"
 
@@ -487,7 +519,8 @@ class RichCollectionBase(CollectionBase):
 
     def __init__(self, parent=None):
         self.parent = parent
-        self._modified = True
+        self._committed = False
+        self._callback = None
         self._items = {}
 
     def _my_api(self):
@@ -537,24 +570,24 @@ class RichCollectionBase(CollectionBase):
                 if item is None:
                     # create new file
                     if create_type == COLLECTION:
-                        item = Subcollection(self)
+                        item = Subcollection(self, pathcomponents[0])
                     else:
-                        item = ArvadosFile(self)
+                        item = ArvadosFile(self, pathcomponents[0])
                     self._items[pathcomponents[0]] = item
-                    self._modified = True
+                    self.set_committed(False)
                     self.notify(ADD, self, pathcomponents[0], item)
                 return item
             else:
                 if item is None:
                     # create new collection
-                    item = Subcollection(self)
+                    item = Subcollection(self, pathcomponents[0])
                     self._items[pathcomponents[0]] = item
-                    self._modified = True
+                    self.set_committed(False)
                     self.notify(ADD, self, pathcomponents[0], item)
                 if isinstance(item, RichCollectionBase):
                     return item.find_or_create(pathcomponents[1], create_type)
                 else:
-                    raise IOError((errno.ENOTDIR, "Interior path components must be subcollection"))
+                    raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
         else:
             return self
 
@@ -562,16 +595,23 @@ class RichCollectionBase(CollectionBase):
     def find(self, path):
         """Recursively search the specified file path.
 
-        May return either a Collection or ArvadosFile.  Return None if not
+        May return either a Collection or ArvadosFile. Return None if not
         found.
+        If path is invalid (ex: starts with '/'), an IOError exception will be
+        raised.
 
         """
         if not path:
-            raise errors.ArgumentError("Parameter 'path' must not be empty.")
+            raise errors.ArgumentError("Parameter 'path' is empty.")
 
         pathcomponents = path.split("/", 1)
+        if pathcomponents[0] == '':
+            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
+
         item = self._items.get(pathcomponents[0])
-        if len(pathcomponents) == 1:
+        if item is None:
+            return None
+        elif len(pathcomponents) == 1:
             return item
         else:
             if isinstance(item, RichCollectionBase):
@@ -580,15 +620,20 @@ class RichCollectionBase(CollectionBase):
                 else:
                     return item
             else:
-                raise IOError((errno.ENOTDIR, "Interior path components must be subcollection"))
+                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])

-    def mkdirs(path):
+    @synchronized
+    def mkdirs(self, path):
         """Recursive subcollection create.
 
         """Recursive subcollection create.
 
-        Like `os.mkdirs()`.  Will create intermediate subcollections needed to
-        contain the leaf subcollection path.
+        Like `os.makedirs()`.  Will create intermediate subcollections needed
+        to contain the leaf subcollection path.
 
         """
 
         """
+
+        if self.find(path) is not None:
+            raise IOError(errno.EEXIST, "Directory or file exists", path)
+
         return self.find_or_create(path, COLLECTION)
 
     def open(self, path, mode="r"):
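A small in-memory sketch of the reworked find()/mkdirs() behaviour above (not part of the patch): find() returns None for a missing path, and mkdirs() behaves like os.makedirs(), raising EEXIST when the target already exists.

import errno
from arvados.collection import Collection

c = Collection()                       # empty, in-memory collection
c.mkdirs('a/b/c')                      # creates intermediate subcollections
assert c.find('a/b/c') is not None
assert c.find('a/b/missing') is None   # missing path -> None, no exception
try:
    c.mkdirs('a/b/c')
except IOError as exc:
    assert exc.errno == errno.EEXIST   # target already exists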
@@ -597,7 +642,12 @@ class RichCollectionBase(CollectionBase):
         :path:
           path to a file in the collection
         :mode:
-          one of "r", "r+", "w", "w+", "a", "a+"
+          a string consisting of "r", "w", or "a", optionally followed
+          by "b" or "t", optionally followed by "+".
+          :"b":
+            binary mode: write() accepts bytes, read() returns bytes.
+          :"t":
+            text mode (default): write() accepts strings, read() returns strings.
           :"r":
             opens for reading
           :"r+":
@@ -609,55 +659,61 @@ class RichCollectionBase(CollectionBase):
             the end of the file.  Writing does not affect the file pointer for
             reading.
         """
-        mode = mode.replace("b", "")
-        if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
-            raise errors.ArgumentError("Bad mode '%s'" % mode)
-        create = (mode != "r")

-        if create and not self.writable():
-            raise IOError((errno.EROFS, "Collection is read only"))
+        if not re.search(r'^[rwa][bt]?\+?$', mode):
+            raise errors.ArgumentError("Invalid mode {!r}".format(mode))

-        if create:
-            arvfile = self.find_or_create(path, FILE)
-        else:
+        if mode[0] == 'r' and '+' not in mode:
+            fclass = ArvadosFileReader
             arvfile = self.find(path)
+        elif not self.writable():
+            raise IOError(errno.EROFS, "Collection is read only")
+        else:
+            fclass = ArvadosFileWriter
+            arvfile = self.find_or_create(path, FILE)
 
         if arvfile is None:
-            raise IOError((errno.ENOENT, "File not found"))
+            raise IOError(errno.ENOENT, "File not found", path)
         if not isinstance(arvfile, ArvadosFile):
-            raise IOError((errno.EISDIR, "Path must refer to a file."))
+            raise IOError(errno.EISDIR, "Is a directory", path)

-        if mode[0] == "w":
+        if mode[0] == 'w':
             arvfile.truncate(0)
 
-        name = os.path.basename(path)
+        return fclass(arvfile, mode=mode, num_retries=self.num_retries)

-        if mode == "r":
-            return ArvadosFileReader(arvfile, name, mode, num_retries=self.num_retries)
-        else:
-            return ArvadosFileWriter(arvfile, name, mode, num_retries=self.num_retries)
+    def modified(self):
+        """Determine if the collection has been modified since last committed."""
+        return not self.committed()
 
     @synchronized
-    def modified(self):
-        """Test if the collection (or any subcollection or file) has been modified."""
-        if self._modified:
-            return True
-        for k,v in self._items.items():
-            if v.modified():
-                return True
-        return False
+    def committed(self):
+        """Determine if the collection has been committed to the API server."""
+        return self._committed
 
     @synchronized
-    def set_unmodified(self):
-        """Recursively clear modified flag."""
-        self._modified = False
-        for k,v in self._items.items():
-            v.set_unmodified()
+    def set_committed(self, value=True):
+        """Recursively set committed flag.
+
+        If value is True, set committed to be True for this and all children.
+
+        If value is False, set committed to be False for this and all parents.
+        """
+        if value == self._committed:
+            return
+        if value:
+            for k,v in listitems(self._items):
+                v.set_committed(True)
+            self._committed = True
+        else:
+            self._committed = False
+            if self.parent is not None:
+                self.parent.set_committed(False)
 
     @synchronized
     def __iter__(self):
         """Iterate over names of files and collections contained in this collection."""
-        return iter(self._items.keys())
+        return iter(viewkeys(self._items))
 
     @synchronized
     def __getitem__(self, k):
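A sketch of the committed/modified semantics introduced above (not part of the patch): clearing the flag propagates up to parents, setting it propagates down to children. Assumes a configured Arvados client environment for the in-memory write.

from arvados.collection import Collection

c = Collection(replication_desired=2)
assert not c.committed()              # new collection, nothing saved yet
with c.open('dir/file.txt', 'w') as f:
    f.write('x')
assert c.modified()                   # same as: not c.committed()
c.set_committed(True)                 # marks this collection and all children clean
assert c.committed() and not c.modified()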
@@ -683,7 +739,7 @@ class RichCollectionBase(CollectionBase):
     def __delitem__(self, p):
         """Delete an item by name which is directly contained by this collection."""
         del self._items[p]
-        self._modified = True
+        self.set_committed(False)
         self.notify(DEL, self, p, None)
 
     @synchronized
@@ -694,12 +750,12 @@ class RichCollectionBase(CollectionBase):
     @synchronized
     def values(self):
         """Get a list of files and collection objects directly contained in this collection."""
-        return self._items.values()
+        return listvalues(self._items)
 
     @synchronized
     def items(self):
         """Get a list of (name, object) tuples directly contained in this collection."""
-        return self._items.items()
+        return listitems(self._items)
 
     def exists(self, path):
         """Test if there is a file or collection at `path`."""
@@ -715,33 +771,33 @@ class RichCollectionBase(CollectionBase):
         """
 
         if not path:
-            raise errors.ArgumentError("Parameter 'path' must not be empty.")
+            raise errors.ArgumentError("Parameter 'path' is empty.")
 
         pathcomponents = path.split("/", 1)
         item = self._items.get(pathcomponents[0])
         if item is None:
-            raise IOError((errno.ENOENT, "File not found"))
+            raise IOError(errno.ENOENT, "File not found", path)
         if len(pathcomponents) == 1:
             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
-                raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
+                raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
             deleteditem = self._items[pathcomponents[0]]
             del self._items[pathcomponents[0]]
-            self._modified = True
+            self.set_committed(False)
             self.notify(DEL, self, pathcomponents[0], deleteditem)
         else:
             item.remove(pathcomponents[1])
 
     def _clonefrom(self, source):
-        for k,v in source.items():
-            self._items[k] = v.clone(self)
+        for k,v in listitems(source):
+            self._items[k] = v.clone(self, k)
 
     def clone(self):
         raise NotImplementedError()
 
     @must_be_writable
     @synchronized
-    def add(self, source_obj, target_name, overwrite=False):
-        """Copy a file or subcollection to this collection.
+    def add(self, source_obj, target_name, overwrite=False, reparent=False):
+        """Copy or move a file or subcollection to this collection.
 
         :source_obj:
           An ArvadosFile, or Subcollection object
@@ -753,24 +809,74 @@ class RichCollectionBase(CollectionBase):
         :overwrite:
           Whether to overwrite target file if it already exists.
 
+        :reparent:
+          If True, source_obj will be moved from its parent collection to this collection.
+          If False, source_obj will be copied and the parent collection will be
+          unmodified.
+
         """
 
         if target_name in self and not overwrite:
-            raise IOError((errno.EEXIST, "File already exists"))
+            raise IOError(errno.EEXIST, "File already exists", target_name)
 
         modified_from = None
         if target_name in self:
             modified_from = self[target_name]
 
-        # Actually make the copy.
-        dup = source_obj.clone(self)
-        self._items[target_name] = dup
-        self._modified = True
+        # Actually make the move or copy.
+        if reparent:
+            source_obj._reparent(self, target_name)
+            item = source_obj
+        else:
+            item = source_obj.clone(self, target_name)
+
+        self._items[target_name] = item
+        self.set_committed(False)
 
         if modified_from:
-            self.notify(MOD, self, target_name, (modified_from, dup))
+            self.notify(MOD, self, target_name, (modified_from, item))
+        else:
+            self.notify(ADD, self, target_name, item)
+
+    def _get_src_target(self, source, target_path, source_collection, create_dest):
+        if source_collection is None:
+            source_collection = self
+
+        # Find the object
+        if isinstance(source, basestring):
+            source_obj = source_collection.find(source)
+            if source_obj is None:
+                raise IOError(errno.ENOENT, "File not found", source)
+            sourcecomponents = source.split("/")
         else:
-            self.notify(ADD, self, target_name, dup)
+            source_obj = source
+            sourcecomponents = None
+
+        # Find parent collection the target path
+        targetcomponents = target_path.split("/")
+
+        # Determine the name to use.
+        target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
+
+        if not target_name:
+            raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
+
+        if create_dest:
+            target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
+        else:
+            if len(targetcomponents) > 1:
+                target_dir = self.find("/".join(targetcomponents[0:-1]))
+            else:
+                target_dir = self
+
+        if target_dir is None:
+            raise IOError(errno.ENOENT, "Target directory not found", target_name)
+
+        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
+            target_dir = target_dir[target_name]
+            target_name = sourcecomponents[-1]
+
+        return (source_obj, target_dir, target_name)
 
     @must_be_writable
     @synchronized
@@ -792,35 +898,35 @@ class RichCollectionBase(CollectionBase):
         :overwrite:
           Whether to overwrite target file if it already exists.
         """
-        if source_collection is None:
-            source_collection = self

-        # Find the object to copy
-        if isinstance(source, basestring):
-            source_obj = source_collection.find(source)
-            if source_obj is None:
-                raise IOError((errno.ENOENT, "File not found"))
-            sourcecomponents = source.split("/")
-        else:
-            source_obj = source
-            sourcecomponents = None
+        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
+        target_dir.add(source_obj, target_name, overwrite, False)

-        # Find parent collection the target path
-        targetcomponents = target_path.split("/")
+    @must_be_writable
+    @synchronized
+    def rename(self, source, target_path, source_collection=None, overwrite=False):
+        """Move a file or subcollection from `source_collection` to a new path in this collection.

-        # Determine the name to use.
-        target_name = targetcomponents[-1] if targetcomponents[-1] else (sourcecomponents[-1] if sourcecomponents else None)
+        :source:
+          A string with a path to source file or subcollection.

-        if not target_name:
-            raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
+        :target_path:
+          Destination file or path.  If the target path already exists and is a
+          subcollection, the item will be placed inside the subcollection.  If
+          the target path already exists and is a file, this will raise an error
+          unless you specify `overwrite=True`.

-        target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
+        :source_collection:
+          Collection to copy `source_path` from (default `self`)

-        if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
-            target_dir = target_dir[target_name]
-            target_name = sourcecomponents[-1]
+        :overwrite:
+          Whether to overwrite target file if it already exists.
+        """

-        target_dir.add(source_obj, target_name, overwrite)
+        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
+        if not source_obj.writable():
+            raise IOError(errno.EROFS, "Source collection is read only", source)
+        target_dir.add(source_obj, target_name, overwrite, True)
 
     def portable_manifest_text(self, stream_name="."):
         """Get the manifest text for this collection, sub collections and files.
@@ -834,7 +940,9 @@ class RichCollectionBase(CollectionBase):
         """
         return self._get_manifest_text(stream_name, True, True)
 
-    def manifest_text(self, stream_name=".", strip=False, normalize=False):
+    @synchronized
+    def manifest_text(self, stream_name=".", strip=False, normalize=False,
+                      only_committed=False):
         """Get the manifest text for this collection, sub collections and files.
 
         This method will flush outstanding blocks to Keep.  By default, it will
@@ -853,13 +961,18 @@ class RichCollectionBase(CollectionBase):
           is not modified, return the original manifest text even if it is not
           in normalized form.
 
+        :only_committed:
+          If True, don't commit pending blocks.
+
         """
 
         """
 
-        self._my_block_manager().commit_all()
-        return self._get_manifest_text(stream_name, strip, normalize)
+        if not only_committed:
+            self._my_block_manager().commit_all()
+        return self._get_manifest_text(stream_name, strip, normalize,
+                                       only_committed=only_committed)
 
     @synchronized
-    def _get_manifest_text(self, stream_name, strip, normalize):
+    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
         """Get the manifest text for this collection, sub collections and files.
 
         :stream_name:
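A sketch of the new only_committed flag (not part of the patch): it reports the manifest without forcing pending buffer blocks out to Keep. Assumes a configured Arvados client environment.

from arvados.collection import Collection

c = Collection(replication_desired=2)
with c.open('pending.txt', 'w') as f:
    f.write('not yet in Keep')
# No Keep traffic: segments still sitting in buffer blocks are skipped.
print(c.manifest_text(only_committed=True))
# A plain manifest_text() call would commit the buffer block to Keep first.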
@@ -875,9 +988,12 @@ class RichCollectionBase(CollectionBase):
           is not modified, return the original manifest text even if it is not
           in normalized form.
 
+        :only_committed:
+          If True, only include blocks that were already committed to Keep.
+
         """
 
         """
 
-        if self.modified() or self._manifest_text is None or normalize:
+        if not self.committed() or self._manifest_text is None or normalize:
             stream = {}
             buf = []
             sorted_keys = sorted(self.keys())
@@ -888,6 +1004,8 @@ class RichCollectionBase(CollectionBase):
                 for segment in arvfile.segments():
                     loc = segment.locator
                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
+                        if only_committed:
+                            continue
                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                     if strip:
                         loc = KeepLocator(loc).stripped()
@@ -897,7 +1015,7 @@ class RichCollectionBase(CollectionBase):
             if stream:
                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
-                buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True))
+                buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
             return "".join(buf)
         else:
             if strip:
@@ -917,15 +1035,17 @@ class RichCollectionBase(CollectionBase):
             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
         for k in self:
             if k not in end_collection:
-               changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection)))
+               changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
         for k in end_collection:
             if k in self:
                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
                 elif end_collection[k] != self[k]:
-                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection), end_collection[k].clone(holding_collection)))
+                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
+                else:
+                    changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
             else:
-                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection)))
+                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
         return changes
 
     @must_be_writable
@@ -937,12 +1057,14 @@ class RichCollectionBase(CollectionBase):
         alternate path indicating the conflict.
 
         """
+        if changes:
+            self.set_committed(False)
         for change in changes:
             event_type = change[0]
             path = change[1]
             initial = change[2]
             local = self.find(path)
-            conflictpath = "%s~conflict-%s~" % (path, time.strftime("%Y-%m-%d-%H:%M:%S",
+            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
                                                                     time.gmtime()))
             if event_type == ADD:
                 if local is None:
@@ -952,7 +1074,7 @@ class RichCollectionBase(CollectionBase):
                     # There is already local file and it is different:
                     # save change to conflict file.
                     self.copy(initial, conflictpath)
-            elif event_type == MOD:
+            elif event_type == MOD or event_type == TOK:
                 final = change[3]
                 if local == initial:
                     # Local matches the "initial" item so it has not
@@ -977,8 +1099,31 @@ class RichCollectionBase(CollectionBase):
 
     def portable_data_hash(self):
         """Get the portable data hash for this collection's manifest."""
-        stripped = self.portable_manifest_text()
-        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
+        if self._manifest_locator and self.committed():
+            # If the collection is already saved on the API server, and it's committed
+            # then return API server's PDH response.
+            return self._portable_data_hash
+        else:
+            stripped = self.portable_manifest_text().encode()
+            return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
+
+    @synchronized
+    def subscribe(self, callback):
+        if self._callback is None:
+            self._callback = callback
+        else:
+            raise errors.ArgumentError("A callback is already set on this collection.")
+
+    @synchronized
+    def unsubscribe(self):
+        if self._callback is not None:
+            self._callback = None
+
+    @synchronized
+    def notify(self, event, collection, name, item):
+        if self._callback:
+            self._callback(event, collection, name, item)
+        self.root_collection().notify(event, collection, name, item)
 
     @synchronized
     def __eq__(self, other):
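A sketch of the new subscribe()/notify() hooks (not part of the patch): a single callback registered on the root collection observes add/del/mod/tok events.

from arvados.collection import Collection

seen = []
c = Collection()
c.subscribe(lambda event, collection, name, item: seen.append((event, name)))
c.mkdirs('results')
c.remove('results')
print(seen)     # e.g. [('add', 'results'), ('del', 'results')]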
@@ -998,6 +1143,12 @@ class RichCollectionBase(CollectionBase):
     def __ne__(self, other):
         return not self.__eq__(other)
 
+    @synchronized
+    def flush(self):
+        """Flush bufferblocks to Keep."""
+        for e in listvalues(self):
+            e.flush()
+
 
 class Collection(RichCollectionBase):
     """Represents the root of an Arvados Collection.
@@ -1044,32 +1195,47 @@ class Collection(RichCollectionBase):
                  num_retries=None,
                  parent=None,
                  apiconfig=None,
-                 block_manager=None):
+                 block_manager=None,
+                 replication_desired=None,
+                 put_threads=None):
         """Collection constructor.
 
         :manifest_locator_or_text:
-          One of Arvados collection UUID, block locator of
-          a manifest, raw manifest text, or None (to create an empty collection).
+          An Arvados collection UUID, portable data hash, raw manifest
+          text, or (if creating an empty collection) None.
+
         :parent:
           the parent Collection, may be None.
+
         :apiconfig:
           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
           Prefer this over supplying your own api_client and keep_client (except in testing).
           Will use default config settings if not specified.
+
         :api_client:
           The API client object to use for requests.  If not specified, create one using `apiconfig`.
+
         :keep_client:
           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
+
         :num_retries:
           the number of retries for API and Keep requests.
+
         :block_manager:
           the block manager to use.  If not specified, create one.
 
+        :replication_desired:
+          How many copies should Arvados maintain. If None, API server default
+          configuration applies. If not None, this value will also be used
+          for determining the number of block copies being written.
+
         """
         super(Collection, self).__init__(parent)
         self._api_client = api_client
         self._keep_client = keep_client
         self._block_manager = block_manager
         """
         super(Collection, self).__init__(parent)
         self._api_client = api_client
         self._keep_client = keep_client
         self._block_manager = block_manager
+        self.replication_desired = replication_desired
+        self.put_threads = put_threads
 
         if apiconfig:
             self._config = apiconfig
 
         if apiconfig:
             self._config = apiconfig
@@ -1079,22 +1245,23 @@ class Collection(RichCollectionBase):
         self.num_retries = num_retries if num_retries is not None else 0
         self._manifest_locator = None
         self._manifest_text = None
         self.num_retries = num_retries if num_retries is not None else 0
         self._manifest_locator = None
         self._manifest_text = None
+        self._portable_data_hash = None
         self._api_response = None
         self._api_response = None
+        self._past_versions = set()
 
         self.lock = threading.RLock()
 
         self.lock = threading.RLock()
-        self.callbacks = []
         self.events = None
 
         if manifest_locator_or_text:
         self.events = None
 
         if manifest_locator_or_text:
-            if re.match(util.keep_locator_pattern, manifest_locator_or_text):
+            if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
                 self._manifest_locator = manifest_locator_or_text
                 self._manifest_locator = manifest_locator_or_text
-            elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
+            elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
                 self._manifest_locator = manifest_locator_or_text
                 self._manifest_locator = manifest_locator_or_text
-            elif re.match(util.manifest_pattern, manifest_locator_or_text):
+            elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
                 self._manifest_text = manifest_locator_or_text
             else:
                 raise errors.ArgumentError(
                 self._manifest_text = manifest_locator_or_text
             else:
                 raise errors.ArgumentError(
-                    "Argument to CollectionReader must be a manifest or a collection UUID")
+                    "Argument to CollectionReader is not a manifest or a collection UUID")
 
             try:
                 self._populate()
 
             try:
                 self._populate()
@@ -1104,12 +1271,28 @@ class Collection(RichCollectionBase):
     def root_collection(self):
         return self
 
     def root_collection(self):
         return self
 
+    def get_properties(self):
+        if self._api_response and self._api_response["properties"]:
+            return self._api_response["properties"]
+        else:
+            return {}
+
+    def get_trash_at(self):
+        if self._api_response and self._api_response["trash_at"]:
+            return ciso8601.parse_datetime(self._api_response["trash_at"])
+        else:
+            return None
+
     def stream_name(self):
         return "."
 
     def writable(self):
         return True
 
+    @synchronized
+    def known_past_version(self, modified_at_and_portable_data_hash):
+        return modified_at_and_portable_data_hash in self._past_versions
+
     @synchronized
     @retry_method
     def update(self, other=None, num_retries=None):
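A sketch of the new record accessors (not part of the patch). The UUID is a placeholder and a configured client environment is assumed; both methods read the API response cached by _remember_api_response().

from arvados.collection import Collection

c = Collection('zzzzz-4zz18-zzzzzzzzzzzzzzz')   # placeholder collection UUID
print(c.get_properties())    # {} when the record has no properties
print(c.get_trash_at())      # None, or a datetime parsed by ciso8601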
@@ -1119,15 +1302,26 @@ class Collection(RichCollectionBase):
             if self._manifest_locator is None:
                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
+            if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
+                response.get("portable_data_hash") != self.portable_data_hash()):
+                # The record on the server is different from our current one, but we've seen it before,
+                # so ignore it because it's already been merged.
+                # However, if it's the same as our current record, proceed with the update, because we want to update
+                # our tokens.
+                return
+            else:
+                self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
             other = CollectionReader(response["manifest_text"])
         baseline = CollectionReader(self._manifest_text)
         self.apply(baseline.diff(other))
+        self._manifest_text = self.manifest_text()
 
     @synchronized
     def _my_api(self):
         if self._api_client is None:
             self._api_client = ThreadSafeApiCache(self._config)
-            self._keep_client = self._api_client.keep
+            if self._keep_client is None:
+                self._keep_client = self._api_client.keep
         return self._api_client
 
     @synchronized
@@ -1142,9 +1336,16 @@ class Collection(RichCollectionBase):
     @synchronized
     def _my_block_manager(self):
         if self._block_manager is None:
-            self._block_manager = _BlockManager(self._my_keep())
+            copies = (self.replication_desired or
+                      self._my_api()._rootDesc.get('defaultCollectionReplication',
+                                                   2))
+            self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
         return self._block_manager
 
+    def _remember_api_response(self, response):
+        self._api_response = response
+        self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
+
     def _populate_from_api_server(self):
         # As in KeepClient itself, we must wait until the last
         # possible moment to instantiate an API client, in order to
@@ -1153,71 +1354,39 @@ class Collection(RichCollectionBase):
         # it.  If instantiation fails, we'll fall back to the except
         # clause, just like any other Collection lookup
         # failure. Return an exception, or None if successful.
-        try:
-            self._api_response = self._my_api().collections().get(
-                uuid=self._manifest_locator).execute(
-                    num_retries=self.num_retries)
-            self._manifest_text = self._api_response['manifest_text']
-            return None
-        except Exception as e:
-            return e
-
-    def _populate_from_keep(self):
-        # Retrieve a manifest directly from Keep. This has a chance of
-        # working if [a] the locator includes a permission signature
-        # or [b] the Keep services are operating in world-readable
-        # mode. Return an exception, or None if successful.
-        try:
-            self._manifest_text = self._my_keep().get(
-                self._manifest_locator, num_retries=self.num_retries)
-        except Exception as e:
-            return e
+        self._remember_api_response(self._my_api().collections().get(
+            uuid=self._manifest_locator).execute(
+                num_retries=self.num_retries))
+        self._manifest_text = self._api_response['manifest_text']
+        self._portable_data_hash = self._api_response['portable_data_hash']
+        # If not overriden via kwargs, we should try to load the
+        # replication_desired from the API server
+        if self.replication_desired is None:
+            self.replication_desired = self._api_response.get('replication_desired', None)
 
     def _populate(self):
-        if self._manifest_locator is None and self._manifest_text is None:
-            return
-        error_via_api = None
-        error_via_keep = None
-        should_try_keep = ((self._manifest_text is None) and
-                           util.keep_locator_pattern.match(
-                               self._manifest_locator))
-        if ((self._manifest_text is None) and
-            util.signed_locator_pattern.match(self._manifest_locator)):
-            error_via_keep = self._populate_from_keep()
-        if self._manifest_text is None:
-            error_via_api = self._populate_from_api_server()
-            if error_via_api is not None and not should_try_keep:
-                raise error_via_api
-        if ((self._manifest_text is None) and
-            not error_via_keep and
-            should_try_keep):
-            # Looks like a keep locator, and we didn't already try keep above
-            error_via_keep = self._populate_from_keep()
         if self._manifest_text is None:
-            # Nothing worked!
-            raise errors.NotFoundError(
-                ("Failed to retrieve collection '{}' " +
-                 "from either API server ({}) or Keep ({})."
-                 ).format(
-                    self._manifest_locator,
-                    error_via_api,
-                    error_via_keep))
-        # populate
+            if self._manifest_locator is None:
+                return
+            else:
+                self._populate_from_api_server()
         self._baseline_manifest = self._manifest_text
         self._import_manifest(self._manifest_text)
 
-
     def _has_collection_uuid(self):
-        return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
+        return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
 
     def __enter__(self):
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):
         """Support scoped auto-commit in a with: block."""
-        if exc_type is not None:
+        if exc_type is None:
             if self.writable() and self._has_collection_uuid():
                 self.save()
+        self.stop_threads()
+
+    def stop_threads(self):
         if self._block_manager is not None:
             self._block_manager.stop_threads()
 
@@ -1236,7 +1405,7 @@ class Collection(RichCollectionBase):
         return self._manifest_locator
 
     @synchronized
-    def clone(self, new_parent=None, readonly=False, new_config=None):
+    def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
         if new_config is None:
             new_config = self._config
         if readonly:
@@ -1281,17 +1450,34 @@ class Collection(RichCollectionBase):
     @must_be_writable
     @synchronized
     @retry_method
-    def save(self, merge=True, num_retries=None):
+    def save(self,
+             properties=None,
+             storage_classes=None,
+             trash_at=None,
+             merge=True,
+             num_retries=None):
         """Save collection to an existing collection record.
 
         Commit pending buffer blocks to Keep, merge with remote record (if
-        merge=True, the default), and update the collection record.  Returns
+        merge=True, the default), and update the collection record. Returns
         the current manifest text.
 
         Will raise AssertionError if not associated with a collection record on
         the API server.  If you want to save a manifest to Keep only, see
         `save_new()`.
 
+        :properties:
+          Additional properties of collection. This value will replace any existing
+          properties of collection.
+
+        :storage_classes:
+          Specify desirable storage classes to be used when writing data to Keep.
+
+        :trash_at:
+          A collection is *expiring* when it has a *trash_at* time in the future.
+          An expiring collection can be accessed as normal,
+          but is scheduled to be trashed automatically at the *trash_at* time.
+
         :merge:
           Update and merge remote changes before saving.  Otherwise, any
           remote changes will be ignored and overwritten.
@@ -1300,9 +1486,27 @@ class Collection(RichCollectionBase):
           Retry count on API calls (if None,  use the collection default)
 
         """
-        if self.modified():
+        if properties and type(properties) is not dict:
+            raise errors.ArgumentError("properties must be dictionary type.")
+
+        if storage_classes and type(storage_classes) is not list:
+            raise errors.ArgumentError("storage_classes must be list type.")
+
+        if trash_at and type(trash_at) is not datetime.datetime:
+            raise errors.ArgumentError("trash_at must be datetime type.")
+
+        body={}
+        if properties:
+            body["properties"] = properties
+        if storage_classes:
+            body["storage_classes_desired"] = storage_classes
+        if trash_at:
+            t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
+            body["trash_at"] = t
+
+        if not self.committed():
             if not self._has_collection_uuid():
-                raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_new() for new collections.")
+                raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
 
             self._my_block_manager().commit_all()
 
@@ -1310,13 +1514,20 @@ class Collection(RichCollectionBase):
                 self.update()
 
             text = self.manifest_text(strip=False)
-            self._api_response = self._my_api().collections().update(
+            body['manifest_text'] = text
+
+            self._remember_api_response(self._my_api().collections().update(
                 uuid=self._manifest_locator,
-                body={'manifest_text': text}
-                ).execute(
-                    num_retries=num_retries)
+                body=body
+                ).execute(num_retries=num_retries))
             self._manifest_text = self._api_response["manifest_text"]
-            self.set_unmodified()
+            self._portable_data_hash = self._api_response["portable_data_hash"]
+            self.set_committed(True)
+        elif body:
+            self._remember_api_response(self._my_api().collections().update(
+                uuid=self._manifest_locator,
+                body=body
+                ).execute(num_retries=num_retries))
 
         return self._manifest_text
 
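A sketch of save() with the new keyword arguments, assuming an existing writable collection (placeholder UUID). trash_at is serialized with strftime("%Y-%m-%dT%H:%M:%S.%fZ"), so a naive datetime already expressed in UTC is the safest value to pass:

    import datetime
    import arvados.collection

    c = arvados.collection.Collection('zzzzz-4zz18-xxxxxxxxxxxxxxx')
    with c.open('report.csv', 'w') as f:
        f.write('a,b,c\n')

    # properties replaces the record's properties wholesale, storage_classes
    # populates storage_classes_desired, and trash_at schedules auto-trashing.
    c.save(properties={'category': 'reports'},
           storage_classes=['default'],
           trash_at=datetime.datetime.utcnow() + datetime.timedelta(days=7))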
@@ -1327,6 +1538,9 @@ class Collection(RichCollectionBase):
     def save_new(self, name=None,
                  create_collection_record=True,
                  owner_uuid=None,
+                 properties=None,
+                 storage_classes=None,
+                 trash_at=None,
                  ensure_unique_name=False,
                  num_retries=None):
         """Save collection to a new collection record.
@@ -1334,7 +1548,7 @@ class Collection(RichCollectionBase):
         Commit pending buffer blocks to Keep and, when create_collection_record
         is True (default), create a new collection record.  After creating a
         new collection record, this Collection object will be associated with
-        the new record used by `save()`.  Returns the current manifest text.
+        the new record used by `save()`. Returns the current manifest text.
 
         :name:
           The collection name.
@@ -1347,6 +1561,18 @@ class Collection(RichCollectionBase):
           the user, or project uuid that will own this collection.
           If None, defaults to the current user.
 
+        :properties:
+          Additional properties of the collection. This value will replace any
+          existing properties of the collection.
+
+        :storage_classes:
+          Specify the desired storage classes to be used when writing data to Keep.
+
+        :trash_at:
+          A collection is *expiring* when it has a *trash_at* time in the future.
+          An expiring collection can be accessed as normal,
+          but is scheduled to be trashed automatically at the *trash_at* time.
+
         :ensure_unique_name:
           If True, ask the API server to rename the collection
           if it conflicts with a collection with the same name and owner.  If
@@ -1356,40 +1582,50 @@ class Collection(RichCollectionBase):
           Retry count on API calls (if None,  use the collection default)
 
         """
+        if properties and type(properties) is not dict:
+            raise errors.ArgumentError("properties must be dictionary type.")
+
+        if storage_classes and type(storage_classes) is not list:
+            raise errors.ArgumentError("storage_classes must be list type.")
+
+        if trash_at and type(trash_at) is not datetime.datetime:
+            raise errors.ArgumentError("trash_at must be datetime type.")
+
         self._my_block_manager().commit_all()
         text = self.manifest_text(strip=False)
 
         if create_collection_record:
             if name is None:
-                name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
+                name = "New collection"
+                ensure_unique_name = True
 
             body = {"manifest_text": text,
 
             body = {"manifest_text": text,
-                    "name": name}
+                    "name": name,
+                    "replication_desired": self.replication_desired}
             if owner_uuid:
                 body["owner_uuid"] = owner_uuid
-
-            self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
+            if properties:
+                body["properties"] = properties
+            if storage_classes:
+                body["storage_classes_desired"] = storage_classes
+            if trash_at:
+                t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
+                body["trash_at"] = t
+
+            self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
             text = self._api_response["manifest_text"]
 
             self._manifest_locator = self._api_response["uuid"]
+            self._portable_data_hash = self._api_response["portable_data_hash"]
 
             self._manifest_text = text
-            self.set_unmodified()
+            self.set_committed(True)
 
         return text
 
-    @synchronized
-    def subscribe(self, callback):
-        self.callbacks.append(callback)
-
-    @synchronized
-    def unsubscribe(self, callback):
-        self.callbacks.remove(callback)
-
-    @synchronized
-    def notify(self, event, collection, name, item):
-        for c in self.callbacks:
-            c(event, collection, name, item)
+    _token_re = re.compile(r'(\S+)(\s+|$)')
+    _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
+    _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
 
     @synchronized
     def _import_manifest(self, manifest_text):
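A sketch of save_new() with the new arguments, assuming ARVADOS_API_HOST and ARVADOS_API_TOKEN are configured; names and property values are illustrative. If name is omitted, the record is created as "New collection" and ensure_unique_name is forced on, as shown above:

    import arvados.collection

    c = arvados.collection.Collection()            # start from an empty manifest
    with c.open('data/sample.txt', 'w') as f:
        f.write('hello\n')

    c.save_new(name='my analysis inputs',
               properties={'origin': 'sdk-example'},
               storage_classes=['default'],
               ensure_unique_name=True)
    print(c.manifest_locator())
    print(c.portable_data_hash())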
@@ -1409,7 +1645,7 @@ class Collection(RichCollectionBase):
         stream_name = None
         state = STREAM_NAME
 
-        for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
+        for token_and_separator in self._token_re.finditer(manifest_text):
             tok = token_and_separator.group(1)
             sep = token_and_separator.group(2)
 
@@ -1418,24 +1654,25 @@ class Collection(RichCollectionBase):
                 stream_name = tok.replace('\\040', ' ')
                 blocks = []
                 segments = []
-                streamoffset = 0L
+                streamoffset = 0
                 state = BLOCKS
+                self.find_or_create(stream_name, COLLECTION)
                 continue
 
             if state == BLOCKS:
-                block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
+                block_locator = self._block_re.match(tok)
                 if block_locator:
-                    blocksize = long(block_locator.group(1))
+                    blocksize = int(block_locator.group(1))
                     blocks.append(Range(tok, streamoffset, blocksize, 0))
                     streamoffset += blocksize
                 else:
                     state = SEGMENTS
 
             if state == SEGMENTS:
-                file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
+                file_segment = self._segment_re.match(tok)
                 if file_segment:
-                    pos = long(file_segment.group(1))
-                    size = long(file_segment.group(2))
+                    pos = int(file_segment.group(1))
+                    size = int(file_segment.group(2))
                     name = file_segment.group(3).replace('\\040', ' ')
                     filepath = os.path.join(stream_name, name)
                     afile = self.find_or_create(filepath, FILE)
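The precompiled patterns added above correspond to the tokenizer states used here (STREAM_NAME, BLOCKS, SEGMENTS). A self-contained illustration of what each one matches:

    import re

    token_re = re.compile(r'(\S+)(\s+|$)')
    block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
    segment_re = re.compile(r'(\d+):(\d+):(\S+)')

    # One stream: its name, one block locator, and one file segment.
    manifest = '. d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n'
    tokens = [m.group(1) for m in token_re.finditer(manifest)]
    assert tokens[0] == '.'                  # STREAM_NAME
    assert block_re.match(tokens[1])         # BLOCKS: a zero-byte block locator
    assert segment_re.match(tokens[2])       # SEGMENTS: position 0, size 0, empty.txt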
@@ -1445,27 +1682,34 @@ class Collection(RichCollectionBase):
                         raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
                 else:
                     # error!
                         raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
                 else:
                     # error!
-                    raise errors.SyntaxError("Invalid manifest format")
+                    raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
 
             if sep == "\n":
                 stream_name = None
                 state = STREAM_NAME
 
 
             if sep == "\n":
                 stream_name = None
                 state = STREAM_NAME
 
-        self.set_unmodified()
+        self.set_committed(True)
+
+    @synchronized
+    def notify(self, event, collection, name, item):
+        if self._callback:
+            self._callback(event, collection, name, item)
 
 
 class Subcollection(RichCollectionBase):
     """This is a subdirectory within a collection that doesn't have its own API
     server record.
 
-    It falls under the umbrella of the root collection.
+    Subcollection locking falls under the umbrella lock of its root collection.
 
     """
 
 
     """
 
-    def __init__(self, parent):
+    def __init__(self, parent, name):
         super(Subcollection, self).__init__(parent)
         self.lock = self.root_collection().lock
         self._manifest_text = None
+        self.name = name
+        self.num_retries = parent.num_retries
 
     def root_collection(self):
         return self.parent.root_collection()
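The single-callback notify() above replaces the old subscribe/unsubscribe list. A hedged sketch of an observer with the expected signature; how the root collection's _callback gets installed (for example via a subscribe-style helper) is outside this hunk and is assumed here:

    # Hypothetical observer matching the notify(event, collection, name, item)
    # signature shown above.
    def log_change(event, collection, name, item):
        print('collection event %r on %r' % (event, name))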
@@ -1482,28 +1726,31 @@ class Subcollection(RichCollectionBase):
     def _my_block_manager(self):
         return self.root_collection()._my_block_manager()
 
-    def notify(self, event, collection, name, item):
-        return self.root_collection().notify(event, collection, name, item)
-
     def stream_name(self):
-        for k, v in self.parent.items():
-            if v is self:
-                return os.path.join(self.parent.stream_name(), k)
-        return '.'
+        return os.path.join(self.parent.stream_name(), self.name)
 
     @synchronized
-    def clone(self, new_parent):
-        c = Subcollection(new_parent)
+    def clone(self, new_parent, new_name):
+        c = Subcollection(new_parent, new_name)
         c._clonefrom(self)
         return c
 
+    @must_be_writable
+    @synchronized
+    def _reparent(self, newparent, newname):
+        self.set_committed(False)
+        self.flush()
+        self.parent.remove(self.name, recursive=True)
+        self.parent = newparent
+        self.name = newname
+        self.lock = self.parent.root_collection().lock
+
 
 class CollectionReader(Collection):
     """A read-only collection object.
 
-    Initialize from an api collection record locator, a portable data hash of a
-    manifest, or raw manifest text.  See `Collection` constructor for detailed
-    options.
+    Initialize from a collection UUID or portable data hash, or raw
+    manifest text.  See `Collection` constructor for detailed options.
 
     """
     def __init__(self, manifest_locator_or_text, *args, **kwargs):
 
     """
     def __init__(self, manifest_locator_or_text, *args, **kwargs):
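A sketch of the initialization forms named in the revised docstring. Raw manifest text is parsed without an API round-trip; a collection UUID or portable data hash is resolved through the API server:

    import arvados.collection

    manifest = '. d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n'
    reader = arvados.collection.CollectionReader(manifest)
    print(reader.manifest_text())

    # With a configured API token, a UUID or portable data hash (placeholders
    # here) works the same way:
    #   arvados.collection.CollectionReader('zzzzz-4zz18-xxxxxxxxxxxxxxx')
    #   arvados.collection.CollectionReader('d41d8cd98f00b204e9800998ecf8427e+0')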