13306: Changes to arvados-cwl-runner code after running futurize --stage2
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
from future import standard_library
standard_library.install_aliases()
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import os
import urllib.parse
from functools import partial
import logging
import json
import subprocess32 as subprocess
from collections import namedtuple

from io import StringIO

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname, Process
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]

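
# A minimal sketch (not part of the original file): applied via visit_class,
# the function strips only anonymous "_:" ids; real locations are kept.
# The literals below use made-up example values.
def _example_trim_anonymous_location():
    literal = {"class": "File", "location": "_:d7045f", "basename": "a.txt"}
    trim_anonymous_location(literal)
    assert "location" not in literal
    real = {"class": "File", "location": "keep:99999999999999999999999999999999+99/a.txt"}
    trim_anonymous_location(real)
    assert "location" in real
    return literal
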

def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]


def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in d.values():
                find_defaults(i, op)

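# Hedged sketch (not part of the original file): find_defaults walks a parsed
# CWL document and applies `op` to every mapping that carries a "default" key.
# The document below is a made-up fragment.
def _example_find_defaults():
    doc = {"inputs": [{"id": "x", "default": {"class": "File", "location": "file:///tmp/a"}},
                      {"id": "y", "type": "int"}]}
    hits = []
    find_defaults(doc, hits.append)
    return hits  # the one input mapping that has a "default" field
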
def setSecondary(t, fileobj, discovered):
    if isinstance(fileobj, dict) and fileobj.get("class") == "File":
        if "secondaryFiles" not in fileobj:
            fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]])
            if discovered is not None:
                discovered[fileobj["location"]] = fileobj["secondaryFiles"]
    elif isinstance(fileobj, list):
        for e in fileobj:
            setSecondary(t, e, discovered)

def discover_secondary_files(inputs, job_order, discovered=None):
    for t in inputs:
        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(t, job_order[shortname(t["id"])], discovered)

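# Hedged sketch (not part of the original file): cwltool's substitute()
# expands CWL secondaryFiles suffix patterns -- a plain suffix is appended,
# and each leading "^" strips one file extension first.
def _example_secondary_file_patterns():
    assert substitute("ref.fa", ".fai") == "ref.fa.fai"
    assert substitute("reads.bam", "^.bai") == "reads.bai"
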

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to Keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "$schemas", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin)

    sc = []
    def only_real(obj):
        # Only interested in local files that need to be uploaded;
        # don't include file literals, Keep references, etc.
        sp = obj.get("location", "").split(":")
        if len(sp) > 1 and sp[0] in ("file", "http", "https"):
            sc.append(obj)

    visit_class(sc_result, ("File", "Directory"), only_real)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, so remove from the list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        discover_secondary_files(obj["inputs"],
                                 {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
                                 discovered)

    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered.keys()):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper

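# Hedged sketch (not part of the original file): how a returned pathmapper is
# typically consulted.  The path and collection hash are made-up examples.
def _example_mapper_lookup(mapper):
    # Each entry's .resolved field holds the rewritten reference, e.g.
    # "keep:99999999999999999999999999999999+99/input.txt".
    return mapper.mapper("file:///tmp/input.txt").resolved
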

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)


def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

    rewrite_to_orig = {v: k for k, v in list(rewrites.items())}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed

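# Hedged sketch (not part of the original file): pack() records id rewrites
# (original -> packed) in `rewrites`; inverting the mapping lets visit() look
# up the original document id for a packed id.  Values below are made up.
def _example_rewrite_inversion():
    rewrites = {"file:///tmp/tool.cwl": "#tool.cwl"}
    rewrite_to_orig = {v: k for k, v in list(rewrites.items())}
    return rewrite_to_orig["#tool.cwl"]  # "file:///tmp/tool.cwl"
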

def tag_git_version(tool, packed):
    # Record the git commit hash of the workflow source checkout, if any.
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip().decode("utf-8")
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash


def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return the updated
    input object with 'location' fields updated to the proper Keep references.
    """

    discover_secondary_files(tool.tool["inputs"], job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out; it gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k, v in list(pm.items()):
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

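# Hedged sketch (not part of the original file): the shape of one merged_map
# entry, keyed by tool document id; all values below are made up.
def _example_merged_map_entry():
    entry = FileUpdates(
        resolved={"file:///tmp/script.py": "keep:99999999999999999999999999999999+99/script.py"},
        secondaryFiles={})
    return {"file:///tmp/tool.cwl": entry}
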
def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))


def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()

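# Hedged sketch (not part of the original file): the deterministic dump above
# (sorted keys, fixed separators) means identical packed workflows serialize
# byte-for-byte the same, so their collections share a portable_data_hash and
# the reuse lookup by content hash finds the earlier copy.
def _example_stable_serialization():
    a = json.dumps({"b": 1, "a": 2}, indent=2, sort_keys=True, separators=(',', ': '))
    b = json.dumps({"a": 2, "b": 1}, indent=2, sort_keys=True, separators=(',', ': '))
    assert a == b
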

class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        super(Runner, self).__init__(tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides the default and/or the spec from the workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

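    # Hedged sketch (not part of the original file) of the RAM precedence the
    # constructor applies: built-in default, then the workflow's
    # WorkflowRunnerResources ramMin hint, then an explicit argument.
    @staticmethod
    def _example_ram_precedence(hint_ram=None, submit_runner_ram=0):
        ram = 1024                     # built-in default
        if hint_ram:
            ram = hint_ram             # WorkflowRunnerResources ramMin
        if submit_runner_ram:
            ram = submit_runner_ram    # command line / initializer override
        return ram
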
    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.load(f)
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
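

# Hedged sketch (not part of the original file): what keepify() in
# Runner.done() does to a relative output location; the collection hash is a
# made-up portable data hash.
def _example_keepify():
    record = {"output": "99999999999999999999999999999999+99"}
    fileobj = {"class": "File", "location": "out.txt"}
    if not fileobj["location"].startswith("keep:"):
        fileobj["location"] = "keep:%s/%s" % (record["output"], fileobj["location"])
    return fileobj  # location -> "keep:99999999999999999999999999999999+99/out.txt"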