Merge branch 'master' into 9998-unsigned_manifest
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 49d37ebd5aec177f0958088983f483123ce113b3..1c3625e26bb1345673b31af5377d7b9d5282a10b 100644
@@ -6,12 +6,16 @@ import json
 import re
 from cStringIO import StringIO
 
+from schema_salad.sourceline import SourceLine
+
 import cwltool.draft2tool
 from cwltool.draft2tool import CommandLineTool
 import cwltool.workflow
-from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
+from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
 from cwltool.load_tool import fetch_document
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
+from cwltool.utils import aslist
+from cwltool.builder import substitute
 
 import arvados.collection
 import ruamel.yaml as yaml
@@ -19,6 +23,7 @@ import ruamel.yaml as yaml
 from .arvdocker import arv_docker_get_image
 from .pathmapper import ArvPathMapper
 from ._version import __version__
+from . import done
 
 logger = logging.getLogger('arvados.cwl-runner')
 
@@ -55,7 +60,7 @@ def upload_dependencies(arvrunner, name, document_loader,
 
     loaded = set()
     def loadref(b, u):
-        joined = urlparse.urljoin(b, u)
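+        # Join through the document loader's fetcher so scheme-aware
+        # resolution (e.g. keep: URIs) is handled consistently.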
+        joined = document_loader.fetcher.urljoin(b, u)
         defrg, _ = urlparse.urldefrag(joined)
         if defrg not in loaded:
             loaded.add(defrg)
@@ -83,7 +88,7 @@ def upload_dependencies(arvrunner, name, document_loader,
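+    # Hand scandeps the same urljoin so relative references inside the
+    # scanned document resolve through the fetcher as well.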
     sc = scandeps(uri, scanobj,
                   loadref_fields,
                   set(("$include", "$schemas", "location")),
-                  loadref)
+                  loadref, urljoin=document_loader.fetcher.urljoin)
 
     normalizeFilesDirs(sc)
 
@@ -108,6 +113,10 @@ def upload_docker(arvrunner, tool):
     if isinstance(tool, CommandLineTool):
         (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
         if docker_req:
+            if docker_req.get("dockerOutputDirectory"):
+                # TODO: this can be supported by the containers API, but not the jobs API.
+                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
+                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
             arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
     elif isinstance(tool, cwltool.workflow.Workflow):
         for s in tool.steps:
@@ -116,13 +125,25 @@ def upload_docker(arvrunner, tool):
 def upload_instance(arvrunner, name, tool, job_order):
         upload_docker(arvrunner, tool)
 
+        for t in tool.tool["inputs"]:
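+            # Expand any secondaryFiles patterns (e.g. ".idx" or "^.bam")
+            # declared on this input onto the matching File objects in the
+            # job order, using cwltool's substitute() on the primary
+            # file's location.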
+            def setSecondary(fileobj):
+                if isinstance(fileobj, dict) and fileobj.get("class") == "File":
+                    if "secondaryFiles" not in fileobj:
+                        fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]
+
+                if isinstance(fileobj, list):
+                    for e in fileobj:
+                        setSecondary(e)
+
+            if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
+                setSecondary(job_order[shortname(t["id"])])
+
         workflowmapper = upload_dependencies(arvrunner,
                                              name,
                                              tool.doc_loader,
                                              tool.tool,
                                              tool.tool["id"],
                                              True)
-
         jobmapper = upload_dependencies(arvrunner,
                                         os.path.basename(job_order.get("id", "#")),
                                         tool.doc_loader,
@@ -144,7 +165,9 @@ def arvados_jobs_image(arvrunner):
     return img
 
 class Runner(object):
-    def __init__(self, runner, tool, job_order, enable_reuse, output_name):
+    def __init__(self, runner, tool, job_order, enable_reuse,
+                 output_name, output_tags, submit_runner_ram=0,
+                 name=None, on_error=None):
         self.arvrunner = runner
         self.tool = tool
         self.job_order = job_order
@@ -153,48 +176,78 @@ class Runner(object):
         self.uuid = None
         self.final_output = None
         self.output_name = output_name
+        self.output_tags = output_tags
+        self.name = name
+        self.on_error = on_error
+
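+        # Default the RAM request for the runner job/container to 1024 MiB
+        # when no --submit-runner-ram value was given.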
+        if submit_runner_ram:
+            self.submit_runner_ram = submit_runner_ram
+        else:
+            self.submit_runner_ram = 1024
+
+        if self.submit_runner_ram <= 0:
+            raise Exception("Value of --submit-runner-ram must be greater than zero")
 
     def update_pipeline_component(self, record):
         pass
 
     def arvados_job_spec(self, *args, **kwargs):
-        self.name = os.path.basename(self.tool.tool["id"])
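+        # Prefer the tool's human-readable "label" for the runner name,
+        # falling back to the basename of the tool document.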
+        if self.name is None:
+            self.name = self.tool.tool.get("label") or os.path.basename(self.tool.tool["id"])
+
+        # cwltool adds a "job_order" entry when parameters are supplied on
+        # the command line; filter it out here so it is not submitted as a
+        # workflow input.
+        if "job_order" in self.job_order:
+            del self.job_order["job_order"]
+
         workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
         adjustDirObjs(self.job_order, trim_listing)
         return workflowmapper
 
     def done(self, record):
-        if record["state"] == "Complete":
-            if record.get("exit_code") is not None:
-                if record["exit_code"] == 33:
-                    processStatus = "UnsupportedRequirement"
-                elif record["exit_code"] == 0:
-                    processStatus = "success"
+        try:
+            if record["state"] == "Complete":
+                if record.get("exit_code") is not None:
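+                    # By convention, exit code 33 indicates the runner
+                    # failed with UnsupportedRequirement.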
+                    if record["exit_code"] == 33:
+                        processStatus = "UnsupportedRequirement"
+                    elif record["exit_code"] == 0:
+                        processStatus = "success"
+                    else:
+                        processStatus = "permanentFail"
                 else:
-                    processStatus = "permanentFail"
+                    processStatus = "success"
             else:
-                processStatus = "success"
-        else:
-            processStatus = "permanentFail"
+                processStatus = "permanentFail"
 
-        outputs = None
-        try:
-            try:
-                self.final_output = record["output"]
-                outc = arvados.collection.CollectionReader(self.final_output,
+            outputs = {}
+
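+            # On failure, fetch the log collection and surface the tail of
+            # the error log to aid debugging.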
+            if processStatus == "permanentFail":
+                logc = arvados.collection.CollectionReader(record["log"],
                                                            api_client=self.arvrunner.api,
                                                            keep_client=self.arvrunner.keep_client,
                                                            num_retries=self.arvrunner.num_retries)
+                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)
+
+            self.final_output = record["output"]
+            outc = arvados.collection.CollectionReader(self.final_output,
+                                                       api_client=self.arvrunner.api,
+                                                       keep_client=self.arvrunner.keep_client,
+                                                       num_retries=self.arvrunner.num_retries)
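+            # A non-empty cwl.output.json in the output collection holds
+            # the structured CWL output object.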
+            if "cwl.output.json" in outc:
                 with outc.open("cwl.output.json") as f:
-                    outputs = json.load(f)
-                def keepify(fileobj):
-                    path = fileobj["location"]
-                    if not path.startswith("keep:"):
-                        fileobj["location"] = "keep:%s/%s" % (record["output"], path)
-                adjustFileObjs(outputs, keepify)
-                adjustDirObjs(outputs, keepify)
-            except Exception as e:
-                logger.error("While getting final output object: %s", e)
+                    if f.size() > 0:
+                        outputs = json.load(f)
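+            # Rewrite output locations that are not already keep: URIs to
+            # point into the output collection.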
+            def keepify(fileobj):
+                path = fileobj["location"]
+                if not path.startswith("keep:"):
+                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
+            adjustFileObjs(outputs, keepify)
+            adjustDirObjs(outputs, keepify)
+        except Exception as e:
+            logger.exception("[%s] While getting final output object: %s", self.name, e)
+            self.arvrunner.output_callback({}, "permanentFail")
+        else:
             self.arvrunner.output_callback(outputs, processStatus)
         finally:
-            del self.arvrunner.processes[record["uuid"]]
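+            # The process entry may have been removed already, so check
+            # membership before deleting.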
+            if record["uuid"] in self.arvrunner.processes:
+                del self.arvrunner.processes[record["uuid"]]