+ target_dir = self
+
+ if target_dir is None:
+ raise IOError(errno.ENOENT, "Target directory not found", target_name)
+
+ if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
+ target_dir = target_dir[target_name]
+ target_name = sourcecomponents[-1]
+
+ return (source_obj, target_dir, target_name)
+
@must_be_writable
@synchronized
def copy(self, source, target_path, source_collection=None, overwrite=False):
    """Copy a file or subcollection to a new path in this collection.

    :source:
      Path (string) of the file or subcollection to copy, or the
      ArvadosFile or Subcollection object itself.

    :target_path:
      Destination file or path.  If the target path already exists and is a
      subcollection, the item will be placed inside the subcollection.  If
      the target path already exists and is a file, this will raise an error
      unless you specify `overwrite=True`.

    :source_collection:
      Collection that `source` is looked up in (default `self`).

    :overwrite:
      Whether to overwrite the target file if it already exists.

    """
    resolved = self._get_src_target(source, target_path, source_collection, True)
    item, destination, new_name = resolved
    # The trailing False is the fourth positional argument of `add`;
    # `rename` passes True in the same position.
    destination.add(item, new_name, overwrite, False)
+
@must_be_writable
@synchronized
def rename(self, source, target_path, source_collection=None, overwrite=False):
    """Move a file or subcollection to a new path in this collection.

    :source:
      Path (string) of the file or subcollection to move.

    :target_path:
      Destination file or path.  If the target path already exists and is a
      subcollection, the item will be placed inside the subcollection.  If
      the target path already exists and is a file, this will raise an error
      unless you specify `overwrite=True`.

    :source_collection:
      Collection that `source` is looked up in (default `self`).

    :overwrite:
      Whether to overwrite the target file if it already exists.

    Raises IOError with errno.EROFS when the resolved source object reports
    that it is not writable.

    """
    item, destination, new_name = self._get_src_target(
        source, target_path, source_collection, False)
    source_is_read_only = not item.writable()
    if source_is_read_only:
        raise IOError(errno.EROFS, "Source collection is read only", source)
    # The trailing True is the fourth positional argument of `add`;
    # `copy` passes False in the same position.
    destination.add(item, new_name, overwrite, True)
+
def portable_manifest_text(self, stream_name="."):
    """Return the normalized manifest text with access tokens stripped.

    Covers this collection, its sub collections and files.  This method
    is not meant to flush outstanding blocks to Keep; it delegates to
    `_get_manifest_text` with both `strip` and `normalize` enabled.

    :stream_name:
      Name to use for this stream (directory)

    """
    strip_tokens = True
    force_normalized = True
    return self._get_manifest_text(stream_name, strip_tokens, force_normalized)
+
@synchronized
def manifest_text(self, stream_name=".", strip=False, normalize=False):
    """Return the manifest text for this collection, sub collections and files.

    Outstanding blocks are committed through the block manager before the
    manifest is generated.  By default an unmodified manifest is neither
    normalized nor stripped of access tokens.

    :stream_name:
      Name to use for this stream (directory)

    :strip:
      If True, remove signing tokens from block locators if present.
      If False (default), block locators are left unchanged.

    :normalize:
      If True, always export the manifest text in normalized form
      even if the Collection is not modified.  If False (default) and the
      collection is not modified, return the original manifest text even if
      it is not in normalized form.

    """
    block_manager = self._my_block_manager()
    block_manager.commit_all()
    text = self._get_manifest_text(stream_name, strip, normalize)
    return text
+
@synchronized
def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
    """Get the manifest text for this collection, sub collections and files.

    Unlike `manifest_text`, this method never asks the block manager to
    commit anything.  Sub collections are rendered through their own
    `_get_manifest_text`, so neither a commit nor a loss of
    `only_committed` can happen further down the tree.

    :stream_name:
      Name to use for this stream (directory)

    :strip:
      If True, remove signing tokens from block locators if present.
      If False, block locators are left unchanged.

    :normalize:
      If True, always export the manifest text in normalized form
      even if the Collection is not modified.  If False and the collection
      is not modified, return the original manifest text even if it is not
      in normalized form.

    :only_committed:
      If True, segments whose locator still refers to a bufferblock are
      left out of the manifest.  Applies to sub collections as well.

    """
    if self.committed() and self._manifest_text is not None and not normalize:
        # Nothing changed and no normalization was requested: reuse the
        # manifest text we already hold.
        if strip:
            return self.stripped_manifest()
        return self._manifest_text

    sorted_keys = sorted(self.keys())
    buf = []

    # Every file directly inside this collection goes into one stream.
    stream = {}
    for filename in sorted_keys:
        arvfile = self[filename]
        if not isinstance(arvfile, ArvadosFile):
            continue
        block_manager = arvfile.parent._my_block_manager()
        filestream = []
        for segment in arvfile.segments():
            loc = segment.locator
            if block_manager.is_bufferblock(loc):
                if only_committed:
                    continue
                # Swap the bufferblock id for the locator the bufferblock reports.
                loc = block_manager.get_bufferblock(loc).locator()
            if strip:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                              segment.segment_offset, segment.range_size))
        stream[filename] = filestream
    if stream:
        buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")

    # Sub collections are always emitted normalized.  Recurse through
    # _get_manifest_text (not manifest_text) so that no commit_all() is
    # triggered and only_committed keeps applying.
    for dirname in sorted_keys:
        subcollection = self[dirname]
        if not isinstance(subcollection, RichCollectionBase):
            continue
        buf.append(subcollection._get_manifest_text(
            os.path.join(stream_name, dirname), strip, True, only_committed))
    return "".join(buf)
+
@synchronized
def diff(self, end_collection, prefix=".", holding_collection=None):
    """Generate list of add/modify/delete actions.

    When given to `apply`, will change `self` to match `end_collection`.

    Each entry is a tuple starting with the action (DEL, ADD, MOD or TOK)
    and the path under `prefix`, followed by clones of the affected
    item(s): one clone for DEL and ADD, the current and the new item for
    MOD and TOK.  Clones are made into `holding_collection`; a fresh
    Collection is created for that purpose when none is supplied.  Pairs
    of Subcollections are diffed recursively.

    """
    if holding_collection is None:
        holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())

    def snapshot(item):
        return item.clone(holding_collection, "")

    # Names that exist only in self are deletions.
    changes = [(DEL, os.path.join(prefix, name), snapshot(self[name]))
               for name in self if name not in end_collection]

    for name in end_collection:
        path = os.path.join(prefix, name)
        if name not in self:
            changes.append((ADD, path, snapshot(end_collection[name])))
            continue
        theirs = end_collection[name]
        ours = self[name]
        if isinstance(theirs, Subcollection) and isinstance(ours, Subcollection):
            changes.extend(ours.diff(theirs, path, holding_collection))
            continue
        # Unequal items are modifications; equal items are still reported,
        # as TOK entries.
        action = MOD if theirs != ours else TOK
        changes.append((action, path, snapshot(ours), snapshot(theirs)))
    return changes
+
@must_be_writable
@synchronized
def apply(self, changes):
    """Apply changes from `diff`.

    If a change conflicts with a local change, it will be saved to an
    alternate path indicating the conflict.  The alternate path is the
    original path followed by `~<UTC timestamp>~conflict~`.

    """
    if changes:
        self._committed = False
    for change in changes:
        event_type, path, initial = change[0], change[1], change[2]
        local = self.find(path)
        stamp = time.strftime("%Y%m%d-%H%M%S", time.gmtime())
        conflictpath = "%s~%s~conflict~" % (path, stamp)

        if event_type == ADD:
            if local is None:
                # Nothing at this path locally, so the new item can be
                # copied straight in.
                self.copy(initial, path)
            elif local != initial:
                # A different local item already lives here: keep it and
                # store the incoming one under the conflict name.
                self.copy(initial, conflictpath)
            continue

        if event_type in (MOD, TOK):
            final = change[3]
            if local != initial:
                # Local item is gone (presumably deleted) or no longer
                # matches the starting point: store the incoming item
                # under the conflict name.
                self.copy(final, conflictpath)
            elif isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
                # Unchanged locally and file-to-file: swap the contents.
                local.replace_contents(final)
            else:
                # Unchanged locally but the kind of item changed (file
                # became a collection or vice versa): overwrite the path.
                self.copy(final, path, overwrite=True)
            continue

        if event_type == DEL and local == initial:
            # Only remove items that still match the starting point; a
            # modified or already removed item is left alone.
            self.remove(path, recursive=True)
+
def portable_data_hash(self):
    """Get the portable data hash for this collection's manifest.

    The result is the MD5 hex digest of the portable manifest text,
    followed by `+` and the size of that text in bytes.  Text that is not
    already a byte string is encoded as UTF-8 first, so that hashing works
    for unicode manifests and the size counts bytes, not characters.

    """
    stripped = self.portable_manifest_text()
    if not isinstance(stripped, bytes):
        stripped = stripped.encode("utf-8")
    return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
+
@synchronized
def subscribe(self, callback):
    """Register `callback` as this collection's single notification callback.

    Raises errors.ArgumentError when a callback is already registered.

    """
    if self._callback is not None:
        raise errors.ArgumentError("A callback is already set on this collection.")
    self._callback = callback
+
@synchronized
def unsubscribe(self):
    """Drop the registered callback, if there is one."""
    if self._callback is None:
        return
    self._callback = None
+
@synchronized
def notify(self, event, collection, name, item):
    """Report an event to the local callback and to the root collection.

    The callback registered on this object (if any) is invoked first;
    the same arguments are then handed to `notify` on `root_collection()`.

    """
    callback = self._callback
    if callback:
        callback(event, collection, name, item)
    root = self.root_collection()
    root.notify(event, collection, name, item)
+
@synchronized
def __eq__(self, other):
    """Compare two collections item by item.

    Equal when `other` is this very object, or when it is a
    RichCollectionBase of the same length in which every name held here
    is present and maps to an item that does not compare unequal.

    """
    if other is self:
        return True
    if not isinstance(other, RichCollectionBase) or len(self._items) != len(other):
        return False
    mismatch = any(name not in other or self._items[name] != other[name]
                   for name in self._items)
    return not mismatch
+
def __ne__(self, other):
    """Return the negation of `__eq__` for the same operand."""
    is_equal = self.__eq__(other)
    return not is_equal
+
@synchronized
def flush(self):
    """Flush bufferblocks to Keep by calling `flush()` on every item held here."""
    for item in self.values():
        item.flush()
+
+
+class Collection(RichCollectionBase):
+ """Represents the root of an Arvados Collection.
+
+ This class is threadsafe. The root collection object, all subcollections
+ and files are protected by a single lock (i.e. each access locks the entire
+ collection).
+
+ Brief summary of
+ useful methods:
+
+ :To read an existing file:
+ `c.open("myfile", "r")`
+
+ :To write a new file:
+ `c.open("myfile", "w")`
+
+ :To determine if a file exists:
+ `c.find("myfile") is not None`
+
+ :To copy a file:
+ `c.copy("source", "dest")`
+
+ :To delete a file:
+ `c.remove("myfile")`
+
+ :To save to an existing collection record:
+ `c.save()`
+
+ :To save a new collection record:
+ `c.save_new()`
+
+ :To merge remote changes into this object:
+ `c.update()`
+
+ Must be associated with an API server Collection record (during
+ initialization, or using `save_new`) to use `save` or `update`
+
+ """
+
def __init__(self, manifest_locator_or_text=None,
             api_client=None,
             keep_client=None,
             num_retries=None,
             parent=None,
             apiconfig=None,
             block_manager=None,
             replication_desired=None):
    """Collection constructor.

    :manifest_locator_or_text:
      One of Arvados collection UUID, block locator of
      a manifest, raw manifest text, or None (to create an empty collection).

    :parent:
      the parent Collection, may be None.

    :apiconfig:
      A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
      Prefer this over supplying your own api_client and keep_client (except in testing).
      Will use default config settings if not specified.

    :api_client:
      The API client object to use for requests.  If not specified, create one using `apiconfig`.

    :keep_client:
      the Keep client to use for requests.  If not specified, create one using `apiconfig`.

    :num_retries:
      the number of retries for API and Keep requests.  None is stored as 0.

    :block_manager:
      the block manager to use.  If not specified, create one.

    :replication_desired:
      How many copies should Arvados maintain.  If None, API server default
      configuration applies.  If not None, this value will also be used
      for determining the number of block copies being written.

    Raises errors.ArgumentError when `manifest_locator_or_text` matches
    none of the accepted forms, or when populating the collection from it
    fails with IOError or errors.SyntaxError.

    """
    super(Collection, self).__init__(parent)
    self._api_client = api_client
    self._keep_client = keep_client
    self._block_manager = block_manager
    self.replication_desired = replication_desired

    if apiconfig:
        self._config = apiconfig
    else:
        self._config = config.settings()

    self.num_retries = num_retries if num_retries is not None else 0
    self._manifest_locator = None
    self._manifest_text = None
    self._api_response = None
    self._past_versions = set()

    self.lock = threading.RLock()
    self.events = None

    if manifest_locator_or_text:
        if (re.match(util.keep_locator_pattern, manifest_locator_or_text) or
                re.match(util.collection_uuid_pattern, manifest_locator_or_text)):
            # Both a block locator and a collection UUID are kept as the
            # manifest locator.
            self._manifest_locator = manifest_locator_or_text
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader is not a manifest or a collection UUID")

        try:
            self._populate()
        except (IOError, errors.SyntaxError) as e:
            # Interpolate the cause into the message; passing it as a
            # second exception argument would leave the "%s" unformatted.
            raise errors.ArgumentError("Error processing manifest text: %s" % e)
+
def root_collection(self):
    """Return this object: a Collection is the root of its own tree."""
    root = self
    return root
+
def stream_name(self):
    """Return the stream name of the root collection, which is always "."."""
    root_stream = "."
    return root_stream
+
def writable(self):
    """Report this collection as writable; this implementation always returns True."""
    is_writable = True
    return is_writable
+
@synchronized
def known_past_version(self, modified_at_and_portable_data_hash):
    """Tell whether the given version has been recorded before.

    :modified_at_and_portable_data_hash:
      The value to look up in the set of past versions; `update` records
      `(modified_at, portable_data_hash)` tuples there.

    """
    seen_versions = self._past_versions
    return modified_at_and_portable_data_hash in seen_versions
+
@synchronized
@retry_method
def update(self, other=None, num_retries=None):
    """Merge the latest collection on the API server with the current collection.

    :other:
      Collection to merge in.  When None, the record identified by this
      collection's manifest locator is fetched from the API server and a
      CollectionReader built from its manifest text is used instead.

    :num_retries:
      Passed to `execute()` when the record is fetched.

    Raises errors.ArgumentError when `other` is None and this collection
    has no manifest locator.

    """
    if other is None:
        if self._manifest_locator is None:
            raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
        request = self._my_api().collections().get(uuid=self._manifest_locator)
        response = request.execute(num_retries=num_retries)
        modified_at = response.get("modified_at")
        server_pdh = response.get("portable_data_hash")
        version = (modified_at, server_pdh)
        if self.known_past_version(version) and server_pdh != self.portable_data_hash():
            # The server record differs from ours but has been seen
            # before, so it was already merged and can be ignored.  A
            # record identical to ours is still processed below, because
            # we want to update our tokens.
            return
        self._past_versions.add(version)
        other = CollectionReader(response["manifest_text"])
    # Diff from the manifest text we last stored to `other`, then replay
    # that diff on the live collection.
    baseline = CollectionReader(self._manifest_text)
    merge_steps = baseline.diff(other)
    self.apply(merge_steps)
    self._manifest_text = self.manifest_text()
+
@synchronized
def _my_api(self):
    """Return the API client, creating it on first use.

    When no client was supplied, a ThreadSafeApiCache is built from the
    stored configuration; if no Keep client is set at that point either,
    the new cache's `keep` attribute is adopted as the Keep client.

    """
    if self._api_client is not None:
        return self._api_client
    self._api_client = ThreadSafeApiCache(self._config)
    if self._keep_client is None:
        self._keep_client = self._api_client.keep
    return self._api_client
+
+ @synchronized
+ def _my_keep(self):
+ if self._keep_client is None:
+ if self._api_client is None:
+ self._my_api()