Merge branch '8784-dir-listings'
[arvados.git] / sdk / python / arvados / collection.py
index 8450bd1ca086687928c43e09b2d497ae598c5128..77312e4d4917a276f00b46e90e13ab13ba0d5ac4 100644 (file)
@@ -1,3 +1,8 @@
+from __future__ import absolute_import
+from future.utils import listitems, listvalues, viewkeys
+from builtins import str
+from past.builtins import basestring
+from builtins import object
 import functools
 import logging
 import os
 import functools
 import logging
 import os
@@ -11,15 +16,15 @@ from collections import deque
 from stat import *
 
 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
 from stat import *
 
 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
-from keep import KeepLocator, KeepClient
+from .keep import KeepLocator, KeepClient
 from .stream import StreamReader
 from ._normalize_stream import normalize_stream
 from ._ranges import Range, LocatorAndRange
 from .safeapi import ThreadSafeApiCache
 from .stream import StreamReader
 from ._normalize_stream import normalize_stream
 from ._ranges import Range, LocatorAndRange
 from .safeapi import ThreadSafeApiCache
-import config
-import errors
-import util
-import events
+import arvados.config as config
+import arvados.errors as errors
+import arvados.util
+import arvados.events as events
 from arvados.retry import retry_method
 
 _logger = logging.getLogger('arvados.collection')
 from arvados.retry import retry_method
 
 _logger = logging.getLogger('arvados.collection')
@@ -51,7 +56,7 @@ class CollectionBase(object):
             if fields:
                 clean_fields = fields[:1] + [
                     (re.sub(r'\+[^\d][^\+]*', '', x)
             if fields:
                 clean_fields = fields[:1] + [
                     (re.sub(r'\+[^\d][^\+]*', '', x)
-                     if re.match(util.keep_locator_pattern, x)
+                     if re.match(arvados.util.keep_locator_pattern, x)
                      else x)
                     for x in fields[1:]]
                 clean += [' '.join(clean_fields), "\n"]
                      else x)
                     for x in fields[1:]]
                 clean += [' '.join(clean_fields), "\n"]
@@ -180,7 +185,7 @@ class CollectionWriter(CollectionBase):
 
     def _work_trees(self):
         path, stream_name, max_manifest_depth = self._queued_trees[0]
 
     def _work_trees(self):
         path, stream_name, max_manifest_depth = self._queued_trees[0]
-        d = util.listdir_recursive(
+        d = arvados.util.listdir_recursive(
             path, max_depth = (None if max_manifest_depth == 0 else 0))
         if d:
             self._queue_dirents(stream_name, d)
             path, max_depth = (None if max_manifest_depth == 0 else 0))
         if d:
             self._queue_dirents(stream_name, d)
@@ -216,7 +221,11 @@ class CollectionWriter(CollectionBase):
         self.do_queued_work()
 
     def write(self, newdata):
         self.do_queued_work()
 
     def write(self, newdata):
-        if hasattr(newdata, '__iter__'):
+        if isinstance(newdata, bytes):
+            pass
+        elif isinstance(newdata, str):
+            newdata = newdata.encode()
+        elif hasattr(newdata, '__iter__'):
             for s in newdata:
                 self.write(s)
             return
             for s in newdata:
                 self.write(s)
             return
@@ -256,7 +265,7 @@ class CollectionWriter(CollectionBase):
         return self._last_open
 
     def flush_data(self):
         return self._last_open
 
     def flush_data(self):
-        data_buffer = ''.join(self._data_buffer)
+        data_buffer = b''.join(self._data_buffer)
         if data_buffer:
             self._current_stream_locators.append(
                 self._my_keep().put(
         if data_buffer:
             self._current_stream_locators.append(
                 self._my_keep().put(
@@ -307,7 +316,8 @@ class CollectionWriter(CollectionBase):
     def set_current_stream_name(self, newstreamname):
         if re.search(r'[\t\n]', newstreamname):
             raise errors.AssertionError(
     def set_current_stream_name(self, newstreamname):
         if re.search(r'[\t\n]', newstreamname):
             raise errors.AssertionError(
-                "Manifest stream names cannot contain whitespace")
+                "Manifest stream names cannot contain whitespace: '%s'" %
+                (newstreamname))
         self._current_stream_name = '.' if newstreamname=='' else newstreamname
 
     def current_stream_name(self):
         self._current_stream_name = '.' if newstreamname=='' else newstreamname
 
     def current_stream_name(self):
@@ -345,11 +355,12 @@ class CollectionWriter(CollectionBase):
         sending manifest_text() to the API server's "create
         collection" endpoint.
         """
         sending manifest_text() to the API server's "create
         collection" endpoint.
         """
-        return self._my_keep().put(self.manifest_text(), copies=self.replication)
+        return self._my_keep().put(self.manifest_text().encode(),
+                                   copies=self.replication)
 
     def portable_data_hash(self):
 
     def portable_data_hash(self):
-        stripped = self.stripped_manifest()
-        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
+        stripped = self.stripped_manifest().encode()
+        return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
 
     def manifest_text(self):
         self.finish_current_stream()
 
     def manifest_text(self):
         self.finish_current_stream()
@@ -417,7 +428,7 @@ class ResumableCollectionWriter(CollectionWriter):
         return writer
 
     def check_dependencies(self):
         return writer
 
     def check_dependencies(self):
-        for path, orig_stat in self._dependencies.items():
+        for path, orig_stat in listitems(self._dependencies):
             if not S_ISREG(orig_stat[ST_MODE]):
                 raise errors.StaleWriterStateError("{} not file".format(path))
             try:
             if not S_ISREG(orig_stat[ST_MODE]):
                 raise errors.StaleWriterStateError("{} not file".format(path))
             try:
@@ -543,7 +554,7 @@ class RichCollectionBase(CollectionBase):
                     else:
                         item = ArvadosFile(self, pathcomponents[0])
                     self._items[pathcomponents[0]] = item
                     else:
                         item = ArvadosFile(self, pathcomponents[0])
                     self._items[pathcomponents[0]] = item
-                    self._committed = False
+                    self.set_committed(False)
                     self.notify(ADD, self, pathcomponents[0], item)
                 return item
             else:
                     self.notify(ADD, self, pathcomponents[0], item)
                 return item
             else:
@@ -551,12 +562,12 @@ class RichCollectionBase(CollectionBase):
                     # create new collection
                     item = Subcollection(self, pathcomponents[0])
                     self._items[pathcomponents[0]] = item
                     # create new collection
                     item = Subcollection(self, pathcomponents[0])
                     self._items[pathcomponents[0]] = item
-                    self._committed = False
+                    self.set_committed(False)
                     self.notify(ADD, self, pathcomponents[0], item)
                 if isinstance(item, RichCollectionBase):
                     return item.find_or_create(pathcomponents[1], create_type)
                 else:
                     self.notify(ADD, self, pathcomponents[0], item)
                 if isinstance(item, RichCollectionBase):
                     return item.find_or_create(pathcomponents[1], create_type)
                 else:
-                    raise IOError(errno.ENOTDIR, "Not a directory: '%s'" % pathcomponents[0])
+                    raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
         else:
             return self
 
         else:
             return self
 
@@ -564,16 +575,23 @@ class RichCollectionBase(CollectionBase):
     def find(self, path):
         """Recursively search the specified file path.
 
     def find(self, path):
         """Recursively search the specified file path.
 
-        May return either a Collection or ArvadosFile.  Return None if not
+        May return either a Collection or ArvadosFile. Return None if not
         found.
         found.
+        If path is invalid (e.g., starts with '/'), an IOError exception will be
+        raised.
 
         """
         if not path:
             raise errors.ArgumentError("Parameter 'path' is empty.")
 
         pathcomponents = path.split("/", 1)
 
         """
         if not path:
             raise errors.ArgumentError("Parameter 'path' is empty.")
 
         pathcomponents = path.split("/", 1)
+        if pathcomponents[0] == '':
+            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
+
         item = self._items.get(pathcomponents[0])
         item = self._items.get(pathcomponents[0])
-        if len(pathcomponents) == 1:
+        if item is None:
+            return None
+        elif len(pathcomponents) == 1:
             return item
         else:
             if isinstance(item, RichCollectionBase):
             return item
         else:
             if isinstance(item, RichCollectionBase):
@@ -582,7 +600,7 @@ class RichCollectionBase(CollectionBase):
                 else:
                     return item
             else:
                 else:
                     return item
             else:
-                raise IOError(errno.ENOTDIR, "Is not a directory: %s" % pathcomponents[0])
+                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 
     @synchronized
     def mkdirs(self, path):
 
     @synchronized
     def mkdirs(self, path):
@@ -594,7 +612,7 @@ class RichCollectionBase(CollectionBase):
         """
 
         if self.find(path) != None:
         """
 
         if self.find(path) != None:
-            raise IOError(errno.EEXIST, "Directory or file exists: '%s'" % path)
+            raise IOError(errno.EEXIST, "Directory or file exists", path)
 
         return self.find_or_create(path, COLLECTION)
 
 
         return self.find_or_create(path, COLLECTION)
 
@@ -604,7 +622,12 @@ class RichCollectionBase(CollectionBase):
         :path:
           path to a file in the collection
         :mode:
         :path:
           path to a file in the collection
         :mode:
-          one of "r", "r+", "w", "w+", "a", "a+"
+          a string consisting of "r", "w", or "a", optionally followed
+          by "b" or "t", optionally followed by "+".
+          :"b":
+            binary mode: write() accepts bytes, read() returns bytes.
+          :"t":
+            text mode (default): write() accepts strings, read() returns strings.
           :"r":
             opens for reading
           :"r+":
           :"r":
             opens for reading
           :"r+":
@@ -616,33 +639,28 @@ class RichCollectionBase(CollectionBase):
             the end of the file.  Writing does not affect the file pointer for
             reading.
         """
             the end of the file.  Writing does not affect the file pointer for
             reading.
         """
-        mode = mode.replace("b", "")
-        if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
-            raise errors.ArgumentError("Bad mode '%s'" % mode)
-        create = (mode != "r")
 
 
-        if create and not self.writable():
-            raise IOError(errno.EROFS, "Collection is read only")
+        if not re.search(r'^[rwa][bt]?\+?$', mode):
+            raise errors.ArgumentError("Invalid mode {!r}".format(mode))
 
 
-        if create:
-            arvfile = self.find_or_create(path, FILE)
-        else:
+        if mode[0] == 'r' and '+' not in mode:
+            fclass = ArvadosFileReader
             arvfile = self.find(path)
             arvfile = self.find(path)
+        elif not self.writable():
+            raise IOError(errno.EROFS, "Collection is read only")
+        else:
+            fclass = ArvadosFileWriter
+            arvfile = self.find_or_create(path, FILE)
 
         if arvfile is None:
 
         if arvfile is None:
-            raise IOError(errno.ENOENT, "File not found")
+            raise IOError(errno.ENOENT, "File not found", path)
         if not isinstance(arvfile, ArvadosFile):
         if not isinstance(arvfile, ArvadosFile):
-            raise IOError(errno.EISDIR, "Is a directory: %s" % path)
+            raise IOError(errno.EISDIR, "Is a directory", path)
 
 
-        if mode[0] == "w":
+        if mode[0] == 'w':
             arvfile.truncate(0)
 
             arvfile.truncate(0)
 
-        name = os.path.basename(path)
-
-        if mode == "r":
-            return ArvadosFileReader(arvfile, num_retries=self.num_retries)
-        else:
-            return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
+        return fclass(arvfile, mode=mode, num_retries=self.num_retries)
 
     def modified(self):
         """Determine if the collection has been modified since last committed."""
 
     def modified(self):
         """Determine if the collection has been modified since last committed."""
@@ -651,25 +669,31 @@ class RichCollectionBase(CollectionBase):
     @synchronized
     def committed(self):
         """Determine if the collection has been committed to the API server."""
     @synchronized
     def committed(self):
         """Determine if the collection has been committed to the API server."""
-
-        if self._committed is False:
-            return False
-        for v in self._items.values():
-            if v.committed() is False:
-                return False
-        return True
+        return self._committed
 
     @synchronized
 
     @synchronized
-    def set_committed(self):
-        """Recursively set committed flag to True."""
-        self._committed = True
-        for k,v in self._items.items():
-            v.set_committed()
+    def set_committed(self, value=True):
+        """Recursively set committed flag.
+
+        If value is True, set committed to be True for this and all children.
+
+        If value is False, set committed to be False for this and all parents.
+        """
+        if value == self._committed:
+            return
+        if value:
+            for k,v in listitems(self._items):
+                v.set_committed(True)
+            self._committed = True
+        else:
+            self._committed = False
+            if self.parent is not None:
+                self.parent.set_committed(False)
 
     @synchronized
     def __iter__(self):
         """Iterate over names of files and collections contained in this collection."""
 
     @synchronized
     def __iter__(self):
         """Iterate over names of files and collections contained in this collection."""
-        return iter(self._items.keys())
+        return iter(viewkeys(self._items))
 
     @synchronized
     def __getitem__(self, k):
 
     @synchronized
     def __getitem__(self, k):
@@ -695,7 +719,7 @@ class RichCollectionBase(CollectionBase):
     def __delitem__(self, p):
         """Delete an item by name which is directly contained by this collection."""
         del self._items[p]
     def __delitem__(self, p):
         """Delete an item by name which is directly contained by this collection."""
         del self._items[p]
-        self._committed = False
+        self.set_committed(False)
         self.notify(DEL, self, p, None)
 
     @synchronized
         self.notify(DEL, self, p, None)
 
     @synchronized
@@ -706,12 +730,12 @@ class RichCollectionBase(CollectionBase):
     @synchronized
     def values(self):
         """Get a list of files and collection objects directly contained in this collection."""
     @synchronized
     def values(self):
         """Get a list of files and collection objects directly contained in this collection."""
-        return self._items.values()
+        return listvalues(self._items)
 
     @synchronized
     def items(self):
         """Get a list of (name, object) tuples directly contained in this collection."""
 
     @synchronized
     def items(self):
         """Get a list of (name, object) tuples directly contained in this collection."""
-        return self._items.items()
+        return listitems(self._items)
 
     def exists(self, path):
         """Test if there is a file or collection at `path`."""
 
     def exists(self, path):
         """Test if there is a file or collection at `path`."""
@@ -732,19 +756,19 @@ class RichCollectionBase(CollectionBase):
         pathcomponents = path.split("/", 1)
         item = self._items.get(pathcomponents[0])
         if item is None:
         pathcomponents = path.split("/", 1)
         item = self._items.get(pathcomponents[0])
         if item is None:
-            raise IOError(errno.ENOENT, "File not found")
+            raise IOError(errno.ENOENT, "File not found", path)
         if len(pathcomponents) == 1:
             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
         if len(pathcomponents) == 1:
             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
-                raise IOError(errno.ENOTEMPTY, "Subcollection not empty")
+                raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
             deleteditem = self._items[pathcomponents[0]]
             del self._items[pathcomponents[0]]
             deleteditem = self._items[pathcomponents[0]]
             del self._items[pathcomponents[0]]
-            self._committed = False
+            self.set_committed(False)
             self.notify(DEL, self, pathcomponents[0], deleteditem)
         else:
             item.remove(pathcomponents[1])
 
     def _clonefrom(self, source):
             self.notify(DEL, self, pathcomponents[0], deleteditem)
         else:
             item.remove(pathcomponents[1])
 
     def _clonefrom(self, source):
-        for k,v in source.items():
+        for k,v in listitems(source):
             self._items[k] = v.clone(self, k)
 
     def clone(self):
             self._items[k] = v.clone(self, k)
 
     def clone(self):
@@ -773,7 +797,7 @@ class RichCollectionBase(CollectionBase):
         """
 
         if target_name in self and not overwrite:
         """
 
         if target_name in self and not overwrite:
-            raise IOError(errno.EEXIST, "File already exists")
+            raise IOError(errno.EEXIST, "File already exists", target_name)
 
         modified_from = None
         if target_name in self:
 
         modified_from = None
         if target_name in self:
@@ -787,7 +811,7 @@ class RichCollectionBase(CollectionBase):
             item = source_obj.clone(self, target_name)
 
         self._items[target_name] = item
             item = source_obj.clone(self, target_name)
 
         self._items[target_name] = item
-        self._committed = False
+        self.set_committed(False)
 
         if modified_from:
             self.notify(MOD, self, target_name, (modified_from, item))
 
         if modified_from:
             self.notify(MOD, self, target_name, (modified_from, item))
@@ -802,7 +826,7 @@ class RichCollectionBase(CollectionBase):
         if isinstance(source, basestring):
             source_obj = source_collection.find(source)
             if source_obj is None:
         if isinstance(source, basestring):
             source_obj = source_collection.find(source)
             if source_obj is None:
-                raise IOError(errno.ENOENT, "File not found")
+                raise IOError(errno.ENOENT, "File not found", source)
             sourcecomponents = source.split("/")
         else:
             source_obj = source
             sourcecomponents = source.split("/")
         else:
             source_obj = source
@@ -826,9 +850,9 @@ class RichCollectionBase(CollectionBase):
                 target_dir = self
 
         if target_dir is None:
                 target_dir = self
 
         if target_dir is None:
-            raise IOError(errno.ENOENT, "Target directory not found.")
+            raise IOError(errno.ENOENT, "Target directory not found", target_name)
 
 
-        if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
+        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
             target_dir = target_dir[target_name]
             target_name = sourcecomponents[-1]
 
             target_dir = target_dir[target_name]
             target_name = sourcecomponents[-1]
 
@@ -881,7 +905,7 @@ class RichCollectionBase(CollectionBase):
 
         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
         if not source_obj.writable():
 
         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
         if not source_obj.writable():
-            raise IOError(errno.EROFS, "Source collection is read only.")
+            raise IOError(errno.EROFS, "Source collection is read only", source)
         target_dir.add(source_obj, target_name, overwrite, True)
 
     def portable_manifest_text(self, stream_name="."):
         target_dir.add(source_obj, target_name, overwrite, True)
 
     def portable_manifest_text(self, stream_name="."):
@@ -897,7 +921,8 @@ class RichCollectionBase(CollectionBase):
         return self._get_manifest_text(stream_name, True, True)
 
     @synchronized
         return self._get_manifest_text(stream_name, True, True)
 
     @synchronized
-    def manifest_text(self, stream_name=".", strip=False, normalize=False):
+    def manifest_text(self, stream_name=".", strip=False, normalize=False,
+                      only_committed=False):
         """Get the manifest text for this collection, sub collections and files.
 
         This method will flush outstanding blocks to Keep.  By default, it will
         """Get the manifest text for this collection, sub collections and files.
 
         This method will flush outstanding blocks to Keep.  By default, it will
@@ -916,13 +941,18 @@ class RichCollectionBase(CollectionBase):
           is not modified, return the original manifest text even if it is not
           in normalized form.
 
           is not modified, return the original manifest text even if it is not
           in normalized form.
 
+        :only_committed:
+          If True, don't commit pending blocks.
+
         """
 
         """
 
-        self._my_block_manager().commit_all()
-        return self._get_manifest_text(stream_name, strip, normalize)
+        if not only_committed:
+            self._my_block_manager().commit_all()
+        return self._get_manifest_text(stream_name, strip, normalize,
+                                       only_committed=only_committed)
 
     @synchronized
 
     @synchronized
-    def _get_manifest_text(self, stream_name, strip, normalize):
+    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
         """Get the manifest text for this collection, sub collections and files.
 
         :stream_name:
         """Get the manifest text for this collection, sub collections and files.
 
         :stream_name:
@@ -938,6 +968,9 @@ class RichCollectionBase(CollectionBase):
           is not modified, return the original manifest text even if it is not
           in normalized form.
 
           is not modified, return the original manifest text even if it is not
           in normalized form.
 
+        :only_committed:
+          If True, only include blocks that were already committed to Keep.
+
         """
 
         if not self.committed() or self._manifest_text is None or normalize:
         """
 
         if not self.committed() or self._manifest_text is None or normalize:
@@ -951,6 +984,8 @@ class RichCollectionBase(CollectionBase):
                 for segment in arvfile.segments():
                     loc = segment.locator
                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
                 for segment in arvfile.segments():
                     loc = segment.locator
                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
+                        if only_committed:
+                            continue
                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                     if strip:
                         loc = KeepLocator(loc).stripped()
                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                     if strip:
                         loc = KeepLocator(loc).stripped()
@@ -960,7 +995,7 @@ class RichCollectionBase(CollectionBase):
             if stream:
                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
             if stream:
                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
-                buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True))
+                buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
             return "".join(buf)
         else:
             if strip:
             return "".join(buf)
         else:
             if strip:
@@ -1003,7 +1038,7 @@ class RichCollectionBase(CollectionBase):
 
         """
         if changes:
 
         """
         if changes:
-            self._committed = False
+            self.set_committed(False)
         for change in changes:
             event_type = change[0]
             path = change[1]
         for change in changes:
             event_type = change[0]
             path = change[1]
@@ -1044,8 +1079,13 @@ class RichCollectionBase(CollectionBase):
 
     def portable_data_hash(self):
         """Get the portable data hash for this collection's manifest."""
 
     def portable_data_hash(self):
         """Get the portable data hash for this collection's manifest."""
-        stripped = self.portable_manifest_text()
-        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
+        if self._manifest_locator and self.committed():
+            # If the collection is already saved on the API server and is
+            # committed, return the PDH from the API server's response.
+            return self._portable_data_hash
+        else:
+            stripped = self.portable_manifest_text().encode()
+            return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
 
     @synchronized
     def subscribe(self, callback):
 
     @synchronized
     def subscribe(self, callback):
@@ -1086,7 +1126,7 @@ class RichCollectionBase(CollectionBase):
     @synchronized
     def flush(self):
         """Flush bufferblocks to Keep."""
     @synchronized
     def flush(self):
         """Flush bufferblocks to Keep."""
-        for e in self.values():
+        for e in listvalues(self):
             e.flush()
 
 
             e.flush()
 
 
@@ -1135,7 +1175,9 @@ class Collection(RichCollectionBase):
                  num_retries=None,
                  parent=None,
                  apiconfig=None,
                  num_retries=None,
                  parent=None,
                  apiconfig=None,
-                 block_manager=None):
+                 block_manager=None,
+                 replication_desired=None,
+                 put_threads=None):
         """Collection constructor.
 
         :manifest_locator_or_text:
         """Collection constructor.
 
         :manifest_locator_or_text:
@@ -1143,24 +1185,36 @@ class Collection(RichCollectionBase):
           a manifest, raw manifest text, or None (to create an empty collection).
         :parent:
           the parent Collection, may be None.
           a manifest, raw manifest text, or None (to create an empty collection).
         :parent:
           the parent Collection, may be None.
+
         :apiconfig:
           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
           Prefer this over supplying your own api_client and keep_client (except in testing).
           Will use default config settings if not specified.
         :apiconfig:
           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
           Prefer this over supplying your own api_client and keep_client (except in testing).
           Will use default config settings if not specified.
+
         :api_client:
           The API client object to use for requests.  If not specified, create one using `apiconfig`.
         :api_client:
           The API client object to use for requests.  If not specified, create one using `apiconfig`.
+
         :keep_client:
           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
         :keep_client:
           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
+
         :num_retries:
           the number of retries for API and Keep requests.
         :num_retries:
           the number of retries for API and Keep requests.
+
         :block_manager:
           the block manager to use.  If not specified, create one.
 
         :block_manager:
           the block manager to use.  If not specified, create one.
 
+        :replication_desired:
+          How many copies Arvados should maintain. If None, the API server's
+          default configuration applies. If not None, this value is also used
+          to determine the number of block copies written.
+
         """
         super(Collection, self).__init__(parent)
         self._api_client = api_client
         self._keep_client = keep_client
         self._block_manager = block_manager
         """
         super(Collection, self).__init__(parent)
         self._api_client = api_client
         self._keep_client = keep_client
         self._block_manager = block_manager
+        self.replication_desired = replication_desired
+        self.put_threads = put_threads
 
         if apiconfig:
             self._config = apiconfig
 
         if apiconfig:
             self._config = apiconfig
@@ -1170,6 +1224,7 @@ class Collection(RichCollectionBase):
         self.num_retries = num_retries if num_retries is not None else 0
         self._manifest_locator = None
         self._manifest_text = None
         self.num_retries = num_retries if num_retries is not None else 0
         self._manifest_locator = None
         self._manifest_text = None
+        self._portable_data_hash = None
         self._api_response = None
         self._past_versions = set()
 
         self._api_response = None
         self._past_versions = set()
 
@@ -1177,11 +1232,11 @@ class Collection(RichCollectionBase):
         self.events = None
 
         if manifest_locator_or_text:
         self.events = None
 
         if manifest_locator_or_text:
-            if re.match(util.keep_locator_pattern, manifest_locator_or_text):
+            if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
                 self._manifest_locator = manifest_locator_or_text
                 self._manifest_locator = manifest_locator_or_text
-            elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
+            elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
                 self._manifest_locator = manifest_locator_or_text
                 self._manifest_locator = manifest_locator_or_text
-            elif re.match(util.manifest_pattern, manifest_locator_or_text):
+            elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
                 self._manifest_text = manifest_locator_or_text
             else:
                 raise errors.ArgumentError(
                 self._manifest_text = manifest_locator_or_text
             else:
                 raise errors.ArgumentError(
@@ -1232,7 +1287,8 @@ class Collection(RichCollectionBase):
     def _my_api(self):
         if self._api_client is None:
             self._api_client = ThreadSafeApiCache(self._config)
     def _my_api(self):
         if self._api_client is None:
             self._api_client = ThreadSafeApiCache(self._config)
-            self._keep_client = self._api_client.keep
+            if self._keep_client is None:
+                self._keep_client = self._api_client.keep
         return self._api_client
 
     @synchronized
         return self._api_client
 
     @synchronized
@@ -1247,7 +1303,10 @@ class Collection(RichCollectionBase):
     @synchronized
     def _my_block_manager(self):
         if self._block_manager is None:
     @synchronized
     def _my_block_manager(self):
         if self._block_manager is None:
-            self._block_manager = _BlockManager(self._my_keep())
+            copies = (self.replication_desired or
+                      self._my_api()._rootDesc.get('defaultCollectionReplication',
+                                                   2))
+            self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
         return self._block_manager
 
     def _remember_api_response(self, response):
         return self._block_manager
 
     def _remember_api_response(self, response):
@@ -1267,6 +1326,11 @@ class Collection(RichCollectionBase):
                 uuid=self._manifest_locator).execute(
                     num_retries=self.num_retries))
             self._manifest_text = self._api_response['manifest_text']
                 uuid=self._manifest_locator).execute(
                     num_retries=self.num_retries))
             self._manifest_text = self._api_response['manifest_text']
+            self._portable_data_hash = self._api_response['portable_data_hash']
+            # If not overridden via kwargs, we should try to load the
+            # replication_desired from the API server
+            if self.replication_desired is None:
+                self.replication_desired = self._api_response.get('replication_desired', None)
             return None
         except Exception as e:
             return e
             return None
         except Exception as e:
             return e
@@ -1278,7 +1342,7 @@ class Collection(RichCollectionBase):
         # mode. Return an exception, or None if successful.
         try:
             self._manifest_text = self._my_keep().get(
         # mode. Return an exception, or None if successful.
         try:
             self._manifest_text = self._my_keep().get(
-                self._manifest_locator, num_retries=self.num_retries)
+                self._manifest_locator, num_retries=self.num_retries).decode()
         except Exception as e:
             return e
 
         except Exception as e:
             return e
 
@@ -1288,10 +1352,10 @@ class Collection(RichCollectionBase):
         error_via_api = None
         error_via_keep = None
         should_try_keep = ((self._manifest_text is None) and
         error_via_api = None
         error_via_keep = None
         should_try_keep = ((self._manifest_text is None) and
-                           util.keep_locator_pattern.match(
+                           arvados.util.keep_locator_pattern.match(
                                self._manifest_locator))
         if ((self._manifest_text is None) and
                                self._manifest_locator))
         if ((self._manifest_text is None) and
-            util.signed_locator_pattern.match(self._manifest_locator)):
+            arvados.util.signed_locator_pattern.match(self._manifest_locator)):
             error_via_keep = self._populate_from_keep()
         if self._manifest_text is None:
             error_via_api = self._populate_from_api_server()
             error_via_keep = self._populate_from_keep()
         if self._manifest_text is None:
             error_via_api = self._populate_from_api_server()
@@ -1317,7 +1381,7 @@ class Collection(RichCollectionBase):
 
 
     def _has_collection_uuid(self):
 
 
     def _has_collection_uuid(self):
-        return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
+        return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
 
     def __enter__(self):
         return self
 
     def __enter__(self):
         return self
@@ -1428,7 +1492,8 @@ class Collection(RichCollectionBase):
                 ).execute(
                     num_retries=num_retries))
             self._manifest_text = self._api_response["manifest_text"]
                 ).execute(
                     num_retries=num_retries))
             self._manifest_text = self._api_response["manifest_text"]
-            self.set_committed()
+            self._portable_data_hash = self._api_response["portable_data_hash"]
+            self.set_committed(True)
 
         return self._manifest_text
 
 
         return self._manifest_text
 
@@ -1477,7 +1542,8 @@ class Collection(RichCollectionBase):
                 ensure_unique_name = True
 
             body = {"manifest_text": text,
                 ensure_unique_name = True
 
             body = {"manifest_text": text,
-                    "name": name}
+                    "name": name,
+                    "replication_desired": self.replication_desired}
             if owner_uuid:
                 body["owner_uuid"] = owner_uuid
 
             if owner_uuid:
                 body["owner_uuid"] = owner_uuid
 
@@ -1485,9 +1551,10 @@ class Collection(RichCollectionBase):
             text = self._api_response["manifest_text"]
 
             self._manifest_locator = self._api_response["uuid"]
             text = self._api_response["manifest_text"]
 
             self._manifest_locator = self._api_response["uuid"]
+            self._portable_data_hash = self._api_response["portable_data_hash"]
 
             self._manifest_text = text
 
             self._manifest_text = text
-            self.set_committed()
+            self.set_committed(True)
 
         return text
 
 
         return text
 
@@ -1518,7 +1585,7 @@ class Collection(RichCollectionBase):
                 stream_name = tok.replace('\\040', ' ')
                 blocks = []
                 segments = []
                 stream_name = tok.replace('\\040', ' ')
                 blocks = []
                 segments = []
-                streamoffset = 0L
+                streamoffset = 0
                 state = BLOCKS
                 self.find_or_create(stream_name, COLLECTION)
                 continue
                 state = BLOCKS
                 self.find_or_create(stream_name, COLLECTION)
                 continue
@@ -1526,7 +1593,7 @@ class Collection(RichCollectionBase):
             if state == BLOCKS:
                 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
                 if block_locator:
             if state == BLOCKS:
                 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
                 if block_locator:
-                    blocksize = long(block_locator.group(1))
+                    blocksize = int(block_locator.group(1))
                     blocks.append(Range(tok, streamoffset, blocksize, 0))
                     streamoffset += blocksize
                 else:
                     blocks.append(Range(tok, streamoffset, blocksize, 0))
                     streamoffset += blocksize
                 else:
@@ -1535,8 +1602,8 @@ class Collection(RichCollectionBase):
             if state == SEGMENTS:
                 file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
                 if file_segment:
             if state == SEGMENTS:
                 file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
                 if file_segment:
-                    pos = long(file_segment.group(1))
-                    size = long(file_segment.group(2))
+                    pos = int(file_segment.group(1))
+                    size = int(file_segment.group(2))
                     name = file_segment.group(3).replace('\\040', ' ')
                     filepath = os.path.join(stream_name, name)
                     afile = self.find_or_create(filepath, FILE)
                     name = file_segment.group(3).replace('\\040', ' ')
                     filepath = os.path.join(stream_name, name)
                     afile = self.find_or_create(filepath, FILE)
@@ -1552,7 +1619,7 @@ class Collection(RichCollectionBase):
                 stream_name = None
                 state = STREAM_NAME
 
                 stream_name = None
                 state = STREAM_NAME
 
-        self.set_committed()
+        self.set_committed(True)
 
     @synchronized
     def notify(self, event, collection, name, item):
 
     @synchronized
     def notify(self, event, collection, name, item):
@@ -1602,7 +1669,7 @@ class Subcollection(RichCollectionBase):
     @must_be_writable
     @synchronized
     def _reparent(self, newparent, newname):
     @must_be_writable
     @synchronized
     def _reparent(self, newparent, newname):
-        self._committed = False
+        self.set_committed(False)
         self.flush()
         self.parent.remove(self.name, recursive=True)
         self.parent = newparent
         self.flush()
         self.parent.remove(self.name, recursive=True)
         self.parent = newparent