10001: Update cwltool dependency to get parallel scatter fixes.
[arvados.git] / sdk / python / arvados / collection.py
index 8450bd1ca086687928c43e09b2d497ae598c5128..56d8b239331a8f65e8b5781376719244c03b2905 100644 (file)
@@ -922,7 +922,7 @@ class RichCollectionBase(CollectionBase):
         return self._get_manifest_text(stream_name, strip, normalize)
 
     @synchronized
         return self._get_manifest_text(stream_name, strip, normalize)
 
     @synchronized
-    def _get_manifest_text(self, stream_name, strip, normalize):
+    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
         """Get the manifest text for this collection, sub collections and files.
 
         :stream_name:
         """Get the manifest text for this collection, sub collections and files.
 
         :stream_name:
@@ -938,6 +938,9 @@ class RichCollectionBase(CollectionBase):
           is not modified, return the original manifest text even if it is not
           in normalized form.
 
           is not modified, return the original manifest text even if it is not
           in normalized form.
 
+        :only_committed:
+          If True, only include blocks that were already committed to Keep.
+
         """
 
         if not self.committed() or self._manifest_text is None or normalize:
         """
 
         if not self.committed() or self._manifest_text is None or normalize:
@@ -951,6 +954,8 @@ class RichCollectionBase(CollectionBase):
                 for segment in arvfile.segments():
                     loc = segment.locator
                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
                 for segment in arvfile.segments():
                     loc = segment.locator
                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
+                        if only_committed:
+                            continue
                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                     if strip:
                         loc = KeepLocator(loc).stripped()
                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                     if strip:
                         loc = KeepLocator(loc).stripped()
@@ -1135,7 +1140,8 @@ class Collection(RichCollectionBase):
                  num_retries=None,
                  parent=None,
                  apiconfig=None,
                  num_retries=None,
                  parent=None,
                  apiconfig=None,
-                 block_manager=None):
+                 block_manager=None,
+                 replication_desired=None):
         """Collection constructor.
 
         :manifest_locator_or_text:
         """Collection constructor.
 
         :manifest_locator_or_text:
@@ -1143,24 +1149,35 @@ class Collection(RichCollectionBase):
           a manifest, raw manifest text, or None (to create an empty collection).
         :parent:
           the parent Collection, may be None.
           a manifest, raw manifest text, or None (to create an empty collection).
         :parent:
           the parent Collection, may be None.
+
         :apiconfig:
           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
           Prefer this over supplying your own api_client and keep_client (except in testing).
           Will use default config settings if not specified.
         :apiconfig:
           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
           Prefer this over supplying your own api_client and keep_client (except in testing).
           Will use default config settings if not specified.
+
         :api_client:
           The API client object to use for requests.  If not specified, create one using `apiconfig`.
         :api_client:
           The API client object to use for requests.  If not specified, create one using `apiconfig`.
+
         :keep_client:
           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
         :keep_client:
           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
+
         :num_retries:
           the number of retries for API and Keep requests.
         :num_retries:
           the number of retries for API and Keep requests.
+
         :block_manager:
           the block manager to use.  If not specified, create one.
 
         :block_manager:
           the block manager to use.  If not specified, create one.
 
+        :replication_desired:
+          How many copies should Arvados maintain. If None, API server default
+          configuration applies. If not None, this value will also be used
+          for determining the number of block copies being written.
+
         """
         super(Collection, self).__init__(parent)
         self._api_client = api_client
         self._keep_client = keep_client
         self._block_manager = block_manager
         """
         super(Collection, self).__init__(parent)
         self._api_client = api_client
         self._keep_client = keep_client
         self._block_manager = block_manager
+        self.replication_desired = replication_desired
 
         if apiconfig:
             self._config = apiconfig
 
         if apiconfig:
             self._config = apiconfig
@@ -1232,7 +1249,8 @@ class Collection(RichCollectionBase):
     def _my_api(self):
         if self._api_client is None:
             self._api_client = ThreadSafeApiCache(self._config)
     def _my_api(self):
         if self._api_client is None:
             self._api_client = ThreadSafeApiCache(self._config)
-            self._keep_client = self._api_client.keep
+            if self._keep_client is None:
+                self._keep_client = self._api_client.keep
         return self._api_client
 
     @synchronized
         return self._api_client
 
     @synchronized
@@ -1247,7 +1265,10 @@ class Collection(RichCollectionBase):
     @synchronized
     def _my_block_manager(self):
         if self._block_manager is None:
     @synchronized
     def _my_block_manager(self):
         if self._block_manager is None:
-            self._block_manager = _BlockManager(self._my_keep())
+            copies = (self.replication_desired or
+                      self._my_api()._rootDesc.get('defaultCollectionReplication',
+                                                   2))
+            self._block_manager = _BlockManager(self._my_keep(), copies=copies)
         return self._block_manager
 
     def _remember_api_response(self, response):
         return self._block_manager
 
     def _remember_api_response(self, response):
@@ -1267,6 +1288,10 @@ class Collection(RichCollectionBase):
                 uuid=self._manifest_locator).execute(
                     num_retries=self.num_retries))
             self._manifest_text = self._api_response['manifest_text']
                 uuid=self._manifest_locator).execute(
                     num_retries=self.num_retries))
             self._manifest_text = self._api_response['manifest_text']
+            # If not overriden via kwargs, we should try to load the
+            # replication_desired from the API server
+            if self.replication_desired is None:
+                self.replication_desired = self._api_response.get('replication_desired', None)
             return None
         except Exception as e:
             return e
             return None
         except Exception as e:
             return e
@@ -1477,7 +1502,8 @@ class Collection(RichCollectionBase):
                 ensure_unique_name = True
 
             body = {"manifest_text": text,
                 ensure_unique_name = True
 
             body = {"manifest_text": text,
-                    "name": name}
+                    "name": name,
+                    "replication_desired": self.replication_desired}
             if owner_uuid:
                 body["owner_uuid"] = owner_uuid
 
             if owner_uuid:
                 body["owner_uuid"] = owner_uuid