Merge branch '16353-cwl-1.2' refs #16353
authorPeter Amstutz <peter.amstutz@curii.com>
Tue, 21 Jul 2020 20:59:14 +0000 (16:59 -0400)
committerPeter Amstutz <peter.amstutz@curii.com>
Tue, 21 Jul 2020 20:59:14 +0000 (16:59 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvtool.py
sdk/cwl/arvados_cwl/arvworkflow.py
sdk/cwl/arvados_cwl/executor.py
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/setup.py
sdk/cwl/tests/test_submit.py

index 2b55ce9df5afa6b4a5e7d98ded954be50ae40aa0..fb23c2ccf73df514923f4fd0041814c6e8751833 100644 (file)
@@ -150,21 +150,28 @@ class ArvadosContainer(JobBase):
                 with Perf(metrics, "createfiles %s" % self.name):
                     for f, p in sorteditems:
                         if not p.target:
-                            pass
-                        elif p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
+                            continue
+
+                        if p.target.startswith("/"):
+                            dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
+                        else:
+                            dst = p.target
+
+                        if p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
                             if p.resolved.startswith("_:"):
-                                vwd.mkdirs(p.target)
+                                vwd.mkdirs(dst)
                             else:
                                 source, path = self.arvrunner.fs_access.get_collection(p.resolved)
-                                vwd.copy(path or ".", p.target, source_collection=source)
+                                vwd.copy(path or ".", dst, source_collection=source)
                         elif p.type == "CreateFile":
                             if self.arvrunner.secret_store.has_secret(p.resolved):
-                                secret_mounts["%s/%s" % (self.outdir, p.target)] = {
+                                mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
+                                secret_mounts[mountpoint] = {
                                     "kind": "text",
                                     "content": self.arvrunner.secret_store.retrieve(p.resolved)
                                 }
                             else:
-                                with vwd.open(p.target, "w") as n:
+                                with vwd.open(dst, "w") as n:
                                     n.write(p.resolved)
 
                 def keepemptydirs(p):
@@ -191,10 +198,14 @@ class ArvadosContainer(JobBase):
                     if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
                         (prev is not None and p.target.startswith(prev))):
                         continue
-                    mountpoint = "%s/%s" % (self.outdir, p.target)
+                    if p.target.startswith("/"):
+                        dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
+                    else:
+                        dst = p.target
+                    mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
                     mounts[mountpoint] = {"kind": "collection",
                                           "portable_data_hash": vwd.portable_data_hash(),
-                                          "path": p.target}
+                                          "path": dst}
                     if p.type.startswith("Writable"):
                         mounts[mountpoint]["writable"] = True
                     prev = p.target + "/"
@@ -316,6 +327,7 @@ class ArvadosContainer(JobBase):
                 logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
         except Exception:
             logger.exception("%s got an error", self.arvrunner.label(self))
+            logger.debug("Container request was %s", container_request)
             self.output_callback({}, "permanentFail")
 
     def done(self, record):
index 704edaccb903eb83f1e66c983eb007fe1c4f8711..a9361a85f9fe66a5260a64b6719fa74c28cdeee2 100644 (file)
@@ -57,7 +57,7 @@ class ArvadosCommandTool(CommandLineTool):
                                  "/keep/%s/%s")
 
     def job(self, joborder, output_callback, runtimeContext):
-        builder = make_builder(joborder, self.hints, self.requirements, runtimeContext)
+        builder = make_builder(joborder, self.hints, self.requirements, runtimeContext, self.metadata)
         runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
 
         if runtimeContext.work_api == "containers":
index ddd3c00764c7b0fb42fe97adf50622bd1b23e9cb..97c5fafe792fc06ce099e6a9bc6934671ace580d 100644 (file)
@@ -141,7 +141,8 @@ class ArvadosWorkflowStep(WorkflowStep):
         runtimeContext = runtimeContext.copy()
         runtimeContext.toplevel = True  # Preserve behavior for #13365
 
-        builder = make_builder({shortname(k): v for k,v in viewitems(joborder)}, self.hints, self.requirements, runtimeContext)
+        builder = make_builder({shortname(k): v for k,v in viewitems(joborder)}, self.hints, self.requirements,
+                               runtimeContext, self.metadata)
         runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
         return super(ArvadosWorkflowStep, self).job(joborder, output_callback, runtimeContext)
 
@@ -161,7 +162,7 @@ class ArvadosWorkflow(Workflow):
 
     def job(self, joborder, output_callback, runtimeContext):
 
-        builder = make_builder(joborder, self.hints, self.requirements, runtimeContext)
+        builder = make_builder(joborder, self.hints, self.requirements, runtimeContext, self.metadata)
         runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
 
         req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
index ec91eea6aa807eaea1f37012b0fa1f04d21a1f1f..e8d1347ddfeec7545b8ab9740de38b78b55b4e75 100644 (file)
@@ -507,7 +507,7 @@ The 'jobs' API is no longer supported.
                                               }).execute(num_retries=self.num_retries)
             except Exception:
                 logger.exception("Setting container output")
-                return
+                raise
 
     def apply_reqs(self, job_order_object, tool):
         if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
index 4cd204f7df83ba49197f2cdb6ab2a61673a40b28..5bad290773be9f49ef2e87b10b2dac48e70ef75b 100644 (file)
@@ -285,6 +285,7 @@ class StagingPathMapper(PathMapper):
     def visit(self, obj, stagedir, basedir, copy=False, staged=False):
         # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
         loc = obj["location"]
+        stagedir = obj.get("dirname") or stagedir
         tgt = os.path.join(stagedir, obj["basename"])
         basetgt, baseext = os.path.splitext(tgt)
 
index 71e499ebcab0cca29ccbee7a350cfbbb5aaa6e19..b10f02d1401b9e31014eb30b32e18adfdcb394d2 100644 (file)
@@ -83,7 +83,7 @@ def find_defaults(d, op):
             for i in viewvalues(d):
                 find_defaults(i, op)
 
-def make_builder(joborder, hints, requirements, runtimeContext):
+def make_builder(joborder, hints, requirements, runtimeContext, metadata):
     return Builder(
                  job=joborder,
                  files=[],               # type: List[Dict[Text, Text]]
@@ -106,6 +106,7 @@ def make_builder(joborder, hints, requirements, runtimeContext):
                  outdir="",              # type: Text
                  tmpdir="",              # type: Text
                  stagedir="",            # type: Text
+                 cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion")
                 )
 
 def search_schemadef(name, reqs):
@@ -172,7 +173,10 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
         specs = []
         primary["secondaryFiles"] = secondaryspec
         for i, sf in enumerate(aslist(secondaryspec)):
-            pattern = builder.do_eval(sf["pattern"], context=primary)
+            if builder.cwlVersion == "v1.0":
+                pattern = builder.do_eval(sf, context=primary)
+            else:
+                pattern = builder.do_eval(sf["pattern"], context=primary)
             if pattern is None:
                 continue
             if isinstance(pattern, list):
@@ -263,6 +267,8 @@ def upload_dependencies(arvrunner, name, document_loader,
         # that external references in $include and $mixin are captured.
         scanobj = loadref("", workflowobj["id"])
 
+    metadata = scanobj
+
     sc_result = scandeps(uri, scanobj,
                   loadref_fields,
                   set(("$include", "$schemas", "location")),
@@ -354,7 +360,8 @@ def upload_dependencies(arvrunner, name, document_loader,
         builder = make_builder(builder_job_order,
                                obj.get("hints", []),
                                obj.get("requirements", []),
-                               ArvRuntimeContext())
+                               ArvRuntimeContext(),
+                               metadata)
         discover_secondary_files(arvrunner.fs_access,
                                  builder,
                                  obj["inputs"],
@@ -516,7 +523,8 @@ def upload_job_order(arvrunner, name, tool, job_order):
     builder = make_builder(builder_job_order,
                            tool.hints,
                            tool.requirements,
-                           ArvRuntimeContext())
+                           ArvRuntimeContext(),
+                           tool.metadata)
     # Now update job_order with secondaryFiles
     discover_secondary_files(arvrunner.fs_access,
                              builder,
index 40ee679857f4429b0e32cf491339e144695489de..d703fcbc55fdec7889108ca01c898365117733cd 100644 (file)
@@ -39,8 +39,8 @@ setup(name='arvados-cwl-runner',
       # file to determine what version of cwltool and schema-salad to
       # build.
       install_requires=[
-          'cwltool==3.0.20200530110633',
-          'schema-salad==6.0.20200601095207',
+          'cwltool==3.0.20200720165847',
+          'schema-salad==7.0.20200612160654',
           'arvados-python-client{}'.format(pysdk_dep),
           'setuptools',
           'ciso8601 >= 2.0.0'
index 562664c698b34df87ec2f19eb64d867ec5461698..0698db70ff68534ba70aa4176c5487f308cf2559 100644 (file)
@@ -527,9 +527,12 @@ class TestSubmit(unittest.TestCase):
 
     @mock.patch("arvados_cwl.task_queue.TaskQueue")
     @mock.patch("arvados_cwl.arvworkflow.ArvadosWorkflow.job")
-    @mock.patch("arvados_cwl.executor.ArvCwlExecutor.make_output_collection", return_value = (None, None))
+    @mock.patch("arvados_cwl.executor.ArvCwlExecutor.make_output_collection")
     @stubs
     def test_storage_classes_correctly_propagate_to_make_output_collection(self, stubs, make_output, job, tq):
+        final_output_c = arvados.collection.Collection()
+        make_output.return_value = ({},final_output_c)
+
         def set_final_output(job_order, output_callback, runtimeContext):
             output_callback("zzzzz-4zz18-zzzzzzzzzzzzzzzz", "success")
             return []
@@ -538,16 +541,19 @@ class TestSubmit(unittest.TestCase):
         exited = arvados_cwl.main(
             ["--debug", "--local", "--storage-classes=foo",
                 "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
-            sys.stdin, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+            stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
 
         make_output.assert_called_with(u'Output of submit_wf.cwl', ['foo'], '', 'zzzzz-4zz18-zzzzzzzzzzzzzzzz')
         self.assertEqual(exited, 0)
 
     @mock.patch("arvados_cwl.task_queue.TaskQueue")
     @mock.patch("arvados_cwl.arvworkflow.ArvadosWorkflow.job")
-    @mock.patch("arvados_cwl.executor.ArvCwlExecutor.make_output_collection", return_value = (None, None))
+    @mock.patch("arvados_cwl.executor.ArvCwlExecutor.make_output_collection")
     @stubs
     def test_default_storage_classes_correctly_propagate_to_make_output_collection(self, stubs, make_output, job, tq):
+        final_output_c = arvados.collection.Collection()
+        make_output.return_value = ({},final_output_c)
+
         def set_final_output(job_order, output_callback, runtimeContext):
             output_callback("zzzzz-4zz18-zzzzzzzzzzzzzzzz", "success")
             return []
@@ -556,7 +562,7 @@ class TestSubmit(unittest.TestCase):
         exited = arvados_cwl.main(
             ["--debug", "--local",
                 "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
-            sys.stdin, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+            stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
 
         make_output.assert_called_with(u'Output of submit_wf.cwl', ['default'], '', 'zzzzz-4zz18-zzzzzzzzzzzzzzzz')
         self.assertEqual(exited, 0)
@@ -1103,7 +1109,10 @@ class TestSubmit(unittest.TestCase):
                                 "outputs": [
                                     {
                                         "id": "#secret_job.cwl/out",
-                                        "type": "stdout"
+                                        "type": "File",
+                                        "outputBinding": {
+                                              "glob": "hashed_example.txt"
+                                        }
                                     }
                                 ],
                                 "stdout": "hashed_example.txt",
@@ -1312,7 +1321,7 @@ class TestSubmit(unittest.TestCase):
                     stubs.capture_stdout, capture_stderr, api_client=stubs.api, keep_client=stubs.keep_client)
 
                 self.assertEqual(exited, 1)
-                self.assertRegexpMatches(
+                self.assertRegex(
                     re.sub(r'[ \n]+', ' ', capture_stderr.getvalue()),
                     r"Expected collection uuid zzzzz-4zz18-zzzzzzzzzzzzzzz to be 99999999999999999999999999999998\+99 but API server reported 99999999999999999999999999999997\+99")
             finally:
@@ -1335,7 +1344,7 @@ class TestSubmit(unittest.TestCase):
 
         try:
             self.assertEqual(exited, 1)
-            self.assertRegexpMatches(
+            self.assertRegex(
                 capture_stderr.getvalue(),
                 r"Collection uuid zzzzz-4zz18-zzzzzzzzzzzzzzz not found")
         finally: