Merge branch 'master' into 4823-python-sdk-writable-collection-api
author    Peter Amstutz <peter.amstutz@curoverse.com>
          Thu, 12 Feb 2015 18:35:38 +0000 (13:35 -0500)
committer Peter Amstutz <peter.amstutz@curoverse.com>
          Thu, 12 Feb 2015 18:35:38 +0000 (13:35 -0500)
Conflicts:
sdk/python/arvados/collection.py
sdk/python/tests/test_keep_client.py

sdk/python/arvados/collection.py
sdk/python/arvados/commands/put.py
sdk/python/arvados/keep.py
sdk/python/tests/arvados_testutil.py
sdk/python/tests/test_collections.py
sdk/python/tests/test_keep_client.py
services/fuse/arvados_fuse/__init__.py

diff --combined sdk/python/arvados/collection.py
index 20f5c40bfccdf8eb945db95bf575e6f2396782f7,7bfdf782f8d06b03d6ac482fa64872d1eb8ff9be..82708cc02ebd5b99cd96b3a3095c934e4d95b2ea
@@@ -2,25 -2,59 +2,25 @@@ import functool
  import logging
  import os
  import re
 +import errno
 +import time
  
  from collections import deque
  from stat import *
  
 -from .arvfile import ArvadosFileBase
 +from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, synchronized, must_be_writable, SYNC_READONLY, SYNC_EXPLICIT, SYNC_LIVE, NoopLock
  from keep import *
 -from .stream import StreamReader, split
 +from .stream import StreamReader, normalize_stream, locator_block_size
 +from .ranges import Range, LocatorAndRange
 +from .safeapi import SafeApi
  import config
  import errors
  import util
 +import events
 +from arvados.retry import retry_method
  
  _logger = logging.getLogger('arvados.collection')
  
 -def normalize_stream(s, stream):
 -    stream_tokens = [s]
 -    sortedfiles = list(stream.keys())
 -    sortedfiles.sort()
 -
 -    blocks = {}
 -    streamoffset = 0L
 -    for f in sortedfiles:
 -        for b in stream[f]:
 -            if b[arvados.LOCATOR] not in blocks:
 -                stream_tokens.append(b[arvados.LOCATOR])
 -                blocks[b[arvados.LOCATOR]] = streamoffset
 -                streamoffset += b[arvados.BLOCKSIZE]
 -
 -    if len(stream_tokens) == 1:
 -        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)
 -
 -    for f in sortedfiles:
 -        current_span = None
 -        fout = f.replace(' ', '\\040')
 -        for segment in stream[f]:
 -            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
 -            if current_span is None:
 -                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
 -            else:
 -                if segmentoffset == current_span[1]:
 -                    current_span[1] += segment[arvados.SEGMENTSIZE]
 -                else:
 -                    stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
 -                    current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
 -
 -        if current_span is not None:
 -            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
 -
 -        if not stream[f]:
 -            stream_tokens.append("0:0:{0}".format(fout))
 -
 -    return stream_tokens
 -
 -
  class CollectionBase(object):
      def __enter__(self):
          return self
@@@ -187,7 -221,7 +187,7 @@@ class CollectionReader(CollectionBase)
                  if filename not in streams[streamname]:
                      streams[streamname][filename] = []
                  for r in f.segments:
 -                    streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))
 +                    streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
  
          self._streams = [normalize_stream(s, streams[s])
                           for s in sorted(streams)]
@@@ -268,7 -302,9 +268,7 @@@ class _WriterFile(ArvadosFileBase)
  
  
  class CollectionWriter(CollectionBase):
-     def __init__(self, api_client=None, num_retries=0):
 -    KEEP_BLOCK_SIZE = 2**26
 -
+     def __init__(self, api_client=None, num_retries=0, replication=None):
          """Instantiate a CollectionWriter.
  
          CollectionWriter lets you build a new Arvados Collection from scratch.
            service requests.  Default 0.  You may change this value
            after instantiation, but note those changes may not
            propagate to related objects like the Keep client.
+         * replication: The number of copies of each block to store.
+           If this argument is None or not supplied, replication is
+           the server-provided default if available, otherwise 2.
          """
          self._api_client = api_client
          self.num_retries = num_retries
+         self.replication = (2 if replication is None else replication)
          self._keep_client = None
          self._data_buffer = []
          self._data_buffer_len = 0
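
A minimal usage sketch of the new replication argument (assumes ARVADOS_API_HOST
and ARVADOS_API_TOKEN are configured; the filename and data are illustrative):

    import arvados

    writer = arvados.CollectionWriter(replication=3)  # keep 3 copies of each block
    writer.start_new_file('hello.txt')
    writer.write('hello world\n')
    print writer.manifest_text()
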
  
      def _work_file(self):
          while True:
 -            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
 +            buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
              if not buf:
                  break
              self.write(buf)
          self._data_buffer.append(newdata)
          self._data_buffer_len += len(newdata)
          self._current_stream_length += len(newdata)
 -        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
 +        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
              self.flush_data()
  
      def open(self, streampath, filename=None):
          data_buffer = ''.join(self._data_buffer)
          if data_buffer:
              self._current_stream_locators.append(
-                 self._my_keep().put(data_buffer[0:config.KEEP_BLOCK_SIZE]))
+                 self._my_keep().put(
 -                    data_buffer[0:self.KEEP_BLOCK_SIZE],
++                    data_buffer[0:config.KEEP_BLOCK_SIZE],
+                     copies=self.replication))
 -            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
 +            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
              self._data_buffer_len = len(self._data_buffer[0])
  
      def start_new_file(self, newfilename=None):
          self._current_file_name = None
  
      def finish(self):
-         # Store the manifest in Keep and return its locator.
-         return self._my_keep().put(self.manifest_text())
+         """Store the manifest in Keep and return its locator.
+         This is useful for storing manifest fragments (task outputs)
+         temporarily in Keep during a Crunch job.
+         In other cases you should make a collection instead, by
+         sending manifest_text() to the API server's "create
+         collection" endpoint.
+         """
+         return self._my_keep().put(self.manifest_text(), copies=self.replication)
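
A sketch of the two paths described in the docstring above; `writer` is a
CollectionWriter and `api_client` an already-built API client object (both
assumptions for illustration):

    # Inside a Crunch task: stash the manifest fragment in Keep.
    manifest_locator = writer.finish()

    # Otherwise: make a real collection record from the manifest text.
    api_client.collections().create(
        body={'manifest_text': writer.manifest_text()},
        ensure_unique_name=True).execute()
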
  
      def portable_data_hash(self):
          stripped = self.stripped_manifest()
@@@ -551,10 -601,9 +565,9 @@@ class ResumableCollectionWriter(Collect
                     '_data_buffer', '_dependencies', '_finished_streams',
                     '_queued_dirents', '_queued_trees']
  
-     def __init__(self, api_client=None, num_retries=0):
+     def __init__(self, api_client=None, **kwargs):
          self._dependencies = {}
-         super(ResumableCollectionWriter, self).__init__(
-             api_client, num_retries=num_retries)
+         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
  
      @classmethod
      def from_state(cls, state, *init_args, **init_kwargs):
              raise errors.AssertionError(
                  "resumable writer can't accept unsourced data")
          return super(ResumableCollectionWriter, self).write(data)
 +
 +ADD = "add"
 +DEL = "del"
 +MOD = "mod"
 +
 +class SynchronizedCollectionBase(CollectionBase):
 +    """Base class for Collections and Subcollections.  Implements the majority of
 +    functionality relating to accessing items in the Collection."""
 +
 +    def __init__(self, parent=None):
 +        self.parent = parent
 +        self._modified = True
 +        self._items = {}
 +
 +    def _my_api(self):
 +        raise NotImplementedError()
 +
 +    def _my_keep(self):
 +        raise NotImplementedError()
 +
 +    def _my_block_manager(self):
 +        raise NotImplementedError()
 +
 +    def _populate(self):
 +        raise NotImplementedError()
 +
 +    def sync_mode(self):
 +        raise NotImplementedError()
 +
 +    def root_collection(self):
 +        raise NotImplementedError()
 +
 +    def notify(self, event, collection, name, item):
 +        raise NotImplementedError()
 +
 +    @synchronized
 +    def find(self, path, create=False, create_collection=False):
 +        """Recursively search the specified file path.  May return either a Collection
 +        or ArvadosFile.
 +
 +        :create:
 +          If true, create path components (i.e. Collections) that are
 +          missing.  If "create" is False, return None if a path component is
 +          not found.
 +
 +        :create_collection:
 +          If the path is not found, "create" is True, and
 +          "create_collection" is False, then create and return a new
 +          ArvadosFile for the last path component.  If "create_collection" is
 +          True, then create and return a new Collection for the last path
 +          component.
 +
 +        """
 +        if create and self.sync_mode() == SYNC_READONLY:
 +            raise IOError((errno.EROFS, "Collection is read only"))
 +
 +        p = path.split("/")
 +        if p[0] == '.':
 +            del p[0]
 +
 +        if p and p[0]:
 +            item = self._items.get(p[0])
 +            if len(p) == 1:
 +                # item must be a file
 +                if item is None and create:
 +                    # create new file
 +                    if create_collection:
 +                        item = Subcollection(self)
 +                    else:
 +                        item = ArvadosFile(self)
 +                    self._items[p[0]] = item
 +                    self._modified = True
 +                    self.notify(ADD, self, p[0], item)
 +                return item
 +            else:
 +                if item is None and create:
 +                    # create new collection
 +                    item = Subcollection(self)
 +                    self._items[p[0]] = item
 +                    self._modified = True
 +                    self.notify(ADD, self, p[0], item)
 +                del p[0]
 +                if isinstance(item, SynchronizedCollectionBase):
 +                    return item.find("/".join(p), create=create)
 +                else:
 +                    raise errors.ArgumentError("Interior path components must be subcollections")
 +        else:
 +            return self
 +
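
A sketch of find() semantics on a writable collection `c` (paths are
illustrative):

    f = c.find("foo/bar.txt", create=True)  # makes "foo" and ArvadosFile "bar.txt"
    d = c.find("baz", create=True, create_collection=True)  # makes a Subcollection
    missing = c.find("no/such/path")        # returns None rather than raising
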
 +    def open(self, path, mode):
 +        """Open a file-like object for access.
 +
 +        :path:
 +          path to a file in the collection
 +        :mode:
 +          one of "r", "r+", "w", "w+", "a", "a+"
 +          :"r":
 +            opens for reading
 +          :"r+":
 +            opens for reading and writing.  Reads/writes share a file pointer.
 +          :"w", "w+":
 +            truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
 +          :"a", "a+":
 +            opens for reading and writing.  All writes are appended to
 +            the end of the file.  Writing does not affect the file pointer for
 +            reading.
 +        """
 +        mode = mode.replace("b", "")
 +        if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
 +            raise errors.ArgumentError("Bad mode '%s'" % mode)
 +        create = (mode != "r")
 +
 +        if create and self.sync_mode() == SYNC_READONLY:
 +            raise IOError((errno.EROFS, "Collection is read only"))
 +
 +        f = self.find(path, create=create)
 +
 +        if f is None:
 +            raise IOError((errno.ENOENT, "File not found"))
 +        if not isinstance(f, ArvadosFile):
 +            raise IOError((errno.EISDIR, "Path must refer to a file."))
 +
 +        if mode[0] == "w":
 +            f.truncate(0)
 +
 +        if mode == "r":
 +            return ArvadosFileReader(f, path, mode, num_retries=self.num_retries)
 +        else:
 +            return ArvadosFileWriter(f, path, mode, num_retries=self.num_retries)
 +
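
A short sketch of the open() modes on a writable collection `c`; close() is
called explicitly rather than assuming the returned file-like objects are
context managers:

    w = c.open("stats/counts.txt", "w")   # truncate, then read/write
    w.write("1 2 3\n")
    w.close()

    r = c.open("stats/counts.txt", "r")   # read only
    print r.read()
    r.close()
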
 +    @synchronized
 +    def modified(self):
 +        """Test if the collection (or any subcollection or file) has been modified
 +        since it was created."""
 +        if self._modified:
 +            return True
 +        for k,v in self._items.items():
 +            if v.modified():
 +                return True
 +        return False
 +
 +    @synchronized
 +    def set_unmodified(self):
 +        """Recursively clear modified flag"""
 +        self._modified = False
 +        for k,v in self._items.items():
 +            v.set_unmodified()
 +
 +    @synchronized
 +    def __iter__(self):
 +        """Iterate over names of files and collections directly contained in this collection."""
 +        return self._items.keys().__iter__()
 +
 +    @synchronized
 +    def iterkeys(self):
 +        """Iterate over names of files and collections directly contained in this collection."""
 +        return self._items.keys()
 +
 +    @synchronized
 +    def __getitem__(self, k):
 +        """Get a file or collection that is directly contained by this collection.  If
 +        you want to search a path, use `find()` instead.
 +        """
 +        return self._items[k]
 +
 +    @synchronized
 +    def __contains__(self, k):
 +        """Test if there is a file or collection directly contained by this
 +        collection with the name "k"."""
 +        return k in self._items
 +
 +    @synchronized
 +    def __len__(self):
 +        """Get the number of items directly contained in this collection"""
 +        return len(self._items)
 +
 +    @must_be_writable
 +    @synchronized
 +    def __delitem__(self, p):
 +        """Delete an item by name which is directly contained by this collection."""
 +        del self._items[p]
 +        self._modified = True
 +        self.notify(DEL, self, p, None)
 +
 +    @synchronized
 +    def keys(self):
 +        """Get a list of names of files and collections directly contained in this collection."""
 +        return self._items.keys()
 +
 +    @synchronized
 +    def values(self):
 +        """Get a list of files and collection objects directly contained in this collection."""
 +        return self._items.values()
 +
 +    @synchronized
 +    def items(self):
 +        """Get a list of (name, object) tuples directly contained in this collection."""
 +        return self._items.items()
 +
 +    def exists(self, path):
 +        """Test if there is a file or collection at "path" """
 +        return self.find(path) is not None
 +
 +    @must_be_writable
 +    @synchronized
 +    def remove(self, path, rm_r=False):
 +        """Remove the file or subcollection (directory) at `path`.
 +        :rm_r:
 +          Specify whether to remove non-empty subcollections (True), or raise an error (False).
 +        """
 +        p = path.split("/")
 +        if p[0] == '.':
 +            # Remove '.' from the front of the path
 +            del p[0]
 +
 +        if len(p) > 0:
 +            item = self._items.get(p[0])
 +            if item is None:
 +                raise IOError((errno.ENOENT, "File not found"))
 +            if len(p) == 1:
 +                if isinstance(self._items[p[0]], SynchronizedCollectionBase) and len(self._items[p[0]]) > 0 and not rm_r:
 +                    raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
 +                d = self._items[p[0]]
 +                del self._items[p[0]]
 +                self._modified = True
 +                self.notify(DEL, self, p[0], d)
 +            else:
 +                del p[0]
 +                item.remove("/".join(p))
 +        else:
 +            raise IOError((errno.ENOENT, "File not found"))
 +
 +    def _cloneinto(self, target):
 +        for k,v in self._items.items():
 +            target._items[k] = v.clone(target)
 +
 +    def clone(self):
 +        raise NotImplementedError()
 +
 +    @must_be_writable
 +    @synchronized
 +    def copy(self, source, target_path, source_collection=None, overwrite=False):
 +        """Copy a file or subcollection to a new path in this collection.
 +
 +        :source:
 +          An ArvadosFile, Subcollection, or string with a path to source file or subcollection
 +
 +        :target_path:
 +          Destination file or path.  If the target path already exists and is a
 +          subcollection, the item will be placed inside the subcollection.  If
 +          the target path already exists and is a file, this will raise an error
 +          unless you specify `overwrite=True`.
 +
 +        :source_collection:
 +          Collection to copy `source` from (default `self`)
 +
 +        :overwrite:
 +          Whether to overwrite target file if it already exists.
 +        """
 +        if source_collection is None:
 +            source_collection = self
 +
 +        # Find the object to copy
 +        if isinstance(source, basestring):
 +            source_obj = source_collection.find(source)
 +            if source_obj is None:
 +                raise IOError((errno.ENOENT, "File not found"))
 +            sp = source.split("/")
 +        else:
 +            source_obj = source
 +            sp = None
 +
 +        # Find the parent collection of the target path
 +        tp = target_path.split("/")
 +
 +        # Determine the name to use.
 +        target_name = tp[-1] if tp[-1] else (sp[-1] if sp else None)
 +
 +        if not target_name:
 +            raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
 +
 +        target_dir = self.find("/".join(tp[0:-1]), create=True, create_collection=True)
 +
 +        with target_dir.lock:
 +            if target_name in target_dir:
 +                if isinstance(target_dir[target_name], SynchronizedCollectionBase) and sp:
 +                    target_dir = target_dir[target_name]
 +                    target_name = sp[-1]
 +                elif not overwrite:
 +                    raise IOError((errno.EEXIST, "File already exists"))
 +
 +            mod = None
 +            if target_name in target_dir:
 +                mod = target_dir[target_name]
 +
 +            # Actually make the copy.
 +            dup = source_obj.clone(target_dir)
 +            target_dir._items[target_name] = dup
 +            target_dir._modified = True
 +
 +        if mod:
 +            self.notify(MOD, target_dir, target_name, (mod, dup))
 +        else:
 +            self.notify(ADD, target_dir, target_name, dup)
 +
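
A sketch of the copy() behaviors documented above, given a writable collection
`c` that already contains "input/a.txt" (illustrative names):

    c.copy("input/a.txt", "backup/")    # target is a subcollection: name is kept
    c.copy("input/a.txt", "a2.txt")     # copy under a new name
    c.copy("input/a.txt", "a2.txt", overwrite=True)  # replace the existing file
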
 +    @synchronized
 +    def manifest_text(self, strip=False, normalize=False):
 +        """Get the manifest text for this collection, sub collections and files.
 +
 +        :strip:
 +          If True, remove signing tokens from block locators if present.
 +          If False, block locators are left unchanged.
 +
 +        :normalize:
 +          If True, always export the manifest text in normalized form
 +          even if the Collection is not modified.  If False and the collection
 +          is not modified, return the original manifest text even if it is not
 +          in normalized form.
 +
 +        """
 +        if self.modified() or self._manifest_text is None or normalize:
 +            return export_manifest(self, stream_name=".", portable_locators=strip)
 +        else:
 +            if strip:
 +                return self.stripped_manifest()
 +            else:
 +                return self._manifest_text
 +
 +    @synchronized
 +    def diff(self, end_collection, prefix=".", holding_collection=None):
 +        """
 +        Generate a list of add/modify/delete actions which, when given to
 +        `apply`, will change `self` to match `end_collection`.
 +        """
 +        changes = []
 +        if holding_collection is None:
 +            holding_collection = CollectionRoot(api_client=self._my_api(), keep_client=self._my_keep(), sync=SYNC_READONLY)
 +        for k in self:
 +            if k not in end_collection:
 +               changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection)))
 +        for k in end_collection:
 +            if k in self:
 +                if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
 +                    changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
 +                elif end_collection[k] != self[k]:
 +                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection), end_collection[k].clone(holding_collection)))
 +            else:
 +                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection)))
 +        return changes
 +
 +    @must_be_writable
 +    @synchronized
 +    def apply(self, changes):
 +        """
 +        Apply changes from `diff`.  If a change conflicts with a local change, it
 +        will be saved to an alternate path indicating the conflict.
 +        """
 +        for c in changes:
 +            path = c[1]
 +            initial = c[2]
 +            local = self.find(path)
 +            conflictpath = "%s~conflict-%s~" % (path, time.strftime("%Y-%m-%d-%H:%M:%S",
 +                                                                    time.gmtime()))
 +            if c[0] == ADD:
 +                if local is None:
 +                    # No local file at path, safe to copy over new file
 +                    self.copy(initial, path)
 +                elif local is not None and local != initial:
 +                    # There is already local file and it is different:
 +                    # save change to conflict file.
 +                    self.copy(initial, conflictpath)
 +            elif c[0] == MOD:
 +                if local == initial:
 +                    # Local matches the "initial" item so it has not
 +                    # changed locally and is safe to update.
 +                    if isinstance(local, ArvadosFile) and isinstance(c[3], ArvadosFile):
 +                        # Replace contents of local file with new contents
 +                        local.replace_contents(c[3])
 +                    else:
 +                        # Overwrite path with new item; this can happen if
 +                        # path was a file and is now a collection or vice versa
 +                        self.copy(c[3], path, overwrite=True)
 +                else:
 +                    # Local is missing (presumably deleted) or local doesn't
 +                    # match the "start" value, so save change to conflict file
 +                    self.copy(c[3], conflictpath)
 +            elif c[0] == DEL:
 +                if local == initial:
 +                    # Local item matches "initial" value, so it is safe to remove.
 +                    self.remove(path, rm_r=True)
 +                # else, the file is modified or already removed, in either
 +                # case we don't want to try to remove it.
 +
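
A sketch tying diff() and apply() together; `local` and `remote` are assumed to
be CollectionRoot objects derived from the same collection:

    changes = local.diff(remote)  # actions that would make `local` match `remote`
    local.apply(changes)          # conflicting local edits survive as
                                  # "name~conflict-<timestamp>~" files
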
 +    def portable_data_hash(self):
 +        """Get the portable data hash for this collection's manifest."""
 +        stripped = self.manifest_text(strip=True)
 +        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
 +
 +    @synchronized
 +    def __eq__(self, other):
 +        if other is self:
 +            return True
 +        if not isinstance(other, SynchronizedCollectionBase):
 +            return False
 +        if len(self._items) != len(other):
 +            return False
 +        for k in self._items:
 +            if k not in other:
 +                return False
 +            if self._items[k] != other[k]:
 +                return False
 +        return True
 +
 +    def __ne__(self, other):
 +        return not self.__eq__(other)
 +
 +class CollectionRoot(SynchronizedCollectionBase):
 +    """Represents the root of an Arvados Collection, which may be associated with
 +    an API server Collection record.
 +
 +    Brief summary of useful methods:
 +
 +    :To read an existing file:
 +      `c.open("myfile", "r")`
 +
 +    :To write a new file:
 +      `c.open("myfile", "w")`
 +
 +    :To determine if a file exists:
 +      `c.find("myfile") is not None`
 +
 +    :To copy a file:
 +      `c.copy("source", "dest")`
 +
 +    :To delete a file:
 +      `c.remove("myfile")`
 +
 +    :To save to an existing collection record:
 +      `c.save()`
 +
 +    :To save a new collection record:
 +      `c.save_new()`
 +
 +    :To merge remote changes into this object:
 +      `c.update()`
 +
 +    This class is threadsafe.  The root collection object, all subcollections
 +    and files are protected by a single lock (i.e. each access locks the entire
 +    collection).
 +
 +    """
 +
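
An end-to-end sketch using the WritableCollection convenience constructor
defined later in this file (file name and contents are illustrative):

    c = WritableCollection()
    f = c.open("results/out.txt", "w")
    f.write("42\n")
    f.close()
    c.save_new(name="my results")   # creates the API server record
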
 +    def __init__(self, manifest_locator_or_text=None,
 +                 parent=None,
 +                 apiconfig=None,
 +                 api_client=None,
 +                 keep_client=None,
 +                 num_retries=None,
 +                 block_manager=None,
 +                 sync=None):
 +        """:manifest_locator_or_text:
 +          One of: an Arvados collection UUID, the block locator of a manifest,
 +          raw manifest text, or None (to create an empty collection).
 +        :parent:
 +          the parent Collection, may be None.
 +        :apiconfig:
 +          A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
 +          Prefer this over supplying your own api_client and keep_client (except in testing).
 +          Will use default config settings if not specified.
 +        :api_client:
 +          The API client object to use for requests.  If not specified, create one using `apiconfig`.
 +        :keep_client:
 +          the Keep client to use for requests.  If not specified, create one using `apiconfig`.
 +        :num_retries:
 +          the number of retries for API and Keep requests.
 +        :block_manager:
 +          the block manager to use.  If not specified, create one.
 +        :sync:
 +          Set synchronization policy with API server collection record.
 +          :SYNC_READONLY:
 +            Collection is read only.  No synchronization.  This mode will
 +            also forego locking, which gives better performance.
 +          :SYNC_EXPLICIT:
 +            Collection is writable.  Synchronize on explicit request via `update()` or `save()`
 +          :SYNC_LIVE:
 +            Collection is writable.  Synchronize with server in response to
 +            background websocket events, on block write, or on file close.
 +
 +        """
 +        super(CollectionRoot, self).__init__(parent)
 +        self._api_client = api_client
 +        self._keep_client = keep_client
 +        self._block_manager = block_manager
 +
 +        if apiconfig:
 +            self._config = apiconfig
 +        else:
 +            self._config = config.settings()
 +
 +        self.num_retries = num_retries
 +        self._manifest_locator = None
 +        self._manifest_text = None
 +        self._api_response = None
 +
 +        if sync is None:
 +            raise errors.ArgumentError("Must specify sync mode")
 +
 +        self._sync = sync
 +        self.lock = threading.RLock()
 +        self.callbacks = []
 +        self.events = None
 +
 +        if manifest_locator_or_text:
 +            if re.match(util.keep_locator_pattern, manifest_locator_or_text):
 +                self._manifest_locator = manifest_locator_or_text
 +            elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
 +                self._manifest_locator = manifest_locator_or_text
 +            elif re.match(util.manifest_pattern, manifest_locator_or_text):
 +                self._manifest_text = manifest_locator_or_text
 +            else:
 +                raise errors.ArgumentError(
 +                    "Argument to CollectionRoot must be a manifest or a collection UUID")
 +
 +            self._populate()
 +
 +            if self._sync == SYNC_LIVE:
 +                if not self._has_collection_uuid():
 +                    raise errors.ArgumentError("Cannot SYNC_LIVE unless associated with a collection uuid")
 +                self.events = events.subscribe(arvados.api(apiconfig=self._config),
 +                                               [["object_uuid", "=", self._manifest_locator]],
 +                                               self.on_message)
 +
 +
 +    def root_collection(self):
 +        return self
 +
 +    def sync_mode(self):
 +        return self._sync
 +
 +    def on_message(self, event):
 +        if event.get("object_uuid") == self._manifest_locator:
 +            self.update()
 +
 +    @staticmethod
 +    def create(name, owner_uuid=None, sync=SYNC_EXPLICIT, apiconfig=None):
 +        """Create a new empty Collection with associated collection record."""
 +        c = CollectionRoot(sync=SYNC_EXPLICIT, apiconfig=apiconfig)
 +        c.save_new(name, owner_uuid=owner_uuid, ensure_unique_name=True)
 +        if sync == SYNC_LIVE:
 +            c.events = events.subscribe(arvados.api(apiconfig=apiconfig), [["object_uuid", "=", c._manifest_locator]], c.on_message)
 +        return c
 +
 +    @synchronized
 +    @retry_method
 +    def update(self, other=None, num_retries=None):
 +        if other is None:
 +            if self._manifest_locator is None:
 +                raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
 +            n = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
 +            other = import_manifest(n["manifest_text"])
 +        baseline = import_manifest(self._manifest_text)
 +        self.apply(other.diff(baseline))
 +
 +    @synchronized
 +    def _my_api(self):
 +        if self._api_client is None:
 +            self._api_client = SafeApi(self._config)
 +            self._keep_client = self._api_client.keep
 +        return self._api_client
 +
 +    @synchronized
 +    def _my_keep(self):
 +        if self._keep_client is None:
 +            if self._api_client is None:
 +                self._my_api()
 +            else:
 +                self._keep_client = KeepClient(api=self._api_client)
 +        return self._keep_client
 +
 +    @synchronized
 +    def _my_block_manager(self):
 +        if self._block_manager is None:
 +            self._block_manager = BlockManager(self._my_keep())
 +        return self._block_manager
 +
 +    def _populate_from_api_server(self):
 +        # As in KeepClient itself, we must wait until the last
 +        # possible moment to instantiate an API client, in order to
 +        # avoid tripping up clients that don't have access to an API
 +        # server.  If we do build one, make sure our Keep client uses
 +        # it.  If instantiation fails, we'll fall back to the except
 +        # clause, just like any other Collection lookup
 +        # failure. Return an exception, or None if successful.
 +        try:
 +            self._api_response = self._my_api().collections().get(
 +                uuid=self._manifest_locator).execute(
 +                    num_retries=self.num_retries)
 +            self._manifest_text = self._api_response['manifest_text']
 +            return None
 +        except Exception as e:
 +            return e
 +
 +    def _populate_from_keep(self):
 +        # Retrieve a manifest directly from Keep. This has a chance of
 +        # working if [a] the locator includes a permission signature
 +        # or [b] the Keep services are operating in world-readable
 +        # mode. Return an exception, or None if successful.
 +        try:
 +            self._manifest_text = self._my_keep().get(
 +                self._manifest_locator, num_retries=self.num_retries)
 +        except Exception as e:
 +            return e
 +
 +    def _populate(self):
 +        if self._manifest_locator is None and self._manifest_text is None:
 +            return
 +        error_via_api = None
 +        error_via_keep = None
 +        should_try_keep = ((self._manifest_text is None) and
 +                           util.keep_locator_pattern.match(
 +                               self._manifest_locator))
 +        if ((self._manifest_text is None) and
 +            util.signed_locator_pattern.match(self._manifest_locator)):
 +            error_via_keep = self._populate_from_keep()
 +        if self._manifest_text is None:
 +            error_via_api = self._populate_from_api_server()
 +            if error_via_api is not None and not should_try_keep:
 +                raise error_via_api
 +        if ((self._manifest_text is None) and
 +            not error_via_keep and
 +            should_try_keep):
 +            # Looks like a keep locator, and we didn't already try keep above
 +            error_via_keep = self._populate_from_keep()
 +        if self._manifest_text is None:
 +            # Nothing worked!
 +            raise arvados.errors.NotFoundError(
 +                ("Failed to retrieve collection '{}' " +
 +                 "from either API server ({}) or Keep ({})."
 +                 ).format(
 +                    self._manifest_locator,
 +                    error_via_api,
 +                    error_via_keep))
 +        # populate
 +        self._baseline_manifest = self._manifest_text
 +        import_manifest(self._manifest_text, self)
 +
 +        if self._sync == SYNC_READONLY:
 +            # Now that we're populated, knowing that this will be readonly,
 +            # forego any further locking.
 +            self.lock = NoopLock()
 +
 +    def _has_collection_uuid(self):
 +        return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
 +
 +    def __enter__(self):
 +        return self
 +
 +    def __exit__(self, exc_type, exc_value, traceback):
 +        """Support scoped auto-commit in a with: block"""
 +        if self._sync != SYNC_READONLY and self._has_collection_uuid():
 +            self.save()
 +        if self._block_manager is not None:
 +            self._block_manager.stop_threads()
 +
 +    @synchronized
 +    def clone(self, new_parent=None, new_sync=SYNC_READONLY, new_config=None):
 +        if new_config is None:
 +            new_config = self._config
 +        c = CollectionRoot(parent=new_parent, apiconfig=new_config, sync=new_sync)
 +        if new_sync == SYNC_READONLY:
 +            c.lock = NoopLock()
 +        c._items = {}
 +        self._cloneinto(c)
 +        return c
 +
 +    @synchronized
 +    def api_response(self):
 +        """
 +        api_response() -> dict or None
 +
 +        Returns information about this Collection fetched from the API server.
 +        If the Collection exists in Keep but not the API server, currently
 +        returns None.  Future versions may provide a synthetic response.
 +        """
 +        return self._api_response
 +
 +    @must_be_writable
 +    @synchronized
 +    @retry_method
 +    def save(self, merge=True, num_retries=None):
 +        """Commit pending buffer blocks to Keep, merge with remote record (if
 +        merge=True), write the manifest to Keep, and update the collection
 +        record.  Will raise AssertionError if not associated with a collection
 +        record on the API server.  If you want to save a manifest to Keep only,
 +        see `save_new()`.
 +
 +        :merge:
 +          Update and merge remote changes before saving.  Otherwise, any
 +          remote changes will be ignored and overwritten.
 +
 +        """
 +        if self.modified():
 +            if not self._has_collection_uuid():
 +                raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_new() for new collections.")
 +            self._my_block_manager().commit_all()
 +            if merge:
 +                self.update()
 +            self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
 +
 +            mt = self.manifest_text(strip=False)
 +            self._api_response = self._my_api().collections().update(
 +                uuid=self._manifest_locator,
 +                body={'manifest_text': mt}
 +                ).execute(
 +                    num_retries=num_retries)
 +            self._manifest_text = mt
 +            self.set_unmodified()
 +
 +    @must_be_writable
 +    @synchronized
 +    @retry_method
 +    def save_new(self, name=None, create_collection_record=True, owner_uuid=None, ensure_unique_name=False, num_retries=None):
 +        """Commit pending buffer blocks to Keep, write the manifest to Keep, and create
 +        a new collection record (if create_collection_record is True).  After
 +        creating a new collection record, this Collection object will be
 +        associated with the new record for `save()` and SYNC_LIVE updates.
 +
 +        :name:
 +          The collection name.
 +
 +        :create_collection_record:
 +          If False, only save the manifest to Keep; do not create a collection record.
 +
 +        :owner_uuid:
 +          the user, or project uuid that will own this collection.
 +          If None, defaults to the current user.
 +
 +        :ensure_unique_name:
 +          If True, ask the API server to rename the collection
 +          if it conflicts with a collection with the same name and owner.  If
 +          False, a name conflict will result in an error.
 +
 +        """
 +        self._my_block_manager().commit_all()
 +        self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
 +        mt = self.manifest_text(strip=False)
 +
 +        if create_collection_record:
 +            if name is None:
 +                name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
 +
 +            body = {"manifest_text": mt,
 +                    "name": name}
 +            if owner_uuid:
 +                body["owner_uuid"] = owner_uuid
 +
 +            self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
 +
 +            if self.events:
 +                self.events.unsubscribe(filters=[["object_uuid", "=", self._manifest_locator]])
 +
 +            self._manifest_locator = self._api_response["uuid"]
 +
 +            if self.events:
 +                self.events.subscribe(filters=[["object_uuid", "=", self._manifest_locator]])
 +
 +        self._manifest_text = mt
 +        self.set_unmodified()
 +
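
A sketch of the save workflow above; the collection UUID is a placeholder:

    c = WritableCollection("zzzzz-4zz18-zzzzzzzzzzzzzzz")
    f = c.open("notes.txt", "a")
    f.write("appended line\n")
    f.close()
    c.save()               # merge remote changes, then update the record
    # c.save(merge=False)  # or: clobber any remote changes
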
 +    @synchronized
 +    def subscribe(self, callback):
 +        self.callbacks.append(callback)
 +
 +    @synchronized
 +    def unsubscribe(self, callback):
 +        self.callbacks.remove(callback)
 +
 +    @synchronized
 +    def notify(self, event, collection, name, item):
 +        for c in self.callbacks:
 +            c(event, collection, name, item)
 +
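
A sketch of the change-notification hooks: a callback receives the event type
(ADD, DEL, or MOD), the collection that changed, the item name, and the item:

    def log_change(event, collection, name, item):
        print "collection event: %s %s" % (event, name)

    c.subscribe(log_change)
    # ... mutate c; each change invokes log_change ...
    c.unsubscribe(log_change)
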
 +def ReadOnlyCollection(*args, **kwargs):
 +    kwargs["sync"] = SYNC_READONLY
 +    return CollectionRoot(*args, **kwargs)
 +
 +def WritableCollection(*args, **kwargs):
 +    kwargs["sync"] = SYNC_EXPLICIT
 +    return CollectionRoot(*args, **kwargs)
 +
 +def LiveCollection(*args, **kwargs):
 +    kwargs["sync"] = SYNC_LIVE
 +    return CollectionRoot(*args, **kwargs)
 +
 +
 +class Subcollection(SynchronizedCollectionBase):
 +    """This is a subdirectory within a collection that doesn't have its own API
 +    server record.  It falls under the umbrella of the root collection."""
 +
 +    def __init__(self, parent):
 +        super(Subcollection, self).__init__(parent)
 +        self.lock = self.root_collection().lock
 +
 +    def root_collection(self):
 +        return self.parent.root_collection()
 +
 +    def sync_mode(self):
 +        return self.root_collection().sync_mode()
 +
 +    def _my_api(self):
 +        return self.root_collection()._my_api()
 +
 +    def _my_keep(self):
 +        return self.root_collection()._my_keep()
 +
 +    def _my_block_manager(self):
 +        return self.root_collection()._my_block_manager()
 +
 +    def _populate(self):
 +        self.root_collection()._populate()
 +
 +    def notify(self, event, collection, name, item):
 +        return self.root_collection().notify(event, collection, name, item)
 +
 +    @synchronized
 +    def clone(self, new_parent):
 +        c = Subcollection(new_parent)
 +        self._cloneinto(c)
 +        return c
 +
 +def import_manifest(manifest_text,
 +                    into_collection=None,
 +                    api_client=None,
 +                    keep=None,
 +                    num_retries=None,
 +                    sync=SYNC_READONLY):
 +    """Import a manifest into a `Collection`.
 +
 +    :manifest_text:
 +      The manifest text to import from.
 +
 +    :into_collection:
 +      The `Collection` that will be initialized (must be empty).
 +      If None, create a new `Collection` object.
 +
 +    :api_client:
 +      The API client object that will be used when creating a new `Collection` object.
 +
 +    :keep:
 +      The keep client object that will be used when creating a new `Collection` object.
 +
 +    :num_retries:
 +      the default number of api client and keep retries on error.
 +
 +    :sync:
 +      Collection sync mode (only if into_collection is None)
 +    """
 +    if into_collection is not None:
 +        if len(into_collection) > 0:
 +            raise errors.ArgumentError("Can only import manifest into an empty collection")
 +        c = into_collection
 +    else:
 +        c = CollectionRoot(api_client=api_client, keep_client=keep, num_retries=num_retries, sync=sync)
 +
 +    save_sync = c.sync_mode()
 +    c._sync = None
 +
 +    STREAM_NAME = 0
 +    BLOCKS = 1
 +    SEGMENTS = 2
 +
 +    stream_name = None
 +    state = STREAM_NAME
 +
 +    for n in re.finditer(r'(\S+)(\s+|$)', manifest_text):
 +        tok = n.group(1)
 +        sep = n.group(2)
 +
 +        if state == STREAM_NAME:
 +            # starting a new stream
 +            stream_name = tok.replace('\\040', ' ')
 +            blocks = []
 +            segments = []
 +            streamoffset = 0L
 +            state = BLOCKS
 +            continue
 +
 +        if state == BLOCKS:
 +            s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
 +            if s:
 +                blocksize = long(s.group(1))
 +                blocks.append(Range(tok, streamoffset, blocksize))
 +                streamoffset += blocksize
 +            else:
 +                state = SEGMENTS
 +
 +        if state == SEGMENTS:
 +            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
 +            if s:
 +                pos = long(s.group(1))
 +                size = long(s.group(2))
 +                name = s.group(3).replace('\\040', ' ')
 +                f = c.find("%s/%s" % (stream_name, name), create=True)
 +                f.add_segment(blocks, pos, size)
 +            else:
 +                # error!
 +                raise errors.SyntaxError("Invalid manifest format")
 +
 +        if sep == "\n":
 +            stream_name = None
 +            state = STREAM_NAME
 +
 +    c.set_unmodified()
 +    c._sync = save_sync
 +    return c
 +
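
A round-trip sketch; the locator below is the md5 of "bar", so the manifest is
well formed whether or not Keep actually holds the block (size() is an assumed
ArvadosFile accessor):

    manifest = ". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar.txt\n"
    c = import_manifest(manifest)
    print c.find("bar.txt").size()            # 3
    print export_manifest(c, stream_name=".")
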
 +def export_manifest(item, stream_name=".", portable_locators=False):
 +    """
 +    :item:
 +      Create a manifest for `item` (must be a `Collection` or `ArvadosFile`).  If
 +      `item` is a `Collection`, this will also export subcollections.
 +
 +    :stream_name:
 +      the name of the stream when exporting `item`.
 +
 +    :portable_locators:
 +      If True, strip any permission hints on block locators.
 +      If False, use block locators as-is.
 +    """
 +    buf = ""
 +    if isinstance(item, SynchronizedCollectionBase):
 +        stream = {}
 +        sorted_keys = sorted(item.keys())
 +        for k in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
 +            v = item[k]
 +            st = []
 +            for s in v.segments():
 +                loc = s.locator
 +                if loc.startswith("bufferblock"):
 +                    loc = v.parent._my_block_manager()._bufferblocks[loc].locator()
 +                if portable_locators:
 +                    loc = KeepLocator(loc).stripped()
 +                st.append(LocatorAndRange(loc, locator_block_size(loc),
 +                                     s.segment_offset, s.range_size))
 +            stream[k] = st
 +        if stream:
 +            buf += ' '.join(normalize_stream(stream_name, stream))
 +            buf += "\n"
 +        for k in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]:
 +            buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k), portable_locators=portable_locators)
 +    elif isinstance(item, ArvadosFile):
 +        st = []
 +        for s in item.segments:
 +            loc = s.locator
 +            if loc.startswith("bufferblock"):
 +                loc = item.parent._my_block_manager()._bufferblocks[loc].locator()
 +            if portable_locators:
 +                loc = KeepLocator(loc).stripped()
 +            st.append(LocatorAndRange(loc, locator_block_size(loc),
 +                                 s.segment_offset, s.range_size))
 +        stream = {stream_name: st}
 +        buf += ' '.join(normalize_stream(stream_name, stream))
 +        buf += "\n"
 +    return buf
index 95ba17280142cac3b51db05c376bb04b62953d4d,4383514df5f42eb34c400a24e58d5652330cce3a..34d0203ec48c380d720367e2492d24924f57e608
@@@ -110,6 -110,13 +110,13 @@@ Print the portable data hash instead o
  created by the upload.
  """)
  
+ upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
+                          help="""
+ Set the replication level for the new collection: how many different
+ physical storage devices (e.g., disks) should have a copy of each data
+ block. Default is to use the server-provided default (if any) or 2.
+ """)
  run_opts = argparse.ArgumentParser(add_help=False)
  
  run_opts.add_argument('--project-uuid', metavar='UUID', help="""
@@@ -253,24 -260,23 +260,23 @@@ class ArvPutCollectionWriter(arvados.Re
      STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
                     ['bytes_written', '_seen_inputs'])
  
-     def __init__(self, cache=None, reporter=None, bytes_expected=None,
-                  api_client=None, num_retries=0):
+     def __init__(self, cache=None, reporter=None, bytes_expected=None, **kwargs):
          self.bytes_written = 0
          self._seen_inputs = []
          self.cache = cache
          self.reporter = reporter
          self.bytes_expected = bytes_expected
-         super(ArvPutCollectionWriter, self).__init__(
-             api_client, num_retries=num_retries)
+         super(ArvPutCollectionWriter, self).__init__(**kwargs)
  
      @classmethod
      def from_cache(cls, cache, reporter=None, bytes_expected=None,
-                    num_retries=0):
+                    num_retries=0, replication=0):
          try:
              state = cache.load()
              state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
              writer = cls.from_state(state, cache, reporter, bytes_expected,
-                                     num_retries=num_retries)
+                                     num_retries=num_retries,
+                                     replication=replication)
          except (TypeError, ValueError,
                  arvados.errors.StaleWriterStateError) as error:
              return cls(cache, reporter, bytes_expected, num_retries=num_retries)
  
      def flush_data(self):
          start_buffer_len = self._data_buffer_len
 -        start_block_count = self.bytes_written / self.KEEP_BLOCK_SIZE
 +        start_block_count = self.bytes_written / arvados.config.KEEP_BLOCK_SIZE
          super(ArvPutCollectionWriter, self).flush_data()
          if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
              self.bytes_written += (start_buffer_len - self._data_buffer_len)
              self.report_progress()
 -            if (self.bytes_written / self.KEEP_BLOCK_SIZE) > start_block_count:
 +            if (self.bytes_written / arvados.config.KEEP_BLOCK_SIZE) > start_block_count:
                  self.cache_state()
  
      def _record_new_input(self, input_type, source_name, dest_name):
@@@ -402,6 -408,11 +408,11 @@@ def main(arguments=None, stdout=sys.std
          print >>stderr, error
          sys.exit(1)
  
+     # Apply default replication, if none specified. TODO (#3410): Use
+     # default replication given by discovery document.
+     if args.replication <= 0:
+         args.replication = 2
      if args.progress:
          reporter = progress_writer(human_progress)
      elif args.batch_progress:
              sys.exit(1)
  
      if resume_cache is None:
-         writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected,
-                                         num_retries=args.retries)
+         writer = ArvPutCollectionWriter(
+             resume_cache, reporter, bytes_expected,
+             num_retries=args.retries,
+             replication=args.replication)
      else:
          writer = ArvPutCollectionWriter.from_cache(
-             resume_cache, reporter, bytes_expected, num_retries=args.retries)
+             resume_cache, reporter, bytes_expected,
+             num_retries=args.retries,
+             replication=args.replication)
  
      # Install our signal handler for each code in CAUGHT_SIGNALS, and save
      # the originals.
              manifest_text = writer.manifest_text()
              if args.normalize:
                  manifest_text = CollectionReader(manifest_text).manifest_text(normalize=True)
+             replication_attr = 'replication_desired'
+             if api_client._schema.schemas['Collection']['properties'].get(replication_attr, None) is None:
+                 # API calls it 'redundancy' until #3410.
+                 replication_attr = 'redundancy'
              # Register the resulting collection in Arvados.
              collection = api_client.collections().create(
                  body={
                      'owner_uuid': project_uuid,
                      'name': collection_name,
-                     'manifest_text': manifest_text
+                     'manifest_text': manifest_text,
+                     replication_attr: args.replication,
                      },
                  ensure_unique_name=True
                  ).execute(num_retries=args.retries)
index d1b07cce0d558eff9a13a8ce57026718bfd6a4a3,262e68864db7a7e12847a138de9922c489f473e2..cc7dbd4161d1586e0c530c2d90a97c5c099670d0
@@@ -58,9 -58,6 +58,9 @@@ class KeepLocator(object)
                               self.permission_hint()] + self.hints
              if s is not None)
  
 +    def stripped(self):
 +        return "%s+%i" % (self.md5sum, self.size)
 +
      def _make_hex_prop(name, length):
          # Build and return a new property with the given name that
          # must be a hex string of the given length.
@@@ -174,7 -171,8 +174,7 @@@ class KeepBlockCache(object)
  
      def cap_cache(self):
          '''Cap the cache size to self.cache_max'''
 -        self._cache_lock.acquire()
 -        try:
 +        with self._cache_lock:
              # Select all slots except those where ready.is_set() and content is
              # None (that means there was an error reading the block).
              self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
                          del self._cache[i]
                          break
                  sm = sum([slot.size() for slot in self._cache])
 -        finally:
 -            self._cache_lock.release()
 +
 +    def _get(self, locator):
 +        # Test if the locator is already in the cache
 +        for i in xrange(0, len(self._cache)):
 +            if self._cache[i].locator == locator:
 +                n = self._cache[i]
 +                if i != 0:
 +                    # move it to the front
 +                    del self._cache[i]
 +                    self._cache.insert(0, n)
 +                return n
 +        return None
 +
 +    def get(self, locator):
 +        with self._cache_lock:
 +            return self._get(locator)
  
      def reserve_cache(self, locator):
          '''Reserve a cache slot for the specified locator,
          or return the existing slot.'''
 -        self._cache_lock.acquire()
 -        try:
 -            # Test if the locator is already in the cache
 -            for i in xrange(0, len(self._cache)):
 -                if self._cache[i].locator == locator:
 -                    n = self._cache[i]
 -                    if i != 0:
 -                        # move it to the front
 -                        del self._cache[i]
 -                        self._cache.insert(0, n)
 -                    return n, False
 -
 -            # Add a new cache slot for the locator
 -            n = KeepBlockCache.CacheSlot(locator)
 -            self._cache.insert(0, n)
 -            return n, True
 -        finally:
 -            self._cache_lock.release()
 +        with self._cache_lock:
 +            n = self._get(locator)
 +            if n:
 +                return n, False
 +            else:
 +                # Add a new cache slot for the locator
 +                n = KeepBlockCache.CacheSlot(locator)
 +                self._cache.insert(0, n)
 +                return n, True
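
A sketch of the reserve/get protocol above; CacheSlot.set() is assumed from the
slot API (only get(), ready, content and size() appear in this hunk), and
fetch_block() is a hypothetical stand-in for a real Keep request:

    cache = KeepBlockCache()
    slot, first = cache.reserve_cache(md5_hash)
    if first:
        slot.set(fetch_block(md5_hash))  # the winning thread fills the slot
    else:
        data = slot.get()                # others block until it is filled
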
  
  class KeepClient(object):
  
          HTTP_ERRORS = (requests.exceptions.RequestException,
                         socket.error, ssl.SSLError)
  
 -        def __init__(self, root, **headers):
 +        def __init__(self, root, session, **headers):
              self.root = root
              self.last_result = None
              self.success_flag = None
 +            self.session = session
              self.get_headers = {'Accept': 'application/octet-stream'}
              self.get_headers.update(headers)
              self.put_headers = headers
              _logger.debug("Request: GET %s", url)
              try:
                  with timer.Timer() as t:
 -                    result = requests.get(url.encode('utf-8'),
 +                    result = self.session.get(url.encode('utf-8'),
                                            headers=self.get_headers,
                                            timeout=timeout)
              except self.HTTP_ERRORS as e:
                  content = result.content
                  _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
                               self.last_status(), len(content), t.msecs,
 -                             (len(content)/(1024.0*1024))/t.secs)
 +                             (len(content)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
                  if self.success_flag:
                      resp_md5 = hashlib.md5(content).hexdigest()
                      if resp_md5 == locator.md5sum:
              url = self.root + hash_s
              _logger.debug("Request: PUT %s", url)
              try:
 -                result = requests.put(url.encode('utf-8'),
 +                result = self.session.put(url.encode('utf-8'),
                                        data=body,
                                        headers=self.put_headers,
                                        timeout=timeout)
          def run_with_limiter(self, limiter):
              if self.service.finished():
                  return
 -            _logger.debug("KeepWriterThread %s proceeding %s %s",
 +            _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
                            str(threading.current_thread()),
                            self.args['data_hash'],
 +                          len(self.args['data']),
                            self.args['service_root'])
              self._success = bool(self.service.put(
                  self.args['data_hash'],
              status = self.service.last_status()
              if self._success:
                  result = self.service.last_result
 -                _logger.debug("KeepWriterThread %s succeeded %s %s",
 +                _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                                str(threading.current_thread()),
                                self.args['data_hash'],
 +                              len(self.args['data']),
                                self.args['service_root'])
                  # Tick the 'done' counter for the number of replica
                  # reported stored by the server, for the case that
      def __init__(self, api_client=None, proxy=None,
                   timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                   api_token=None, local_store=None, block_cache=None,
 -                 num_retries=0):
 +                 num_retries=0, session=None):
          """Initialize a new KeepClient.
  
          Arguments:
              self.put = self.local_store_put
          else:
              self.num_retries = num_retries
 +            self.session = session if session is not None else requests.Session()
              if proxy:
                  if not proxy.endswith('/'):
                      proxy += '/'
          local_roots = self.weighted_service_roots(md5_s, force_rebuild)
          for root in local_roots:
              if root not in roots_map:
 -                roots_map[root] = self.KeepService(root, **headers)
 +                roots_map[root] = self.KeepService(root, self.session, **headers)
          return local_roots
  
      @staticmethod
              return None
  
      @retry.retry_method
 -    def get(self, loc_s, num_retries=None):
 +    def get(self, loc_s, num_retries=None, cache_only=False):
          """Get data from Keep.
  
          This method fetches one or more blocks of data from Keep.  It
            to fetch data from every available Keep service, along with any
            that are named in location hints in the locator.  The default value
            is set when the KeepClient is initialized.
 +        * cache_only: If true, return the block data only if it is already
 +          present in the cache; otherwise return None.
          """
          if ',' in loc_s:
              return ''.join(self.get(x) for x in loc_s.split(','))
          locator = KeepLocator(loc_s)
          expect_hash = locator.md5sum
  
 +        if cache_only:
 +            slot = self.block_cache.get(expect_hash)
 +            if slot is not None and slot.ready.is_set():
 +                return slot.get()
 +            else:
 +                return None
 +
          slot, first = self.block_cache.reserve_cache(expect_hash)
          if not first:
              v = slot.get()
          hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                        for hint in locator.hints if hint.startswith('K@')]
          # Map root URLs to their KeepService objects.
 -        roots_map = {root: self.KeepService(root) for root in hint_roots}
 +        roots_map = {root: self.KeepService(root, self.session) for root in hint_roots}
          blob = None
          loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                 backoff_start=2)
                  "failed to write {} (wanted {} copies but wrote {})".format(
                      data_hash, copies, thread_limiter.done()), service_errors)
  
-     # Local storage methods need no-op num_retries arguments to keep
-     # integration tests happy.  With better isolation they could
-     # probably be removed again.
-     def local_store_put(self, data, num_retries=0):
+     def local_store_put(self, data, copies=1, num_retries=None):
+         """A stub for put().
+ 
+         This method is used in place of the real put() method when
+         using local storage (see the constructor's local_store
+         argument).  The copies and num_retries arguments are ignored:
+         they are here only for the sake of offering the same call
+         signature as put().
+ 
+         Data stored this way can be retrieved via local_store_get().
+         """
          md5 = hashlib.md5(data).hexdigest()
          locator = '%s+%d' % (md5, len(data))
          with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
                    os.path.join(self.local_store, md5))
          return locator
  
-     def local_store_get(self, loc_s, num_retries=0):
+     def local_store_get(self, loc_s, num_retries=None):
+         """Companion to local_store_put()."""
          try:
              locator = KeepLocator(loc_s)
          except ValueError:
              return ''
          with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
              return f.read()
 +
 +    def is_cached(self, locator):
 +        slot = self.block_cache.get(KeepLocator(locator).md5sum)
 +        return slot is not None and slot.ready.is_set()
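
A usage sketch for the two KeepClient additions above (the injectable requests.Session and the cache_only flag on get()), assuming a reachable Arvados API: sharing one session lets every KeepService reuse pooled connections, and cache_only turns get() into a non-blocking cache probe:

    import arvados
    import requests

    session = requests.Session()
    keep = arvados.KeepClient(api_client=arvados.api('v1'), session=session)

    loc = keep.put('hello')                  # store a block, get its locator
    data = keep.get(loc)                     # normal read (fills the cache)
    cached = keep.get(loc, cache_only=True)  # returns None on a cache miss
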
index 474c068b85d4eb05af274f381f0eb6be5a0789dd,124fb3570af36f21f4df974e162019bea599f569..aa0da982be2ed831982a6dccd22899f185eddb11
@@@ -1,5 -1,6 +1,6 @@@
  #!/usr/bin/env python
  
+ import arvados
  import errno
  import hashlib
  import httplib
@@@ -39,30 -40,17 +40,30 @@@ def fake_requests_response(code, body, 
      r.raw = io.BytesIO(body)
      return r
  
 +def mock_put_responses(body, *codes, **headers):
 +    m = mock.MagicMock()
 +    if isinstance(body, tuple):
 +        codes = list(codes)
 +        codes.insert(0, body)
 +        m.return_value.put.side_effect = (fake_requests_response(code, b, **headers) for b, code in codes)
 +    else:
 +        m.return_value.put.side_effect = (fake_requests_response(code, body, **headers) for code in codes)
 +    return mock.patch('requests.Session', m)
 +
  def mock_get_responses(body, *codes, **headers):
 -    return mock.patch('requests.get', side_effect=(
 -        fake_requests_response(code, body, **headers) for code in codes))
 +    m = mock.MagicMock()
 +    m.return_value.get.side_effect = (fake_requests_response(code, body, **headers) for code in codes)
 +    return mock.patch('requests.Session', m)
  
 -def mock_put_responses(body, *codes, **headers):
 -    return mock.patch('requests.put', side_effect=(
 -        fake_requests_response(code, body, **headers) for code in codes))
 +def mock_get(side_effect):
 +    m = mock.MagicMock()
 +    m.return_value.get.side_effect = side_effect
 +    return mock.patch('requests.Session', m)
  
 -def mock_requestslib_responses(method, body, *codes, **headers):
 -    return mock.patch(method, side_effect=(
 -        fake_requests_response(code, body, **headers) for code in codes))
 +def mock_put(side_effect):
 +    m = mock.MagicMock()
 +    m.return_value.put.side_effect = side_effect
 +    return mock.patch('requests.Session', m)
  
  class MockStreamReader(object):
      def __init__(self, name='.', *data):
          return self._name
  
      def readfrom(self, start, size, num_retries=None):
 -        return self._data[start:start + size]
 +        return self._readfrom(start, size, num_retries=num_retries)
  
 +    def _readfrom(self, start, size, num_retries=None):
 +        return self._data[start:start + size]
  
+ class ApiClientMock(object):
+     def api_client_mock(self):
+         return mock.MagicMock(name='api_client_mock')
+
+     def mock_keep_services(self, api_mock=None, status=200, count=12,
+                            service_type='disk',
+                            service_host=None,
+                            service_port=None,
+                            service_ssl_flag=False):
+         if api_mock is None:
+             api_mock = self.api_client_mock()
+         body = {
+             'items_available': count,
+             'items': [{
+                 'uuid': 'zzzzz-bi6l4-{:015x}'.format(i),
+                 'owner_uuid': 'zzzzz-tpzed-000000000000000',
+                 'service_host': service_host or 'keep0x{:x}'.format(i),
+                 'service_port': service_port or 65535-i,
+                 'service_ssl_flag': service_ssl_flag,
+                 'service_type': service_type,
+             } for i in range(0, count)]
+         }
+         self._mock_api_call(api_mock.keep_services().accessible, status, body)
+         return api_mock
+
+     def _mock_api_call(self, mock_method, code, body):
+         mock_method = mock_method().execute
+         if code == 200:
+             mock_method.return_value = body
+         else:
+             mock_method.side_effect = arvados.errors.ApiError(
+                 fake_httplib2_response(code), "{}")
+
+
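A sketch of how ApiClientMock and the Session-level response mocks compose in a test (mirroring their use in test_keep_client.py further down); the two-PUT success scenario is an assumption for illustration:

    import hashlib
    import unittest
    import arvados
    import arvados_testutil as tutil

    class ExampleKeepTest(unittest.TestCase, tutil.ApiClientMock):
        def test_put_against_mocked_services(self):
            api_client = self.mock_keep_services(count=2)
            data = 'foo'
            data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
            # KeepClient must be built inside the patch: its constructor
            # creates the requests.Session that the mock replaces.
            with tutil.mock_put_responses(data_loc, 200, 200):
                keep_client = arvados.KeepClient(api_client=api_client)
                keep_client.put(data, copies=2)
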
  class ArvadosBaseTestCase(unittest.TestCase):
      # This class provides common utility functions for our tests.
  
index e19ddba22fcda1ec9747344be57703a098246a5d,4ca8dfef9e98fc941abbbe96e4335a6eb865a232..2af36227e9d7e6e043041e2701d095d52b8614a2
@@@ -14,9 -14,6 +14,9 @@@ import unittes
  
  import run_test_server
  import arvados_testutil as tutil
 +from arvados.ranges import Range, LocatorAndRange
 +from arvados import import_manifest, export_manifest
 +from arvados.arvfile import SYNC_EXPLICIT
  
  class TestResumableWriter(arvados.ResumableCollectionWriter):
      KEEP_BLOCK_SIZE = 1024  # PUT to Keep every 1K.
@@@ -213,97 -210,97 +213,97 @@@ class ArvadosCollectionsTest(run_test_s
          self.assertEqual(arvados.CollectionReader(m8, self.api_client).manifest_text(normalize=True), m8)
  
      def test_locators_and_ranges(self):
 -        blocks2 = [['a', 10, 0],
 -                  ['b', 10, 10],
 -                  ['c', 10, 20],
 -                  ['d', 10, 30],
 -                  ['e', 10, 40],
 -                  ['f', 10, 50]]
 -
 -        self.assertEqual(arvados.locators_and_ranges(blocks2,  2,  2), [['a', 10, 2, 2]])
 -        self.assertEqual(arvados.locators_and_ranges(blocks2, 12, 2), [['b', 10, 2, 2]])
 -        self.assertEqual(arvados.locators_and_ranges(blocks2, 22, 2), [['c', 10, 2, 2]])
 -        self.assertEqual(arvados.locators_and_ranges(blocks2, 32, 2), [['d', 10, 2, 2]])
 -        self.assertEqual(arvados.locators_and_ranges(blocks2, 42, 2), [['e', 10, 2, 2]])
 -        self.assertEqual(arvados.locators_and_ranges(blocks2, 52, 2), [['f', 10, 2, 2]])
 +        blocks2 = [Range('a', 0, 10),
 +                   Range('b', 10, 10),
 +                   Range('c', 20, 10),
 +                   Range('d', 30, 10),
 +                   Range('e', 40, 10),
 +                   Range('f', 50, 10)]
 +
 +        self.assertEqual(arvados.locators_and_ranges(blocks2,  2,  2), [LocatorAndRange('a', 10, 2, 2)])
 +        self.assertEqual(arvados.locators_and_ranges(blocks2, 12, 2), [LocatorAndRange('b', 10, 2, 2)])
 +        self.assertEqual(arvados.locators_and_ranges(blocks2, 22, 2), [LocatorAndRange('c', 10, 2, 2)])
 +        self.assertEqual(arvados.locators_and_ranges(blocks2, 32, 2), [LocatorAndRange('d', 10, 2, 2)])
 +        self.assertEqual(arvados.locators_and_ranges(blocks2, 42, 2), [LocatorAndRange('e', 10, 2, 2)])
 +        self.assertEqual(arvados.locators_and_ranges(blocks2, 52, 2), [LocatorAndRange('f', 10, 2, 2)])
          self.assertEqual(arvados.locators_and_ranges(blocks2, 62, 2), [])
          self.assertEqual(arvados.locators_and_ranges(blocks2, -2, 2), [])
  
 -        self.assertEqual(arvados.locators_and_ranges(blocks2,  0,  2), [['a', 10, 0, 2]])
 -        self.assertEqual(arvados.locators_and_ranges(blocks2, 10, 2), [['b', 10, 0, 2]])
 -        self.assertEqual(arvados.locators_and_ranges(blocks2, 20, 2), [['c', 10, 0, 2]])
 -        self.assertEqual(arvados.locators_and_ranges(blocks2, 30, 2), [['d', 10, 0, 2]])
 -        self.assertEqual(arvados.locators_and_ranges(blocks2, 40, 2), [['e', 10, 0, 2]])
 -        self.assertEqual(arvados.locators_and_ranges(blocks2, 50, 2), [['f', 10, 0, 2]])
 +        self.assertEqual(arvados.locators_and_ranges(blocks2,  0,  2), [LocatorAndRange('a', 10, 0, 2)])
 +        self.assertEqual(arvados.locators_and_ranges(blocks2, 10, 2), [LocatorAndRange('b', 10, 0, 2)])
 +        self.assertEqual(arvados.locators_and_ranges(blocks2, 20, 2), [LocatorAndRange('c', 10, 0, 2)])
 +        self.assertEqual(arvados.locators_and_ranges(blocks2, 30, 2), [LocatorAndRange('d', 10, 0, 2)])
 +        self.assertEqual(arvados.locators_and_ranges(blocks2, 40, 2), [LocatorAndRange('e', 10, 0, 2)])
 +        self.assertEqual(arvados.locators_and_ranges(blocks2, 50, 2), [LocatorAndRange('f', 10, 0, 2)])
          self.assertEqual(arvados.locators_and_ranges(blocks2, 60, 2), [])
          self.assertEqual(arvados.locators_and_ranges(blocks2, -2, 2), [])
  
 -        self.assertEqual(arvados.locators_and_ranges(blocks2,  9,  2), [['a', 10, 9, 1], ['b', 10, 0, 1]])
 -        self.assertEqual(arvados.locators_and_ranges(blocks2, 19, 2), [['b', 10, 9, 1], ['c', 10, 0, 1]])
 -        self.assertEqual(arvados.locators_and_ranges(blocks2, 29, 2), [['c', 10, 9, 1], ['d', 10, 0, 1]])
 -        self.assertEqual(arvados.locators_and_ranges(blocks2, 39, 2), [['d', 10, 9, 1], ['e', 10, 0, 1]])
 -        self.assertEqual(arvados.locators_and_ranges(blocks2, 49, 2), [['e', 10, 9, 1], ['f', 10, 0, 1]])
 -        self.assertEqual(arvados.locators_and_ranges(blocks2, 59, 2), [['f', 10, 9, 1]])
 -
 -
 -        blocks3 = [['a', 10, 0],
 -                  ['b', 10, 10],
 -                  ['c', 10, 20],
 -                  ['d', 10, 30],
 -                  ['e', 10, 40],
 -                  ['f', 10, 50],
 -                  ['g', 10, 60]]
 -
 -        self.assertEqual(arvados.locators_and_ranges(blocks3,  2,  2), [['a', 10, 2, 2]])
 -        self.assertEqual(arvados.locators_and_ranges(blocks3, 12, 2), [['b', 10, 2, 2]])
 -        self.assertEqual(arvados.locators_and_ranges(blocks3, 22, 2), [['c', 10, 2, 2]])
 -        self.assertEqual(arvados.locators_and_ranges(blocks3, 32, 2), [['d', 10, 2, 2]])
 -        self.assertEqual(arvados.locators_and_ranges(blocks3, 42, 2), [['e', 10, 2, 2]])
 -        self.assertEqual(arvados.locators_and_ranges(blocks3, 52, 2), [['f', 10, 2, 2]])
 -        self.assertEqual(arvados.locators_and_ranges(blocks3, 62, 2), [['g', 10, 2, 2]])
 -
 -
 -        blocks = [['a', 10, 0],
 -                  ['b', 15, 10],
 -                  ['c', 5, 25]]
 +        self.assertEqual(arvados.locators_and_ranges(blocks2,  9,  2), [LocatorAndRange('a', 10, 9, 1), LocatorAndRange('b', 10, 0, 1)])
 +        self.assertEqual(arvados.locators_and_ranges(blocks2, 19, 2), [LocatorAndRange('b', 10, 9, 1), LocatorAndRange('c', 10, 0, 1)])
 +        self.assertEqual(arvados.locators_and_ranges(blocks2, 29, 2), [LocatorAndRange('c', 10, 9, 1), LocatorAndRange('d', 10, 0, 1)])
 +        self.assertEqual(arvados.locators_and_ranges(blocks2, 39, 2), [LocatorAndRange('d', 10, 9, 1), LocatorAndRange('e', 10, 0, 1)])
 +        self.assertEqual(arvados.locators_and_ranges(blocks2, 49, 2), [LocatorAndRange('e', 10, 9, 1), LocatorAndRange('f', 10, 0, 1)])
 +        self.assertEqual(arvados.locators_and_ranges(blocks2, 59, 2), [LocatorAndRange('f', 10, 9, 1)])
 +
 +
 +        blocks3 = [Range('a', 0, 10),
 +                   Range('b', 10, 10),
 +                   Range('c', 20, 10),
 +                   Range('d', 30, 10),
 +                   Range('e', 40, 10),
 +                   Range('f', 50, 10),
 +                   Range('g', 60, 10)]
 +
 +        self.assertEqual(arvados.locators_and_ranges(blocks3,  2,  2), [LocatorAndRange('a', 10, 2, 2)])
 +        self.assertEqual(arvados.locators_and_ranges(blocks3, 12, 2), [LocatorAndRange('b', 10, 2, 2)])
 +        self.assertEqual(arvados.locators_and_ranges(blocks3, 22, 2), [LocatorAndRange('c', 10, 2, 2)])
 +        self.assertEqual(arvados.locators_and_ranges(blocks3, 32, 2), [LocatorAndRange('d', 10, 2, 2)])
 +        self.assertEqual(arvados.locators_and_ranges(blocks3, 42, 2), [LocatorAndRange('e', 10, 2, 2)])
 +        self.assertEqual(arvados.locators_and_ranges(blocks3, 52, 2), [LocatorAndRange('f', 10, 2, 2)])
 +        self.assertEqual(arvados.locators_and_ranges(blocks3, 62, 2), [LocatorAndRange('g', 10, 2, 2)])
 +
 +
 +        blocks = [Range('a', 0, 10),
 +                  Range('b', 10, 15),
 +                  Range('c', 25, 5)]
          self.assertEqual(arvados.locators_and_ranges(blocks, 1, 0), [])
 -        self.assertEqual(arvados.locators_and_ranges(blocks, 0, 5), [['a', 10, 0, 5]])
 -        self.assertEqual(arvados.locators_and_ranges(blocks, 3, 5), [['a', 10, 3, 5]])
 -        self.assertEqual(arvados.locators_and_ranges(blocks, 0, 10), [['a', 10, 0, 10]])
 -
 -        self.assertEqual(arvados.locators_and_ranges(blocks, 0, 11), [['a', 10, 0, 10],
 -                                                                      ['b', 15, 0, 1]])
 -        self.assertEqual(arvados.locators_and_ranges(blocks, 1, 11), [['a', 10, 1, 9],
 -                                                                      ['b', 15, 0, 2]])
 -        self.assertEqual(arvados.locators_and_ranges(blocks, 0, 25), [['a', 10, 0, 10],
 -                                                                      ['b', 15, 0, 15]])
 -
 -        self.assertEqual(arvados.locators_and_ranges(blocks, 0, 30), [['a', 10, 0, 10],
 -                                                                      ['b', 15, 0, 15],
 -                                                                      ['c', 5, 0, 5]])
 -        self.assertEqual(arvados.locators_and_ranges(blocks, 1, 30), [['a', 10, 1, 9],
 -                                                                      ['b', 15, 0, 15],
 -                                                                      ['c', 5, 0, 5]])
 -        self.assertEqual(arvados.locators_and_ranges(blocks, 0, 31), [['a', 10, 0, 10],
 -                                                                      ['b', 15, 0, 15],
 -                                                                      ['c', 5, 0, 5]])
 -
 -        self.assertEqual(arvados.locators_and_ranges(blocks, 15, 5), [['b', 15, 5, 5]])
 -
 -        self.assertEqual(arvados.locators_and_ranges(blocks, 8, 17), [['a', 10, 8, 2],
 -                                                                      ['b', 15, 0, 15]])
 -
 -        self.assertEqual(arvados.locators_and_ranges(blocks, 8, 20), [['a', 10, 8, 2],
 -                                                                      ['b', 15, 0, 15],
 -                                                                      ['c', 5, 0, 3]])
 -
 -        self.assertEqual(arvados.locators_and_ranges(blocks, 26, 2), [['c', 5, 1, 2]])
 -
 -        self.assertEqual(arvados.locators_and_ranges(blocks, 9, 15), [['a', 10, 9, 1],
 -                                                                      ['b', 15, 0, 14]])
 -        self.assertEqual(arvados.locators_and_ranges(blocks, 10, 15), [['b', 15, 0, 15]])
 -        self.assertEqual(arvados.locators_and_ranges(blocks, 11, 15), [['b', 15, 1, 14],
 -                                                                       ['c', 5, 0, 1]])
 +        self.assertEqual(arvados.locators_and_ranges(blocks, 0, 5), [LocatorAndRange('a', 10, 0, 5)])
 +        self.assertEqual(arvados.locators_and_ranges(blocks, 3, 5), [LocatorAndRange('a', 10, 3, 5)])
 +        self.assertEqual(arvados.locators_and_ranges(blocks, 0, 10), [LocatorAndRange('a', 10, 0, 10)])
 +
 +        self.assertEqual(arvados.locators_and_ranges(blocks, 0, 11), [LocatorAndRange('a', 10, 0, 10),
 +                                                                      LocatorAndRange('b', 15, 0, 1)])
 +        self.assertEqual(arvados.locators_and_ranges(blocks, 1, 11), [LocatorAndRange('a', 10, 1, 9),
 +                                                                      LocatorAndRange('b', 15, 0, 2)])
 +        self.assertEqual(arvados.locators_and_ranges(blocks, 0, 25), [LocatorAndRange('a', 10, 0, 10),
 +                                                                      LocatorAndRange('b', 15, 0, 15)])
 +
 +        self.assertEqual(arvados.locators_and_ranges(blocks, 0, 30), [LocatorAndRange('a', 10, 0, 10),
 +                                                                      LocatorAndRange('b', 15, 0, 15),
 +                                                                      LocatorAndRange('c', 5, 0, 5)])
 +        self.assertEqual(arvados.locators_and_ranges(blocks, 1, 30), [LocatorAndRange('a', 10, 1, 9),
 +                                                                      LocatorAndRange('b', 15, 0, 15),
 +                                                                      LocatorAndRange('c', 5, 0, 5)])
 +        self.assertEqual(arvados.locators_and_ranges(blocks, 0, 31), [LocatorAndRange('a', 10, 0, 10),
 +                                                                      LocatorAndRange('b', 15, 0, 15),
 +                                                                      LocatorAndRange('c', 5, 0, 5)])
 +
 +        self.assertEqual(arvados.locators_and_ranges(blocks, 15, 5), [LocatorAndRange('b', 15, 5, 5)])
 +
 +        self.assertEqual(arvados.locators_and_ranges(blocks, 8, 17), [LocatorAndRange('a', 10, 8, 2),
 +                                                                      LocatorAndRange('b', 15, 0, 15)])
 +
 +        self.assertEqual(arvados.locators_and_ranges(blocks, 8, 20), [LocatorAndRange('a', 10, 8, 2),
 +                                                                      LocatorAndRange('b', 15, 0, 15),
 +                                                                      LocatorAndRange('c', 5, 0, 3)])
 +
 +        self.assertEqual(arvados.locators_and_ranges(blocks, 26, 2), [LocatorAndRange('c', 5, 1, 2)])
 +
 +        self.assertEqual(arvados.locators_and_ranges(blocks, 9, 15), [LocatorAndRange('a', 10, 9, 1),
 +                                                                      LocatorAndRange('b', 15, 0, 14)])
 +        self.assertEqual(arvados.locators_and_ranges(blocks, 10, 15), [LocatorAndRange('b', 15, 0, 15)])
 +        self.assertEqual(arvados.locators_and_ranges(blocks, 11, 15), [LocatorAndRange('b', 15, 1, 14),
 +                                                                       LocatorAndRange('c', 5, 0, 1)])
  
      class MockKeep(object):
          def __init__(self, content, num_retries=0):
                  ).manifest_text())
  
  
- class CollectionTestMixin(object):
-     PROXY_RESPONSE = {
-         'items_available': 1,
-         'items': [{
-                 'uuid': 'zzzzz-bi6l4-mockproxy012345',
-                 'owner_uuid': 'zzzzz-tpzed-mockowner012345',
-                 'service_host': tutil.TEST_HOST,
-                 'service_port': 65535,
-                 'service_ssl_flag': True,
-                 'service_type': 'proxy',
-                 }]}
+ class CollectionTestMixin(tutil.ApiClientMock):
      API_COLLECTIONS = run_test_server.fixture('collections')
      DEFAULT_COLLECTION = API_COLLECTIONS['foo_file']
      DEFAULT_DATA_HASH = DEFAULT_COLLECTION['portable_data_hash']
      ALT_DATA_HASH = ALT_COLLECTION['portable_data_hash']
      ALT_MANIFEST = ALT_COLLECTION['manifest_text']
  
-     def _mock_api_call(self, mock_method, code, body):
-         mock_method = mock_method().execute
-         if code == 200:
-             mock_method.return_value = body
-         else:
-             mock_method.side_effect = arvados.errors.ApiError(
-                 tutil.fake_httplib2_response(code), "{}")
-     def mock_keep_services(self, api_mock, code, body):
-         self._mock_api_call(api_mock.keep_services().accessible, code, body)
-     def api_client_mock(self, code=200):
-         client = mock.MagicMock(name='api_client')
-         self.mock_keep_services(client, code, self.PROXY_RESPONSE)
+     def api_client_mock(self, status=200):
+         client = super(CollectionTestMixin, self).api_client_mock()
+         self.mock_keep_services(client, status=status, service_type='proxy', count=1)
          return client
  
  
@@@ -546,9 -522,9 +525,9 @@@ class CollectionReaderTestCase(unittest
          body = self.API_COLLECTIONS.get(body)
          self._mock_api_call(api_mock.collections().get, code, body)
  
-     def api_client_mock(self, code=200):
-         client = super(CollectionReaderTestCase, self).api_client_mock(code)
-         self.mock_get_collection(client, code, 'foo_file')
+     def api_client_mock(self, status=200):
+         client = super(CollectionReaderTestCase, self).api_client_mock()
+         self.mock_get_collection(client, status, 'foo_file')
          return client
  
      def test_init_no_default_retries(self):
@@@ -706,8 -682,8 +685,8 @@@ class CollectionWriterTestCase(unittest
          return tutil.mock_put_responses(body, *codes, **headers)
  
      def foo_writer(self, **kwargs):
-         api_client = self.api_client_mock()
-         writer = arvados.CollectionWriter(api_client, **kwargs)
+         kwargs.setdefault('api_client', self.api_client_mock())
+         writer = arvados.CollectionWriter(**kwargs)
          writer.start_new_file('foo')
          writer.write('foo')
          return writer
              with self.assertRaises(arvados.errors.KeepWriteError):
                  writer.finish()
  
+     def test_write_insufficient_replicas_via_proxy(self):
+         writer = self.foo_writer(replication=3)
+         with self.mock_keep(None, 200, headers={'x-keep-replicas-stored': 2}):
+             with self.assertRaises(arvados.errors.KeepWriteError):
+                 writer.manifest_text()
+
+     def test_write_insufficient_replicas_via_disks(self):
+         client = mock.MagicMock(name='api_client')
+         self.mock_keep_services(client, status=200, service_type='disk', count=2)
+         writer = self.foo_writer(api_client=client, replication=3)
+         with self.mock_keep(
+                 None, 200, 200,
+                 **{'x-keep-replicas-stored': 1}) as keepmock:
+             with self.assertRaises(arvados.errors.KeepWriteError):
+                 writer.manifest_text()
+
+     def test_write_three_replicas(self):
+         client = mock.MagicMock(name='api_client')
+         self.mock_keep_services(client, status=200, service_type='disk', count=6)
+         writer = self.foo_writer(api_client=client, replication=3)
+         with self.mock_keep(
+                 None, 200, 500, 200, 500, 200, 200,
+                 **{'x-keep-replicas-stored': 1}) as keepmock:
+             writer.manifest_text()
+             self.assertEqual(5, keepmock.call_count)
+
      def test_write_whole_collection_through_retries(self):
          writer = self.foo_writer(num_retries=2)
          with self.mock_keep(self.DEFAULT_DATA_HASH,
  
      def test_open_flush(self):
          client = self.api_client_mock()
 -        writer = arvados.CollectionWriter(client)
 -        with writer.open('flush_test') as out_file:
 -            out_file.write('flush1')
 -            data_loc1 = hashlib.md5('flush1').hexdigest() + '+6'
 -            with self.mock_keep(data_loc1, 200) as keep_mock:
 +        data_loc1 = hashlib.md5('flush1').hexdigest() + '+6'
 +        data_loc2 = hashlib.md5('flush2').hexdigest() + '+6'
 +        with self.mock_keep((data_loc1, 200), (data_loc2, 200)) as keep_mock:
 +            writer = arvados.CollectionWriter(client)
 +            with writer.open('flush_test') as out_file:
 +                out_file.write('flush1')
                  out_file.flush()
 -            out_file.write('flush2')
 -            data_loc2 = hashlib.md5('flush2').hexdigest() + '+6'
 -        with self.mock_keep(data_loc2, 200) as keep_mock:
 +                out_file.write('flush2')
              self.assertEqual(". {} {} 0:12:flush_test\n".format(data_loc1,
                                                                  data_loc2),
                               writer.manifest_text())
  
      def test_two_opens_two_streams(self):
          client = self.api_client_mock()
 -        writer = arvados.CollectionWriter(client)
 -        with writer.open('file') as out_file:
 -            out_file.write('file')
 -            data_loc1 = hashlib.md5('file').hexdigest() + '+4'
 -        with self.mock_keep(data_loc1, 200) as keep_mock:
 +        data_loc1 = hashlib.md5('file').hexdigest() + '+4'
 +        data_loc2 = hashlib.md5('indir').hexdigest() + '+5'
 +        with self.mock_keep((data_loc1, 200), (data_loc2, 200)) as keep_mock:
 +            writer = arvados.CollectionWriter(client)
 +            with writer.open('file') as out_file:
 +                out_file.write('file')
              with writer.open('./dir', 'indir') as out_file:
                  out_file.write('indir')
 -                data_loc2 = hashlib.md5('indir').hexdigest() + '+5'
 -        with self.mock_keep(data_loc2, 200) as keep_mock:
              expected = ". {} 0:4:file\n./dir {} 0:5:indir\n".format(
                  data_loc1, data_loc2)
              self.assertEqual(expected, writer.manifest_text())
          self.assertRaises(arvados.errors.AssertionError, writer.open, 'two')
  
  
 +class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
 +    def test_import_manifest(self):
 +        m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
 +. 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt
 +. 8b22da26f9f433dea0a10e5ec66d73ba+43 0:43:md5sum.txt
 +"""
 +        self.assertEqual(". 5348b82a029fd9e971a811ce1f71360b+43 085c37f02916da1cad16f93c54d899b7+41 8b22da26f9f433dea0a10e5ec66d73ba+43 0:127:md5sum.txt\n", arvados.export_manifest(arvados.ReadOnlyCollection(m1)))
 +
 +    def test_init_manifest(self):
 +        m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
 +. 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt
 +. 8b22da26f9f433dea0a10e5ec66d73ba+43 0:43:md5sum.txt
 +"""
 +        self.assertEqual(". 5348b82a029fd9e971a811ce1f71360b+43 085c37f02916da1cad16f93c54d899b7+41 8b22da26f9f433dea0a10e5ec66d73ba+43 0:127:md5sum.txt\n", arvados.export_manifest(arvados.ReadOnlyCollection(m1)))
 +
 +    def test_remove(self):
 +        c = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:10:count2.txt\n')
 +        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:10:count2.txt\n", export_manifest(c))
 +        self.assertIn("count1.txt", c)
 +        c.remove("count1.txt")
 +        self.assertNotIn("count1.txt", c)
 +        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n", export_manifest(c))
 +
 +    def test_remove_in_subdir(self):
 +        c = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n')
 +        c.remove("foo/count2.txt")
 +        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n", export_manifest(c))
 +
 +    def test_remove_empty_subdir(self):
 +        c = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n')
 +        c.remove("foo/count2.txt")
 +        c.remove("foo")
 +        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n", export_manifest(c))
 +
 +    def test_remove_nonempty_subdir(self):
 +        c = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n')
 +        with self.assertRaises(IOError):
 +            c.remove("foo")
 +        c.remove("foo", rm_r=True)
 +        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n", export_manifest(c))
 +
 +    def test_copy_to_dir1(self):
 +        c = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
 +        c.copy("count1.txt", "foo/count2.txt")
 +        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n", export_manifest(c))
 +
 +    def test_copy_to_dir2(self):
 +        c = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
 +        c.copy("count1.txt", "foo")
 +        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n", export_manifest(c))
 +
 +    def test_copy_to_dir3(self):
 +        c = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
 +        c.copy("count1.txt", "foo/")
 +        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n", export_manifest(c))
 +
 +    def test_copy_file(self):
 +        c = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
 +        c.copy("count1.txt", "count2.txt")
 +        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:10:count2.txt\n", c.manifest_text())
 +
 +    def test_clone(self):
 +        c = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n')
 +        cl = c.clone()
 +        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n", export_manifest(cl))
 +
 +    def test_diff1(self):
 +        c1 = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
 +        c2 = arvados.WritableCollection('. 5348b82a029fd9e971a811ce1f71360b+43 0:10:count2.txt\n')
 +        d = c2.diff(c1)
 +        self.assertEqual(d, [('del', './count2.txt', c2["count2.txt"]),
 +                             ('add', './count1.txt', c1["count1.txt"])])
 +        d = c1.diff(c2)
 +        self.assertEqual(d, [('del', './count1.txt', c1["count1.txt"]),
 +                             ('add', './count2.txt', c2["count2.txt"])])
 +        self.assertNotEqual(c1.manifest_text(), c2.manifest_text())
 +        c1.apply(d)
 +        self.assertEqual(c1.manifest_text(), c2.manifest_text())
 +
 +    def test_diff2(self):
 +        c1 = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
 +        c2 = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
 +        d = c2.diff(c1)
 +        self.assertEqual(d, [])
 +        d = c1.diff(c2)
 +        self.assertEqual(d, [])
 +
 +        self.assertEqual(c1.manifest_text(), c2.manifest_text())
 +        c1.apply(d)
 +        self.assertEqual(c1.manifest_text(), c2.manifest_text())
 +
 +    def test_diff3(self):
 +        c1 = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
 +        c2 = arvados.WritableCollection('. 5348b82a029fd9e971a811ce1f71360b+43 0:10:count1.txt\n')
 +        d = c2.diff(c1)
 +        self.assertEqual(d, [('mod', './count1.txt', c2["count1.txt"], c1["count1.txt"])])
 +        d = c1.diff(c2)
 +        self.assertEqual(d, [('mod', './count1.txt', c1["count1.txt"], c2["count1.txt"])])
 +
 +        self.assertNotEqual(c1.manifest_text(), c2.manifest_text())
 +        c1.apply(d)
 +        self.assertEqual(c1.manifest_text(), c2.manifest_text())
 +
 +    def test_diff4(self):
 +        c1 = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
 +        c2 = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 5348b82a029fd9e971a811ce1f71360b+43 0:10:count1.txt 10:20:count2.txt\n')
 +        d = c2.diff(c1)
 +        self.assertEqual(d, [('del', './count2.txt', c2["count2.txt"])])
 +        d = c1.diff(c2)
 +        self.assertEqual(d, [('add', './count2.txt', c2["count2.txt"])])
 +
 +        self.assertNotEqual(c1.manifest_text(), c2.manifest_text())
 +        c1.apply(d)
 +        self.assertEqual(c1.manifest_text(), c2.manifest_text())
 +
 +    def test_diff5(self):
 +        c1 = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
 +        c2 = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 5348b82a029fd9e971a811ce1f71360b+43 0:10:count2.txt\n')
 +        d = c2.diff(c1)
 +        self.assertEqual(d, [('del', './foo', c2["foo"])])
 +        d = c1.diff(c2)
 +        self.assertEqual(d, [('add', './foo', c2["foo"])])
 +
 +        self.assertNotEqual(c1.manifest_text(), c2.manifest_text())
 +        c1.apply(d)
 +        self.assertEqual(c1.manifest_text(), c2.manifest_text())
 +
 +    def test_diff6(self):
 +        c1 = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 5348b82a029fd9e971a811ce1f71360b+43 0:10:count2.txt\n')
 +        c2 = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 5348b82a029fd9e971a811ce1f71360b+43 0:3:count3.txt\n')
 +
 +        d = c2.diff(c1)
 +        self.assertEqual(d, [('del', './foo/count3.txt', c2.find("foo/count3.txt")),
 +                             ('add', './foo/count2.txt', c1.find("foo/count2.txt"))])
 +        d = c1.diff(c2)
 +        self.assertEqual(d, [('del', './foo/count2.txt', c1.find("foo/count2.txt")),
 +                             ('add', './foo/count3.txt', c2.find("foo/count3.txt"))])
 +
 +        self.assertNotEqual(c1.manifest_text(), c2.manifest_text())
 +        c1.apply(d)
 +        self.assertEqual(c1.manifest_text(), c2.manifest_text())
 +
 +    def test_diff7(self):
 +        c1 = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 5348b82a029fd9e971a811ce1f71360b+43 0:10:count2.txt\n')
 +        c2 = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:3:foo\n')
 +        d = c2.diff(c1)
 +        self.assertEqual(d, [('mod', './foo', c2["foo"], c1["foo"])])
 +        d = c1.diff(c2)
 +        self.assertEqual(d, [('mod', './foo', c1["foo"], c2["foo"])])
 +
 +        self.assertNotEqual(c1.manifest_text(), c2.manifest_text())
 +        c1.apply(d)
 +        self.assertEqual(c1.manifest_text(), c2.manifest_text())
 +
 +    def test_conflict1(self):
 +        c1 = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
 +        c2 = arvados.WritableCollection('. 5348b82a029fd9e971a811ce1f71360b+43 0:10:count2.txt\n')
 +        d = c1.diff(c2)
 +        self.assertEqual(d, [('del', './count1.txt', c1["count1.txt"]),
 +                             ('add', './count2.txt', c2["count2.txt"])])
 +        with c1.open("count1.txt", "w") as f:
 +            f.write("zzzzz")
 +
 +        # c1 changed, so it should not be deleted.
 +        c1.apply(d)
 +        self.assertEqual(c1.manifest_text(), ". 95ebc3c7b3b9f1d2c40fec14415d3cb8+5 5348b82a029fd9e971a811ce1f71360b+43 0:5:count1.txt 5:10:count2.txt\n")
 +
 +    def test_conflict2(self):
 +        c1 = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt')
 +        c2 = arvados.WritableCollection('. 5348b82a029fd9e971a811ce1f71360b+43 0:10:count1.txt')
 +        d = c1.diff(c2)
 +        self.assertEqual(d, [('mod', './count1.txt', c1["count1.txt"], c2["count1.txt"])])
 +        with c1.open("count1.txt", "w") as f:
 +            f.write("zzzzz")
 +
 +        # c1 changed, so c2 mod will go to a conflict file
 +        c1.apply(d)
 +        self.assertTrue(re.match(r"\. 95ebc3c7b3b9f1d2c40fec14415d3cb8\+5 5348b82a029fd9e971a811ce1f71360b\+43 0:5:count1.txt 5:10:count1.txt~conflict-\d\d\d\d-\d\d-\d\d-\d\d:\d\d:\d\d~$",
 +                                 c1.manifest_text()))
 +
 +    def test_conflict3(self):
 +        c1 = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n')
 +        c2 = arvados.WritableCollection('. 5348b82a029fd9e971a811ce1f71360b+43 0:10:count1.txt\n')
 +        d = c1.diff(c2)
 +        self.assertEqual(d, [('del', './count2.txt', c1["count2.txt"]),
 +                             ('add', './count1.txt', c2["count1.txt"])])
 +        with c1.open("count1.txt", "w") as f:
 +            f.write("zzzzz")
 +
 +        # c1 added count1.txt, so c2 add will go to a conflict file
 +        c1.apply(d)
 +        self.assertTrue(re.match(r"\. 95ebc3c7b3b9f1d2c40fec14415d3cb8\+5 5348b82a029fd9e971a811ce1f71360b\+43 0:5:count1.txt 5:10:count1.txt~conflict-\d\d\d\d-\d\d-\d\d-\d\d:\d\d:\d\d~$",
 +                                 c1.manifest_text()))
 +
 +    def test_conflict4(self):
 +        c1 = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt')
 +        c2 = arvados.WritableCollection('. 5348b82a029fd9e971a811ce1f71360b+43 0:10:count1.txt')
 +        d = c1.diff(c2)
 +        self.assertEqual(d, [('mod', './count1.txt', c1["count1.txt"], c2["count1.txt"])])
 +        c1.remove("count1.txt")
 +
 +        # c1 deleted, so c2 mod will go to a conflict file
 +        c1.apply(d)
 +        self.assertTrue(re.match(r"\. 5348b82a029fd9e971a811ce1f71360b\+43 0:10:count1.txt~conflict-\d\d\d\d-\d\d-\d\d-\d\d:\d\d:\d\d~$",
 +                                 c1.manifest_text()))
 +
 +    def test_notify1(self):
 +        c1 = arvados.WritableCollection(sync=SYNC_EXPLICIT)
 +        events = []
 +        c1.subscribe(lambda event, collection, name, item: events.append((event, collection, name, item)))
 +        c1.find("")
 +
  if __name__ == '__main__':
      unittest.main()
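
The diff/apply tests above amount to a merge workflow: diff() computes the changes needed to turn one collection into another, apply() replays them, and conflicting edits are diverted to ~conflict-...~ files. A condensed round-trip sketch using the same APIs exercised above (WritableCollection, diff(), apply(), manifest_text(), all introduced on this branch):

    import arvados

    base  = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
    other = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:10:count2.txt\n')

    changes = base.diff(other)   # e.g. [('add', './count2.txt', ...)]
    base.apply(changes)          # replay the changes onto base
    assert base.manifest_text() == other.manifest_text()
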
index 885ae7c1d0bd66acf25be72a1fd623607db9892d,6d4d3cd8b816ea3854151285cd46cdebf60f5e63..1baf1357ccf8dbfdc78960d4a63b326595e249c5
@@@ -1,6 -1,7 +1,7 @@@
  import hashlib
  import mock
  import os
+ import random
  import re
  import socket
  import unittest
@@@ -188,11 -189,12 +189,12 @@@ class KeepOptionalPermission(run_test_s
  class KeepProxyTestCase(run_test_server.TestCaseWithServers):
      MAIN_SERVER = {}
      KEEP_SERVER = {}
-     KEEP_PROXY_SERVER = {'auth': 'admin'}
+     KEEP_PROXY_SERVER = {}
  
      @classmethod
      def setUpClass(cls):
          super(KeepProxyTestCase, cls).setUpClass()
+         run_test_server.authorize_with('active')
          cls.api_client = arvados.api('v1')
  
      def tearDown(self):
          self.assertTrue(keep_client.using_proxy)
  
  
- class KeepClientServiceTestCase(unittest.TestCase):
-     def mock_keep_services(self, *services):
-         api_client = mock.MagicMock(name='api_client')
-         api_client.keep_services().accessible().execute.return_value = {
-             'items_available': len(services),
-             'items': [{
-                     'uuid': 'zzzzz-bi6l4-{:015x}'.format(index),
-                     'owner_uuid': 'zzzzz-tpzed-000000000000000',
-                     'service_host': host,
-                     'service_port': port,
-                     'service_ssl_flag': ssl,
-                     'service_type': servtype,
-                     } for index, (host, port, ssl, servtype)
-                       in enumerate(services)],
-             }
-         return api_client
-     def mock_n_keep_disks(self, service_count):
-         return self.mock_keep_services(
-             *[("keep0x{:x}".format(index), 80, False, 'disk')
-               for index in range(service_count)])
-     def get_service_roots(self, *services):
-         api_client = self.mock_keep_services(*services)
+ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
+     def get_service_roots(self, api_client):
          keep_client = arvados.KeepClient(api_client=api_client)
          services = keep_client.weighted_service_roots('000000')
          return [urlparse.urlparse(url) for url in sorted(services)]
  
      def test_ssl_flag_respected_in_roots(self):
-         services = self.get_service_roots(('keep', 10, False, 'disk'),
-                                           ('keep', 20, True, 'disk'))
-         self.assertEqual(10, services[0].port)
-         self.assertEqual('http', services[0].scheme)
-         self.assertEqual(20, services[1].port)
-         self.assertEqual('https', services[1].scheme)
+         for ssl_flag in [False, True]:
+             services = self.get_service_roots(self.mock_keep_services(
+                 service_ssl_flag=ssl_flag))
+             self.assertEqual(
+                 ('https' if ssl_flag else 'http'), services[0].scheme)
  
      def test_correct_ports_with_ipv6_addresses(self):
-         service = self.get_service_roots(('100::1', 10, True, 'proxy'))[0]
+         service = self.get_service_roots(self.mock_keep_services(
+             service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
          self.assertEqual('100::1', service.hostname)
          self.assertEqual(10, service.port)
  
      # when connected directly to a Keep server (i.e. non-proxy timeout)
  
      def test_get_timeout(self):
-         api_client = self.mock_keep_services(('keep', 10, False, 'disk'))
+         api_client = self.mock_keep_services(count=1)
 -        keep_client = arvados.KeepClient(api_client=api_client)
          force_timeout = [socket.timeout("timed out")]
 -        with mock.patch('requests.get', side_effect=force_timeout) as mock_request:
 +        with tutil.mock_get(force_timeout) as mock_session:
 +            keep_client = arvados.KeepClient(api_client=api_client)
              with self.assertRaises(arvados.errors.KeepReadError):
                  keep_client.get('ffffffffffffffffffffffffffffffff')
 -            self.assertTrue(mock_request.called)
 +            self.assertTrue(mock_session.return_value.get.called)
              self.assertEqual(
                  arvados.KeepClient.DEFAULT_TIMEOUT,
 -                mock_request.call_args[1]['timeout'])
 +                mock_session.return_value.get.call_args[1]['timeout'])
  
      def test_put_timeout(self):
-         api_client = self.mock_keep_services(('keep', 10, False, 'disk'))
+         api_client = self.mock_keep_services(count=1)
 -        keep_client = arvados.KeepClient(api_client=api_client)
          force_timeout = [socket.timeout("timed out")]
 -        with mock.patch('requests.put', side_effect=force_timeout) as mock_request:
 +        with tutil.mock_put(force_timeout) as mock_session:
 +            keep_client = arvados.KeepClient(api_client=api_client)
              with self.assertRaises(arvados.errors.KeepWriteError):
                  keep_client.put('foo')
 -            self.assertTrue(mock_request.called)
 +            self.assertTrue(mock_session.return_value.put.called)
              self.assertEqual(
                  arvados.KeepClient.DEFAULT_TIMEOUT,
 -                mock_request.call_args[1]['timeout'])
 +                mock_session.return_value.put.call_args[1]['timeout'])
  
      def test_proxy_get_timeout(self):
          # Force a timeout, verifying that the requests.get or
          # requests.put method was called with the proxy_timeout
          # setting rather than the default timeout.
-         api_client = self.mock_keep_services(('keep', 10, False, 'proxy'))
+         api_client = self.mock_keep_services(service_type='proxy', count=1)
 -        keep_client = arvados.KeepClient(api_client=api_client)
          force_timeout = [socket.timeout("timed out")]
 -        with mock.patch('requests.get', side_effect=force_timeout) as mock_request:
 +        with tutil.mock_get(force_timeout) as mock_session:
 +            keep_client = arvados.KeepClient(api_client=api_client)
              with self.assertRaises(arvados.errors.KeepReadError):
                  keep_client.get('ffffffffffffffffffffffffffffffff')
 -            self.assertTrue(mock_request.called)
 +            self.assertTrue(mock_session.return_value.get.called)
              self.assertEqual(
                  arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
 -                mock_request.call_args[1]['timeout'])
 +                mock_session.return_value.get.call_args[1]['timeout'])
  
      def test_proxy_put_timeout(self):
          # Force a timeout, verifying that the requests.get or
          # requests.put method was called with the proxy_timeout
          # setting rather than the default timeout.
-         api_client = self.mock_keep_services(('keep', 10, False, 'proxy'))
+         api_client = self.mock_keep_services(service_type='proxy', count=1)
 -        keep_client = arvados.KeepClient(api_client=api_client)
          force_timeout = [socket.timeout("timed out")]
 -        with mock.patch('requests.put', side_effect=force_timeout) as mock_request:
 +        with tutil.mock_put(force_timeout) as mock_session:
 +            keep_client = arvados.KeepClient(api_client=api_client)
              with self.assertRaises(arvados.errors.KeepWriteError):
                  keep_client.put('foo')
 -            self.assertTrue(mock_request.called)
 +            self.assertTrue(mock_session.return_value.put.called)
              self.assertEqual(
                  arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
 -                mock_request.call_args[1]['timeout'])
 +                mock_session.return_value.put.call_args[1]['timeout'])
  
      def test_probe_order_reference_set(self):
          # expected_order[i] is the probe order for
          hashes = [
              hashlib.md5("{:064x}".format(x)).hexdigest()
              for x in range(len(expected_order))]
-         api_client = self.mock_n_keep_disks(16)
+         api_client = self.mock_keep_services(count=16)
          keep_client = arvados.KeepClient(api_client=api_client)
          for i, hash in enumerate(hashes):
              roots = keep_client.weighted_service_roots(hash)
          hashes = [
              hashlib.md5("{:064x}".format(x)).hexdigest() for x in range(100)]
          initial_services = 12
-         api_client = self.mock_n_keep_disks(initial_services)
+         api_client = self.mock_keep_services(count=initial_services)
          keep_client = arvados.KeepClient(api_client=api_client)
          probes_before = [
              keep_client.weighted_service_roots(hash) for hash in hashes]
          for added_services in range(1, 12):
-             api_client = self.mock_n_keep_disks(initial_services+added_services)
+             api_client = self.mock_keep_services(count=initial_services+added_services)
              keep_client = arvados.KeepClient(api_client=api_client)
              total_penalty = 0
              for hash_index in range(len(hashes)):
          data = '0' * 64
          if verb == 'get':
              data = hashlib.md5(data).hexdigest() + '+1234'
-         api_client = self.mock_n_keep_disks(16)
+         # Arbitrary port number:
+         aport = random.randint(1024, 65535)
+         api_client = self.mock_keep_services(service_port=aport, count=16)
          keep_client = arvados.KeepClient(api_client=api_client)
          with mock.patch('requests.' + verb,
                          side_effect=socket.timeout) as req_mock, \
              getattr(keep_client, verb)(data)
          urls = [urlparse.urlparse(url)
                  for url in err_check.exception.service_errors()]
-         self.assertEqual([('keep0x' + c, 80) for c in '3eab2d5fc9681074'],
+         self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
                           [(url.hostname, url.port) for url in urls])
  
      def test_get_error_shows_probe_order(self):
          self.check_no_services_error('put', arvados.errors.KeepWriteError)
  
      def check_errors_from_last_retry(self, verb, exc_class):
-         api_client = self.mock_n_keep_disks(2)
+         api_client = self.mock_keep_services(count=2)
 -        keep_client = arvados.KeepClient(api_client=api_client)
          req_mock = getattr(tutil, 'mock_{}_responses'.format(verb))(
              "retry error reporting test", 500, 500, 403, 403)
          with req_mock, tutil.skip_sleep, \
                  self.assertRaises(exc_class) as err_check:
 +            keep_client = arvados.KeepClient(api_client=api_client)
              getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                         num_retries=3)
          self.assertEqual([403, 403], [
      def test_put_error_does_not_include_successful_puts(self):
          data = 'partial failure test'
          data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
-         api_client = self.mock_n_keep_disks(3)
+         api_client = self.mock_keep_services(count=3)
 -        keep_client = arvados.KeepClient(api_client=api_client)
          with tutil.mock_put_responses(data_loc, 200, 500, 500) as req_mock, \
                  self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
 +            keep_client = arvados.KeepClient(api_client=api_client)
              keep_client.put(data)
          self.assertEqual(2, len(exc_check.exception.service_errors()))
  
@@@ -559,13 -541,17 +541,13 @@@ class KeepClientRetryGetTestCase(KeepCl
              self.check_success(locator=self.HINTED_LOCATOR)
  
      def test_try_next_server_after_timeout(self):
 -        side_effects = [
 -            socket.timeout("timed out"),
 -            tutil.fake_requests_response(200, self.DEFAULT_EXPECT)]
 -        with mock.patch('requests.get',
 -                        side_effect=iter(side_effects)):
 +        with tutil.mock_get([
 +                socket.timeout("timed out"),
 +                tutil.fake_requests_response(200, self.DEFAULT_EXPECT)]):
              self.check_success(locator=self.HINTED_LOCATOR)
  
      def test_retry_data_with_wrong_checksum(self):
 -        side_effects = (tutil.fake_requests_response(200, s)
 -                        for s in ['baddata', self.TEST_DATA])
 -        with mock.patch('requests.get', side_effect=side_effects):
 +        with tutil.mock_get((tutil.fake_requests_response(200, s) for s in ['baddata', self.TEST_DATA])):
              self.check_success(locator=self.HINTED_LOCATOR)
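
Worth noting across the hunks above: every test now constructs KeepClient inside the mock context. The client's constructor creates its requests.Session, so patching requests.Session only affects clients built while the patch is active. In sketch form:

    import socket
    import arvados
    import arvados_testutil as tutil

    api_client = tutil.ApiClientMock().mock_keep_services(count=1)
    with tutil.mock_get([socket.timeout("timed out")]) as mock_session:
        keep_client = arvados.KeepClient(api_client=api_client)  # inside the patch
        try:
            keep_client.get('ffffffffffffffffffffffffffffffff')
        except arvados.errors.KeepReadError:
            pass
        assert mock_session.return_value.get.called
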
  
  
index 8ab2e8debe3a2f0e060c275e5089712321165896,73a609c3a99d0cc181165325366805a92f7aa764..22274eba78dc18ff6019c2c8874857b6ad2f6af0
@@@ -30,6 -30,38 +30,6 @@@ _logger = logging.getLogger('arvados.ar
  # appear as underscores in the fuse mount.)
  _disallowed_filename_characters = re.compile('[\x00/]')
  
 -class SafeApi(object):
 -    '''Threadsafe wrapper for API object.  This stores and returns a different api
 -    object per thread, because httplib2 which underlies apiclient is not
 -    threadsafe.
 -    '''
 -
 -    def __init__(self, config):
 -        self.host = config.get('ARVADOS_API_HOST')
 -        self.api_token = config.get('ARVADOS_API_TOKEN')
 -        self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
 -        self.local = threading.local()
 -        self.block_cache = arvados.KeepBlockCache()
 -
 -    def localapi(self):
 -        if 'api' not in self.local.__dict__:
 -            self.local.api = arvados.api('v1', False, self.host,
 -                                         self.api_token, self.insecure)
 -        return self.local.api
 -
 -    def localkeep(self):
 -        if 'keep' not in self.local.__dict__:
 -            self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
 -        return self.local.keep
 -
 -    def __getattr__(self, name):
 -        # Proxy nonexistent attributes to the local API client.
 -        try:
 -            return getattr(self.localapi(), name)
 -        except AttributeError:
 -            return super(SafeApi, self).__getattr__(name)
 -
 -
  def convertTime(t):
      '''Parse Arvados timestamp to unix time.'''
      try:
@@@ -448,8 -480,8 +448,8 @@@ class TagsDirectory(RecursiveInvalidate
                  ).execute(num_retries=self.num_retries)
          if "items" in tags:
              self.merge(tags['items'],
-                        lambda i: i['name'] if 'name' in i else i['uuid'],
-                        lambda a, i: a.tag == i,
+                        lambda i: i['name'],
+                        lambda a, i: a.tag == i['name'],
                         lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
  
  
@@@ -552,6 -584,11 +552,6 @@@ class ProjectDirectory(Directory)
  
              contents = arvados.util.list_all(self.api.groups().contents,
                                               self.num_retries, uuid=self.uuid)
 -            # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
 -            contents += arvados.util.list_all(
 -                self.api.links().list, self.num_retries,
 -                filters=[['tail_uuid', '=', self.uuid],
 -                         ['link_class', '=', 'name']])
  
          # end with llfuse.lock_released, re-acquire lock
  
@@@ -881,5 -918,5 +881,5 @@@ class Operations(llfuse.Operations)
      # arv-mount.
      # The workaround is to implement it with the proper number of parameters,
      # and then everything works out.
 -    def create(self, p1, p2, p3, p4, p5):
 +    def create(self, inode_parent, name, mode, flags, ctx):
          raise llfuse.FUSEError(errno.EROFS)
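
The SafeApi class removed above survives as the SDK's thread-local wrapper (arvados.safeapi on this branch), with the same interface: one httplib2-backed client per thread. A usage sketch, assuming the arvados.config module as its config source:

    from arvados.safeapi import SafeApi
    import arvados.config

    api = SafeApi(arvados.config)             # config needs .get() and .flag_is_true()
    groups = api.groups().list().execute()    # proxied to this thread's API client
    keep = api.localkeep()                    # this thread's KeepClient
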