else:
item = ArvadosFile(self, pathcomponents[0])
self._items[pathcomponents[0]] = item
- self._committed = False
+ self.set_committed(False)
self.notify(ADD, self, pathcomponents[0], item)
return item
else:
# create new collection
item = Subcollection(self, pathcomponents[0])
self._items[pathcomponents[0]] = item
- self._committed = False
+ self.set_committed(False)
self.notify(ADD, self, pathcomponents[0], item)
if isinstance(item, RichCollectionBase):
return item.find_or_create(pathcomponents[1], create_type)
@synchronized
def committed(self):
"""Determine if the collection has been committed to the API server."""
-
- if self._committed is False:
- return False
- for v in self._items.values():
- if v.committed() is False:
- return False
- return True
+ return self._committed
@synchronized
- def set_committed(self):
- """Recursively set committed flag to True."""
- self._committed = True
- for k,v in self._items.items():
- v.set_committed()
+ def set_committed(self, value=True):
+ """Recursively set committed flag.
+
+ If value is True, set committed to be True for this and all children.
+
+ If value is False, set committed to be False for this and all parents.
+ """
+ if value == self._committed:
+ return
+ if value:
+ for k,v in self._items.items():
+ v.set_committed(True)
+ self._committed = True
+ else:
+ self._committed = False
+ if self.parent is not None:
+ self.parent.set_committed(False)
@synchronized
def __iter__(self):
def __delitem__(self, p):
"""Delete an item by name which is directly contained by this collection."""
del self._items[p]
- self._committed = False
+ self.set_committed(False)
self.notify(DEL, self, p, None)
@synchronized
raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
deleteditem = self._items[pathcomponents[0]]
del self._items[pathcomponents[0]]
- self._committed = False
+ self.set_committed(False)
self.notify(DEL, self, pathcomponents[0], deleteditem)
else:
item.remove(pathcomponents[1])
item = source_obj.clone(self, target_name)
self._items[target_name] = item
- self._committed = False
+ self.set_committed(False)
if modified_from:
self.notify(MOD, self, target_name, (modified_from, item))
return self._get_manifest_text(stream_name, True, True)
@synchronized
- def manifest_text(self, stream_name=".", strip=False, normalize=False):
+ def manifest_text(self, stream_name=".", strip=False, normalize=False,
+ only_committed=False):
"""Get the manifest text for this collection, sub collections and files.
This method will flush outstanding blocks to Keep. By default, it will
is not modified, return the original manifest text even if it is not
in normalized form.
+ :only_committed:
+ If True, don't commit pending blocks.
+
"""
- self._my_block_manager().commit_all()
- return self._get_manifest_text(stream_name, strip, normalize)
+ if not only_committed:
+ self._my_block_manager().commit_all()
+ return self._get_manifest_text(stream_name, strip, normalize,
+ only_committed=only_committed)
@synchronized
def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
if stream:
buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
- buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True))
+ buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
return "".join(buf)
else:
if strip:
"""
if changes:
- self._committed = False
+ self.set_committed(False)
for change in changes:
event_type = change[0]
path = change[1]
def portable_data_hash(self):
"""Get the portable data hash for this collection's manifest."""
- stripped = self.portable_manifest_text()
- return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
+ if self._manifest_locator and self.committed():
+ # If the collection is already saved on the API server, and it's committed
+ # then return API server's PDH response.
+ return self._portable_data_hash
+ else:
+ stripped = self.portable_manifest_text()
+ return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
@synchronized
def subscribe(self, callback):
parent=None,
apiconfig=None,
block_manager=None,
- replication_desired=None):
+ replication_desired=None,
+ put_threads=None):
"""Collection constructor.
:manifest_locator_or_text:
self._keep_client = keep_client
self._block_manager = block_manager
self.replication_desired = replication_desired
+ self.put_threads = put_threads
if apiconfig:
self._config = apiconfig
self.num_retries = num_retries if num_retries is not None else 0
self._manifest_locator = None
self._manifest_text = None
+ self._portable_data_hash = None
self._api_response = None
self._past_versions = set()
copies = (self.replication_desired or
self._my_api()._rootDesc.get('defaultCollectionReplication',
2))
- self._block_manager = _BlockManager(self._my_keep(), copies=copies)
+ self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
return self._block_manager
def _remember_api_response(self, response):
uuid=self._manifest_locator).execute(
num_retries=self.num_retries))
self._manifest_text = self._api_response['manifest_text']
+ self._portable_data_hash = self._api_response['portable_data_hash']
# If not overriden via kwargs, we should try to load the
# replication_desired from the API server
if self.replication_desired is None:
).execute(
num_retries=num_retries))
self._manifest_text = self._api_response["manifest_text"]
- self.set_committed()
+ self._portable_data_hash = self._api_response["portable_data_hash"]
+ self.set_committed(True)
return self._manifest_text
text = self._api_response["manifest_text"]
self._manifest_locator = self._api_response["uuid"]
+ self._portable_data_hash = self._api_response["portable_data_hash"]
self._manifest_text = text
- self.set_committed()
+ self.set_committed(True)
return text
stream_name = None
state = STREAM_NAME
- self.set_committed()
+ self.set_committed(True)
@synchronized
def notify(self, event, collection, name, item):
@must_be_writable
@synchronized
def _reparent(self, newparent, newname):
- self._committed = False
+ self.set_committed(False)
self.flush()
self.parent.remove(self.name, recursive=True)
self.parent = newparent