Merge branch 'master' into 14946-ruby-2.5
[arvados.git] / sdk / cwl / arvados_cwl / executor.py
index c5fd7e3cf40890f56f3e21fb3abb397dea533aa5..190ea35e2a3c2d21b6739db2b7d61e748b80fd7d 100644 (file)
@@ -6,7 +6,7 @@ from __future__ import division
 from builtins import next
 from builtins import object
 from builtins import str
-from future.utils import viewvalues
+from future.utils import viewvalues, viewitems
 
 import argparse
 import logging
@@ -131,6 +131,8 @@ class ArvCwlExecutor(object):
         self.poll_interval = 12
         self.loadingContext = None
         self.should_estimate_cache_size = True
+        self.fs_access = None
+        self.secret_store = None
 
         if keep_client is not None:
             self.keep_client = keep_client
@@ -152,7 +154,7 @@ class ArvCwlExecutor(object):
                                            num_retries=self.num_retries)
 
         self.work_api = None
-        expected_api = ["jobs", "containers"]
+        expected_api = ["containers", "jobs"]
         for api in expected_api:
             try:
                 methods = self.api._rootDesc.get('resources')[api]['methods']
@@ -365,7 +367,6 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
 
                 while keys:
                     page = keys[:pageSize]
-                    keys = keys[pageSize:]
                     try:
                         proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
                     except Exception:
@@ -381,6 +382,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                                 "new_attributes": p
                             }
                         })
+                    keys = keys[pageSize:]
+
                 finish_poll = time.time()
                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
         except:
@@ -405,7 +408,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
             except (KeyboardInterrupt, SystemExit):
                 break
 
-    def check_features(self, obj):
+    def check_features(self, obj, parentfield=""):
         if isinstance(obj, dict):
             if obj.get("writable") and self.work_api != "containers":
                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
@@ -419,12 +422,15 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                             "Option 'dockerOutputDirectory' must be an absolute path.")
             if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
                 raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
-            for v in viewvalues(obj):
-                self.check_features(v)
+            if obj.get("class") == "InplaceUpdateRequirement":
+                if obj["inplaceUpdate"] and parentfield == "requirements":
+                    raise SourceLine(obj, "class", UnsupportedRequirement).makeError("InplaceUpdateRequirement not supported for keep collections.")
+            for k,v in viewitems(obj):
+                self.check_features(v, parentfield=k)
         elif isinstance(obj, list):
             for i,v in enumerate(obj):
                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
-                    self.check_features(v)
+                    self.check_features(v, parentfield=parentfield)
 
     def make_output_collection(self, name, storage_classes, tagsString, outputObj):
         outputObj = copy.deepcopy(outputObj)
@@ -524,6 +530,18 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                                        'progress':1.0
                                    }).execute(num_retries=self.num_retries)
 
+    def apply_reqs(self, job_order_object, tool):
+        if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
+            if tool.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
+                raise WorkflowException(
+                    "`cwl:requirements` in the input object is not part of CWL "
+                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
+                    "can set the cwlVersion to v1.1 or greater and re-run with "
+                    "--enable-dev.")
+            job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
+            for req in job_reqs:
+                tool.requirements.append(req)
+
     def arv_executor(self, tool, job_order, runtimeContext, logger=None):
         self.debug = runtimeContext.debug
 
@@ -550,6 +568,10 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
         if not runtimeContext.name:
             runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
 
+        # Upload local file references in the job order.
+        job_order = upload_job_order(self, "%s input" % runtimeContext.name,
+                                     tool, job_order)
+
         submitting = (runtimeContext.update_workflow or
                       runtimeContext.create_workflow or
                       (runtimeContext.submit and not
@@ -579,10 +601,6 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
         loadingContext.metadata = tool.metadata
         tool = load_tool(tool.tool, loadingContext)
 
-        # Upload local file references in the job order.
-        job_order = upload_job_order(self, "%s input" % runtimeContext.name,
-                                     tool, job_order)
-
         existing_uuid = runtimeContext.update_workflow
         if existing_uuid or runtimeContext.create_workflow:
             # Create a pipeline template or workflow record and exit.
@@ -606,6 +624,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                                         merged_map=merged_map),
                         "success")
 
+        self.apply_reqs(job_order, tool)
+
         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
         self.eval_timeout = runtimeContext.eval_timeout