From 5bb11227e0d7d37cc6cf574b5d5f289afd1a43b1 Mon Sep 17 00:00:00 2001
From: Peter Amstutz <peter.amstutz@curoverse.com>
Date: Mon, 2 Feb 2015 09:33:28 -0500
Subject: [PATCH] 4823: Adding @_synchronized to protect arvfile and block
 manager. Updated docstrings to PEP 257 and PEP 287.

---
 sdk/python/arvados/arvfile.py    | 359 +++++++++++++++++--------------
 sdk/python/arvados/collection.py | 211 ++++++++++--------
 2 files changed, 323 insertions(+), 247 deletions(-)

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index bab14d4294..29db2cb996 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -9,6 +9,7 @@ import hashlib
 import hashlib
 import threading
 import Queue
+import copy

 def split(path):
     """split(path) -> streamname, filename
@@ -217,30 +218,39 @@ class StreamFileReader(ArvadosFileReaderBase):

 class BufferBlock(object):
-'''
+"""
 A BufferBlock is a stand-in for a Keep block that is in the process of being
 written.  Writers can append to it, get the size, and compute the Keep
 locator.

 There are three valid states:

-WRITABLE - can append
+WRITABLE
+  Can append to block.

-PENDING - is in the process of being uploaded to Keep, append is an error
+PENDING
+  Block is in the process of being uploaded to Keep, append is an error.

-COMMITTED - the block has been written to Keep, its internal buffer has been
-released, and the BufferBlock should be discarded in favor of fetching the
-block through normal Keep means.
-'''
+COMMITTED
+  The block has been written to Keep, its internal buffer has been
+  released, reads will fetch the block via the Keep client (since the
+  internal copy was discarded), and identifiers referring to the BufferBlock
+  can be replaced with the block locator.
+"""

     WRITABLE = 0
     PENDING = 1
     COMMITTED = 2

     def __init__(self, blockid, starting_capacity, owner):
-        '''
-        blockid: the identifier for this block
-        starting_capacity: the initial buffer capacity
-        owner: ArvadosFile that owns this block
-        '''
+        """
+        :blockid:
+          the identifier for this block
+
+        :starting_capacity:
+          the initial buffer capacity
+
+        :owner:
+          ArvadosFile that owns this block
+        """
         self.blockid = blockid
         self.buffer_block = bytearray(starting_capacity)
         self.buffer_view = memoryview(self.buffer_block)
@@ -250,11 +260,11 @@ block through normal Keep means.
         self.owner = owner

     def append(self, data):
-        '''
+        """
         Append some data to the buffer.  Only valid if the block is in
         WRITABLE state.  Implements an expanding buffer, doubling capacity as
-        needed to accomdate all the data.
-        '''
+        needed to accommodate all the data.
+        """
         if self.state == BufferBlock.WRITABLE:
             while (self.write_pointer+len(data)) > len(self.buffer_block):
                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
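The expanding buffer above doubles its capacity whenever a write would
overflow it, so repeated appends cost amortized O(1) per byte. A standalone
sketch of the same technique, with invented names (this is an illustration,
not the SDK class):

    import hashlib

    class GrowBuffer(object):
        # Toy stand-in for BufferBlock's expanding buffer.
        def __init__(self, starting_capacity=16):
            self.buffer_block = bytearray(starting_capacity)
            self.write_pointer = 0

        def append(self, data):
            # Double capacity until the pending write fits, then copy the
            # old contents across and write the new data after them.
            while self.write_pointer + len(data) > len(self.buffer_block):
                new_block = bytearray(len(self.buffer_block) * 2)
                new_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_block
            self.buffer_block[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)

        def locator(self):
            # Keep-style locator: md5 of the bytes written, "+", byte count.
            return "%s+%i" % (hashlib.md5(self.buffer_block[0:self.write_pointer]).hexdigest(),
                              self.write_pointer)

    b = GrowBuffer(starting_capacity=4)
    b.append("hello ")
    b.append("world")        # capacity doubled 4 -> 8 -> 16 along the way
    assert b.write_pointer == 11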
@@ -268,34 +278,41 @@ block through normal Keep means.
             raise AssertionError("Buffer block is not writable")

     def size(self):
-        '''Amount of data written to the buffer'''
+        """Amount of data written to the buffer."""
         return self.write_pointer

     def locator(self):
-        '''The Keep locator for this buffer's contents.'''
+        """The Keep locator for this buffer's contents."""
         if self._locator is None:
             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
         return self._locator


 class AsyncKeepWriteErrors(Exception):
-    '''
+    """
     Roll up one or more Keep write exceptions (generated by background
     threads) into a single one.
-    '''
+    """
     def __init__(self, errors):
         self.errors = errors

     def __repr__(self):
         return "\n".join(self.errors)

+def _synchronized(orig_func):
+    @functools.wraps(orig_func)
+    def wrapper(self, *args, **kwargs):
+        with self.lock:
+            return orig_func(self, *args, **kwargs)
+    return wrapper
+
 class BlockManager(object):
-    '''
+    """
     BlockManager handles buffer blocks, background block uploads, and
     background block prefetch for a Collection of ArvadosFiles.
-    '''
+    """
     def __init__(self, keep):
-        '''keep: KeepClient object to use'''
+        """keep: KeepClient object to use"""
         self._keep = keep
         self._bufferblocks = {}
         self._put_queue = None
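The new _synchronized decorator is the heart of this change: every wrapped
method body runs while holding the instance's self.lock. A minimal,
self-contained sketch of the pattern (toy class, hypothetical names):

    import functools
    import threading

    def _synchronized(orig_func):
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            with self.lock:
                return orig_func(self, *args, **kwargs)
        return wrapper

    class Counter(object):
        def __init__(self):
            self.lock = threading.Lock()
            self.value = 0

        @_synchronized
        def increment(self):
            # Runs entirely under self.lock, so concurrent calls cannot
            # interleave and lose updates.
            self.value += 1

Because threading.Lock is not reentrant, a decorated method must not call
another decorated method on the same object. That is why this patch pairs the
synchronized public add_segment() with a private, unsynchronized
_add_segment() for internal callers such as __init__() (see the ArvadosFile
hunks below).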
@@ -303,26 +320,39 @@ class BlockManager(object):
         self._put_threads = None
         self._prefetch_queue = None
         self._prefetch_threads = None
+        self.lock = threading.Lock()

+    @_synchronized
     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
-        '''
+        """
         Allocate a new, empty bufferblock in WRITABLE state and return it.
-        blockid: optional block identifier, otherwise one will be automatically assigned
-        starting_capacity: optional capacity, otherwise will use default capacity
-        owner: ArvadosFile that owns this block
-        '''
+
+        :blockid:
+          optional block identifier, otherwise one will be automatically assigned
+
+        :starting_capacity:
+          optional capacity, otherwise will use default capacity
+
+        :owner:
+          ArvadosFile that owns this block
+        """
         if blockid is None:
             blockid = "bufferblock%i" % len(self._bufferblocks)
         bb = BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
         self._bufferblocks[bb.blockid] = bb
         return bb

+    @_synchronized
     def dup_block(self, blockid, owner):
-        '''
+        """
         Create a new bufferblock in WRITABLE state, initialized with the
         content of an existing bufferblock.
-        blockid: the block to copy.  May be an existing buffer block id.
-        owner: ArvadosFile that owns the new block
-        '''
+
+        :blockid:
+          the block to copy.  May be an existing buffer block id.
+
+        :owner:
+          ArvadosFile that owns the new block
+        """
         new_blockid = "bufferblock%i" % len(self._bufferblocks)
         block = self._bufferblocks[blockid]
         bb = BufferBlock(new_blockid, len(block), owner)
@@ -330,13 +360,15 @@ class BlockManager(object):
         self._bufferblocks[bb.blockid] = bb
         return bb

+    @_synchronized
     def is_bufferblock(self, id):
         return id in self._bufferblocks

+    @_synchronized
     def stop_threads(self):
-        '''
+        """
         Shut down and wait for background upload and download threads to finish.
-        '''
+        """
         if self._put_threads is not None:
             for t in self._put_threads:
                 self._put_queue.put(None)
@@ -355,15 +387,15 @@ class BlockManager(object):
             self._prefetch_queue = None

     def commit_bufferblock(self, block):
-        '''
+        """
         Initiate a background upload of a bufferblock.  This will block if the
         upload queue is at capacity, otherwise it will return immediately.
-        '''
+        """

         def worker(self):
-            '''
+            """
             Background uploader thread.
-            '''
+            """
             while True:
                 try:
                     b = self._put_queue.get()
@@ -380,73 +412,80 @@ class BlockManager(object):
                     if self._put_queue is not None:
                         self._put_queue.task_done()

-        if self._put_threads is None:
-            # Start uploader threads.
-
-            # If we don't limit the Queue size, the upload queue can quickly
-            # grow to take up gigabytes of RAM if the writing process is
-            # generating data more quickly than it can be send to the Keep
-            # servers.
-            #
-            # With two upload threads and a queue size of 2, this means up to 4
-            # blocks pending.  If they are full 64 MiB blocks, that means up to
-            # 256 MiB of internal buffering, which is the same size as the
-            # default download block cache in KeepClient.
-            self._put_queue = Queue.Queue(maxsize=2)
-            self._put_errors = Queue.Queue()
-            self._put_threads = [threading.Thread(target=worker, args=(self,)),
-                                 threading.Thread(target=worker, args=(self,))]
-            for t in self._put_threads:
-                t.daemon = True
-                t.start()
+        with self.lock:
+            if self._put_threads is None:
+                # Start uploader threads.
+
+                # If we don't limit the Queue size, the upload queue can quickly
+                # grow to take up gigabytes of RAM if the writing process is
+                # generating data more quickly than it can be sent to the Keep
+                # servers.
+                #
+                # With two upload threads and a queue size of 2, this means up to 4
+                # blocks pending.  If they are full 64 MiB blocks, that means up to
+                # 256 MiB of internal buffering, which is the same size as the
+                # default download block cache in KeepClient.
+                self._put_queue = Queue.Queue(maxsize=2)
+                self._put_errors = Queue.Queue()
+                self._put_threads = [threading.Thread(target=worker, args=(self,)),
+                                     threading.Thread(target=worker, args=(self,))]
+                for t in self._put_threads:
+                    t.daemon = True
+                    t.start()

         # Mark the block as PENDING so as to disallow any more appends.
         block.state = BufferBlock.PENDING
         self._put_queue.put(block)

     def get_block(self, locator, num_retries, cache_only=False):
-        '''
+        """
         Fetch a block.  First checks to see if the locator is a BufferBlock and
         returns that, if not, passes the request through to KeepClient.get().
-        '''
-        if locator in self._bufferblocks:
-            bb = self._bufferblocks[locator]
-            if bb.state != BufferBlock.COMMITTED:
-                return bb.buffer_view[0:bb.write_pointer].tobytes()
-            else:
-                locator = bb._locator
+        """
+        with self.lock:
+            if locator in self._bufferblocks:
+                bb = self._bufferblocks[locator]
+                if bb.state != BufferBlock.COMMITTED:
+                    return bb.buffer_view[0:bb.write_pointer].tobytes()
+                else:
+                    locator = bb._locator
         return self._keep.get(locator, num_retries=num_retries, cache_only=cache_only)

     def commit_all(self):
-        '''
+        """
         Commit all outstanding buffer blocks.  Unlike commit_bufferblock(),
         this is a synchronous call, and will not return until all buffer
         blocks are uploaded.  Raises AsyncKeepWriteErrors() if any blocks
         failed to upload.
-        '''
-        for k,v in self._bufferblocks.items():
+        """
+        with self.lock:
+            items = self._bufferblocks.items()
+
+        for k,v in items:
             if v.state == BufferBlock.WRITABLE:
                 self.commit_bufferblock(v)
-        if self._put_queue is not None:
-            self._put_queue.join()
-            if not self._put_errors.empty():
-                e = []
-                try:
-                    while True:
-                        e.append(self._put_errors.get(False))
-                except Queue.Empty:
-                    pass
-                raise AsyncKeepWriteErrors(e)
+
+        with self.lock:
+            if self._put_queue is not None:
+                self._put_queue.join()
+                if not self._put_errors.empty():
+                    e = []
+                    try:
+                        while True:
+                            e.append(self._put_errors.get(False))
+                    except Queue.Empty:
+                        pass
+                    raise AsyncKeepWriteErrors(e)

     def block_prefetch(self, locator):
-        '''
+        """
         Initiate a background download of a block.  This assumes that the
         underlying KeepClient implements a block cache, so repeated requests
         for the same block will not result in repeated downloads (unless the
         block is evicted from the cache.)  This method does not block.
-        '''
+        """

         def worker(self):
-            '''Background downloader thread.'''
+            """Background downloader thread."""
             while True:
                 try:
                     b = self._prefetch_queue.get()
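The uploader above is a classic bounded producer/consumer: Queue.Queue(maxsize=2)
applies backpressure to the writing thread, two daemon threads drain the
queue, and None sentinels (see stop_threads()) shut the workers down. The same
skeleton, self-contained and with toy names, no Keep involved:

    import Queue
    import threading

    work = Queue.Queue(maxsize=2)      # put() blocks while 2 items are pending

    def worker():
        while True:
            item = work.get()
            try:
                if item is None:       # sentinel: this worker should exit
                    return
                pass                   # ... upload `item` to storage here ...
            finally:
                work.task_done()

    threads = [threading.Thread(target=worker) for _ in range(2)]
    for t in threads:
        t.daemon = True
        t.start()

    for block in ["block1", "block2", "block3", "block4"]:
        work.put(block)                # blocks when the queue is full
    work.join()                        # wait for every queued item

    for t in threads:
        work.put(None)                 # one sentinel per worker
    for t in threads:
        t.join()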
@@ -456,73 +495,84 @@ class BlockManager(object):
                 except:
                     pass

-        if locator in self._bufferblocks:
-            return
-        if self._prefetch_threads is None:
-            self._prefetch_queue = Queue.Queue()
-            self._prefetch_threads = [threading.Thread(target=worker, args=(self,)),
-                                      threading.Thread(target=worker, args=(self,))]
-            for t in self._prefetch_threads:
-                t.daemon = True
-                t.start()
+        with self.lock:
+            if locator in self._bufferblocks:
+                return
+            if self._prefetch_threads is None:
+                self._prefetch_queue = Queue.Queue()
+                self._prefetch_threads = [threading.Thread(target=worker, args=(self,)),
+                                          threading.Thread(target=worker, args=(self,))]
+                for t in self._prefetch_threads:
+                    t.daemon = True
+                    t.start()
         self._prefetch_queue.put(locator)


 class ArvadosFile(object):
-    '''
+    """
     ArvadosFile manages the underlying representation of a file in Keep as a
     sequence of segments spanning a set of blocks, and implements random
     read/write access.
-    '''
+    """

     def __init__(self, parent, stream=[], segments=[]):
-        '''
-        stream: a list of Range objects representing a block stream
-        segments: a list of Range objects representing segments
-        '''
+        """
+        :stream:
+          a list of Range objects representing a block stream
+
+        :segments:
+          a list of Range objects representing segments
+        """
         self.parent = parent
         self._modified = True
-        self.segments = []
+        self._segments = []
         for s in segments:
-            self.add_segment(stream, s.locator, s.range_size)
+            self._add_segment(stream, s.locator, s.range_size)
         self._current_bblock = None
         self.lock = threading.Lock()

+    @_synchronized
+    def segments(self):
+        return copy.copy(self._segments)
+
+    @_synchronized
     def clone(self, num_retries):
-        '''Make a copy of this file.'''
-        with self.lock:
-            cp = ArvadosFile()
-            cp.parent = self.parent
-            cp._modified = False
-
-            map_loc = {}
-            for r in self.segments:
-                new_loc = r.locator
-                if self.parent._my_block_manager().is_bufferblock(r.locator):
-                    if r.locator not in map_loc:
-                        map_loc[r.locator] = self.parent._my_block_manager().dup_block(r.locator, cp).blockid
-                    new_loc = map_loc[r.locator]
-
-                cp.segments.append(Range(new_loc, r.range_start, r.range_size, r.segment_offset))
-
-            return cp
+        """Make a copy of this file."""
+        cp = ArvadosFile(self.parent)
+        cp._modified = False
+
+        map_loc = {}
+        for r in self._segments:
+            new_loc = r.locator
+            if self.parent._my_block_manager().is_bufferblock(r.locator):
+                if r.locator not in map_loc:
+                    map_loc[r.locator] = self.parent._my_block_manager().dup_block(r.locator, cp).blockid
+                new_loc = map_loc[r.locator]
+
+            cp._segments.append(Range(new_loc, r.range_start, r.range_size, r.segment_offset))
+
+        return cp

+    @_synchronized
     def set_unmodified(self):
-        '''Clear the modified flag'''
+        """Clear the modified flag."""
         self._modified = False

+    @_synchronized
     def modified(self):
-        '''Test the modified flag'''
+        """Test the modified flag."""
         return self._modified

+    @_synchronized
     def truncate(self, size):
-        '''
-        Adjust the size of the file.  If "size" is less than the size of the file,
-        the file contents after "size" will be discarded.  If "size" is greater
-        than the current size of the file, an IOError will be raised.
-        '''
+        """
+        Adjust the size of the file.  If `size` is less than the size of the file,
+        the file contents after `size` will be discarded.  If `size` is greater
+        than the current size of the file, an IOError will be raised.
+        """
         if size < self.size():
             new_segs = []
-            for r in self.segments:
+            for r in self._segments:
                 range_end = r.range_start+r.range_size
                 if r.range_start >= size:
                     # segment is past the truncate size, all done
                     break
                 elif size < range_end:
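One detail worth pausing on: the new segments() accessor returns
copy.copy(self._segments) rather than the internal list itself, so callers
receive a snapshot they may iterate or mutate without holding the lock. A toy
demonstration of why that matters (hypothetical names):

    import copy
    import threading

    def _synchronized(orig_func):
        def wrapper(self, *args, **kwargs):
            with self.lock:
                return orig_func(self, *args, **kwargs)
        return wrapper

    class Segmented(object):
        def __init__(self):
            self.lock = threading.Lock()
            self._segments = [("locator_a", 0, 10)]

        @_synchronized
        def segments(self):
            # Shallow copy: the caller gets a snapshot it can safely
            # iterate or mutate without corrupting our internal list.
            return copy.copy(self._segments)

    s = Segmented()
    snapshot = s.segments()
    snapshot.append(("bogus", 10, 5))
    assert len(s._segments) == 1      # internal state unchanged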
@@ -535,24 +585,24 @@ class ArvadosFile(object):
                 else:
                     new_segs.append(r)

-            self.segments = new_segs
+            self._segments = new_segs
             self._modified = True
         elif size > self.size():
             raise IOError("truncate() does not support extending the file size")

-
+    @_synchronized
     def readfrom(self, offset, size, num_retries):
-        '''
-        read upto "size" bytes from the file starting at "offset".
-        '''
+        """
+        Read up to `size` bytes from the file starting at `offset`.
+        """
         if size == 0 or offset >= self.size():
             return ''
         data = []

-        for lr in locators_and_ranges(self.segments, offset, size + config.KEEP_BLOCK_SIZE):
+        for lr in locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE):
             self.parent._my_block_manager().block_prefetch(lr.locator)

-        for lr in locators_and_ranges(self.segments, offset, size):
+        for lr in locators_and_ranges(self._segments, offset, size):
             d = self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries, cache_only=bool(data))
             if d:
                 data.append(d[lr.segment_offset:lr.segment_offset+lr.segment_size])
@@ -561,13 +611,13 @@ class ArvadosFile(object):
             else:
                 break
         return ''.join(data)

     def _repack_writes(self):
-        '''
+        """
         Test if the buffer block has more data than is referenced by actual
         segments (this happens when a buffered write over-writes a file range
         written in a previous buffered write).  Re-pack the buffer block for
         efficiency and to avoid leaking information.
-        '''
-        segs = self.segments
+        """
+        segs = self._segments

         # Sum up the segments to get the total bytes of the file referencing
         # into the buffer block.
@@ -584,11 +634,12 @@ class ArvadosFile(object):

         self._current_bblock = new_bb

+    @_synchronized
     def writeto(self, offset, data, num_retries):
-        '''
-        Write "data" to the file starting at "offset".  This will update
-        existing bytes and/or extend the size of the file as necessary.
-        '''
+        """
+        Write `data` to the file starting at `offset`.  This will update
+        existing bytes and/or extend the size of the file as necessary.
+        """
         if len(data) == 0:
             return

@@ -610,23 +661,30 @@ class ArvadosFile(object):
             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

         self._current_bblock.append(data)

-        replace_range(self.segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
+        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

+    @_synchronized
     def add_segment(self, blocks, pos, size):
-        '''
-        Add a segment to the end of the file, with "pos" and "offset" referencing a
-        section of the stream described by "blocks" (a list of Range objects)
-        '''
+        # Synchronized public api, see _add_segment
+        self._add_segment(blocks, pos, size)
+
+    def _add_segment(self, blocks, pos, size):
+        """
+        Add a segment to the end of the file, with `pos` and `size` referencing a
+        section of the stream described by `blocks` (a list of Range objects).
+        """
         self._modified = True
         for lr in locators_and_ranges(blocks, pos, size):
-            last = self.segments[-1] if self.segments else Range(0, 0, 0)
+            last = self._segments[-1] if self._segments else Range(0, 0, 0)
             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
-            self.segments.append(r)
+            self._segments.append(r)
+
+    @_synchronized
     def size(self):
-        '''Get the file size'''
-        if self.segments:
-            n = self.segments[-1]
+        """Get the file size."""
+        if self._segments:
+            n = self._segments[-1]
             return n.range_start + n.range_size
         else:
             return 0
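readfrom() above reads in two passes: the first pass issues non-blocking
prefetch hints for every block the read may touch (one full Keep block beyond
the requested range), then the second pass collects the bytes, passing
cache_only once some data is in hand so one slow block cannot stall the rest.
The shape of that pattern as a self-contained toy — FakeCache and the range
tuples are invented for the sketch:

    class FakeCache(object):
        def __init__(self, blocks):
            self.blocks = blocks

        def prefetch(self, locator):
            pass                 # a real client would start a background fetch

        def get(self, locator):
            return self.blocks[locator]

    def read_ranges(cache, ranges):
        # Pass 1: hint every block so downloads overlap with consumption.
        for locator, offset, size in ranges:
            cache.prefetch(locator)
        # Pass 2: gather the requested slices in order.
        data = []
        for locator, offset, size in ranges:
            block = cache.get(locator)
            data.append(block[offset:offset+size])
        return ''.join(data)

    cache = FakeCache({"blk1": "hello world", "blk2": "goodbye"})
    assert read_ranges(cache, [("blk1", 0, 5), ("blk2", 4, 3)]) == "hellobye"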
@@ -635,7 +693,7 @@ class ArvadosFile(object):
 class ArvadosFileReader(ArvadosFileReaderBase):
     def __init__(self, arvadosfile, name, mode="r", num_retries=None):
         super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
-        self.arvadosfile = arvadosfile.clone()
+        self.arvadosfile = arvadosfile

     def size(self):
         return self.arvadosfile.size()
@@ -643,7 +701,7 @@ class ArvadosFileReader(ArvadosFileReaderBase):
     @ArvadosFileBase._before_close
     @retry_method
     def read(self, size, num_retries=None):
-        """Read up to 'size' bytes from the stream, starting at the current file position"""
+        """Read up to `size` bytes from the stream, starting at the current file position."""
         data = self.arvadosfile.readfrom(self._filepos, size, num_retries=num_retries)
         self._filepos += len(data)
         return data
@@ -651,29 +709,16 @@ class ArvadosFileReader(ArvadosFileReaderBase):
     @ArvadosFileBase._before_close
     @retry_method
     def readfrom(self, offset, size, num_retries=None):
-        """Read up to 'size' bytes from the stream, starting at the current file position"""
+        """Read up to `size` bytes from the stream, starting at `offset`."""
         return self.arvadosfile.readfrom(offset, size, num_retries)

     def flush(self):
         pass


-class SynchronizedArvadosFile(object):
-    def __init__(self, arvadosfile):
-        self.arvadosfile = arvadosfile
-
-    def clone(self):
-        return self
-
-    def __getattr__(self, name):
-        with self.arvadosfile.lock:
-            return getattr(self.arvadosfile, name)
-
-
 class ArvadosFileWriter(ArvadosFileReader):
     def __init__(self, arvadosfile, name, mode, num_retries=None):
-        self.arvadosfile = SynchronizedArvadosFile(arvadosfile)
-        super(ArvadosFileWriter, self).__init__(self.arvadosfile, name, mode, num_retries=num_retries)
+        super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)

     @ArvadosFileBase._before_close
     @retry_method
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index a0eda1175a..682880359c 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -641,25 +641,30 @@ class ResumableCollectionWriter(CollectionWriter):

 class Collection(CollectionBase):
-    '''An abstract Arvados collection, consisting of a set of files and
+    """An abstract Arvados collection, consisting of a set of files and
     sub-collections.
-    '''
+    """

     def __init__(self, manifest_locator_or_text=None, parent=None, api_client=None,
                  keep_client=None, num_retries=0, block_manager=None):
-        '''manifest_locator_or_text: One of Arvados collection UUID, block locator of
-        a manifest, raw manifest text, or None (to create an empty collection).
-
-        parent: the parent Collection, may be None.
-
-        api_client: The API client object to use for requests.  If None, use default.
-
-        keep_client: the Keep client to use for requests.  If None, use default.
-
-        num_retries: the number of retries for API and Keep requests.
-
-        block_manager: the block manager to use.  If None, create one.
-        '''
+        """
+        :manifest_locator_or_text:
+          One of Arvados collection UUID, block locator of a manifest, raw
+          manifest text, or None (to create an empty collection).
+
+        :parent:
+          the parent Collection, may be None.
+
+        :api_client:
+          The API client object to use for requests.  If None, use default.
+
+        :keep_client:
+          the Keep client to use for requests.  If None, use default.
+
+        :num_retries:
+          the number of retries for API and Keep requests.
+
+        :block_manager:
+          the block manager to use.  If None, use parent's block manager or
+          create one.
+        """
         self.parent = parent
         self._items = None
         self._api_client = api_client
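Assuming the SDK with this patch applied is importable, the constructor above
accepts several shapes of first argument (the UUID below is a placeholder, and
the module path is the SDK's existing collection module):

    from arvados.collection import Collection

    empty = Collection()                  # brand-new, empty collection
    from_text = Collection(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n")
    by_uuid = Collection("zzzzz-4zz18-zzzzzzzzzzzzzzz")   # placeholder UUID

The UUID form needs a reachable API server when the collection is first
populated; the manifest-text and empty forms appear to work without one, since
population is deferred by the @_populate_first decorators.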
@@ -779,25 +784,29 @@ class Collection(CollectionBase):
         return self

     def __exit__(self, exc_type, exc_value, traceback):
-        '''Support scoped auto-commit in a with: block'''
+        """Support scoped auto-commit in a with: block."""
         self.save(no_locator=True)
         if self._block_manager is not None:
             self._block_manager.stop_threads()

     @_populate_first
     def find(self, path, create=False, create_collection=False):
-        '''Recursively search the specified file path.  May return either a Collection
+        """Recursively search the specified file path.  May return either a Collection
         or ArvadosFile.

-        create: If true, create path components (i.e. Collections) that are
-        missing.  If "create" is False, return None if a path component is not
-        found.
+        :create:
+          If true, create path components (i.e. Collections) that are
+          missing.  If "create" is False, return None if a path component is
+          not found.
+
+        :create_collection:
+          If the path is not found, "create" is True, and
+          "create_collection" is False, then create and return a new
+          ArvadosFile for the last path component.  If "create_collection" is
+          True, then create and return a new Collection for the last path
+          component.

-        create_collection: If the path is not found, "create" is True, and
-        "create_collection" is False, then create and return a new ArvadosFile
-        for the last path component.  If "create_collection" is True, then
-        create and return a new Collection for the last path component.
-        '''
+        """
         p = path.split("/")
         if p[0] == '.':
             del p[0]
@@ -826,7 +835,8 @@ class Collection(CollectionBase):

     @_populate_first
     def api_response(self):
-        """api_response() -> dict or None
+        """
+        api_response() -> dict or None

         Returns information about this Collection fetched from the API server.
         If the Collection exists in Keep but not the API server, currently
@@ -835,19 +845,23 @@ class Collection(CollectionBase):
         return self._api_response

     def open(self, path, mode):
-        '''Open a file-like object for access.
-
-        path: path to a file in the collection
-
-        mode: one of "r", "r+", "w", "w+", "a", "a+"
-        "r" opens for reading
-
-        "r+" opens for reading and writing.  Reads/writes share a file pointer.
-
-        "w", "w+" truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
-
-        "a", "a+" opens for reading and writing.  All writes are appended to the end of the file.  Writing does not affect the file pointer for reading.
-        '''
+        """Open a file-like object for access.
+
+        :path:
+          path to a file in the collection
+
+        :mode:
+          one of "r", "r+", "w", "w+", "a", "a+"
+
+          :"r":
+            opens for reading
+
+          :"r+":
+            opens for reading and writing.  Reads/writes share a file pointer.
+
+          :"w", "w+":
+            truncates to 0 and opens for reading and writing.  Reads/writes
+            share a file pointer.
+
+          :"a", "a+":
+            opens for reading and writing.  All writes are appended to the
+            end of the file.  Writing does not affect the file pointer for
+            reading.
+        """
         mode = mode.replace("b", "")
         if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
             raise ArgumentError("Bad mode '%s'" % mode)
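A hypothetical session against the modes documented above (assumes an empty,
writable Collection; write() on the returned file object is part of the writer
API, and intermediate path components are created via find(create=True)):

    from arvados.collection import Collection

    c = Collection()

    f = c.open("dir/counts.txt", "w")    # truncate and write
    f.write("one\ntwo\n")

    f = c.open("dir/counts.txt", "a")    # append to the end
    f.write("three\n")

    f = c.open("dir/counts.txt", "r")    # read it back
    text = f.read(1024)                  # read() takes a size argument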
@@ -869,8 +883,8 @@ class Collection(CollectionBase):

     @_populate_first
     def modified(self):
-        '''Test if the collection (or any subcollection or file) has been modified
-        since it was created.'''
+        """Test if the collection (or any subcollection or file) has been modified
+        since it was created."""
         for k,v in self._items.items():
             if v.modified():
                 return True
@@ -878,65 +892,65 @@ class Collection(CollectionBase):

     @_populate_first
     def set_unmodified(self):
-        '''Recursively clear modified flag'''
+        """Recursively clear modified flag."""
         for k,v in self._items.items():
             v.set_unmodified()

     @_populate_first
     def __iter__(self):
-        '''Iterate over names of files and collections contained in this collection.'''
+        """Iterate over names of files and collections contained in this collection."""
         return self._items.iterkeys()

     @_populate_first
     def iterkeys(self):
-        '''Iterate over names of files and collections directly contained in this collection.'''
+        """Iterate over names of files and collections directly contained in this collection."""
         return self._items.iterkeys()

     @_populate_first
     def __getitem__(self, k):
-        '''Get a file or collection that is directly contained by this collection.  Use
-        find() for path serach.'''
+        """Get a file or collection that is directly contained by this collection.
+        Use find() for path search."""
         return self._items[k]

     @_populate_first
     def __contains__(self, k):
-        '''If there is a file or collection a directly contained by this collection
-        with name "k".'''
+        """Test if there is a file or collection directly contained by this
+        collection with name "k"."""
         return k in self._items

     @_populate_first
     def __len__(self):
-        '''Get the number of items directly contained in this collection'''
+        """Get the number of items directly contained in this collection."""
         return len(self._items)

     @_populate_first
     def __delitem__(self, p):
-        '''Delete an item by name which is directly contained by this collection.'''
+        """Delete an item by name which is directly contained by this collection."""
         del self._items[p]

     @_populate_first
     def keys(self):
-        '''Get a list of names of files and collections directly contained in this collection.'''
+        """Get a list of names of files and collections directly contained in this collection."""
         return self._items.keys()

     @_populate_first
     def values(self):
-        '''Get a list of files and collection objects directly contained in this collection.'''
+        """Get a list of files and collection objects directly contained in this collection."""
         return self._items.values()

     @_populate_first
     def items(self):
-        '''Get a list of (name, object) tuples directly contained in this collection.'''
+        """Get a list of (name, object) tuples directly contained in this collection."""
         return self._items.items()

     @_populate_first
     def exists(self, path):
-        '''Test if there is a file or collection at "path"'''
+        """Test if there is a file or collection at "path"."""
         return self.find(path) != None

     @_populate_first
     def remove(self, path):
-        '''Test if there is a file or collection at "path"'''
+        """Remove the file or subcollection (directory) at "path"."""
         p = path.split("/")
         if p[0] == '.':
             del p[0]
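Together with open(), the mapping-style methods above let a Collection be
treated like a dict keyed by item name. A hypothetical walkthrough:

    from arvados.collection import Collection

    c = Collection()
    f = c.open("a.txt", "w")
    f.write("alpha")

    assert "a.txt" in c          # __contains__
    assert len(c) == 1           # __len__
    names = list(c)              # __iter__ yields item names
    item = c["a.txt"]            # __getitem__ (use find() for nested paths)
    del c["a.txt"]               # __delitem__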
@@ -955,16 +969,19 @@ class Collection(CollectionBase):

     @_populate_first
     def manifest_text(self, strip=False, normalize=False):
-        '''Get the manifest text for this collection, sub collections and files.
+        """Get the manifest text for this collection, sub collections and files.

-        strip: If True, remove signing tokens from block locators if present.
-        If False, block locators are left unchanged.
+        :strip:
+          If True, remove signing tokens from block locators if present.
+          If False, block locators are left unchanged.

-        normalize: If True, always export the manifest text in normalized form
-        even if the Collection is not modified.  If False and the collection is
-        not modified, return the original manifest text even if it is not in
-        normalized form.
-        '''
+        :normalize:
+          If True, always export the manifest text in normalized form
+          even if the Collection is not modified.  If False and the collection
+          is not modified, return the original manifest text even if it is not
+          in normalized form.
+        """
         if self.modified() or self._manifest_text is None or normalize:
             return export_manifest(self, stream_name=".", portable_locators=strip)
         else:
@@ -974,18 +991,19 @@ class Collection(CollectionBase):
             return self._manifest_text

     def portable_data_hash(self):
-        '''Get the portable data hash for this collection's manifest.'''
+        """Get the portable data hash for this collection's manifest."""
         stripped = self.manifest_text(strip=True)
         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))

     @_populate_first
     def save(self, no_locator=False):
-        '''Commit pending buffer blocks to Keep, write the manifest to Keep, and
+        """Commit pending buffer blocks to Keep, write the manifest to Keep, and
         update the collection record on the API server.

-        no_locator: If False and there is no collection uuid associated with
-        this Collection, raise an error.  If True, do not raise an error.
-        '''
+        :no_locator:
+          If False and there is no collection uuid associated with
+          this Collection, raise an error.  If True, do not raise an error.
+        """
         if self.modified():
             self._my_block_manager().commit_all()
             self._my_keep().put(self.manifest_text(strip=True))
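portable_data_hash() above is a one-liner worth internalizing: a collection's
PDH is simply the md5 of its stripped manifest text, a '+', and the manifest's
length in bytes. Recomputed by hand for a one-empty-file manifest:

    import hashlib

    stripped = ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n"
    pdh = hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
    # pdh looks like "<32 hex digits>+51" and stays stable as long as the
    # collection's content does not change.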
@@ -1001,17 +1019,21 @@ class Collection(CollectionBase):

     @_populate_first
     def save_as(self, name, owner_uuid=None, ensure_unique_name=False):
-        '''Save a new collection record.
+        """Save a new collection record.

-        name: The collection name.
+        :name:
+          The collection name.

-        owner_uuid: the user, or project uuid that will own this collection.
-        If None, defaults to the current user.
+        :owner_uuid:
+          the user, or project uuid that will own this collection.
+          If None, defaults to the current user.

-        ensure_unique_name: If True, ask the API server to rename the
-        collection if it conflicts with a collection with the same name and
-        owner.  If False, a name conflict will result in an error.
-        '''
+        :ensure_unique_name:
+          If True, ask the API server to rename the collection
+          if it conflicts with a collection with the same name and owner.  If
+          False, a name conflict will result in an error.
+        """
         self._my_block_manager().commit_all()
         self._my_keep().put(self.manifest_text(strip=True))
         body = {"manifest_text": self.manifest_text(strip=False),
@@ -1024,19 +1046,24 @@ class Collection(CollectionBase):


 def import_manifest(manifest_text, into_collection=None, api_client=None, keep=None, num_retries=None):
-    '''Import a manifest into a Collection.
+    """Import a manifest into a `Collection`.

-    manifest_text: The manifest text to import from.
+    :manifest_text:
+      The manifest text to import from.

-    into_collection: The Collection that will be initialized (must be empty).
-    If None, create a new Collection object.
+    :into_collection:
+      The `Collection` that will be initialized (must be empty).
+      If None, create a new `Collection` object.

-    api_client: The API client object that will be used when creating a new Collection object.
+    :api_client:
+      The API client object that will be used when creating a new `Collection` object.

-    keep: The keep client object that will be used when creating a new Collection object.
+    :keep:
+      The keep client object that will be used when creating a new `Collection` object.

-    num_retries: the default number of api client and keep retries on error.
-    '''
+    :num_retries:
+      the default number of api client and keep retries on error.
+    """
     if into_collection is not None:
         if len(into_collection) > 0:
             raise ArgumentError("Can only import manifest into an empty collection")
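import_manifest() and export_manifest() (next hunk) are intended to
round-trip; note that export produces the normalized form, so the output may
differ byte-for-byte from unnormalized input. A hypothetical round trip:

    from arvados.collection import import_manifest, export_manifest

    text = ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n"
    c = import_manifest(text)
    round_tripped = export_manifest(c, stream_name=".")
    # round_tripped holds the same stream and file, re-serialized in
    # normalized manifest form.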
+ """ if into_collection is not None: if len(into_collection) > 0: raise ArgumentError("Can only import manifest into an empty collection") @@ -1093,14 +1120,18 @@ def import_manifest(manifest_text, into_collection=None, api_client=None, keep=N return c def export_manifest(item, stream_name=".", portable_locators=False): - '''Create a manifest for "item" (must be a Collection or ArvadosFile). If - "item" is a is a Collection, this will also export subcollections. - - stream_name: the name of the stream when exporting "item". - - portable_locators: If True, strip any permission hints on block locators. - If False, use block locators as-is. - ''' + """ + :item: + Create a manifest for `item` (must be a `Collection` or `ArvadosFile`). If + `item` is a is a `Collection`, this will also export subcollections. + + :stream_name: + the name of the stream when exporting `item`. + + :portable_locators: + If True, strip any permission hints on block locators. + If False, use block locators as-is. + """ buf = "" if isinstance(item, Collection): stream = {} -- 2.30.2