18947: Remove errant uses of runsu.sh.
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import  viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import Mapping, Sequence
21
22 if os.name == "posix" and sys.version_info[0] < 3:
23     import subprocess32 as subprocess
24 else:
25     import subprocess
26
27 from schema_salad.sourceline import SourceLine, cmap
28
29 from cwltool.command_line_tool import CommandLineTool
30 import cwltool.workflow
31 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
32                              shortname, Process, fill_in_defaults)
33 from cwltool.load_tool import fetch_document
34 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
35 from cwltool.builder import substitute
36 from cwltool.pack import pack
37 from cwltool.update import INTERNAL_VERSION
38 from cwltool.builder import Builder
39 import schema_salad.validate as validate
40
41 import arvados.collection
42 from .util import collectionUUID
43 from ruamel.yaml import YAML
44 from ruamel.yaml.comments import CommentedMap, CommentedSeq
45
46 import arvados_cwl.arvdocker
47 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
48 from ._version import __version__
49 from . import done
50 from . context import ArvRuntimeContext
51
52 logger = logging.getLogger('arvados.cwl-runner')
53
54 def trim_anonymous_location(obj):
55     """Remove 'location' field from File and Directory literals.
56
57     To make internal handling easier, literals are assigned a random id for
58     'location'.  However, when writing the record back out, this can break
59     reproducibility.  Since it is valid for literals not have a 'location'
60     field, remove it.
61
62     """
63
64     if obj.get("location", "").startswith("_:"):
65         del obj["location"]
66
67
68 def remove_redundant_fields(obj):
69     for field in ("path", "nameext", "nameroot", "dirname"):
70         if field in obj:
71             del obj[field]
72
73
74 def find_defaults(d, op):
75     if isinstance(d, list):
76         for i in d:
77             find_defaults(i, op)
78     elif isinstance(d, dict):
79         if "default" in d:
80             op(d)
81         else:
82             for i in viewvalues(d):
83                 find_defaults(i, op)
84
85 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
86     return Builder(
87                  job=joborder,
88                  files=[],               # type: List[Dict[Text, Text]]
89                  bindings=[],            # type: List[Dict[Text, Any]]
90                  schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
91                  names=None,               # type: Names
92                  requirements=requirements,        # type: List[Dict[Text, Any]]
93                  hints=hints,               # type: List[Dict[Text, Any]]
94                  resources={},           # type: Dict[str, int]
95                  mutation_manager=None,    # type: Optional[MutationManager]
96                  formatgraph=None,         # type: Optional[Graph]
97                  make_fs_access=None,      # type: Type[StdFsAccess]
98                  fs_access=None,           # type: StdFsAccess
99                  job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
100                  timeout=runtimeContext.eval_timeout,             # type: float
101                  debug=runtimeContext.debug,               # type: bool
102                  js_console=runtimeContext.js_console,          # type: bool
103                  force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
104                  loadListing="",         # type: Text
105                  outdir="",              # type: Text
106                  tmpdir="",              # type: Text
107                  stagedir="",            # type: Text
108                  cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
109                  container_engine="docker"
110                 )
111
112 def search_schemadef(name, reqs):
113     for r in reqs:
114         if r["class"] == "SchemaDefRequirement":
115             for sd in r["types"]:
116                 if sd["name"] == name:
117                     return sd
118     return None
119
120 primitive_types_set = frozenset(("null", "boolean", "int", "long",
121                                  "float", "double", "string", "record",
122                                  "array", "enum"))
123
124 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
125     if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
126         # union type, collect all possible secondaryFiles
127         for i in inputschema:
128             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
129         return
130
131     if isinstance(inputschema, basestring):
132         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
133         if sd:
134             inputschema = sd
135         else:
136             return
137
138     if "secondaryFiles" in inputschema:
139         # set secondaryFiles, may be inherited by compound types.
140         secondaryspec = inputschema["secondaryFiles"]
141
142     if (isinstance(inputschema["type"], (Mapping, Sequence)) and
143         not isinstance(inputschema["type"], basestring)):
144         # compound type (union, array, record)
145         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
146
147     elif (inputschema["type"] == "record" and
148           isinstance(primary, Mapping)):
149         #
150         # record type, find secondary files associated with fields.
151         #
152         for f in inputschema["fields"]:
153             p = primary.get(shortname(f["name"]))
154             if p:
155                 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
156
157     elif (inputschema["type"] == "array" and
158           isinstance(primary, Sequence)):
159         #
160         # array type, find secondary files of elements
161         #
162         for p in primary:
163             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
164
165     elif (inputschema["type"] == "File" and
166           secondaryspec and
167           isinstance(primary, Mapping) and
168           primary.get("class") == "File" and
169           "secondaryFiles" not in primary):
170         #
171         # Found a file, check for secondaryFiles
172         #
173         specs = []
174         primary["secondaryFiles"] = secondaryspec
175         for i, sf in enumerate(aslist(secondaryspec)):
176             if builder.cwlVersion == "v1.0":
177                 pattern = builder.do_eval(sf, context=primary)
178             else:
179                 pattern = builder.do_eval(sf["pattern"], context=primary)
180             if pattern is None:
181                 continue
182             if isinstance(pattern, list):
183                 specs.extend(pattern)
184             elif isinstance(pattern, dict):
185                 specs.append(pattern)
186             elif isinstance(pattern, str):
187                 if builder.cwlVersion == "v1.0":
188                     specs.append({"pattern": pattern, "required": True})
189                 else:
190                     specs.append({"pattern": pattern, "required": sf.get("required")})
191             else:
192                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
193                     "Expression must return list, object, string or null")
194
195         found = []
196         for i, sf in enumerate(specs):
197             if isinstance(sf, dict):
198                 if sf.get("class") == "File":
199                     pattern = None
200                     if sf.get("location") is None:
201                         raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
202                             "File object is missing 'location': %s" % sf)
203                     sfpath = sf["location"]
204                     required = True
205                 else:
206                     pattern = sf["pattern"]
207                     required = sf.get("required")
208             elif isinstance(sf, str):
209                 pattern = sf
210                 required = True
211             else:
212                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
213                     "Expression must return list, object, string or null")
214
215             if pattern is not None:
216                 sfpath = substitute(primary["location"], pattern)
217
218             required = builder.do_eval(required, context=primary)
219
220             if fsaccess.exists(sfpath):
221                 if pattern is not None:
222                     found.append({"location": sfpath, "class": "File"})
223                 else:
224                     found.append(sf)
225             elif required:
226                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
227                     "Required secondary file '%s' does not exist" % sfpath)
228
229         primary["secondaryFiles"] = cmap(found)
230         if discovered is not None:
231             discovered[primary["location"]] = primary["secondaryFiles"]
232     elif inputschema["type"] not in primitive_types_set:
233         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
234
235 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
236     for inputschema in inputs:
237         primary = job_order.get(shortname(inputschema["id"]))
238         if isinstance(primary, (Mapping, Sequence)):
239             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
240
241 def upload_dependencies(arvrunner, name, document_loader,
242                         workflowobj, uri, loadref_run,
243                         include_primary=True, discovered_secondaryfiles=None):
244     """Upload the dependencies of the workflowobj document to Keep.
245
246     Returns a pathmapper object mapping local paths to keep references.  Also
247     does an in-place update of references in "workflowobj".
248
249     Use scandeps to find $import, $include, $schemas, run, File and Directory
250     fields that represent external references.
251
252     If workflowobj has an "id" field, this will reload the document to ensure
253     it is scanning the raw document prior to preprocessing.
254     """
255
256     loaded = set()
257     def loadref(b, u):
258         joined = document_loader.fetcher.urljoin(b, u)
259         defrg, _ = urllib.parse.urldefrag(joined)
260         if defrg not in loaded:
261             loaded.add(defrg)
262             # Use fetch_text to get raw file (before preprocessing).
263             text = document_loader.fetch_text(defrg)
264             if isinstance(text, bytes):
265                 textIO = StringIO(text.decode('utf-8'))
266             else:
267                 textIO = StringIO(text)
268             yamlloader = YAML(typ='safe', pure=True)
269             return yamlloader.load(textIO)
270         else:
271             return {}
272
273     if loadref_run:
274         loadref_fields = set(("$import", "run"))
275     else:
276         loadref_fields = set(("$import",))
277
278     scanobj = workflowobj
279     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
280         # Need raw file content (before preprocessing) to ensure
281         # that external references in $include and $mixin are captured.
282         scanobj = loadref("", workflowobj["id"])
283
284     metadata = scanobj
285
286     sc_result = scandeps(uri, scanobj,
287                          loadref_fields,
288                          set(("$include", "$schemas", "location")),
289                          loadref, urljoin=document_loader.fetcher.urljoin,
290                          nestdirs=False)
291
292     sc = []
293     uuids = {}
294
295     def collect_uuids(obj):
296         loc = obj.get("location", "")
297         sp = loc.split(":")
298         if sp[0] == "keep":
299             # Collect collection uuids that need to be resolved to
300             # portable data hashes
301             gp = collection_uuid_pattern.match(loc)
302             if gp:
303                 uuids[gp.groups()[0]] = obj
304             if collectionUUID in obj:
305                 uuids[obj[collectionUUID]] = obj
306
307     def collect_uploads(obj):
308         loc = obj.get("location", "")
309         sp = loc.split(":")
310         if len(sp) < 1:
311             return
312         if sp[0] in ("file", "http", "https"):
313             # Record local files than need to be uploaded,
314             # don't include file literals, keep references, etc.
315             sc.append(obj)
316         collect_uuids(obj)
317
318     visit_class(workflowobj, ("File", "Directory"), collect_uuids)
319     visit_class(sc_result, ("File", "Directory"), collect_uploads)
320
321     # Resolve any collection uuids we found to portable data hashes
322     # and assign them to uuid_map
323     uuid_map = {}
324     fetch_uuids = list(uuids.keys())
325     while fetch_uuids:
326         # For a large number of fetch_uuids, API server may limit
327         # response size, so keep fetching from API server has nothing
328         # more to give us.
329         lookups = arvrunner.api.collections().list(
330             filters=[["uuid", "in", fetch_uuids]],
331             count="none",
332             select=["uuid", "portable_data_hash"]).execute(
333                 num_retries=arvrunner.num_retries)
334
335         if not lookups["items"]:
336             break
337
338         for l in lookups["items"]:
339             uuid_map[l["uuid"]] = l["portable_data_hash"]
340
341         fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
342
343     normalizeFilesDirs(sc)
344
345     if include_primary and "id" in workflowobj:
346         sc.append({"class": "File", "location": workflowobj["id"]})
347
348     if "$schemas" in workflowobj:
349         for s in workflowobj["$schemas"]:
350             sc.append({"class": "File", "location": s})
351
352     def visit_default(obj):
353         remove = [False]
354         def ensure_default_location(f):
355             if "location" not in f and "path" in f:
356                 f["location"] = f["path"]
357                 del f["path"]
358             if "location" in f and not arvrunner.fs_access.exists(f["location"]):
359                 # Doesn't exist, remove from list of dependencies to upload
360                 sc[:] = [x for x in sc if x["location"] != f["location"]]
361                 # Delete "default" from workflowobj
362                 remove[0] = True
363         visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
364         if remove[0]:
365             del obj["default"]
366
367     find_defaults(workflowobj, visit_default)
368
369     discovered = {}
370     def discover_default_secondary_files(obj):
371         builder_job_order = {}
372         for t in obj["inputs"]:
373             builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
374         # Need to create a builder object to evaluate expressions.
375         builder = make_builder(builder_job_order,
376                                obj.get("hints", []),
377                                obj.get("requirements", []),
378                                ArvRuntimeContext(),
379                                metadata)
380         discover_secondary_files(arvrunner.fs_access,
381                                  builder,
382                                  obj["inputs"],
383                                  builder_job_order,
384                                  discovered)
385
386     copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
387     visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
388
389     for d in list(discovered):
390         # Only interested in discovered secondaryFiles which are local
391         # files that need to be uploaded.
392         if d.startswith("file:"):
393             sc.extend(discovered[d])
394         else:
395             del discovered[d]
396
397     mapper = ArvPathMapper(arvrunner, sc, "",
398                            "keep:%s",
399                            "keep:%s/%s",
400                            name=name,
401                            single_collection=True)
402
403     def setloc(p):
404         loc = p.get("location")
405         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
406             p["location"] = mapper.mapper(p["location"]).resolved
407             return
408
409         if not loc:
410             return
411
412         if collectionUUID in p:
413             uuid = p[collectionUUID]
414             if uuid not in uuid_map:
415                 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
416                     "Collection uuid %s not found" % uuid)
417             gp = collection_pdh_pattern.match(loc)
418             if gp and uuid_map[uuid] != gp.groups()[0]:
419                 # This file entry has both collectionUUID and a PDH
420                 # location. If the PDH doesn't match the one returned
421                 # the API server, raise an error.
422                 raise SourceLine(p, "location", validate.ValidationException).makeError(
423                     "Expected collection uuid %s to be %s but API server reported %s" % (
424                         uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
425
426         gp = collection_uuid_pattern.match(loc)
427         if not gp:
428             return
429         uuid = gp.groups()[0]
430         if uuid not in uuid_map:
431             raise SourceLine(p, "location", validate.ValidationException).makeError(
432                 "Collection uuid %s not found" % uuid)
433         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
434         p[collectionUUID] = uuid
435
436     visit_class(workflowobj, ("File", "Directory"), setloc)
437     visit_class(discovered, ("File", "Directory"), setloc)
438
439     if discovered_secondaryfiles is not None:
440         for d in discovered:
441             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
442
443     if "$schemas" in workflowobj:
444         sch = CommentedSeq()
445         for s in workflowobj["$schemas"]:
446             if s in mapper:
447                 sch.append(mapper.mapper(s).resolved)
448         workflowobj["$schemas"] = sch
449
450     return mapper
451
452
453 def upload_docker(arvrunner, tool):
454     """Uploads Docker images used in CommandLineTool objects."""
455
456     if isinstance(tool, CommandLineTool):
457         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
458         if docker_req:
459             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
460                 # TODO: can be supported by containers API, but not jobs API.
461                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
462                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
463             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
464                                                        arvrunner.runtimeContext.force_docker_pull,
465                                                        arvrunner.runtimeContext.tmp_outdir_prefix,
466                                                        arvrunner.runtimeContext.match_local_docker)
467         else:
468             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
469                                                        True, arvrunner.project_uuid,
470                                                        arvrunner.runtimeContext.force_docker_pull,
471                                                        arvrunner.runtimeContext.tmp_outdir_prefix,
472                                                        arvrunner.runtimeContext.match_local_docker)
473     elif isinstance(tool, cwltool.workflow.Workflow):
474         for s in tool.steps:
475             upload_docker(arvrunner, s.embedded_tool)
476
477
478 def packed_workflow(arvrunner, tool, merged_map):
479     """Create a packed workflow.
480
481     A "packed" workflow is one where all the components have been combined into a single document."""
482
483     rewrites = {}
484     packed = pack(arvrunner.loadingContext, tool.tool["id"],
485                   rewrite_out=rewrites,
486                   loader=tool.doc_loader)
487
488     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
489
490     def visit(v, cur_id):
491         if isinstance(v, dict):
492             if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
493                 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
494                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use to cwlVersion: v1.1")
495                 if "id" in v:
496                     cur_id = rewrite_to_orig.get(v["id"], v["id"])
497             if "path" in v and "location" not in v:
498                 v["location"] = v["path"]
499                 del v["path"]
500             if "location" in v and cur_id in merged_map:
501                 if v["location"] in merged_map[cur_id].resolved:
502                     v["location"] = merged_map[cur_id].resolved[v["location"]]
503                 if v["location"] in merged_map[cur_id].secondaryFiles:
504                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
505             if v.get("class") == "DockerRequirement":
506                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
507                                                                                                              arvrunner.project_uuid,
508                                                                                                              arvrunner.runtimeContext.force_docker_pull,
509                                                                                                              arvrunner.runtimeContext.tmp_outdir_prefix,
510                                                                                                              arvrunner.runtimeContext.match_local_docker)
511             for l in v:
512                 visit(v[l], cur_id)
513         if isinstance(v, list):
514             for l in v:
515                 visit(l, cur_id)
516     visit(packed, None)
517     return packed
518
519
520 def tag_git_version(packed):
521     if tool.tool["id"].startswith("file://"):
522         path = os.path.dirname(tool.tool["id"][7:])
523         try:
524             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
525         except (OSError, subprocess.CalledProcessError):
526             pass
527         else:
528             packed["http://schema.org/version"] = githash
529
530
531 def upload_job_order(arvrunner, name, tool, job_order):
532     """Upload local files referenced in the input object and return updated input
533     object with 'location' updated to the proper keep references.
534     """
535
536     # Make a copy of the job order and set defaults.
537     builder_job_order = copy.copy(job_order)
538
539     # fill_in_defaults throws an error if there are any
540     # missing required parameters, we don't want it to do that
541     # so make them all optional.
542     inputs_copy = copy.deepcopy(tool.tool["inputs"])
543     for i in inputs_copy:
544         if "null" not in i["type"]:
545             i["type"] = ["null"] + aslist(i["type"])
546
547     fill_in_defaults(inputs_copy,
548                      builder_job_order,
549                      arvrunner.fs_access)
550     # Need to create a builder object to evaluate expressions.
551     builder = make_builder(builder_job_order,
552                            tool.hints,
553                            tool.requirements,
554                            ArvRuntimeContext(),
555                            tool.metadata)
556     # Now update job_order with secondaryFiles
557     discover_secondary_files(arvrunner.fs_access,
558                              builder,
559                              tool.tool["inputs"],
560                              job_order)
561
562     jobmapper = upload_dependencies(arvrunner,
563                                     name,
564                                     tool.doc_loader,
565                                     job_order,
566                                     job_order.get("id", "#"),
567                                     False)
568
569     if "id" in job_order:
570         del job_order["id"]
571
572     # Need to filter this out, gets added by cwltool when providing
573     # parameters on the command line.
574     if "job_order" in job_order:
575         del job_order["job_order"]
576
577     return job_order
578
579 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
580
581 def upload_workflow_deps(arvrunner, tool):
582     # Ensure that Docker images needed by this workflow are available
583
584     upload_docker(arvrunner, tool)
585
586     document_loader = tool.doc_loader
587
588     merged_map = {}
589
590     def upload_tool_deps(deptool):
591         if "id" in deptool:
592             discovered_secondaryfiles = {}
593             pm = upload_dependencies(arvrunner,
594                                      "%s dependencies" % (shortname(deptool["id"])),
595                                      document_loader,
596                                      deptool,
597                                      deptool["id"],
598                                      False,
599                                      include_primary=False,
600                                      discovered_secondaryfiles=discovered_secondaryfiles)
601             document_loader.idx[deptool["id"]] = deptool
602             toolmap = {}
603             for k,v in pm.items():
604                 toolmap[k] = v.resolved
605             merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
606
607     tool.visit(upload_tool_deps)
608
609     return merged_map
610
611 def arvados_jobs_image(arvrunner, img):
612     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
613
614     try:
615         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
616                                                           arvrunner.runtimeContext.force_docker_pull,
617                                                           arvrunner.runtimeContext.tmp_outdir_prefix,
618                                                           arvrunner.runtimeContext.match_local_docker)
619     except Exception as e:
620         raise Exception("Docker image %s is not available\n%s" % (img, e) )
621
622
623 def upload_workflow_collection(arvrunner, name, packed):
624     collection = arvados.collection.Collection(api_client=arvrunner.api,
625                                                keep_client=arvrunner.keep_client,
626                                                num_retries=arvrunner.num_retries)
627     with collection.open("workflow.cwl", "w") as f:
628         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
629
630     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
631                ["name", "like", name+"%"]]
632     if arvrunner.project_uuid:
633         filters.append(["owner_uuid", "=", arvrunner.project_uuid])
634     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
635
636     if exists["items"]:
637         logger.info("Using collection %s", exists["items"][0]["uuid"])
638     else:
639         collection.save_new(name=name,
640                             owner_uuid=arvrunner.project_uuid,
641                             ensure_unique_name=True,
642                             num_retries=arvrunner.num_retries)
643         logger.info("Uploaded to %s", collection.manifest_locator())
644
645     return collection.portable_data_hash()
646
647
648 class Runner(Process):
649     """Base class for runner processes, which submit an instance of
650     arvados-cwl-runner and wait for the final result."""
651
652     def __init__(self, runner, updated_tool,
653                  tool, loadingContext, enable_reuse,
654                  output_name, output_tags, submit_runner_ram=0,
655                  name=None, on_error=None, submit_runner_image=None,
656                  intermediate_output_ttl=0, merged_map=None,
657                  priority=None, secret_store=None,
658                  collection_cache_size=256,
659                  collection_cache_is_default=True):
660
661         loadingContext = loadingContext.copy()
662         loadingContext.metadata = updated_tool.metadata.copy()
663
664         super(Runner, self).__init__(updated_tool.tool, loadingContext)
665
666         self.arvrunner = runner
667         self.embedded_tool = tool
668         self.job_order = None
669         self.running = False
670         if enable_reuse:
671             # If reuse is permitted by command line arguments but
672             # disabled by the workflow itself, disable it.
673             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
674             if reuse_req:
675                 enable_reuse = reuse_req["enableReuse"]
676         self.enable_reuse = enable_reuse
677         self.uuid = None
678         self.final_output = None
679         self.output_name = output_name
680         self.output_tags = output_tags
681         self.name = name
682         self.on_error = on_error
683         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
684         self.intermediate_output_ttl = intermediate_output_ttl
685         self.priority = priority
686         self.secret_store = secret_store
687         self.enable_dev = loadingContext.enable_dev
688
689         self.submit_runner_cores = 1
690         self.submit_runner_ram = 1024  # defaut 1 GiB
691         self.collection_cache_size = collection_cache_size
692
693         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
694         if runner_resource_req:
695             if runner_resource_req.get("coresMin"):
696                 self.submit_runner_cores = runner_resource_req["coresMin"]
697             if runner_resource_req.get("ramMin"):
698                 self.submit_runner_ram = runner_resource_req["ramMin"]
699             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
700                 self.collection_cache_size = runner_resource_req["keep_cache"]
701
702         if submit_runner_ram:
703             # Command line / initializer overrides default and/or spec from workflow
704             self.submit_runner_ram = submit_runner_ram
705
706         if self.submit_runner_ram <= 0:
707             raise Exception("Value of submit-runner-ram must be greater than zero")
708
709         if self.submit_runner_cores <= 0:
710             raise Exception("Value of submit-runner-cores must be greater than zero")
711
712         self.merged_map = merged_map or {}
713
714     def job(self,
715             job_order,         # type: Mapping[Text, Text]
716             output_callbacks,  # type: Callable[[Any, Any], Any]
717             runtimeContext     # type: RuntimeContext
718            ):  # type: (...) -> Generator[Any, None, None]
719         self.job_order = job_order
720         self._init_job(job_order, runtimeContext)
721         yield self
722
723     def update_pipeline_component(self, record):
724         pass
725
726     def done(self, record):
727         """Base method for handling a completed runner."""
728
729         try:
730             if record["state"] == "Complete":
731                 if record.get("exit_code") is not None:
732                     if record["exit_code"] == 33:
733                         processStatus = "UnsupportedRequirement"
734                     elif record["exit_code"] == 0:
735                         processStatus = "success"
736                     else:
737                         processStatus = "permanentFail"
738                 else:
739                     processStatus = "success"
740             else:
741                 processStatus = "permanentFail"
742
743             outputs = {}
744
745             if processStatus == "permanentFail":
746                 logc = arvados.collection.CollectionReader(record["log"],
747                                                            api_client=self.arvrunner.api,
748                                                            keep_client=self.arvrunner.keep_client,
749                                                            num_retries=self.arvrunner.num_retries)
750                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
751
752             self.final_output = record["output"]
753             outc = arvados.collection.CollectionReader(self.final_output,
754                                                        api_client=self.arvrunner.api,
755                                                        keep_client=self.arvrunner.keep_client,
756                                                        num_retries=self.arvrunner.num_retries)
757             if "cwl.output.json" in outc:
758                 with outc.open("cwl.output.json", "rb") as f:
759                     if f.size() > 0:
760                         outputs = json.loads(f.read().decode())
761             def keepify(fileobj):
762                 path = fileobj["location"]
763                 if not path.startswith("keep:"):
764                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
765             adjustFileObjs(outputs, keepify)
766             adjustDirObjs(outputs, keepify)
767         except Exception:
768             logger.exception("[%s] While getting final output object", self.name)
769             self.arvrunner.output_callback({}, "permanentFail")
770         else:
771             self.arvrunner.output_callback(outputs, processStatus)