Merge master to output-tags branch and resolve conflict
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
index cf2f1db997b9db9688b52258994bc6179863783d..5cc447e9a3bad9202d9e77fb53919dcc66b804c8 100644 (file)
@@ -9,19 +9,37 @@ from cStringIO import StringIO
 import cwltool.draft2tool
 from cwltool.draft2tool import CommandLineTool
 import cwltool.workflow
-from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
+from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
 from cwltool.load_tool import fetch_document
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
+from cwltool.utils import aslist
+from cwltool.builder import substitute
 
 import arvados.collection
 import ruamel.yaml as yaml
 
 from .arvdocker import arv_docker_get_image
 from .pathmapper import ArvPathMapper
+from ._version import __version__
 
 logger = logging.getLogger('arvados.cwl-runner')
 
-cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
+cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")
+
+def trim_listing(obj):
+    """Remove 'listing' field from Directory objects that are keep references.
+
+    When Directory objects represent Keep references, it redundant and
+    potentially very expensive to pass fully enumerated Directory objects
+    between instances of cwl-runner (e.g. a submitting a job, or using the
+    RunInSingleContainer feature), so delete the 'listing' field when it is
+    safe to do so.
+    """
+
+    if obj.get("location", "").startswith("keep:") and "listing" in obj:
+        del obj["listing"]
+    if obj.get("location", "").startswith("_:"):
+        del obj["location"]
 
 def upload_dependencies(arvrunner, name, document_loader,
                         workflowobj, uri, loadref_run):
@@ -80,8 +98,8 @@ def upload_dependencies(arvrunner, name, document_loader,
                            name=name)
 
     def setloc(p):
-        if not p["location"].startswith("_:") and not p["location"].startswith("keep:"):
-            p["location"] = mapper.mapper(p["location"]).target
+        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
+            p["location"] = mapper.mapper(p["location"]).resolved
     adjustFileObjs(workflowobj, setloc)
     adjustDirObjs(workflowobj, setloc)
 
@@ -92,49 +110,77 @@ def upload_docker(arvrunner, tool):
     if isinstance(tool, CommandLineTool):
         (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
         if docker_req:
+            if docker_req.get("dockerOutputDirectory"):
+                # TODO: can be supported by containers API, but not jobs API.
+                raise UnsupportedRequirement("Option 'dockerOutputDirectory' of DockerRequirement not supported.")
             arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
     elif isinstance(tool, cwltool.workflow.Workflow):
         for s in tool.steps:
             upload_docker(arvrunner, s.embedded_tool)
 
+def upload_instance(arvrunner, name, tool, job_order):
+        upload_docker(arvrunner, tool)
+
+        for t in tool.tool["inputs"]:
+            def setSecondary(fileobj):
+                if isinstance(fileobj, dict) and fileobj.get("class") == "File":
+                    if "secondaryFiles" not in fileobj:
+                        fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]
+
+                if isinstance(fileobj, list):
+                    for e in fileobj:
+                        setSecondary(e)
+
+            if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
+                setSecondary(job_order[shortname(t["id"])])
+
+        workflowmapper = upload_dependencies(arvrunner,
+                                             name,
+                                             tool.doc_loader,
+                                             tool.tool,
+                                             tool.tool["id"],
+                                             True)
+        jobmapper = upload_dependencies(arvrunner,
+                                        os.path.basename(job_order.get("id", "#")),
+                                        tool.doc_loader,
+                                        job_order,
+                                        job_order.get("id", "#"),
+                                        False)
+
+        if "id" in job_order:
+            del job_order["id"]
+
+        return workflowmapper
+
+def arvados_jobs_image(arvrunner):
+    img = "arvados/jobs:"+__version__
+    try:
+        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
+    except Exception as e:
+        raise Exception("Docker image %s is not available\n%s" % (img, e) )
+    return img
 
 class Runner(object):
-    def __init__(self, runner, tool, job_order, enable_reuse):
+    def __init__(self, runner, tool, job_order, enable_reuse, output_name, output_tags):
         self.arvrunner = runner
         self.tool = tool
         self.job_order = job_order
         self.running = False
         self.enable_reuse = enable_reuse
         self.uuid = None
+        self.final_output = None
+        self.output_name = output_name
+        self.output_tags = output_tags
 
     def update_pipeline_component(self, record):
         pass
 
     def arvados_job_spec(self, *args, **kwargs):
-        upload_docker(self.arvrunner, self.tool)
-
         self.name = os.path.basename(self.tool.tool["id"])
-
-        workflowmapper = upload_dependencies(self.arvrunner,
-                                             self.name,
-                                             self.tool.doc_loader,
-                                             self.tool.tool,
-                                             self.tool.tool["id"],
-                                             True)
-
-        jobmapper = upload_dependencies(self.arvrunner,
-                                        os.path.basename(self.job_order.get("id", "#")),
-                                        self.tool.doc_loader,
-                                        self.job_order,
-                                        self.job_order.get("id", "#"),
-                                        False)
-
-        if "id" in self.job_order:
-            del self.job_order["id"]
-
+        workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
+        adjustDirObjs(self.job_order, trim_listing)
         return workflowmapper
 
-
     def done(self, record):
         if record["state"] == "Complete":
             if record.get("exit_code") is not None:
@@ -152,7 +198,11 @@ class Runner(object):
         outputs = None
         try:
             try:
-                outc = arvados.collection.Collection(record["output"])
+                self.final_output = record["output"]
+                outc = arvados.collection.CollectionReader(self.final_output,
+                                                           api_client=self.arvrunner.api,
+                                                           keep_client=self.arvrunner.keep_client,
+                                                           num_retries=self.arvrunner.num_retries)
                 with outc.open("cwl.output.json") as f:
                     outputs = json.load(f)
                 def keepify(fileobj):