sdk/cwl/arvados_cwl/runner.py

import os
import urlparse
from functools import partial
import logging
import json
import re

import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs

import arvados.collection

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper
logger = logging.getLogger('arvados.cwl-runner')

# Override the pattern cwltool uses to validate file and directory names.
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")

class Runner(object):
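    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for it to complete."""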
    def __init__(self, runner, tool, job_order, enable_reuse):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None

    def update_pipeline_component(self, record):
        pass

    def upload_docker(self, tool):
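        """Upload the Docker image named by a tool's DockerRequirement,
        recursing into the steps of a workflow."""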
        if isinstance(tool, CommandLineTool):
            (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
            if docker_req:
                arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
        elif isinstance(tool, cwltool.workflow.Workflow):
            for s in tool.steps:
                self.upload_docker(s.embedded_tool)


    def arvados_job_spec(self, *args, **kwargs):
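        """Upload Docker images, the workflow document and its dependencies,
        and the job order inputs to Keep; return the path mapper covering the
        workflow files."""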
        self.upload_docker(self.tool)

        workflowfiles = []
        jobfiles = []
        workflowfiles.append({"class":"File", "location": self.tool.tool["id"]})

        self.name = os.path.basename(self.tool.tool["id"])

        def visitFiles(files, path):
            files.append(path)

        document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
        loaded = set()
        def loadref(b, u):
            joined = urlparse.urljoin(b, u)
            if joined not in loaded:
                loaded.add(joined)
                return document_loader.fetch(joined)
            else:
                return {}

        # Scan the workflow for everything it imports, includes, or references.
        sc = scandeps(uri, workflowobj,
                      set(("$import", "run")),
                      set(("$include", "$schemas", "path", "location")),
                      loadref)
        adjustFileObjs(sc, partial(visitFiles, workflowfiles))
        adjustFileObjs(self.job_order, partial(visitFiles, jobfiles))
        adjustDirObjs(sc, partial(visitFiles, workflowfiles))
        adjustDirObjs(self.job_order, partial(visitFiles, jobfiles))

        normalizeFilesDirs(jobfiles)
        normalizeFilesDirs(workflowfiles)

        # Upload workflow dependencies and job order inputs to Keep.
        keepprefix = kwargs.get("keepprefix", "")
        workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
                                       keepprefix+"%s",
                                       keepprefix+"%s/%s",
                                       name=self.name,
                                       **kwargs)

        jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
                                  keepprefix+"%s",
                                  keepprefix+"%s/%s",
                                  name=os.path.basename(self.job_order.get("id", "#")),
                                  **kwargs)

        # Rewrite job order locations to point at the uploaded copies.
        def setloc(p):
            p["location"] = jobmapper.mapper(p["location"])[1]
        adjustFileObjs(self.job_order, setloc)
        adjustDirObjs(self.job_order, setloc)

        if "id" in self.job_order:
            del self.job_order["id"]

        return workflowmapper


    def done(self, record):
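        """Process the runner's final state: map the exit code to a CWL
        process status, load cwl.output.json from the output collection,
        rewrite output locations to keep: references, and report the result
        through the output callback."""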
        if record["state"] == "Complete":
            if record.get("exit_code") is not None:
                # Exit code 33 signals an unsupported requirement.
                if record["exit_code"] == 33:
                    processStatus = "UnsupportedRequirement"
                elif record["exit_code"] == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            try:
                # The runner writes its final output object to cwl.output.json
                # in the output collection.
                outc = arvados.collection.Collection(record["output"])
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)
                def keepify(fileobj):
                    # Turn relative output paths into keep: references inside
                    # the output collection.
                    path = fileobj["location"]
                    if not path.startswith("keep:"):
                        fileobj["location"] = "keep:%s/%s" % (record["output"], path)
                adjustFileObjs(outputs, keepify)
                adjustDirObjs(outputs, keepify)
            except Exception as e:
                logger.error("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.processes[record["uuid"]]