"resumable writer can't accept unsourced data")
return super(ResumableCollectionWriter, self).write(data)
+
ADD = "add"
DEL = "del"
MOD = "mod"
FILE = "file"
COLLECTION = "collection"
-class SynchronizedCollectionBase(CollectionBase):
+class RichCollectionBase(CollectionBase):
"""Base class for Collections and Subcollections.
Implements the majority of functionality relating to accessing items in the
self._items[pathcomponents[0]] = item
self._modified = True
self.notify(ADD, self, pathcomponents[0], item)
- if isinstance(item, SynchronizedCollectionBase):
+ if isinstance(item, RichCollectionBase):
return item.find_or_create(pathcomponents[1], create_type)
else:
raise IOError((errno.ENOTDIR, "Interior path components must be subcollection"))
if len(pathcomponents) == 1:
return item
else:
- if isinstance(item, SynchronizedCollectionBase):
+ if isinstance(item, RichCollectionBase):
if pathcomponents[1]:
return item.find(pathcomponents[1])
else:
def exists(self, path):
"""Test if there is a file or collection at `path`."""
- return self.find(path) != None
+ return self.find(path) is not None
@must_be_writable
@synchronized
if item is None:
raise IOError((errno.ENOENT, "File not found"))
if len(pathcomponents) == 1:
- if isinstance(self._items[pathcomponents[0]], SynchronizedCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
+ if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
deleteditem = self._items[pathcomponents[0]]
del self._items[pathcomponents[0]]
else:
self.notify(ADD, self, target_name, dup)
-
@must_be_writable
@synchronized
def copy(self, source, target_path, source_collection=None, overwrite=False):
target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
- if target_name in target_dir and isinstance(self[target_name], SynchronizedCollectionBase) and sourcecomponents:
+ if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
target_dir = target_dir[target_name]
target_name = sourcecomponents[-1]
"""
if self.modified() or self._manifest_text is None or normalize:
- item = self
stream = {}
buf = []
- sorted_keys = sorted(item.keys())
- for filename in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
+ sorted_keys = sorted(self.keys())
+ for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
# Create a stream per file `k`
- arvfile = item[filename]
+ arvfile = self[filename]
filestream = []
for segment in arvfile.segments():
loc = segment.locator
stream[filename] = filestream
if stream:
buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
- for dirname in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]:
- buf.append(item[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip))
+ for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
+ buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip))
return "".join(buf)
else:
if strip:
def __eq__(self, other):
if other is self:
return True
- if not isinstance(other, SynchronizedCollectionBase):
+ if not isinstance(other, RichCollectionBase):
return False
if len(self._items) != len(other):
return False
return not self.__eq__(other)
-class Collection(SynchronizedCollectionBase):
+class Collection(RichCollectionBase):
"""Represents the root of an Arvados Collection.
This class is threadsafe. The root collection object, all subcollections
if self._block_manager is not None:
self._block_manager.stop_threads()
+ @synchronized
+ def manifest_locator(self):
+ """Get the manifest locator, if any.
+
+ The manifest locator will be set when the collection is loaded from an
+ API server record or the portable data hash of a manifest.
+
+ The manifest locator will be None if the collection is newly created or
+ was created directly from manifest text. The method `save_new()` will
+ assign a manifest locator.
+
+ """
+ return self._manifest_locator
+
@synchronized
def clone(self, new_parent=None, readonly=False, new_config=None):
if new_config is None:
return self._api_response
def find_or_create(self, path, create_type):
- """See `SynchronizedCollectionBase.find_or_create`"""
+ """See `RichCollectionBase.find_or_create`"""
if path == ".":
return self
else:
return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
def find(self, path):
- """See `SynchronizedCollectionBase.find`"""
+ """See `RichCollectionBase.find`"""
if path == ".":
return self
else:
return super(Collection, self).find(path[2:] if path.startswith("./") else path)
def remove(self, path, recursive=False):
- """See `SynchronizedCollectionBase.remove`"""
+ """See `RichCollectionBase.remove`"""
if path == ".":
raise errors.ArgumentError("Cannot remove '.'")
else:
"""Save collection to an existing collection record.
Commit pending buffer blocks to Keep, merge with remote record (if
- update=True), write the manifest to Keep, and update the collection
- record.
+ merge=True, the default), write the manifest to Keep, and update the
+ collection record.
Will raise AssertionError if not associated with a collection record on
the API server. If you want to save a manifest to Keep only, see
self.set_unmodified()
-class Subcollection(SynchronizedCollectionBase):
+class Subcollection(RichCollectionBase):
"""This is a subdirectory within a collection that doesn't have its own API
server record.