+ raise errors.ArgumentError(
+ "Argument to CollectionReader is not a manifest or a collection UUID")
+
+ try:
+ self._populate()
+ except (IOError, errors.SyntaxError) as e:
+ raise errors.ArgumentError("Error processing manifest text: %s", e)
+
+ def root_collection(self):
+ """Return `self`: this object is its own root collection."""
+ return self
+
+ def stream_name(self):
+ """Return "." as the stream name of this collection."""
+ return "."
+
+ def writable(self):
+ """Return True unconditionally (`CollectionReader` overrides this)."""
+ return True
+
+ @synchronized
+ def known_past_version(self, modified_at_and_portable_data_hash):
+ """Report whether a version is already recorded in `self._past_versions`.
+
+ :modified_at_and_portable_data_hash:
+ A `(modified_at, portable_data_hash)` tuple, the same shape that
+ `update()` and `_remember_api_response()` add to `_past_versions`.
+
+ """
+ return modified_at_and_portable_data_hash in self._past_versions
+
+    @synchronized
+    @retry_method
+    def update(self, other=None, num_retries=None):
+        """Merge another version of this collection into the current contents.
+
+        :other:
+          The collection to merge in. When None, the record named by the
+          manifest locator is fetched from the API server and used instead.
+
+        :num_retries:
+          Retry count passed to the API request.
+
+        Raises `errors.ArgumentError` when `other` is None and this collection
+        has no manifest locator.
+
+        """
+        if other is None:
+            if self._manifest_locator is None:
+                raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
+            request = self._my_api().collections().get(uuid=self._manifest_locator)
+            response = request.execute(num_retries=num_retries)
+            version = (response.get("modified_at"), response.get("portable_data_hash"))
+            if self.known_past_version(version) and version[1] != self.portable_data_hash():
+                # The server record differs from ours but has been seen
+                # before, so it was already merged: nothing to do.  When it
+                # equals our current record we still continue, because we
+                # want to update our tokens.
+                return
+            self._past_versions.add(version)
+            other = CollectionReader(response["manifest_text"])
+
+        # Apply the difference between the last known manifest and `other`.
+        baseline = CollectionReader(self._manifest_text)
+        changes = baseline.diff(other)
+        self.apply(changes)
+        self._manifest_text = self.manifest_text()
+
+ @synchronized
+ def _my_api(self):
+ """Return the API client, creating a `ThreadSafeApiCache` from
+ `self._config` when none has been set yet."""
+ if self._api_client is None:
+ self._api_client = ThreadSafeApiCache(self._config)
+ # Adopt the API client's Keep client unless one was already assigned.
+ if self._keep_client is None:
+ self._keep_client = self._api_client.keep
+ return self._api_client
+
+    @synchronized
+    def _my_keep(self):
+        """Return the Keep client, setting one up on first use."""
+        if self._keep_client is not None:
+            return self._keep_client
+        if self._api_client is not None:
+            # An API client exists but no Keep client: build one around it.
+            self._keep_client = KeepClient(api_client=self._api_client)
+        else:
+            # No API client either; _my_api() creates one and may assign
+            # self._keep_client while doing so.
+            self._my_api()
+        return self._keep_client
+
+    @synchronized
+    def _my_block_manager(self):
+        """Return the block manager, creating it on first use.
+
+        The number of copies is `self.replication_desired` when that is set
+        to a truthy value; otherwise the API server's
+        'defaultCollectionReplication' setting, falling back to 2.
+
+        """
+        if self._block_manager is None:
+            copies = self.replication_desired
+            if not copies:
+                discovery = self._my_api()._rootDesc
+                copies = discovery.get('defaultCollectionReplication', 2)
+            self._block_manager = _BlockManager(self._my_keep(), copies=copies)
+        return self._block_manager
+
+    def _remember_api_response(self, response):
+        """Store an API server response and record its version as seen.
+
+        :response:
+          A collection record; its "modified_at" and "portable_data_hash"
+          values (None when absent) are added to `_past_versions` as a tuple.
+
+        """
+        version = (response.get("modified_at"), response.get("portable_data_hash"))
+        self._api_response = response
+        self._past_versions.add(version)
+
+    def _populate_from_api_server(self):
+        """Fetch the collection record from the API server.
+
+        Stores the response, the manifest text and (unless already set) the
+        desired replication level.  Returns None on success, or the exception
+        that was caught on failure.
+
+        """
+        # As in KeepClient itself, the API client is instantiated at the last
+        # possible moment, in order to avoid tripping up clients that don't
+        # have access to an API server.  If instantiation fails, it is
+        # reported like any other Collection lookup failure: the exception is
+        # returned to the caller rather than raised.
+        try:
+            request = self._my_api().collections().get(
+                uuid=self._manifest_locator)
+            self._remember_api_response(
+                request.execute(num_retries=self.num_retries))
+            self._manifest_text = self._api_response['manifest_text']
+            # If not overridden via kwargs, take replication_desired from
+            # the API server record.
+            if self.replication_desired is None:
+                self.replication_desired = self._api_response.get('replication_desired', None)
+        except Exception as e:
+            return e
+        return None
+
+    def _populate_from_keep(self):
+        """Fetch the manifest text straight from Keep.
+
+        This has a chance of working if [a] the locator includes a permission
+        signature or [b] the Keep services are operating in world-readable
+        mode.  Returns None on success, or the exception that was caught.
+
+        """
+        try:
+            keep = self._my_keep()
+            self._manifest_text = keep.get(
+                self._manifest_locator, num_retries=self.num_retries)
+        except Exception as e:
+            return e
+        return None
+
+ def _populate(self):
+ """Load the manifest text if necessary, then import it into this collection.
+
+ Does nothing when there is neither a manifest locator nor manifest
+ text. When the manifest text is missing, it is fetched through
+ `_populate_from_keep()` and/or `_populate_from_api_server()`; which of
+ them run, and in what order, depends on the locator patterns the
+ manifest locator matches.
+
+ Raises the API server error when that lookup fails and the locator
+ does not match `util.keep_locator_pattern`, and
+ `errors.NotFoundError` when no source produced a manifest text.
+
+ """
+ if self._manifest_locator is None and self._manifest_text is None:
+ return
+ error_via_api = None
+ error_via_keep = None
+ # Keep is only an option when the text is missing and the locator has
+ # the form of a Keep locator.
+ should_try_keep = ((self._manifest_text is None) and
+ util.keep_locator_pattern.match(
+ self._manifest_locator))
+ # A signed locator is tried against Keep first.
+ if ((self._manifest_text is None) and
+ util.signed_locator_pattern.match(self._manifest_locator)):
+ error_via_keep = self._populate_from_keep()
+ # Still no manifest text: ask the API server.
+ if self._manifest_text is None:
+ error_via_api = self._populate_from_api_server()
+ # With no Keep fallback available, the API error is final.
+ if error_via_api is not None and not should_try_keep:
+ raise error_via_api
+ if ((self._manifest_text is None) and
+ not error_via_keep and
+ should_try_keep):
+ # Looks like a keep locator, and we didn't already try keep above
+ error_via_keep = self._populate_from_keep()
+ if self._manifest_text is None:
+ # Nothing worked!
+ raise errors.NotFoundError(
+ ("Failed to retrieve collection '{}' " +
+ "from either API server ({}) or Keep ({})."
+ ).format(
+ self._manifest_locator,
+ error_via_api,
+ error_via_keep))
+ # populate: keep the loaded text as the baseline manifest and build the
+ # collection contents from it.
+ self._baseline_manifest = self._manifest_text
+ self._import_manifest(self._manifest_text)
+
+
+    def _has_collection_uuid(self):
+        """Tell whether the manifest locator looks like a collection UUID.
+
+        Returns False when there is no manifest locator; otherwise the result
+        of matching it against `util.collection_uuid_pattern` (a match object,
+        or None when it does not match).
+
+        """
+        locator = self._manifest_locator
+        if locator is None:
+            return False
+        return re.match(util.collection_uuid_pattern, locator)
+
+ def __enter__(self):
+ """Enter a `with` block; the collection itself is the context value."""
+ return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        """Support scoped auto-commit in a with: block.
+
+        When the block finished without an exception, and this collection is
+        writable and tied to a collection UUID, it is saved.  Worker threads
+        are stopped in every case, including when `save()` itself raises, so
+        a failed save does not leave them running.  Exceptions are never
+        suppressed.
+
+        """
+        try:
+            if exc_type is None and self.writable() and self._has_collection_uuid():
+                self.save()
+        finally:
+            self.stop_threads()
+
+    def stop_threads(self):
+        """Stop the block manager's threads, if a block manager exists."""
+        manager = self._block_manager
+        if manager is None:
+            return
+        manager.stop_threads()
+
+ @synchronized
+ def manifest_locator(self):
+ """Get the manifest locator, if any.
+
+ The manifest locator will be set when the collection is loaded from an
+ API server record or from the portable data hash of a manifest.
+
+ The manifest locator will be None if the collection is newly created or
+ was created directly from manifest text. The method `save_new()` will
+ assign a manifest locator when it creates a collection record.
+
+ """
+ return self._manifest_locator
+
+    @synchronized
+    def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
+        """Return a new collection populated from this one via `_clonefrom`.
+
+        :new_parent:
+          Passed as `parent` to the new collection's constructor.
+
+        :new_name:
+          Accepted but not used by this method.
+
+        :readonly:
+          If True the copy is a `CollectionReader`, otherwise a `Collection`.
+
+        :new_config:
+          API configuration for the copy; defaults to this collection's own.
+
+        """
+        config = self._config if new_config is None else new_config
+        collection_class = CollectionReader if readonly else Collection
+        duplicate = collection_class(parent=new_parent, apiconfig=config)
+        duplicate._clonefrom(self)
+        return duplicate
+
+ @synchronized
+ def api_response(self):
+ """Returns information about this Collection fetched from the API server.
+
+ This is the stored `self._api_response`, which
+ `_remember_api_response()` sets after API calls.
+
+ If the Collection exists in Keep but not the API server, currently
+ returns None. Future versions may provide a synthetic response.
+
+ """
+ return self._api_response
+
+    def find_or_create(self, path, create_type):
+        """See `RichCollectionBase.find_or_create`
+
+        "." refers to this collection itself, and a leading "./" is dropped
+        before delegating to the base class.
+
+        """
+        if path == ".":
+            return self
+        relative = path[2:] if path.startswith("./") else path
+        return super(Collection, self).find_or_create(relative, create_type)
+
+    def find(self, path):
+        """See `RichCollectionBase.find`
+
+        "." refers to this collection itself, and a leading "./" is dropped
+        before delegating to the base class.
+
+        """
+        if path == ".":
+            return self
+        relative = path[2:] if path.startswith("./") else path
+        return super(Collection, self).find(relative)
+
+    def remove(self, path, recursive=False):
+        """See `RichCollectionBase.remove`
+
+        Raises `errors.ArgumentError` for ".", since the collection cannot
+        remove itself.  A leading "./" is dropped before delegating to the
+        base class.
+
+        """
+        if path == ".":
+            raise errors.ArgumentError("Cannot remove '.'")
+        relative = path[2:] if path.startswith("./") else path
+        return super(Collection, self).remove(relative, recursive)
+
+    @must_be_writable
+    @synchronized
+    @retry_method
+    def save(self, merge=True, num_retries=None):
+        """Save collection to an existing collection record.
+
+        Commit pending buffer blocks to Keep, merge with remote record (if
+        merge=True, the default), and update the collection record. Returns
+        the current manifest text.  If the collection is already committed,
+        nothing is sent and the stored manifest text is returned.
+
+        Will raise AssertionError if not associated with a collection record on
+        the API server. If you want to save a manifest to Keep only, see
+        `save_new()`.
+
+        :merge:
+          Update and merge remote changes before saving. Otherwise, any
+          remote changes will be ignored and overwritten.
+
+        :num_retries:
+          Retry count on API calls (if None, use the collection default)
+
+        """
+        if self.committed():
+            return self._manifest_text
+
+        if not self._has_collection_uuid():
+            raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
+
+        self._my_block_manager().commit_all()
+
+        if merge:
+            # Forward the retry count so the merge fetch honours the same
+            # setting as the update request below.
+            self.update(num_retries=num_retries)
+
+        text = self.manifest_text(strip=False)
+        self._remember_api_response(self._my_api().collections().update(
+            uuid=self._manifest_locator,
+            body={'manifest_text': text}
+            ).execute(
+                num_retries=num_retries))
+        # Keep the manifest text as returned by the API server.
+        self._manifest_text = self._api_response["manifest_text"]
+        self.set_committed()
+
+        return self._manifest_text
+
+
+ @must_be_writable
+ @synchronized
+ @retry_method
+ def save_new(self, name=None,
+ create_collection_record=True,
+ owner_uuid=None,
+ ensure_unique_name=False,
+ num_retries=None):
+ """Save collection to a new collection record.
+
+ Commit pending buffer blocks to Keep and, when create_collection_record
+ is True (default), create a new collection record. After creating a
+ new collection record, this Collection object will be associated with
+ the new record used by `save()`. Returns the current manifest text.
+
+ :name:
+ The collection name. If None, "New collection" is used and
+ ensure_unique_name is forced to True.
+
+ :create_collection_record:
+ If True, create a collection record on the API server.
+ If False, only commit blocks to Keep and return the manifest text.
+
+ :owner_uuid:
+ the user, or project uuid that will own this collection.
+ If None, defaults to the current user.
+
+ :ensure_unique_name:
+ If True, ask the API server to rename the collection
+ if it conflicts with a collection with the same name and owner. If
+ False, a name conflict will result in an error.
+
+ :num_retries:
+ Retry count on API calls (if None, use the collection default)
+
+ """
+ # Flush pending blocks before generating the manifest text.
+ self._my_block_manager().commit_all()
+ text = self.manifest_text(strip=False)
+
+ if create_collection_record:
+ # An unnamed collection gets a placeholder name, and the server is
+ # asked to make that name unique.
+ if name is None:
+ name = "New collection"
+ ensure_unique_name = True
+
+ body = {"manifest_text": text,
+ "name": name,
+ "replication_desired": self.replication_desired}
+ # owner_uuid is only sent when the caller supplied one.
+ if owner_uuid:
+ body["owner_uuid"] = owner_uuid
+
+ self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
+ # From here on, use the manifest text as returned by the API server.
+ text = self._api_response["manifest_text"]
+
+ # Associate this object with the newly created record.
+ self._manifest_locator = self._api_response["uuid"]
+
+ self._manifest_text = text
+ self.set_committed()
+
+ return text
+
+    @synchronized
+    def _import_manifest(self, manifest_text):
+        """Import a manifest into a `Collection`.
+
+        :manifest_text:
+          The manifest text to import from.
+
+        The text is read token by token.  Each line starts with a stream
+        name, followed by block locators, followed by file segments of the
+        form `position:size:name`.  The collection is marked committed once
+        the whole text has been imported.
+
+        Raises `errors.ArgumentError` if this collection is not empty, and
+        `errors.SyntaxError` if a token after the block locators is not a
+        file segment or if a file path resolves to something that is not an
+        `ArvadosFile`.
+
+        """
+        if len(self) > 0:
+            raise errors.ArgumentError("Can only import manifest into an empty collection")
+
+        # Parser states: which kind of token is expected next.
+        STREAM_NAME = 0
+        BLOCKS = 1
+        SEGMENTS = 2
+
+        stream_name = None
+        state = STREAM_NAME
+
+        for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
+            tok = token_and_separator.group(1)
+            sep = token_and_separator.group(2)
+
+            if state == STREAM_NAME:
+                # starting a new stream
+                stream_name = tok.replace('\\040', ' ')
+                blocks = []
+                segments = []
+                streamoffset = 0L
+                state = BLOCKS
+                self.find_or_create(stream_name, COLLECTION)
+                continue
+
+            if state == BLOCKS:
+                block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
+                if block_locator:
+                    blocksize = long(block_locator.group(1))
+                    blocks.append(Range(tok, streamoffset, blocksize, 0))
+                    streamoffset += blocksize
+                else:
+                    # First non-locator token: handle it as a file segment
+                    # in the check right below.
+                    state = SEGMENTS
+
+            if state == SEGMENTS:
+                file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
+                if file_segment:
+                    pos = long(file_segment.group(1))
+                    size = long(file_segment.group(2))
+                    name = file_segment.group(3).replace('\\040', ' ')
+                    filepath = os.path.join(stream_name, name)
+                    afile = self.find_or_create(filepath, FILE)
+                    if isinstance(afile, ArvadosFile):
+                        afile.add_segment(blocks, pos, size)
+                    else:
+                        raise errors.SyntaxError("File %s conflicts with stream of the same name." % filepath)
+                else:
+                    # error!
+                    raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
+
+            # A newline separator ends the current stream.
+            if sep == "\n":
+                stream_name = None
+                state = STREAM_NAME
+
+        self.set_committed()
+
+    @synchronized
+    def notify(self, event, collection, name, item):
+        """Forward a change notification to the registered callback, if any."""
+        callback = self._callback
+        if not callback:
+            return
+        callback(event, collection, name, item)
+
+
+class Subcollection(RichCollectionBase):
+    """This is a subdirectory within a collection that doesn't have its own API
+    server record.
+
+    Subcollection locking falls under the umbrella lock of its root collection.
+
+    """
+
+    def __init__(self, parent, name):
+        """Create a subcollection called `name` underneath `parent`."""
+        super(Subcollection, self).__init__(parent)
+        self.name = name
+        self.num_retries = parent.num_retries
+        self._manifest_text = None
+        # Share the root collection's lock rather than owning one.
+        self.lock = self.root_collection().lock
+
+    def root_collection(self):
+        """Return the root collection, found by asking the parent."""
+        return self.parent.root_collection()
+
+    def writable(self):
+        """Return whether the root collection is writable."""
+        root = self.root_collection()
+        return root.writable()
+
+    def _my_api(self):
+        """Return the root collection's API client."""
+        root = self.root_collection()
+        return root._my_api()
+
+    def _my_keep(self):
+        """Return the root collection's Keep client."""
+        root = self.root_collection()
+        return root._my_keep()
+
+    def _my_block_manager(self):
+        """Return the root collection's block manager."""
+        root = self.root_collection()
+        return root._my_block_manager()
+
+    def stream_name(self):
+        """Return the parent's stream name joined with this subcollection's name."""
+        parent_stream = self.parent.stream_name()
+        return os.path.join(parent_stream, self.name)
+
+    @synchronized
+    def clone(self, new_parent, new_name):
+        """Return a new `Subcollection` under `new_parent`, populated from this one."""
+        duplicate = Subcollection(new_parent, new_name)
+        duplicate._clonefrom(self)
+        return duplicate
+
+    @must_be_writable
+    @synchronized
+    def _reparent(self, newparent, newname):
+        """Move this subcollection to `newparent` under the name `newname`.
+
+        Marks the subcollection uncommitted, flushes it, removes it from its
+        current parent, and then adopts the lock of the new parent's root
+        collection.
+
+        """
+        self._committed = False
+        self.flush()
+        self.parent.remove(self.name, recursive=True)
+        self.parent = newparent
+        self.name = newname
+        self.lock = self.parent.root_collection().lock
+
+
+class CollectionReader(Collection):
+    """A read-only collection object.
+
+    Initialize from an api collection record locator, a portable data hash of a
+    manifest, or raw manifest text. See `Collection` constructor for detailed
+    options.
+
+    """
+    def __init__(self, manifest_locator_or_text=None, *args, **kwargs):
+        """Build the reader; it only reports itself writable while this runs.
+
+        :manifest_locator_or_text:
+          Forwarded to the `Collection` constructor.  It is optional so that
+          keyword-only construction, as done by `Collection.clone()` with
+          readonly=True, does not fail with a TypeError.
+          NOTE(review): assumes `Collection.__init__` accepts None here to
+          mean "no manifest" -- confirm against its signature.
+
+        """
+        # writable() returns this flag, so the object counts as writable only
+        # for the duration of the parent constructor.
+        self._in_init = True
+        super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
+        self._in_init = False
+
+        # Forego any locking since it should never change once initialized.
+        self.lock = NoopLock()
+
+        # Backwards compatibility with old CollectionReader
+        # all_streams() and all_files()
+        self._streams = None
+
+    def writable(self):
+        """Return True only while `__init__` is running."""
+        return self._in_init
+
+    def _populate_streams(orig_func):
+        """Decorator: make sure `self._streams` is filled in before the call."""
+        @functools.wraps(orig_func)
+        def populate_streams_wrapper(self, *args, **kwargs):
+            # Defer populating self._streams until needed since it creates a copy of the manifest.
+            if self._streams is None:
+                if self._manifest_text:
+                    # One token list per non-empty manifest line.
+                    self._streams = [sline.split()
+                                     for sline in self._manifest_text.split("\n")
+                                     if sline]
+                else:
+                    self._streams = []
+            return orig_func(self, *args, **kwargs)
+        return populate_streams_wrapper
+
+    @_populate_streams
+    def normalize(self):
+        """Normalize the streams returned by `all_streams`.
+
+        This method is kept for backwards compatibility and only affects the
+        behavior of `all_streams()` and `all_files()`
+
+        """
+
+        # Rearrange streams
+        streams = {}
+        for s in self.all_streams():
+            for f in s.all_files():
+                streamname, filename = split(s.name() + "/" + f.name())
+                if streamname not in streams:
+                    streams[streamname] = {}
+                if filename not in streams[streamname]:
+                    streams[streamname][filename] = []
+                for r in f.segments:
+                    streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
+
+        self._streams = [normalize_stream(s, streams[s])
+                         for s in sorted(streams)]
+
+    @_populate_streams
+    def all_streams(self):
+        """Return a `StreamReader` for each entry of `self._streams`."""
+        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
+                for s in self._streams]
+
+    @_populate_streams
+    def all_files(self):
+        """Yield every file of every stream returned by `all_streams()`."""
+        for s in self.all_streams():
+            for f in s.all_files():
+                yield f