X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/aff4a730ad890564ee05c2395c4ebb49458e3cdc..9ac57b0bc6cb5d90da57c943df489401c63b7a7f:/sdk/python/arvados/collection.py diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py index 6677ca6102..38e794c24a 100644 --- a/sdk/python/arvados/collection.py +++ b/sdk/python/arvados/collection.py @@ -644,14 +644,13 @@ class RichCollectionBase(CollectionBase): return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries) def modified(self): + """Determine if the collection has been modified since last committed.""" return not self.committed() - def set_unmodified(self): - self.set_committed() - @synchronized def committed(self): - """Test if the collection and all subcollection and files are committed.""" + """Determine if the collection has been committed to the API server.""" + if self._committed is False: return False for v in self._items.values(): @@ -661,7 +660,7 @@ class RichCollectionBase(CollectionBase): @synchronized def set_committed(self): - """Recursively set committed flag.""" + """Recursively set committed flag to True.""" self._committed = True for k,v in self._items.items(): v.set_committed() @@ -1007,7 +1006,7 @@ class RichCollectionBase(CollectionBase): path = change[1] initial = change[2] local = self.find(path) - conflictpath = "%s~conflict-%s~" % (path, time.strftime("%Y-%m-%d-%H:%M:%S", + conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S", time.gmtime())) if event_type == ADD: if local is None: @@ -1169,6 +1168,7 @@ class Collection(RichCollectionBase): self._manifest_locator = None self._manifest_text = None self._api_response = None + self._past_versions = set() self.lock = threading.RLock() self.events = None @@ -1198,6 +1198,10 @@ class Collection(RichCollectionBase): def writable(self): return True + @synchronized + def known_past_version(self, modified_at_and_portable_data_hash): + return modified_at_and_portable_data_hash in 
self._past_versions + @synchronized @retry_method def update(self, other=None, num_retries=None): @@ -1207,6 +1211,11 @@ class Collection(RichCollectionBase): if self._manifest_locator is None: raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid") response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries) + if self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))): + # We've merged this record before. Don't do anything. + return + else: + self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash"))) other = CollectionReader(response["manifest_text"]) baseline = CollectionReader(self._manifest_text) self.apply(baseline.diff(other)) @@ -1234,6 +1243,10 @@ class Collection(RichCollectionBase): self._block_manager = _BlockManager(self._my_keep()) return self._block_manager + def _remember_api_response(self, response): + self._api_response = response + self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash"))) + def _populate_from_api_server(self): # As in KeepClient itself, we must wait until the last # possible moment to instantiate an API client, in order to @@ -1243,9 +1256,9 @@ class Collection(RichCollectionBase): # clause, just like any other Collection lookup # failure. Return an exception, or None if successful. 
try: - self._api_response = self._my_api().collections().get( + self._remember_api_response(self._my_api().collections().get( uuid=self._manifest_locator).execute( - num_retries=self.num_retries) + num_retries=self.num_retries)) self._manifest_text = self._api_response['manifest_text'] return None except Exception as e: @@ -1307,6 +1320,9 @@ class Collection(RichCollectionBase): if exc_type is None: if self.writable() and self._has_collection_uuid(): self.save() + self.stop_threads() + + def stop_threads(self): if self._block_manager is not None: self._block_manager.stop_threads() @@ -1399,11 +1415,11 @@ class Collection(RichCollectionBase): self.update() text = self.manifest_text(strip=False) - self._api_response = self._my_api().collections().update( + self._remember_api_response(self._my_api().collections().update( uuid=self._manifest_locator, body={'manifest_text': text} ).execute( - num_retries=num_retries) + num_retries=num_retries)) self._manifest_text = self._api_response["manifest_text"] self.set_committed() @@ -1450,7 +1466,7 @@ class Collection(RichCollectionBase): if create_collection_record: if name is None: - name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime())) + name = "New collection" ensure_unique_name = True body = {"manifest_text": text, @@ -1458,7 +1474,7 @@ class Collection(RichCollectionBase): if owner_uuid: body["owner_uuid"] = owner_uuid - self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries) + self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)) text = self._api_response["manifest_text"] self._manifest_locator = self._api_response["uuid"] @@ -1541,7 +1557,7 @@ class Subcollection(RichCollectionBase): """This is a subdirectory within a collection that doesn't have its own API server record. 
- It falls under the umbrella of the root collection. + Subcollection locking falls under the umbrella lock of its root collection. """