def set_current_stream_name(self, newstreamname):
    """Set the current stream name.

    :newstreamname:
      The new stream name.  An empty string is stored as '.'.

    Raises errors.AssertionError if the name contains a tab or a newline.
    Note that the check below rejects only those two characters, even
    though the message says "whitespace"; a plain space is accepted.
    """
    if re.search(r'[\t\n]', newstreamname):
        # Include the offending name so the caller can see what was rejected.
        raise errors.AssertionError(
            "Manifest stream names cannot contain whitespace: '%s'" %
            (newstreamname))
    self._current_stream_name = '.' if newstreamname == '' else newstreamname
def current_stream_name(self):
else:
item = ArvadosFile(self, pathcomponents[0])
self._items[pathcomponents[0]] = item
- self._committed = False
+ self.set_committed(False)
self.notify(ADD, self, pathcomponents[0], item)
return item
else:
# create new collection
item = Subcollection(self, pathcomponents[0])
self._items[pathcomponents[0]] = item
- self._committed = False
+ self.set_committed(False)
self.notify(ADD, self, pathcomponents[0], item)
if isinstance(item, RichCollectionBase):
return item.find_or_create(pathcomponents[1], create_type)
else:
- raise IOError(errno.ENOTDIR, "Not a directory: '%s'" % pathcomponents[0])
+ raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
else:
return self
def find(self, path):
"""Recursively search the specified file path.
- May return either a Collection or ArvadosFile. Return None if not
+ May return either a Collection or ArvadosFile. Return None if not
found.
+ If path is invalid (ex: starts with '/'), an IOError exception will be
+ raised.
"""
if not path:
raise errors.ArgumentError("Parameter 'path' is empty.")
pathcomponents = path.split("/", 1)
+ if pathcomponents[0] == '':
+ raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
+
item = self._items.get(pathcomponents[0])
- if len(pathcomponents) == 1:
+ if item is None:
+ return None
+ elif len(pathcomponents) == 1:
return item
else:
if isinstance(item, RichCollectionBase):
else:
return item
else:
- raise IOError(errno.ENOTDIR, "Is not a directory: %s" % pathcomponents[0])
+ raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
@synchronized
def mkdirs(self, path):
"""
if self.find(path) != None:
- raise IOError(errno.EEXIST, "Directory or file exists: '%s'" % path)
+ raise IOError(errno.EEXIST, "Directory or file exists", path)
return self.find_or_create(path, COLLECTION)
arvfile = self.find(path)
if arvfile is None:
- raise IOError(errno.ENOENT, "File not found")
+ raise IOError(errno.ENOENT, "File not found", path)
if not isinstance(arvfile, ArvadosFile):
- raise IOError(errno.EISDIR, "Is a directory: %s" % path)
+ raise IOError(errno.EISDIR, "Is a directory", path)
if mode[0] == "w":
arvfile.truncate(0)
@synchronized
def committed(self):
    """Determine if the collection has been committed to the API server.

    Returns the value of the stored committed flag.
    """
    # No scan of child items here: set_committed(False) is documented to
    # clear the flag on this collection and on all of its parents, so the
    # stored flag is relied on to reflect changes made further down.
    return self._committed
@synchronized
def set_committed(self, value=True):
    """Recursively set committed flag.

    If value is True, set committed to be True for this and all children.

    If value is False, set committed to be False for this and all parents.

    :value:
      The new state of the committed flag.  Defaults to True, so calling
      set_committed() with no argument marks this collection and its
      children as committed.
    """
    if value == self._committed:
        # Already in the requested state; nothing to propagate.
        return
    if value:
        # Mark the children first, then this collection.
        for item in self._items.values():
            item.set_committed(True)
        self._committed = True
    else:
        self._committed = False
        # A modified child means every enclosing collection is modified too.
        if self.parent is not None:
            self.parent.set_committed(False)
@synchronized
def __iter__(self):
def __delitem__(self, p):
    """Delete an item by name which is directly contained by this collection.

    :p:
      Key of the entry to remove from this collection's items.

    Any exception raised by `del self._items[p]` for a missing key
    (KeyError, if the item store is a dict) propagates to the caller
    before the committed flag is touched or a notification is sent.
    """
    del self._items[p]
    # Use set_committed() rather than assigning the flag directly, so the
    # change is handled the same way as every other modification.
    self.set_committed(False)
    self.notify(DEL, self, p, None)
@synchronized
pathcomponents = path.split("/", 1)
item = self._items.get(pathcomponents[0])
if item is None:
- raise IOError(errno.ENOENT, "File not found")
+ raise IOError(errno.ENOENT, "File not found", path)
if len(pathcomponents) == 1:
if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
- raise IOError(errno.ENOTEMPTY, "Subcollection not empty")
+ raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
deleteditem = self._items[pathcomponents[0]]
del self._items[pathcomponents[0]]
- self._committed = False
+ self.set_committed(False)
self.notify(DEL, self, pathcomponents[0], deleteditem)
else:
item.remove(pathcomponents[1])
"""
if target_name in self and not overwrite:
- raise IOError(errno.EEXIST, "File already exists")
+ raise IOError(errno.EEXIST, "File already exists", target_name)
modified_from = None
if target_name in self:
item = source_obj.clone(self, target_name)
self._items[target_name] = item
- self._committed = False
+ self.set_committed(False)
if modified_from:
self.notify(MOD, self, target_name, (modified_from, item))
if isinstance(source, basestring):
source_obj = source_collection.find(source)
if source_obj is None:
- raise IOError(errno.ENOENT, "File not found")
+ raise IOError(errno.ENOENT, "File not found", source)
sourcecomponents = source.split("/")
else:
source_obj = source
target_dir = self
if target_dir is None:
- raise IOError(errno.ENOENT, "Target directory not found.")
+ raise IOError(errno.ENOENT, "Target directory not found", target_name)
- if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
+ if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
target_dir = target_dir[target_name]
target_name = sourcecomponents[-1]
source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
if not source_obj.writable():
- raise IOError(errno.EROFS, "Source collection is read only.")
+ raise IOError(errno.EROFS, "Source collection is read only", source)
target_dir.add(source_obj, target_name, overwrite, True)
def portable_manifest_text(self, stream_name="."):
return self._get_manifest_text(stream_name, True, True)
@synchronized
- def manifest_text(self, stream_name=".", strip=False, normalize=False):
+ def manifest_text(self, stream_name=".", strip=False, normalize=False,
+ only_committed=False):
"""Get the manifest text for this collection, sub collections and files.
This method will flush outstanding blocks to Keep. By default, it will
is not modified, return the original manifest text even if it is not
in normalized form.
+ :only_committed:
+ If True, don't commit pending blocks.
+
"""
- self._my_block_manager().commit_all()
- return self._get_manifest_text(stream_name, strip, normalize)
+ if not only_committed:
+ self._my_block_manager().commit_all()
+ return self._get_manifest_text(stream_name, strip, normalize,
+ only_committed=only_committed)
@synchronized
def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
if stream:
buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
- buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True))
+ buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
return "".join(buf)
else:
if strip:
"""
if changes:
- self._committed = False
+ self.set_committed(False)
for change in changes:
event_type = change[0]
path = change[1]
def portable_data_hash(self):
    """Get the portable data hash for this collection's manifest.

    Returns the hash recorded from the API server's response when the
    collection has a manifest locator, is committed, and a recorded hash
    is available.  Otherwise the hash is computed locally as the MD5 hex
    digest of the portable manifest text, followed by '+' and the length
    of that text.
    """
    # The recorded hash starts out as None and is only filled in from API
    # server responses, so also require that it is actually set; otherwise
    # fall through and compute it instead of returning None.
    if (self._manifest_locator and self.committed()
            and self._portable_data_hash is not None):
        return self._portable_data_hash
    stripped = self.portable_manifest_text()
    return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
@synchronized
def subscribe(self, callback):
parent=None,
apiconfig=None,
block_manager=None,
- replication_desired=None):
+ replication_desired=None,
+ put_threads=None):
"""Collection constructor.
:manifest_locator_or_text:
self._keep_client = keep_client
self._block_manager = block_manager
self.replication_desired = replication_desired
+ self.put_threads = put_threads
if apiconfig:
self._config = apiconfig
self.num_retries = num_retries if num_retries is not None else 0
self._manifest_locator = None
self._manifest_text = None
+ self._portable_data_hash = None
self._api_response = None
self._past_versions = set()
copies = (self.replication_desired or
self._my_api()._rootDesc.get('defaultCollectionReplication',
2))
- self._block_manager = _BlockManager(self._my_keep(), copies=copies)
+ self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
return self._block_manager
def _remember_api_response(self, response):
uuid=self._manifest_locator).execute(
num_retries=self.num_retries))
self._manifest_text = self._api_response['manifest_text']
+ self._portable_data_hash = self._api_response['portable_data_hash']
# If not overriden via kwargs, we should try to load the
# replication_desired from the API server
if self.replication_desired is None:
).execute(
num_retries=num_retries))
self._manifest_text = self._api_response["manifest_text"]
- self.set_committed()
+ self._portable_data_hash = self._api_response["portable_data_hash"]
+ self.set_committed(True)
return self._manifest_text
text = self._api_response["manifest_text"]
self._manifest_locator = self._api_response["uuid"]
+ self._portable_data_hash = self._api_response["portable_data_hash"]
self._manifest_text = text
- self.set_committed()
+ self.set_committed(True)
return text
stream_name = None
state = STREAM_NAME
- self.set_committed()
+ self.set_committed(True)
@synchronized
def notify(self, event, collection, name, item):
@must_be_writable
@synchronized
def _reparent(self, newparent, newname):
- self._committed = False
+ self.set_committed(False)
self.flush()
self.parent.remove(self.name, recursive=True)
self.parent = newparent