9766: Full workflow: create workflow using command line, select run from
[arvados.git] / sdk / python / arvados / collection.py
index 38e794c24a217ffa5c76c1a7b026e4d432d369a0..56d8b239331a8f65e8b5781376719244c03b2905 100644 (file)
@@ -474,6 +474,7 @@ class ResumableCollectionWriter(CollectionWriter):
 ADD = "add"
 DEL = "del"
 MOD = "mod"
+TOK = "tok"
 FILE = "file"
 COLLECTION = "collection"
 
@@ -921,7 +922,7 @@ class RichCollectionBase(CollectionBase):
         return self._get_manifest_text(stream_name, strip, normalize)
 
     @synchronized
-    def _get_manifest_text(self, stream_name, strip, normalize):
+    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
         """Get the manifest text for this collection, sub collections and files.
 
         :stream_name:
@@ -937,6 +938,9 @@ class RichCollectionBase(CollectionBase):
           is not modified, return the original manifest text even if it is not
           in normalized form.
 
+        :only_committed:
+          If True, only include blocks that were already committed to Keep.
+
         """
 
         if not self.committed() or self._manifest_text is None or normalize:
@@ -950,6 +954,8 @@ class RichCollectionBase(CollectionBase):
                 for segment in arvfile.segments():
                     loc = segment.locator
                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
+                        if only_committed:
+                            continue
                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                     if strip:
                         loc = KeepLocator(loc).stripped()
@@ -986,6 +992,8 @@ class RichCollectionBase(CollectionBase):
                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
                 elif end_collection[k] != self[k]:
                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
+                else:
+                    changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
             else:
                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
         return changes
@@ -1016,7 +1024,7 @@ class RichCollectionBase(CollectionBase):
                     # There is already local file and it is different:
                     # save change to conflict file.
                     self.copy(initial, conflictpath)
-            elif event_type == MOD:
+            elif event_type == MOD or event_type == TOK:
                 final = change[3]
                 if local == initial:
                     # Local matches the "initial" item so it has not
@@ -1132,7 +1140,8 @@ class Collection(RichCollectionBase):
                  num_retries=None,
                  parent=None,
                  apiconfig=None,
-                 block_manager=None):
+                 block_manager=None,
+                 replication_desired=None):
         """Collection constructor.
 
         :manifest_locator_or_text:
@@ -1140,24 +1149,35 @@ class Collection(RichCollectionBase):
           a manifest, raw manifest text, or None (to create an empty collection).
         :parent:
           the parent Collection, may be None.
+
         :apiconfig:
           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
           Prefer this over supplying your own api_client and keep_client (except in testing).
           Will use default config settings if not specified.
+
         :api_client:
           The API client object to use for requests.  If not specified, create one using `apiconfig`.
+
         :keep_client:
           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
+
         :num_retries:
           the number of retries for API and Keep requests.
+
         :block_manager:
           the block manager to use.  If not specified, create one.
 
+        :replication_desired:
+          How many copies should Arvados maintain. If None, API server default
+          configuration applies. If not None, this value will also be used
+          for determining the number of block copies being written.
+
         """
         super(Collection, self).__init__(parent)
         self._api_client = api_client
         self._keep_client = keep_client
         self._block_manager = block_manager
+        self.replication_desired = replication_desired
 
         if apiconfig:
             self._config = apiconfig
@@ -1211,8 +1231,12 @@ class Collection(RichCollectionBase):
             if self._manifest_locator is None:
                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
-            if self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))):
-                # We've merged this record this before.  Don't do anything.
+            if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
+                response.get("portable_data_hash") != self.portable_data_hash()):
+                # The record on the server is different from our current one, but we've seen it before,
+                # so ignore it because it's already been merged.
+                # However, if it's the same as our current record, proceed with the update, because we want to update
+                # our tokens.
                 return
             else:
                 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
@@ -1225,7 +1249,8 @@ class Collection(RichCollectionBase):
     def _my_api(self):
         if self._api_client is None:
             self._api_client = ThreadSafeApiCache(self._config)
-            self._keep_client = self._api_client.keep
+            if self._keep_client is None:
+                self._keep_client = self._api_client.keep
         return self._api_client
 
     @synchronized
@@ -1240,7 +1265,10 @@ class Collection(RichCollectionBase):
     @synchronized
     def _my_block_manager(self):
         if self._block_manager is None:
-            self._block_manager = _BlockManager(self._my_keep())
+            copies = (self.replication_desired or
+                      self._my_api()._rootDesc.get('defaultCollectionReplication',
+                                                   2))
+            self._block_manager = _BlockManager(self._my_keep(), copies=copies)
         return self._block_manager
 
     def _remember_api_response(self, response):
@@ -1260,6 +1288,10 @@ class Collection(RichCollectionBase):
                 uuid=self._manifest_locator).execute(
                     num_retries=self.num_retries))
             self._manifest_text = self._api_response['manifest_text']
+            # If not overriden via kwargs, we should try to load the
+            # replication_desired from the API server
+            if self.replication_desired is None:
+                self.replication_desired = self._api_response.get('replication_desired', None)
             return None
         except Exception as e:
             return e
@@ -1470,7 +1502,8 @@ class Collection(RichCollectionBase):
                 ensure_unique_name = True
 
             body = {"manifest_text": text,
-                    "name": name}
+                    "name": name,
+                    "replication_desired": self.replication_desired}
             if owner_uuid:
                 body["owner_uuid"] = owner_uuid