8654: Scanning for and uploading dependencies for submit works. Add test for
--submit.  Fix cwl-runner to handle CWL File objects in script_parameters.

author:    Peter Amstutz <peter.amstutz@curoverse.com>  Mon, 21 Mar 2016 20:24:06 +0000 (16:24 -0400)
committer: Peter Amstutz <peter.amstutz@curoverse.com>  Mon, 21 Mar 2016 20:24:06 +0000 (16:24 -0400)

crunch_scripts/cwl-runner
sdk/cwl/arvados_cwl/__init__.py
sdk/python/arvados/commands/run.py

index d04242686575499e14378952e9a265360dd3f64c..81755b8ca5b4cb52c3d75cad56c115cbfc9f0117 100755 (executable)
@@ -20,10 +20,12 @@ try:
 
     print job_order_object
 
-    for k,v in job_order_object.items():
+    def keeppath(v):
         if arvados.util.keep_locator_pattern.match(v):
             job_order_object[k] = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], v)
 
+    adjustFiles(job_order_object, keeppath)
+
     runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))
 
     t = cwltool.main.load_tool(job_order_object, False, True, runner.arvMakeTool, True)
index c0adb33550f10e2a95a26becbca3c961a4379ac3..d4c0b64ca167ad7abea638df2e061e266080d6a6 100644 (file)
@@ -19,8 +19,9 @@ import logging
 import re
 import os
 import sys
+import functools
 
-from cwltool.process import get_feature
+from cwltool.process import get_feature, adjustFiles, scandeps
 from arvados.api import OrderedJsonModel
 
 logger = logging.getLogger('arvados.cwl-runner')
@@ -249,7 +250,7 @@ class ArvadosJob(object):
 
 class ArvPathMapper(cwltool.pathmapper.PathMapper):
     def __init__(self, arvrunner, referenced_files, basedir,
-                 collection_pattern, file_pattern, **kwargs):
+                 collection_pattern, file_pattern, name=None, **kwargs):
         self._pathmap = arvrunner.get_uploaded()
         uploadfiles = []
 
@@ -276,6 +277,7 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
                                              dry_run=kwargs.get("dry_run"),
                                              num_retries=3,
                                              fnPattern=file_pattern,
+                                             name=name,
                                              project=arvrunner.project_uuid)
 
         for src, ab, st in uploadfiles:
@@ -368,22 +370,43 @@ class ArvCwlRunner(object):
         pass
 
     def submit(self, tool, job_order, input_basedir, args, **kwargs):
-        files = set()
-        def visitFiles(self, path):
+        workflowfiles = set()
+        jobfiles = set()
+        workflowfiles.add(tool.tool["id"])
+
+        def visitFiles(files, path):
             files.add(path)
+            return path
 
-        adjustFiles(process.scandeps("", tool.tool,
-                                     set(("run")),
-                                     set(("$schemas", "path"))),
-                    visitFiles)
-        adjustFiles(job_order, visitFiles)
+        document_loader, _, _ = cwltool.process.get_schema()
+        def loadref(b, u):
+            return document_loader.resolve_ref(u, base_url=b)[0]
 
-        mapper = ArvPathMapper(self, files, "",
-                               "$(task.keep)/%s",
-                               "$(task.keep)/%s/%s",
-                               **kwargs)
+        adjustFiles(scandeps("", tool.tool,
+                             set(("run",)),
+                             set(("$schemas", "path")),
+                             loadref),
+                    functools.partial(visitFiles, workflowfiles))
+        adjustFiles(job_order, functools.partial(visitFiles, jobfiles))
+
+        workflowmapper = ArvPathMapper(self, workflowfiles, "",
+                                       "%s",
+                                       "%s/%s",
+                                       name=os.path.basename(tool.tool["id"]),
+                                       **kwargs)
+
+        jobmapper = ArvPathMapper(self, jobfiles, "",
+                                  "%s",
+                                  "%s/%s",
+                                  name=os.path.basename(job_order.get("id", "#")),
+                                  **kwargs)
 
-        job_order = adjustFiles(job_order, lambda p: mapper.mapper(p))
+        adjustFiles(job_order, lambda p: jobmapper.mapper(p)[1])
+
+        if "id" in job_order:
+            del job_order["id"]
+
+        job_order["cwl:tool"] = workflowmapper.mapper(tool.tool["id"])[1]
 
         response = self.api.jobs().create(body={
             "script": "cwl-runner",
@@ -394,14 +417,18 @@ class ArvCwlRunner(object):
                 "docker_image": "arvados/jobs"
             }
         }, find_or_create=args.enable_reuse).execute(num_retries=self.num_retries)
-        print response["uuid"]
-        return None
+
+        logger.info("Submitted job %s", response["uuid"])
+
+        return
 
 
     def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
+        useruuid = self.api.users().current().execute()["uuid"]
+        self.project_uuid = args.project_uuid if args.project_uuid else useruuid
+
         if args.submit:
-            self.submit(tool, job_order, input_basedir, args, **kwargs)
-            return
+            return self.submit(tool, job_order, input_basedir, args, **kwargs)
 
         events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
 
@@ -430,9 +457,6 @@ class ArvCwlRunner(object):
         kwargs["outdir"] = "$(task.outdir)"
         kwargs["tmpdir"] = "$(task.tmpdir)"
 
-        useruuid = self.api.users().current().execute()["uuid"]
-        self.project_uuid = args.project_uuid if args.project_uuid else useruuid
-
         if kwargs.get("conformance_test"):
             return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
         else:
@@ -500,12 +524,20 @@ def main(args, stdout, stderr, api_client=None):
                         default=True, dest="enable_reuse",
                         help="")
     parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")
-    parser.add_argument("--submit", type=str, help="Submit job and print job uuid.")
+    parser.add_argument("--submit", action="store_true", help="Submit job and print job uuid.",
+                        default=False)
 
     try:
-        runner = ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))
+        if api_client is None:
+            api_client=arvados.api('v1', model=OrderedJsonModel())
+        runner = ArvCwlRunner(api_client)
     except Exception as e:
         logger.error(e)
         return 1
 
-    return cwltool.main.main(args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool, parser=parser)
+    return cwltool.main.main(args,
+                             stdout=stdout,
+                             stderr=stderr,
+                             executor=runner.arvExecutor,
+                             makeTool=runner.arvMakeTool,
+                             parser=parser)
index ef39be81a4650cda86e20c6d13a7d23848398ecb..046f7066373559452b2c8b24ab58041365b6cb73 100644 (file)
@@ -101,7 +101,7 @@ def statfile(prefix, fn, fnPattern="$(file %s/%s)", dirPattern="$(dir %s/%s/)"):
 
     return prefix+fn
 
-def uploadfiles(files, api, dry_run=False, num_retries=0, project=None, fnPattern="$(file %s/%s)"):
+def uploadfiles(files, api, dry_run=False, num_retries=0, project=None, fnPattern="$(file %s/%s)", name=None):
     # Find the smallest path prefix that includes all the files that need to be uploaded.
     # This starts at the root and iteratively removes common parent directory prefixes
     # until all file pathes no longer have a common parent.
@@ -148,7 +148,10 @@ def uploadfiles(files, api, dry_run=False, num_retries=0, project=None, fnPatter
                 stream = sp[0]
                 collection.start_new_stream(stream)
             collection.write_file(f.fn, sp[1])
-        item = api.collections().create(body={"owner_uuid": project, "manifest_text": collection.manifest_text()}).execute()
+        body = {"owner_uuid": project, "manifest_text": collection.manifest_text()}
+        if name is not None:
+            body["name"] = name
+        item = api.collections().create(body=body).execute()
         pdh = item["portable_data_hash"]
         logger.info("Uploaded to %s", item["uuid"])