18723: Update cwltool version for scandirs fix
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import  viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import Mapping, Sequence
21
22 if os.name == "posix" and sys.version_info[0] < 3:
23     import subprocess32 as subprocess
24 else:
25     import subprocess
26
27 from schema_salad.sourceline import SourceLine, cmap
28
29 from cwltool.command_line_tool import CommandLineTool
30 import cwltool.workflow
31 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
32                              shortname, Process, fill_in_defaults)
33 from cwltool.load_tool import fetch_document
34 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
35 from cwltool.builder import substitute
36 from cwltool.pack import pack
37 from cwltool.update import INTERNAL_VERSION
38 from cwltool.builder import Builder
39 import schema_salad.validate as validate
40
41 import arvados.collection
42 from .util import collectionUUID
43 import ruamel.yaml as yaml
44 from ruamel.yaml.comments import CommentedMap, CommentedSeq
45
46 import arvados_cwl.arvdocker
47 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
48 from ._version import __version__
49 from . import done
50 from . context import ArvRuntimeContext
51
# Module-wide logger; every message from this file is emitted through the
# 'arvados.cwl-runner' logger name.
logger = logging.getLogger('arvados.cwl-runner')
53
def trim_anonymous_location(obj):
    """Drop the placeholder 'location' of a File or Directory literal.

    Literals receive a random "_:"-prefixed 'location' so they are easier
    to handle internally.  Writing that random id back out would hurt
    reproducibility, and a literal is allowed to have no 'location' at
    all, so the field is deleted in place.  Any other location (or a
    missing one) is left untouched.

    """

    location = obj.get("location", "")
    if not location.startswith("_:"):
        return
    del obj["location"]
66
67
def remove_redundant_fields(obj):
    """Delete the 'path', 'nameext', 'nameroot' and 'dirname' keys from obj, in place.

    Keys that are not present are silently skipped.
    """
    for key in ("path", "nameext", "nameroot", "dirname"):
        obj.pop(key, None)
72
73
def find_defaults(d, op):
    """Walk a nested list/dict structure and call op on every dict that has a "default" key.

    Lists are searched element by element.  A dict containing "default" is
    handed to op and not searched any deeper; a dict without it has each of
    its values searched.  Anything else is ignored.
    """
    if isinstance(d, list):
        for element in d:
            find_defaults(element, op)
        return

    if not isinstance(d, dict):
        return

    if "default" in d:
        op(d)
        return

    for value in viewvalues(d):
        find_defaults(value, op)
84
def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    """Create a cwltool Builder that is only suitable for evaluating expressions.

    The builder carries the given job order, hints and requirements plus the
    expression-evaluation settings read from runtimeContext.  Everything
    else (files, bindings, schema definitions, directories, filesystem
    access, ...) is left empty or None.  The CWL version is taken from the
    cwltool "original_cwlVersion" metadata entry when it is set, otherwise
    from "cwlVersion".
    """
    # Read the runtime settings first, in the order they are passed below.
    script_provider = runtimeContext.job_script_provider
    eval_timeout = runtimeContext.eval_timeout
    debug_enabled = runtimeContext.debug
    js_console_enabled = runtimeContext.js_console
    docker_pull_forced = runtimeContext.force_docker_pull

    cwl_version = (metadata.get("http://commonwl.org/cwltool#original_cwlVersion")
                   or metadata.get("cwlVersion"))

    return Builder(
        # Job data.
        job=joborder,
        files=[],
        bindings=[],
        schemaDefs={},
        names=None,
        requirements=requirements,
        hints=hints,
        resources={},
        # Not needed for expression evaluation only.
        mutation_manager=None,
        formatgraph=None,
        make_fs_access=None,
        fs_access=None,
        # Expression evaluation settings.
        job_script_provider=script_provider,
        timeout=eval_timeout,
        debug=debug_enabled,
        js_console=js_console_enabled,
        force_docker_pull=docker_pull_forced,
        # No staging directories.
        loadListing="",
        outdir="",
        tmpdir="",
        stagedir="",
        cwlVersion=cwl_version,
        container_engine="docker",
    )
111
def search_schemadef(name, reqs):
    """Return the type definition called name from the SchemaDefRequirements in reqs.

    reqs is iterated in order and the first matching entry of a
    SchemaDefRequirement's "types" list wins.  Returns None when no
    definition has that name.
    """
    for requirement in reqs:
        if requirement["class"] != "SchemaDefRequirement":
            continue
        for typedef in requirement["types"]:
            if typedef["name"] == name:
                return typedef
    return None
119
# Type names at which set_secondary() stops: a schema whose "type" is one of
# these is not followed any further when looking for secondaryFiles.
primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))
123
def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    """Fill in "secondaryFiles" on File values according to the input schema.

    Walks inputschema and the corresponding value (primary) in parallel,
    recursing through unions, named schema definitions, records and arrays.
    When it reaches a File value that has no "secondaryFiles" yet and a
    secondaryFiles spec applies, it evaluates the spec, keeps the
    secondary files that exist and stores them on the File in place.

    Arguments:
      fsaccess: object whose exists() is used to test each candidate.
      builder: used to evaluate expressions (do_eval), to look up schema
        definitions (hints / requirements) and for its cwlVersion.
      inputschema: a schema dict, a type name, or a list of these (union).
      secondaryspec: the secondaryFiles spec inherited from an enclosing
        schema, or None.
      primary: the input value matching inputschema.
      discovered: optional dict; when not None, each processed File's
        location is mapped to the secondaryFiles that were found for it.

    Errors for bad expression results, File objects without 'location' and
    missing required secondary files are raised through
    SourceLine(...).makeError with validate.ValidationException.
    """
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        # A type given by name: resolve it through the SchemaDefRequirements.
        # Hints and requirements are searched in reverse order; a name with
        # no schema definition is not processed any further.
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        #
        # Found a file, check for secondaryFiles
        #
        specs = []
        # Temporarily store the raw spec on the File; it is what the
        # SourceLine(...) error reports below point at, and it is replaced
        # by the list of files actually found at the end of this branch.
        primary["secondaryFiles"] = secondaryspec

        # Pass 1: evaluate every entry of the spec and normalize the
        # results into a flat list of patterns / File objects.
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                # In v1.0 the entry itself is evaluated.
                pattern = builder.do_eval(sf, context=primary)
            else:
                # Otherwise the entry is indexed by "pattern" (and
                # "required", below).
                pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                else:
                    specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        # Pass 2: turn each normalized spec into a location and keep the
        # ones that exist.
        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    # An explicit File object: use its location as-is.
                    pattern = None
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                # Derive the secondary file location from the primary's
                # location and the pattern.
                sfpath = substitute(primary["location"], pattern)

            # "required" is passed through do_eval as well (presumably
            # because it may be an expression - TODO confirm).
            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                if pattern is not None:
                    found.append({"location": sfpath, "class": "File"})
                else:
                    found.append(sf)
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        # Any other non-primitive type name: process the name itself, which
        # makes the recursive call look it up among the schema definitions.
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
234
def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    """Run set_secondary() for every input parameter that has a structured value.

    For each schema in inputs, the value is looked up in job_order under the
    short name of the schema's "id".  Only values that are mappings or
    sequences are processed; missing and scalar values are skipped.
    """
    for schema in inputs:
        value = job_order.get(shortname(schema["id"]))
        if not isinstance(value, (Mapping, Sequence)):
            continue
        set_secondary(fsaccess, builder, schema, None, value, discovered)
240
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field (that does not start with "_:"), this
    will reload the document to ensure it is scanning the raw document prior
    to preprocessing.

    Arguments:
      arvrunner: provides the API client (api, num_retries) and fs_access.
      name: passed to ArvPathMapper as the name for the upload.
      document_loader: used to fetch raw documents and resolve references.
      workflowobj: the document to scan; updated in place.
      uri: base URI for scanning and reference resolution.
      loadref_run: when true, "run" references are followed in addition
        to "$import".
      include_primary: when true and workflowobj has an "id", the document
        itself is added to the files to upload.
      discovered_secondaryfiles: optional dict; when not None it receives
        the secondary files discovered for local File defaults, keyed by
        the resolved location of the primary file.

    Raises an error built with validate.ValidationException (through
    SourceLine) when a referenced collection uuid is not found, or when a
    File's PDH location disagrees with its collection uuid.
    """

    # URLs (without fragment) that loadref has already fetched.
    loaded = set()
    def loadref(b, u):
        # Fetch and parse the raw document at u (relative to b).  Each
        # document is returned only once; repeat requests get {}.
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    # The scanned document doubles as the metadata (cwlVersion) source for
    # the expression builder created further down.
    metadata = scanobj

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "$schemas", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin,
                         nestdirs=False)

    # sc: File/Directory objects to upload.
    # uuids: collection uuid -> object that referenced it.
    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        # NOTE(review): str.split() always returns at least one element, so
        # this guard never triggers.
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, API server may limit
        # response size, so keep fetching until the API server has
        # nothing more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        # Ask again only for the uuids that have not been resolved yet.
        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        # One-element list so the nested function can set the flag.
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    # Primary file location -> secondaryFiles found for it.
    discovered = {}
    def discover_default_secondary_files(obj):
        # Build a job order consisting only of the input defaults.
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    # Discovery runs on a resolved deep copy, so the secondaryFiles it sets
    # do not end up in workflowobj itself.
    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    # Iterate over a copy of the keys because entries are deleted inside
    # the loop.
    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        # Rewrite the location of one File/Directory object in place.
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            # A file handled by the path mapper: use its resolved location.
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned by
                # the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        # The location refers to a collection by uuid: replace the uuid with
        # the portable data hash and remember the uuid on the object.
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        # Report the discovered secondary files to the caller, keyed by the
        # resolved location of each primary file.
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        # Keep only the schemas known to the mapper, in resolved form.
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            if s in mapper:
                sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper
450
451
def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects.

    A CommandLineTool without a DockerRequirement uses the
    "arvados/jobs" image matching this package's version.  Workflows are
    handled by recursing into the embedded tool of every step.  Raises an
    UnsupportedRequirement error when 'dockerOutputDirectory' is set and
    the work API is not "containers".
    """

    if isinstance(tool, cwltool.workflow.Workflow) and not isinstance(tool, CommandLineTool):
        for step in tool.steps:
            upload_docker(arvrunner, step.embedded_tool)
        return

    if not isinstance(tool, CommandLineTool):
        return

    docker_req, _ = tool.get_requirement("DockerRequirement")
    if docker_req:
        if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
            # TODO: can be supported by containers API, but not jobs API.
            raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
        image_req = docker_req
    else:
        image_req = {"dockerPull": "arvados/jobs:"+__version__}

    arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, image_req, True,
                                               arvrunner.project_uuid,
                                               arvrunner.runtimeContext.force_docker_pull,
                                               arvrunner.runtimeContext.tmp_outdir_prefix)
473
474
def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined
    into a single document.  After packing, the document is walked to
    apply the per-tool location rewrites and secondaryFiles recorded in
    merged_map, and every DockerRequirement is annotated with the value
    returned by arv_docker_get_image for its image."""

    id_rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=id_rewrites,
                  loader=tool.doc_loader)

    # Packing renames ids; merged_map is keyed by the original ids.
    original_ids = {new: old for old, new in viewitems(id_rewrites)}
    process_classes = ("CommandLineTool", "Workflow", "ExpressionTool")

    def visit(node, cur_id):
        if isinstance(node, list):
            for child in node:
                visit(child, cur_id)
            return

        if not isinstance(node, dict):
            return

        if node.get("class") in process_classes:
            if tool.metadata["cwlVersion"] == "v1.0" and "id" not in node:
                raise SourceLine(node, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use to cwlVersion: v1.1")
            if "id" in node:
                # Everything below this process belongs to its original id.
                cur_id = original_ids.get(node["id"], node["id"])

        if "path" in node and "location" not in node:
            node["location"] = node["path"]
            del node["path"]

        if "location" in node and cur_id in merged_map:
            updates = merged_map[cur_id]
            if node["location"] in updates.resolved:
                node["location"] = updates.resolved[node["location"]]
            # Checked against the (possibly just rewritten) location.
            if node["location"] in updates.secondaryFiles:
                node["secondaryFiles"] = updates.secondaryFiles[node["location"]]

        if node.get("class") == "DockerRequirement":
            node["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(
                arvrunner.api, node, True,
                arvrunner.project_uuid,
                arvrunner.runtimeContext.force_docker_pull,
                arvrunner.runtimeContext.tmp_outdir_prefix)

        for key in node:
            visit(node[key], cur_id)

    visit(packed, None)
    return packed
514
515
def tag_git_version(packed, tool=None):
    """Record the git commit of the workflow's source directory in packed.

    When the tool was loaded from a local "file://" location, ask git for
    the most recent first-parent commit in the directory containing that
    file and store the hash under "http://schema.org/version" in packed.

    This is best effort: packed is left unchanged when no tool is given,
    when the tool is not a local file, or when git cannot be run (not
    installed, directory missing, not a git checkout).

    Arguments:
      packed: the packed workflow document (a dict); updated in place.
      tool: the loaded tool object whose tool["id"] identifies the source
        file.  Optional so that existing callers passing only packed keep
        working.
    """
    if tool is None:
        # Without the tool there is no source location to inspect.
        return

    tool_id = tool.tool["id"]
    if not tool_id.startswith("file://"):
        return

    # Strip the "file://" prefix to get the local path.
    path = os.path.dirname(tool_id[7:])
    try:
        githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
    except (OSError, subprocess.CalledProcessError):
        # git is unavailable or this is not a checkout; skip tagging.
        return

    if isinstance(githash, bytes):
        # check_output returns bytes; store text so that the packed
        # document stays JSON-serializable.
        githash = githash.decode("utf-8")
    packed["http://schema.org/version"] = githash
525
526
def upload_job_order(arvrunner, name, tool, job_order):
    """Upload the local files referenced by the input object.

    Secondary files are discovered first, then all dependencies of
    job_order are uploaded and its 'location' fields are updated to the
    proper keep references.  The same job_order object is returned, with
    the bookkeeping keys "id" and "job_order" removed.
    """

    # fill_in_defaults throws an error if there are any missing required
    # parameters; we don't want that here, so work on a copy of the input
    # schemas in which every parameter is optional.
    optional_inputs = copy.deepcopy(tool.tool["inputs"])
    for param in optional_inputs:
        if "null" not in param["type"]:
            param["type"] = ["null"] + aslist(param["type"])

    # The defaults are filled into a shallow copy of the job order, which
    # only serves as the evaluation context of the builder.
    job_with_defaults = copy.copy(job_order)
    fill_in_defaults(optional_inputs,
                     job_with_defaults,
                     arvrunner.fs_access)

    # Need to create a builder object to evaluate expressions.
    expression_builder = make_builder(job_with_defaults,
                                      tool.hints,
                                      tool.requirements,
                                      ArvRuntimeContext(),
                                      tool.metadata)

    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             expression_builder,
                             tool.tool["inputs"],
                             job_order)

    upload_dependencies(arvrunner,
                        name,
                        tool.doc_loader,
                        job_order,
                        job_order.get("id", "#"),
                        False)

    # "job_order" needs to be filtered out as well, it gets added by
    # cwltool when providing parameters on the command line.
    for bookkeeping_key in ("id", "job_order"):
        if bookkeeping_key in job_order:
            del job_order[bookkeeping_key]

    return job_order
574
# Per-tool result of uploading dependencies: "resolved" maps each path-mapper
# key to its resolved location, "secondaryFiles" holds the discovered
# secondary files keyed by the resolved location of their primary file.
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
576
def upload_workflow_deps(arvrunner, tool):
    """Upload the Docker images and file dependencies of a workflow.

    Every tool document visited by tool.visit() that has an "id" gets its
    dependencies uploaded and is stored back into the document loader's
    index.  Returns a dict mapping each tool id to a FileUpdates tuple
    with the resolved locations and discovered secondary files.
    """
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    loader = tool.doc_loader
    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" not in deptool:
            return

        secondary_files = {}
        pathmap = upload_dependencies(arvrunner,
                                      "%s dependencies" % (shortname(deptool["id"])),
                                      loader,
                                      deptool,
                                      deptool["id"],
                                      False,
                                      include_primary=False,
                                      discovered_secondaryfiles=secondary_files)
        loader.idx[deptool["id"]] = deptool
        resolved_locations = {key: entry.resolved for key, entry in pathmap.items()}
        merged_map[deptool["id"]] = FileUpdates(resolved_locations, secondary_files)

    tool.visit(upload_tool_deps)

    return merged_map
606
def arvados_jobs_image(arvrunner, img):
    """Make sure the requested arvados/jobs image is available.

    Returns whatever arv_docker_get_image returns for the image (it is
    pulled and uploaded if necessary).  Any failure is re-raised as a plain
    Exception naming the image.
    """

    try:
        image_req = {"dockerPull": img}
        return arvados_cwl.arvdocker.arv_docker_get_image(
            arvrunner.api,
            image_req,
            True,
            arvrunner.project_uuid,
            arvrunner.runtimeContext.force_docker_pull,
            arvrunner.runtimeContext.tmp_outdir_prefix)
    except Exception as err:
        message = "Docker image %s is not available\n%s" % (img, err)
        raise Exception(message)
616
617
def upload_workflow_collection(arvrunner, name, packed):
    """Store the packed workflow as "workflow.cwl" in a collection.

    The collection is only saved when no collection with the same content
    and a name starting with name exists (within the runner's project, if
    one is set).  Returns the portable data hash of the collection.
    """
    workflow_coll = arvados.collection.Collection(api_client=arvrunner.api,
                                                  keep_client=arvrunner.keep_client,
                                                  num_retries=arvrunner.num_retries)
    with workflow_coll.open("workflow.cwl", "w") as out:
        serialized = json.dumps(packed, indent=2, sort_keys=True, separators=(',',': '))
        out.write(serialized)

    # Look for an existing collection with identical content.
    search_filters = [["portable_data_hash", "=", workflow_coll.portable_data_hash()],
                      ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        search_filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    matches = arvrunner.api.collections().list(filters=search_filters).execute(num_retries=arvrunner.num_retries)

    if not matches["items"]:
        workflow_coll.save_new(name=name,
                               owner_uuid=arvrunner.project_uuid,
                               ensure_unique_name=True,
                               num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", workflow_coll.manifest_locator())
    else:
        logger.info("Using collection %s", matches["items"][0]["uuid"])

    return workflow_coll.portable_data_hash()
641
642
class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):
        """Set up the runner process.

        The Process base class is initialized from updated_tool, while
        requirements (reuse, runner resources) are read from tool, which is
        kept as the embedded tool.  Raises Exception when the resulting
        runner RAM or core count is not greater than zero.
        """

        # Work on a private copy of the loading context that carries the
        # metadata of the updated tool.
        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False

        self.enable_reuse = enable_reuse
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                self.enable_reuse = reuse_req["enableReuse"]

        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        # Defaults, possibly overridden by the workflow's
        # WorkflowRunnerResources and then by the initializer arguments.
        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if resource_req:
            cores_min = resource_req.get("coresMin")
            if cores_min:
                self.submit_runner_cores = cores_min
            ram_min = resource_req.get("ramMin")
            if ram_min:
                self.submit_runner_ram = ram_min
            # The workflow's cache size only applies when the caller left
            # the cache size at its default.
            keep_cache = resource_req.get("keep_cache")
            if keep_cache and collection_cache_is_default:
                self.collection_cache_size = keep_cache

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        """Remember the job order, initialize the job and yield this runner itself."""
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        """Do nothing; this base implementation is a placeholder."""
        pass

    def done(self, record):
        """Base method for handling a completed runner.

        Derives the process status from the record's state and exit code,
        logs the tail of the log collection on failure, loads
        "cwl.output.json" from the output collection and reports the
        outputs through the runner's output_callback.  Any exception along
        the way is logged and reported as "permanentFail" with empty
        outputs.
        """

        try:
            if record["state"] != "Complete":
                processStatus = "permanentFail"
            else:
                exit_code = record.get("exit_code")
                if exit_code is None:
                    processStatus = "success"
                elif exit_code == 33:
                    processStatus = "UnsupportedRequirement"
                elif exit_code == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                log_collection = arvados.collection.CollectionReader(record["log"],
                                                                     api_client=self.arvrunner.api,
                                                                     keep_client=self.arvrunner.keep_client,
                                                                     num_retries=self.arvrunner.num_retries)
                log_header = "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"])
                done.logtail(log_collection, logger.error, log_header, maxlen=40)

            self.final_output = record["output"]
            output_collection = arvados.collection.CollectionReader(self.final_output,
                                                                    api_client=self.arvrunner.api,
                                                                    keep_client=self.arvrunner.keep_client,
                                                                    num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in output_collection:
                with output_collection.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())

            def keepify(fileobj):
                # Turn a location relative to the output collection into a
                # full keep reference.
                path = fileobj["location"]
                if path.startswith("keep:"):
                    return
                fileobj["location"] = "keep:%s/%s" % (record["output"], path)

            for adjust in (adjustFileObjs, adjustDirObjs):
                adjust(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)