ba41d7d97ea8085e1898daf075f327b8dd26d5cc
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 import os
2 import urlparse
3 from functools import partial
4 import logging
5 import json
6 import re
7 from cStringIO import StringIO
8
9 import cwltool.draft2tool
10 from cwltool.draft2tool import CommandLineTool
11 import cwltool.workflow
12 from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
13 from cwltool.load_tool import fetch_document
14 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
15 from cwltool.utils import aslist
16 from cwltool.builder import substitute
17
18 import arvados.collection
19 import ruamel.yaml as yaml
20
21 from .arvdocker import arv_docker_get_image
22 from .pathmapper import ArvPathMapper
23 from ._version import __version__
24
25 logger = logging.getLogger('arvados.cwl-runner')
26
27 cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")
28
def trim_listing(obj):
    """Strip redundant fields from a Directory object, in place.

    When a Directory's location is a Keep reference, its contents can be
    re-enumerated on demand, so passing a fully expanded 'listing' between
    cwl-runner instances (e.g. when submitting a job, or using the
    RunInSingleContainer feature) is redundant and potentially very
    expensive; drop it.  Anonymous ('_:'-prefixed) locations are local
    placeholders with no meaning elsewhere, so drop those too.
    """
    location = obj.get("location", "")
    if location.startswith("keep:") and "listing" in obj:
        del obj["listing"]
    if location.startswith("_:"):
        del obj["location"]
43
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    visited = set()

    def loadref(base, ref):
        joined = urlparse.urljoin(base, ref)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg in visited:
            # Already scanned; contribute nothing further.
            return {}
        visited.add(defrg)
        # Use fetch_text to get raw file (before preprocessing).
        text = document_loader.fetch_text(defrg)
        if isinstance(text, bytes):
            text = text.decode('utf-8')
        return yaml.safe_load(StringIO(text))

    if loadref_run:
        loadref_fields = {"$import", "run"}
    else:
        loadref_fields = {"$import"}

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  {"$include", "$schemas", "location"},
                  loadref)

    normalizeFilesDirs(sc)

    if "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    def update_location(obj):
        # Rewrite uploaded references; leave anonymous and already-keep
        # locations untouched.
        if "location" not in obj:
            return
        loc = obj["location"]
        if loc.startswith(("_:", "keep:")):
            return
        obj["location"] = mapper.mapper(loc).resolved

    adjustFileObjs(workflowobj, update_location)
    adjustDirObjs(workflowobj, update_location)

    return mapper
107
108
def upload_docker(arvrunner, tool):
    """Ensure the Docker image(s) referenced by the tool are in Arvados.

    For a CommandLineTool, resolves its DockerRequirement (if any);
    for a Workflow, recurses into each step's embedded tool.
    """
    if isinstance(tool, CommandLineTool):
        docker_req, _ = get_feature(tool, "DockerRequirement")
        if docker_req:
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for step in tool.steps:
            upload_docker(arvrunner, step.embedded_tool)
117
def upload_instance(arvrunner, name, tool, job_order):
    """Upload everything a workflow instance needs to run.

    Uploads Docker images, the workflow document and its references, and
    the files named by the job order.  Mutates job_order in place (adds
    default secondaryFiles, rewrites locations, removes its "id").

    Returns the pathmapper for the workflow document.
    """
    upload_docker(arvrunner, tool)

    for inp in tool.tool["inputs"]:

        def set_secondary(fobj):
            # Fill in default secondaryFiles on File objects that lack
            # them, derived from this input parameter's patterns.
            if isinstance(fobj, dict) and fobj.get("class") == "File":
                if "secondaryFiles" not in fobj:
                    fobj["secondaryFiles"] = [
                        {"location": substitute(fobj["location"], pattern),
                         "class": "File"}
                        for pattern in inp["secondaryFiles"]]
            if isinstance(fobj, list):
                for item in fobj:
                    set_secondary(item)

        if shortname(inp["id"]) in job_order and inp.get("secondaryFiles"):
            set_secondary(job_order[shortname(inp["id"])])

    workflowmapper = upload_dependencies(arvrunner,
                                         name,
                                         tool.doc_loader,
                                         tool.tool,
                                         tool.tool["id"],
                                         True)
    # Upload the job order's file references too; only the side effect
    # (rewritten locations) is needed, not the returned mapper.
    upload_dependencies(arvrunner,
                        os.path.basename(job_order.get("id", "#")),
                        tool.doc_loader,
                        job_order,
                        job_order.get("id", "#"),
                        False)

    # The submitted job order no longer needs its source document id.
    job_order.pop("id", None)

    return workflowmapper
151
def arvados_jobs_image(arvrunner):
    """Return the arvados/jobs image name:tag for this SDK version,
    after confirming it is available to the cluster.

    Raises Exception if the image cannot be found or pulled.
    """
    img = "arvados/jobs:" + __version__
    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True,
                             arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img
159
class Runner(object):
    """Track a submitted runner process and collect its final output.

    Holds the tool and job order to be run.  arvados_job_spec() uploads
    their dependencies; done() translates the finished process record
    into a CWL output object and delivers it to the coordinator's
    output callback.
    """
    def __init__(self, runner, tool, job_order, enable_reuse, output_name):
        self.arvrunner = runner          # coordinator; provides .api, .keep_client, .processes, .output_callback
        self.tool = tool                 # cwltool Process object to run
        self.job_order = job_order       # CWL input object (dict)
        self.running = False
        self.enable_reuse = enable_reuse # whether job/container reuse is permitted
        self.uuid = None                 # set after submission (by subclass/caller — not visible here)
        self.final_output = None         # record["output"] of the finished process
        self.output_name = output_name

    def update_pipeline_component(self, record):
        # No-op here; presumably overridden by subclasses that mirror
        # state into a pipeline component — confirm against subclasses.
        pass

    def arvados_job_spec(self, *args, **kwargs):
        """Upload dependencies and return the workflow pathmapper.

        Side effects: sets self.name from the tool document's id, and
        rewrites self.job_order in place (references mapped to Keep,
        redundant Directory listings trimmed).
        """
        self.name = os.path.basename(self.tool.tool["id"])
        workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
        adjustDirObjs(self.job_order, trim_listing)
        return workflowmapper

    def done(self, record):
        """Handle a finished process record.

        Maps the record's state and exit code to a CWL process status,
        loads cwl.output.json from the output collection, rewrites
        relative output paths to keep: references, and invokes the
        coordinator's output callback.  Always removes this process from
        the active set, even if output retrieval fails.
        """
        if record["state"] == "Complete":
            if record.get("exit_code") is not None:
                if record["exit_code"] == 33:
                    # NOTE(review): exit code 33 is assumed to be the
                    # runner's convention for UnsupportedRequirement —
                    # confirm against the runner script.
                    processStatus = "UnsupportedRequirement"
                elif record["exit_code"] == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                # No exit code recorded; trust the Complete state.
                processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            try:
                self.final_output = record["output"]
                outc = arvados.collection.CollectionReader(self.final_output,
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)
                def keepify(fileobj):
                    # Anchor relative output paths in the output collection.
                    path = fileobj["location"]
                    if not path.startswith("keep:"):
                        fileobj["location"] = "keep:%s/%s" % (record["output"], path)
                adjustFileObjs(outputs, keepify)
                adjustDirObjs(outputs, keepify)
            except Exception as e:
                # Best effort: log the failure but still deliver the
                # (possibly None) outputs and status to the callback.
                logger.error("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            # Always drop this process from the coordinator's active table.
            del self.arvrunner.processes[record["uuid"]]