5476: Better implementation of connection timeout scaling. Updated docstring
[arvados.git] / sdk / python / arvados / collection.py
index 41df5558e205f107c0d14ee9e535dfb81066eabb..3d48652dd53afe4eecc3bc35628646e427a8ac73 100644 (file)
@@ -3,15 +3,18 @@ import logging
 import os
 import re
 import errno
 import os
 import re
 import errno
+import hashlib
 import time
 import time
+import threading
 
 from collections import deque
 from stat import *
 
 
 from collections import deque
 from stat import *
 
-from .arvfile import split, FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, synchronized, must_be_writable, SYNC_READONLY, SYNC_EXPLICIT, SYNC_LIVE, NoopLock
-from keep import *
-from .stream import StreamReader, normalize_stream, locator_block_size
-from .ranges import Range, LocatorAndRange
+from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
+from keep import KeepLocator, KeepClient
+from .stream import StreamReader
+from ._normalize_stream import normalize_stream
+from ._ranges import Range, LocatorAndRange
 from .safeapi import ThreadSafeApiCache
 import config
 import errors
 from .safeapi import ThreadSafeApiCache
 import config
 import errors
@@ -35,7 +38,8 @@ class CollectionBase(object):
         return self._keep_client
 
     def stripped_manifest(self):
         return self._keep_client
 
     def stripped_manifest(self):
-        """
+        """Get the manifest with locator hints stripped.
+
         Return the manifest for the current collection with all
         non-portable hints (i.e., permission signatures and other
         hints other than size hints) removed from the locators.
         Return the manifest for the current collection with all
         non-portable hints (i.e., permission signatures and other
         hints other than size hints) removed from the locators.
@@ -54,197 +58,7 @@ class CollectionBase(object):
         return ''.join(clean)
 
 
         return ''.join(clean)
 
 
-class CollectionReader(CollectionBase):
-    def __init__(self, manifest_locator_or_text, api_client=None,
-                 keep_client=None, num_retries=0):
-        """Instantiate a CollectionReader.
-
-        This class parses Collection manifests to provide a simple interface
-        to read its underlying files.
-
-        Arguments:
-        * manifest_locator_or_text: One of a Collection UUID, portable data
-          hash, or full manifest text.
-        * api_client: The API client to use to look up Collections.  If not
-          provided, CollectionReader will build one from available Arvados
-          configuration.
-        * keep_client: The KeepClient to use to download Collection data.
-          If not provided, CollectionReader will build one from available
-          Arvados configuration.
-        * num_retries: The default number of times to retry failed
-          service requests.  Default 0.  You may change this value
-          after instantiation, but note those changes may not
-          propagate to related objects like the Keep client.
-        """
-        self._api_client = api_client
-        self._keep_client = keep_client
-        self.num_retries = num_retries
-        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
-            self._manifest_locator = manifest_locator_or_text
-            self._manifest_text = None
-        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
-            self._manifest_locator = manifest_locator_or_text
-            self._manifest_text = None
-        elif re.match(util.manifest_pattern, manifest_locator_or_text):
-            self._manifest_text = manifest_locator_or_text
-            self._manifest_locator = None
-        else:
-            raise errors.ArgumentError(
-                "Argument to CollectionReader must be a manifest or a collection UUID")
-        self._api_response = None
-        self._streams = None
-
-    def _populate_from_api_server(self):
-        # As in KeepClient itself, we must wait until the last
-        # possible moment to instantiate an API client, in order to
-        # avoid tripping up clients that don't have access to an API
-        # server.  If we do build one, make sure our Keep client uses
-        # it.  If instantiation fails, we'll fall back to the except
-        # clause, just like any other Collection lookup
-        # failure. Return an exception, or None if successful.
-        try:
-            if self._api_client is None:
-                self._api_client = arvados.api('v1')
-                self._keep_client = None  # Make a new one with the new api.
-            self._api_response = self._api_client.collections().get(
-                uuid=self._manifest_locator).execute(
-                num_retries=self.num_retries)
-            self._manifest_text = self._api_response['manifest_text']
-            return None
-        except Exception as e:
-            return e
-
-    def _populate_from_keep(self):
-        # Retrieve a manifest directly from Keep. This has a chance of
-        # working if [a] the locator includes a permission signature
-        # or [b] the Keep services are operating in world-readable
-        # mode. Return an exception, or None if successful.
-        try:
-            self._manifest_text = self._my_keep().get(
-                self._manifest_locator, num_retries=self.num_retries)
-        except Exception as e:
-            return e
-
-    def _populate(self):
-        error_via_api = None
-        error_via_keep = None
-        should_try_keep = ((self._manifest_text is None) and
-                           util.keep_locator_pattern.match(
-                self._manifest_locator))
-        if ((self._manifest_text is None) and
-            util.signed_locator_pattern.match(self._manifest_locator)):
-            error_via_keep = self._populate_from_keep()
-        if self._manifest_text is None:
-            error_via_api = self._populate_from_api_server()
-            if error_via_api is not None and not should_try_keep:
-                raise error_via_api
-        if ((self._manifest_text is None) and
-            not error_via_keep and
-            should_try_keep):
-            # Looks like a keep locator, and we didn't already try keep above
-            error_via_keep = self._populate_from_keep()
-        if self._manifest_text is None:
-            # Nothing worked!
-            raise arvados.errors.NotFoundError(
-                ("Failed to retrieve collection '{}' " +
-                 "from either API server ({}) or Keep ({})."
-                 ).format(
-                    self._manifest_locator,
-                    error_via_api,
-                    error_via_keep))
-        self._streams = [sline.split()
-                         for sline in self._manifest_text.split("\n")
-                         if sline]
-
-    def _populate_first(orig_func):
-        # Decorator for methods that read actual Collection data.
-        @functools.wraps(orig_func)
-        def populate_first_wrapper(self, *args, **kwargs):
-            if self._streams is None:
-                self._populate()
-            return orig_func(self, *args, **kwargs)
-        return populate_first_wrapper
-
-    @_populate_first
-    def api_response(self):
-        """api_response() -> dict or None
-
-        Returns information about this Collection fetched from the API server.
-        If the Collection exists in Keep but not the API server, currently
-        returns None.  Future versions may provide a synthetic response.
-        """
-        return self._api_response
-
-    @_populate_first
-    def normalize(self):
-        # Rearrange streams
-        streams = {}
-        for s in self.all_streams():
-            for f in s.all_files():
-                streamname, filename = split(s.name() + "/" + f.name())
-                if streamname not in streams:
-                    streams[streamname] = {}
-                if filename not in streams[streamname]:
-                    streams[streamname][filename] = []
-                for r in f.segments:
-                    streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
-
-        self._streams = [normalize_stream(s, streams[s])
-                         for s in sorted(streams)]
-
-        # Regenerate the manifest text based on the normalized streams
-        self._manifest_text = ''.join(
-            [StreamReader(stream, keep=self._my_keep()).manifest_text()
-             for stream in self._streams])
-
-    @_populate_first
-    def open(self, streampath, filename=None):
-        """open(streampath[, filename]) -> file-like object
-
-        Pass in the path of a file to read from the Collection, either as a
-        single string or as two separate stream name and file name arguments.
-        This method returns a file-like object to read that file.
-        """
-        if filename is None:
-            streampath, filename = split(streampath)
-        keep_client = self._my_keep()
-        for stream_s in self._streams:
-            stream = StreamReader(stream_s, keep_client,
-                                  num_retries=self.num_retries)
-            if stream.name() == streampath:
-                break
-        else:
-            raise ValueError("stream '{}' not found in Collection".
-                             format(streampath))
-        try:
-            return stream.files()[filename]
-        except KeyError:
-            raise ValueError("file '{}' not found in Collection stream '{}'".
-                             format(filename, streampath))
-
-    @_populate_first
-    def all_streams(self):
-        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
-                for s in self._streams]
-
-    def all_files(self):
-        for s in self.all_streams():
-            for f in s.all_files():
-                yield f
-
-    @_populate_first
-    def manifest_text(self, strip=False, normalize=False):
-        if normalize:
-            cr = CollectionReader(self.manifest_text())
-            cr.normalize()
-            return cr.manifest_text(strip=strip, normalize=False)
-        elif strip:
-            return self.stripped_manifest()
-        else:
-            return self._manifest_text
-
-
-class _WriterFile(FileLikeObjectBase):
+class _WriterFile(_FileLikeObjectBase):
     def __init__(self, coll_writer, name):
         super(_WriterFile, self).__init__(name, 'wb')
         self.dest = coll_writer
     def __init__(self, coll_writer, name):
         super(_WriterFile, self).__init__(name, 'wb')
         self.dest = coll_writer
@@ -253,16 +67,16 @@ class _WriterFile(FileLikeObjectBase):
         super(_WriterFile, self).close()
         self.dest.finish_current_file()
 
         super(_WriterFile, self).close()
         self.dest.finish_current_file()
 
-    @FileLikeObjectBase._before_close
+    @_FileLikeObjectBase._before_close
     def write(self, data):
         self.dest.write(data)
 
     def write(self, data):
         self.dest.write(data)
 
-    @FileLikeObjectBase._before_close
+    @_FileLikeObjectBase._before_close
     def writelines(self, seq):
         for data in seq:
             self.write(data)
 
     def writelines(self, seq):
         for data in seq:
             self.write(data)
 
-    @FileLikeObjectBase._before_close
+    @_FileLikeObjectBase._before_close
     def flush(self):
         self.dest.flush_data()
 
     def flush(self):
         self.dest.flush_data()
 
@@ -656,13 +470,14 @@ class ResumableCollectionWriter(CollectionWriter):
                 "resumable writer can't accept unsourced data")
         return super(ResumableCollectionWriter, self).write(data)
 
                 "resumable writer can't accept unsourced data")
         return super(ResumableCollectionWriter, self).write(data)
 
+
 ADD = "add"
 DEL = "del"
 MOD = "mod"
 FILE = "file"
 COLLECTION = "collection"
 
 ADD = "add"
 DEL = "del"
 MOD = "mod"
 FILE = "file"
 COLLECTION = "collection"
 
-class SynchronizedCollectionBase(CollectionBase):
+class RichCollectionBase(CollectionBase):
     """Base class for Collections and Subcollections.
 
     Implements the majority of functionality relating to accessing items in the
     """Base class for Collections and Subcollections.
 
     Implements the majority of functionality relating to accessing items in the
@@ -684,10 +499,7 @@ class SynchronizedCollectionBase(CollectionBase):
     def _my_block_manager(self):
         raise NotImplementedError()
 
     def _my_block_manager(self):
         raise NotImplementedError()
 
-    def _populate(self):
-        raise NotImplementedError()
-
-    def sync_mode(self):
+    def writable(self):
         raise NotImplementedError()
 
     def root_collection(self):
         raise NotImplementedError()
 
     def root_collection(self):
@@ -696,6 +508,9 @@ class SynchronizedCollectionBase(CollectionBase):
     def notify(self, event, collection, name, item):
         raise NotImplementedError()
 
     def notify(self, event, collection, name, item):
         raise NotImplementedError()
 
+    def stream_name(self):
+        raise NotImplementedError()
+
     @must_be_writable
     @synchronized
     def find_or_create(self, path, create_type):
     @must_be_writable
     @synchronized
     def find_or_create(self, path, create_type):
@@ -707,25 +522,18 @@ class SynchronizedCollectionBase(CollectionBase):
         the path.
 
         :create_type:
         the path.
 
         :create_type:
-          One of `arvado.collection.FILE` or
-          `arvado.collection.COLLECTION`.  If the path is not found, and value
+          One of `arvados.collection.FILE` or
+          `arvados.collection.COLLECTION`.  If the path is not found, and value
           of create_type is FILE then create and return a new ArvadosFile for
           the last path component.  If COLLECTION, then create and return a new
           Collection for the last path component.
 
         """
 
           of create_type is FILE then create and return a new ArvadosFile for
           the last path component.  If COLLECTION, then create and return a new
           Collection for the last path component.
 
         """
 
-        if self.sync_mode() == SYNC_READONLY:
-            raise IOError((errno.EROFS, "Collection is read only"))
-
-        pathcomponents = path.split("/")
-        if pathcomponents[0] == '.':
-            del pathcomponents[0]
-
-        if pathcomponents and pathcomponents[0]:
+        pathcomponents = path.split("/", 1)
+        if pathcomponents[0]:
             item = self._items.get(pathcomponents[0])
             if len(pathcomponents) == 1:
             item = self._items.get(pathcomponents[0])
             if len(pathcomponents) == 1:
-                # item must be a file
                 if item is None:
                     # create new file
                     if create_type == COLLECTION:
                 if item is None:
                     # create new file
                     if create_type == COLLECTION:
@@ -743,11 +551,10 @@ class SynchronizedCollectionBase(CollectionBase):
                     self._items[pathcomponents[0]] = item
                     self._modified = True
                     self.notify(ADD, self, pathcomponents[0], item)
                     self._items[pathcomponents[0]] = item
                     self._modified = True
                     self.notify(ADD, self, pathcomponents[0], item)
-                del pathcomponents[0]
-                if isinstance(item, SynchronizedCollectionBase):
-                    return item.find_or_create("/".join(pathcomponents), create_type)
+                if isinstance(item, RichCollectionBase):
+                    return item.find_or_create(pathcomponents[1], create_type)
                 else:
                 else:
-                    raise errors.ArgumentError("Interior path components must be subcollection")
+                    raise IOError((errno.ENOTDIR, "Interior path components must be subcollection"))
         else:
             return self
 
         else:
             return self
 
@@ -759,23 +566,21 @@ class SynchronizedCollectionBase(CollectionBase):
         found.
 
         """
         found.
 
         """
-        pathcomponents = path.split("/")
-        if pathcomponents[0] == '.':
-            del pathcomponents[0]
+        if not path:
+            raise errors.ArgumentError("Parameter 'path' must not be empty.")
 
 
-        if pathcomponents and pathcomponents[0]:
-            item = self._items.get(pathcomponents[0])
-            if len(pathcomponents) == 1:
-                # item must be a file
-                return item
-            else:
-                del pathcomponents[0]
-                if isinstance(item, SynchronizedCollectionBase):
-                    return item.find("/".join(pathcomponents))
-                else:
-                    raise errors.ArgumentError("Interior path components must be subcollection")
+        pathcomponents = path.split("/", 1)
+        item = self._items.get(pathcomponents[0])
+        if len(pathcomponents) == 1:
+            return item
         else:
         else:
-            return self
+            if isinstance(item, RichCollectionBase):
+                if pathcomponents[1]:
+                    return item.find(pathcomponents[1])
+                else:
+                    return item
+            else:
+                raise IOError((errno.ENOTDIR, "Interior path components must be subcollection"))
 
     def mkdirs(path):
         """Recursive subcollection create.
 
     def mkdirs(path):
         """Recursive subcollection create.
@@ -786,7 +591,7 @@ class SynchronizedCollectionBase(CollectionBase):
         """
         return self.find_or_create(path, COLLECTION)
 
         """
         return self.find_or_create(path, COLLECTION)
 
-    def open(self, path, mode):
+    def open(self, path, mode="r"):
         """Open a file-like object for access.
 
         :path:
         """Open a file-like object for access.
 
         :path:
@@ -806,10 +611,10 @@ class SynchronizedCollectionBase(CollectionBase):
         """
         mode = mode.replace("b", "")
         if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
         """
         mode = mode.replace("b", "")
         if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
-            raise ArgumentError("Bad mode '%s'" % mode)
+            raise errors.ArgumentError("Bad mode '%s'" % mode)
         create = (mode != "r")
 
         create = (mode != "r")
 
-        if create and self.sync_mode() == SYNC_READONLY:
+        if create and not self.writable():
             raise IOError((errno.EROFS, "Collection is read only"))
 
         if create:
             raise IOError((errno.EROFS, "Collection is read only"))
 
         if create:
@@ -825,15 +630,16 @@ class SynchronizedCollectionBase(CollectionBase):
         if mode[0] == "w":
             arvfile.truncate(0)
 
         if mode[0] == "w":
             arvfile.truncate(0)
 
+        name = os.path.basename(path)
+
         if mode == "r":
         if mode == "r":
-            return ArvadosFileReader(arvfile, path, mode, num_retries=self.num_retries)
+            return ArvadosFileReader(arvfile, name, mode, num_retries=self.num_retries)
         else:
         else:
-            return ArvadosFileWriter(arvfile, path, mode, num_retries=self.num_retries)
+            return ArvadosFileWriter(arvfile, name, mode, num_retries=self.num_retries)
 
     @synchronized
     def modified(self):
 
     @synchronized
     def modified(self):
-        """Test if the collection (or any subcollection or file) has been modified
-        since it was created."""
+        """Test if the collection (or any subcollection or file) has been modified."""
         if self._modified:
             return True
         for k,v in self._items.items():
         if self._modified:
             return True
         for k,v in self._items.items():
@@ -853,22 +659,18 @@ class SynchronizedCollectionBase(CollectionBase):
         """Iterate over names of files and collections contained in this collection."""
         return iter(self._items.keys())
 
         """Iterate over names of files and collections contained in this collection."""
         return iter(self._items.keys())
 
-    @synchronized
-    def iterkeys(self):
-        """Iterate over names of files and collections directly contained in this collection."""
-        return self._items.keys()
-
     @synchronized
     def __getitem__(self, k):
     @synchronized
     def __getitem__(self, k):
-        """Get a file or collection that is directly contained by this collection.  If
-        you want to search a path, use `find()` instead.
+        """Get a file or collection that is directly contained by this collection.
+
+        If you want to search a path, use `find()` instead.
+
         """
         return self._items[k]
 
     @synchronized
     def __contains__(self, k):
         """
         return self._items[k]
 
     @synchronized
     def __contains__(self, k):
-        """If there is a file or collection a directly contained by this collection
-        with name `k`."""
+        """Test if there is a file or collection directly contained by this collection."""
         return k in self._items
 
     @synchronized
         return k in self._items
 
     @synchronized
@@ -901,7 +703,7 @@ class SynchronizedCollectionBase(CollectionBase):
 
     def exists(self, path):
         """Test if there is a file or collection at `path`."""
 
     def exists(self, path):
         """Test if there is a file or collection at `path`."""
-        return self.find(path) != None
+        return self.find(path) is not None
 
     @must_be_writable
     @synchronized
 
     @must_be_writable
     @synchronized
@@ -911,42 +713,72 @@ class SynchronizedCollectionBase(CollectionBase):
         :recursive:
           Specify whether to remove non-empty subcollections (True), or raise an error (False).
         """
         :recursive:
           Specify whether to remove non-empty subcollections (True), or raise an error (False).
         """
-        pathcomponents = path.split("/")
-        if pathcomponents[0] == '.':
-            # Remove '.' from the front of the path
-            del pathcomponents[0]
 
 
-        if len(pathcomponents) > 0:
-            item = self._items.get(pathcomponents[0])
-            if item is None:
-                raise IOError((errno.ENOENT, "File not found"))
-            if len(pathcomponents) == 1:
-                if isinstance(self._items[pathcomponents[0]], SynchronizedCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
-                    raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
-                deleteditem = self._items[pathcomponents[0]]
-                del self._items[pathcomponents[0]]
-                self._modified = True
-                self.notify(DEL, self, pathcomponents[0], deleteditem)
-            else:
-                del pathcomponents[0]
-                item.remove("/".join(pathcomponents))
-        else:
+        if not path:
+            raise errors.ArgumentError("Parameter 'path' must not be empty.")
+
+        pathcomponents = path.split("/", 1)
+        item = self._items.get(pathcomponents[0])
+        if item is None:
             raise IOError((errno.ENOENT, "File not found"))
             raise IOError((errno.ENOENT, "File not found"))
+        if len(pathcomponents) == 1:
+            if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
+                raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
+            deleteditem = self._items[pathcomponents[0]]
+            del self._items[pathcomponents[0]]
+            self._modified = True
+            self.notify(DEL, self, pathcomponents[0], deleteditem)
+        else:
+            item.remove(pathcomponents[1])
 
 
-    def _cloneinto(self, target):
-        for k,v in self._items.items():
-            target._items[k] = v.clone(target)
+    def _clonefrom(self, source):
+        for k,v in source.items():
+            self._items[k] = v.clone(self)
 
     def clone(self):
         raise NotImplementedError()
 
 
     def clone(self):
         raise NotImplementedError()
 
+    @must_be_writable
+    @synchronized
+    def add(self, source_obj, target_name, overwrite=False):
+        """Copy a file or subcollection to this collection.
+
+        :source_obj:
+          An ArvadosFile, or Subcollection object
+
+        :target_name:
+          Destination item name.  If the target name already exists and is a
+          file, this will raise an error unless you specify `overwrite=True`.
+
+        :overwrite:
+          Whether to overwrite target file if it already exists.
+
+        """
+
+        if target_name in self and not overwrite:
+            raise IOError((errno.EEXIST, "File already exists"))
+
+        modified_from = None
+        if target_name in self:
+            modified_from = self[target_name]
+
+        # Actually make the copy.
+        dup = source_obj.clone(self)
+        self._items[target_name] = dup
+        self._modified = True
+
+        if modified_from:
+            self.notify(MOD, self, target_name, (modified_from, dup))
+        else:
+            self.notify(ADD, self, target_name, dup)
+
     @must_be_writable
     @synchronized
     def copy(self, source, target_path, source_collection=None, overwrite=False):
         """Copy a file or subcollection to a new path in this collection.
 
         :source:
     @must_be_writable
     @synchronized
     def copy(self, source, target_path, source_collection=None, overwrite=False):
         """Copy a file or subcollection to a new path in this collection.
 
         :source:
-          An ArvadosFile, Subcollection, or string with a path to source file or subcollection
+          A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
 
         :target_path:
           Destination file or path.  If the target path already exists and is a
 
         :target_path:
           Destination file or path.  If the target path already exists and is a
@@ -984,45 +816,53 @@ class SynchronizedCollectionBase(CollectionBase):
 
         target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
 
 
         target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
 
-        with target_dir.lock:
-            if target_name in target_dir:
-                if isinstance(target_dir[target_name], SynchronizedCollectionBase) and sourcecomponents:
-                    target_dir = target_dir[target_name]
-                    target_name = sourcecomponents[-1]
-                elif not overwrite:
-                    raise IOError((errno.EEXIST, "File already exists"))
-
-            modified_from = None
-            if target_name in target_dir:
-                modified_from = target_dir[target_name]
+        if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
+            target_dir = target_dir[target_name]
+            target_name = sourcecomponents[-1]
 
 
-            # Actually make the copy.
-            dup = source_obj.clone(target_dir)
-            target_dir._items[target_name] = dup
-            target_dir._modified = True
-
-        if modified_from:
-            self.notify(MOD, target_dir, target_name, (modified_from, dup))
-        else:
-            self.notify(ADD, target_dir, target_name, dup)
+        target_dir.add(source_obj, target_name, overwrite)
 
     @synchronized
 
     @synchronized
-    def manifest_text(self, strip=False, normalize=False):
+    def manifest_text(self, stream_name=".", strip=False, normalize=False):
         """Get the manifest text for this collection, sub collections and files.
 
         """Get the manifest text for this collection, sub collections and files.
 
+        :stream_name:
+          Name of the stream (directory)
+
         :strip:
           If True, remove signing tokens from block locators if present.
         :strip:
           If True, remove signing tokens from block locators if present.
-          If False, block locators are left unchanged.
+          If False (default), block locators are left unchanged.
 
         :normalize:
           If True, always export the manifest text in normalized form
 
         :normalize:
           If True, always export the manifest text in normalized form
-          even if the Collection is not modified.  If False and the collection
+          even if the Collection is not modified.  If False (default) and the collection
           is not modified, return the original manifest text even if it is not
           in normalized form.
 
         """
           is not modified, return the original manifest text even if it is not
           in normalized form.
 
         """
+
         if self.modified() or self._manifest_text is None or normalize:
         if self.modified() or self._manifest_text is None or normalize:
-            return export_manifest(self, stream_name=".", portable_locators=strip)
+            stream = {}
+            buf = []
+            sorted_keys = sorted(self.keys())
+            for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
+                # Create a stream per file `filename`
+                arvfile = self[filename]
+                filestream = []
+                for segment in arvfile.segments():
+                    loc = segment.locator
+                    if arvfile.parent._my_block_manager().is_bufferblock(loc):
+                        loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
+                    if strip:
+                        loc = KeepLocator(loc).stripped()
+                    filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
+                                         segment.segment_offset, segment.range_size))
+                stream[filename] = filestream
+            if stream:
+                buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
+            for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
+                buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip))
+            return "".join(buf)
         else:
             if strip:
                 return self.stripped_manifest()
         else:
             if strip:
                 return self.stripped_manifest()
@@ -1031,13 +871,14 @@ class SynchronizedCollectionBase(CollectionBase):
 
     @synchronized
     def diff(self, end_collection, prefix=".", holding_collection=None):
 
     @synchronized
     def diff(self, end_collection, prefix=".", holding_collection=None):
-        """
-        Generate list of add/modify/delete actions which, when given to `apply`, will
-        change `self` to match `end_collection`
+        """Generate list of add/modify/delete actions.
+
+        When given to `apply`, the list will change `self` to match `end_collection`.
+
         """
         changes = []
         if holding_collection is None:
         """
         changes = []
         if holding_collection is None:
-            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep(), sync=SYNC_EXPLICIT)
+            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
         for k in self:
             if k not in end_collection:
                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection)))
         for k in self:
             if k not in end_collection:
                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection)))
@@ -1107,7 +948,7 @@ class SynchronizedCollectionBase(CollectionBase):
     def __eq__(self, other):
         if other is self:
             return True
     def __eq__(self, other):
         if other is self:
             return True
-        if not isinstance(other, SynchronizedCollectionBase):
+        if not isinstance(other, RichCollectionBase):
             return False
         if len(self._items) != len(other):
             return False
             return False
         if len(self._items) != len(other):
             return False
@@ -1121,11 +962,16 @@ class SynchronizedCollectionBase(CollectionBase):
     def __ne__(self, other):
         return not self.__eq__(other)
 
     def __ne__(self, other):
         return not self.__eq__(other)
 
-class Collection(SynchronizedCollectionBase):
-    """Represents the root of an Arvados Collection, which may be associated with
-    an API server Collection record.
 
 
-    Brief summary of useful methods:
+class Collection(RichCollectionBase):
+    """Represents the root of an Arvados Collection.
+
+    This class is threadsafe.  The root collection object, all subcollections
+    and files are protected by a single lock (i.e. each access locks the entire
+    collection).
+
+    Brief summary of useful methods:
 
     :To read an existing file:
       `c.open("myfile", "r")`
 
     :To read an existing file:
       `c.open("myfile", "r")`
@@ -1151,20 +997,18 @@ class Collection(SynchronizedCollectionBase):
     :To merge remote changes into this object:
       `c.update()`
 
     :To merge remote changes into this object:
       `c.update()`
 
-    This class is threadsafe.  The root collection object, all subcollections
-    and files are protected by a single lock (i.e. each access locks the entire
-    collection).
+    Must be associated with an API server Collection record (during
+    initialization, or using `save_new`) to use `save` or `update`
 
     """
 
     def __init__(self, manifest_locator_or_text=None,
 
     """
 
     def __init__(self, manifest_locator_or_text=None,
-                 parent=None,
-                 apiconfig=None,
                  api_client=None,
                  keep_client=None,
                  num_retries=None,
                  api_client=None,
                  keep_client=None,
                  num_retries=None,
-                 block_manager=None,
-                 sync=None):
+                 parent=None,
+                 apiconfig=None,
+                 block_manager=None):
         """Collection constructor.
 
         :manifest_locator_or_text:
         """Collection constructor.
 
         :manifest_locator_or_text:
@@ -1184,16 +1028,6 @@ class Collection(SynchronizedCollectionBase):
           the number of retries for API and Keep requests.
         :block_manager:
           the block manager to use.  If not specified, create one.
           the number of retries for API and Keep requests.
         :block_manager:
           the block manager to use.  If not specified, create one.
-        :sync:
-          Set synchronization policy with API server collection record.
-          :SYNC_READONLY:
-            Collection is read only.  No synchronization.  This mode will
-            also forego locking, which gives better performance.
-          :SYNC_EXPLICIT:
-            Collection is writable.  Synchronize on explicit request via `update()` or `save()`
-          :SYNC_LIVE:
-            Collection is writable.  Synchronize with server in response to
-            background websocket events, on block write, or on file close.
 
         """
         super(Collection, self).__init__(parent)
 
         """
         super(Collection, self).__init__(parent)
@@ -1206,15 +1040,11 @@ class Collection(SynchronizedCollectionBase):
         else:
             self._config = config.settings()
 
         else:
             self._config = config.settings()
 
-        self.num_retries = num_retries if num_retries is not None else 2
+        self.num_retries = num_retries if num_retries is not None else 0
         self._manifest_locator = None
         self._manifest_text = None
         self._api_response = None
 
         self._manifest_locator = None
         self._manifest_text = None
         self._api_response = None
 
-        if sync is None:
-            raise errors.ArgumentError("Must specify sync mode")
-
-        self._sync = sync
         self.lock = threading.RLock()
         self.callbacks = []
         self.events = None
         self.lock = threading.RLock()
         self.callbacks = []
         self.events = None
@@ -1230,41 +1060,31 @@ class Collection(SynchronizedCollectionBase):
                 raise errors.ArgumentError(
                     "Argument to CollectionReader must be a manifest or a collection UUID")
 
                 raise errors.ArgumentError(
                     "Argument to CollectionReader must be a manifest or a collection UUID")
 
-            self._populate()
-            self._subscribe_events()
-
+            try:
+                self._populate()
+            except (IOError, errors.SyntaxError) as e:
+                raise errors.ArgumentError("Error processing manifest text: %s", e)
 
     def root_collection(self):
         return self
 
 
     def root_collection(self):
         return self
 
-    def sync_mode(self):
-        return self._sync
-
-    def _subscribe_events(self):
-        if self._sync == SYNC_LIVE and self.events is None:
-            if not self._has_collection_uuid():
-                raise errors.ArgumentError("Cannot SYNC_LIVE associated with a collection uuid")
-            self.events = events.subscribe(arvados.api(apiconfig=self._config),
-                                           [["object_uuid", "=", self._manifest_locator]],
-                                           self.on_message)
+    def stream_name(self):
+        # The root collection always reports "." as its stream name.
+        return "."
 
 
-    def on_message(self, event):
-        if event.get("object_uuid") == self._manifest_locator:
-            self.update()
+    def writable(self):
+        # A plain Collection unconditionally reports itself as writable.
+        return True
 
     @synchronized
     @retry_method
     def update(self, other=None, num_retries=None):
 
     @synchronized
     @retry_method
     def update(self, other=None, num_retries=None):
-        """Fetch the latest collection record on the API server and merge it with the
-        current collection contents.
+        """Merge the latest collection on the API server with the current collection."""
 
 
-        """
         if other is None:
             if self._manifest_locator is None:
                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
         if other is None:
             if self._manifest_locator is None:
                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
-            other = import_manifest(response["manifest_text"])
-        baseline = import_manifest(self._manifest_text)
+            other = CollectionReader(response["manifest_text"])
+        baseline = CollectionReader(self._manifest_text)
         self.apply(baseline.diff(other))
 
     @synchronized
         self.apply(baseline.diff(other))
 
     @synchronized
@@ -1280,13 +1100,13 @@ class Collection(SynchronizedCollectionBase):
             if self._api_client is None:
                 self._my_api()
             else:
             if self._api_client is None:
                 self._my_api()
             else:
-                self._keep_client = KeepClient(api=self._api_client)
+                self._keep_client = KeepClient(api_client=self._api_client)
         return self._keep_client
 
     @synchronized
     def _my_block_manager(self):
         if self._block_manager is None:
         return self._keep_client
 
     @synchronized
     def _my_block_manager(self):
         if self._block_manager is None:
-            self._block_manager = BlockManager(self._my_keep())
+            self._block_manager = _BlockManager(self._my_keep())
         return self._block_manager
 
     def _populate_from_api_server(self):
         return self._block_manager
 
     def _populate_from_api_server(self):
@@ -1339,7 +1159,7 @@ class Collection(SynchronizedCollectionBase):
             error_via_keep = self._populate_from_keep()
         if self._manifest_text is None:
             # Nothing worked!
             error_via_keep = self._populate_from_keep()
         if self._manifest_text is None:
             # Nothing worked!
-            raise arvados.errors.NotFoundError(
+            raise errors.NotFoundError(
                 ("Failed to retrieve collection '{}' " +
                  "from either API server ({}) or Keep ({})."
                  ).format(
                 ("Failed to retrieve collection '{}' " +
                  "from either API server ({}) or Keep ({})."
                  ).format(
@@ -1348,12 +1168,8 @@ class Collection(SynchronizedCollectionBase):
                     error_via_keep))
         # populate
         self._baseline_manifest = self._manifest_text
                     error_via_keep))
         # populate
         self._baseline_manifest = self._manifest_text
-        import_manifest(self._manifest_text, self)
+        self._import_manifest(self._manifest_text)
 
 
-        if self._sync == SYNC_READONLY:
-            # Now that we're populated, knowing that this will be readonly,
-            # forego any further locking.
-            self.lock = NoopLock()
 
     def _has_collection_uuid(self):
         return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
 
     def _has_collection_uuid(self):
         return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
@@ -1363,20 +1179,36 @@ class Collection(SynchronizedCollectionBase):
 
     def __exit__(self, exc_type, exc_value, traceback):
         """Support scoped auto-commit in a with: block."""
 
     def __exit__(self, exc_type, exc_value, traceback):
         """Support scoped auto-commit in a with: block."""
-        if self._sync != SYNC_READONLY and self._has_collection_uuid():
-            self.save()
+        if exc_type is not None:
+            if self.writable() and self._has_collection_uuid():
+                self.save()
         if self._block_manager is not None:
             self._block_manager.stop_threads()
 
     @synchronized
         if self._block_manager is not None:
             self._block_manager.stop_threads()
 
     @synchronized
-    def clone(self, new_parent=None, new_sync=SYNC_READONLY, new_config=None):
+    def manifest_locator(self):
+        """Get the manifest locator, if any.
+
+        The manifest locator will be set when the collection is loaded from an
+        API server record or the portable data hash of a manifest.
+
+        The manifest locator will be None if the collection is newly created or
+        was created directly from manifest text.  The method `save_new()` will
+        assign a manifest locator.
+
+        """
+        return self._manifest_locator
+
+    @synchronized
+    def clone(self, new_parent=None, readonly=False, new_config=None):
         if new_config is None:
             new_config = self._config
         if new_config is None:
             new_config = self._config
-        newcollection = Collection(parent=new_parent, apiconfig=new_config, sync=SYNC_EXPLICIT)
-        if new_sync == SYNC_READONLY:
-            newcollection.lock = NoopLock()
-        self._cloneinto(newcollection)
-        newcollection._sync = new_sync
+        if readonly:
+            newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
+        else:
+            newcollection = Collection(parent=new_parent, apiconfig=new_config)
+
+        newcollection._clonefrom(self)
         return newcollection
 
     @synchronized
         return newcollection
 
     @synchronized
@@ -1389,22 +1221,48 @@ class Collection(SynchronizedCollectionBase):
         """
         return self._api_response
 
         """
         return self._api_response
 
+    def find_or_create(self, path, create_type):
+        """Root-level wrapper for `RichCollectionBase.find_or_create`.
+
+        "." returns this collection itself; a leading "./" is stripped from
+        `path` before delegating to the base class.
+
+        """
+        if path == ".":
+            return self
+        relative_path = path
+        if relative_path.startswith("./"):
+            relative_path = relative_path[2:]
+        return super(Collection, self).find_or_create(relative_path, create_type)
+
+    def find(self, path):
+        """Root-level wrapper for `RichCollectionBase.find`.
+
+        "." returns this collection itself; a leading "./" is stripped from
+        `path` before delegating to the base class.
+
+        """
+        if path == ".":
+            return self
+        if path.startswith("./"):
+            path = path[2:]
+        return super(Collection, self).find(path)
+
+    def remove(self, path, recursive=False):
+        """Root-level wrapper for `RichCollectionBase.remove`.
+
+        Raises `errors.ArgumentError` when `path` is "."; otherwise a leading
+        "./" is stripped from `path` before delegating to the base class.
+
+        """
+        if path == ".":
+            raise errors.ArgumentError("Cannot remove '.'")
+        target = path[2:] if path.startswith("./") else path
+        return super(Collection, self).remove(target, recursive)
+
     @must_be_writable
     @synchronized
     @retry_method
     def save(self, merge=True, num_retries=None):
     @must_be_writable
     @synchronized
     @retry_method
     def save(self, merge=True, num_retries=None):
-        """Commit pending buffer blocks to Keep, merge with remote record (if
-        update=True), write the manifest to Keep, and update the collection
-        record.
+        """Save collection to an existing collection record.
+
+        Commit pending buffer blocks to Keep, merge with remote record (if
+        merge=True, the default), write the manifest to Keep, and update the
+        collection record.
 
         Will raise AssertionError if not associated with a collection record on
         the API server.  If you want to save a manifest to Keep only, see
         `save_new()`.
 
 
         Will raise AssertionError if not associated with a collection record on
         the API server.  If you want to save a manifest to Keep only, see
         `save_new()`.
 
-        :update:
+        :merge:
           Update and merge remote changes before saving.  Otherwise, any
           remote changes will be ignored and overwritten.
 
           Update and merge remote changes before saving.  Otherwise, any
           remote changes will be ignored and overwritten.
 
+        :num_retries:
+          Retry count on API calls (if None, use the collection default)
+
         """
         if self.modified():
             if not self._has_collection_uuid():
         """
         if self.modified():
             if not self._has_collection_uuid():
@@ -1423,21 +1281,23 @@ class Collection(SynchronizedCollectionBase):
             self._manifest_text = self._api_response["manifest_text"]
             self.set_unmodified()
 
             self._manifest_text = self._api_response["manifest_text"]
             self.set_unmodified()
 
+
     @must_be_writable
     @synchronized
     @retry_method
     def save_new(self, name=None, create_collection_record=True, owner_uuid=None, ensure_unique_name=False, num_retries=None):
     @must_be_writable
     @synchronized
     @retry_method
     def save_new(self, name=None, create_collection_record=True, owner_uuid=None, ensure_unique_name=False, num_retries=None):
-        """Commit pending buffer blocks to Keep, write the manifest to Keep, and create
-        a new collection record (if create_collection_record True).
+        """Save collection to a new collection record.
 
 
+        Commit pending buffer blocks to Keep, write the manifest to Keep, and
+        create a new collection record (if create_collection_record True).
         After creating a new collection record, this Collection object will be
         After creating a new collection record, this Collection object will be
-        associated with the new record for `save()` and SYNC_LIVE updates.
+        associated with the new record used by `save()`.
 
         :name:
           The collection name.
 
 
         :name:
           The collection name.
 
-        :keep_only:
-          Only save the manifest to keep, do not create a collection record.
+        :create_collection_record:
+          If True, create a collection record.  If False, only save the manifest to keep.
 
         :owner_uuid:
           the user, or project uuid that will own this collection.
 
         :owner_uuid:
           the user, or project uuid that will own this collection.
@@ -1448,6 +1308,9 @@ class Collection(SynchronizedCollectionBase):
           if it conflicts with a collection with the same name and owner.  If
           False, a name conflict will result in an error.
 
           if it conflicts with a collection with the same name and owner.  If
           False, a name conflict will result in an error.
 
+        :num_retries:
+          Retry count on API calls (if None, use the collection default)
+
         """
         self._my_block_manager().commit_all()
         self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
         """
         self._my_block_manager().commit_all()
         self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
@@ -1465,14 +1328,8 @@ class Collection(SynchronizedCollectionBase):
             self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
             text = self._api_response["manifest_text"]
 
             self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
             text = self._api_response["manifest_text"]
 
-            if self.events:
-                self.events.unsubscribe(filters=[["object_uuid", "=", self._manifest_locator]])
-
             self._manifest_locator = self._api_response["uuid"]
 
             self._manifest_locator = self._api_response["uuid"]
 
-            if self.events:
-                self.events.subscribe(filters=[["object_uuid", "=", self._manifest_locator]])
-
         self._manifest_text = text
         self.set_unmodified()
 
         self._manifest_text = text
         self.set_unmodified()
 
@@ -1489,78 +1346,70 @@ class Collection(SynchronizedCollectionBase):
         for c in self.callbacks:
             c(event, collection, name, item)
 
         for c in self.callbacks:
             c(event, collection, name, item)
 
-def ReadOnlyCollection(*args, **kwargs):
-    """Create a read-only collection object from an api collection record locator,
-    a portable data hash of a manifest, or raw manifest text.
-
-    See `Collection` constructor for detailed options.
-
-    """
-    kwargs["sync"] = SYNC_READONLY
-    return Collection(*args, **kwargs)
-
-def WritableCollection(*args, **kwargs):
-    """Create a writable collection object from an api collection record locator,
-    a portable data hash of a manifest, or raw manifest text.
-
-    See `Collection` constructor for detailed options.
-
-    """
-
-    kwargs["sync"] = SYNC_EXPLICIT
-    return Collection(*args, **kwargs)
-
-def LiveCollection(*args, **kwargs):
-    """Create a writable, live updating collection object representing an existing
-    collection record on the API server.
-
-    See `Collection` constructor for detailed options.
-
-    """
-    kwargs["sync"] = SYNC_LIVE
-    return Collection(*args, **kwargs)
-
-def createWritableCollection(name, owner_uuid=None, apiconfig=None):
-    """Create an empty, writable collection object and create an associated api
-    collection record.
-
-    :name:
-      The collection name
-
-    :owner_uuid:
-      The parent project.
-
-    :apiconfig:
-      Optional alternate api configuration to use (to specify alternate API
-      host or token than the default.)
+    @synchronized
+    def _import_manifest(self, manifest_text):
+        """Import a manifest into a `Collection`.
 
 
-    """
-    newcollection = Collection(sync=SYNC_EXPLICIT, apiconfig=apiconfig)
-    newcollection.save_new(name, owner_uuid=owner_uuid, ensure_unique_name=True)
-    return newcollection
+        :manifest_text:
+          The manifest text to import from.
 
 
-def createLiveCollection(name, owner_uuid=None, apiconfig=None):
-    """Create an empty, writable, live updating Collection object and create an
-    associated collection record on the API server.
+        Raises `errors.ArgumentError` if this collection already has
+        contents, and `errors.SyntaxError` if the manifest text cannot be
+        parsed or a file path collides with a stream name.
+
+        """
+        if len(self) > 0:
+            raise errors.ArgumentError("Can only import manifest into an empty collection")
 
 
-    :name:
-      The collection name
+        # Parser states: each manifest line is a stream name, then block
+        # locators, then file segment tokens.
+        STREAM_NAME = 0
+        BLOCKS = 1
+        SEGMENTS = 2
+
+        stream_name = None
+        state = STREAM_NAME
+
+        for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
+            tok = token_and_separator.group(1)
+            sep = token_and_separator.group(2)
+
+            if state == STREAM_NAME:
+                # starting a new stream
+                stream_name = tok.replace('\\040', ' ')
+                blocks = []
+                streamoffset = 0L
+                state = BLOCKS
+                continue
+
+            if state == BLOCKS:
+                block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
+                if block_locator:
+                    blocksize = long(block_locator.group(1))
+                    blocks.append(Range(tok, streamoffset, blocksize))
+                    streamoffset += blocksize
+                else:
+                    # First non-locator token: fall through to segment parsing below.
+                    state = SEGMENTS
+
+            if state == SEGMENTS:
+                file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
+                if file_segment:
+                    pos = long(file_segment.group(1))
+                    size = long(file_segment.group(2))
+                    name = file_segment.group(3).replace('\\040', ' ')
+                    filepath = os.path.join(stream_name, name)
+                    afile = self.find_or_create(filepath, FILE)
+                    if isinstance(afile, ArvadosFile):
+                        afile.add_segment(blocks, pos, size)
+                    else:
+                        raise errors.SyntaxError("File %s conflicts with stream of the same name." % filepath)
+                else:
+                    # error!
+                    raise errors.SyntaxError("Invalid manifest format")
 
 
-    :owner_uuid:
-      The parent project.
+            if sep == "\n":
+                # A newline separator ends the current stream.
+                stream_name = None
+                state = STREAM_NAME
 
 
-    :apiconfig:
-      Optional alternate api configuration to use (to specify alternate API
-      host or token than the default.)
+        self.set_unmodified()
 
 
-    """
-    newcollection = Collection(sync=SYNC_EXPLICIT, apiconfig=apiconfig)
-    newcollection.save_new(name, owner_uuid=owner_uuid, ensure_unique_name=True)
-    newcollection._sync = SYNC_LIVE
-    newcollection._subscribe_events()
-    return newcollection
 
 
-class Subcollection(SynchronizedCollectionBase):
+class Subcollection(RichCollectionBase):
     """This is a subdirectory within a collection that doesn't have its own API
     server record.
 
     """This is a subdirectory within a collection that doesn't have its own API
     server record.
 
@@ -1571,12 +1420,13 @@ class Subcollection(SynchronizedCollectionBase):
     def __init__(self, parent):
         super(Subcollection, self).__init__(parent)
         self.lock = self.root_collection().lock
     def __init__(self, parent):
         super(Subcollection, self).__init__(parent)
         self.lock = self.root_collection().lock
+        self._manifest_text = None
 
     def root_collection(self):
         return self.parent.root_collection()
 
 
     def root_collection(self):
         return self.parent.root_collection()
 
-    def sync_mode(self):
-        return self.root_collection().sync_mode()
+    def writable(self):
+        # A subcollection is writable exactly when its root collection is.
+        return self.root_collection().writable()
 
     def _my_api(self):
         return self.root_collection()._my_api()
 
     def _my_api(self):
         return self.root_collection()._my_api()
@@ -1587,151 +1437,89 @@ class Subcollection(SynchronizedCollectionBase):
     def _my_block_manager(self):
         return self.root_collection()._my_block_manager()
 
     def _my_block_manager(self):
         return self.root_collection()._my_block_manager()
 
-    def _populate(self):
-        self.root_collection()._populate()
-
     def notify(self, event, collection, name, item):
         return self.root_collection().notify(event, collection, name, item)
 
     def notify(self, event, collection, name, item):
         return self.root_collection().notify(event, collection, name, item)
 
+    def stream_name(self):
+        """Return this subcollection's stream name.
+
+        Looks for the entry in the parent whose value is this object and
+        joins that entry's key onto the parent's stream name.  Returns '.'
+        when the parent holds no such entry.
+
+        """
+        parent = self.parent
+        for entry_name, entry in parent.items():
+            if entry is not self:
+                continue
+            return os.path.join(parent.stream_name(), entry_name)
+        return '.'
+
     @synchronized
     def clone(self, new_parent):
         c = Subcollection(new_parent)
     @synchronized
     def clone(self, new_parent):
         c = Subcollection(new_parent)
-        self._cloneinto(c)
+        c._clonefrom(self)
         return c
 
         return c
 
-def import_manifest(manifest_text,
-                    into_collection=None,
-                    api_client=None,
-                    keep=None,
-                    num_retries=None,
-                    sync=SYNC_READONLY):
-    """Import a manifest into a `Collection`.
 
 
-    :manifest_text:
-      The manifest text to import from.
+class CollectionReader(Collection):
+    """A read-only collection object.
 
 
-    :into_collection:
-      The `Collection` that will be initialized (must be empty).
-      If None, create a new `Collection` object.
+    Initialize from an api collection record locator, a portable data hash of a
+    manifest, or raw manifest text.  See `Collection` constructor for detailed
+    options.
 
 
-    :api_client:
-      The API client object that will be used when creating a new `Collection` object.
+    """
+    def __init__(self, manifest_locator_or_text=None, *args, **kwargs):
+        # `manifest_locator_or_text` defaults to None so the reader can be
+        # constructed with keyword arguments only, which is how
+        # `Collection.clone(readonly=True)` calls it.
+        # writable() returns self._in_init, so the reader reports itself as
+        # writable only while the base constructor runs (presumably so the
+        # base class can populate the contents -- confirm against
+        # must_be_writable).
+        self._in_init = True
+        super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
+        self._in_init = False
 
 
-    :keep:
-      The keep client object that will be used when creating a new `Collection` object.
+        # Forego any locking since it should never change once initialized.
+        self.lock = NoopLock()
 
 
-    :num_retries:
-      the default number of api client and keep retries on error.
+        # Backwards compatibility with old CollectionReader
+        # all_streams() and all_files()
+        self._streams = None
 
 
-    :sync:
-      Collection sync mode (only if into_collection is None)
-    """
-    if into_collection is not None:
-        if len(into_collection) > 0:
-            raise ArgumentError("Can only import manifest into an empty collection")
-    else:
-        into_collection = Collection(api_client=api_client, keep_client=keep, num_retries=num_retries, sync=sync)
-
-    save_sync = into_collection.sync_mode()
-    into_collection._sync = None
-
-    STREAM_NAME = 0
-    BLOCKS = 1
-    SEGMENTS = 2
-
-    stream_name = None
-    state = STREAM_NAME
-
-    for n in re.finditer(r'(\S+)(\s+|$)', manifest_text):
-        tok = n.group(1)
-        sep = n.group(2)
-
-        if state == STREAM_NAME:
-            # starting a new stream
-            stream_name = tok.replace('\\040', ' ')
-            blocks = []
-            segments = []
-            streamoffset = 0L
-            state = BLOCKS
-            continue
-
-        if state == BLOCKS:
-            s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
-            if s:
-                blocksize = long(s.group(1))
-                blocks.append(Range(tok, streamoffset, blocksize))
-                streamoffset += blocksize
-            else:
-                state = SEGMENTS
-
-        if state == SEGMENTS:
-            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
-            if s:
-                pos = long(s.group(1))
-                size = long(s.group(2))
-                name = s.group(3).replace('\\040', ' ')
-                f = into_collection.find_or_create("%s/%s" % (stream_name, name), FILE)
-                f.add_segment(blocks, pos, size)
-            else:
-                # error!
-                raise errors.SyntaxError("Invalid manifest format")
+    def writable(self):
+        # self._in_init is True only while __init__ is calling the base
+        # constructor; it is set back to False right after, so a constructed
+        # reader reports itself as not writable.
+        return self._in_init
 
 
-        if sep == "\n":
-            stream_name = None
-            state = STREAM_NAME
+    def _populate_streams(orig_func):
+        """Decorator that fills in `self._streams` before calling `orig_func`.
+
+        When `self._streams` is None it is set to one token list per
+        non-empty line of `self._manifest_text`, or to an empty list when
+        there is no manifest text.
+
+        """
+        @functools.wraps(orig_func)
+        def populate_streams_wrapper(self, *args, **kwargs):
+            # Built lazily, on first use, because it creates a copy of the manifest.
+            if self._streams is None:
+                parsed_streams = []
+                if self._manifest_text:
+                    for stream_line in self._manifest_text.split("\n"):
+                        if stream_line:
+                            parsed_streams.append(stream_line.split())
+                self._streams = parsed_streams
+            return orig_func(self, *args, **kwargs)
+        return populate_streams_wrapper
 
 
-    into_collection.set_unmodified()
-    into_collection._sync = save_sync
-    return into_collection
+    @_populate_streams
+    def normalize(self):
+        """Normalize the streams returned by `all_streams`.
 
 
-def export_manifest(item, stream_name=".", portable_locators=False):
-    """Export a manifest from the contents of a SynchronizedCollectionBase.
+        This method is kept for backwards compatibility and only affects the
+        behavior of `all_streams()` and `all_files()`
 
 
-    :item:
-      Create a manifest for `item` (must be a `SynchronizedCollectionBase` or `ArvadosFile`).  If
-      `item` is a is a `Collection`, this will also export subcollections.
+        """
 
 
-    :stream_name:
-      the name of the stream when exporting `item`.
+        # Rearrange streams
+        streams = {}
+        for s in self.all_streams():
+            for f in s.all_files():
+                streamname, filename = split(s.name() + "/" + f.name())
+                if streamname not in streams:
+                    streams[streamname] = {}
+                if filename not in streams[streamname]:
+                    streams[streamname][filename] = []
+                for r in f.segments:
+                    streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
 
 
-    :portable_locators:
-      If True, strip any permission hints on block locators.
-      If False, use block locators as-is.
+        self._streams = [normalize_stream(s, streams[s])
+                         for s in sorted(streams)]
+    @_populate_streams
+    def all_streams(self):
+        """Return a list with one `StreamReader` per entry of `self._streams`."""
+        readers = []
+        for stream_tokens in self._streams:
+            readers.append(StreamReader(stream_tokens, self._my_keep(),
+                                        num_retries=self.num_retries))
+        return readers
 
 
-    """
-    buf = ""
-    if isinstance(item, SynchronizedCollectionBase):
-        stream = {}
-        sorted_keys = sorted(item.keys())
-        for filename in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
-            # Create a stream per file `k`
-            arvfile = item[filename]
-            filestream = []
-            for segment in arvfile.segments():
-                loc = segment.locator
-                if loc.startswith("bufferblock"):
-                    loc = arvfile.parent._my_block_manager()._bufferblocks[loc].locator()
-                if portable_locators:
-                    loc = KeepLocator(loc).stripped()
-                filestream.append(LocatorAndRange(loc, locator_block_size(loc),
-                                     segment.segment_offset, segment.range_size))
-            stream[filename] = filestream
-        if stream:
-            buf += ' '.join(normalize_stream(stream_name, stream))
-            buf += "\n"
-        for dirname in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]:
-            buf += export_manifest(item[dirname], stream_name=os.path.join(stream_name, dirname), portable_locators=portable_locators)
-    elif isinstance(item, ArvadosFile):
-        filestream = []
-        for segment in item.segments:
-            loc = segment.locator
-            if loc.startswith("bufferblock"):
-                loc = item._bufferblocks[loc].calculate_locator()
-            if portable_locators:
-                loc = KeepLocator(loc).stripped()
-            filestream.append(LocatorAndRange(loc, locator_block_size(loc),
-                                 segment.segment_offset, segment.range_size))
-        stream[stream_name] = filestream
-        buf += ' '.join(normalize_stream(stream_name, stream))
-        buf += "\n"
-    return buf
+    @_populate_streams
+    def all_files(self):
+        """Yield every file of every stream returned by `all_streams()`."""
+        for stream_reader in self.all_streams():
+            for stream_file in stream_reader.all_files():
+                yield stream_file