From 5bb11227e0d7d37cc6cf574b5d5f289afd1a43b1 Mon Sep 17 00:00:00 2001
From: Peter Amstutz <peter.amstutz@curoverse.com>
Date: Mon, 2 Feb 2015 09:33:28 -0500
Subject: [PATCH] 4823: Adding @_synchronized to protect arvfile and block
 manager. Updated docstrings to PEP 257 and PEP 287.

---
 sdk/python/arvados/arvfile.py    | 359 +++++++++++++++++--------------
 sdk/python/arvados/collection.py | 211 ++++++++++--------
 2 files changed, 323 insertions(+), 247 deletions(-)

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index bab14d4294..29db2cb996 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -9,6 +9,7 @@ import hashlib
 import hashlib
 import threading
 import Queue
+import copy

 def split(path):
     """split(path) -> streamname, filename
@@ -217,30 +218,39 @@ class StreamFileReader(ArvadosFileReaderBase):

 class BufferBlock(object):
-'''
+"""
 A BufferBlock is a stand-in for a Keep block that is in the process of being
 written.  Writers can append to it, get the size, and compute the Keep
 locator.

 There are three valid states:

-WRITABLE - can append
+WRITABLE
+  Can append to block.

-PENDING - is in the process of being uploaded to Keep, append is an error
+PENDING
+  Block is in the process of being uploaded to Keep, append is an error.

-COMMITTED - the block has been written to Keep, its internal buffer has been
-released, and the BufferBlock should be discarded in favor of fetching the
-block through normal Keep means.
-'''
+COMMITTED
+  The block has been written to Keep, its internal buffer has been
+  released, reads will fetch the block via the Keep client (since the
+  internal copy was discarded), and identifiers referring to the BufferBlock
+  can be replaced with the block locator.
+"""

     WRITABLE = 0
     PENDING = 1
     COMMITTED = 2

     def __init__(self, blockid, starting_capacity, owner):
-        '''
-        blockid: the identifier for this block
-        starting_capacity: the initial buffer capacity
-        owner: ArvadosFile that owns this block
-        '''
+        """
+        :blockid:
+          the identifier for this block
+
+        :starting_capacity:
+          the initial buffer capacity
+
+        :owner:
+          ArvadosFile that owns this block
+        """
         self.blockid = blockid
         self.buffer_block = bytearray(starting_capacity)
         self.buffer_view = memoryview(self.buffer_block)
@@ -250,11 +260,11 @@ block through normal Keep means.
         self.owner = owner

     def append(self, data):
-        '''
+        """
         Append some data to the buffer.  Only valid if the block is in
         WRITABLE state.  Implements an expanding buffer, doubling capacity as
-        needed to accomdate all the data.
-        '''
+        needed to accommodate all the data.
+        """
         if self.state == BufferBlock.WRITABLE:
             while (self.write_pointer+len(data)) > len(self.buffer_block):
                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
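The expanding buffer above doubles its capacity whenever a write would
overflow it, so repeated appends cost amortized O(1) per byte. A standalone
sketch of the same technique, with invented names (this is an illustration,
not the SDK class):

    import hashlib

    class GrowBuffer(object):
        # Toy stand-in for BufferBlock's expanding buffer.
        def __init__(self, starting_capacity=16):
            self.buffer_block = bytearray(starting_capacity)
            self.write_pointer = 0

        def append(self, data):
            # Double capacity until the pending write fits, then copy the
            # old contents across and write the new data after them.
            while self.write_pointer + len(data) > len(self.buffer_block):
                new_block = bytearray(len(self.buffer_block) * 2)
                new_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_block
            self.buffer_block[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)

        def locator(self):
            # Keep-style locator: md5 of the bytes written, "+", byte count.
            return "%s+%i" % (hashlib.md5(self.buffer_block[0:self.write_pointer]).hexdigest(),
                              self.write_pointer)

    b = GrowBuffer(starting_capacity=4)
    b.append("hello ")
    b.append("world")        # capacity doubled 4 -> 8 -> 16 along the way
    assert b.write_pointer == 11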
@@ -268,34 +278,41 @@ block through normal Keep means.
             raise AssertionError("Buffer block is not writable")

     def size(self):
-        '''Amount of data written to the buffer'''
+        """Amount of data written to the buffer."""
         return self.write_pointer

     def locator(self):
-        '''The Keep locator for this buffer's contents.'''
+        """The Keep locator for this buffer's contents."""
         if self._locator is None:
             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
         return self._locator


 class AsyncKeepWriteErrors(Exception):
-    '''
+    """
     Roll up one or more Keep write exceptions (generated by background
     threads) into a single one.
-    '''
+    """
     def __init__(self, errors):
         self.errors = errors

     def __repr__(self):
         return "\n".join(self.errors)

+def _synchronized(orig_func):
+    @functools.wraps(orig_func)
+    def wrapper(self, *args, **kwargs):
+        with self.lock:
+            return orig_func(self, *args, **kwargs)
+    return wrapper
+
 class BlockManager(object):
-    '''
+    """
     BlockManager handles buffer blocks, background block uploads, and
     background block prefetch for a Collection of ArvadosFiles.
-    '''
+    """
     def __init__(self, keep):
-        '''keep: KeepClient object to use'''
+        """keep: KeepClient object to use"""
         self._keep = keep
         self._bufferblocks = {}
         self._put_queue = None
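The new _synchronized decorator is the heart of this change: every wrapped
method body runs while holding the instance's self.lock. A minimal,
self-contained sketch of the pattern (toy class, hypothetical names):

    import functools
    import threading

    def _synchronized(orig_func):
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            with self.lock:
                return orig_func(self, *args, **kwargs)
        return wrapper

    class Counter(object):
        def __init__(self):
            self.lock = threading.Lock()
            self.value = 0

        @_synchronized
        def increment(self):
            # Runs entirely under self.lock, so concurrent calls cannot
            # interleave and lose updates.
            self.value += 1

Because threading.Lock is not reentrant, a decorated method must not call
another decorated method on the same object. That is why this patch pairs the
synchronized public add_segment() with a private, unsynchronized
_add_segment() for internal callers such as __init__() (see the ArvadosFile
hunks below).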
@@ -303,26 +320,39 @@ class BlockManager(object):
         self._put_threads = None
         self._prefetch_queue = None
         self._prefetch_threads = None
+        self.lock = threading.Lock()

+    @_synchronized
     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
-        '''
+        """
         Allocate a new, empty bufferblock in WRITABLE state and return it.
-        blockid: optional block identifier, otherwise one will be automatically assigned
-        starting_capacity: optional capacity, otherwise will use default capacity
-        owner: ArvadosFile that owns this block
-        '''
+
+        :blockid:
+          optional block identifier, otherwise one will be automatically assigned
+
+        :starting_capacity:
+          optional capacity, otherwise will use default capacity
+
+        :owner:
+          ArvadosFile that owns this block
+        """
         if blockid is None:
             blockid = "bufferblock%i" % len(self._bufferblocks)
         bb = BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
         self._bufferblocks[bb.blockid] = bb
         return bb

+    @_synchronized
     def dup_block(self, blockid, owner):
-        '''
+        """
         Create a new bufferblock in WRITABLE state, initialized with the
         content of an existing bufferblock.
-        blockid: the block to copy.  May be an existing buffer block id.
-        owner: ArvadosFile that owns the new block
-        '''
+
+        :blockid:
+          the block to copy.  May be an existing buffer block id.
+
+        :owner:
+          ArvadosFile that owns the new block
+        """
         new_blockid = "bufferblock%i" % len(self._bufferblocks)
         block = self._bufferblocks[blockid]
         bb = BufferBlock(new_blockid, len(block), owner)
@@ -330,13 +360,15 @@ class BlockManager(object):
         self._bufferblocks[bb.blockid] = bb
         return bb

+    @_synchronized
     def is_bufferblock(self, id):
         return id in self._bufferblocks

+    @_synchronized
     def stop_threads(self):
-        '''
+        """
         Shut down and wait for background upload and download threads to finish.
-        '''
+        """
         if self._put_threads is not None:
             for t in self._put_threads:
                 self._put_queue.put(None)
@@ -355,15 +387,15 @@ class BlockManager(object):
             self._prefetch_queue = None

     def commit_bufferblock(self, block):
-        '''
+        """
         Initiate a background upload of a bufferblock.  This will block if the
         upload queue is at capacity, otherwise it will return immediately.
-        '''
+        """

         def worker(self):
-            '''
+            """
             Background uploader thread.
-            '''
+            """
             while True:
                 try:
                     b = self._put_queue.get()
@@ -380,73 +412,80 @@ class BlockManager(object):
                     if self._put_queue is not None:
                         self._put_queue.task_done()

-        if self._put_threads is None:
-            # Start uploader threads.
-
-            # If we don't limit the Queue size, the upload queue can quickly
-            # grow to take up gigabytes of RAM if the writing process is
-            # generating data more quickly than it can be send to the Keep
-            # servers.
-            #
-            # With two upload threads and a queue size of 2, this means up to 4
-            # blocks pending.  If they are full 64 MiB blocks, that means up to
-            # 256 MiB of internal buffering, which is the same size as the
-            # default download block cache in KeepClient.
-            self._put_queue = Queue.Queue(maxsize=2)
-            self._put_errors = Queue.Queue()
-            self._put_threads = [threading.Thread(target=worker, args=(self,)),
-                                 threading.Thread(target=worker, args=(self,))]
-            for t in self._put_threads:
-                t.daemon = True
-                t.start()
+        with self.lock:
+            if self._put_threads is None:
+                # Start uploader threads.
+
+                # If we don't limit the Queue size, the upload queue can quickly
+                # grow to take up gigabytes of RAM if the writing process is
+                # generating data more quickly than it can be sent to the Keep
+                # servers.
+                #
+                # With two upload threads and a queue size of 2, this means up to 4
+                # blocks pending.  If they are full 64 MiB blocks, that means up to
+                # 256 MiB of internal buffering, which is the same size as the
+                # default download block cache in KeepClient.
+                self._put_queue = Queue.Queue(maxsize=2)
+                self._put_errors = Queue.Queue()
+                self._put_threads = [threading.Thread(target=worker, args=(self,)),
+                                     threading.Thread(target=worker, args=(self,))]
+                for t in self._put_threads:
+                    t.daemon = True
+                    t.start()

         # Mark the block as PENDING so as to disallow any more appends.
         block.state = BufferBlock.PENDING
         self._put_queue.put(block)

     def get_block(self, locator, num_retries, cache_only=False):
-        '''
+        """
         Fetch a block.  First checks to see if the locator is a BufferBlock and
         returns that, if not, passes the request through to KeepClient.get().
-        '''
-        if locator in self._bufferblocks:
-            bb = self._bufferblocks[locator]
-            if bb.state != BufferBlock.COMMITTED:
-                return bb.buffer_view[0:bb.write_pointer].tobytes()
-            else:
-                locator = bb._locator
+        """
+        with self.lock:
+            if locator in self._bufferblocks:
+                bb = self._bufferblocks[locator]
+                if bb.state != BufferBlock.COMMITTED:
+                    return bb.buffer_view[0:bb.write_pointer].tobytes()
+                else:
+                    locator = bb._locator
         return self._keep.get(locator, num_retries=num_retries, cache_only=cache_only)

     def commit_all(self):
-        '''
+        """
         Commit all outstanding buffer blocks.  Unlike commit_bufferblock(),
         this is a synchronous call, and will not return until all buffer
         blocks are uploaded.  Raises AsyncKeepWriteErrors() if any blocks
         failed to upload.
-        '''
-        for k,v in self._bufferblocks.items():
+        """
+        with self.lock:
+            items = self._bufferblocks.items()
+
+        for k,v in items:
             if v.state == BufferBlock.WRITABLE:
                 self.commit_bufferblock(v)
-        if self._put_queue is not None:
-            self._put_queue.join()
-            if not self._put_errors.empty():
-                e = []
-                try:
-                    while True:
-                        e.append(self._put_errors.get(False))
-                except Queue.Empty:
-                    pass
-                raise AsyncKeepWriteErrors(e)
+
+        with self.lock:
+            if self._put_queue is not None:
+                self._put_queue.join()
+                if not self._put_errors.empty():
+                    e = []
+                    try:
+                        while True:
+                            e.append(self._put_errors.get(False))
+                    except Queue.Empty:
+                        pass
+                    raise AsyncKeepWriteErrors(e)

     def block_prefetch(self, locator):
-        '''
+        """
         Initiate a background download of a block.  This assumes that the
         underlying KeepClient implements a block cache, so repeated requests
         for the same block will not result in repeated downloads (unless the
         block is evicted from the cache.)  This method does not block.
-        '''
+        """

         def worker(self):
-            '''Background downloader thread.'''
+            """Background downloader thread."""
             while True:
                 try:
                     b = self._prefetch_queue.get()
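The uploader above is a classic bounded producer/consumer: Queue.Queue(maxsize=2)
applies backpressure to the writing thread, two daemon threads drain the
queue, and None sentinels (see stop_threads()) shut the workers down. The same
skeleton, self-contained and with toy names, no Keep involved:

    import Queue
    import threading

    work = Queue.Queue(maxsize=2)      # put() blocks while 2 items are pending

    def worker():
        while True:
            item = work.get()
            try:
                if item is None:       # sentinel: this worker should exit
                    return
                pass                   # ... upload `item` to storage here ...
            finally:
                work.task_done()

    threads = [threading.Thread(target=worker) for _ in range(2)]
    for t in threads:
        t.daemon = True
        t.start()

    for block in ["block1", "block2", "block3", "block4"]:
        work.put(block)                # blocks when the queue is full
    work.join()                        # wait for every queued item

    for t in threads:
        work.put(None)                 # one sentinel per worker
    for t in threads:
        t.join()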
@@ -456,73 +495,84 @@ class BlockManager(object):
                 except:
                     pass

-        if locator in self._bufferblocks:
-            return
-        if self._prefetch_threads is None:
-            self._prefetch_queue = Queue.Queue()
-            self._prefetch_threads = [threading.Thread(target=worker, args=(self,)),
-                                      threading.Thread(target=worker, args=(self,))]
-            for t in self._prefetch_threads:
-                t.daemon = True
-                t.start()
+        with self.lock:
+            if locator in self._bufferblocks:
+                return
+            if self._prefetch_threads is None:
+                self._prefetch_queue = Queue.Queue()
+                self._prefetch_threads = [threading.Thread(target=worker, args=(self,)),
+                                          threading.Thread(target=worker, args=(self,))]
+                for t in self._prefetch_threads:
+                    t.daemon = True
+                    t.start()
         self._prefetch_queue.put(locator)


 class ArvadosFile(object):
-    '''
+    """
     ArvadosFile manages the underlying representation of a file in Keep as a
     sequence of segments spanning a set of blocks, and implements random
     read/write access.
-    '''
+    """

     def __init__(self, parent, stream=[], segments=[]):
-        '''
-        stream: a list of Range objects representing a block stream
-        segments: a list of Range objects representing segments
-        '''
+        """
+        :stream:
+          a list of Range objects representing a block stream
+
+        :segments:
+          a list of Range objects representing segments
+        """
         self.parent = parent
         self._modified = True
-        self.segments = []
+        self._segments = []
         for s in segments:
-            self.add_segment(stream, s.locator, s.range_size)
+            self._add_segment(stream, s.locator, s.range_size)
         self._current_bblock = None
         self.lock = threading.Lock()

+    @_synchronized
+    def segments(self):
+        return copy.copy(self._segments)
+
+    @_synchronized
     def clone(self, num_retries):
-        '''Make a copy of this file.'''
-        with self.lock:
-            cp = ArvadosFile()
-            cp.parent = self.parent
-            cp._modified = False
-
-            map_loc = {}
-            for r in self.segments:
-                new_loc = r.locator
-                if self.parent._my_block_manager().is_bufferblock(r.locator):
-                    if r.locator not in map_loc:
-                        map_loc[r.locator] = self.parent._my_block_manager().dup_block(r.locator, cp).blockid
-                    new_loc = map_loc[r.locator]
-
-                cp.segments.append(Range(new_loc, r.range_start, r.range_size, r.segment_offset))
-
-            return cp
+        """Make a copy of this file."""
+        cp = ArvadosFile(self.parent)
+        cp._modified = False
+
+        map_loc = {}
+        for r in self._segments:
+            new_loc = r.locator
+            if self.parent._my_block_manager().is_bufferblock(r.locator):
+                if r.locator not in map_loc:
+                    map_loc[r.locator] = self.parent._my_block_manager().dup_block(r.locator, cp).blockid
+                new_loc = map_loc[r.locator]
+
+            cp._segments.append(Range(new_loc, r.range_start, r.range_size, r.segment_offset))
+
+        return cp

+    @_synchronized
     def set_unmodified(self):
-        '''Clear the modified flag'''
+        """Clear the modified flag."""
         self._modified = False

+    @_synchronized
     def modified(self):
-        '''Test the modified flag'''
+        """Test the modified flag."""
         return self._modified

+    @_synchronized
     def truncate(self, size):
-        '''
-        Adjust the size of the file.  If "size" is less than the size of the file,
-        the file contents after "size" will be discarded.  If "size" is greater
-        than the current size of the file, an IOError will be raised.
-        '''
+        """
+        Adjust the size of the file.  If `size` is less than the size of the file,
+        the file contents after `size` will be discarded.  If `size` is greater
+        than the current size of the file, an IOError will be raised.
+        """
         if size < self.size():
             new_segs = []
-            for r in self.segments:
+            for r in self._segments:
                 range_end = r.range_start+r.range_size
                 if r.range_start >= size:
                     # segment is past the truncate size, all done
                     break
                 elif size < range_end:
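One detail worth pausing on: the new segments() accessor returns
copy.copy(self._segments) rather than the internal list itself, so callers
receive a snapshot they may iterate or mutate without holding the lock. A toy
demonstration of why that matters (hypothetical names):

    import copy
    import threading

    def _synchronized(orig_func):
        def wrapper(self, *args, **kwargs):
            with self.lock:
                return orig_func(self, *args, **kwargs)
        return wrapper

    class Segmented(object):
        def __init__(self):
            self.lock = threading.Lock()
            self._segments = [("locator_a", 0, 10)]

        @_synchronized
        def segments(self):
            # Shallow copy: the caller gets a snapshot it can safely
            # iterate or mutate without corrupting our internal list.
            return copy.copy(self._segments)

    s = Segmented()
    snapshot = s.segments()
    snapshot.append(("bogus", 10, 5))
    assert len(s._segments) == 1      # internal state unchanged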
@@ -535,24 +585,24 @@ class ArvadosFile(object):
                 else:
                     new_segs.append(r)

-            self.segments = new_segs
+            self._segments = new_segs
             self._modified = True
         elif size > self.size():
             raise IOError("truncate() does not support extending the file size")

-
+    @_synchronized
     def readfrom(self, offset, size, num_retries):
-        '''
-        read upto "size" bytes from the file starting at "offset".
-        '''
+        """
+        Read up to `size` bytes from the file starting at `offset`.
+        """
         if size == 0 or offset >= self.size():
             return ''
         data = []

-        for lr in locators_and_ranges(self.segments, offset, size + config.KEEP_BLOCK_SIZE):
+        for lr in locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE):
             self.parent._my_block_manager().block_prefetch(lr.locator)

-        for lr in locators_and_ranges(self.segments, offset, size):
+        for lr in locators_and_ranges(self._segments, offset, size):
             d = self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries, cache_only=bool(data))
             if d:
                 data.append(d[lr.segment_offset:lr.segment_offset+lr.segment_size])
@@ -561,13 +611,13 @@ class ArvadosFile(object):
             else:
                 break
         return ''.join(data)

     def _repack_writes(self):
-        '''
+        """
         Test if the buffer block has more data than is referenced by actual
         segments (this happens when a buffered write over-writes a file range
         written in a previous buffered write).  Re-pack the buffer block for
         efficiency and to avoid leaking information.
-        '''
-        segs = self.segments
+        """
+        segs = self._segments

         # Sum up the segments to get the total bytes of the file referencing
         # into the buffer block.
@@ -584,11 +634,12 @@ class ArvadosFile(object):

         self._current_bblock = new_bb

+    @_synchronized
     def writeto(self, offset, data, num_retries):
-        '''
-        Write "data" to the file starting at "offset".  This will update
-        existing bytes and/or extend the size of the file as necessary.
-        '''
+        """
+        Write `data` to the file starting at `offset`.  This will update
+        existing bytes and/or extend the size of the file as necessary.
+        """
         if len(data) == 0:
             return

@@ -610,23 +661,30 @@ class ArvadosFile(object):
             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

         self._current_bblock.append(data)

-        replace_range(self.segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
+        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

+    @_synchronized
     def add_segment(self, blocks, pos, size):
-        '''
-        Add a segment to the end of the file, with "pos" and "offset" referencing a
-        section of the stream described by "blocks" (a list of Range objects)
-        '''
+        # Synchronized public api, see _add_segment
+        self._add_segment(blocks, pos, size)
+
+    def _add_segment(self, blocks, pos, size):
+        """
+        Add a segment to the end of the file, with `pos` and `size` referencing a
+        section of the stream described by `blocks` (a list of Range objects).
+        """
         self._modified = True
         for lr in locators_and_ranges(blocks, pos, size):
-            last = self.segments[-1] if self.segments else Range(0, 0, 0)
+            last = self._segments[-1] if self._segments else Range(0, 0, 0)
             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
-            self.segments.append(r)
+            self._segments.append(r)
+
+    @_synchronized
     def size(self):
-        '''Get the file size'''
-        if self.segments:
-            n = self.segments[-1]
+        """Get the file size."""
+        if self._segments:
+            n = self._segments[-1]
             return n.range_start + n.range_size
         else:
             return 0
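readfrom() above reads in two passes: the first pass issues non-blocking
prefetch hints for every block the read may touch (one full Keep block beyond
the requested range), then the second pass collects the bytes, passing
cache_only once some data is in hand so one slow block cannot stall the rest.
The shape of that pattern as a self-contained toy — FakeCache and the range
tuples are invented for the sketch:

    class FakeCache(object):
        def __init__(self, blocks):
            self.blocks = blocks

        def prefetch(self, locator):
            pass                 # a real client would start a background fetch

        def get(self, locator):
            return self.blocks[locator]

    def read_ranges(cache, ranges):
        # Pass 1: hint every block so downloads overlap with consumption.
        for locator, offset, size in ranges:
            cache.prefetch(locator)
        # Pass 2: gather the requested slices in order.
        data = []
        for locator, offset, size in ranges:
            block = cache.get(locator)
            data.append(block[offset:offset+size])
        return ''.join(data)

    cache = FakeCache({"blk1": "hello world", "blk2": "goodbye"})
    assert read_ranges(cache, [("blk1", 0, 5), ("blk2", 4, 3)]) == "hellobye"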
@@ -635,7 +693,7 @@ class ArvadosFile(object):
 class ArvadosFileReader(ArvadosFileReaderBase):
     def __init__(self, arvadosfile, name, mode="r", num_retries=None):
         super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
-        self.arvadosfile = arvadosfile.clone()
+        self.arvadosfile = arvadosfile

     def size(self):
         return self.arvadosfile.size()
@@ -643,7 +701,7 @@ class ArvadosFileReader(ArvadosFileReaderBase):
     @ArvadosFileBase._before_close
     @retry_method
     def read(self, size, num_retries=None):
-        """Read up to 'size' bytes from the stream, starting at the current file position"""
+        """Read up to `size` bytes from the stream, starting at the current file position."""
         data = self.arvadosfile.readfrom(self._filepos, size, num_retries=num_retries)
         self._filepos += len(data)
         return data
@@ -651,29 +709,16 @@ class ArvadosFileReader(ArvadosFileReaderBase):
     @ArvadosFileBase._before_close
     @retry_method
     def readfrom(self, offset, size, num_retries=None):
-        """Read up to 'size' bytes from the stream, starting at the current file position"""
+        """Read up to `size` bytes from the stream, starting at `offset`."""
         return self.arvadosfile.readfrom(offset, size, num_retries)

     def flush(self):
         pass


-class SynchronizedArvadosFile(object):
-    def __init__(self, arvadosfile):
-        self.arvadosfile = arvadosfile
-
-    def clone(self):
-        return self
-
-    def __getattr__(self, name):
-        with self.arvadosfile.lock:
-            return getattr(self.arvadosfile, name)
-
-
 class ArvadosFileWriter(ArvadosFileReader):
     def __init__(self, arvadosfile, name, mode, num_retries=None):
-        self.arvadosfile = SynchronizedArvadosFile(arvadosfile)
-        super(ArvadosFileWriter, self).__init__(self.arvadosfile, name, mode, num_retries=num_retries)
+        super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)

     @ArvadosFileBase._before_close
     @retry_method
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index a0eda1175a..682880359c 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -641,25 +641,30 @@ class ResumableCollectionWriter(CollectionWriter):

 class Collection(CollectionBase):
-    '''An abstract Arvados collection, consisting of a set of files and
+    """An abstract Arvados collection, consisting of a set of files and
     sub-collections.
-    '''
+    """

     def __init__(self, manifest_locator_or_text=None, parent=None, api_client=None,
                  keep_client=None, num_retries=0, block_manager=None):
-        '''manifest_locator_or_text: One of Arvados collection UUID, block locator of
-        a manifest, raw manifest text, or None (to create an empty collection).
-
-        parent: the parent Collection, may be None.
-
-        api_client: The API client object to use for requests.  If None, use default.
-
-        keep_client: the Keep client to use for requests.  If None, use default.
-
-        num_retries: the number of retries for API and Keep requests.
-
-        block_manager: the block manager to use.  If None, create one.
-        '''
+        """
+        :manifest_locator_or_text:
+          One of Arvados collection UUID, block locator of a manifest, raw
+          manifest text, or None (to create an empty collection).
+
+        :parent:
+          the parent Collection, may be None.
+
+        :api_client:
+          The API client object to use for requests.  If None, use default.
+
+        :keep_client:
+          the Keep client to use for requests.  If None, use default.
+
+        :num_retries:
+          the number of retries for API and Keep requests.
+
+        :block_manager:
+          the block manager to use.  If None, use parent's block manager or
+          create one.
+        """
         self.parent = parent
         self._items = None
         self._api_client = api_client
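Assuming the SDK with this patch applied is importable, the constructor above
accepts several shapes of first argument (the UUID below is a placeholder, and
the module path is the SDK's existing collection module):

    from arvados.collection import Collection

    empty = Collection()                  # brand-new, empty collection
    from_text = Collection(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n")
    by_uuid = Collection("zzzzz-4zz18-zzzzzzzzzzzzzzz")   # placeholder UUID

The UUID form needs a reachable API server when the collection is first
populated; the manifest-text and empty forms appear to work without one, since
population is deferred by the @_populate_first decorators.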
@@ -779,25 +784,29 @@ class Collection(CollectionBase):
         return self

     def __exit__(self, exc_type, exc_value, traceback):
-        '''Support scoped auto-commit in a with: block'''
+        """Support scoped auto-commit in a with: block."""
         self.save(no_locator=True)
         if self._block_manager is not None:
             self._block_manager.stop_threads()

     @_populate_first
     def find(self, path, create=False, create_collection=False):
-        '''Recursively search the specified file path.  May return either a Collection
+        """Recursively search the specified file path.  May return either a Collection
         or ArvadosFile.

-        create: If true, create path components (i.e. Collections) that are
-        missing.  If "create" is False, return None if a path component is not
-        found.
+        :create:
+          If true, create path components (i.e. Collections) that are
+          missing.  If "create" is False, return None if a path component is
+          not found.
+
+        :create_collection:
+          If the path is not found, "create" is True, and
+          "create_collection" is False, then create and return a new
+          ArvadosFile for the last path component.  If "create_collection" is
+          True, then create and return a new Collection for the last path
+          component.

-        create_collection: If the path is not found, "create" is True, and
-        "create_collection" is False, then create and return a new ArvadosFile
-        for the last path component.  If "create_collection" is True, then
-        create and return a new Collection for the last path component.
-        '''
+        """
         p = path.split("/")
         if p[0] == '.':
             del p[0]
@@ -826,7 +835,8 @@ class Collection(CollectionBase):

     @_populate_first
     def api_response(self):
-        """api_response() -> dict or None
+        """
+        api_response() -> dict or None

         Returns information about this Collection fetched from the API server.
         If the Collection exists in Keep but not the API server, currently
@@ -835,19 +845,23 @@ class Collection(CollectionBase):
         return self._api_response

     def open(self, path, mode):
-        '''Open a file-like object for access.
-
-        path: path to a file in the collection
-
-        mode: one of "r", "r+", "w", "w+", "a", "a+"
-        "r" opens for reading
-
-        "r+" opens for reading and writing.  Reads/writes share a file pointer.
-
-        "w", "w+" truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
-
-        "a", "a+" opens for reading and writing.  All writes are appended to the end of the file.  Writing does not affect the file pointer for reading.
-        '''
+        """Open a file-like object for access.
+
+        :path:
+          path to a file in the collection
+
+        :mode:
+          one of "r", "r+", "w", "w+", "a", "a+"
+
+          :"r":
+            opens for reading
+
+          :"r+":
+            opens for reading and writing.  Reads/writes share a file pointer.
+
+          :"w", "w+":
+            truncates to 0 and opens for reading and writing.  Reads/writes
+            share a file pointer.
+
+          :"a", "a+":
+            opens for reading and writing.  All writes are appended to the
+            end of the file.  Writing does not affect the file pointer for
+            reading.
+        """
         mode = mode.replace("b", "")
         if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
             raise ArgumentError("Bad mode '%s'" % mode)
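A hypothetical session against the modes documented above (assumes an empty,
writable Collection; write() on the returned file object is part of the writer
API, and intermediate path components are created via find(create=True)):

    from arvados.collection import Collection

    c = Collection()

    f = c.open("dir/counts.txt", "w")    # truncate and write
    f.write("one\ntwo\n")

    f = c.open("dir/counts.txt", "a")    # append to the end
    f.write("three\n")

    f = c.open("dir/counts.txt", "r")    # read it back
    text = f.read(1024)                  # read() takes a size argument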
@@ -869,8 +883,8 @@ class Collection(CollectionBase):

     @_populate_first
     def modified(self):
-        '''Test if the collection (or any subcollection or file) has been modified
-        since it was created.'''
+        """Test if the collection (or any subcollection or file) has been modified
+        since it was created."""
         for k,v in self._items.items():
             if v.modified():
                 return True
@@ -878,65 +892,65 @@ class Collection(CollectionBase):

     @_populate_first
     def set_unmodified(self):
-        '''Recursively clear modified flag'''
+        """Recursively clear modified flag."""
         for k,v in self._items.items():
             v.set_unmodified()

     @_populate_first
     def __iter__(self):
-        '''Iterate over names of files and collections contained in this collection.'''
+        """Iterate over names of files and collections contained in this collection."""
         return self._items.iterkeys()

     @_populate_first
     def iterkeys(self):
-        '''Iterate over names of files and collections directly contained in this collection.'''
+        """Iterate over names of files and collections directly contained in this collection."""
         return self._items.iterkeys()

     @_populate_first
     def __getitem__(self, k):
-        '''Get a file or collection that is directly contained by this collection.  Use
-        find() for path serach.'''
+        """Get a file or collection that is directly contained by this collection.
+        Use find() for path search."""
         return self._items[k]

     @_populate_first
     def __contains__(self, k):
-        '''If there is a file or collection a directly contained by this collection
-        with name "k".'''
+        """Test if there is a file or collection directly contained by this
+        collection with name "k"."""
         return k in self._items

     @_populate_first
     def __len__(self):
-        '''Get the number of items directly contained in this collection'''
+        """Get the number of items directly contained in this collection."""
         return len(self._items)

     @_populate_first
     def __delitem__(self, p):
-        '''Delete an item by name which is directly contained by this collection.'''
+        """Delete an item by name which is directly contained by this collection."""
         del self._items[p]

     @_populate_first
     def keys(self):
-        '''Get a list of names of files and collections directly contained in this collection.'''
+        """Get a list of names of files and collections directly contained in this collection."""
         return self._items.keys()

     @_populate_first
     def values(self):
-        '''Get a list of files and collection objects directly contained in this collection.'''
+        """Get a list of files and collection objects directly contained in this collection."""
         return self._items.values()

     @_populate_first
     def items(self):
-        '''Get a list of (name, object) tuples directly contained in this collection.'''
+        """Get a list of (name, object) tuples directly contained in this collection."""
         return self._items.items()

     @_populate_first
     def exists(self, path):
-        '''Test if there is a file or collection at "path"'''
+        """Test if there is a file or collection at "path"."""
         return self.find(path) != None

     @_populate_first
     def remove(self, path):
-        '''Test if there is a file or collection at "path"'''
+        """Remove the file or subcollection (directory) at "path"."""
         p = path.split("/")
         if p[0] == '.':
             del p[0]
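Together with open(), the mapping-style methods above let a Collection be
treated like a dict keyed by item name. A hypothetical walkthrough:

    from arvados.collection import Collection

    c = Collection()
    f = c.open("a.txt", "w")
    f.write("alpha")

    assert "a.txt" in c          # __contains__
    assert len(c) == 1           # __len__
    names = list(c)              # __iter__ yields item names
    item = c["a.txt"]            # __getitem__ (use find() for nested paths)
    del c["a.txt"]               # __delitem__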
@@ -955,16 +969,19 @@ class Collection(CollectionBase):

     @_populate_first
     def manifest_text(self, strip=False, normalize=False):
-        '''Get the manifest text for this collection, sub collections and files.
+        """Get the manifest text for this collection, sub collections and files.

-        strip: If True, remove signing tokens from block locators if present.
-        If False, block locators are left unchanged.
+        :strip:
+          If True, remove signing tokens from block locators if present.
+          If False, block locators are left unchanged.

-        normalize: If True, always export the manifest text in normalized form
-        even if the Collection is not modified.  If False and the collection is
-        not modified, return the original manifest text even if it is not in
-        normalized form.
-        '''
+        :normalize:
+          If True, always export the manifest text in normalized form
+          even if the Collection is not modified.  If False and the collection
+          is not modified, return the original manifest text even if it is not
+          in normalized form.
+        """
         if self.modified() or self._manifest_text is None or normalize:
             return export_manifest(self, stream_name=".", portable_locators=strip)
         else:
@@ -974,18 +991,19 @@ class Collection(CollectionBase):
             return self._manifest_text

     def portable_data_hash(self):
-        '''Get the portable data hash for this collection's manifest.'''
+        """Get the portable data hash for this collection's manifest."""
         stripped = self.manifest_text(strip=True)
         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))

     @_populate_first
     def save(self, no_locator=False):
-        '''Commit pending buffer blocks to Keep, write the manifest to Keep, and
+        """Commit pending buffer blocks to Keep, write the manifest to Keep, and
         update the collection record on the API server.

-        no_locator: If False and there is no collection uuid associated with
-        this Collection, raise an error.  If True, do not raise an error.
-        '''
+        :no_locator:
+          If False and there is no collection uuid associated with
+          this Collection, raise an error.  If True, do not raise an error.
+        """
         if self.modified():
             self._my_block_manager().commit_all()
             self._my_keep().put(self.manifest_text(strip=True))
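portable_data_hash() above is a one-liner worth internalizing: a collection's
PDH is simply the md5 of its stripped manifest text, a '+', and the manifest's
length in bytes. Recomputed by hand for a one-empty-file manifest:

    import hashlib

    stripped = ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n"
    pdh = hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
    # pdh looks like "<32 hex digits>+51" and stays stable as long as the
    # collection's content does not change.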
@@ -1001,17 +1019,21 @@ class Collection(CollectionBase):

     @_populate_first
     def save_as(self, name, owner_uuid=None, ensure_unique_name=False):
-        '''Save a new collection record.
+        """Save a new collection record.

-        name: The collection name.
+        :name:
+          The collection name.

-        owner_uuid: the user, or project uuid that will own this collection.
-        If None, defaults to the current user.
+        :owner_uuid:
+          the user, or project uuid that will own this collection.
+          If None, defaults to the current user.

-        ensure_unique_name: If True, ask the API server to rename the
-        collection if it conflicts with a collection with the same name and
-        owner.  If False, a name conflict will result in an error.
-        '''
+        :ensure_unique_name:
+          If True, ask the API server to rename the collection
+          if it conflicts with a collection with the same name and owner.  If
+          False, a name conflict will result in an error.
+        """
         self._my_block_manager().commit_all()
         self._my_keep().put(self.manifest_text(strip=True))
         body = {"manifest_text": self.manifest_text(strip=False),
@@ -1024,19 +1046,24 @@ class Collection(CollectionBase):


 def import_manifest(manifest_text, into_collection=None, api_client=None, keep=None, num_retries=None):
-    '''Import a manifest into a Collection.
+    """Import a manifest into a `Collection`.

-    manifest_text: The manifest text to import from.
+    :manifest_text:
+      The manifest text to import from.

-    into_collection: The Collection that will be initialized (must be empty).
-    If None, create a new Collection object.
+    :into_collection:
+      The `Collection` that will be initialized (must be empty).
+      If None, create a new `Collection` object.

-    api_client: The API client object that will be used when creating a new Collection object.
+    :api_client:
+      The API client object that will be used when creating a new `Collection` object.

-    keep: The keep client object that will be used when creating a new Collection object.
+    :keep:
+      The keep client object that will be used when creating a new `Collection` object.

-    num_retries: the default number of api client and keep retries on error.
-    '''
+    :num_retries:
+      the default number of api client and keep retries on error.
+    """
     if into_collection is not None:
         if len(into_collection) > 0:
             raise ArgumentError("Can only import manifest into an empty collection")
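import_manifest() and export_manifest() (next hunk) are intended to
round-trip; note that export produces the normalized form, so the output may
differ byte-for-byte from unnormalized input. A hypothetical round trip:

    from arvados.collection import import_manifest, export_manifest

    text = ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n"
    c = import_manifest(text)
    round_tripped = export_manifest(c, stream_name=".")
    # round_tripped holds the same stream and file, re-serialized in
    # normalized manifest form.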
+ """ if into_collection is not None: if len(into_collection) > 0: raise ArgumentError("Can only import manifest into an empty collection") @@ -1093,14 +1120,18 @@ def import_manifest(manifest_text, into_collection=None, api_client=None, keep=N return c def export_manifest(item, stream_name=".", portable_locators=False): - '''Create a manifest for "item" (must be a Collection or ArvadosFile). If - "item" is a is a Collection, this will also export subcollections. - - stream_name: the name of the stream when exporting "item". - - portable_locators: If True, strip any permission hints on block locators. - If False, use block locators as-is. - ''' + """ + :item: + Create a manifest for `item` (must be a `Collection` or `ArvadosFile`). If + `item` is a is a `Collection`, this will also export subcollections. + + :stream_name: + the name of the stream when exporting `item`. + + :portable_locators: + If True, strip any permission hints on block locators. + If False, use block locators as-is. + """ buf = "" if isinstance(item, Collection): stream = {} -- 2.30.2