19821: Document the arvados.collection module
[arvados.git] / sdk / python / arvados / collection.py
index bfb43be5eb85401e332915419f2a52ea71eb2e19..a96474549a12951ef9d1c09de41a4023c489d942 100644 (file)
@@ -1,6 +1,16 @@
 # Copyright (C) The Arvados Authors. All rights reserved.
 #
 # SPDX-License-Identifier: Apache-2.0
+"""Tools to work with Arvados collections
+
+This module provides high-level interfaces to create, read, and update
+Arvados collections. Most users will want to instantiate `Collection`
+objects, and use methods like `Collection.open` and `Collection.mkdirs` to
+read and write data in the collection. Refer to the Arvados Python SDK
+cookbook for [an introduction to using the Collection class][cookbook].
+
+[cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
+"""
 
 from __future__ import absolute_import
 from future.utils import listitems, listvalues, viewkeys
@@ -35,15 +45,65 @@ import arvados.util
 import arvados.events as events
 from arvados.retry import retry_method
 
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    IO,
+    Iterator,
+    List,
+    Mapping,
+    Optional,
+    Tuple,
+    Union,
+)
+
+if sys.version_info < (3, 8):
+    from typing_extensions import Literal
+else:
+    from typing import Literal
+
 _logger = logging.getLogger('arvados.collection')
 
+ADD = "add"
+"""Argument value for `Collection` methods to represent an added item"""
+DEL = "del"
+"""Argument value for `Collection` methods to represent a removed item"""
+MOD = "mod"
+"""Argument value for `Collection` methods to represent a modified item"""
+TOK = "tok"
+"""Argument value for `Collection` methods to represent an item with token differences"""
+FILE = "file"
+"""`create_type` value for `Collection.find_or_create`"""
+COLLECTION = "collection"
+"""`create_type` value for `Collection.find_or_create`"""
+
+ChangeList = List[Union[
+    Tuple[Literal[ADD, DEL], str, 'Collection'],
+    Tuple[Literal[MOD, TOK], str, 'Collection', 'Collection'],
+]]
+ChangeType = Literal[ADD, DEL, MOD, TOK]
+CollectionItem = Union[ArvadosFile, 'Collection']
+ChangeCallback = Callable[[ChangeType, 'Collection', str, CollectionItem], object]
+CreateType = Literal[COLLECTION, FILE]
+Properties = Mapping[str, Any]
+StorageClasses = List[str]
+
 class CollectionBase(object):
-    """Abstract base class for Collection classes."""
+    """Abstract base class for Collection classes
+
+    .. ATTENTION:: Internal
+       This class is meant to be used by other parts of the SDK. User code
+       should instantiate or subclass `Collection` or one of its subclasses
+       directly.
+    """
 
     def __enter__(self):
+        """Enter a context block with this collection instance"""
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):
+        """Exit a context block with this collection instance"""
         pass
 
     def _my_keep(self):
@@ -52,12 +112,13 @@ class CollectionBase(object):
                                            num_retries=self.num_retries)
         return self._keep_client
 
-    def stripped_manifest(self):
-        """Get the manifest with locator hints stripped.
+    def stripped_manifest(self) -> str:
+        """Create a copy of the collection manifest with only size hints
 
-        Return the manifest for the current collection with all
-        non-portable hints (i.e., permission signatures and other
-        hints other than size hints) removed from the locators.
+        This method returns a string with the current collection's manifest
+        text with all non-portable locator hints like permission hints and
+        remote cluster hints removed. The only hints in the returned manifest
+        will be size hints.
         """
         raw = self.manifest_text()
         clean = []
@@ -96,709 +157,379 @@ class _WriterFile(_FileLikeObjectBase):
         self.dest.flush_data()
 
 
-class CollectionWriter(CollectionBase):
-    """Deprecated, use Collection instead."""
+class RichCollectionBase(CollectionBase):
+    """Base class for Collection classes
 
-    @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
-    def __init__(self, api_client=None, num_retries=0, replication=None):
-        """Instantiate a CollectionWriter.
+    .. ATTENTION:: Internal
+       This class is meant to be used by other parts of the SDK. User code
+       should instantiate or subclass `Collection` or one of its subclasses
+       directly.
+    """
 
-        CollectionWriter lets you build a new Arvados Collection from scratch.
-        Write files to it.  The CollectionWriter will upload data to Keep as
-        appropriate, and provide you with the Collection manifest text when
-        you're finished.
+    def __init__(self, parent=None):
+        self.parent = parent
+        self._committed = False
+        self._has_remote_blocks = False
+        self._callback = None
+        self._items = {}
 
-        Arguments:
-        * api_client: The API client to use to look up Collections.  If not
-          provided, CollectionReader will build one from available Arvados
-          configuration.
-        * num_retries: The default number of times to retry failed
-          service requests.  Default 0.  You may change this value
-          after instantiation, but note those changes may not
-          propagate to related objects like the Keep client.
-        * replication: The number of copies of each block to store.
-          If this argument is None or not supplied, replication is
-          the server-provided default if available, otherwise 2.
-        """
-        self._api_client = api_client
-        self.num_retries = num_retries
-        self.replication = (2 if replication is None else replication)
-        self._keep_client = None
-        self._data_buffer = []
-        self._data_buffer_len = 0
-        self._current_stream_files = []
-        self._current_stream_length = 0
-        self._current_stream_locators = []
-        self._current_stream_name = '.'
-        self._current_file_name = None
-        self._current_file_pos = 0
-        self._finished_streams = []
-        self._close_file = None
-        self._queued_file = None
-        self._queued_dirents = deque()
-        self._queued_trees = deque()
-        self._last_open = None
+    def _my_api(self):
+        raise NotImplementedError()
 
-    def __exit__(self, exc_type, exc_value, traceback):
-        if exc_type is None:
-            self.finish()
+    def _my_keep(self):
+        raise NotImplementedError()
 
-    def do_queued_work(self):
-        # The work queue consists of three pieces:
-        # * _queued_file: The file object we're currently writing to the
-        #   Collection.
-        # * _queued_dirents: Entries under the current directory
-        #   (_queued_trees[0]) that we want to write or recurse through.
-        #   This may contain files from subdirectories if
-        #   max_manifest_depth == 0 for this directory.
-        # * _queued_trees: Directories that should be written as separate
-        #   streams to the Collection.
-        # This function handles the smallest piece of work currently queued
-        # (current file, then current directory, then next directory) until
-        # no work remains.  The _work_THING methods each do a unit of work on
-        # THING.  _queue_THING methods add a THING to the work queue.
-        while True:
-            if self._queued_file:
-                self._work_file()
-            elif self._queued_dirents:
-                self._work_dirents()
-            elif self._queued_trees:
-                self._work_trees()
-            else:
-                break
+    def _my_block_manager(self):
+        raise NotImplementedError()
 
-    def _work_file(self):
-        while True:
-            buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
-            if not buf:
-                break
-            self.write(buf)
-        self.finish_current_file()
-        if self._close_file:
-            self._queued_file.close()
-        self._close_file = None
-        self._queued_file = None
+    def writable(self) -> bool:
+        """Indicate whether this collection object can be modified
 
-    def _work_dirents(self):
-        path, stream_name, max_manifest_depth = self._queued_trees[0]
-        if stream_name != self.current_stream_name():
-            self.start_new_stream(stream_name)
-        while self._queued_dirents:
-            dirent = self._queued_dirents.popleft()
-            target = os.path.join(path, dirent)
-            if os.path.isdir(target):
-                self._queue_tree(target,
-                                 os.path.join(stream_name, dirent),
-                                 max_manifest_depth - 1)
-            else:
-                self._queue_file(target, dirent)
-                break
-        if not self._queued_dirents:
-            self._queued_trees.popleft()
+        This method returns `False` if this object is a `CollectionReader`,
+        else `True`.
+        """
+        raise NotImplementedError()
 
-    def _work_trees(self):
-        path, stream_name, max_manifest_depth = self._queued_trees[0]
-        d = arvados.util.listdir_recursive(
-            path, max_depth = (None if max_manifest_depth == 0 else 0))
-        if d:
-            self._queue_dirents(stream_name, d)
-        else:
-            self._queued_trees.popleft()
+    def root_collection(self) -> 'Collection':
+        """Get this collection's root collection object
 
-    def _queue_file(self, source, filename=None):
-        assert (self._queued_file is None), "tried to queue more than one file"
-        if not hasattr(source, 'read'):
-            source = open(source, 'rb')
-            self._close_file = True
-        else:
-            self._close_file = False
-        if filename is None:
-            filename = os.path.basename(source.name)
-        self.start_new_file(filename)
-        self._queued_file = source
+        If you open a subcollection with `Collection.find`, calling this method
+        on that subcollection returns the source Collection object.
+        """
+        raise NotImplementedError()
 
-    def _queue_dirents(self, stream_name, dirents):
-        assert (not self._queued_dirents), "tried to queue more than one tree"
-        self._queued_dirents = deque(sorted(dirents))
+    def stream_name(self) -> str:
+        """Get the name of the manifest stream represented by this collection
 
-    def _queue_tree(self, path, stream_name, max_manifest_depth):
-        self._queued_trees.append((path, stream_name, max_manifest_depth))
+        If you open a subcollection with `Collection.find`, calling this method
+        on that subcollection returns the name of the stream you opened.
+        """
+        raise NotImplementedError()
 
-    def write_file(self, source, filename=None):
-        self._queue_file(source, filename)
-        self.do_queued_work()
+    @synchronized
+    def has_remote_blocks(self) -> bool:
+        """Indiciate whether the collection refers to remote data
 
-    def write_directory_tree(self,
-                             path, stream_name='.', max_manifest_depth=-1):
-        self._queue_tree(path, stream_name, max_manifest_depth)
-        self.do_queued_work()
+        Returns `True` if the collection manifest includes any Keep locators
+        with a remote hint (`+R`), else `False`.
+        """
+        if self._has_remote_blocks:
+            return True
+        for item in self:
+            if self[item].has_remote_blocks():
+                return True
+        return False
 
-    def write(self, newdata):
-        if isinstance(newdata, bytes):
-            pass
-        elif isinstance(newdata, str):
-            newdata = newdata.encode()
-        elif hasattr(newdata, '__iter__'):
-            for s in newdata:
-                self.write(s)
-            return
-        self._data_buffer.append(newdata)
-        self._data_buffer_len += len(newdata)
-        self._current_stream_length += len(newdata)
-        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
-            self.flush_data()
+    @synchronized
+    def set_has_remote_blocks(self, val: bool) -> None:
+        """Cache whether this collection refers to remote blocks
 
-    def open(self, streampath, filename=None):
-        """open(streampath[, filename]) -> file-like object
+        .. ATTENTION:: Internal
+           This method is only meant to be used by other Collection methods.
 
-        Pass in the path of a file to write to the Collection, either as a
-        single string or as two separate stream name and file name arguments.
-        This method returns a file-like object you can write to add it to the
-        Collection.
+        Set this collection's cached "has remote blocks" flag to the given
+        value.
+        """
+        self._has_remote_blocks = val
+        if self.parent:
+            self.parent.set_has_remote_blocks(val)
 
-        You may only have one file object from the Collection open at a time,
-        so be sure to close the object when you're done.  Using the object in
-        a with statement makes that easy::
+    @must_be_writable
+    @synchronized
+    def find_or_create(
+            self,
+            path: str,
+            create_type: CreateType,
+    ) -> CollectionItem:
+        """Get the item at the given path, creating it if necessary
+
+        If `path` refers to a stream in this collection, returns a
+        corresponding `Subcollection` object. If `path` refers to a file in
+        this collection, returns a corresponding
+        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
+        this collection, then this method creates a new object and returns
+        it, creating parent streams as needed. The type of object created is
+        determined by the value of `create_type`.
 
-          with cwriter.open('./doc/page1.txt') as outfile:
-              outfile.write(page1_data)
-          with cwriter.open('./doc/page2.txt') as outfile:
-              outfile.write(page2_data)
+        Arguments:
+
+        * path: str --- The path to find or create within this collection.
+
+        * create_type: Literal[COLLECTION, FILE] --- The type of object to
+          create at `path` if one does not exist. Passing `COLLECTION`
+          creates a stream and returns the corresponding
+          `Subcollection`. Passing `FILE` creates a new file and returns the
+          corresponding `arvados.arvfile.ArvadosFile`.
         """
-        if filename is None:
-            streampath, filename = split(streampath)
-        if self._last_open and not self._last_open.closed:
-            raise errors.AssertionError(
-                u"can't open '{}' when '{}' is still open".format(
-                    filename, self._last_open.name))
-        if streampath != self.current_stream_name():
-            self.start_new_stream(streampath)
-        self.set_current_file_name(filename)
-        self._last_open = _WriterFile(self, filename)
-        return self._last_open
+        pathcomponents = path.split("/", 1)
+        if pathcomponents[0]:
+            item = self._items.get(pathcomponents[0])
+            if len(pathcomponents) == 1:
+                if item is None:
+                    # create new file
+                    if create_type == COLLECTION:
+                        item = Subcollection(self, pathcomponents[0])
+                    else:
+                        item = ArvadosFile(self, pathcomponents[0])
+                    self._items[pathcomponents[0]] = item
+                    self.set_committed(False)
+                    self.notify(ADD, self, pathcomponents[0], item)
+                return item
+            else:
+                if item is None:
+                    # create new collection
+                    item = Subcollection(self, pathcomponents[0])
+                    self._items[pathcomponents[0]] = item
+                    self.set_committed(False)
+                    self.notify(ADD, self, pathcomponents[0], item)
+                if isinstance(item, RichCollectionBase):
+                    return item.find_or_create(pathcomponents[1], create_type)
+                else:
+                    raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
+        else:
+            return self
 
-    def flush_data(self):
-        data_buffer = b''.join(self._data_buffer)
-        if data_buffer:
-            self._current_stream_locators.append(
-                self._my_keep().put(
-                    data_buffer[0:config.KEEP_BLOCK_SIZE],
-                    copies=self.replication))
-            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
-            self._data_buffer_len = len(self._data_buffer[0])
+    @synchronized
+    def find(self, path: str) -> CollectionItem:
+        """Get the item at the given path
 
-    def start_new_file(self, newfilename=None):
-        self.finish_current_file()
-        self.set_current_file_name(newfilename)
+        If `path` refers to a stream in this collection, returns a
+        corresponding `Subcollection` object. If `path` refers to a file in
+        this collection, returns a corresponding
+        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
+        this collection, then this method raises `NotADirectoryError`.
 
-    def set_current_file_name(self, newfilename):
-        if re.search(r'[\t\n]', newfilename):
-            raise errors.AssertionError(
-                "Manifest filenames cannot contain whitespace: %s" %
-                newfilename)
-        elif re.search(r'\x00', newfilename):
-            raise errors.AssertionError(
-                "Manifest filenames cannot contain NUL characters: %s" %
-                newfilename)
-        self._current_file_name = newfilename
+        Arguments:
 
-    def current_file_name(self):
-        return self._current_file_name
+        * path: str --- The path to find or create within this collection.
+        """
+        if not path:
+            raise errors.ArgumentError("Parameter 'path' is empty.")
 
-    def finish_current_file(self):
-        if self._current_file_name is None:
-            if self._current_file_pos == self._current_stream_length:
-                return
-            raise errors.AssertionError(
-                "Cannot finish an unnamed file " +
-                "(%d bytes at offset %d in '%s' stream)" %
-                (self._current_stream_length - self._current_file_pos,
-                 self._current_file_pos,
-                 self._current_stream_name))
-        self._current_stream_files.append([
-                self._current_file_pos,
-                self._current_stream_length - self._current_file_pos,
-                self._current_file_name])
-        self._current_file_pos = self._current_stream_length
-        self._current_file_name = None
+        pathcomponents = path.split("/", 1)
+        if pathcomponents[0] == '':
+            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 
-    def start_new_stream(self, newstreamname='.'):
-        self.finish_current_stream()
-        self.set_current_stream_name(newstreamname)
+        item = self._items.get(pathcomponents[0])
+        if item is None:
+            return None
+        elif len(pathcomponents) == 1:
+            return item
+        else:
+            if isinstance(item, RichCollectionBase):
+                if pathcomponents[1]:
+                    return item.find(pathcomponents[1])
+                else:
+                    return item
+            else:
+                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 
-    def set_current_stream_name(self, newstreamname):
-        if re.search(r'[\t\n]', newstreamname):
-            raise errors.AssertionError(
-                "Manifest stream names cannot contain whitespace: '%s'" %
-                (newstreamname))
-        self._current_stream_name = '.' if newstreamname=='' else newstreamname
+    @synchronized
+    def mkdirs(self, path: str) -> 'Subcollection':
+        """Create and return a subcollection at `path`
 
-    def current_stream_name(self):
-        return self._current_stream_name
+        If `path` exists within this collection, raises `FileExistsError`.
+        Otherwise, creates a stream at that path and returns the
+        corresponding `Subcollection`.
+        """
+        if self.find(path) != None:
+            raise IOError(errno.EEXIST, "Directory or file exists", path)
 
-    def finish_current_stream(self):
-        self.finish_current_file()
-        self.flush_data()
-        if not self._current_stream_files:
-            pass
-        elif self._current_stream_name is None:
-            raise errors.AssertionError(
-                "Cannot finish an unnamed stream (%d bytes in %d files)" %
-                (self._current_stream_length, len(self._current_stream_files)))
-        else:
-            if not self._current_stream_locators:
-                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
-            self._finished_streams.append([self._current_stream_name,
-                                           self._current_stream_locators,
-                                           self._current_stream_files])
-        self._current_stream_files = []
-        self._current_stream_length = 0
-        self._current_stream_locators = []
-        self._current_stream_name = None
-        self._current_file_pos = 0
-        self._current_file_name = None
+        return self.find_or_create(path, COLLECTION)
 
-    def finish(self):
-        """Store the manifest in Keep and return its locator.
+    def open(
+            self,
+            path: str,
+            mode: str="r",
+            encoding: Optional[str]=None,
+    ) -> IO:
+        """Open a file-like object within the collection
 
-        This is useful for storing manifest fragments (task outputs)
-        temporarily in Keep during a Crunch job.
+        This method returns a file-like object that can read and/or write the
+        file located at `path` within the collection. If you attempt to write
+        a `path` that does not exist, the file is created with `find_or_create`.
+        If the file cannot be opened for any other reason, this method raises
+        `OSError` with an appropriate errno.
 
-        In other cases you should make a collection instead, by
-        sending manifest_text() to the API server's "create
-        collection" endpoint.
-        """
-        return self._my_keep().put(self.manifest_text().encode(),
-                                   copies=self.replication)
+        Arguments:
 
-    def portable_data_hash(self):
-        stripped = self.stripped_manifest().encode()
-        return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
+        * path: str --- The path of the file to open within this collection
 
-    def manifest_text(self):
-        self.finish_current_stream()
-        manifest = ''
+        * mode: str --- The mode to open this file. Supports all the same
+          values as `builtins.open`.
 
-        for stream in self._finished_streams:
-            if not re.search(r'^\.(/.*)?$', stream[0]):
-                manifest += './'
-            manifest += stream[0].replace(' ', '\\040')
-            manifest += ' ' + ' '.join(stream[1])
-            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
-            manifest += "\n"
+        * encoding: str | None --- The text encoding of the file. Only used
+          when the file is opened in text mode. The default is
+          platform-dependent.
+        """
+        if not re.search(r'^[rwa][bt]?\+?$', mode):
+            raise errors.ArgumentError("Invalid mode {!r}".format(mode))
 
-        return manifest
+        if mode[0] == 'r' and '+' not in mode:
+            fclass = ArvadosFileReader
+            arvfile = self.find(path)
+        elif not self.writable():
+            raise IOError(errno.EROFS, "Collection is read only")
+        else:
+            fclass = ArvadosFileWriter
+            arvfile = self.find_or_create(path, FILE)
 
-    def data_locators(self):
-        ret = []
-        for name, locators, files in self._finished_streams:
-            ret += locators
-        return ret
+        if arvfile is None:
+            raise IOError(errno.ENOENT, "File not found", path)
+        if not isinstance(arvfile, ArvadosFile):
+            raise IOError(errno.EISDIR, "Is a directory", path)
 
-    def save_new(self, name=None):
-        return self._api_client.collections().create(
-            ensure_unique_name=True,
-            body={
-                'name': name,
-                'manifest_text': self.manifest_text(),
-            }).execute(num_retries=self.num_retries)
+        if mode[0] == 'w':
+            arvfile.truncate(0)
 
+        binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
+        f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
+        if 'b' not in mode:
+            bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
+            f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
+        return f
 
-class ResumableCollectionWriter(CollectionWriter):
-    """Deprecated, use Collection instead."""
+    def modified(self) -> bool:
+        """Indicate whether this collection has an API server record
 
-    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
-                   '_current_stream_locators', '_current_stream_name',
-                   '_current_file_name', '_current_file_pos', '_close_file',
-                   '_data_buffer', '_dependencies', '_finished_streams',
-                   '_queued_dirents', '_queued_trees']
+        Returns `False` if this collection corresponds to a record loaded from
+        the API server, `True` otherwise.
+        """
+        return not self.committed()
 
-    @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
-    def __init__(self, api_client=None, **kwargs):
-        self._dependencies = {}
-        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
+    @synchronized
+    def committed(self):
+        """Indicate whether this collection has an API server record
 
-    @classmethod
-    def from_state(cls, state, *init_args, **init_kwargs):
-        # Try to build a new writer from scratch with the given state.
-        # If the state is not suitable to resume (because files have changed,
-        # been deleted, aren't predictable, etc.), raise a
-        # StaleWriterStateError.  Otherwise, return the initialized writer.
-        # The caller is responsible for calling writer.do_queued_work()
-        # appropriately after it's returned.
-        writer = cls(*init_args, **init_kwargs)
-        for attr_name in cls.STATE_PROPS:
-            attr_value = state[attr_name]
-            attr_class = getattr(writer, attr_name).__class__
-            # Coerce the value into the same type as the initial value, if
-            # needed.
-            if attr_class not in (type(None), attr_value.__class__):
-                attr_value = attr_class(attr_value)
-            setattr(writer, attr_name, attr_value)
-        # Check dependencies before we try to resume anything.
-        if any(KeepLocator(ls).permission_expired()
-               for ls in writer._current_stream_locators):
-            raise errors.StaleWriterStateError(
-                "locators include expired permission hint")
-        writer.check_dependencies()
-        if state['_current_file'] is not None:
-            path, pos = state['_current_file']
-            try:
-                writer._queued_file = open(path, 'rb')
-                writer._queued_file.seek(pos)
-            except IOError as error:
-                raise errors.StaleWriterStateError(
-                    u"failed to reopen active file {}: {}".format(path, error))
-        return writer
+        Returns `True` if this collection corresponds to a record loaded from
+        the API server, `False` otherwise.
+        """
+        return self._committed
 
-    def check_dependencies(self):
-        for path, orig_stat in listitems(self._dependencies):
-            if not S_ISREG(orig_stat[ST_MODE]):
-                raise errors.StaleWriterStateError(u"{} not file".format(path))
-            try:
-                now_stat = tuple(os.stat(path))
-            except OSError as error:
-                raise errors.StaleWriterStateError(
-                    u"failed to stat {}: {}".format(path, error))
-            if ((not S_ISREG(now_stat[ST_MODE])) or
-                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
-                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
-                raise errors.StaleWriterStateError(u"{} changed".format(path))
+    @synchronized
+    def set_committed(self, value: bool=True):
+        """Cache whether this collection has an API server record
 
-    def dump_state(self, copy_func=lambda x: x):
-        state = {attr: copy_func(getattr(self, attr))
-                 for attr in self.STATE_PROPS}
-        if self._queued_file is None:
-            state['_current_file'] = None
-        else:
-            state['_current_file'] = (os.path.realpath(self._queued_file.name),
-                                      self._queued_file.tell())
-        return state
+        .. ATTENTION:: Internal
+           This method is only meant to be used by other Collection methods.
 
-    def _queue_file(self, source, filename=None):
-        try:
-            src_path = os.path.realpath(source)
-        except Exception:
-            raise errors.AssertionError(u"{} not a file path".format(source))
-        try:
-            path_stat = os.stat(src_path)
-        except OSError as stat_error:
-            path_stat = None
-        super(ResumableCollectionWriter, self)._queue_file(source, filename)
-        fd_stat = os.fstat(self._queued_file.fileno())
-        if not S_ISREG(fd_stat.st_mode):
-            # We won't be able to resume from this cache anyway, so don't
-            # worry about further checks.
-            self._dependencies[source] = tuple(fd_stat)
-        elif path_stat is None:
-            raise errors.AssertionError(
-                u"could not stat {}: {}".format(source, stat_error))
-        elif path_stat.st_ino != fd_stat.st_ino:
-            raise errors.AssertionError(
-                u"{} changed between open and stat calls".format(source))
+        Set this collection's cached "committed" flag to the given
+        value and propagates it as needed.
+        """
+        if value == self._committed:
+            return
+        if value:
+            for k,v in listitems(self._items):
+                v.set_committed(True)
+            self._committed = True
         else:
-            self._dependencies[src_path] = tuple(fd_stat)
+            self._committed = False
+            if self.parent is not None:
+                self.parent.set_committed(False)
 
-    def write(self, data):
-        if self._queued_file is None:
-            raise errors.AssertionError(
-                "resumable writer can't accept unsourced data")
-        return super(ResumableCollectionWriter, self).write(data)
+    @synchronized
+    def __iter__(self) -> Iterator[str]:
+        """Iterate names of streams and files in this collection
 
-
-ADD = "add"
-DEL = "del"
-MOD = "mod"
-TOK = "tok"
-FILE = "file"
-COLLECTION = "collection"
-
-class RichCollectionBase(CollectionBase):
-    """Base class for Collections and Subcollections.
-
-    Implements the majority of functionality relating to accessing items in the
-    Collection.
-
-    """
-
-    def __init__(self, parent=None):
-        self.parent = parent
-        self._committed = False
-        self._has_remote_blocks = False
-        self._callback = None
-        self._items = {}
-
-    def _my_api(self):
-        raise NotImplementedError()
-
-    def _my_keep(self):
-        raise NotImplementedError()
-
-    def _my_block_manager(self):
-        raise NotImplementedError()
-
-    def writable(self):
-        raise NotImplementedError()
-
-    def root_collection(self):
-        raise NotImplementedError()
-
-    def notify(self, event, collection, name, item):
-        raise NotImplementedError()
-
-    def stream_name(self):
-        raise NotImplementedError()
-
-
-    @synchronized
-    def has_remote_blocks(self):
-        """Recursively check for a +R segment locator signature."""
-
-        if self._has_remote_blocks:
-            return True
-        for item in self:
-            if self[item].has_remote_blocks():
-                return True
-        return False
+        This method does not recurse. It only iterates the contents of this
+        collection's corresponding stream.
+        """
+        return iter(viewkeys(self._items))
 
     @synchronized
-    def set_has_remote_blocks(self, val):
-        self._has_remote_blocks = val
-        if self.parent:
-            self.parent.set_has_remote_blocks(val)
-
-    @must_be_writable
-    @synchronized
-    def find_or_create(self, path, create_type):
-        """Recursively search the specified file path.
-
-        May return either a `Collection` or `ArvadosFile`.  If not found, will
-        create a new item at the specified path based on `create_type`.  Will
-        create intermediate subcollections needed to contain the final item in
-        the path.
-
-        :create_type:
-          One of `arvados.collection.FILE` or
-          `arvados.collection.COLLECTION`.  If the path is not found, and value
-          of create_type is FILE then create and return a new ArvadosFile for
-          the last path component.  If COLLECTION, then create and return a new
-          Collection for the last path component.
+    def __getitem__(self, k: str) -> CollectionItem:
+        """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection
 
+        This method does not recurse. If you want to search a path, use
+        `RichCollectionBase.find` instead.
         """
-
-        pathcomponents = path.split("/", 1)
-        if pathcomponents[0]:
-            item = self._items.get(pathcomponents[0])
-            if len(pathcomponents) == 1:
-                if item is None:
-                    # create new file
-                    if create_type == COLLECTION:
-                        item = Subcollection(self, pathcomponents[0])
-                    else:
-                        item = ArvadosFile(self, pathcomponents[0])
-                    self._items[pathcomponents[0]] = item
-                    self.set_committed(False)
-                    self.notify(ADD, self, pathcomponents[0], item)
-                return item
-            else:
-                if item is None:
-                    # create new collection
-                    item = Subcollection(self, pathcomponents[0])
-                    self._items[pathcomponents[0]] = item
-                    self.set_committed(False)
-                    self.notify(ADD, self, pathcomponents[0], item)
-                if isinstance(item, RichCollectionBase):
-                    return item.find_or_create(pathcomponents[1], create_type)
-                else:
-                    raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
-        else:
-            return self
+        return self._items[k]
 
     @synchronized
-    def find(self, path):
-        """Recursively search the specified file path.
-
-        May return either a Collection or ArvadosFile. Return None if not
-        found.
-        If path is invalid (ex: starts with '/'), an IOError exception will be
-        raised.
+    def __contains__(self, k: str) -> bool:
+        """Indicate whether this collection has an item with this name
 
+        This method does not recurse. It you want to check a path, use
+        `RichCollectionBase.exists` instead.
         """
-        if not path:
-            raise errors.ArgumentError("Parameter 'path' is empty.")
-
-        pathcomponents = path.split("/", 1)
-        if pathcomponents[0] == '':
-            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
-
-        item = self._items.get(pathcomponents[0])
-        if item is None:
-            return None
-        elif len(pathcomponents) == 1:
-            return item
-        else:
-            if isinstance(item, RichCollectionBase):
-                if pathcomponents[1]:
-                    return item.find(pathcomponents[1])
-                else:
-                    return item
-            else:
-                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
+        return k in self._items
 
     @synchronized
-    def mkdirs(self, path):
-        """Recursive subcollection create.
-
-        Like `os.makedirs()`.  Will create intermediate subcollections needed
-        to contain the leaf subcollection path.
+    def __len__(self):
+        """Get the number of items directly contained in this collection
 
+        This method does not recurse. It only counts the streams and files
+        in this collection's corresponding stream.
         """
+        return len(self._items)
 
-        if self.find(path) != None:
-            raise IOError(errno.EEXIST, "Directory or file exists", path)
-
-        return self.find_or_create(path, COLLECTION)
-
-    def open(self, path, mode="r", encoding=None):
-        """Open a file-like object for access.
-
-        :path:
-          path to a file in the collection
-        :mode:
-          a string consisting of "r", "w", or "a", optionally followed
-          by "b" or "t", optionally followed by "+".
-          :"b":
-            binary mode: write() accepts bytes, read() returns bytes.
-          :"t":
-            text mode (default): write() accepts strings, read() returns strings.
-          :"r":
-            opens for reading
-          :"r+":
-            opens for reading and writing.  Reads/writes share a file pointer.
-          :"w", "w+":
-            truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
-          :"a", "a+":
-            opens for reading and writing.  All writes are appended to
-            the end of the file.  Writing does not affect the file pointer for
-            reading.
+    @must_be_writable
+    @synchronized
+    def __delitem__(self, p: str) -> None:
+        """Delete an item from this collection's stream
 
+        This method does not recurse. If you want to remove an item by a
+        path, use `RichCollectionBase.remove` instead.
         """
-
-        if not re.search(r'^[rwa][bt]?\+?$', mode):
-            raise errors.ArgumentError("Invalid mode {!r}".format(mode))
-
-        if mode[0] == 'r' and '+' not in mode:
-            fclass = ArvadosFileReader
-            arvfile = self.find(path)
-        elif not self.writable():
-            raise IOError(errno.EROFS, "Collection is read only")
-        else:
-            fclass = ArvadosFileWriter
-            arvfile = self.find_or_create(path, FILE)
-
-        if arvfile is None:
-            raise IOError(errno.ENOENT, "File not found", path)
-        if not isinstance(arvfile, ArvadosFile):
-            raise IOError(errno.EISDIR, "Is a directory", path)
-
-        if mode[0] == 'w':
-            arvfile.truncate(0)
-
-        binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
-        f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
-        if 'b' not in mode:
-            bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
-            f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
-        return f
-
-    def modified(self):
-        """Determine if the collection has been modified since last commited."""
-        return not self.committed()
-
-    @synchronized
-    def committed(self):
-        """Determine if the collection has been committed to the API server."""
-        return self._committed
+        del self._items[p]
+        self.set_committed(False)
+        self.notify(DEL, self, p, None)
 
     @synchronized
-    def set_committed(self, value=True):
-        """Recursively set committed flag.
-
-        If value is True, set committed to be True for this and all children.
+    def keys(self) -> Iterator[str]:
+        """Iterate names of streams and files in this collection
 
-        If value is False, set committed to be False for this and all parents.
+        This method does not recurse. It only iterates the contents of this
+        collection's corresponding stream.
         """
-        if value == self._committed:
-            return
-        if value:
-            for k,v in listitems(self._items):
-                v.set_committed(True)
-            self._committed = True
-        else:
-            self._committed = False
-            if self.parent is not None:
-                self.parent.set_committed(False)
-
-    @synchronized
-    def __iter__(self):
-        """Iterate over names of files and collections contained in this collection."""
-        return iter(viewkeys(self._items))
+        return self._items.keys()
 
     @synchronized
-    def __getitem__(self, k):
-        """Get a file or collection that is directly contained by this collection.
-
-        If you want to search a path, use `find()` instead.
+    def values(self) -> List[CollectionItem]:
+        """Get a list of objects in this collection's stream
 
+        The return value includes a `Subcollection` for every stream, and an
+        `arvados.arvfile.ArvadosFile` for every file, directly within this
+        collection's stream.  This method does not recurse.
         """
-        return self._items[k]
-
-    @synchronized
-    def __contains__(self, k):
-        """Test if there is a file or collection a directly contained by this collection."""
-        return k in self._items
+        return listvalues(self._items)
 
     @synchronized
-    def __len__(self):
-        """Get the number of items directly contained in this collection."""
-        return len(self._items)
+    def items(self) -> List[Tuple[str, CollectionItem]]:
+        """Get a list of `(name, object)` tuples from this collection's stream
 
-    @must_be_writable
-    @synchronized
-    def __delitem__(self, p):
-        """Delete an item by name which is directly contained by this collection."""
-        del self._items[p]
-        self.set_committed(False)
-        self.notify(DEL, self, p, None)
+        The return value includes a `Subcollection` for every stream, and an
+        `arvados.arvfile.ArvadosFile` for every file, directly within this
+        collection's stream.  This method does not recurse.
+        """
+        return listitems(self._items)
 
-    @synchronized
-    def keys(self):
-        """Get a list of names of files and collections directly contained in this collection."""
-        return self._items.keys()
+    def exists(self, path: str) -> bool:
+        """Indicate whether this collection includes an item at `path`
 
-    @synchronized
-    def values(self):
-        """Get a list of files and collection objects directly contained in this collection."""
-        return listvalues(self._items)
+        This method returns `True` if `path` refers to a stream or file within
+        this collection, else `False`.
 
-    @synchronized
-    def items(self):
-        """Get a list of (name, object) tuples directly contained in this collection."""
-        return listitems(self._items)
+        Arguments:
 
-    def exists(self, path):
-        """Test if there is a file or collection at `path`."""
+        * path: str --- The path to check for existence within this collection
+        """
         return self.find(path) is not None
 
     @must_be_writable
     @synchronized
-    def remove(self, path, recursive=False):
-        """Remove the file or subcollection (directory) at `path`.
+    def remove(self, path: str, recursive: bool=False) -> None:
+        """Remove the file or stream at `path`
 
-        :recursive:
-          Specify whether to remove non-empty subcollections (True), or raise an error (False).
-        """
+        Arguments:
+
+        * path: str --- The path of the item to remove from the collection
 
+        * recursive: bool --- Controls the method's behavior if `path` refers
+          to a nonempty stream. If `False` (the default), this method raises
+          `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
+          items under the stream.
+        """
         if not path:
             raise errors.ArgumentError("Parameter 'path' is empty.")
 
@@ -825,26 +556,33 @@ class RichCollectionBase(CollectionBase):
 
     @must_be_writable
     @synchronized
-    def add(self, source_obj, target_name, overwrite=False, reparent=False):
-        """Copy or move a file or subcollection to this collection.
+    def add(
+            self,
+            source_obj: CollectionItem,
+            target_name: str,
+            overwrite: bool=False,
+            reparent: bool=False,
+    ) -> None:
+        """Copy or move a file or subcollection object to this collection
 
-        :source_obj:
-          An ArvadosFile, or Subcollection object
+        Arguments:
 
-        :target_name:
-          Destination item name.  If the target name already exists and is a
-          file, this will raise an error unless you specify `overwrite=True`.
+        * source_obj: ArvadosFile | Subcollection --- The file or subcollection
+          to add to this collection
 
-        :overwrite:
-          Whether to overwrite target file if it already exists.
+        * target_name: str --- The path inside this collection where
+          `source_obj` should be added.
 
-        :reparent:
-          If True, source_obj will be moved from its parent collection to this collection.
-          If False, source_obj will be copied and the parent collection will be
-          unmodified.
+        * overwrite: bool --- Controls the behavior of this method when the
+          collection already contains an object at `target_name`. If `False`
+          (the default), this method will raise `FileExistsError`. If `True`,
+          the object at `target_name` will be replaced with `source_obj`.
 
+        * reparent: bool --- Controls whether this method copies or moves
+          `source_obj`. If `False` (the default), `source_obj` is copied into
+          this collection. If `True`, `source_obj` is moved into this
+          collection.
         """
-
         if target_name in self and not overwrite:
             raise IOError(errno.EEXIST, "File already exists", target_name)
 
@@ -911,92 +649,115 @@ class RichCollectionBase(CollectionBase):
 
     @must_be_writable
     @synchronized
-    def copy(self, source, target_path, source_collection=None, overwrite=False):
-        """Copy a file or subcollection to a new path in this collection.
+    def copy(
+            self,
+            source: Union[str, CollectionItem],
+            target_path: str,
+            source_collection: Optional['RichCollectionBase']=None,
+            overwrite: bool=False,
+    ) -> None:
+        """Copy a file or subcollection object to this collection
+
+        Arguments:
 
-        :source:
-          A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
+        * source: str | ArvadosFile | Subcollection --- The file or
+          subcollection to add to this collection. If `source` is a str, the
+          object will be found by looking up this path from `source_collection`
+          (see below).
 
-        :target_path:
-          Destination file or path.  If the target path already exists and is a
-          subcollection, the item will be placed inside the subcollection.  If
-          the target path already exists and is a file, this will raise an error
-          unless you specify `overwrite=True`.
+        * target_path: str --- The path inside this collection where the
+          source object should be added.
 
-        :source_collection:
-          Collection to copy `source_path` from (default `self`)
+        * source_collection: Collection | None --- The collection to find the
+          source object from when `source` is a path. Defaults to the current
+          collection (`self`).
 
-        :overwrite:
-          Whether to overwrite target file if it already exists.
+        * overwrite: bool --- Controls the behavior of this method when the
+          collection already contains an object at `target_path`. If `False`
+          (the default), this method will raise `FileExistsError`. If `True`,
+          the object at `target_path` will be replaced with `source_obj`.
         """
-
         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
         target_dir.add(source_obj, target_name, overwrite, False)
 
     @must_be_writable
     @synchronized
-    def rename(self, source, target_path, source_collection=None, overwrite=False):
-        """Move a file or subcollection from `source_collection` to a new path in this collection.
+    def rename(
+            self,
+            source: Union[str, CollectionItem],
+            target_path: str,
+            source_collection: Optional['RichCollectionBase']=None,
+            overwrite: bool=False,
+    ) -> None:
+        """Move a file or subcollection object to this collection
 
-        :source:
-          A string with a path to source file or subcollection.
+        Arguments:
 
-        :target_path:
-          Destination file or path.  If the target path already exists and is a
-          subcollection, the item will be placed inside the subcollection.  If
-          the target path already exists and is a file, this will raise an error
-          unless you specify `overwrite=True`.
+        * source: str | ArvadosFile | Subcollection --- The file or
+          subcollection to add to this collection. If `source` is a str, the
+          object will be found by looking up this path from `source_collection`
+          (see below).
 
-        :source_collection:
-          Collection to copy `source_path` from (default `self`)
+        * target_path: str --- The path inside this collection where the
+          source object should be added.
 
-        :overwrite:
-          Whether to overwrite target file if it already exists.
-        """
+        * source_collection: Collection | None --- The collection to find the
+          source object from when `source` is a path. Defaults to the current
+          collection (`self`).
 
+        * overwrite: bool --- Controls the behavior of this method when the
+          collection already contains an object at `target_path`. If `False`
+          (the default), this method will raise `FileExistsError`. If `True`,
+          the object at `target_path` will be replaced with `source_obj`.
+        """
         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
         if not source_obj.writable():
             raise IOError(errno.EROFS, "Source collection is read only", source)
         target_dir.add(source_obj, target_name, overwrite, True)
 
-    def portable_manifest_text(self, stream_name="."):
-        """Get the manifest text for this collection, sub collections and files.
+    def portable_manifest_text(self, stream_name: str=".") -> str:
+        """Get the portable manifest text for this collection
 
-        This method does not flush outstanding blocks to Keep.  It will return
-        a normalized manifest with access tokens stripped.
+        The portable manifest text is normalized, and does not include access
+        tokens. This method does not flush outstanding blocks to Keep.
 
-        :stream_name:
-          Name to use for this stream (directory)
+        Arguments:
 
+        * stream_name: str --- The name to use for this collection's stream in
+          the generated manifest. Default `'.'`.
         """
         return self._get_manifest_text(stream_name, True, True)
 
     @synchronized
-    def manifest_text(self, stream_name=".", strip=False, normalize=False,
-                      only_committed=False):
-        """Get the manifest text for this collection, sub collections and files.
+    def manifest_text(
+            self,
+            stream_name: str=".",
+            strip: bool=False,
+            normalize: bool=False,
+            only_committed: bool=False,
+    ) -> str:
+        """Get the manifest text for this collection
 
-        This method will flush outstanding blocks to Keep.  By default, it will
-        not normalize an unmodified manifest or strip access tokens.
-
-        :stream_name:
-          Name to use for this stream (directory)
+        Arguments:
 
-        :strip:
-          If True, remove signing tokens from block locators if present.
-          If False (default), block locators are left unchanged.
+        * stream_name: str --- The name to use for this collection's stream in
+          the generated manifest. Default `'.'`.
 
-        :normalize:
-          If True, always export the manifest text in normalized form
-          even if the Collection is not modified.  If False (default) and the collection
-          is not modified, return the original manifest text even if it is not
-          in normalized form.
+        * strip: bool --- Controls whether or not the returned manifest text
+          includes access tokens. If `False` (the default), the manifest text
+          will include access tokens. If `True`, the manifest text will not
+          include access tokens.
 
-        :only_committed:
-          If True, don't commit pending blocks.
+        * normalize: bool --- Controls whether or not the returned manifest
+          text is normalized. Default `False`.
 
+        * only_committed: bool --- Controls whether or not this method uploads
+          pending data to Keep before building and returning the manifest text.
+          If `False` (the default), this method will finish uploading all data
+          to Keep, then return the final manifest. If `True`, this method will
+          build and return a manifest that only refers to the data that has
+          finished uploading at the time this method was called.
         """
-
         if not only_committed:
             self._my_block_manager().commit_all()
         return self._get_manifest_text(stream_name, strip, normalize,
@@ -1075,11 +836,26 @@ class RichCollectionBase(CollectionBase):
         return remote_blocks
 
     @synchronized
-    def diff(self, end_collection, prefix=".", holding_collection=None):
-        """Generate list of add/modify/delete actions.
+    def diff(
+            self,
+            end_collection: 'RichCollectionBase',
+            prefix: str=".",
+            holding_collection: Optional['Collection']=None,
+    ) -> ChangeList:
+        """Build a list of differences between this collection and another
+
+        Arguments:
 
-        When given to `apply`, will change `self` to match `end_collection`
+        * end_collection: RichCollectionBase --- A collection object with the
+          desired end state. The returned diff list will describe how to go
+          from the current collection object `self` to `end_collection`.
 
+        * prefix: str --- The name to use for this collection's stream in
+          the diff list. Default `'.'`.
+
+        * holding_collection: Collection | None --- A collection object used to
+          hold objects for the returned diff list. By default, a new empty
+          collection is created.
         """
         changes = []
         if holding_collection is None:
@@ -1101,12 +877,20 @@ class RichCollectionBase(CollectionBase):
 
     @must_be_writable
     @synchronized
-    def apply(self, changes):
-        """Apply changes from `diff`.
+    def apply(self, changes: ChangeList) -> None:
+        """Apply a list of changes from to this collection
+
+        This method takes a list of changes generated by
+        `RichCollectionBase.diff` and applies it to this
+        collection. Afterward, the state of this collection object will
+        match the state of `end_collection` passed to `diff`. If a change
+        conflicts with a local change, it will be saved to an alternate path
+        indicating the conflict.
 
-        If a change conflicts with a local change, it will be saved to an
-        alternate path indicating the conflict.
+        Arguments:
 
+        * changes: ChangeList --- The list of differences generated by
+          `RichCollectionBase.diff`.
         """
         if changes:
             self.set_committed(False)
@@ -1148,8 +932,8 @@ class RichCollectionBase(CollectionBase):
                 # else, the file is modified or already removed, in either
                 # case we don't want to try to remove it.
 
-    def portable_data_hash(self):
-        """Get the portable data hash for this collection's manifest."""
+    def portable_data_hash(self) -> str:
+        """Get the portable data hash for this collection's manifest"""
         if self._manifest_locator and self.committed():
             # If the collection is already saved on the API server, and it's committed
             # then return API server's PDH response.
@@ -1159,25 +943,62 @@ class RichCollectionBase(CollectionBase):
             return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
 
     @synchronized
-    def subscribe(self, callback):
+    def subscribe(self, callback: ChangeCallback) -> None:
+        """Set a notify callback for changes to this collection
+
+        Arguments:
+
+        * callback: ChangeCallback --- The callable to call each time the
+          collection is changed.
+        """
         if self._callback is None:
             self._callback = callback
         else:
             raise errors.ArgumentError("A callback is already set on this collection.")
 
     @synchronized
-    def unsubscribe(self):
+    def unsubscribe(self) -> None:
+        """Remove any notify callback set for changes to this collection"""
         if self._callback is not None:
             self._callback = None
 
     @synchronized
-    def notify(self, event, collection, name, item):
+    def notify(
+            self,
+            event: ChangeType,
+            collection: 'RichCollectionBase',
+            name: str,
+            item: CollectionItem,
+    ) -> None:
+        """Notify any subscribed callback about a change to this collection
+
+        .. ATTENTION:: Internal
+           This method is only meant to be used by other Collection methods.
+
+        If a callback has been registered with `RichCollectionBase.subscribe`,
+        it will be called with information about a change to this collection.
+        Then this notification will be propagated to this collection's root.
+
+        Arguments:
+
+        * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
+          the collection.
+
+        * collection: RichCollectionBase --- The collection that was modified.
+
+        * name: str --- The name of the file or stream within `collection` that
+          was modified.
+
+        * item: ArvadosFile | Subcollection --- The new contents at `name`
+          within `collection`.
+        """
         if self._callback:
             self._callback(event, collection, name, item)
         self.root_collection().notify(event, collection, name, item)
 
     @synchronized
-    def __eq__(self, other):
+    def __eq__(self, other: Any) -> bool:
+        """Indicate whether this collection object is equal to another"""
         if other is self:
             return True
         if not isinstance(other, RichCollectionBase):
@@ -1191,101 +1012,97 @@ class RichCollectionBase(CollectionBase):
                 return False
         return True
 
-    def __ne__(self, other):
+    def __ne__(self, other: Any) -> bool:
+        """Indicate whether this collection object is not equal to another"""
         return not self.__eq__(other)
 
     @synchronized
-    def flush(self):
-        """Flush bufferblocks to Keep."""
+    def flush(self) -> None:
+        """Upload any pending data to Keep"""
         for e in listvalues(self):
             e.flush()
 
 
 class Collection(RichCollectionBase):
-    """Represents the root of an Arvados Collection.
-
-    This class is threadsafe.  The root collection object, all subcollections
-    and files are protected by a single lock (i.e. each access locks the entire
-    collection).
-
-    Brief summary of
-    useful methods:
-
-    :To read an existing file:
-      `c.open("myfile", "r")`
-
-    :To write a new file:
-      `c.open("myfile", "w")`
-
-    :To determine if a file exists:
-      `c.find("myfile") is not None`
-
-    :To copy a file:
-      `c.copy("source", "dest")`
-
-    :To delete a file:
-      `c.remove("myfile")`
-
-    :To save to an existing collection record:
-      `c.save()`
+    """Read and manipulate an Arvados collection
 
-    :To save a new collection record:
-    `c.save_new()`
-
-    :To merge remote changes into this object:
-      `c.update()`
-
-    Must be associated with an API server Collection record (during
-    initialization, or using `save_new`) to use `save` or `update`
+    This class provides a high-level interface to create, read, and update
+    Arvados collections and their contents. Refer to the Arvados Python SDK
+    cookbook for [an introduction to using the Collection class][cookbook].
 
+    [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
     """
 
-    def __init__(self, manifest_locator_or_text=None,
-                 api_client=None,
-                 keep_client=None,
-                 num_retries=10,
-                 parent=None,
-                 apiconfig=None,
-                 block_manager=None,
-                 replication_desired=None,
-                 storage_classes_desired=None,
-                 put_threads=None):
-        """Collection constructor.
-
-        :manifest_locator_or_text:
-          An Arvados collection UUID, portable data hash, raw manifest
-          text, or (if creating an empty collection) None.
-
-        :parent:
-          the parent Collection, may be None.
-
-        :apiconfig:
-          A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
-          Prefer this over supplying your own api_client and keep_client (except in testing).
-          Will use default config settings if not specified.
-
-        :api_client:
-          The API client object to use for requests.  If not specified, create one using `apiconfig`.
-
-        :keep_client:
-          the Keep client to use for requests.  If not specified, create one using `apiconfig`.
-
-        :num_retries:
-          the number of retries for API and Keep requests.
+    def __init__(self, manifest_locator_or_text: Optional[str]=None,
+                 api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
+                 keep_client: Optional['arvados.keep.KeepClient']=None,
+                 num_retries: int=10,
+                 parent: Optional['Collection']=None,
+                 apiconfig: Optional[Mapping[str, str]]=None,
+                 block_manager: Optional['arvados.arvfile._BlockManager']=None,
+                 replication_desired: Optional[int]=None,
+                 storage_classes_desired: Optional[List[str]]=None,
+                 put_threads: Optional[int]=None):
+        """Initialize a Collection object
 
-        :block_manager:
-          the block manager to use.  If not specified, create one.
-
-        :replication_desired:
-          How many copies should Arvados maintain. If None, API server default
-          configuration applies. If not None, this value will also be used
-          for determining the number of block copies being written.
-
-        :storage_classes_desired:
-          A list of storage class names where to upload the data. If None,
-          the keep client is expected to store the data into the cluster's
-          default storage class(es).
+        Arguments:
 
+        * manifest_locator_or_text: str | None --- This string can contain a
+          collection manifest text, portable data hash, or UUID. When given a
+          portable data hash or UUID, this instance will load a collection
+          record from the API server. Otherwise, this instance will represent a
+          new collection without an API server record. The default value `None`
+          instantiates a new collection with an empty manifest.
+
+        * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
+          Arvados API client object this instance uses to make requests. If
+          none is given, this instance creates its own client using the
+          settings from `apiconfig` (see below). If your client instantiates
+          many Collection objects, you can help limit memory utilization by
+          calling `arvados.api.api` to construct an
+          `arvados.safeapi.ThreadSafeApiCache`, and use that as the `api_client`
+          for every Collection.
+
+        * keep_client: arvados.keep.KeepClient | None --- The Keep client
+          object this instance uses to make requests. If none is given, this
+          instance creates its own client using its `api_client`.
+
+        * num_retries: int --- The number of times that client requests are
+          retried. Default 10.
+
+        * parent: arvados.collection.Collection | None --- The parent Collection
+          object of this instance, if any. This argument is primarily used by
+          other Collection methods; user client code shouldn't need to use it.
+
+        * apiconfig: Mapping[str, str] | None --- A mapping with entries for
+          `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
+          `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
+          Collection object constructs one from these settings. If no
+          mapping is provided, calls `arvados.config.settings` to get these
+          parameters from user configuration.
+
+        * block_manager: arvados.arvfile._BlockManager | None --- The
+          _BlockManager object used by this instance to coordinate reading
+          and writing Keep data blocks. If none is given, this instance
+          constructs its own. This argument is primarily used by other
+          Collection methods; user client code shouldn't need to use it.
+
+        * replication_desired: int | None --- This controls both the value of
+          the `replication_desired` field on API collection records saved by
+          this class, as well as the number of Keep services that the object
+          writes new data blocks to. If none is given, uses the default value
+          configured for the cluster.
+
+        * storage_classes_desired: list[str] | None --- This controls both
+          the value of the `storage_classes_desired` field on API collection
+          records saved by this class, as well as selecting which specific
+          Keep services the object writes new data blocks to. If none is
+          given, defaults to an empty list.
+
+        * put_threads: int | None --- The number of threads to run
+          simultaneously to upload data blocks to Keep. This value is used when
+          building a new `block_manager`. It is unused when a `block_manager`
+          is provided.
         """
 
         if storage_classes_desired and type(storage_classes_desired) is not list:
@@ -1339,19 +1156,33 @@ class Collection(RichCollectionBase):
             except errors.SyntaxError as e:
                 raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
 
-    def storage_classes_desired(self):
+    def storage_classes_desired(self) -> list[str]:
+        """Get this collection's `storage_classes_desired` value"""
         return self._storage_classes_desired or []
 
-    def root_collection(self):
+    def root_collection(self) -> 'Collection':
         return self
 
-    def get_properties(self):
+    def get_properties(self) -> dict[str, Any]:
+        """Get this collection's properties
+
+        This method always returns a dict. If this collection object does not
+        have an associated API record, or that record does not have any
+        properties set, this method returns an empty dict.
+        """
         if self._api_response and self._api_response["properties"]:
             return self._api_response["properties"]
         else:
             return {}
 
-    def get_trash_at(self):
+    def get_trash_at(self) -> Optional[datetime.datetime]:
+        """Get this collection's `trash_at` field
+
+        This method parses the `trash_at` field of the collection's API
+        record and returns a datetime from it. If that field is not set, or
+        this collection object does not have an associated API record,
+        returns None.
+        """
         if self._api_response and self._api_response["trash_at"]:
             try:
                 return ciso8601.parse_datetime(self._api_response["trash_at"])
@@ -1360,20 +1191,56 @@ class Collection(RichCollectionBase):
         else:
             return None
 
-    def stream_name(self):
+    def stream_name(self) -> str:
         return "."
 
-    def writable(self):
+    def writable(self) -> bool:
         return True
 
     @synchronized
-    def known_past_version(self, modified_at_and_portable_data_hash):
+    def known_past_version(
+            self,
+            modified_at_and_portable_data_hash: tuple[Optional[str], Optional[str]]
+    ) -> bool:
+        """Indicate whether an API record for this collection has been seen before
+
+        As this collection object loads records from the API server, it records
+        their `modified_at` and `portable_data_hash` fields. This method accepts
+        a 2-tuple with values for those fields, and returns `True` if the
+        combination was previously loaded.
+        """
         return modified_at_and_portable_data_hash in self._past_versions
 
     @synchronized
     @retry_method
-    def update(self, other=None, num_retries=None):
-        """Merge the latest collection on the API server with the current collection."""
+    def update(
+            self,
+            other: Optional['Collection']=None,
+            num_retries: Optional[int]=None,
+    ) -> None:
+        """Merge another collection's contents into this one
+
+        This method compares the manifest of this collection instance with
+        another, then updates this instance's manifest with changes from the
+        other, renaming files to flag conflicts where necessary.
+
+        When called without any arguments, this method reloads the collection's
+        API record, and updates this instance with any changes that have
+        appeared server-side. If this instance does not have a corresponding
+        API record, this method raises `arvados.errors.ArgumentError`.
+
+        Arguments:
+
+        * other: Collection | None --- The collection whose contents should be
+          merged into this instance. When not provided, this method reloads this
+          collection's API record and constructs a Collection object from it.
+          If this instance does not have a corresponding API record, this method
+          raises `arvados.errors.ArgumentError`.
+
+        * num_retries: int | None --- The number of times to retry reloading
+          the collection's API record from the API server. If not specified,
+          uses the `num_retries` provided when this instance was constructed.
+        """
 
         if other is None:
             if self._manifest_locator is None:
@@ -1467,32 +1334,64 @@ class Collection(RichCollectionBase):
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):
-        """Support scoped auto-commit in a with: block."""
+        """Exit a context with this collection instance
+
+        If no exception was raised inside the context block, and this
+        collection is writable and has a corresponding API record, that
+        record will be updated to match the state of this instance at the end
+        of the block.
+        """
         if exc_type is None:
             if self.writable() and self._has_collection_uuid():
                 self.save()
         self.stop_threads()
 
-    def stop_threads(self):
+    def stop_threads(self) -> None:
+        """Stop background Keep upload/download threads"""
         if self._block_manager is not None:
             self._block_manager.stop_threads()
 
     @synchronized
-    def manifest_locator(self):
-        """Get the manifest locator, if any.
-
-        The manifest locator will be set when the collection is loaded from an
-        API server record or the portable data hash of a manifest.
-
-        The manifest locator will be None if the collection is newly created or
-        was created directly from manifest text.  The method `save_new()` will
-        assign a manifest locator.
-
+    def manifest_locator(self) -> Optional[str]:
+        """Get this collection's manifest locator, if any
+
+        * If this collection instance is associated with an API record with a
+          UUID, return that.
+        * Otherwise, if this collection instance was loaded from an API record
+          by portable data hash, return that.
+        * Otherwise, return `None`.
         """
         return self._manifest_locator
 
     @synchronized
-    def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
+    def clone(
+            self,
+            new_parent: Optional['Collection']=None,
+            new_name: Optional[str]=None,
+            readonly: bool=False,
+            new_config: Optional[Mapping[str, str]]=None,
+    ) -> 'Collection':
+        """Create a Collection object with the same contents as this instance
+
+        This method creates a new Collection object with contents that match
+        this instance's. The new collection will not be associated with any API
+        record.
+
+        Arguments:
+
+        * new_parent: Collection | None --- This value is passed to the new
+          Collection's constructor as the `parent` argument.
+
+        * new_name: str | None --- This value is unused.
+
+        * readonly: bool --- If this value is true, this method constructs and
+          returns a `CollectionReader`. Otherwise, it returns a mutable
+          `Collection`. Default `False`.
+
+        * new_config: Mapping[str, str] | None --- This value is passed to the
+          new Collection's constructor as `apiconfig`. If no value is provided,
+          defaults to the configuration passed to this instance's constructor.
+        """
         if new_config is None:
             new_config = self._config
         if readonly:
@@ -1504,31 +1403,31 @@ class Collection(RichCollectionBase):
         return newcollection
 
     @synchronized
-    def api_response(self):
-        """Returns information about this Collection fetched from the API server.
-
-        If the Collection exists in Keep but not the API server, currently
-        returns None.  Future versions may provide a synthetic response.
+    def api_response(self) -> Optional[Dict[str, Any]]:
+        """Get this instance's associated API record
 
+        If this Collection instance has an associated API record, return it.
+        Otherwise, return `None`.
         """
         return self._api_response
 
-    def find_or_create(self, path, create_type):
-        """See `RichCollectionBase.find_or_create`"""
+    def find_or_create(
+            self,
+            path: str,
+            create_type: CreateType,
+    ) -> CollectionItem:
         if path == ".":
             return self
         else:
             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
 
-    def find(self, path):
-        """See `RichCollectionBase.find`"""
+    def find(self, path: str) -> CollectionItem:
         if path == ".":
             return self
         else:
             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
 
-    def remove(self, path, recursive=False):
-        """See `RichCollectionBase.remove`"""
+    def remove(self, path: str, recursive: bool=False) -> None:
         if path == ".":
             raise errors.ArgumentError("Cannot remove '.'")
         else:
@@ -1537,49 +1436,52 @@ class Collection(RichCollectionBase):
     @must_be_writable
     @synchronized
     @retry_method
-    def save(self,
-             properties=None,
-             storage_classes=None,
-             trash_at=None,
-             merge=True,
-             num_retries=None,
-             preserve_version=False):
-        """Save collection to an existing collection record.
-
-        Commit pending buffer blocks to Keep, merge with remote record (if
-        merge=True, the default), and update the collection record. Returns
-        the current manifest text.
-
-        Will raise AssertionError if not associated with a collection record on
-        the API server.  If you want to save a manifest to Keep only, see
-        `save_new()`.
-
-        :properties:
-          Additional properties of collection. This value will replace any existing
-          properties of collection.
-
-        :storage_classes:
-          Specify desirable storage classes to be used when writing data to Keep.
-
-        :trash_at:
-          A collection is *expiring* when it has a *trash_at* time in the future.
-          An expiring collection can be accessed as normal,
-          but is scheduled to be trashed automatically at the *trash_at* time.
-
-        :merge:
-          Update and merge remote changes before saving.  Otherwise, any
-          remote changes will be ignored and overwritten.
-
-        :num_retries:
-          Retry count on API calls (if None,  use the collection default)
-
-        :preserve_version:
-          If True, indicate that the collection content being saved right now
-          should be preserved in a version snapshot if the collection record is
-          updated in the future. Requires that the API server has
-          Collections.CollectionVersioning enabled, if not, setting this will
-          raise an exception.
+    def save(
+            self,
+            properties: Optional[Properties]=None,
+            storage_classes: Optional[StorageClasses]=None,
+            trash_at: Optional[datetime.datetime]=None,
+            merge: bool=True,
+            num_retries: Optional[int]=None,
+            preserve_version: bool=False,
+    ) -> str:
+        """Save collection to an existing API record
+
+        This method updates the instance's corresponding API record to match
+        the instance's state. If this instance does not have a corresponding API
+        record yet, raises `AssertionError`. (To create a new API record, use
+        `Collection.save_new`.) This method returns the saved collection
+        manifest.
 
+        Arguments:
+
+        * properties: dict[str, Any] | None --- If provided, the API record will
+          be updated with these properties. Note this will completely replace
+          any existing properties.
+
+        * storage_classes: list[str] | None --- If provided, the API record will
+          be updated with this value in the `storage_classes_desired` field.
+          This value will also be saved on the instance and used for any
+          changes that follow.
+
+        * trash_at: datetime.datetime | None --- If provided, the API record
+          will be updated with this value in the `trash_at` field.
+
+        * merge: bool --- If `True` (the default), this method will first
+          reload this collection's API record, and merge any new contents into
+          this instance before saving changes. See `Collection.update` for
+          details.
+
+        * num_retries: int | None --- The number of times to retry reloading
+          the collection's API record from the API server. If not specified,
+          uses the `num_retries` provided when this instance was constructed.
+
+        * preserve_version: bool --- This value will be passed to directly
+          to the underlying API call. If `True`, the Arvados API will
+          preserve the versions of this collection both immediately before
+          and after the update. If `True` when the API server is not
+          configured with collection versioning, this method raises
+          `arvados.errors.ArgumentError`.
         """
         if properties and type(properties) is not dict:
             raise errors.ArgumentError("properties must be dictionary type.")
@@ -1643,60 +1545,66 @@ class Collection(RichCollectionBase):
     @must_be_writable
     @synchronized
     @retry_method
-    def save_new(self, name=None,
-                 create_collection_record=True,
-                 owner_uuid=None,
-                 properties=None,
-                 storage_classes=None,
-                 trash_at=None,
-                 ensure_unique_name=False,
-                 num_retries=None,
-                 preserve_version=False):
-        """Save collection to a new collection record.
-
-        Commit pending buffer blocks to Keep and, when create_collection_record
-        is True (default), create a new collection record.  After creating a
-        new collection record, this Collection object will be associated with
-        the new record used by `save()`. Returns the current manifest text.
-
-        :name:
-          The collection name.
-
-        :create_collection_record:
-           If True, create a collection record on the API server.
-           If False, only commit blocks to Keep and return the manifest text.
-
-        :owner_uuid:
-          the user, or project uuid that will own this collection.
-          If None, defaults to the current user.
-
-        :properties:
-          Additional properties of collection. This value will replace any existing
-          properties of collection.
-
-        :storage_classes:
-          Specify desirable storage classes to be used when writing data to Keep.
-
-        :trash_at:
-          A collection is *expiring* when it has a *trash_at* time in the future.
-          An expiring collection can be accessed as normal,
-          but is scheduled to be trashed automatically at the *trash_at* time.
-
-        :ensure_unique_name:
-          If True, ask the API server to rename the collection
-          if it conflicts with a collection with the same name and owner.  If
-          False, a name conflict will result in an error.
-
-        :num_retries:
-          Retry count on API calls (if None,  use the collection default)
-
-        :preserve_version:
-          If True, indicate that the collection content being saved right now
-          should be preserved in a version snapshot if the collection record is
-          updated in the future. Requires that the API server has
-          Collections.CollectionVersioning enabled, if not, setting this will
-          raise an exception.
+    def save_new(
+            self,
+            name: Optional[str]=None,
+            create_collection_record: bool=True,
+            owner_uuid: Optional[str]=None,
+            properties: Optional[Properties]=None,
+            storage_classes: Optional[StorageClasses]=None,
+            trash_at: Optional[datetime.datetime]=None,
+            ensure_unique_name: bool=False,
+            num_retries: Optional[int]=None,
+            preserve_version: bool=False,
+    ):
+        """Save collection to a new API record
+
+        This method finishes uploading new data blocks and (optionally)
+        creates a new API collection record with the provided data. If a new
+        record is created, this instance becomes associated with that record
+        for future updates like `save()`. This method returns the saved
+        collection manifest.
+
+        Arguments:
+
+        * name: str | None --- The `name` field to use on the new collection
+          record. If not specified, a generic default name is generated.
+
+        * create_collection_record: bool --- If `True` (the default), creates a
+          collection record on the API server. If `False`, the method finishes
+          all data uploads and only returns the resulting collection manifest
+          without sending it to the API server.
+
+        * owner_uuid: str | None --- The `owner_uuid` field to use on the
+          new collection record.
+
+        * properties: dict[str, Any] | None --- The `properties` field to use on
+          the new collection record.
+
+        * storage_classes: list[str] | None --- The
+          `storage_classes_desired` field to use on the new collection record.
+
+        * trash_at: datetime.datetime | None --- The `trash_at` field to use
+          on the new collection record.
+
+        * ensure_unique_name: bool --- This value is passed directly to the
+          Arvados API when creating the collection record. If `True`, the API
+          server may modify the submitted `name` to ensure the collection's
+          `name`+`owner_uuid` combination is unique. If `False` (the default),
+          if a collection already exists with this same `name`+`owner_uuid`
+          combination, creating a collection record will raise a validation
+          error.
 
+        * num_retries: int | None --- The number of times to retry reloading
+          the collection's API record from the API server. If not specified,
+          uses the `num_retries` provided when this instance was constructed.
+
+        * preserve_version: bool --- This value will be passed to directly
+          to the underlying API call. If `True`, the Arvados API will
+          preserve the versions of this collection both immediately before
+          and after the update. If `True` when the API server is not
+          configured with collection versioning, this method raises
+          `arvados.errors.ArgumentError`.
         """
         if properties and type(properties) is not dict:
             raise errors.ArgumentError("properties must be dictionary type.")
@@ -1834,17 +1742,24 @@ class Collection(RichCollectionBase):
         self.set_committed(True)
 
     @synchronized
-    def notify(self, event, collection, name, item):
+    def notify(
+            self,
+            event: ChangeType,
+            collection: 'RichCollectionBase',
+            name: str,
+            item: CollectionItem,
+    ) -> None:
         if self._callback:
             self._callback(event, collection, name, item)
 
 
 class Subcollection(RichCollectionBase):
-    """This is a subdirectory within a collection that doesn't have its own API
-    server record.
-
-    Subcollection locking falls under the umbrella lock of its root collection.
+    """Read and manipulate a stream/directory within an Arvados collection
 
+    This class represents a single stream (like a directory) within an Arvados
+    `Collection`. It is returned by `Collection.find` and provides the same API.
+    Operations that work on the API collection record propagate to the parent
+    `Collection` object.
     """
 
     def __init__(self, parent, name):
@@ -1854,10 +1769,10 @@ class Subcollection(RichCollectionBase):
         self.name = name
         self.num_retries = parent.num_retries
 
-    def root_collection(self):
+    def root_collection(self) -> 'Collection':
         return self.parent.root_collection()
 
-    def writable(self):
+    def writable(self) -> bool:
         return self.root_collection().writable()
 
     def _my_api(self):
@@ -1869,11 +1784,15 @@ class Subcollection(RichCollectionBase):
     def _my_block_manager(self):
         return self.root_collection()._my_block_manager()
 
-    def stream_name(self):
+    def stream_name(self) -> str:
         return os.path.join(self.parent.stream_name(), self.name)
 
     @synchronized
-    def clone(self, new_parent, new_name):
+    def clone(
+            self,
+            new_parent: Optional['Collection']=None,
+            new_name: Optional[str]=None,
+    ) -> 'Subcollection':
         c = Subcollection(new_parent, new_name)
         c._clonefrom(self)
         return c
@@ -1900,11 +1819,11 @@ class Subcollection(RichCollectionBase):
 
 
 class CollectionReader(Collection):
-    """A read-only collection object.
-
-    Initialize from a collection UUID or portable data hash, or raw
-    manifest text.  See `Collection` constructor for detailed options.
+    """Read-only `Collection` subclass
 
+    This class will never create or update any API collection records. You can
+    use this class for additional code safety when you only need to read
+    existing collections.
     """
     def __init__(self, manifest_locator_or_text, *args, **kwargs):
         self._in_init = True
@@ -1918,7 +1837,7 @@ class CollectionReader(Collection):
         # all_streams() and all_files()
         self._streams = None
 
-    def writable(self):
+    def writable(self) -> bool:
         return self._in_init
 
     def _populate_streams(orig_func):
@@ -1935,16 +1854,10 @@ class CollectionReader(Collection):
             return orig_func(self, *args, **kwargs)
         return populate_streams_wrapper
 
+    @arvados.util._deprecated('3.0', 'Collection iteration')
     @_populate_streams
     def normalize(self):
-        """Normalize the streams returned by `all_streams`.
-
-        This method is kept for backwards compatability and only affects the
-        behavior of `all_streams()` and `all_files()`
-
-        """
-
-        # Rearrange streams
+        """Normalize the streams returned by `all_streams`"""
         streams = {}
         for s in self.all_streams():
             for f in s.all_files():
@@ -1971,3 +1884,423 @@ class CollectionReader(Collection):
         for s in self.all_streams():
             for f in s.all_files():
                 yield f
+
+
+class CollectionWriter(CollectionBase):
+    """Create a new collection from scratch
+
+    .. WARNING:: Deprecated
+       This class is deprecated. Prefer `arvados.collection.Collection`
+       instead.
+    """
+
+    @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
+    def __init__(self, api_client=None, num_retries=0, replication=None):
+        """Instantiate a CollectionWriter.
+
+        CollectionWriter lets you build a new Arvados Collection from scratch.
+        Write files to it.  The CollectionWriter will upload data to Keep as
+        appropriate, and provide you with the Collection manifest text when
+        you're finished.
+
+        Arguments:
+        * api_client: The API client to use to look up Collections.  If not
+          provided, CollectionReader will build one from available Arvados
+          configuration.
+        * num_retries: The default number of times to retry failed
+          service requests.  Default 0.  You may change this value
+          after instantiation, but note those changes may not
+          propagate to related objects like the Keep client.
+        * replication: The number of copies of each block to store.
+          If this argument is None or not supplied, replication is
+          the server-provided default if available, otherwise 2.
+        """
+        self._api_client = api_client
+        self.num_retries = num_retries
+        self.replication = (2 if replication is None else replication)
+        self._keep_client = None
+        self._data_buffer = []
+        self._data_buffer_len = 0
+        self._current_stream_files = []
+        self._current_stream_length = 0
+        self._current_stream_locators = []
+        self._current_stream_name = '.'
+        self._current_file_name = None
+        self._current_file_pos = 0
+        self._finished_streams = []
+        self._close_file = None
+        self._queued_file = None
+        self._queued_dirents = deque()
+        self._queued_trees = deque()
+        self._last_open = None
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        if exc_type is None:
+            self.finish()
+
+    def do_queued_work(self):
+        # The work queue consists of three pieces:
+        # * _queued_file: The file object we're currently writing to the
+        #   Collection.
+        # * _queued_dirents: Entries under the current directory
+        #   (_queued_trees[0]) that we want to write or recurse through.
+        #   This may contain files from subdirectories if
+        #   max_manifest_depth == 0 for this directory.
+        # * _queued_trees: Directories that should be written as separate
+        #   streams to the Collection.
+        # This function handles the smallest piece of work currently queued
+        # (current file, then current directory, then next directory) until
+        # no work remains.  The _work_THING methods each do a unit of work on
+        # THING.  _queue_THING methods add a THING to the work queue.
+        while True:
+            if self._queued_file:
+                self._work_file()
+            elif self._queued_dirents:
+                self._work_dirents()
+            elif self._queued_trees:
+                self._work_trees()
+            else:
+                break
+
+    def _work_file(self):
+        while True:
+            buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
+            if not buf:
+                break
+            self.write(buf)
+        self.finish_current_file()
+        if self._close_file:
+            self._queued_file.close()
+        self._close_file = None
+        self._queued_file = None
+
+    def _work_dirents(self):
+        path, stream_name, max_manifest_depth = self._queued_trees[0]
+        if stream_name != self.current_stream_name():
+            self.start_new_stream(stream_name)
+        while self._queued_dirents:
+            dirent = self._queued_dirents.popleft()
+            target = os.path.join(path, dirent)
+            if os.path.isdir(target):
+                self._queue_tree(target,
+                                 os.path.join(stream_name, dirent),
+                                 max_manifest_depth - 1)
+            else:
+                self._queue_file(target, dirent)
+                break
+        if not self._queued_dirents:
+            self._queued_trees.popleft()
+
+    def _work_trees(self):
+        path, stream_name, max_manifest_depth = self._queued_trees[0]
+        d = arvados.util.listdir_recursive(
+            path, max_depth = (None if max_manifest_depth == 0 else 0))
+        if d:
+            self._queue_dirents(stream_name, d)
+        else:
+            self._queued_trees.popleft()
+
+    def _queue_file(self, source, filename=None):
+        assert (self._queued_file is None), "tried to queue more than one file"
+        if not hasattr(source, 'read'):
+            source = open(source, 'rb')
+            self._close_file = True
+        else:
+            self._close_file = False
+        if filename is None:
+            filename = os.path.basename(source.name)
+        self.start_new_file(filename)
+        self._queued_file = source
+
+    def _queue_dirents(self, stream_name, dirents):
+        assert (not self._queued_dirents), "tried to queue more than one tree"
+        self._queued_dirents = deque(sorted(dirents))
+
+    def _queue_tree(self, path, stream_name, max_manifest_depth):
+        self._queued_trees.append((path, stream_name, max_manifest_depth))
+
+    def write_file(self, source, filename=None):
+        self._queue_file(source, filename)
+        self.do_queued_work()
+
+    def write_directory_tree(self,
+                             path, stream_name='.', max_manifest_depth=-1):
+        self._queue_tree(path, stream_name, max_manifest_depth)
+        self.do_queued_work()
+
+    def write(self, newdata):
+        if isinstance(newdata, bytes):
+            pass
+        elif isinstance(newdata, str):
+            newdata = newdata.encode()
+        elif hasattr(newdata, '__iter__'):
+            for s in newdata:
+                self.write(s)
+            return
+        self._data_buffer.append(newdata)
+        self._data_buffer_len += len(newdata)
+        self._current_stream_length += len(newdata)
+        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
+            self.flush_data()
+
+    def open(self, streampath, filename=None):
+        """open(streampath[, filename]) -> file-like object
+
+        Pass in the path of a file to write to the Collection, either as a
+        single string or as two separate stream name and file name arguments.
+        This method returns a file-like object you can write to add it to the
+        Collection.
+
+        You may only have one file object from the Collection open at a time,
+        so be sure to close the object when you're done.  Using the object in
+        a with statement makes that easy::
+
+          with cwriter.open('./doc/page1.txt') as outfile:
+              outfile.write(page1_data)
+          with cwriter.open('./doc/page2.txt') as outfile:
+              outfile.write(page2_data)
+        """
+        if filename is None:
+            streampath, filename = split(streampath)
+        if self._last_open and not self._last_open.closed:
+            raise errors.AssertionError(
+                u"can't open '{}' when '{}' is still open".format(
+                    filename, self._last_open.name))
+        if streampath != self.current_stream_name():
+            self.start_new_stream(streampath)
+        self.set_current_file_name(filename)
+        self._last_open = _WriterFile(self, filename)
+        return self._last_open
+
+    def flush_data(self):
+        data_buffer = b''.join(self._data_buffer)
+        if data_buffer:
+            self._current_stream_locators.append(
+                self._my_keep().put(
+                    data_buffer[0:config.KEEP_BLOCK_SIZE],
+                    copies=self.replication))
+            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
+            self._data_buffer_len = len(self._data_buffer[0])
+
+    def start_new_file(self, newfilename=None):
+        self.finish_current_file()
+        self.set_current_file_name(newfilename)
+
+    def set_current_file_name(self, newfilename):
+        if re.search(r'[\t\n]', newfilename):
+            raise errors.AssertionError(
+                "Manifest filenames cannot contain whitespace: %s" %
+                newfilename)
+        elif re.search(r'\x00', newfilename):
+            raise errors.AssertionError(
+                "Manifest filenames cannot contain NUL characters: %s" %
+                newfilename)
+        self._current_file_name = newfilename
+
+    def current_file_name(self):
+        return self._current_file_name
+
+    def finish_current_file(self):
+        if self._current_file_name is None:
+            if self._current_file_pos == self._current_stream_length:
+                return
+            raise errors.AssertionError(
+                "Cannot finish an unnamed file " +
+                "(%d bytes at offset %d in '%s' stream)" %
+                (self._current_stream_length - self._current_file_pos,
+                 self._current_file_pos,
+                 self._current_stream_name))
+        self._current_stream_files.append([
+                self._current_file_pos,
+                self._current_stream_length - self._current_file_pos,
+                self._current_file_name])
+        self._current_file_pos = self._current_stream_length
+        self._current_file_name = None
+
+    def start_new_stream(self, newstreamname='.'):
+        self.finish_current_stream()
+        self.set_current_stream_name(newstreamname)
+
+    def set_current_stream_name(self, newstreamname):
+        if re.search(r'[\t\n]', newstreamname):
+            raise errors.AssertionError(
+                "Manifest stream names cannot contain whitespace: '%s'" %
+                (newstreamname))
+        self._current_stream_name = '.' if newstreamname=='' else newstreamname
+
+    def current_stream_name(self):
+        return self._current_stream_name
+
+    def finish_current_stream(self):
+        self.finish_current_file()
+        self.flush_data()
+        if not self._current_stream_files:
+            pass
+        elif self._current_stream_name is None:
+            raise errors.AssertionError(
+                "Cannot finish an unnamed stream (%d bytes in %d files)" %
+                (self._current_stream_length, len(self._current_stream_files)))
+        else:
+            if not self._current_stream_locators:
+                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
+            self._finished_streams.append([self._current_stream_name,
+                                           self._current_stream_locators,
+                                           self._current_stream_files])
+        self._current_stream_files = []
+        self._current_stream_length = 0
+        self._current_stream_locators = []
+        self._current_stream_name = None
+        self._current_file_pos = 0
+        self._current_file_name = None
+
+    def finish(self):
+        """Store the manifest in Keep and return its locator.
+
+        This is useful for storing manifest fragments (task outputs)
+        temporarily in Keep during a Crunch job.
+
+        In other cases you should make a collection instead, by
+        sending manifest_text() to the API server's "create
+        collection" endpoint.
+        """
+        return self._my_keep().put(self.manifest_text().encode(),
+                                   copies=self.replication)
+
+    def portable_data_hash(self):
+        stripped = self.stripped_manifest().encode()
+        return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
+
+    def manifest_text(self):
+        self.finish_current_stream()
+        manifest = ''
+
+        for stream in self._finished_streams:
+            if not re.search(r'^\.(/.*)?$', stream[0]):
+                manifest += './'
+            manifest += stream[0].replace(' ', '\\040')
+            manifest += ' ' + ' '.join(stream[1])
+            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
+            manifest += "\n"
+
+        return manifest
+
+    def data_locators(self):
+        ret = []
+        for name, locators, files in self._finished_streams:
+            ret += locators
+        return ret
+
+    def save_new(self, name=None):
+        return self._api_client.collections().create(
+            ensure_unique_name=True,
+            body={
+                'name': name,
+                'manifest_text': self.manifest_text(),
+            }).execute(num_retries=self.num_retries)
+
+
+class ResumableCollectionWriter(CollectionWriter):
+    """CollectionWriter that can serialize internal state to disk
+
+    .. WARNING:: Deprecated
+       This class is deprecated. Prefer `arvados.collection.Collection`
+       instead.
+    """
+
+    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
+                   '_current_stream_locators', '_current_stream_name',
+                   '_current_file_name', '_current_file_pos', '_close_file',
+                   '_data_buffer', '_dependencies', '_finished_streams',
+                   '_queued_dirents', '_queued_trees']
+
+    @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
+    def __init__(self, api_client=None, **kwargs):
+        self._dependencies = {}
+        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
+
+    @classmethod
+    def from_state(cls, state, *init_args, **init_kwargs):
+        # Try to build a new writer from scratch with the given state.
+        # If the state is not suitable to resume (because files have changed,
+        # been deleted, aren't predictable, etc.), raise a
+        # StaleWriterStateError.  Otherwise, return the initialized writer.
+        # The caller is responsible for calling writer.do_queued_work()
+        # appropriately after it's returned.
+        writer = cls(*init_args, **init_kwargs)
+        for attr_name in cls.STATE_PROPS:
+            attr_value = state[attr_name]
+            attr_class = getattr(writer, attr_name).__class__
+            # Coerce the value into the same type as the initial value, if
+            # needed.
+            if attr_class not in (type(None), attr_value.__class__):
+                attr_value = attr_class(attr_value)
+            setattr(writer, attr_name, attr_value)
+        # Check dependencies before we try to resume anything.
+        if any(KeepLocator(ls).permission_expired()
+               for ls in writer._current_stream_locators):
+            raise errors.StaleWriterStateError(
+                "locators include expired permission hint")
+        writer.check_dependencies()
+        if state['_current_file'] is not None:
+            path, pos = state['_current_file']
+            try:
+                writer._queued_file = open(path, 'rb')
+                writer._queued_file.seek(pos)
+            except IOError as error:
+                raise errors.StaleWriterStateError(
+                    u"failed to reopen active file {}: {}".format(path, error))
+        return writer
+
+    def check_dependencies(self):
+        for path, orig_stat in listitems(self._dependencies):
+            if not S_ISREG(orig_stat[ST_MODE]):
+                raise errors.StaleWriterStateError(u"{} not file".format(path))
+            try:
+                now_stat = tuple(os.stat(path))
+            except OSError as error:
+                raise errors.StaleWriterStateError(
+                    u"failed to stat {}: {}".format(path, error))
+            if ((not S_ISREG(now_stat[ST_MODE])) or
+                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
+                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
+                raise errors.StaleWriterStateError(u"{} changed".format(path))
+
+    def dump_state(self, copy_func=lambda x: x):
+        state = {attr: copy_func(getattr(self, attr))
+                 for attr in self.STATE_PROPS}
+        if self._queued_file is None:
+            state['_current_file'] = None
+        else:
+            state['_current_file'] = (os.path.realpath(self._queued_file.name),
+                                      self._queued_file.tell())
+        return state
+
+    def _queue_file(self, source, filename=None):
+        try:
+            src_path = os.path.realpath(source)
+        except Exception:
+            raise errors.AssertionError(u"{} not a file path".format(source))
+        try:
+            path_stat = os.stat(src_path)
+        except OSError as stat_error:
+            path_stat = None
+        super(ResumableCollectionWriter, self)._queue_file(source, filename)
+        fd_stat = os.fstat(self._queued_file.fileno())
+        if not S_ISREG(fd_stat.st_mode):
+            # We won't be able to resume from this cache anyway, so don't
+            # worry about further checks.
+            self._dependencies[source] = tuple(fd_stat)
+        elif path_stat is None:
+            raise errors.AssertionError(
+                u"could not stat {}: {}".format(source, stat_error))
+        elif path_stat.st_ino != fd_stat.st_ino:
+            raise errors.AssertionError(
+                u"{} changed between open and stat calls".format(source))
+        else:
+            self._dependencies[src_path] = tuple(fd_stat)
+
+    def write(self, data):
+        if self._queued_file is None:
+            raise errors.AssertionError(
+                "resumable writer can't accept unsourced data")
+        return super(ResumableCollectionWriter, self).write(data)