# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import os
import urlparse
from functools import partial
import logging
import json
import subprocess

from StringIO import StringIO

from schema_salad.sourceline import SourceLine

import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]

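# A minimal sketch of the effect (the literal below is hypothetical):
#
#   obj = {"class": "File", "location": "_:b0b9...", "basename": "a.txt"}
#   trim_anonymous_location(obj)
#   # obj == {"class": "File", "basename": "a.txt"}
#
# A real location such as "keep:.../a.txt" is left untouched.
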
def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in d.itervalues():
                find_defaults(i, op)

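# Usage sketch for find_defaults, with a hypothetical document fragment:
# the callback receives every mapping that carries a "default" key,
# however deeply nested, and recursion stops at that mapping.
#
#   doc = {"inputs": [{"id": "x", "default": {"class": "File", "path": "a"}}]}
#   found = []
#   find_defaults(doc, found.append)
#   # found == [{"id": "x", "default": {"class": "File", "path": "a"}}]
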
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, include_primary=True):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to Keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def capture_default(obj):
        remove = [False]
        def add_default(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Remove from sc
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), add_default)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, capture_default)

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper

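# A hedged usage sketch (the file name and the mapped hash below are
# placeholders):
#
#   mapper = upload_dependencies(arvrunner, "wf dependencies",
#                                tool.doc_loader, workflowobj,
#                                workflowobj["id"], False)
#   mapper.mapper("file:///home/me/input.txt").resolved
#   # => "keep:<portable_data_hash>/input.txt"
#
# The matching "location" fields inside workflowobj are rewritten in
# place to the same keep: references.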

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

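# For reference, a DockerRequirement as seen by upload_docker above is a
# plain mapping (hypothetical example):
#
#   docker_req = {"class": "DockerRequirement",
#                 "dockerPull": "ubuntu:16.04"}
#
# arv_docker_get_image() is expected to pull the image if necessary and
# upload it to the project given by arvrunner.project_uuid.
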
def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

    rewrite_to_orig = {}
    for k,v in rewrites.items():
        rewrite_to_orig[v] = k

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id][v["location"]]
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed

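# merged_map (built by upload_workflow_deps below) is keyed by tool or
# workflow id; each value maps an original "location" to its keep:
# reference.  A hypothetical example:
#
#   merged_map = {
#       "file:///home/me/wf.cwl": {
#           "file:///home/me/script.py": "keep:<pdh>/script.py",
#       },
#   }
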
def tag_git_version(packed, tool):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash

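# The subprocess call in tag_git_version is equivalent to running, in
# the directory containing the workflow file:
#
#   git log --first-parent --max-count=1 --format=%H
#
# which prints the full 40-character hash of the current commit.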

def discover_secondary_files(inputs, job_order):
    for t in inputs:
        def setSecondary(fileobj):
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]

            if isinstance(fileobj, list):
                for e in fileobj:
                    setSecondary(e)

        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])

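# secondaryFiles patterns follow the usual CWL substitution rules
# implemented by cwltool.builder.substitute: a plain suffix is appended,
# and each leading "^" strips one extension first.  Hypothetical inputs:
#
#   substitute("file:///data/reads.bam", ".bai")   # => "file:///data/reads.bam.bai"
#   substitute("file:///data/reads.bam", "^.bai")  # => "file:///data/reads.bai"
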
def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return the
    updated input object with 'location' fields rewritten to the
    corresponding Keep references.
    """

    discover_secondary_files(tool.tool["inputs"], job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out; it gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            pm = upload_dependencies(arvrunner,
                                "%s dependencies" % (shortname(deptool["id"])),
                                document_loader,
                                deptool,
                                deptool["id"],
                                False,
                                include_primary=False)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = toolmap

    tool.visit(upload_tool_deps)

    return merged_map

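# tool.visit() walks the primary document plus every embedded or
# referenced step tool, so merged_map gains one entry per document that
# has an "id" (hypothetical keys):
#
#   {"file:///home/me/wf.cwl": {...},
#    "file:///home/me/step1.cwl": {...}}
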
def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()

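# A hedged usage sketch (variable names are illustrative; the returned
# value is the collection's portable data hash):
#
#   pdh = upload_workflow_collection(arvrunner, "my-workflow", packed)
#   wf_path = "keep:%s/workflow.cwl" % pdh
#
# Matching on portable_data_hash means a content-identical collection
# that already exists is reused instead of saving a duplicate.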

class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = get_feature(self.tool, "http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl

        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            self.submit_runner_ram = 3000

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

        self.merged_map = merged_map or {}

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    if f.size() > 0:
                        outputs = json.load(f)
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            if record["uuid"] in self.arvrunner.processes:
                del self.arvrunner.processes[record["uuid"]]
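
# Summary of the state/exit-code mapping implemented in Runner.done():
#
#   state "Complete", exit_code 0 or unset -> "success"
#   state "Complete", exit_code 33         -> "UnsupportedRequirement"
#   state "Complete", any other exit_code  -> "permanentFail"
#   any other state                        -> "permanentFail"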