Merge branch 'master' into 10645-cr-mounts-display
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index d6055d371009b6b5b94bd3aaa6957c5f53368c3d..e86f5055c54c76a46f74ca47438a846a56790664 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -4,17 +4,22 @@ import copy
 import json
 import time
 
-from cwltool.process import get_feature, shortname
+from cwltool.process import get_feature, shortname, UnsupportedRequirement
 from cwltool.errors import WorkflowException
 from cwltool.draft2tool import revmap_file, CommandLineTool
 from cwltool.load_tool import fetch_document
 from cwltool.builder import Builder
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
+
+from schema_salad.sourceline import SourceLine
+
+import ruamel.yaml as yaml
 
 import arvados.collection
 
 from .arvdocker import arv_docker_get_image
-from .runner import Runner, arvados_jobs_image
-from .pathmapper import InitialWorkDirPathMapper
+from .runner import Runner, arvados_jobs_image, packed_workflow, upload_workflow_collection, trim_anonymous_location
+from .pathmapper import VwdPathMapper, trim_listing
 from .perf import Perf
 from . import done
 from ._version import __version__
@@ -46,8 +51,8 @@ class ArvadosJob(object):
                                                     keep_client=self.arvrunner.keep_client,
                                                     num_retries=self.arvrunner.num_retries)
                 script_parameters["task.vwd"] = {}
-                generatemapper = InitialWorkDirPathMapper([self.generatefiles], "", "",
-                                                          separateDirs=False)
+                generatemapper = VwdPathMapper([self.generatefiles], "", "",
+                                               separateDirs=False)
 
                 with Perf(metrics, "createfiles %s" % self.name):
                     for f, p in generatemapper.items():
@@ -55,8 +60,9 @@ class ArvadosJob(object):
                             with vwd.open(p.target, "w") as n:
                                 n.write(p.resolved.encode("utf-8"))
 
-                with Perf(metrics, "generatefiles.save_new %s" % self.name):
-                    vwd.save_new()
+                if vwd:
+                    with Perf(metrics, "generatefiles.save_new %s" % self.name):
+                        vwd.save_new()
 
                 for f, p in generatemapper.items():
                     if p.type == "File":
@@ -92,7 +98,7 @@ class ArvadosJob(object):
                         "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
                 runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
             else:
-                runtime_constraints["docker_image"] = arvados_jobs_image(self.arvrunner)
+                runtime_constraints["docker_image"] = "arvados/jobs"
 
         resources = self.builder.resources
         if resources is not None:
@@ -136,13 +142,14 @@ class ArvadosJob(object):
 
             self.update_pipeline_component(response)
 
-            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])
-
-            if response["state"] in ("Complete", "Failed", "Cancelled"):
+            if response["state"] == "Complete":
+                logger.info("%s reused job %s", self.arvrunner.label(self), response["uuid"])
                 with Perf(metrics, "done %s" % self.name):
                     self.done(response)
+            else:
+                logger.info("%s %s is %s", self.arvrunner.label(self), response["uuid"], response["state"])
         except Exception as e:
-            logger.exception("Job %s error" % (self.name))
+            logger.exception("%s error" % (self.arvrunner.label(self)))
             self.output_callback({}, "permanentFail")
 
     def update_pipeline_component(self, record):
@@ -203,11 +210,15 @@ class ArvadosJob(object):
                             if g:
                                 dirs[g.group(1)] = g.group(2)
 
+                    if processStatus == "permanentFail":
+                        done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self))
+
                     with Perf(metrics, "output collection %s" % self.name):
                         outputs = done.done(self, record, dirs["tmpdir"],
                                             dirs["outdir"], dirs["keep"])
             except WorkflowException as e:
-                logger.error("Error while collecting output for job %s:\n%s", self.name, e, exc_info=(e if self.arvrunner.debug else False))
+                logger.error("%s unable to collect output from %s:\n%s",
+                             self.arvrunner.label(self), record["output"], e, exc_info=(e if self.arvrunner.debug else False))
                 processStatus = "permanentFail"
             except Exception as e:
                 logger.exception("Got unknown exception while collecting output for job %s:", self.name)
@@ -219,11 +230,10 @@ class ArvadosJob(object):
                 logger.error("Unexpected output type %s '%s'", type(outputs), outputs)
                 outputs = {}
                 processStatus = "permanentFail"
-
-            self.output_callback(outputs, processStatus)
         finally:
-            del self.arvrunner.processes[record["uuid"]]
-
+            self.output_callback(outputs, processStatus)
+            if record["uuid"] in self.arvrunner.processes:
+                del self.arvrunner.processes[record["uuid"]]
 
 class RunnerJob(Runner):
     """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
@@ -236,9 +246,16 @@ class RunnerJob(Runner):
         a pipeline template or pipeline instance.
         """
 
-        workflowmapper = super(RunnerJob, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
+        if self.tool.tool["id"].startswith("keep:"):
+            self.job_order["cwl:tool"] = self.tool.tool["id"][5:]
+        else:
+            packed = packed_workflow(self.arvrunner, self.tool)
+            wf_pdh = upload_workflow_collection(self.arvrunner, self.name, packed)
+            self.job_order["cwl:tool"] = "%s/workflow.cwl#main" % wf_pdh
 
-        self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"]).target[5:]
+        adjustDirObjs(self.job_order, trim_listing)
+        adjustFileObjs(self.job_order, trim_anonymous_location)
+        adjustDirObjs(self.job_order, trim_anonymous_location)
 
         if self.output_name:
             self.job_order["arv:output_name"] = self.output_name
@@ -248,6 +265,9 @@ class RunnerJob(Runner):
 
         self.job_order["arv:enable_reuse"] = self.enable_reuse
 
+        if self.on_error:
+            self.job_order["arv:on_error"] = self.on_error
+
         return {
             "script": "cwl-runner",
             "script_version": "master",
@@ -255,7 +275,7 @@ class RunnerJob(Runner):
             "repository": "arvados",
             "script_parameters": self.job_order,
             "runtime_constraints": {
-                "docker_image": arvados_jobs_image(self.arvrunner),
+                "docker_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
                 "min_ram_mb_per_node": self.submit_runner_ram
             }
         }
@@ -349,10 +369,12 @@ class RunnerTemplate(object):
             if not isinstance(types, list):
                 types = [types]
             param['required'] = 'null' not in types
-            non_null_types = set(types) - set(['null'])
+            non_null_types = [t for t in types if t != "null"]
             if len(non_null_types) == 1:
                 the_type = [c for c in non_null_types][0]
-                dataclass = self.type_to_dataclass.get(the_type)
+                dataclass = None
+                if isinstance(the_type, basestring):
+                    dataclass = self.type_to_dataclass.get(the_type)
                 if dataclass:
                     param['dataclass'] = dataclass
             # Note: If we didn't figure out a single appropriate
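Note on the RunnerTemplate hunk above: the old `set(types) - set(['null'])` raises
`TypeError: unhashable type: 'dict'` whenever a CWL union type contains a complex
type object such as {"type": "array", "items": "File"}, since dicts cannot be set
members; the list comprehension tolerates mixed str/dict members. With that fixed,
`type_to_dataclass.get(the_type)` could now be reached with a dict key and would
fail the same way, which is what the added `isinstance(the_type, basestring)`
guard prevents. A minimal Python 2 sketch (the `type_to_dataclass` mapping below
is hypothetical, standing in for the instance attribute):

    # Optional array-of-File, written as a CWL union type: ["null", <complex type>].
    types = ["null", {"type": "array", "items": "File"}]

    type_to_dataclass = {"File": "File", "Collection": "Collection"}

    # Old code: set(types) raises TypeError because the dict member is unhashable.
    # New code: a plain list comprehension handles mixed str/dict members.
    non_null_types = [t for t in types if t != "null"]

    dataclass = None
    if len(non_null_types) == 1:
        the_type = non_null_types[0]
        # Only plain string type names can be used as dictionary keys;
        # a complex type object is skipped and dataclass stays None.
        if isinstance(the_type, basestring):
            dataclass = type_to_dataclass.get(the_type)

    assert dataclass is None  # an array type has no single dataclass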