+ADD = "add"
+DEL = "del"
+MOD = "mod"
+TOK = "tok"
+FILE = "file"
+COLLECTION = "collection"
+
+class RichCollectionBase(CollectionBase):
+ """Base class for Collections and Subcollections.
+
+ Implements the majority of functionality relating to accessing items in the
+ Collection.
+
+ """
+
+ def __init__(self, parent=None):
+ self.parent = parent
+ self._committed = False
+ self._callback = None
+ self._items = {}
+
+ def _my_api(self):
+ raise NotImplementedError()
+
+ def _my_keep(self):
+ raise NotImplementedError()
+
+ def _my_block_manager(self):
+ raise NotImplementedError()
+
+ def writable(self):
+ raise NotImplementedError()
+
+ def root_collection(self):
+ raise NotImplementedError()
+
+ def notify(self, event, collection, name, item):
+ raise NotImplementedError()
+
+ def stream_name(self):
+ raise NotImplementedError()
+
+ @must_be_writable
+ @synchronized
+ def find_or_create(self, path, create_type):
+ """Recursively search the specified file path.
+
+ May return either a `Collection` or `ArvadosFile`. If not found, will
+ create a new item at the specified path based on `create_type`. Will
+ create intermediate subcollections needed to contain the final item in
+ the path.
+
+ :create_type:
+ One of `arvados.collection.FILE` or
+ `arvados.collection.COLLECTION`. If the path is not found, and value
+ of create_type is FILE then create and return a new ArvadosFile for
+ the last path component. If COLLECTION, then create and return a new
+ Collection for the last path component.
+
+ """
+
+ pathcomponents = path.split("/", 1)
+ if pathcomponents[0]:
+ item = self._items.get(pathcomponents[0])
+ if len(pathcomponents) == 1:
+ if item is None:
+ # create new file
+ if create_type == COLLECTION:
+ item = Subcollection(self, pathcomponents[0])
+ else:
+ item = ArvadosFile(self, pathcomponents[0])
+ self._items[pathcomponents[0]] = item
+ self._committed = False
+ self.notify(ADD, self, pathcomponents[0], item)
+ return item
+ else:
+ if item is None:
+ # create new collection
+ item = Subcollection(self, pathcomponents[0])
+ self._items[pathcomponents[0]] = item
+ self._committed = False
+ self.notify(ADD, self, pathcomponents[0], item)
+ if isinstance(item, RichCollectionBase):
+ return item.find_or_create(pathcomponents[1], create_type)
+ else:
+ raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
+ else:
+ return self
+
+ @synchronized
+ def find(self, path):
+ """Recursively search the specified file path.
+
+ May return either a Collection or ArvadosFile. Return None if not
+ found.
+
+ """
+ if not path:
+ raise errors.ArgumentError("Parameter 'path' is empty.")
+
+ pathcomponents = path.split("/", 1)
+ item = self._items.get(pathcomponents[0])
+ if len(pathcomponents) == 1:
+ return item
+ else:
+ if isinstance(item, RichCollectionBase):
+ if pathcomponents[1]:
+ return item.find(pathcomponents[1])
+ else:
+ return item
+ else:
+ raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
+
+ @synchronized
+ def mkdirs(self, path):
+ """Recursive subcollection create.
+
+ Like `os.makedirs()`. Will create intermediate subcollections needed
+ to contain the leaf subcollection path.
+
+ """
+
+ if self.find(path) != None:
+ raise IOError(errno.EEXIST, "Directory or file exists", path)
+
+ return self.find_or_create(path, COLLECTION)
+
+ def open(self, path, mode="r"):
+ """Open a file-like object for access.
+
+ :path:
+ path to a file in the collection
+ :mode:
+ one of "r", "r+", "w", "w+", "a", "a+"
+ :"r":
+ opens for reading
+ :"r+":
+ opens for reading and writing. Reads/writes share a file pointer.
+ :"w", "w+":
+ truncates to 0 and opens for reading and writing. Reads/writes share a file pointer.
+ :"a", "a+":
+ opens for reading and writing. All writes are appended to
+ the end of the file. Writing does not affect the file pointer for
+ reading.
+ """
+ mode = mode.replace("b", "")
+ if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
+ raise errors.ArgumentError("Bad mode '%s'" % mode)
+ create = (mode != "r")
+
+ if create and not self.writable():
+ raise IOError(errno.EROFS, "Collection is read only")
+
+ if create:
+ arvfile = self.find_or_create(path, FILE)
+ else:
+ arvfile = self.find(path)
+
+ if arvfile is None:
+ raise IOError(errno.ENOENT, "File not found", path)
+ if not isinstance(arvfile, ArvadosFile):
+ raise IOError(errno.EISDIR, "Is a directory", path)
+
+ if mode[0] == "w":
+ arvfile.truncate(0)
+
+ name = os.path.basename(path)
+
+ if mode == "r":
+ return ArvadosFileReader(arvfile, num_retries=self.num_retries)
+ else:
+ return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
+
+ def modified(self):
+ """Determine if the collection has been modified since last commited."""
+ return not self.committed()
+
+ @synchronized
+ def committed(self):
+ """Determine if the collection has been committed to the API server."""
+
+ if self._committed is False:
+ return False
+ for v in self._items.values():
+ if v.committed() is False:
+ return False
+ return True
+
+ @synchronized
+ def set_committed(self):
+ """Recursively set committed flag to True."""
+ self._committed = True
+ for k,v in self._items.items():
+ v.set_committed()
+
+ @synchronized
+ def __iter__(self):
+ """Iterate over names of files and collections contained in this collection."""
+ return iter(self._items.keys())
+
+ @synchronized
+ def __getitem__(self, k):
+ """Get a file or collection that is directly contained by this collection.
+
+ If you want to search a path, use `find()` instead.
+
+ """
+ return self._items[k]
+
+ @synchronized
+ def __contains__(self, k):
+ """Test if there is a file or collection a directly contained by this collection."""
+ return k in self._items
+
+ @synchronized
+ def __len__(self):
+ """Get the number of items directly contained in this collection."""
+ return len(self._items)
+
+ @must_be_writable
+ @synchronized
+ def __delitem__(self, p):
+ """Delete an item by name which is directly contained by this collection."""
+ del self._items[p]
+ self._committed = False
+ self.notify(DEL, self, p, None)
+
+ @synchronized
+ def keys(self):
+ """Get a list of names of files and collections directly contained in this collection."""
+ return self._items.keys()
+
+ @synchronized
+ def values(self):
+ """Get a list of files and collection objects directly contained in this collection."""
+ return self._items.values()
+
+ @synchronized
+ def items(self):
+ """Get a list of (name, object) tuples directly contained in this collection."""
+ return self._items.items()
+
+ def exists(self, path):
+ """Test if there is a file or collection at `path`."""
+ return self.find(path) is not None
+
+ @must_be_writable
+ @synchronized
+ def remove(self, path, recursive=False):
+ """Remove the file or subcollection (directory) at `path`.
+
+ :recursive:
+ Specify whether to remove non-empty subcollections (True), or raise an error (False).
+ """
+
+ if not path:
+ raise errors.ArgumentError("Parameter 'path' is empty.")
+
+ pathcomponents = path.split("/", 1)
+ item = self._items.get(pathcomponents[0])
+ if item is None:
+ raise IOError(errno.ENOENT, "File not found", path)
+ if len(pathcomponents) == 1:
+ if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
+ raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
+ deleteditem = self._items[pathcomponents[0]]
+ del self._items[pathcomponents[0]]
+ self._committed = False
+ self.notify(DEL, self, pathcomponents[0], deleteditem)
+ else:
+ item.remove(pathcomponents[1])
+
+ def _clonefrom(self, source):
+ for k,v in source.items():
+ self._items[k] = v.clone(self, k)
+
+ def clone(self):
+ raise NotImplementedError()
+
+ @must_be_writable
+ @synchronized
+ def add(self, source_obj, target_name, overwrite=False, reparent=False):
+ """Copy or move a file or subcollection to this collection.
+
+ :source_obj:
+ An ArvadosFile, or Subcollection object
+
+ :target_name:
+ Destination item name. If the target name already exists and is a
+ file, this will raise an error unless you specify `overwrite=True`.
+
+ :overwrite:
+ Whether to overwrite target file if it already exists.
+
+ :reparent:
+ If True, source_obj will be moved from its parent collection to this collection.
+ If False, source_obj will be copied and the parent collection will be
+ unmodified.
+
+ """
+
+ if target_name in self and not overwrite:
+ raise IOError(errno.EEXIST, "File already exists", target_name)
+
+ modified_from = None
+ if target_name in self:
+ modified_from = self[target_name]
+
+ # Actually make the move or copy.
+ if reparent:
+ source_obj._reparent(self, target_name)
+ item = source_obj
+ else:
+ item = source_obj.clone(self, target_name)
+
+ self._items[target_name] = item
+ self._committed = False
+
+ if modified_from:
+ self.notify(MOD, self, target_name, (modified_from, item))
+ else:
+ self.notify(ADD, self, target_name, item)
+
+ def _get_src_target(self, source, target_path, source_collection, create_dest):
+ if source_collection is None:
+ source_collection = self
+
+ # Find the object
+ if isinstance(source, basestring):
+ source_obj = source_collection.find(source)
+ if source_obj is None:
+ raise IOError(errno.ENOENT, "File not found", source)
+ sourcecomponents = source.split("/")
+ else:
+ source_obj = source
+ sourcecomponents = None
+
+ # Find parent collection the target path
+ targetcomponents = target_path.split("/")
+
+ # Determine the name to use.
+ target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
+
+ if not target_name:
+ raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")
+
+ if create_dest:
+ target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
+ else:
+ if len(targetcomponents) > 1:
+ target_dir = self.find("/".join(targetcomponents[0:-1]))
+ else:
+ target_dir = self
+
+ if target_dir is None:
+ raise IOError(errno.ENOENT, "Target directory not found", target_name)
+
+ if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
+ target_dir = target_dir[target_name]
+ target_name = sourcecomponents[-1]
+
+ return (source_obj, target_dir, target_name)
+
+ @must_be_writable
+ @synchronized
+ def copy(self, source, target_path, source_collection=None, overwrite=False):
+ """Copy a file or subcollection to a new path in this collection.
+
+ :source:
+ A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
+
+ :target_path:
+ Destination file or path. If the target path already exists and is a
+ subcollection, the item will be placed inside the subcollection. If
+ the target path already exists and is a file, this will raise an error
+ unless you specify `overwrite=True`.
+
+ :source_collection:
+ Collection to copy `source_path` from (default `self`)
+
+ :overwrite:
+ Whether to overwrite target file if it already exists.
+ """
+
+ source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
+ target_dir.add(source_obj, target_name, overwrite, False)
+
+ @must_be_writable
+ @synchronized
+ def rename(self, source, target_path, source_collection=None, overwrite=False):
+ """Move a file or subcollection from `source_collection` to a new path in this collection.
+
+ :source:
+ A string with a path to source file or subcollection.
+
+ :target_path:
+ Destination file or path. If the target path already exists and is a
+ subcollection, the item will be placed inside the subcollection. If
+ the target path already exists and is a file, this will raise an error
+ unless you specify `overwrite=True`.
+
+ :source_collection:
+ Collection to copy `source_path` from (default `self`)
+
+ :overwrite:
+ Whether to overwrite target file if it already exists.
+ """
+
+ source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
+ if not source_obj.writable():
+ raise IOError(errno.EROFS, "Source collection is read only", source)
+ target_dir.add(source_obj, target_name, overwrite, True)
+
+ def portable_manifest_text(self, stream_name="."):
+ """Get the manifest text for this collection, sub collections and files.
+
+ This method does not flush outstanding blocks to Keep. It will return
+ a normalized manifest with access tokens stripped.
+
+ :stream_name:
+ Name to use for this stream (directory)
+
+ """
+ return self._get_manifest_text(stream_name, True, True)
+
+ @synchronized
+ def manifest_text(self, stream_name=".", strip=False, normalize=False):
+ """Get the manifest text for this collection, sub collections and files.
+
+ This method will flush outstanding blocks to Keep. By default, it will
+ not normalize an unmodified manifest or strip access tokens.
+
+ :stream_name:
+ Name to use for this stream (directory)
+
+ :strip:
+ If True, remove signing tokens from block locators if present.
+ If False (default), block locators are left unchanged.
+
+ :normalize:
+ If True, always export the manifest text in normalized form
+ even if the Collection is not modified. If False (default) and the collection
+ is not modified, return the original manifest text even if it is not
+ in normalized form.
+
+ """
+
+ self._my_block_manager().commit_all()
+ return self._get_manifest_text(stream_name, strip, normalize)
+
+ @synchronized
+ def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
+ """Get the manifest text for this collection, sub collections and files.
+
+ :stream_name:
+ Name to use for this stream (directory)
+
+ :strip:
+ If True, remove signing tokens from block locators if present.
+ If False (default), block locators are left unchanged.
+
+ :normalize:
+ If True, always export the manifest text in normalized form
+ even if the Collection is not modified. If False (default) and the collection
+ is not modified, return the original manifest text even if it is not
+ in normalized form.
+
+ :only_committed:
+ If True, only include blocks that were already committed to Keep.
+
+ """
+
+ if not self.committed() or self._manifest_text is None or normalize:
+ stream = {}
+ buf = []
+ sorted_keys = sorted(self.keys())
+ for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
+ # Create a stream per file `k`
+ arvfile = self[filename]
+ filestream = []
+ for segment in arvfile.segments():
+ loc = segment.locator
+ if arvfile.parent._my_block_manager().is_bufferblock(loc):
+ if only_committed:
+ continue
+ loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
+ if strip:
+ loc = KeepLocator(loc).stripped()
+ filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
+ segment.segment_offset, segment.range_size))
+ stream[filename] = filestream
+ if stream:
+ buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
+ for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
+ buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True))
+ return "".join(buf)
+ else:
+ if strip:
+ return self.stripped_manifest()
+ else:
+ return self._manifest_text
+
+ @synchronized
+ def diff(self, end_collection, prefix=".", holding_collection=None):
+ """Generate list of add/modify/delete actions.
+
+ When given to `apply`, will change `self` to match `end_collection`
+
+ """
+ changes = []
+ if holding_collection is None:
+ holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
+ for k in self:
+ if k not in end_collection:
+ changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
+ for k in end_collection:
+ if k in self:
+ if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
+ changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
+ elif end_collection[k] != self[k]:
+ changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
+ else:
+ changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
+ else:
+ changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
+ return changes
+
+ @must_be_writable
+ @synchronized
+ def apply(self, changes):
+ """Apply changes from `diff`.
+
+ If a change conflicts with a local change, it will be saved to an
+ alternate path indicating the conflict.