# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import os
import urlparse
from functools import partial
import logging
import json
import subprocess

from StringIO import StringIO

from schema_salad.sourceline import SourceLine

import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]

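# Illustrative sketch (hypothetical values): a File literal carries its
# contents inline and is given an anonymous "_:" id internally, which
# trim_anonymous_location() strips before the record is written back out:
#
#   literal = {"class": "File", "basename": "greeting.txt",
#              "contents": "hello world", "location": "_:d41d8cd9"}
#   trim_anonymous_location(literal)
#   # literal == {"class": "File", "basename": "greeting.txt",
#   #             "contents": "hello world"}
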
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, include_primary=True):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper


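# Illustrative sketch (names are hypothetical): given an arvados-cwl-runner
# object "arvrunner" and a loaded tool, a caller passes the tool's document
# loader and raw document; local references in the document are rewritten in
# place to "keep:..." locations and the returned mapper records the mapping:
#
#   mapper = upload_dependencies(arvrunner, "my-workflow dependencies",
#                                tool.doc_loader, tool.tool, tool.tool["id"],
#                                loadref_run=True)
#   # mapper.mapper("file:///home/me/input.txt").resolved
#   # -> "keep:<portable data hash>/input.txt"
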
def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory"):
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)
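
# Illustrative sketch (hypothetical requirement): for a CommandLineTool whose
# requirements include
#
#   {"class": "DockerRequirement", "dockerPull": "ubuntu:16.04"}
#
# upload_docker() calls arv_docker_get_image() to make sure the image is
# available as a collection in the target project, pulling and uploading it
# first if necessary.  A "dockerOutputDirectory" entry is rejected here
# because the jobs API cannot honor it.
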
def packed_workflow(arvrunner, tool):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    return pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                tool.tool["id"], tool.metadata)

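# Illustrative sketch (hypothetical ids): packing folds every document the
# workflow references via "run" into a single dict, typically with a
# "cwlVersion" field and a "$graph" list whose entries keep fragment ids:
#
#   packed = packed_workflow(arvrunner, tool)
#   # packed["$graph"][0]["id"]  ->  "#main"
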
def tag_git_version(packed, tool):
    """Record the git commit hash of the workflow's source checkout in the packed document."""
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash


def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return the updated
    input object with 'location' fields pointing to the corresponding Keep
    references.
    """

    for t in tool.tool["inputs"]:
        def setSecondary(fileobj):
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]

            if isinstance(fileobj, list):
                for e in fileobj:
                    setSecondary(e)

        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

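# Illustrative sketch (hypothetical paths): a local File in the input object
# is uploaded and its 'location' rewritten to a Keep reference; declared
# secondaryFiles are filled in from the matching input parameter's patterns:
#
#   job_order = {"reference": {"class": "File", "location": "genome.fa"}}
#   job_order = upload_job_order(arvrunner, "input object", tool, job_order)
#   # job_order["reference"]["location"]
#   # -> "keep:<portable data hash>/genome.fa"
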
def upload_workflow_deps(arvrunner, tool, override_tools):
    """Upload Docker images and file dependencies for each tool in the workflow,
    recording the updated tool documents in override_tools."""

    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    def upload_tool_deps(deptool):
        if "id" in deptool:
            upload_dependencies(arvrunner,
                                "%s dependencies" % (shortname(deptool["id"])),
                                document_loader,
                                deptool,
                                deptool["id"],
                                False,
                                include_primary=False)
            document_loader.idx[deptool["id"]] = deptool
            override_tools[deptool["id"]] = json.dumps(deptool)

    tool.visit(upload_tool_deps)

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img

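# Illustrative sketch: the runner process is normally launched from the
# arvados/jobs image matching this SDK's version (see Runner.__init__ below),
# and this helper verifies that image can be resolved before submitting:
#
#   jobs_image = arvados_jobs_image(arvrunner, "arvados/jobs:" + __version__)
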
def upload_workflow_collection(arvrunner, name, packed):
    """Save the packed workflow as "workflow.cwl" in a collection, reusing an
    existing collection with the same content and name prefix if one exists,
    and return its portable data hash."""

    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()


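# Illustrative sketch: combined with packed_workflow() above, this is how a
# workflow becomes addressable by content in Keep:
#
#   packed = packed_workflow(arvrunner, tool)
#   pdh = upload_workflow_collection(arvrunner, "my-workflow.cwl", packed)
#   # pdh -> portable data hash of the collection containing workflow.cwl
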
class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl

        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            self.submit_runner_ram = 3000

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    if f.size() > 0:
                        outputs = json.load(f)
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            if record["uuid"] in self.arvrunner.processes:
                del self.arvrunner.processes[record["uuid"]]
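
# Illustrative sketch (hypothetical record): done() maps a finished runner
# record to a CWL process status and relocates relative output paths into the
# output collection before invoking the output callback:
#
#   record = {"uuid": "<process uuid>", "state": "Complete", "exit_code": 0,
#             "output": "<output portable data hash>",
#             "log": "<log portable data hash>"}
#   runner.done(record)
#   # exit code 33 -> "UnsupportedRequirement", 0 -> "success",
#   # any other exit code -> "permanentFail"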