13365: Fix state logging
[arvados.git] sdk/cwl/arvados_cwl/runner.py
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import os
import urlparse
from functools import partial
import logging
import json
import subprocess
from collections import namedtuple

from StringIO import StringIO

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]


def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]


def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in d.itervalues():
                find_defaults(i, op)


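# Fill in missing "secondaryFiles" on File objects in the job order by applying
# each input parameter's secondaryFiles patterns to the primary file's location,
# recording anything discovered in the optional 'discovered' dict.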
def discover_secondary_files(inputs, job_order, discovered=None):
    for t in inputs:
        def setSecondary(fileobj):
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
                    fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]])
                    if discovered is not None:
                        discovered[fileobj["location"]] = fileobj["secondaryFiles"]

            if isinstance(fileobj, list):
                for e in fileobj:
                    setSecondary(e)

        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])


def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

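    # Default values may refer to local files that no longer exist; drop those
    # defaults (and their entries in the dependency list) instead of failing
    # the upload.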
    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        discover_secondary_files(obj["inputs"],
                                 {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
                                 discovered)

    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in discovered:
        sc.extend(discovered[d])

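    # Upload the collected dependencies to Keep in a single new collection and
    # get back a mapping from local paths to "keep:" references.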
    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper


def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)


def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

    rewrite_to_orig = {}
    for k,v in rewrites.items():
        rewrite_to_orig[v] = k

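    # Walk the packed document.  Keep track of which original tool or workflow
    # each node came from (via pack()'s rewrite map) so that file locations and
    # secondaryFiles can be rewritten using that document's merged_map entry.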
    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed


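# Record the git commit of the workflow's source checkout (if any) in the
# packed document under "http://schema.org/version".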
def tag_git_version(packed, tool):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash


def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return the updated
    input object with 'location' fields rewritten to the proper Keep references.
    """

    discover_secondary_files(tool.tool["inputs"], job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
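            # Upload this tool's file dependencies (but not the tool document
            # itself) and remember how its references were rewritten.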
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

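    # If a collection with the same content (portable data hash) and a matching
    # name prefix already exists in the target project, reuse it rather than
    # saving a new copy.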
    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()


class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None, priority=None,
                 secret_store=None):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = get_feature(self.tool, "http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store

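        # Default to 3000 MiB of RAM for the runner if --submit-runner-ram
        # was not given.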
        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            self.submit_runner_ram = 3000

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

        self.merged_map = merged_map or {}

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
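                    # arvados-cwl-runner uses exit code 33 to signal that the
                    # workflow used a feature it does not support.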
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

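            # On failure, fetch the runner's log collection and report the tail
            # of the error log.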
            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    if f.size() > 0:
                        outputs = json.load(f)
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)